使用RabbitMQ充分发挥作用,创建一款高度通用的图像爬虫
考虑利用RabbitMQ来有效利用画像爬虫。
在复杂系统中,实现作为RabbitMQ工作者的处理可以是普遍的且不需要保证处理顺序。
- 画像クローラーを実装する場合、オリジナルソース(URL)取得ロジックとパーサー処理、画像URL抽出フィルター処理、画像URLダウンロード処理などにわかれると思います。その中で画像URLダウンロード処理に関しては、処理順序が保証される必要もなく普遍的な処理なのでワーカーとして実装できそうです。
将多个系统中共同的功能实现为RabbitMQ工作程序。
- 画像クローラーと一口に言ってもTwitterのStreamingから画像のリンクを抽出してクローリングするとか、Pixivからクロールして画像ダウンロードするとか様々だと思います。ただその複数のシステムを実装する必要がある場合、画像URLをダウンロードする機能は共通的な処理になります。画像URLをダウンロードする処理をRabbitMQワーカーとして実装しておけば、たとえTwitter画像クローラーをRubyで実装し、Pixiv画像クローラーをJavaで実装したとしても、画像URLダウンロードする処理実装はひとつで済みます。
以下是利用RabbitMQ的图片爬虫系统的概要图。
RabbitMQ和图像爬虫系统
信息结构
PropertyValueSampleNameネームスペースdevelopment.download_urlBody[URL][SPACE][ダウンロードパス]http://i.imgur.com/rqFZVhq.jpg /Users/Siori/code/git_akb428/amqp_rabbitmq_ruby_util_box/private
通过指定Name,可以将消息分为不同的部分。
可以按照环境名(开发/暂存/生产)+功能名或者数据库名+表名等来决定Name,使消息分离得更清晰。
消息发送部门
- 2ch imgur twitter Facebook Pixiv それぞれのクローラーを作成し画像URLが抽出できた段階でRabbitMQにメッセージを送信します。
require "bunny"
download_parent_path = 'ワーカーに画像を保存させたいフォルダを指定'
# extract_image_link 関数を対象のSNSごとに実装する Twitter::extract_image_link とか Pixiv::extract_image_link とか
link_list = extract_image_link(url)
namespace = 'メッセージのNameを指定'
conn = Bunny.new
conn.start
ch = conn.create_channel
q = ch.queue(namespace)
link_list.each do |link|
body = link + ' ' + download_parent_path
ch.default_exchange.publish(body, :routing_key => q.name)
puts " [x] Send #{body}"
end
conn.close
以下是2ch的HTML解析器+过滤器+消息发送的实现链接:
https://github.com/AKB428/megaris
消息接收器(工作人员)
根据预测的负载(下载量),工作人员可以随意启动多个进程。
这个系统的好处是只需要一个工作人员处理。
#RabbitMQモジュールを指定
require "bunny"
require './lib/download.rb'
conn = Bunny.new
conn.start
ch = conn.create_channel
#Nameを指定
q = ch.queue("development.download_url")
puts " [*] Waiting for messages in #{q.name}. To exit press CTRL+C"
while(true) do
q.subscribe(:block => true) do |delivery_info, properties, body|
puts " [x] Received #{body}"
body_list = body.split(' ')
url = body_list[0]
opt = {
'dest_folder' => body_list[1].nil? ? './' : body_list[1]
}
puts url
p opt
download_path = download(url, opt)
puts " [x] Download #{download_path}"
end
end
require 'uri'
require 'httpclient'
require 'pathname'
def download(url, opt)
http_client = HTTPClient.new
header = nil
query = nil
dest_file = File.join('./', File.basename(url))
if opt.has_key?('dest_folder')
conf_dest = File.expand_path(opt['dest_folder'])
dest_file = File.join(conf_dest, File.basename(url))
FileUtils.mkdir_p(conf_dest) unless File.exist?(conf_dest)
end
http_client.receive_timeout = 60 * 10
if opt.has_key?('receive_timeout')
http_client.receive_timeout = opt['receive_timeout']
end
open(dest_file, 'wb') do |file|
http_client.get_content(URI.parse(URI.encode(url)), query, header) do |chunk|
file.write chunk
end
end
dest_file
end
消息封装
这次考虑到调试问题,没有对消息进行结构化处理,但是如果要通过消息队列发送消息,存储对象的标准方式是以序列化的形式存储,因此使用JSON格式或者MessagePack来进行消息交互是更好的选择。
使用RabbitMQ的好处。
RabbitMQ默认带有功能强大的Web监控,因此可以很容易地直观地了解工作负载等信息,非常方便。
(如果不添加插件,Apache Kafka无法使用管理界面)
RabbitMQ是用世界上最好的Erlang语言实现的。
看起来很稳定。RabbitMQ的Erlang进程似乎可以生成大约100万个(使用MacBookAir 8GB内存的默认设置)。
消息队列和其他产品的选择
我认为最近Apache Kafka很流行,所以可以考虑作为RabbitMQ的替代品,这是值得一试的。