spark streaming updateStateByKey 用法

updateStateByKey 解释: 以DStream中的数据进行按key做reduce操作,然后对各个批次的数据进行累加 在有新的数据信息进入或更新时,可以让用户保持想要的任何状。使用这个功能需要完成两步: 1) 定义状态:可以是任意数据类型 2) 定义状态更新函数:用一个函数指定如何使用先前的状态,从输入流中的新值更新状态。 对于有状态操作,要不断的把当前和历史的时间切片的RDD累加计算,随着时间的流失,计算的数据规模会变得越来越大。

updateStateByKey源码:

/**

@tparam S State type */ def updateStateByKey[S: ClassTag]( updateFunc: (Seq[V], Option[S]) => Option[S], partitioner: Partitioner, initialRDD: RDD[(K, S)] ): DStream[(K, S)] = { val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => { iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s))) } updateStateByKey(newUpdateFunc, partitioner, true, initialRDD) }

代码实现

StatefulNetworkWordCount

object StatefulNetworkWordCount {def main(args: Array[String]) {if (args.length < 2) { System.err.println(“Usage: StatefulNetworkWordCount <hostname> <port>”) System.exit(1)}Logger.getLogger(“org.apache.spark”).setLevel(Level.WARN)val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.sum val previousCount = state.getOrElse(0) Some(currentCount + previousCount)}val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => { iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))}val sparkConf = new SparkConf().setAppName(“StatefulNetworkWordCount”).setMaster(“local”)// Create the context with a 1 second batch sizeval ssc = new StreamingContext(sparkConf, Seconds(1))ssc.checkpoint(“.”)// Initial RDD input to updateStateByKeyval initialRDD = ssc.sparkContext.parallelize(List((“hello”, 1), (“world”, 1)))// Create a ReceiverInputDStream on target ip:port and count the// words in input stream of \n delimited test (eg. generated by ‘nc’)val lines = ssc.socketTextStream(args(0), args(1).toInt)val words = lines.flatMap(_.split(” “))val wordDstream = words.map(x => (x, 1))// Update the cumulative count using updateStateByKey// This will give a Dstream made of state (which is the cumulative count of the words)val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc, new HashPartitioner (ssc.sparkContext.defaultParallelism), true, initialRDD)stateDstream.print()ssc.start()ssc.awaitTermination()}}

NetworkWordCount

import orgimport orgimport org.{Seconds, StreamingContext}import org._object NetworkWordCount { def main(args: Array[String]) {if (args.length < 2) {System.err.println(“Usage: NetworkWordCount <hostname> <port>”)System.exit(1)}val sparkConf = new SparkConf().setAppName(“NetworkWordCount”)val ssc = new StreamingContext(sparkConf, Seconds(10))//使用updateStateByKey前需要设置checkpointssc.checkpoint(“hdfs://master:8020/spark/checkpoint”)val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {//通过Spark内部的reduceByKey按key规约,然后这里传入某key当前批次的Seq/List,再计算当前批次的总和val currentCount = currValues.sum// 已累加的值val previousCount = prevValueState.getOrElse(0)// 返回累加后的结果,是一个Option[Int]类型Some(currentCount + previousCount)}val lines = ssc.socketTextStream(args(0), args(1).toInt)val words = lines.flatMap(_.split(” “))val pairs = words.map(word => (word, 1))//val currWordCounts = pairs.reduceByKey(_ + _)//currWordCounts.print()val totalWordCounts = pairs.updateStateByKey[Int](addFunc)totalWordCounts.print()ssc.start()ssc.awaitTermination() }}WebPagePopularityValueCalculatorcheckpointDir = msgConsumerGroup = “user-behavior-topic-message-consumer-group” def main(args: Array[String]) {if (args.length < 2) {println(“Usage:WebPagePopularityValueCalculator zkserver1:2181, zkserver2: 2181, zkserver3: 2181 consumeMsgDataTimeInterval (secs) “)System.exit(1)}val Array(zkServers, processingInterval) = argsval conf = new SparkConf().setAppName(“Web Page Popularity Value Calculator”)val ssc = new StreamingContext(conf, Seconds(processingInterval.toInt))//using updateStateByKey asks for enabling checkpointssc.checkpoint(checkpointDir)val kafkaStream = KafkaUtils.createStream(//Spark streaming contextssc,//zookeeper quorum. e.g zkserver1:2181,zkserver2:2181,…zkServers,//kafka message consumer group IDmsgConsumerGroup,//Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own threadMap(“user-behavior-topic” -> 3))val msgDataRDD = kafkaStream.map(_._2)popularityData = msgDataRDD.map { msgLine => {val dataArr: Array[String] = msgLine.split(“\\|”)val pageID = dataArr(0)//calculate the popularity valueval popValue: Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1(pageID, popValue)}}updatePopularityValue = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => {iterator.flatMap(t => {val newValue: Double = t._2.sumval stateValue: Double = t._3.getOrElse(0);Some(newValue + stateValue)}.map(sumedValue => (t._1, sumedValue)))}val initialRDD = ssc.sparkContext.parallelize(List((“page1”, 0.00)))//调用 updateStateByKey 原语并传入上面定义的匿名函数更新网页热度值。val stateDStream = popularityData.updateStateByKey[Double](updatePopularityValue,new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)//set the checkpoint interval to avoid too frequently data checkpoint which may//may significantly reduce operation throughputstateDStream.checkpoint(Duration(8 * processingInterval.toInt * 1000))//after calculation, we need to sort the result and only show the top 10 hot pages//最后得到最新结果后,需要对结果进行排序,最后打印热度值最高的 10 个网页。stateDStream.foreachRDD { rdd => {val sortedData = rdd.map { case (k, v) => (v, k) }.sortByKey(false)val topKData = sortedData.take(10).map { case (v, k) => (k, v) }topKData.foreach(x => {println(x)})}}ssc.start()ssc.awaitTermination() }}最重要的是今天的心。

spark streaming updateStateByKey 用法

相关文章:

你感兴趣的文章:

标签云: