Storm Musings: Odds and Ends About the Acker


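This post is not about Storm's acker mechanism as such; if that is what you came for, sorry to disappoint. For that, see the posts by Xu Mingming, who knows this far better: "Twitter Storm Source Code Analysis: The Acker Workflow" and "How Twitter Storm Guarantees That Messages Are Not Lost".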

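Alternatively, Chapter 12 of the book Storm源码分析 (Storm Source Code Analysis), "Storm's Acker System" (yes, another JD.com link), explains the acker mechanism in detail, so I won't go over it here: anything I added would be padding, and probably not as clear as theirs.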

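This post is about the difference between running a topology with ackers and without them. First, a word on BasicBolt versus RichBolt: a BasicBolt acks each input tuple automatically, while a RichBolt does not, so if you extend RichBolt you must call ack on the OutputCollector yourself.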
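To make the difference concrete, here is a minimal, self-contained Java model rather than real Storm code: Collector, RichStyleBolt, BasicStyleBolt and runBasic are hypothetical stand-ins for OutputCollector, IRichBolt, IBasicBolt and the BasicBoltExecutor wrapper. It ignores emitting, anchoring and the FailedException path, and only shows who is responsible for calling ack.

```java
import java.util.ArrayList;
import java.util.List;

public class AckStyles {
    // Hypothetical stand-in for Storm's OutputCollector: it only records acks.
    static class Collector {
        final List<String> acked = new ArrayList<>();

        void ack(String tuple) {
            acked.add(tuple);
        }
    }

    // RichBolt style: execute() gets the collector and must ack by itself.
    interface RichStyleBolt {
        void execute(String tuple, Collector collector);
    }

    // BasicBolt style: execute() never calls ack; the wrapper does it.
    interface BasicStyleBolt {
        void execute(String tuple);
    }

    // Models the BasicBoltExecutor idea: run execute(), then ack the input.
    static void runBasic(BasicStyleBolt bolt, String tuple, Collector collector) {
        bolt.execute(tuple);
        collector.ack(tuple); // acked automatically once execute() returns normally
    }

    public static void main(String[] args) {
        Collector c = new Collector();

        // A RichBolt that forgets to ack: its input is never acked.
        RichStyleBolt forgetful = (t, col) -> { /* process, but no col.ack(t) */ };
        forgetful.execute("t1", c);

        // A correct RichBolt acks explicitly.
        RichStyleBolt careful = (t, col) -> col.ack(t);
        careful.execute("t2", c);

        // A BasicBolt never calls ack, yet its input is acked by the wrapper.
        runBasic(t -> { /* process */ }, "t3", c);

        System.out.println(c.acked); // [t2, t3]
    }
}
```

In a real topology with ackers enabled, the tuple nobody acks ("t1" here) is not silently forgotten: it is failed once topology.message.timeout.secs elapses.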

Storm has a configuration option for this:

/**
 * How many executors to spawn for ackers.
 *
 * <p>If this is set to 0, then Storm will immediately ack tuples as soon
 * as they come off the spout, effectively disabling reliability.</p>
 */
String TOPOLOGY_ACKER_EXECUTORS = "topology.acker.executors";
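Put differently, the acker count is the on/off switch for reliability. The following Java sketch models the decision a spout makes when it emits; it is a simplification, not Storm's code. hasAckers and onSpoutEmit are hypothetical names, the returned strings only label the path taken, and conf is assumed to have already been merged with Storm's defaults. It also reflects the fact that only messages emitted with a message id are ever tracked.

```java
import java.util.HashMap;
import java.util.Map;

public class AckerSwitch {
    // Same key string as TOPOLOGY_ACKER_EXECUTORS above.
    static final String TOPOLOGY_ACKER_EXECUTORS = "topology.acker.executors";

    // Sketch of the has-ackers? idea: reliability is on only when at least one acker runs.
    // Assumes conf was already merged with Storm's defaults, so the key is present.
    static boolean hasAckers(Map<String, Object> conf) {
        return ((Number) conf.get(TOPOLOGY_ACKER_EXECUTORS)).intValue() > 0;
    }

    // Simplified model of the spout emit path; the return value names the path taken.
    static String onSpoutEmit(Map<String, Object> conf, Object msgId) {
        if (msgId == null) {
            return "untracked";     // no message id: ack()/fail() are never called for it
        }
        if (hasAckers(conf)) {
            return "pending+init";  // remember msgId in pending, send an init tuple to an acker
        }
        return "immediate-ack";     // 0 ackers: the spout's ack(msgId) is called right away
    }

    public static void main(String[] args) {
        Map<String, Object> reliable = new HashMap<>();
        reliable.put(TOPOLOGY_ACKER_EXECUTORS, 2);
        Map<String, Object> unreliable = new HashMap<>();
        unreliable.put(TOPOLOGY_ACKER_EXECUTORS, 0);

        System.out.println(onSpoutEmit(reliable, "msg-1"));   // pending+init
        System.out.println(onSpoutEmit(unreliable, "msg-1")); // immediate-ack
        System.out.println(onSpoutEmit(reliable, null));      // untracked
    }
}
```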

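This option sets how many acker executors (acker bolts) to run. If it is greater than 0, then every time the spout emits a message it records the corresponding information in its pending map, a RotatingMap, as in this excerpt from the :spout branch of mk-threads in executor.clj (parts lost from the original excerpt are marked with ...):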

pending (RotatingMap.
          2 ;; microoptimize for performance of .size method
          (reify RotatingMap$ExpiredCallback
            (expire [this msg-id [task-id spout-id tuple-info start-time-ms]]
              (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
                ...-info time-delta)))))
tuple-action-fn (fn [task-id ^TupleImpl tuple]
                  (let [stream-id (.getSourceStreamId tuple)]
                    (condp = stream-id
                      Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
                      ...
                      (let [id (.getValue tuple 0)
                            [...-finished-info start-time-ms] (.remove pending id)]
                        (when spout-id
                          (when-not (= stored-task-id task-id)
                            (throw-runtime "Fatal error, mismatched task ids: " task-id " " stored-task-id))
                          (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
                            ;; here the msg is acked or failed depending on the stream id
                            (condp = stream-id
                              ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
                                                                 ...-delta)
                              ...-delta))))
                        ;; TODO: on failure, emit tuple to failure stream
                        ))))
receive-queue (:receive-queue executor-data)
event-handler (mk-task-receiver executor-data tuple-action-fn)
has-ackers? (has-ackers? storm-conf)
emitted-count (MutableLong. 0)
empty-emit-streak (MutableLong. 0)
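The pending map is what makes timeouts work. Below is a simplified Java model of the mechanism in the excerpt, assuming Storm's RotatingMap semantics as I understand them: new entries go into the newest bucket, and rotate() drops the oldest bucket and reports its entries as expired. The RotatingMap here is a re-implementation for illustration, and PendingModel, TICK_STREAM, ACK_STREAM and FAIL_STREAM are hypothetical stand-ins for the spout executor, SYSTEM_TICK_STREAM_ID, ACKER-ACK-STREAM-ID and ACKER-FAIL-STREAM-ID. Expiry is reported as a failure, mirroring Storm failing messages that time out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

public class PendingModel {
    // Hypothetical stand-ins for SYSTEM_TICK_STREAM_ID, ACKER-ACK-STREAM-ID, ACKER-FAIL-STREAM-ID.
    static final String TICK_STREAM = "tick";
    static final String ACK_STREAM = "ack";
    static final String FAIL_STREAM = "fail";

    // Re-implementation for illustration: the head bucket takes new entries,
    // rotate() drops the tail bucket and reports every entry in it as expired.
    static class RotatingMap<K, V> {
        private final Deque<Map<K, V>> buckets = new ArrayDeque<>();
        private final BiConsumer<K, V> onExpire;

        RotatingMap(int numBuckets, BiConsumer<K, V> onExpire) {
            for (int i = 0; i < numBuckets; i++) {
                buckets.addFirst(new HashMap<>());
            }
            this.onExpire = onExpire;
        }

        void put(K key, V value) {
            for (Map<K, V> b : buckets) {
                b.remove(key); // keep at most one copy of the key
            }
            buckets.peekFirst().put(key, value);
        }

        V remove(K key) {
            for (Map<K, V> b : buckets) {
                if (b.containsKey(key)) {
                    return b.remove(key);
                }
            }
            return null;
        }

        void rotate() {
            Map<K, V> dead = buckets.removeLast();
            buckets.addFirst(new HashMap<>());
            dead.forEach(onExpire); // everything in the oldest bucket has timed out
        }
    }

    final List<String> log = new ArrayList<>();
    // root id -> spout message id; an expired entry is reported as a timeout failure.
    final RotatingMap<Long, String> pending =
            new RotatingMap<>(2, (root, msgId) -> log.add("fail(" + msgId + ") timeout"));

    void emit(long rootId, String msgId) {
        pending.put(rootId, msgId);
    }

    // Dispatch on the stream id, as tuple-action-fn does in the excerpt.
    void onTuple(String streamId, long rootId) {
        if (streamId.equals(TICK_STREAM)) {
            pending.rotate();
            return;
        }
        String msgId = pending.remove(rootId);
        if (msgId == null) {
            return; // unknown or already timed out: ignore
        }
        if (streamId.equals(ACK_STREAM)) {
            log.add("ack(" + msgId + ")");
        } else if (streamId.equals(FAIL_STREAM)) {
            log.add("fail(" + msgId + ")");
        }
    }

    public static void main(String[] args) {
        PendingModel spout = new PendingModel();
        spout.emit(1L, "m1");
        spout.emit(2L, "m2");
        spout.emit(3L, "m3");
        spout.onTuple(ACK_STREAM, 1L);  // the acker reports m1's tree as complete
        spout.onTuple(FAIL_STREAM, 2L); // the acker reports m2 as failed
        spout.onTuple(TICK_STREAM, 0L); // first rotation: m3 survives
        spout.onTuple(TICK_STREAM, 0L); // second rotation: m3 expires
        System.out.println(spout.log);  // [ack(m1), fail(m2), fail(m3) timeout]
    }
}
```

With two buckets, an un-acked entry is expired on the second rotation after it was inserted, so the effective timeout lies between one and two tick intervals.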

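The execute function in acker.clj makes this clearer still: the acker, too, dispatches on stream-id and XORs the message ids it receives into a running value for each spout message: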

pending (.getObject pending)
stream-id ... stream-id Constants/SYSTEM_TICK_STREAM_ID)
... tuple 0)
^OutputCollector output-collector (.getObject output-collector)
curr (.get pending id)
curr (condp = stream-id
       ACKER-INIT-STREAM-ID (-> curr ... :spout-task (.getValue tuple 2)))
       ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1))
       ACKER-FAIL-STREAM-ID ... pending id curr) ... curr))
... output-collector
    (:spout-task curr)
    ACKER-ACK-STREAM-ID
    [id]))
... pending id)
(acker-emit-direct output-collector
                   (:spout-task curr)
                   ACKER-FAIL-STREAM-ID
                   [id]))))
(.ack output-collector tuple)))))
(^void cleanup [this])
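The bookkeeping itself fits in a few lines. This Java sketch mirrors the execute logic above under simplifying assumptions: it keeps one entry per root id holding a running XOR value, the spout task and a failed flag; it returns "ACK" or "FAIL" where the real acker would emit directly to the spout task; and it leaves out the tick-driven rotation of pending. AckerModel, Entry and the method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class AckerModel {
    // Per-root entry: running XOR value, owning spout task, failed flag.
    static class Entry {
        long val;
        Integer spoutTask; // null until the spout's init message arrives
        boolean failed;
    }

    private final Map<Long, Entry> pending = new HashMap<>();

    // Init stream: XOR in the ids the spout emitted and remember the spout task.
    String onInit(long root, long xorOfEmittedIds, int spoutTask) {
        Entry e = pending.computeIfAbsent(root, k -> new Entry());
        e.val ^= xorOfEmittedIds;
        e.spoutTask = spoutTask;
        return check(root, e);
    }

    // Ack stream: XOR in the value a bolt sent when it acked a tuple.
    String onAck(long root, long ackVal) {
        Entry e = pending.computeIfAbsent(root, k -> new Entry());
        e.val ^= ackVal;
        return check(root, e);
    }

    // Fail stream: just mark the tree as failed.
    String onFail(long root) {
        Entry e = pending.computeIfAbsent(root, k -> new Entry());
        e.failed = true;
        return check(root, e);
    }

    // Returns "ACK" or "FAIL" when the spout would be notified, otherwise null.
    private String check(long root, Entry e) {
        if (e.spoutTask == null) {
            return null; // init not seen yet: keep waiting
        }
        if (e.val == 0) {
            pending.remove(root);
            return "ACK";
        }
        if (e.failed) {
            pending.remove(root);
            return "FAIL";
        }
        return null;
    }

    public static void main(String[] args) {
        AckerModel acker = new AckerModel();
        long a = 0x1111L; // hypothetical id of the spout tuple
        long b = 0x2222L; // hypothetical id of a child tuple anchored to it
        System.out.println(acker.onInit(7L, a, 3)); // null: a still outstanding
        System.out.println(acker.onAck(7L, a ^ b)); // null: b still outstanding
        System.out.println(acker.onAck(7L, b));     // ACK
    }
}
```

The trick is that every tuple id is XOR-ed into the value exactly twice, once when the tuple is created and once when it is acked, so the value returns to 0 exactly when the whole tree is done. Because the ids are random 64-bit values, a premature 0 is possible in principle but extremely unlikely.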

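So how does a bolt talk to the acker? A BasicBolt has ack called for it automatically once execute returns, while a RichBolt calls collector.ack itself; either way the call ends up in the OutputCollector's ack method. What does that method actually do? The :bolt branch of mk-threads in executor.clj makes it clear (parts lost from the original excerpt are marked with ...):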

(.prepare bolt-obj
          storm-conf
          user-context
          (OutputCollector.
            ;; an anonymous implementation (reify) of IOutputCollector's emit, emitDirect, ack and fail methods
            (reify IOutputCollector
              ... stream anchors values ...
              ... stream anchors values task))
              ... tuple tuple
                    ack-val ... tuple getMessageId getAnchorsToIds)]
                  (task/send-unanchored task-data
                                        ACKER-ACK-STREAM-ID
                                        [root (bit-xor id ack-val)])))
              ... user-context .boltAck (BoltAckInfo. tuple task-id delta))
                (when delta
                  (builtin-metrics/bolt-acked-tuple! (:builtin-metrics task-data)
                                                     executor-stats
                                                     (.getSourceComponent tuple)
                                                     (.getSourceStreamId tuple)
                                                     delta)
                  (stats/bolt-acked-tuple! executor-stats
                                           (.getSourceComponent tuple)
                                           (.getSourceStreamId tuple)
                                           delta))))
              ... tuple getMessageId getAnchors)]
                  (task/send-unanchored task-data
                                        ACKER-FAIL-STREAM-ID
                                        ...
              ...tuple)]
                (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
                (when delta
                  (builtin-metrics/bolt-failed-tuple! (:builtin-metrics task-data)
                                                      executor-stats
                                                      (.getSourceComponent tuple)
                  ...
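The ack branch above sends one [root, id XOR ack-val] pair for each root in the tuple's anchorsToIds. The sketch below, a hypothetical Java model rather than Storm's TupleImpl, shows why that cancels out. It assumes that emitting a child anchored to a tuple XORs a fresh edge id into the parent's ackVal and into the child's id for each root; under that assumption, the spout's init value and all the bolts' ack messages XOR to 0 once every tuple in the tree has been acked.

```java
import java.util.HashMap;
import java.util.Map;

public class BoltAckModel {
    // Hypothetical stand-in for TupleImpl: root id -> this tuple's id, plus ackVal.
    static class Tuple {
        final Map<Long, Long> anchorsToIds = new HashMap<>();
        long ackVal; // XOR of the edge ids of tuples emitted anchored to this one
    }

    // Emit a child anchored to parent; edgeId would be random in Storm.
    static Tuple emitAnchored(Tuple parent, long edgeId) {
        Tuple child = new Tuple();
        parent.ackVal ^= edgeId; // the parent's ack will cancel this edge id
        for (Long root : parent.anchorsToIds.keySet()) {
            child.anchorsToIds.merge(root, edgeId, (x, y) -> x ^ y);
        }
        return child;
    }

    // What ack() sends to the acker: one [root, id ^ ackVal] pair per root.
    static Map<Long, Long> ackMessages(Tuple t) {
        Map<Long, Long> out = new HashMap<>();
        t.anchorsToIds.forEach((root, id) -> out.put(root, id ^ t.ackVal));
        return out;
    }

    public static void main(String[] args) {
        long root = 42L;      // hypothetical root id
        long spoutId = 0x1234L; // hypothetical id of the spout tuple
        long edgeId = 0x5678L;  // hypothetical id of the child tuple

        Tuple spoutTuple = new Tuple();
        spoutTuple.anchorsToIds.put(root, spoutId);
        long initVal = spoutId; // what the spout's init message carries

        Tuple child = emitAnchored(spoutTuple, edgeId);         // bolt 1 emits a child
        long fromBolt1 = ackMessages(spoutTuple).get(root);     // spoutId ^ edgeId
        long fromBolt2 = ackMessages(child).get(root);          // edgeId

        System.out.println(initVal ^ fromBolt1 ^ fromBolt2);    // 0: tree complete
    }
}
```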
