前提

    Kafkaを起動できること

環境

    • CentOS-6.7

 

    • kafka-0.9.0

 

    PHP-5.6.19

PHPクライアントライブラリ

KafkaのPHPのクライアントライブラリは探すと幾つか出てきますが、 今回はKafkaの公式サイトに紹介されているphpkafkaを使います。phpkafkaのビルドにはphp-rdkafkaを必要とするのでそちらもインストールします。

    • https://github.com/EVODelavega/phpkafka

 

    https://github.com/edenhill/librdkafka

インストール

事前に必要となるライブラリをインストールする

$ sudo yum groupinstall -y --enablerepo=remi "Development Tools"
$ sudo yum install zlib zlib-devel openssl openssl-devel cyrus-sasl2 cyrus-sasl-devel

php-rdkafkaのインストール

$ git clone https://github.com/edenhill/librdkafka/
$ cd librdkafka
$ ./configure
$ make 
$ sudo make install

phpkafkaのExtension作成

$ git clone https://github.com/EVODelavega/phpkafka.git
$ cd phpkafka
$ phpize
$ ./configure --enable-kafka
$ sudo make install

$ php -m | grep kafka
kafka

読み込めない場合

自分の環境では以下のエラーが出てライブラリを上手く読見込めなかったので、librdkafka.soをインストールしたパスを設定する必要があるため以下を実行しました。

$ php -m
PHP Warning:  PHP Startup: Unable to load dynamic library '/usr/lib64/php/modules/kafka.so' - librdkafka.so.1: cannot open shared object file: No such file or directory in Unknown on line 0
$ sudo touch /etc/ld.so.conf.d/librd.conf
$ sudo sh -c 'echo "/usr/local/lib" >> /etc/ld.so.conf.d/librd.conf'
$ ldconfig  <-- 設定を更新
$ ldconfig -p  <-- 反映されたか確認する

実際に動かしてみる

kafkaを起動させてtest_topicというトピックを予め作成して、メッセージを送ってみようと思います。

Producer

メッセージを送る側のスクリプト

<?php

$kafka = new Kafka("localhost:9092");
try {
    $kafka->produce("test_topic", "test_message");
} catch (Exception $e) {
    echo $e->getMessage() . PHP_EOL;
}

$kafka->disconnect(Kafka::MODE_PRODUCER);

Consumer

メッセージを受け取る側のスクリプト

<?php

$kafka = new Kafka("localhost:9092");
$partitions = $kafka->getPartitionsForTopic('test_topic');
$kafka->setPartition($partitions[0]);
$offset = 1;
$size = 1;

while (1) {
    try {
        $messages = $kafka->consume("test_topic", $offset, $size);
        if (count($messages) > 0) {
            foreach ($messages as $message) {
                echo $message . PHP_EOL;
                $offset += 1;
            }
        }
    } catch (Exception $e) {
        echo $e->getMessage() . PHP_EOL;
        break;
    }
}

$kafka->disconnect();

実行結果

kafkaphp.gif

感想

今回使ったPHPクライアントライブラリはHigh Level APIに対応していないのかZooKeeperを使った方法が分かりませんでした。
他のPHPクライアントライブラリを探すと色々出てきて、下のものならHigh Level APIに対応していそうなので今度はそちらを試してみたいと思います。
– https://github.com/arnaud-lb/php-rdkafka

广告
将在 10 秒后关闭
bannerAds