RabbitMq队列
约 1114 字大约 4 分钟
2025-01-11
介绍
RabbitMQ模式
是爬虫框架中基于 RabbitMQ消息队列
实现的任务调度方式。RabbitMQ是一种高性能、可靠、可扩展的消息中间件,采用AMQP(高级消息队列协议)进行消息传递,适合需要 高并发
和 分布式
处理的爬虫任务。
特点与优势
高可用性
:RabbitMQ支持消息持久化、镜像队列和集群部署,保证任务数据的可靠性和高可用性。分布式扩展
:支持多节点分布式部署,多个爬虫节点可以同时消费队列中的任务,实现任务的负载均衡。异步解耦
:生产者和消费者解耦,爬虫任务可以灵活调度和管理,任务流更加高效稳定。可靠性保障
:消息确认机制(ACK)与失败重试功能,确保任务不会因异常而丢失。灵活的路由机制
:支持多种交换机类型(如直连、主题、广播),方便对不同任务类型进行分类调度。
使用场景
大规模分布式爬虫
:任务量大、需要多节点协作的爬虫项目。需要高可靠性的任务调度
:对任务数据丢失敏感,要求高可靠性和稳定性的场景。任务优先级管理
:需要对不同优先级的任务进行灵活调度和处理。
限制与挑战
环境依赖
:需要额外安装和配置RabbitMQ服务器,增加了系统复杂性。资源开销
:RabbitMQ需要额外的CPU和内存资源,对于小型项目可能有些“重量级”。网络延迟
:与内存模式相比,存在一定的网络通信延迟(基本忽略不计),但可以通过优化配置降低影响。
创建方式
你可以在创建项目时创建第一个任务的时候直接选择 RabbitMq 模式的爬虫,也可以使用 generator.py
模块新建爬虫时选择 RabbitMq 模式的爬虫
使用 generator.py
模块创建
generator.py
# -*- coding: utf-8 -*-
# @Author: yuanshaohang
# @Date: 2020-02-23 09:56:50
# @Version: 1.0.0
# @Description: 创建爬虫
from hunterx.utils.generator import production
# spider_dir: 爬虫分层目录名称(路径不存在时会自动创建,无需手动创建目录)
# spider_name: 创建的爬虫名称
# kernel_code: 需要使用的核心引擎 默认优先使用内存优先级队列,默认为3(内存队列),1为rabbitmq队列,2为redis队列
production(spider_name='rabbitmq_spider', kernel_code=1)
这样执行后就会在 spiders
目录下创建一个名为 rabbitmq_spider.py
的爬虫文件,并且该爬虫使用的是 RabbitMq优先级队列
模式,这个模式下你需要进行一些配置
setting配置
主要配置部分是对 settings.py
中 RabbitMq连接部分参数的配置
- 配置示例
settings.py
# ------------------ RabbitMQ连接配置参数 ------------------#
# rabbitmq地址及用户名密码
RABBITMQ_CONFIG = {
'username': 'guest',
'password': 'guest',
'host': 'localhost',
'port': 15672, # 数字类型
}
# 消息淘汰时间,也被称为缓存淘汰机制,超过设定的时间队列中的消息将自动删除(单位秒)
X_MESSAGE_TTL = 86400000
# 是否开启Rabbitmq连接
MQ_ENABLED = True
提示
一定要注意 MQ_ENABLED
参数是否开启,否则即使配置了连接地址也不会进行连接
使用示例
编辑 rabbitmq_spider.py
文件为以下内容
rabbitmq_spider.py
# -*- coding: utf-8 -*-
import hunterx
from hunterx.spiders import RabbitmqSpider
class RabbitmqSpiderSpider(RabbitmqSpider):
name = 'rabbitmq_spider'
def __init__(self):
self.header = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36'
}
def start_requests(self):
for i in range(100):
url = 'https://www.baidu.com/'
yield hunterx.Requests(url=url, headers=self.header, callback=self.parse, level=1)
async def parse(self, response):
print(f'parse 接收到的response: {response.status_code}')
for i in range(100):
url = 'https://www.baidu.com/'
yield hunterx.Requests(url=url, headers=self.header, callback=self.height_level, level=2)
async def height_level(self, response):
print(f'height_level 接收到的response: {response.status_code}')
if __name__ == '__main__':
start_run = RabbitmqSpiderSpider()
start_run.run()
可以看到除了爬虫类继承的父类不同以外,其他与内存队列使用方式并无二致。
优先级分解说明
参考 内存队列
优先级分解说明
总结
RabbitMQ模式是一种高可靠性、高并发、支持分布式的队列实现方式。它适用于大规模、多节点协作的爬虫项目,能够保证任务的持久化和安全性。在对稳定性和扩展性要求较高的场景中,RabbitMQ是非常理想的选择。