[Python] 尝试使用 AWS 的无服务器架构来创建一个事件驱动的Web爬虫
尝试使用无服务器和事件驱动的方式创建网页爬虫。
因为我正在学习ElasticSearch,所以我想通过使用ES来做些什么。于是,我尝试编写了一个基于Kinesis+Lambda的事件驱动的网络爬虫。
-
- 実行環境
CentOS7
python 2.7
工作流程
大致的流程如下:
1. 使用Scrapy(ScrapingHub或AWS Lambda)提取URL,并将其放入Kinesis流中。
2. 从Kinesis流中触发AWS Lambda。
3. Lambda函数通过URL进行爬取,并将数据传输至ElasticSearch Service。
创建IAM用户
在使用Kinesis和ElasticSearch时,需要相应的权限。所以需要准备好每个服务的访问密钥ID和秘密访问密钥。
此外,需要牢记用户的ARN (arn:aws:iam::**********:user/*********)。
使用AWS Kinesis创建一个数据流。
创建 AWS ElasticSearch Service
接下来我们将通过Amazon ElasticSearch Service创建一个ES实例。
在AWS上的操作
使用ElasticSearch进行索引创建和映射。
创建一个用于保存URL、标题和文章内容的映射数据,用于文章保存。
mapping.json
{
“mappings”: {
“article”: {
“properties” : {
“url” : {
“type”: “string”,
“index” : “not_analyzed”
},
“title” : {
“type”: “string”,
“index” : “analyzed”
},
“contents” : {
“type”: “string”,
“index” : “analyzed”
}
}
}
}
}
接下来,将生成一个脚本来创建上述映射数据和索引。
事先在本地安装以下包:
$ pip 安装 requests_aws4auth elasticsearch
# -*- coding: utf-8 -*-
import elasticsearch
from requests_aws4auth import AWS4Auth
import json
if __name__ == '__main__':
# ESのエンドポイントを指定
host='search-***************.ap-northeast-1.es.amazonaws.com'
awsauth = AWS4Auth(
# AWSユーザーのアクセスキーIDとシークレットアクセスキー
'ACCESS_KRY_ID',
'SECRET_ACCESS_KEY',
'ap-northeast-1', 'es')
es = elasticsearch.Elasticsearch(
hosts=[{'host': host, 'port': 443}],
http_auth=awsauth,
use_ssl=True,
verify_certs=True,
connection_class=elasticsearch.connection.RequestsHttpConnection
)
f = open('mapping.json', 'r')
mapping = json.load(f)
es.indices.create(index='website')
es.indices.put_mapping(index='website', doc_type='article', body=mapping['mappings'])
创建 AWS Lambda
我创建了Elasticsearch,接下来我将创建Lambda函数。
创建Lambda函数
在本地创建一个Lambda函数。
$ mkdir web_crawler
$ cd web_crawler
$ vim lambda_function.py
在本地创建一个Lambda函数。
$ 创建一个名为web_crawler的文件夹
$ 进入web_crawler文件夹
$ 使用vim编辑器创建lambda_function.py文件
# -*- coding: utf-8 -*-
import os
import base64
from readability import Document
import html2text
import requests
import elasticsearch
from elasticsearch import helpers
from requests_aws4auth import AWS4Auth
def lambda_handler(event, context):
host = os.environ['ES_HOST']
# ElasticSearch Serviceへの認証にIAM Roleを利用する
awsauth = AWS4Auth(
os.environ['ACCESS_ID'],
os.environ['SECRET_KEY'], 'ap-northeast-1', 'es')
es = elasticsearch.Elasticsearch(
hosts=[{'host': host, 'port': 443}],
http_auth=awsauth,
use_ssl=True,
verify_certs=True,
connection_class=elasticsearch.connection.RequestsHttpConnection
)
articles = []
# Kinesis Streamからイベントを取得
for record in event['Records']:
payload = base64.b64decode(record['kinesis']['data'])
try:
response = requests.get(payload)
if response.ok:
article = Document(response.content).summary()
titleText = html2text.html2text(Document(response.content).title())
contentsText = html2text.html2text(article)
res = es.search(index="website", body={"query": {"match": {"url": payload}}})
# ESにURLが既に登録されているか
if res['hits']['total'] is 0:
doc = {
'url': payload,
'title': titleText.encode('utf-8'),
'contents': contentsText.encode('utf-8')
}
articles.append({'_index':'website', '_type':'scraper', '_source':doc})
except requests.exceptions.HTTPError as err:
print("HTTPError: " + err)
# Bulk Insert
helpers.bulk(es, articles)
创建Lambda函数后,需要将所需的库安装到同一层。
$ pip install readability-lxml html2text elasticsearch requests_aws4auth requests -t /path/to/web_crawler
然后将它们打包成zip文件。
$ zip -r web_crawler.zip .
将Lambda函数部署到AWS
使用Scrapy进行URL抽取,并将其发送到Kinesis数据流中。
下一步是最后阶段,我们将使用Scrapy从列表页面中提取URL,并将数据发送到Kinesis流中试一试。
一覽頁面使用了「はてなブックマーク」的熱門文章。雖然使用Scrapy可以更輕鬆地從RSS中獲取數據,但我故意選擇了從網頁上進行爬蟲。Scrapy是一個方便且強大的框架,當你有興趣時可以嘗試使用它來建立高級的網絡爬蟲。
- Scrapy
创建项目
首先安装Scrapy
$ pip安装scrapy
$ scrapy startproject hotentry
$ vim hotentry/hotentry/spiders/hotentry.py
输入以下代码。
# -*- coding: utf-8 -*-
import scrapy
from scrapy.conf import settings
import boto3
import json
kinesis = boto3.client(
'kinesis',
aws_access_key_id=settings['AWS_ACCESS_KEY_ID'],
aws_secret_access_key=settings['AWS_SECRET_ACCESS_KEY'],
region_name='ap-northeast-1')
class HotEntrySpider(scrapy.Spider):
name = "hotentry"
allowed_domains = ["b.hatena.ne.jp"]
start_urls = ['http://b.hatena.ne.jp/hotentry/general']
def parse(self, response):
for sel in response.css("li.hb-entry-unit-with-favorites"):
url = sel.css("a.entry-link::attr('href')").extract_first()
if url is None:
continue
kinesis.put_record(
StreamName = "scraping_url",
Data = sel.css("a.entry-link::attr('href')").extract_first(),
PartitionKey = "scraper"
)
在hotentry/hotentry/settings.py文件中添加Access Key ID和Secret Access Key:
AWS_ACCESS_KEY_ID = ‘AKI******************’
AWS_SECRET_ACCESS_KEY = ‘************************************’
我现在可以将这段代码放入Kinesis流中进行PUT操作。让我们尝试执行一下来测试一下。
用Scrapy将数据发送到Kinesis,然后通过AWS Lambda将数据发送到ElasticSearch,就应该可以完成这个任务了。
将Scrapy部署到Scrapinghub。
我能够使用Scrapy提取URL并发送到Kinesis,但如果保持这样的话,它将成为本地批处理。因此,我将Scrapy的代码部署到名为Scrapinghub的云服务上。
请参考以下详细的文章来了解导入方法:
* 通过Scrapy + Scrapy Cloud来享受舒适的Python爬虫和网页抓取生活。
由于用户注册到部署非常简单,我将简要概述。
最后
最初,我們將SQS和DynamoDB使用Lambda函數分開,然而由於變得複雜而無法追蹤錯誤,最終失敗了。果然簡單就是最好的。希望Lambda的觸發器能夠支持更多的服務。
因为这段代码只是用于测试,所以并没有严格进行错误处理。即使这段代码带来了任何不利影响,也请自行承担责任。