Kafka - PHP 使用 Rdkafka 生产/消费数据

比特启航
• 阅读 14198

Kafka集群部署

安装rdkafka

rdkafka 依赖 libkafka

yum install rdkafka rdkafka-devel
pecl install rdkafka
php --ri rdkafka

http://pecl.php.net/package/r... 可以参阅支持的kafka客户端版本

生产者

连接集群,创建 topic,生产数据。

<?php
$rk = new Rdkafka\Producer();
$rk->setLogLevel(LOG_DEBUG);

// 链接kafka集群
$rk->addBrokers("192.168.20.6:9092,192.168.20.6:9093");

// 创建topic
$topic = $rk->newTopic("topic_1");

while (true) {
    $message = "hello kafka " . date("Y-m-d H:i:s");
    echo "hello kafka " . date("Y-m-d H:i:s") . PHP_EOL;

    try {
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message);
        sleep(2);
    } catch (\Exception $e) {
        echo $e->getMessage() . PHP_EOL;
    }
}

消费者-HighLevel

自动分配partitionrebalancecomsumer group

<?php
$conf = new RdKafka\Conf();

// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;
        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(null);
            break;
        default:
            throw new \Exception($err);
    }
});

// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set('group.id', 'group_1');

// Initial list of Kafka brokers
$conf->set('metadata.broker.list', '192.168.20.6:9092,192.168.20.6:9093');

$topicConf = new RdKafka\TopicConf();

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

// Subscribe to topic 'topic_1'
$consumer->subscribe(['topic_1']);

echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";

while (true) {
    $message = $consumer->consume(3e3);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            sleep(2);
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo $message->errstr() . PHP_EOL;
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

消费者-LowLevel

指定partition消费。
php consumer_lowlevel.php [partitonNuo]
LowLevel 没有消费组的概念,也可以认为每个消费者都属于一个独立消费组。

<?php
if (!isset($argv[1])) {
    fwrite(STDOUT, "请指定消费分区:");
    $partition = (int) fread(STDIN, 1024);
} else {
    $partition = (int) $argv[1];
}

$topic = "topic_1";

$conf = new RdKafka\Conf();

// Set the group id. This is required when storing offsets on the broker
$conf->set('group.id', 'group_2');

$rk = new RdKafka\Consumer($conf);
$rk->addBrokers('192.168.20.6:9092,192.168.20.6:9093');

$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 2000);

// Set the offset store method to 'file'
// $topicConf->set('offset.store.method', 'file');
// $topicConf->set('offset.store.path', sys_get_temp_dir());

// Alternatively, set the offset store method to 'broker'
$topicConf->set('offset.store.method', 'broker');

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');

$topic = $rk->newTopic($topic, $topicConf);

// Start consuming partition 0
$topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED);

while (true) {
    $message = $topic->consume($partition, 3 * 1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo $message->errstr() . PHP_EOL;
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}
点赞
收藏
评论区
推荐文章
blmius blmius
4年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Wesley13 Wesley13
4年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
待兔 待兔
1年前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
4年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
京东面试真题解析,手撕面试官
第1章快速入门1.1Kafka简介1.2以Kafka为中心的解决方案1.3Kafka核心概念1.4Kafka源码环境第2章生产者2.1KafkaProducer使用示例2.2KafkaProducer分析ProducerInterceptors&cProducerInterceptorKafka集群元数据Serializ
Stella981 Stella981
4年前
Kafka学习之(三)Centos下给PHP开启Kafka扩展(rdkafka)
Centos版本:Centos6.4,PHP版本:PHP7。在上一篇文章中使用IP为192.168.9.154的机器安装并开启了Kafka进行了简单测试,充当了Kafka服务器。本篇文章新开启一台IP为192.16.9.157的机器给PHP开启扩展。找到github的扩展下载地址,这里是phprdkafka,虽然php有一个扩展是phpkaf
Stella981 Stella981
4年前
Kafka快速入门(十一)——RdKafka源码分析
Kafka快速入门(十一)——RdKafka源码分析一、RdKafkaC源码分析1、KafkaOP队列RdKafka将与KafkaBroke的交互、内部实现的操作都封装成Operator结构,然后放入OP处理队列里统一处理。KafkaOP队列是线
Wesley13 Wesley13
4年前
PHP创建多级树型结构
<!lang:php<?php$areaarray(array('id'1,'pid'0,'name''中国'),array('id'5,'pid'0,'name''美国'),array('id'2,'pid'1,'name''吉林'),array('id'4,'pid'2,'n
Stella981 Stella981
4年前
Kafka配置文件
服务基本设置Theidofthebroker.Thismustbesettoauniqueintegerforeachbroker.kafka集群分组ID
Python进阶者 Python进阶者
2年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这