试着使用Elasticsearch和Kibana将Twitter的热门关键词可视化
我是大久保。
最近,在公司中有机会使用经典的组合Elasticsearch+Kibana+Fluentd进行日志分析,于是我顺便学习了一些相关知识。
尝试一下觉得有趣的是,Elasticsearch不仅可以用于日志分析,还可以像一个小型键值存储一样运作。
通过将Elasticsearch与Kibana结合使用,我们可以做出更多有趣的事情!在本文中,我们编写了一个程序来实时汇总Twitter的趋势词作为一个例子。
开发环境和各种中间件的版本。
如果在本地可以确认的话,开发环境就是Mac。
Java:
Java:
由于Elasticsearch是使用Java编写的软件,因此需要检查Java的版本。
$ java -version
java version "1.8.0_40"
Java(TM) SE Runtime Environment (build 1.8.0_40-b27)
Java HotSpot(TM) 64-Bit Server VM (build 25.40-b25, mixed mode)
Elasticsearch 弹性搜索
我只需要一个选项就可以了:由于Elasticsearch的最新版本是2.2.1,所以我直接用brew安装,不需要指定版本。
$ brew install elasticsearch
==> Downloading https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.2.1/elasticsearch-2.2.1.tar.gz
######################################################################## 100.0%
==> Caveats
Data: /usr/local/var/elasticsearch/elasticsearch_user_name/
Logs: /usr/local/var/log/elasticsearch/elasticsearch_user_name.log
Plugins: /usr/local/Cellar/elasticsearch/2.2.1/libexec/plugins/
Config: /usr/local/etc/elasticsearch/
plugin script: /usr/local/Cellar/elasticsearch/2.2.1/libexec/bin/plugin
To have launchd start elasticsearch at login:
ln -sfv /usr/local/opt/elasticsearch/*.plist ~/Library/LaunchAgents
Then to load elasticsearch now:
launchctl load ~/Library/LaunchAgents/homebrew.mxcl.elasticsearch.plist
Or, if you don't want/need launchctl, you can just run:
elasticsearch
==> Summary
? /usr/local/Cellar/elasticsearch/2.2.1: 57 files, 31.3M, built in 23 seconds
确认动作
在后台启动Elasticsearch。
$ elasticsearch
请在另一个终端中发送请求进行操作确认。如果按照下面的感觉返回信息,那就可以了。
$ curl http://localhost:9200/
{
"name" : "Astron",
"cluster_name" : "elasticsearch_user_name",
"version" : {
"number" : "2.2.1",
"build_hash" : "d045fc29d1932bce18b2e65ab8b297fbf6cd41a1",
"build_timestamp" : "2016-03-09T09:38:54Z",
"build_snapshot" : false,
"lucene_version" : "5.4.1"
},
"tagline" : "You Know, for Search"
}
注意:
如果使用brew安装Elasticsearch时遇到各种问题,请尝试执行以下命令:
$ brew update
这样可能会有帮助。我自己也遇到了一些困扰。
黑色字分词插件
为了使Elasticsearch支持日语搜索,需要安装一个名为 kuromoji 的插件。
如果您是通过brew安装Elasticsearch的,则需要设置路径才能使用plugin命令。
插件脚本:/usr/local/Cellar/elasticsearch/2.2.0_1/libexec/bin/plugin
由于在安装过程中会很详细地告诉我插件脚本,所以我会将该插件脚本的bin路径添加到.bash_profile中。
# add for Elasticsearch 2.2
export PATH="/usr/local/Cellar/elasticsearch/2.2.1/libexec/bin:$PATH"
重新加载.bash_profile
source .bash_profile
分析-安装和确认Kuromoji
$ plugin install analysis-kuromoji
$ curl -X GET 'http://localhost:9200/_nodes/plugins?pretty'
{
"cluster_name" : "elasticsearch_user_name",
"nodes" : {
...
"plugins" : [ {
"name" : "analysis-kuromoji",
"version" : "2.2.1",
"description" : "The Japanese (kuromoji) Analysis plugin integrates Lucene kuromoji analysis module into elasticsearch.",
"jvm" : true,
"classname" : "org.elasticsearch.plugin.analysis.kuromoji.AnalysisKuromojiPlugin",
"isolated" : true,
"site" : false
} ],
...
}
}
确认安装完成且没有问题。
Kibana – 可视化分析平台
由于Kibana最新版本很好,我安装了4.4版本。据说Kibana4可以通过内部启动HTTP服务器来独立运行。(网络上有很多关于Kibana3的文章,起初我有些混乱。。)
在适当的目录中执行以下操作。
$ curl -O https://download.elastic.co/kibana/kibana/kibana-4.4.2-darwin-x64.tar.gz
$ tar zxvf kibana-4.4.2-darwin-x64.tar.gz
$ mv kibana-4.4.2-darwin-x64 kibana
$ ./kibana/bin/kibana
Python编程语言
这次我使用Python来获取和整理推文。(如果只是获取推文的话,因为有Elasticsearch的插件,可能根本不需要编写代码)
$ python -V
Python 3.5.1 :: Anaconda 2.4.1 (x86_64)
$ pip -V
pip 8.1.1 from /Users/user_name/anaconda/lib/python3.5/site-packages (python 3.5)
本次使用Twitter Stream API来获取推文,所以需要安装相应的Python Twitter库。
$ pip install twitter
由于推文数据的存储位置将是Elasticsearch,因此需要在Python中安装一个用于向Elasticsearch发送请求的库。
$ pip install elasticsearch
收集推文
在收集推文的过程中,我们按照以下流程编写了程序:
1. 使用Twitter的GET trends/place功能获取日本的热门话题词。
2. 根据获取的热门话题词,利用POST statuses/filter进行流式连接,实时获取推文。
3. 将获取到的推文进行格式化,并不断地将其存入Elasticsearch。
4. 经过5分钟后,为了获取新的热门话题词,返回步骤1。
以下是实际的代码示例。
from pytz import timezone
from dateutil import parser
from datetime import datetime
from elasticsearch import Elasticsearch
from twitter import Twitter, TwitterStream, OAuth
from threading import Timer, get_ident
# https://apps.twitter.com/ でTwitterアプリを登録して取得したOAuthキーを設定。
OAUTH_INFO = dict(
token="123456789-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
token_secret="XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
consumer_key="XXXXXXXXXXXXXXXXXX",
consumer_secret="XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
STREAM_INFO = dict(
timeout=600,
block=False,
heartbeat_timeout=600) # デフォルトだと90秒でストリーム維持のためのheartbeatが飛んで来るので10分に設定
JST = timezone('Asia/Tokyo')
WOEID_JP = 23424856 # 日本のWOEID
class TwitterTrendStream():
def __init__(self):
self.__current_thread_ident = None
self.__oauth = OAuth(**OAUTH_INFO)
self.__es = Elasticsearch()
def __fetch_trands(self, twitter):
response = twitter.trends.place(_id=WOEID_JP)
return [trend["name"] for trend in response[0]["trends"]]
def __fetch_filter_stream(self, twitter_stream, track_list):
track = ",".join(track_list)
return twitter_stream.statuses.filter(track=track)
def run(self):
self.__current_thread_ident = get_ident() # 現在の実行スレッドIDを登録
Timer(300, self.run).start() # 5分後に新たなスレッドを開始
twitter = Twitter(auth=self.__oauth)
twitter_stream = TwitterStream(auth=self.__oauth, **STREAM_INFO)
trend_list = self.__fetch_trands(twitter)
tweet_iter = self.__fetch_filter_stream(twitter_stream, trend_list)
for tweet in tweet_iter:
if "limit" in tweet: # 取得上限超えた時にくるLimit Jsonは無視
continue
if self.__current_thread_ident != get_ident(): # 新たなスレッドが立ち上がったら現在のストリームを終了させる
return True
for trend in trend_list:
if trend in tweet['text']:
doc = {
'track': trend,
'text': tweet['text'],
'created_at': str(parser.parse(tweet['created_at']).astimezone(JST).isoformat())
}
self.__es.index(index="testindex", doc_type='tweet', body=doc)
if __name__ == '__main__':
TwitterTrendStream().run()
只要在Elasticsearch运行的状态下执行,它会将包含所获取的趋势的推文无止境地存储到Elasticsearch中。
数据映射设置
然而,如果直接执行上述代码,将无法正常生成映射。具体情况如下所示。
$ curl http://localhost:9200/testindex/_mapping?pretty
{
"testindex" : {
"mappings" : {
"tweet" : {
"properties" : {
"created_at" : {
"type" : "date",
"format" : "strict_date_optional_time||epoch_millis"
},
"text" : {
"type" : "string"
},
"track" : {
"type" : "string"
}
}
}
}
}
}
使用Elasticsearch的动态映射自动生成的映射JSON中,”text”和”track”的属性只能是字符串。
如果在Kibana中直接显示,按趋势词汇进行汇总的”track”将被分成日文的每个字,或者根本无法进行有效搜索,因此无法使用。
因此,我们需要在”track”中添加 “index” : “not_analyzed” ,在 “text”中添加 “analyzer” : “japanese”。
参考:Elasticsearch + Kibanaで日本語検索の続き
另外,由于一旦生成的映射基本上很难更新,因此需要先删除然后重新创建。
具体来说,可以使用以下命令重新创建。
$ curl -X DELETE http://localhost:9200/testindex?pretty
{
"acknowledged" : true
}
$ curl -XPUT localhost:9200/testindex/ -d '
{
"settings":{
"index":{
"analysis":{
"tokenizer" : {
"kuromoji" : {
"type" : "kuromoji_tokenizer"
}
},
"analyzer" : {
"japanese" : {
"type" : "custom",
"tokenizer" : "kuromoji"
}
}
}
}
},
"mappings":{
"tweet":{
"properties":{
"created_at" : {
"type" : "date",
"format" : "strict_date_optional_time||epoch_millis"
},
"text" : {
"type" : "string",
"analyzer": "japanese"
},
"track": {
"type": "string",
"index": "not_analyzed"
}
}
}
}
}'
顺便提一下,如果你想在Elasticsearch中进行动态映射日期,就需要按照官方提供的相应日期格式进行匹配。这次我们选择将输出的时间使用ISO格式来适配。
#确认行动
在运行Elasticsearch和Kibana的情况下,执行Python。
访问 http://localhost:5601 并注册要显示的索引。以上代码中的索引是 testindex 。确认已注册的索引后,可以看到 track 的 analyzed 部分没有被勾选中。这是因为 track 设置为了 not_analyzed。
我认为你可以在Discover上看到推特正在不断地流动。
下面是根据当前正在被推特上发表的推文,实时按照热门话题进行聚合并在Kibana上生成图形化展示的数据。
这一小时从下午14点到15点,我们进行了收集。因为当天恰好是Twitter十周年,所以“#LoveTwitter”标签下的推文数量非常多。由于收集的推文数量太多,我们多次遇到了Twitter流的输出限制。因此,并不能获取到所有发布的推文。
顺便提一下,当只考虑排名第三的节目”#说到底委员会NP要停播”的时间线推文数量时,情况如下。
可能是因为电视节目的内容引起了热议,但是在分钟级别上似乎没有太多可得到的信息呢。。
总结或者说是感想
我通过使用Elasticsearch和Kibana来可视化了Twitter的热门话题。
通过利用Elasticsearch和Kibana,我真切地感受到了能够以非常迅速和轻便的方式进行数据分析!
如果不使用这些工具的话,就需要准备数据库、搭建Web服务器、编写前端等等,这样就会变得非常繁琐,差不多要构建一个小型的Web服务。
我所编写的代码本身只有几十行核心Python代码,实现起来非常精简。(我最近刚开始学写Python,如果有什么问题,请评论提醒我。。)
不过,我也觉得学习使用Elasticsearch本身的学习成本是相当高的。
关于映射和库的导入,因为缺乏可参考的日语信息,所以经常会在设置方面遇到问题。此外,Elasticsearch在进行聚合计算时会消耗相当多的内存,所以在作为一个完善的服务进行利用时,我感觉需要进行调优,包括JSON的设计也在其中。。
参考资料:
・[Python] 使用Python3调用Twitter搜索API
・threading — 基于线程的并行处理
・尝试在Python中使用线程
・继续使用Elasticsearch + Kibana进行日本语搜索
・设置ElasticSearch的分析器配置
这篇文章是转载自Septeni Engineer’s Blog。