準備

GCPでのProjectとGCS <- Kubernetesクラスタがあればいらない
Twitter Developersアカウント

TwitterのCredentialsの準備

https://developer.twitter.com/en/apps/ で

    • API key

 

    • API secret key

 

    • Access token

 

    access token secret

を準備。

確認方法

import tweepy

auth = tweepy.OAuthHandler('api key', 'api secret key')
auth.set_access_token('access token', 'access token secret')

api = tweepy.API(auth)

#override tweepy.StreamListener to add logic to on_status
class MyStreamListener(tweepy.StreamListener):

    def on_status(self, status):
        print(status.text)


def show_my_stream():
    myStreamListener = MyStreamListener()
    myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener)

    myStream.filter(track=['corona'])

show_my_stream()
python twitter_stream_consumer.py
RT @dougmar_: the United States is the only country that still have a corona virus problem and Trump wants to focus on tik tok?
RT @RPNishank2020: 13sept se accha to 3 may ko lete exam.
Ab kahoge ki kisko pata tha case badhege.
To tumhe officer kisne banaya
21 din…
Walah.. tiwas seneng jenengku kesebut, ending2 kok marai drop ?
Solo te falta el virus por que la corona ya la Tenes mi reina ?
RT @inewsdotid: Bos WHO Tedros Adhanom Ghebreyesus mengingatkan pandemi virus corona merupakan krisis kesehatan global yang dampaknya bisa…
RT @AvinashBhondwe: Maha. CM visited Pune for supervising Corona Pandemic Control measures. We thought he will ...

出てきたらオッケー

Kubernetes Clusterを建てる

(GKEとかEKSとかローカルのMinikubeかk3dなどで)

自分は今回TerraformのGKEで適当にクラスタ立てた

    https://learn.hashicorp.com/terraform/kubernetes/provision-gke-cluster
terraform apply

kubeconfigを更新

gcloud container clusters get-credentials --project smooth-ace-276614 --zone asia-northeast1-a my-gke-cluster
Fetching cluster endpoint and auth data.
kubeconfig entry generated for my-gke-cluster.

StrimziでKafkaクラスタ+ Kafka Connectを建てる

Strimzi というKafka on Kubernetes in a few minutesというのを使って遊ぶ

(あとでOperator + Kafka Cluster + Kafka Connect + Kafka Connectorをわけるよてい)

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: twitter-connector
  labels:
    strimzi.io/cluster: kafka-connect-source
spec:
  class: com.eneco.trading.kafka.connect.twitter.TwitterSourceConnector
  tasksMax: 2
  config:
    topic: twitter
    track.terms: corona
    language: en
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter: org.apache.kafka.connect.json.JsonConverter
    twitter.token: xxxx
    twitter.secret: xxxx
    twitter.consumerkey: xxxx
    twitter.consumersecret: xxxx
kubectl create namespace kafka-strimzi-18
kubectl apply -k overlays/kafka-strimzi-18

console-consumerでconsumeしてみると、Twitterがとれてる!

kubectl run kafka-consumer -ti --image=strimzi/kafka:0.18.0-kafka-2.5.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic twitter --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"created_at"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"screen_name"},{"type":"string","optional":true,"field":"location"},{"type":"boolean","optional":false,"field":"verified"},{"type":"int32","optional":false,"field":"friends_count"},{"type":"int32","optional":false,"field":"followers_count"},{"type":"int32","optional":false,"field":"statuses_count"}],"optional":false,"name":"com.eneco.trading.kafka.connect.twitter.User","field":"user"},{"type":"string","optional":true,"field":"text"},{"type":"string","optional":true,"field":"lang"},{"type":"boolean","optional":false,"field":"is_retweet"},{"type":"struct","fields":[{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":true,"field":"text"}],"optional":false,"name":"com.eneco.trading.kafka.connect.twitter.Hashtag"},"optional":true,"field":"hashtags"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":true,"field":"display_url"},{"type":"string","optional":true,"field":"expanded_url"},{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"type"},{"type":"string","optional":true,"field":"url"}],"optional":false,"name":"com.eneco.trading.kafka.connect.twitter.Medium"},"optional":true,"field":"media"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":true,"field":"display_url"},{"type":"string","optional":true,"field":"expanded_url"},{"type":"string","optional":true,"field":"url"}],"optional":false,"name":"com.eneco.trading.kafka.connect.twitter.Url"},"optional":true,"field":"urls"},{"type":"array","items":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"screen_name"}],"optional":false,"name":"com.eneco.trading.kafka.connect.twitter.UserMention"},"optional":true,"field":"user_mentions"}],"optional":false,"name":"com.eneco.trading.kafka.connect.twitter.Entities","field":"entities"}],"optional":false,"name":"com.eneco.trading.kafka.connect.twitter.Tweet"},"payload":{"id":1290101985916592128,"created_at":"2020-08-03T01:47:37.000+0000","user":{"id":2901232483,"name":"Keryi\uD83E\uDD8B","screen_name":"keryikeryi","location":"Seattle, WA","verified":false,"friends_count":153,"followers_count":222,"statuses_count":13157},"text":"RT @LilNasX: corona is that nigga who already graduated but won’t stop coming up to the school","lang":"en","is_retweet":true,"entities":{"hashtags":[],"media":[],"urls":[],"user_mentions":[{"id":754006735468261376,"name":"nope","screen_name":"LilNasX"}]}}}
^CProcessed a total of 1056 messages

コード

kafka-connect-twitterが古すぎ問題

古くてだれもメンテしてない模様だったので、PR出してあったやつを使ってPluginやいたから、完全に野良。

广告
将在 10 秒后关闭
bannerAds