3. Receiving Data with the Kafka DirectDStream Approach

Determining the number of KafkaRDD partitions and computing the data each partition receives

KafkaUtils.createDirectStream creates the DirectDStream. The code is as follows:

```scala
def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Set[String]
): InputDStream[(K, V)] = {
  val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
  val kc = new KafkaCluster(kafkaParams)
  val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)

  val result = for {
    // Ask the Kafka cluster for the partitions of the topics. topicPartitions is a
    // Set[TopicAndPartition] with one element per Kafka partition; each element holds
    // the topic name and the partition index.
    topicPartitions <- kc.getPartitions(topics).right
    leaderOffsets <- (if (reset == Some("smallest")) {
      kc.getEarliestLeaderOffsets(topicPartitions)
    } else {
      kc.getLatestLeaderOffsets(topicPartitions)
    }).right
  } yield {
    // Starting offset for each partition of the Kafka topic
    val fromOffsets = leaderOffsets.map { case (tp, lo) =>
      (tp, lo.offset)
    }
    new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
      ssc, kafkaParams, fromOffsets, messageHandler)
  }
  KafkaCluster.checkErrors(result)
}
```

Here the driver communicates with the Kafka cluster to obtain an offset for each partition of the topic (the earliest offsets if auto.offset.reset is "smallest", otherwise the latest ones) and passes them as the fromOffsets argument when creating the DirectKafkaInputDStream.
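The offset choice made in createDirectStream can be modeled without Spark or Kafka. The sketch below is a simplified, hypothetical model: TopicPartition and PartitionOffsets are stand-ins for TopicAndPartition and for the earliest/latest leader offsets that KafkaCluster would fetch from the brokers, and only the auto.offset.reset branch is reproduced.

```scala
// Hypothetical stand-in for TopicAndPartition.
final case class TopicPartition(topic: String, partition: Int)
// Hypothetical stand-in for the earliest/latest offsets fetched from the partition leader.
final case class PartitionOffsets(earliest: Long, latest: Long)

object FromOffsetsSketch {
  // Mirrors the branch on "auto.offset.reset": "smallest" (case-insensitive) starts
  // from the earliest available offset; any other value, or no value, starts from the latest.
  def fromOffsets(
      kafkaParams: Map[String, String],
      offsets: Map[TopicPartition, PartitionOffsets]): Map[TopicPartition, Long] = {
    val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
    offsets.map { case (tp, po) =>
      tp -> (if (reset == Some("smallest")) po.earliest else po.latest)
    }
  }

  def main(args: Array[String]): Unit = {
    val offsets = Map(
      TopicPartition("events", 0) -> PartitionOffsets(earliest = 0L, latest = 120L),
      TopicPartition("events", 1) -> PartitionOffsets(earliest = 5L, latest = 80L))
    println(fromOffsets(Map("auto.offset.reset" -> "smallest"), offsets))
    println(fromOffsets(Map.empty, offsets))
  }
}
```

The first call yields the earliest offsets (0 and 5), the second the latest ones (120 and 80).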

Part of the DirectKafkaInputDStream code is shown below:

```scala
class DirectKafkaInputDStream[
    K: ClassTag,
    V: ClassTag,
    U <: Decoder[K]: ClassTag,
    T <: Decoder[V]: ClassTag,
    R: ClassTag](
    @transient ssc_ : StreamingContext,
    val kafkaParams: Map[String, String],
    val fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R
  ) extends InputDStream[R](ssc_) with Logging {
  val maxRetries = context.sparkContext.getConf.getInt(
    "spark.streaming.kafka.maxRetries", 1)

  // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
  private[streaming] override def name: String = s"Kafka direct stream [$id]"

  protected[streaming] override val checkpointData =
    new DirectKafkaInputDStreamCheckpointData

  protected val kc = new KafkaCluster(kafkaParams)

  protected val maxMessagesPerPartition: Option[Long] = {
    val ratePerSec = context.sparkContext.getConf.getInt(
      "spark.streaming.kafka.maxRatePerPartition", 0)
    if (ratePerSec > 0) {
      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
      Some((secsPerBatch * ratePerSec).toLong)
    } else {
      None
    }
  }

  // Holds the topic partitions and the offset from which each one is read next
  protected var currentOffsets = fromOffsets

  @tailrec
  protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
    val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
    // Either.fold would confuse @tailrec, do it manually
    if (o.isLeft) {
      val err = o.left.get.toString
      if (retries <= 0) {
        throw new SparkException(err)
      } else {
        log.error(err)
        Thread.sleep(kc.config.refreshLeaderBackoffMs)
        latestLeaderOffsets(retries - 1)
      }
    } else {
      o.right.get
    }
  }

  // limits the maximum number of messages per partition.
  // When no maximum receive rate is set, the end point of each partition for this
  // batch is that partition's latest offset at the current time.
  protected def clamp(
    leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
    maxMessagesPerPartition.map { mmp =>
      leaderOffsets.map { case (tp, lo) =>
        tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
      }
    }.getOrElse(leaderOffsets)
  }

  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
    // Compute, for each partition, the offset at which this batch stops reading
    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
    val rdd = KafkaRDD[K, V, U, T, R](
      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

    // Report the record number of this batch interval to InputInfoTracker.
    val inputInfo = InputInfo(id, rdd.count)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
    Some(rdd)
  }

  // ... (rest of the class omitted)
}
```

Conclusion: with the DirectDStream approach in Spark Streaming, if no maximum receive rate is set (spark.streaming.kafka.maxRatePerPartition), each batch receives all the messages the Kafka topic received during one batch interval, that is, everything from where the previous batch stopped up to the latest offset at the moment compute runs. If the rate is set, each partition contributes at most maxRatePerPartition × (batch duration in seconds) messages per batch.
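The effect of maxRatePerPartition and clamp on consecutive batches can be simulated in plain Scala. This is a sketch under simplifying assumptions: offsets are plain Long values instead of LeaderOffset, the latest offset is held fixed while the batches run, and ClampSketch and its method names are hypothetical.

```scala
// Hypothetical stand-in for TopicAndPartition.
final case class TopicPartition(topic: String, partition: Int)

object ClampSketch {
  // Same arithmetic as maxMessagesPerPartition: None when the rate is unset (0),
  // otherwise rate * batch duration in seconds, truncated to a Long.
  def maxMessagesPerPartition(ratePerSec: Int, batchMillis: Long): Option[Long] =
    if (ratePerSec > 0) Some((batchMillis.toDouble / 1000 * ratePerSec).toLong) else None

  // Same rule as clamp: end offset = min(current + mmp, latest), or latest when unlimited.
  def clamp(
      current: Map[TopicPartition, Long],
      latest: Map[TopicPartition, Long],
      mmp: Option[Long]): Map[TopicPartition, Long] =
    mmp.map { m =>
      latest.map { case (tp, lo) => tp -> math.min(current(tp) + m, lo) }
    }.getOrElse(latest)

  def main(args: Array[String]): Unit = {
    val tp = TopicPartition("events", 0)
    var current = Map(tp -> 100L)
    val latest = Map(tp -> 1100L) // 1000 messages are waiting in this partition

    // maxRatePerPartition = 200 with 2-second batches: at most 400 messages per batch.
    val mmp = maxMessagesPerPartition(ratePerSec = 200, batchMillis = 2000)
    for (batch <- 1 to 3) {
      val until = clamp(current, latest, mmp)
      println(s"batch $batch reads [${current(tp)}, ${until(tp)})")
      current = until // like compute(): the next batch starts where this one stopped
    }
  }
}
```

With these numbers mmp is 400, so the three batches read [100, 500), [500, 900) and [900, 1100). With mmp = None, the first batch would read everything up to offset 1100 at once.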

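When DirectKafkaInputDStream is initialized, the Kafka partition information arrives through the fromOffsets parameter and is stored in the currentOffsets member. When a KafkaRDD is created, currentOffsets is passed in as a constructor argument, handing the Kafka partition information to the KafkaRDD, which uses it to generate its own partitions.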
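How a batch's start offsets (currentOffsets, passed in as fromOffsets) and end offsets become per-partition ranges can be sketched as follows. TopicPartition, LeaderOffset and OffsetRange here are simplified stand-ins that mirror the fields used in the Spark code, not the real classes, and the count helper is an illustrative addition.

```scala
// Hypothetical stand-ins mirroring the fields the Spark code uses.
final case class TopicPartition(topic: String, partition: Int)
final case class LeaderOffset(host: String, port: Int, offset: Long)
// [fromOffset, untilOffset) of one Kafka partition for one batch.
final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  def count: Long = untilOffset - fromOffset // messages covered by this range
}

object OffsetRangeSketch {
  // One OffsetRange per entry of fromOffsets, ending at that partition's until offset.
  def offsetRanges(
      fromOffsets: Map[TopicPartition, Long],
      untilOffsets: Map[TopicPartition, LeaderOffset]): Array[OffsetRange] =
    fromOffsets.map { case (tp, fo) =>
      OffsetRange(tp.topic, tp.partition, fo, untilOffsets(tp).offset)
    }.toArray

  // Leader broker (host, port) of each partition, taken from the until offsets.
  def leaders(untilOffsets: Map[TopicPartition, LeaderOffset]): Map[TopicPartition, (String, Int)] =
    untilOffsets.map { case (tp, lo) => tp -> (lo.host, lo.port) }

  def main(args: Array[String]): Unit = {
    val p0 = TopicPartition("events", 0)
    val p1 = TopicPartition("events", 1)
    val until = Map(p0 -> LeaderOffset("broker-a", 9092, 50L), p1 -> LeaderOffset("broker-b", 9092, 70L))
    offsetRanges(Map(p0 -> 10L, p1 -> 60L), until).foreach(r => println(s"$r count=${r.count}"))
    println(leaders(until))
  }
}
```

The number of ranges is exactly the number of keys in fromOffsets, which is why the partition count of the topic carries straight through to the RDD.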

```scala
object KafkaRDD {
  import KafkaCluster.LeaderOffset

  /**
   * @param kafkaParams Kafka <a href="#configuration">
   * configuration parameters</a>.
   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
   * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
   *   starting point of the batch
   * @param untilOffsets per-topic/partition Kafka offsets defining the (exclusive)
   *   ending point of the batch
   * @param messageHandler function for translating each message into the desired type
   */
  def apply[
    K: ClassTag,
    V: ClassTag,
    U <: Decoder[_]: ClassTag,
    T <: Decoder[_]: ClassTag,
    R: ClassTag](
      sc: SparkContext,
      kafkaParams: Map[String, String],
      fromOffsets: Map[TopicAndPartition, Long],
      untilOffsets: Map[TopicAndPartition, LeaderOffset],
      messageHandler: MessageAndMetadata[K, V] => R
    ): KafkaRDD[K, V, U, T, R] = {
    val leaders = untilOffsets.map { case (tp, lo) =>
      tp -> (lo.host, lo.port)
    }.toMap

    // Build an OffsetRange, the structure describing the data to receive, from the
    // start and end offsets of each partition of the Kafka topic
    val offsetRanges = fromOffsets.map { case (tp, fo) =>
      val uo = untilOffsets(tp)
      OffsetRange(tp.topic, tp.partition, fo, uo.offset)
    }.toArray

    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaders, messageHandler)
  }
}

class KafkaRDD[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_]: ClassTag,
  T <: Decoder[_]: ClassTag,
  R: ClassTag] private[spark] (
    sc: SparkContext,
    kafkaParams: Map[String, String],
    val offsetRanges: Array[OffsetRange],
    leaders: Map[TopicAndPartition, (String, Int)],
    messageHandler: MessageAndMetadata[K, V] => R
  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {

  // Generate the RDD's partitions from offsetRanges
  override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
      val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
      // host is the Kafka broker's IP address; port is the Kafka broker's port
      new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
    }.toArray
  }

  // ... (rest of the class omitted)
}
```

From the previous article in this series (Spark Streaming series, part 2: Job scheduling in Spark Streaming), we know that DirectKafkaInputDStream.compute is called periodically by Spark Streaming's scheduling module to produce the DStream's RDD. When that RDD is created, getPartitions is eventually invoked, which fixes the IP address and port of each KafkaRDD partition: it is the address of the Kafka broker that leads the corresponding Kafka partition.
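The one-to-one mapping produced by getPartitions can be sketched in plain Scala. SketchPartition is a hypothetical stand-in for KafkaRDDPartition and the broker host names are made up; the point is that RDD partition i is built from the i-th OffsetRange and carries the host and port of that Kafka partition's leader.

```scala
// Hypothetical stand-ins for TopicAndPartition, OffsetRange and KafkaRDDPartition.
final case class TopicPartition(topic: String, partition: Int)
final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)
final case class SketchPartition(
    index: Int, topic: String, partition: Int,
    fromOffset: Long, untilOffset: Long, host: String, port: Int)

object PartitionsSketch {
  // Same shape as getPartitions: zipWithIndex gives RDD partition i for the i-th range,
  // and the leaders map supplies the broker address for that Kafka partition.
  def partitions(
      offsetRanges: Array[OffsetRange],
      leaders: Map[TopicPartition, (String, Int)]): Array[SketchPartition] =
    offsetRanges.zipWithIndex.map { case (o, i) =>
      val (host, port) = leaders(TopicPartition(o.topic, o.partition))
      SketchPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
    }

  def main(args: Array[String]): Unit = {
    val ranges = Array(
      OffsetRange("events", 0, 0L, 40L),
      OffsetRange("events", 1, 0L, 25L),
      OffsetRange("events", 2, 10L, 30L))
    val leaders = Map(
      TopicPartition("events", 0) -> ("broker-a", 9092),
      TopicPartition("events", 1) -> ("broker-b", 9092),
      TopicPartition("events", 2) -> ("broker-a", 9092))
    partitions(ranges, leaders).foreach(println)
  }
}
```

The output contains exactly as many partitions as there are offset ranges, one per Kafka partition, each tagged with its leader's address.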

The code analysis above shows that the number of RDD partitions equals the number of Kafka partitions, and that each RDD partition corresponds one-to-one to a Kafka partition.

Data reception in KafkaRDD
