试着使用Elasticsearch和Kibana将Twitter的热门关键词可视化

我是大久保。

最近,在公司中有机会使用经典的组合Elasticsearch+Kibana+Fluentd进行日志分析,于是我顺便学习了一些相关知识。

尝试一下觉得有趣的是,Elasticsearch不仅可以用于日志分析,还可以像一个小型键值存储一样运作。

通过将Elasticsearch与Kibana结合使用,我们可以做出更多有趣的事情!在本文中,我们编写了一个程序来实时汇总Twitter的趋势词作为一个例子。

Kobito.ZqzBFh.png

开发环境和各种中间件的版本。

如果在本地可以确认的话,开发环境就是Mac。

Java:

Java:

由于Elasticsearch是使用Java编写的软件,因此需要检查Java的版本。

$ java -version
java version "1.8.0_40"
Java(TM) SE Runtime Environment (build 1.8.0_40-b27)
Java HotSpot(TM) 64-Bit Server VM (build 25.40-b25, mixed mode)

Elasticsearch 弹性搜索

我只需要一个选项就可以了:由于Elasticsearch的最新版本是2.2.1,所以我直接用brew安装,不需要指定版本。

$ brew install elasticsearch
==> Downloading https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.2.1/elasticsearch-2.2.1.tar.gz
######################################################################## 100.0%
==> Caveats
Data:    /usr/local/var/elasticsearch/elasticsearch_user_name/
Logs:    /usr/local/var/log/elasticsearch/elasticsearch_user_name.log
Plugins: /usr/local/Cellar/elasticsearch/2.2.1/libexec/plugins/
Config:  /usr/local/etc/elasticsearch/
plugin script: /usr/local/Cellar/elasticsearch/2.2.1/libexec/bin/plugin

To have launchd start elasticsearch at login:
  ln -sfv /usr/local/opt/elasticsearch/*.plist ~/Library/LaunchAgents
Then to load elasticsearch now:
  launchctl load ~/Library/LaunchAgents/homebrew.mxcl.elasticsearch.plist
Or, if you don't want/need launchctl, you can just run:
  elasticsearch
==> Summary
?  /usr/local/Cellar/elasticsearch/2.2.1: 57 files, 31.3M, built in 23 seconds

确认动作

在后台启动Elasticsearch。

$ elasticsearch

请在另一个终端中发送请求进行操作确认。如果按照下面的感觉返回信息,那就可以了。

$ curl http://localhost:9200/
{
  "name" : "Astron",
  "cluster_name" : "elasticsearch_user_name",
  "version" : {
    "number" : "2.2.1",
    "build_hash" : "d045fc29d1932bce18b2e65ab8b297fbf6cd41a1",
    "build_timestamp" : "2016-03-09T09:38:54Z",
    "build_snapshot" : false,
    "lucene_version" : "5.4.1"
  },
  "tagline" : "You Know, for Search"
}

注意:

如果使用brew安装Elasticsearch时遇到各种问题,请尝试执行以下命令:

$ brew update

这样可能会有帮助。我自己也遇到了一些困扰。

黑色字分词插件

为了使Elasticsearch支持日语搜索,需要安装一个名为 kuromoji 的插件。
如果您是通过brew安装Elasticsearch的,则需要设置路径才能使用plugin命令。

插件脚本:/usr/local/Cellar/elasticsearch/2.2.0_1/libexec/bin/plugin

由于在安装过程中会很详细地告诉我插件脚本,所以我会将该插件脚本的bin路径添加到.bash_profile中。

# add for Elasticsearch 2.2
export PATH="/usr/local/Cellar/elasticsearch/2.2.1/libexec/bin:$PATH"

重新加载.bash_profile

source .bash_profile

分析-安装和确认Kuromoji

$ plugin install analysis-kuromoji
$ curl -X GET 'http://localhost:9200/_nodes/plugins?pretty'
{
  "cluster_name" : "elasticsearch_user_name",
  "nodes" : {
      ...
      "plugins" : [ {
        "name" : "analysis-kuromoji",
        "version" : "2.2.1",
        "description" : "The Japanese (kuromoji) Analysis plugin integrates Lucene kuromoji analysis module into elasticsearch.",
        "jvm" : true,
        "classname" : "org.elasticsearch.plugin.analysis.kuromoji.AnalysisKuromojiPlugin",
        "isolated" : true,
        "site" : false
      } ],
      ...
  }
}

确认安装完成且没有问题。

Kibana – 可视化分析平台

由于Kibana最新版本很好,我安装了4.4版本。据说Kibana4可以通过内部启动HTTP服务器来独立运行。(网络上有很多关于Kibana3的文章,起初我有些混乱。。)

在适当的目录中执行以下操作。

$ curl -O https://download.elastic.co/kibana/kibana/kibana-4.4.2-darwin-x64.tar.gz
$ tar zxvf kibana-4.4.2-darwin-x64.tar.gz
$ mv kibana-4.4.2-darwin-x64 kibana
$ ./kibana/bin/kibana 
Kobito.lYucZY.png

Python编程语言

这次我使用Python来获取和整理推文。(如果只是获取推文的话,因为有Elasticsearch的插件,可能根本不需要编写代码)

$ python -V
Python 3.5.1 :: Anaconda 2.4.1 (x86_64)
$ pip -V
pip 8.1.1 from /Users/user_name/anaconda/lib/python3.5/site-packages (python 3.5)

本次使用Twitter Stream API来获取推文,所以需要安装相应的Python Twitter库。

$ pip install twitter

由于推文数据的存储位置将是Elasticsearch,因此需要在Python中安装一个用于向Elasticsearch发送请求的库。

$ pip install elasticsearch

收集推文

在收集推文的过程中,我们按照以下流程编写了程序:
1. 使用Twitter的GET trends/place功能获取日本的热门话题词。
2. 根据获取的热门话题词,利用POST statuses/filter进行流式连接,实时获取推文。
3. 将获取到的推文进行格式化,并不断地将其存入Elasticsearch。
4. 经过5分钟后,为了获取新的热门话题词,返回步骤1。

以下是实际的代码示例。

from pytz import timezone
from dateutil import parser
from datetime import datetime
from elasticsearch import Elasticsearch
from twitter import Twitter, TwitterStream, OAuth
from threading import Timer, get_ident

# https://apps.twitter.com/ でTwitterアプリを登録して取得したOAuthキーを設定。
OAUTH_INFO = dict(
    token="123456789-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
    token_secret="XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
    consumer_key="XXXXXXXXXXXXXXXXXX",
    consumer_secret="XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")

STREAM_INFO = dict(
    timeout=600,
    block=False,
    heartbeat_timeout=600) # デフォルトだと90秒でストリーム維持のためのheartbeatが飛んで来るので10分に設定

JST = timezone('Asia/Tokyo')
WOEID_JP = 23424856 # 日本のWOEID

class TwitterTrendStream():

    def __init__(self):
        self.__current_thread_ident = None
        self.__oauth = OAuth(**OAUTH_INFO)
        self.__es = Elasticsearch()

    def __fetch_trands(self, twitter):
        response = twitter.trends.place(_id=WOEID_JP)
        return [trend["name"] for trend in response[0]["trends"]]

    def __fetch_filter_stream(self, twitter_stream, track_list):
        track = ",".join(track_list)
        return twitter_stream.statuses.filter(track=track)

    def run(self):
        self.__current_thread_ident = get_ident() # 現在の実行スレッドIDを登録
        Timer(300, self.run).start()              # 5分後に新たなスレッドを開始

        twitter = Twitter(auth=self.__oauth)
        twitter_stream = TwitterStream(auth=self.__oauth, **STREAM_INFO)

        trend_list = self.__fetch_trands(twitter)
        tweet_iter = self.__fetch_filter_stream(twitter_stream, trend_list)

        for tweet in tweet_iter:
            if "limit" in tweet: # 取得上限超えた時にくるLimit Jsonは無視
                continue

            if self.__current_thread_ident != get_ident(): # 新たなスレッドが立ち上がったら現在のストリームを終了させる
                return True
            for trend in trend_list:
                if trend in tweet['text']:
                    doc = {
                        'track': trend,
                        'text': tweet['text'],
                        'created_at': str(parser.parse(tweet['created_at']).astimezone(JST).isoformat())
                    }
                    self.__es.index(index="testindex", doc_type='tweet', body=doc)

if __name__ == '__main__':
    TwitterTrendStream().run()

只要在Elasticsearch运行的状态下执行,它会将包含所获取的趋势的推文无止境地存储到Elasticsearch中。

数据映射设置

然而,如果直接执行上述代码,将无法正常生成映射。具体情况如下所示。

$ curl http://localhost:9200/testindex/_mapping?pretty
{
  "testindex" : {
    "mappings" : {
      "tweet" : {
        "properties" : {
          "created_at" : {
            "type" : "date",
            "format" : "strict_date_optional_time||epoch_millis"
          },
          "text" : {
            "type" : "string"
          },
          "track" : {
            "type" : "string"
          }
        }
      }
    }
  }
}

使用Elasticsearch的动态映射自动生成的映射JSON中,”text”和”track”的属性只能是字符串。
如果在Kibana中直接显示,按趋势词汇进行汇总的”track”将被分成日文的每个字,或者根本无法进行有效搜索,因此无法使用。
因此,我们需要在”track”中添加 “index” : “not_analyzed” ,在 “text”中添加 “analyzer” : “japanese”。
参考:Elasticsearch + Kibanaで日本語検索の続き

另外,由于一旦生成的映射基本上很难更新,因此需要先删除然后重新创建。

具体来说,可以使用以下命令重新创建。

$ curl -X DELETE http://localhost:9200/testindex?pretty
{
  "acknowledged" : true
}
$ curl -XPUT localhost:9200/testindex/ -d '
{
  "settings":{
     "index":{
        "analysis":{
           "tokenizer" : {
               "kuromoji" : {
                  "type" : "kuromoji_tokenizer"
               }
           },
           "analyzer" : {
               "japanese" : {
                   "type" : "custom",
                   "tokenizer" : "kuromoji"
               }
           }
        }
     }
  },
  "mappings":{
    "tweet":{
      "properties":{
        "created_at" : {
          "type" : "date",
          "format" : "strict_date_optional_time||epoch_millis"
        },
        "text" : {
          "type" : "string",
          "analyzer": "japanese"
        },
        "track": {
          "type": "string",
          "index": "not_analyzed"
        }
      }
    }
  }
}'

顺便提一下,如果你想在Elasticsearch中进行动态映射日期,就需要按照官方提供的相应日期格式进行匹配。这次我们选择将输出的时间使用ISO格式来适配。

#确认行动

在运行Elasticsearch和Kibana的情况下,执行Python。

访问 http://localhost:5601 并注册要显示的索引。以上代码中的索引是 testindex 。确认已注册的索引后,可以看到 track 的 analyzed 部分没有被勾选中。这是因为 track 设置为了 not_analyzed。

skitched-20160321-155852.png

我认为你可以在Discover上看到推特正在不断地流动。

Kobito.nNbLwN.png

下面是根据当前正在被推特上发表的推文,实时按照热门话题进行聚合并在Kibana上生成图形化展示的数据。

Kobito.ZqzBFh.png

这一小时从下午14点到15点,我们进行了收集。因为当天恰好是Twitter十周年,所以“#LoveTwitter”标签下的推文数量非常多。由于收集的推文数量太多,我们多次遇到了Twitter流的输出限制。因此,并不能获取到所有发布的推文。

顺便提一下,当只考虑排名第三的节目”#说到底委员会NP要停播”的时间线推文数量时,情况如下。

Kobito.NFHQK1.png

可能是因为电视节目的内容引起了热议,但是在分钟级别上似乎没有太多可得到的信息呢。。

总结或者说是感想

我通过使用Elasticsearch和Kibana来可视化了Twitter的热门话题。
通过利用Elasticsearch和Kibana,我真切地感受到了能够以非常迅速和轻便的方式进行数据分析!
如果不使用这些工具的话,就需要准备数据库、搭建Web服务器、编写前端等等,这样就会变得非常繁琐,差不多要构建一个小型的Web服务。
我所编写的代码本身只有几十行核心Python代码,实现起来非常精简。(我最近刚开始学写Python,如果有什么问题,请评论提醒我。。)
不过,我也觉得学习使用Elasticsearch本身的学习成本是相当高的。
关于映射和库的导入,因为缺乏可参考的日语信息,所以经常会在设置方面遇到问题。此外,Elasticsearch在进行聚合计算时会消耗相当多的内存,所以在作为一个完善的服务进行利用时,我感觉需要进行调优,包括JSON的设计也在其中。。

参考资料:
・[Python] 使用Python3调用Twitter搜索API
・threading — 基于线程的并行处理
・尝试在Python中使用线程
・继续使用Elasticsearch + Kibana进行日本语搜索
・设置ElasticSearch的分析器配置

这篇文章是转载自Septeni Engineer’s Blog。

广告
将在 10 秒后关闭
bannerAds