Kafka的Hello World项目
Kafka 环境
Kafka 1.1 / HDF 3.2
JDK 8
文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Kafka hello-world producer example. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>k.helloworld</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Pin the language level to the JDK this example targets (JDK 8) instead of
             relying on the maven-compiler-plugin default, which varies by plugin version. -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- Make the build independent of the platform default charset. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Kafka client library (producer/consumer API). -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>
        <!-- SLF4J binding that routes log output to log4j 1.2. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.21</version>
        </dependency>
    </dependencies>
</project>
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Root logger level is OFF, so loggers that do not set their own level emit nothing.
# The "stdout" appender is attached at the root and inherited by all loggers.
log4j.rootLogger=OFF, stdout
# "stdout" writes log events to the console.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
# Output format: [date] level message (logger name:source line), then a line separator.
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
# Kafka client loggers override the root level: ERROR and above are printed.
log4j.logger.org.apache.kafka=ERROR
package com.zzeng;
import java.util.Properties;
import java.util.Scanner;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
 * Interactive console producer for Kafka.
 *
 * <p>Reads lines from standard input and publishes each one, synchronously, to the topic
 * named by {@code TOPIC_NAME}. Typing {@code exit} (case-insensitive) or reaching
 * end-of-input stops the loop; the producer and the input scanner are always closed
 * before {@code main} returns.
 */
public class KafkaProducerExample {
    /** Name of the topic every record is published to. */
    private final static String TOPIC_NAME = "topic_name";

    /**
     * Runs the prompt / read / send loop.
     *
     * <p>Every line is sent with the fixed key {@code "message"} and a value made of the
     * typed text followed by the current time in milliseconds. Running the program
     * several times simply appends more messages to the same topic. Note that the
     * {@code exit} line itself is also published before the loop terminates.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Producer<String, String> producer = KafkaProducerExample.createProducer();
        try {
            Scanner sc = new Scanner(System.in);
            try {
                while (true) {
                    System.out.print("> ");
                    // Stop cleanly on end-of-input (Ctrl-D / piped stdin) instead of
                    // letting nextLine() throw NoSuchElementException.
                    if (!sc.hasNextLine()) {
                        break;
                    }
                    String text = sc.nextLine();
                    ProducerRecord<String, String> recordToSend = new ProducerRecord<String, String>(
                            TOPIC_NAME, "message", text + " , timeInMillis=" + System.currentTimeMillis());
                    try {
                        // Synchronous send: get() blocks until the send has completed
                        // (or failed), so the reported metadata is final.
                        RecordMetadata rmd = producer.send(recordToSend).get();
                        System.out.printf("Message Sent ==>> topic = %s, partition = %s, offset = %d\n",
                                rmd.topic(), rmd.partition(), rmd.offset());
                    } catch (InterruptedException ex) {
                        // Preserve the interrupt for whoever owns this thread and stop reading.
                        Thread.currentThread().interrupt();
                        break;
                    } catch (Exception ex) {
                        // Demo code: report the failed send and keep prompting.
                        ex.printStackTrace();
                    }
                    if (text.equalsIgnoreCase("exit")) {
                        break;
                    }
                }
            } finally {
                sc.close();
            }
        } finally {
            // Release the producer's resources; previously it was never closed.
            producer.close();
        }
    }

    /**
     * Builds a producer with String keys and values that connects to the hard-coded
     * bootstrap server.
     *
     * @return a new producer; the caller is responsible for closing it
     */
    private static Producer<String, String> createProducer() {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "zzeng-hdp-1.field.hortonworks.com:6667");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(kafkaProps);
    }
}