Trying Out Kafka and Elasticsearch Together
In a previous article, we looked at how to integrate Apache Kafka with Apache Spark Streaming using Alibaba Cloud E-MapReduce. In this article, we continue with the Twitter-message example and walk through a Kafka and Elasticsearch setup on Alibaba Cloud, a combination that is already used in many enterprises.
Verification Environment
Kafka
- EMR-3.20.0
- Zookeeper 3.4.13
- Kafka 1.1.1
- Cluster type: Kafka
- Hardware configuration (Header): 1x ecs.sn2.large
- Hardware configuration (Worker): 2x ecs.sn2.large
Please refer to the official documentation for the steps to create the cluster.
# cat /etc/redhat-release
CentOS Linux release 7.4.1708 (Core)
# uname -r
3.10.0-693.2.2.el7.x86_64
# echo envi | nc localhost 2181
Environment:
zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 04:05 GMT
host.name=emr-header-1.cluster-43709
java.version=1.8.0_151
java.vendor=Oracle Corporation
java.home=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-1.b12.el7_4.x86_64/jre
java.class.path=/usr/lib/zookeeper-current/bin/../build/classes:/usr/lib/zookeeper-current/bin/../build/lib/*.jar:/usr/lib/zookeeper-current/bin/../lib/slf4j-log4j12-1.7.25.jar:/usr/lib/zookeeper-current/bin/../lib/slf4j-api-1.7.25.jar:/usr/lib/zookeeper-current/bin/../lib/netty-3.10.6.Final.jar:/usr/lib/zookeeper-current/bin/../lib/log4j-1.2.17.jar:/usr/lib/zookeeper-current/bin/../lib/jline-0.9.94.jar:/usr/lib/zookeeper-current/bin/../lib/audience-annotations-0.5.0.jar:/usr/lib/zookeeper-current/bin/../zookeeper-3.4.13.jar:/usr/lib/zookeeper-current/bin/../src/java/lib/*.jar:/etc/ecm/zookeeper-conf::/var/lib/ecm-agent/data/jmxetric-1.0.8.jar
java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
java.io.tmpdir=/tmp
java.compiler=<NA>
os.name=Linux
os.arch=amd64
os.version=3.10.0-693.2.2.el7.x86_64
user.name=hadoop
user.home=/home/hadoop
user.dir=/home/hadoop
Hosebird Client
Hosebird Client is a Java HTTP client that we use together with the Kafka producer to call Twitter's Streaming API. For more details, see the GitHub repository: [https://github.com/twitter/hbc]
Overall Architecture
First, the overall architecture on Alibaba Cloud is shown below. To get an Elasticsearch environment up quickly, we use Alibaba Cloud's managed Elasticsearch service. At the time of writing, Alibaba Cloud Elasticsearch offers three versions: Elasticsearch 5.5.3 with Commercial Feature, Elasticsearch 6.3.2 with Commercial Feature, and Elasticsearch 6.7.0 with Commercial Feature. All of them include X-Pack features such as enterprise-grade access control, security monitoring, alerting, visual reporting, and machine learning.
Kafka Producer
Now that the environment is ready, let's start writing code in a local Java development environment. First, we create the Kafka producer with the sample code below and build it into a jar file.
Kafka bootstrap servers
The IP address of any one machine in the Kafka cluster is sufficient.
Twitter Streaming API credentials
To use the Twitter Streaming API, obtain the consumerKey, consumerSecret, token, and secret in advance and fill them in.
public class ProducerTest {
Logger logger = LoggerFactory.getLogger(ProducerTest.class.getName());
/** ---------------------- Twitter Streaming API credentials ---------------------- */
String consumerKey = "xxxxxxxxxxxxxxxx";
String consumerSecret = "xxxxxxxxxxxxxxxx";
String token = "xxxxxxxxxxxxxxxx";
String secret = "xxxxxxxxxxxxxxxx";
String mytopic = "tweets_poc";
/** ---------------------- Keywords to track in tweets ---------------------- */
List<String> terms = Lists.newArrayList("bitcoin","Blockchain","IoT","5G");
public ProducerTest(){}
public static void main(String[] args) {
new ProducerTest().run();
}
public void run(){
logger.info("Setup");
BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);
Client client = createTwitterClient(msgQueue);
client.connect();
KafkaProducer<String, String> producer = createKafkaProducer();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("stopping application...");
logger.info("shutting down client from twitter...");
client.stop();
logger.info("closing producer...");
producer.close();
logger.info("done!");
}));
while (!client.isDone()) {
String msg = null;
try {
msg = msgQueue.poll(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
client.stop();
}
if (msg != null){
logger.info(msg);
if(StringUtils.containsIgnoreCase(msg,"Bitcoin")){
producer.send(new ProducerRecord<>(mytopic, "Bitcoin", msg), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
logger.error("Something bad happened", e);
}
}
});
}
else if (StringUtils.containsIgnoreCase(msg,"Blockchain")) {
producer.send(new ProducerRecord<>(mytopic, "Blockchain", msg), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
logger.error("Something bad happened", e);
}
}
});
}
else if (StringUtils.containsIgnoreCase(msg,"IoT")) {
producer.send(new ProducerRecord<>(mytopic, "IoT", msg), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
logger.error("Something bad happened", e);
}
}
});
}
else if (StringUtils.containsIgnoreCase(msg,"5G")) {
producer.send(new ProducerRecord<>(mytopic, "5G", msg));
}
else{
producer.send(new ProducerRecord<>(mytopic, null, msg), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
logger.error("Something bad happened", e);
}
}
});
}
}
}
logger.info("End of application");
}
/** ---------------------- Create the Hosebird client ---------------------- */
public Client createTwitterClient(BlockingQueue<String> msgQueue){
Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
hosebirdEndpoint.trackTerms(terms);
Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);
ClientBuilder builder = new ClientBuilder()
.name("Hosebird-Client-01")
.hosts(hosebirdHosts)
.authentication(hosebirdAuth)
.endpoint(hosebirdEndpoint)
.processor(new StringDelimitedProcessor(msgQueue));
Client hosebirdClient = builder.build();
return hosebirdClient;
}
/** ---------------------- Create the Kafka producer ---------------------- */
public KafkaProducer<String, String> createKafkaProducer(){
String bootstrapServers = "xxxxxxxxxxxxxxxx";
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024));
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
return producer;
}
}
Kafka Consumer
Next, we create the consumer in the local Java development environment.
Elasticsearch credentials
To use the Elasticsearch service, obtain the Elasticsearch username, password, and access endpoint in advance.
Preparing Elasticsearch
In the Kibana console, create the Elasticsearch index (twitter) and type (tweets) in advance.
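The index and type can be created in the Kibana Dev Tools console; equivalently, the same request can be sent directly to the Elasticsearch endpoint with curl. The sketch below is only one way to do it, with placeholder credentials and endpoint, and a deliberately minimal mapping (the remaining tweet fields will be filled in by dynamic mapping once the consumer starts indexing):
curl -u xxxxxxxxxxxxxxxx:xxxxxxxxxxxxxxxx -H 'Content-Type: application/json' \
  -X PUT 'http://xxxxxxxxxxxxxxxx:9200/twitter' \
  -d '{"mappings": {"tweets": {"properties": {"id_str": {"type": "keyword"}, "text": {"type": "text"}}}}}'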
public class ElasticSearchConsumer {
public static RestHighLevelClient createClient(){
/** ---------------------- Elasticsearch credentials ---------------------- */
String hostname = "xxxxxxxxxxxxxxxx";
String username = "xxxxxxxxxxxxxxxx";
String password = "xxxxxxxxxxxxxxxx";
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(username, password));
RestClientBuilder builder = RestClient.builder(
new HttpHost(hostname, 9200, "http"))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
});
RestHighLevelClient client = new RestHighLevelClient(builder);
return client;
}
public static KafkaConsumer<String, String> createConsumer(String topic){
String bootstrapServers = "xxxxxxxxxxxxxxxx";
String groupId = "kafka-demo-elasticsearch";
/** ---------------------- Consumer configuration ---------------------- */
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
/** ---------------------- Create the consumer ---------------------- */
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList(topic));
return consumer;
}
private static JsonParser jsonParser = new JsonParser();
private static String extractIdFromTweet(String tweetJson){
return jsonParser.parse(tweetJson)
.getAsJsonObject()
.get("id_str")
.getAsString();
}
public static void main(String[] args) throws IOException {
Logger logger = LoggerFactory.getLogger(ElasticSearchConsumer.class.getName());
RestHighLevelClient client = createClient();
KafkaConsumer<String, String> consumer = createConsumer("tweets_poc");
while(true){
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
Integer recordCount = records.count();
logger.info("Received " + recordCount + " records");
for (ConsumerRecord<String, String> record : records){
try {
String id = extractIdFromTweet(record.value());
/** ---------------------- Insert the data into Elasticsearch ---------------------- */
IndexRequest indexRequest = new IndexRequest(
"twitter",
"tweets",
id
).source(record.value(), XContentType.JSON);
IndexResponse indexResponse = client.index(indexRequest,RequestOptions.DEFAULT);
logger.info(indexResponse.getId());
} catch (NullPointerException e){
logger.warn("skipping bad data: " + record.value());
}
}
if(recordCount > 0){
logger.info("Committing offsets...");
consumer.commitSync();
logger.info("Offsets have been committed");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
Execution
First, upload the jar files for the Kafka producer and the Kafka consumer to Alibaba Cloud OSS. Then log in to the ECS instance for the Kafka producer and download the producer jar (TweetsProducerTest-1.0-jar-with-dependencies.jar) to the instance with a tool such as ossutil. Once the download is complete, start the Kafka producer with the command below to collect messages posted on Twitter.
java -jar TweetsProducerTest-1.0-jar-with-dependencies.jar
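The download from OSS mentioned above can be done, for example, with ossutil (the bucket name here is a placeholder):
ossutil cp oss://your-bucket/TweetsProducerTest-1.0-jar-with-dependencies.jar .
Note that the code assumes the tweets_poc topic already exists on the cluster. If it does not, one way to create it is with the kafka-topics.sh tool that ships with Kafka, run on the emr-header node (the partition and replication-factor values below are only examples, and a chroot path may need to be appended to the ZooKeeper address depending on the cluster configuration):
kafka-topics.sh --zookeeper localhost:2181 --create --topic tweets_poc --partitions 3 --replication-factor 2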
Next, log in over SSH to the ECS instance for the Kafka consumer and download the consumer jar (kafka-elasticsearch-poc-1.0-jar-with-dependencies.jar). Once the download is complete, start the Kafka consumer with the command below to read messages from the Kafka partitions and index them into Elasticsearch.
java -jar kafka-elasticsearch-poc-1.0-jar-with-dependencies.jar
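Once both processes are running, a quick way to confirm that documents are arriving in Elasticsearch, besides browsing the index in Kibana, is to query the endpoint directly (placeholder credentials and endpoint, as before):
curl -u xxxxxxxxxxxxxxxx:xxxxxxxxxxxxxxxx 'http://xxxxxxxxxxxxxxxx:9200/twitter/tweets/_search?size=1&pretty'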
Summary
Apache Kafka and Elasticsearch are both widely used in many enterprises. Kafka plays an important role in the streaming pipeline in front of Elasticsearch, while Elasticsearch is used for fast search rather than simply storing the raw data. Also, if you would rather use a fully managed cloud service than the Kafka cluster introduced here, Alibaba Cloud Log Service (LogService) can play the same role. If you are interested, please give it a try!