Spark Mastery Series (Advanced) — Spark from Beginner to Expert: Section 12, Spark Streaming — DStream Window Operations

A getting-started example

1. Window Operations

Spark Streaming provides window operations, as illustrated in the figure below:

In the figure, the solid red line marks the window's current position and the dashed line marks its previous position. Each time the window slides, all the RDDs that fall inside it are processed together, producing a windowed DStream. A window operation takes two parameters:
(1) Window length: the duration of the window. In the figure, the window length is 3.
(2) Sliding interval: how often the window operation is executed. In the figure, the sliding interval is 2.
Both parameters must be integer multiples of the source DStream's batch interval (which is 1 in the figure).
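The grouping described above can be sketched in plain Scala, without Spark. This is an illustrative model only (the names `windows` and `WindowSketch` are mine, not Spark API): each batch is a sequence of records, a window of length 3 covers 3 consecutive batches, and the window advances 2 batches per slide.

```scala
// Plain-Scala sketch (no Spark) of how a windowed DStream groups batches.
// Assumptions: window length = 3 batches, sliding interval = 2 batches.
object WindowSketch {
  // Return the records covered by each window position.
  def windows[A](batches: Seq[Seq[A]], windowLen: Int, slide: Int): Seq[Seq[A]] =
    (windowLen to batches.length by slide)
      .map(end => batches.slice(end - windowLen, end).flatten)

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq(1), Seq(2), Seq(3), Seq(4), Seq(5))
    // First window covers batches 1-3, second covers batches 3-5;
    // batch 3 is shared because the windows overlap.
    println(windows(batches, 3, 2))
  }
}
```

Note how consecutive windows overlap when the sliding interval is smaller than the window length; this is why a record can be counted in more than one window.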

2. A getting-started example

WindowWordCount: using the reduceByKeyAndWindow method

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// StreamingExamples comes from the Spark examples project; it just sets a
// sensible log level for streaming demos.
import org.apache.spark.examples.streaming.StreamingExamples

object WindowWordCount {
  def main(args: Array[String]) {
    // Arguments used in this example: localhost 9999 30 10
    if (args.length != 4) {
      System.err.println("Usage: WindowWordCount <hostname> <port> <windowDuration> <slideDuration>")
      System.exit(1)
    }
    StreamingExamples.setStreamingLogLevels()

    val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[4]")
    val sc = new SparkContext(conf)
    // Create the StreamingContext with a batch interval of 5 seconds
    val ssc = new StreamingContext(sc, Seconds(5))

    // Use a socket as the data source
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)
    val words = lines.flatMap(_.split(" "))

    // Window operation: count words over each window
    val wordCounts = words.map(x => (x, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(args(2).toInt), Seconds(args(3).toInt))

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
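The aggregation that reduceByKeyAndWindow performs over one window can be sketched in plain Scala. This is a local illustration only (the object and method names here are mine, not Spark API): treat a window as the concatenation of the lines it covers, then apply the same map-to-pairs and per-key reduce as the streaming job.

```scala
// Plain-Scala sketch (no Spark) of what reduceByKeyAndWindow computes
// for the lines that fall inside a single window.
object ReduceByKeyAndWindowSketch {
  def countWords(windowLines: Seq[String]): Map[String, Int] =
    windowLines
      .flatMap(_.split(" "))          // like lines.flatMap(_.split(" "))
      .map(word => (word, 1))          // like words.map(x => (x, 1))
      .groupBy(_._1)                   // group pairs by key
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // (a, b) => a + b per key

  def main(args: Array[String]): Unit =
    println(countWords(Seq("Spark is fast", "Spark is general")))
}
```

The real operator additionally merges partial counts across the batches of the window and re-evaluates as the window slides, but the per-key reduce function is the same.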

Start a netcat server with the following command:

root@sparkmaster:~# nc -lk 9999

Then run WindowWordCount and type the following sentence into netcat:

root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data. It provides

Observe the output:

-------------------------------------------
Time: 1448778805000 ms   (first sliding window, after 10 s)
-------------------------------------------
(provides,1)
(is,1)
(general,1)
(Big,1)
(fast,1)
(cluster,1)
(Data.,1)
(computing,1)
(Spark,1)
(a,1)
...
-------------------------------------------
Time: 1448778815000 ms   (10 s later, second sliding window)
-------------------------------------------
(provides,1)
(is,1)
(general,1)
(Big,1)
(fast,1)
(cluster,1)
(Data.,1)
(computing,1)
(Spark,1)
(a,1)
...
-------------------------------------------
Time: 1448778825000 ms   (another 10 s later, third sliding window)
-------------------------------------------
(provides,1)
(is,1)
(general,1)
(Big,1)
(fast,1)
(cluster,1)
(Data.,1)
(computing,1)
(Spark,1)
(a,1)
...
-------------------------------------------
Time: 1448778835000 ms   (another 10 s later: the input is now older than the window length, so it is no longer counted)
-------------------------------------------
-------------------------------------------
Time: 1448778845000 ms
-------------------------------------------
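The output above shows the same counts printed three times before disappearing. The reason is arithmetic: with a 30-second window sliding every 10 seconds, each batch of input falls into windowLength / slideInterval = 3 consecutive windows. A one-line sketch (the names here are mine, for illustration):

```scala
// With a 30 s window that slides every 10 s, each batch is covered by
// 30 / 10 = 3 consecutive window positions before it ages out.
object WindowMembership {
  def windowsPerBatch(windowSeconds: Int, slideSeconds: Int): Int =
    windowSeconds / slideSeconds // assumes the slide evenly divides the window length

  def main(args: Array[String]): Unit =
    println(windowsPerBatch(30, 10)) // prints 3
}
```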

Now enter the same sentence twice:

root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides

The output now looks like this:

Time: 1448779205000 ms
-------------------------------------------
(provides,2)
(is,2)
(general,2)
(Big,2)
(fast,2)
(cluster,2)
(Data.,2)
(computing,2)
(Spark,2)
(a,2)
...

Enter the sentence once more:

root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides

