在python连接并使用kafka

最近被 kafka 折腾了很久,还是资料太少了,在 python 里面连接 kafka 的库也总是有各种 bug 。连接 kafka 的库有两种类型,一种是直接连接 kafka 的,存储 offset 的事情要自己在客户端完成。还有一种是先连接 zookeeper 然后再通过 zookeeper 获取 kafkabrokers 信息, offset 存放在 zookeeper 上面,由 zookeeper 来协调。

我现在使用 samsa 这个 highlevel

Producer示例

from kazoo.client import KazooClientfrom samsa.cluster import Clusterzookeeper = KazooClient()zookeeper.start()cluster = Cluster(zookeeper)topic = cluster.topics['topicname']topic.publish('msg')

** Consumer示例 **

from kazoo.client import KazooClientfrom samsa.cluster import Clusterzookeeper = KazooClient()zookeeper.start()cluster = Cluster(zookeeper)topic = cluster.topics['topicname']consumer = topic.subscribe('groupname')for msg in consumer:    print msg

Tip

consumer 必需在 producer 向 kafka 的 topic 里面提交数据后才能连接,否则会出错。

Kafka 中一个 consumer 需要指定 groupnamegroue 中保存着 offset 等信息,新开启一个 group 会从 offset 0 的位置重新开始获取日志。

kafka 的配置参数中有个 partition ,默认是 1 ,这个会对数据进行分区,如果多个 consumer 想连接同个 group 就必需要增加 partition , partition 只能大于 consumer 的数量,否则多出来的 consumer 将无法获取到数据。

在python连接并使用kafka

相关文章:

你感兴趣的文章:

标签云: