尝试使用Apache Kafka
阿帕奇卡夫卡
由于可能需要使用Apache Kafka,因此我进行了操作确认。
Kafka是一个分布式消息传递平台,用于交换消息的平台。
与现有的消息传递服务的区别
以下是与现有的JMS等消息传递系统不同的几个方面:
* 基于分布式处理,具有高可靠性和可扩展性。
* 可以保留消息历史记录,并具有存储的功能。
* 可以进行流处理。对于输入消息,可以进行各种实时处理。
安装
进行测试安装非常容易。
这次我们在AWS上的Amazon Linux上进行操作。
安装Java
因为环境是全新的,所以我们从安装Java开始。
su -
yum install java-1.8.0-openjdk-devel
下载和解压 Kafka
在写这篇文章的时候,1.1.0是最新版本。
wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz
tar -zxf kafka_2.11-1.1.0.tgz
cd kafka_2.11-1.1.0
准备工作已经完成。
开始 Kafka
Zookeeper是一款用于构建分布式处理系统的中间件。
尽管Kafka似乎可在无Zookeeper情况下运行,但由于稍后需要进行多个代理节点的测试,我们会使用它。
启动动物园管理员。
bin/zookeeper-server-start.sh config/zookeeper.properties
[2018-06-20 02:06:32,944] INFO Reading configuration from: config/zookeeper.properties
(org.apache.zookeeper.server.quorum.QuorumPeerConfig)
启动经纪人
bin/kafka-server-start.sh config/server.properties
[2018-06-20 02:08:00,771] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2018-06-20 02:08:01,131] INFO starting (kafka.server.KafkaServer)
这样一来,服务就可以被使用了。
通信测试
创建主题
我将创建一个名为“test”的主题作为试验。
主题就像消息的地址,我们可以通过主题来发送和接收消息。
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
确认创建的主题
bin/kafka-topics.sh --list --zookeeper localhost:2181
test
发送消息到测试主题
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hi this is from aws
>I'll send message
确认消费者的留言
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hi this is from aws
I'll send message
能够获取存储在中的现有消息真是令人耳目一新!!
我已经创建了一个主题,并确认了消息订阅和消费者检查消息的一系列流程。
多经纪人测试
我想测试一下卡夫卡的一个特征——多个代理。
多经纪商设定
请将配置文件复制给新经纪人。
由于此次需要构建3个设备,因此需要再复制两个文件。
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
编辑文件
每个人都可以更改经纪人ID、监听端口和日志文件的设置。
編輯config/server-1.properties文件
vi config/server-1.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9093
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1
编辑 config/server-2.properties
vi config/server-2.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9094
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2
启动经纪人
启动了最初设置以及新增的三个代理商。
bin/kafka-server-start.sh config/server.properties &
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
创建一个复制因子为3的新主题
创建一个涵盖三个经纪人的话题。
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
确认群集状态
获取新创建的主题信息。
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 2,1,0 Isr: 1,0,2
最初的行是关于所有分区的摘要,而后的每一行都展示了关于分区的信息。
* “Leader” 是负责该分区的所有读写操作的节点。在这次例子中,节点0被指定为Leader。
* “Replicas” 是复制该分区日志的节点列表。
* “Isr” 是指“同步副本”的集合。
与现有的话题进行比较
最初创建的单一代理主题中,所有的节点都由节点0负责。
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
向聚类过的主题发送消息。
发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>this is clusterd Topic!
>next message
以下为中文翻译版本:
请从消费者处确认。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
this is clusterd Topic!
next message
容错性验证
我們試著關掉主機,節點0。
ps -ef | grep server
ec2-user 9072 2984 2 03:15 pts/0 kafka.Kafka config/server.properties
kill -9 9072
动物园管理员的日志
[2018-06-20 03:20:51,000] INFO Processed session termination for sessionid: 0x1641b2fd2520000 (org.apache.zookeeper.server.PrepRequestProcessor)
[1] Killed bin/kafka-server-start.sh config/server.properties
再次确认集群
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 1,2
可以确认由于主节点0的故障,节点2自动成为了领导者。