I Tried Integrating Kafka with Elasticsearch

In a previous article, we introduced how to integrate Apache Kafka and Apache Spark Streaming using Alibaba Cloud E-MapReduce. In this article, continuing with the Twitter-message example, we walk through integrating Kafka with Elasticsearch on Alibaba Cloud, a combination already adopted by many enterprises.

Verification Environment

Kafka

    • EMR-3.20.0
    • ZooKeeper 3.4.13
    • Kafka 1.1.1
    • Cluster type: Kafka
    • Hardware (Header): 1 × ecs.sn2.large
    • Hardware (Worker): 2 × ecs.sn2.large

Please refer to the official documentation for the steps to create the cluster.

# cat /etc/redhat-release
CentOS Linux release 7.4.1708 (Core) 
# uname -r
3.10.0-693.2.2.el7.x86_64
# echo envi | nc localhost 2181
Environment:
zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 04:05 GMT
host.name=emr-header-1.cluster-43709
java.version=1.8.0_151
java.vendor=Oracle Corporation
java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-1.b12.el7_4.x86_64/jre
java.class.path=/usr/lib/zookeeper-current/bin/../build/classes:/usr/lib/zookeeper-current/bin/../build/lib/*.jar:/usr/lib/zookeeper-current/bin/../lib/slf4j-log4j12-1.7.25.jar:/usr/lib/zookeeper-current/bin/../lib/slf4j-api-1.7.25.jar:/usr/lib/zookeeper-current/bin/../lib/netty-3.10.6.Final.jar:/usr/lib/zookeeper-current/bin/../lib/log4j-1.2.17.jar:/usr/lib/zookeeper-current/bin/../lib/jline-0.9.94.jar:/usr/lib/zookeeper-current/bin/../lib/audience-annotations-0.5.0.jar:/usr/lib/zookeeper-current/bin/../zookeeper-3.4.13.jar:/usr/lib/zookeeper-current/bin/../src/java/lib/*.jar:/etc/ecm/zookeeper-conf::/var/lib/ecm-agent/data/jmxetric-1.0.8.jar
java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
java.io.tmpdir=/tmp
java.compiler=<NA>
os.name=Linux
os.arch=amd64
os.version=3.10.0-693.2.2.el7.x86_64
user.name=hadoop
user.home=/home/hadoop
user.dir=/home/hadoop
Hosebird Client

Hosebird Client is a Java HTTP client for consuming Twitter's Streaming API, which we use together with the Kafka producer. For details, see the GitHub repository: https://github.com/twitter/hbc

Overall Architecture

First, the architecture on Alibaba Cloud is shown below. To build an Elasticsearch environment quickly, we use Alibaba Cloud's managed Elasticsearch. At the time of writing, Alibaba Cloud Elasticsearch offers three versions: Elasticsearch 5.5.3 with Commercial Feature, Elasticsearch 6.3.2 with Commercial Feature, and Elasticsearch 6.7.0 with Commercial Feature. They include the X-Pack plugins for enterprise-grade access control, security monitoring, alerting, visual reporting, machine learning, and more.

(Figure: overall architecture on Alibaba Cloud)

Kafka Producer

Now that everything is ready, let's start writing code in a local Java development environment. First, we create a Kafka producer from the following sample code and build it into a jar file.

Kafka bootstrap servers

Any broker's IP address in the Kafka cluster will do.
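Note that the topic used below (tweets_poc) must exist before the producer and consumer are started. As a minimal sketch, the topic could also be created programmatically with Kafka's AdminClient; the bootstrap address, partition count, and replication factor below are placeholders to adjust for your own cluster:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTopicTest {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Any reachable broker in the cluster works as the bootstrap server.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "xxxxxxxxxxxxxxxx:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 2 (one copy per worker node).
            NewTopic topic = new NewTopic("tweets_poc", 3, (short) 2);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}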

Twitter Streaming API credentials

To use the Twitter Streaming API, obtain a consumerKey, consumerSecret, token, and secret beforehand and fill them in.


import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ProducerTest {

    Logger logger = LoggerFactory.getLogger(ProducerTest.class.getName());

 /** ---------------------- Twitter Streaming API credentials ---------------------- */
    String consumerKey = "xxxxxxxxxxxxxxxx";
    String consumerSecret = "xxxxxxxxxxxxxxxx";
    String token = "xxxxxxxxxxxxxxxx";
    String secret = "xxxxxxxxxxxxxxxx";
    String mytopic = "tweets_poc";

 /** ---------------------- Keywords to track in tweets ---------------------- */
    List<String> terms = Lists.newArrayList("bitcoin","Blockchain","IoT","5G");


    public ProducerTest(){}

    public static void main(String[] args) {
        new ProducerTest().run();
    }

    public void run(){

        logger.info("Setup");

        BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);

        Client client = createTwitterClient(msgQueue);
        client.connect();

        KafkaProducer<String, String> producer = createKafkaProducer();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("stopping application...");
            logger.info("shutting down client from twitter...");
            client.stop();
            logger.info("closing producer...");
            producer.close();
            logger.info("done!");
        }));

        while (!client.isDone()) {
            String msg = null;
            try {
                msg = msgQueue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
                client.stop();
            }

            if (msg != null){
                logger.info(msg);

                // Shared send callback: just log any failure.
                Callback logOnError = (recordMetadata, e) -> {
                    if (e != null) {
                        logger.error("Something bad happened", e);
                    }
                };

                // Key each record by the matched keyword so tweets about the same
                // subject land on the same partition; unmatched messages get a
                // null key and are distributed by the default partitioner.
                String key = null;
                if (StringUtils.containsIgnoreCase(msg, "Bitcoin")) {
                    key = "Bitcoin";
                } else if (StringUtils.containsIgnoreCase(msg, "Blockchain")) {
                    key = "Blockchain";
                } else if (StringUtils.containsIgnoreCase(msg, "IoT")) {
                    key = "IoT";
                } else if (StringUtils.containsIgnoreCase(msg, "5G")) {
                    key = "5G";
                }
                producer.send(new ProducerRecord<>(mytopic, key, msg), logOnError);
            }
        }
        logger.info("End of application");
    }

 /** ---------------------- Create the Hosebird client ---------------------- */
    public Client createTwitterClient(BlockingQueue<String> msgQueue){

        Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
        StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();

        hosebirdEndpoint.trackTerms(terms);

        Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);

        ClientBuilder builder = new ClientBuilder()
                .name("Hosebird-Client-01")                              
                .hosts(hosebirdHosts)
                .authentication(hosebirdAuth)
                .endpoint(hosebirdEndpoint)
                .processor(new StringDelimitedProcessor(msgQueue));

        Client hosebirdClient = builder.build();
        return hosebirdClient;
    }

  /** ---------------------- Create the Kafka producer ---------------------- */
    public KafkaProducer<String, String> createKafkaProducer(){

        String bootstrapServers = "xxxxxxxxxxxxxxxx";

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

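        // Safe-producer settings: idempotence + acks=all + max retries avoid
        // duplicates and data loss; with idempotence enabled, Kafka >= 1.0
        // preserves ordering with up to 5 in-flight requests.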
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); 

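        // Throughput settings: snappy compression, a 20 ms linger, and 32 KB
        // batches trade a little latency for better throughput.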
        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024));

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        return producer;
    }

}

Kafka Consumer

Next, create a consumer in the local Java development environment.

Elasticsearch credentials

To use the Elasticsearch service, obtain the Elasticsearch username, password, and endpoint beforehand.

Elasticsearch preparation

In the Kibana console, create the Elasticsearch index (twitter) and type (tweets) in advance.
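Alternatively, the index can be created from Java with the same REST high-level client used below. This is a minimal sketch, assuming a placeholder endpoint and a mostly dynamic mapping (only id_str is declared explicitly):

import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

public class CreateTweetsIndex {

    public static void main(String[] args) throws Exception {
        // Placeholder endpoint; in practice reuse the authenticated client
        // built by createClient() in the consumer below.
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("xxxxxxxxxxxxxxxx", 9200, "http")))) {

            CreateIndexRequest request = new CreateIndexRequest("twitter");
            // Declare the type name explicitly; other fields are mapped dynamically.
            request.mapping("tweets",
                    "{\"properties\":{\"id_str\":{\"type\":\"keyword\"}}}",
                    XContentType.JSON);
            client.indices().create(request, RequestOptions.DEFAULT);
        }
    }
}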


import com.google.gson.JsonParser;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ElasticSearchConsumer {

    public static RestHighLevelClient createClient(){

        /** ---------------------- Elasticsearch credentials ---------------------- */
        String hostname = "xxxxxxxxxxxxxxxx"; 
        String username = "xxxxxxxxxxxxxxxx"; 
        String password = "xxxxxxxxxxxxxxxx"; 

        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(username, password));

        RestClientBuilder builder = RestClient.builder(
                new HttpHost(hostname, 9200, "http"))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                });

        RestHighLevelClient client = new RestHighLevelClient(builder);
        return client;
    }

    public static KafkaConsumer<String, String> createConsumer(String topic){

        String bootstrapServers = "xxxxxxxxxxxxxxxx";
        String groupId = "kafka-demo-elasticsearch";

        /** ---------------------- Consumer configuration ---------------------- */
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
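        // Manual offset commits (enable.auto.commit=false) with small poll
        // batches give at-least-once delivery: offsets are committed only
        // after the records have been indexed into Elasticsearch.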
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); 
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10"); 

        /** ---------------------- Create the consumer ---------------------- */
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList(topic));

        return consumer;

    }

    private static JsonParser jsonParser = new JsonParser();

    private static String extractIdFromTweet(String tweetJson){

        return jsonParser.parse(tweetJson)
                .getAsJsonObject()
                .get("id_str")
                .getAsString();
    }

    public static void main(String[] args) throws IOException {

        Logger logger = LoggerFactory.getLogger(ElasticSearchConsumer.class.getName());
        RestHighLevelClient client = createClient();

        KafkaConsumer<String, String> consumer = createConsumer("tweets_poc");

        while(true){
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100)); 

            Integer recordCount = records.count();
            logger.info("Received " + recordCount + " records");    

            for (ConsumerRecord<String, String> record : records){

                try {
                    String id = extractIdFromTweet(record.value());

                    /** ---------------------- Insert the data into Elasticsearch ---------------------- */
                    IndexRequest indexRequest = new IndexRequest(
                            "twitter",
                            "tweets",
                            id 
                    ).source(record.value(), XContentType.JSON);


                    IndexResponse indexResponse = client.index(indexRequest,RequestOptions.DEFAULT);
                    logger.info(indexResponse.getId());

                } catch (NullPointerException e){
                    logger.warn("skipping bad data: " + record.value());
                }

            }

            if(recordCount > 0){

                logger.info("Committing offsets...");
                consumer.commitSync();
                logger.info("Offsets have been committed");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }

    }
}
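The consumer above indexes tweets one at a time. A common refinement is to collect each poll into a single bulk request. The following is a sketch of what the loop body could look like, reusing records, client, logger, and extractIdFromTweet from the class above (it additionally needs imports for org.elasticsearch.action.bulk.BulkRequest and BulkResponse):

            // Sketch: replace the per-record client.index(...) call with one
            // bulk request per poll.
            BulkRequest bulkRequest = new BulkRequest();
            for (ConsumerRecord<String, String> record : records) {
                try {
                    String id = extractIdFromTweet(record.value());
                    bulkRequest.add(new IndexRequest("twitter", "tweets", id)
                            .source(record.value(), XContentType.JSON));
                } catch (NullPointerException e) {
                    logger.warn("skipping bad data: " + record.value());
                }
            }
            if (bulkRequest.numberOfActions() > 0) {
                BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
                logger.info("Indexed " + bulkResponse.getItems().length + " documents");
            }

Committing offsets only after the bulk call succeeds preserves the same at-least-once semantics as the original loop.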

Execution

First, upload the Kafka producer and Kafka consumer jar files to Alibaba Cloud OSS. Then log in to the ECS instance for the Kafka producer and download the producer jar file (TweetsProducerTest-1.0-jar-with-dependencies.jar) to the instance with a tool such as ossutil. Once the download finishes, start the Kafka producer with the following command to collect messages posted on Twitter.

java -jar TweetsProducerTest-1.0-jar-with-dependencies.jar

Likewise, log in to the ECS instance for the Kafka consumer over SSH and download the consumer jar file (kafka-elasticsearch-poc-1.0-jar-with-dependencies.jar). Once the download finishes, start the Kafka consumer with the following command to read messages from the Kafka partitions.

java -jar kafka-elasticsearch-poc-1.0-jar-with-dependencies.jar

Conclusion

Apache Kafka and Elasticsearch are both widely used in the enterprise. Kafka plays an important role in stream processing before data reaches Elasticsearch, while Elasticsearch serves fast search over that data rather than merely storing it raw. Beyond the Kafka setup introduced here, if you want a fully managed cloud service, Alibaba Cloud Log Service can play a similar role. If you are interested, please take a look!
