Keep Thinking And Learning

kafka是一种高性能的分布式消息系统,通常用于大规模的数据处理, 其吞吐量是非常高的,我们的蛮多业务线都用到了kafka,包括实时模型,,用户标签,实时点击率等,在开发的过程中间,踩了一些坑,其中consumer gc问题便碰到过几次,在这里探讨下可能的解决方式。

kafka consumer目前提供两种API, 分别为high-level api 和 SimpleConsumer api, 通常我们都是使用的high-level 的api,简单的创建consumer,通过可配置参数设置数据拉取的offset, 自动提交以及超时时间等,不用关注broker和partitions的选择等。kafka 的consumer会在后台启动线程去获取topic的消息信息,由于其很高的吞吐量,当拉取的消息来不及消费就会造成消息的堆积,时间久了就会出现内存gc的问题,我觉得可能通过以下的方式进行处理:

1. 增大机器的内存和性能、提高程序的异步处理能力。接入kafka的一般是计算密集型的,把计算任务并行化会提高消息的消费能力,而增大内存和提高机器性能更为直接,之前就碰到过将程序从虚拟机迁到实体机器就抗住的情形。

2. 修改consumer的配置或者消费方式, 有几次测试的时候,发现服务停止一段时间再重启就很容易出现gc问题,因为从老的offset开始消费,kafka已经堆积了很多消息了,拉去的数据很快会将内存占满,这个时候可以将consumer的group信息从zookeeper删除或者设置成从最新的开始消费一般能解决问题,但是这样就不能从历史的offset开始消费,会存在数据丢失的问题。其次就是尝试设置下数据拉取的数据块的大小,即fetch.messages.max.bytes,不过这个没有验证过。

3. 使用SimpleConsumer api并限制数据拉取的速度, 这个可以借鉴spark streamingRateLimiter的方式。这里贴出java的代码,参数desiredRate<=0的时候,不限制速度,当>0的时候随着数值的增加,限制降低。

4.使用redis来存储数据拉取的状态,按照一定的心跳机制向redis同步状态并修改。

不要轻言放弃,否则对不起自己

Keep Thinking And Learning

相关文章:

你感兴趣的文章:

标签云: