[Python] 尝试使用 AWS 的无服务器架构来创建一个事件驱动的Web爬虫

尝试使用无服务器和事件驱动的方式创建网页爬虫。

因为我正在学习ElasticSearch,所以我想通过使用ES来做些什么。于是,我尝试编写了一个基于Kinesis+Lambda的事件驱动的网络爬虫。

    • 実行環境

CentOS7
python 2.7

工作流程

serverless-crawler.png

大致的流程如下:
1. 使用Scrapy(ScrapingHub或AWS Lambda)提取URL,并将其放入Kinesis流中。
2. 从Kinesis流中触发AWS Lambda。
3. Lambda函数通过URL进行爬取,并将数据传输至ElasticSearch Service。

创建IAM用户

在使用Kinesis和ElasticSearch时,需要相应的权限。所以需要准备好每个服务的访问密钥ID和秘密访问密钥。

此外,需要牢记用户的ARN (arn:aws:iam::**********:user/*********)。

使用AWS Kinesis创建一个数据流。

スクリーンショット 2017-04-06 12.45.52.png

创建 AWS ElasticSearch Service

接下来我们将通过Amazon ElasticSearch Service创建一个ES实例。

在AWS上的操作

スクリーンショット 2017-04-06 13.30.18.png

使用ElasticSearch进行索引创建和映射。

创建一个用于保存URL、标题和文章内容的映射数据,用于文章保存。
mapping.json
{
“mappings”: {
“article”: {
“properties” : {
“url” : {
“type”: “string”,
“index” : “not_analyzed”
},
“title” : {
“type”: “string”,
“index” : “analyzed”
},
“contents” : {
“type”: “string”,
“index” : “analyzed”
}
}
}
}
}

接下来,将生成一个脚本来创建上述映射数据和索引。

事先在本地安装以下包:
$ pip 安装 requests_aws4auth elasticsearch

# -*- coding: utf-8 -*-
import elasticsearch
from requests_aws4auth import AWS4Auth
import json

if __name__ == '__main__':
    # ESのエンドポイントを指定
    host='search-***************.ap-northeast-1.es.amazonaws.com'
    awsauth = AWS4Auth(
            # AWSユーザーのアクセスキーIDとシークレットアクセスキー
            'ACCESS_KRY_ID',
            'SECRET_ACCESS_KEY',
            'ap-northeast-1', 'es')

    es = elasticsearch.Elasticsearch(
            hosts=[{'host': host, 'port': 443}],
            http_auth=awsauth,
            use_ssl=True,
            verify_certs=True,
            connection_class=elasticsearch.connection.RequestsHttpConnection
            )

    f = open('mapping.json', 'r')
    mapping = json.load(f)

    es.indices.create(index='website')                    
    es.indices.put_mapping(index='website', doc_type='article', body=mapping['mappings'])
スクリーンショット 2017-04-06 15.32.52.png

创建 AWS Lambda

我创建了Elasticsearch,接下来我将创建Lambda函数。

创建Lambda函数

在本地创建一个Lambda函数。
$ mkdir web_crawler
$ cd web_crawler
$ vim lambda_function.py

在本地创建一个Lambda函数。
$ 创建一个名为web_crawler的文件夹
$ 进入web_crawler文件夹
$ 使用vim编辑器创建lambda_function.py文件


# -*- coding: utf-8 -*-                    
import os
import base64
from readability import Document
import html2text
import requests
import elasticsearch
from elasticsearch import helpers
from requests_aws4auth import AWS4Auth

def lambda_handler(event, context):
    host = os.environ['ES_HOST']
    # ElasticSearch Serviceへの認証にIAM Roleを利用する
    awsauth = AWS4Auth(
            os.environ['ACCESS_ID'],
            os.environ['SECRET_KEY'], 'ap-northeast-1', 'es')

    es = elasticsearch.Elasticsearch(
            hosts=[{'host': host, 'port': 443}],
            http_auth=awsauth,
            use_ssl=True,
            verify_certs=True,
            connection_class=elasticsearch.connection.RequestsHttpConnection
    )

    articles = []

    # Kinesis Streamからイベントを取得
    for record in event['Records']:
        payload = base64.b64decode(record['kinesis']['data'])
        try:
            response = requests.get(payload)
            if response.ok:
                article = Document(response.content).summary()
                titleText = html2text.html2text(Document(response.content).title())
                contentsText = html2text.html2text(article)
                res = es.search(index="website", body={"query": {"match": {"url": payload}}})
                # ESにURLが既に登録されているか
                if res['hits']['total'] is 0:
                    doc = {
                        'url': payload,
                        'title': titleText.encode('utf-8'),
                        'contents': contentsText.encode('utf-8')
                    }
                    articles.append({'_index':'website', '_type':'scraper', '_source':doc})
        except requests.exceptions.HTTPError as err:
            print("HTTPError: " + err)                                                                                                                                                       
    # Bulk Insert
    helpers.bulk(es, articles)

创建Lambda函数后,需要将所需的库安装到同一层。
$ pip install readability-lxml html2text elasticsearch requests_aws4auth requests -t /path/to/web_crawler
然后将它们打包成zip文件。
$ zip -r web_crawler.zip .

将Lambda函数部署到AWS

スクリーンショット 2017-04-06 16.24.30.png

使用Scrapy进行URL抽取,并将其发送到Kinesis数据流中。

下一步是最后阶段,我们将使用Scrapy从列表页面中提取URL,并将数据发送到Kinesis流中试一试。

一覽頁面使用了「はてなブックマーク」的熱門文章。雖然使用Scrapy可以更輕鬆地從RSS中獲取數據,但我故意選擇了從網頁上進行爬蟲。Scrapy是一個方便且強大的框架,當你有興趣時可以嘗試使用它來建立高級的網絡爬蟲。

    Scrapy

创建项目

首先安装Scrapy
$ pip安装scrapy
$ scrapy startproject hotentry
$ vim hotentry/hotentry/spiders/hotentry.py
输入以下代码。

# -*- coding: utf-8 -*-
import scrapy
from scrapy.conf import settings
import boto3
import json

kinesis = boto3.client(
        'kinesis',                                                                                                                                                                           
        aws_access_key_id=settings['AWS_ACCESS_KEY_ID'],
        aws_secret_access_key=settings['AWS_SECRET_ACCESS_KEY'],
        region_name='ap-northeast-1')

class HotEntrySpider(scrapy.Spider):
    name = "hotentry"
    allowed_domains = ["b.hatena.ne.jp"]
    start_urls = ['http://b.hatena.ne.jp/hotentry/general']

    def parse(self, response):
        for sel in response.css("li.hb-entry-unit-with-favorites"):
            url = sel.css("a.entry-link::attr('href')").extract_first()
            if url is None:
                continue
            kinesis.put_record(
                    StreamName = "scraping_url",
                    Data = sel.css("a.entry-link::attr('href')").extract_first(),
                    PartitionKey = "scraper"
            )

在hotentry/hotentry/settings.py文件中添加Access Key ID和Secret Access Key:

AWS_ACCESS_KEY_ID = ‘AKI******************’
AWS_SECRET_ACCESS_KEY = ‘************************************’

我现在可以将这段代码放入Kinesis流中进行PUT操作。让我们尝试执行一下来测试一下。

用Scrapy将数据发送到Kinesis,然后通过AWS Lambda将数据发送到ElasticSearch,就应该可以完成这个任务了。

将Scrapy部署到Scrapinghub。

我能够使用Scrapy提取URL并发送到Kinesis,但如果保持这样的话,它将成为本地批处理。因此,我将Scrapy的代码部署到名为Scrapinghub的云服务上。

请参考以下详细的文章来了解导入方法:
* 通过Scrapy + Scrapy Cloud来享受舒适的Python爬虫和网页抓取生活。

由于用户注册到部署非常简单,我将简要概述。

最后

最初,我們將SQS和DynamoDB使用Lambda函數分開,然而由於變得複雜而無法追蹤錯誤,最終失敗了。果然簡單就是最好的。希望Lambda的觸發器能夠支持更多的服務。

因为这段代码只是用于测试,所以并没有严格进行错误处理。即使这段代码带来了任何不利影响,也请自行承担责任。

广告
将在 10 秒后关闭
bannerAds