Spark修炼之道(进阶篇)——Spark入门到精通:第十节 Spark Streaming(一)

入门案例

1. Spark流式计算简介

Hadoop的MapReduce及Spark SQL等只能进行离线计算,无法满足实时性要求较高的业务需求,例如实时推荐、实时网站性能分析等,流式计算可以解决这些问题。目前有三种比较常用的流式计算框架,它们分别是Storm,Spark Streaming和Samza,各个框架的比较及使用情况,可以参见:。本节对Spark Streaming进行重点介绍,Spark Streaming作为Spark的五大核心组件之一,其原生地支持多种数据源的接入,而且可以与Spark MLLib、Graphx结合起来使用,轻松完成分布式环境下在线机器学习算法的设计。Spark支持的输入数据源及输出文件如下图所示:

在后面的案例实战当中,会涉及到这部分内容。中间的”Spark Streaming“会对输入的数据源进行处理,然后将结果输出,其内部工作原理如下图所示:

Spark Streaming接受实时传入的数据流,然后将数据按批次(batch)进行划分,然后再将这部分数据交由Spark引擎进行处理,处理完成后将结果输出到外部文件。

先看下面一段基于Spark Streaming的word count代码,它可以很好地帮助初步理解流式计算

import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}object StreamingWordCount { def main(args: Array[String]) {if (args.length < 1) {System.err.println(“Usage: StreamingWordCount <directory>”)System.exit(1)}//创建SparkConf对象val sparkConf = new SparkConf().setAppName(“HdfsWordCount”).setMaster(“local[2]”)// Create the context//创建StreamingContext对象,与集群进行交互val ssc = new StreamingContext(sparkConf, Seconds(20))// Create the FileInputDStream on the directory and use the// stream to count words in new files created//如果目录中有新创建的文件,则读取val lines = ssc.textFileStream(args(0))//分割为单词val words = lines.flatMap(_.split(” “))//统计单词出现次数val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)//打印结果wordCounts.print()//启动Spark Streamingssc.start()//一直运行,除非人为干预再停止ssc.awaitTermination() }}

运行上面的程序后,再通过命令行界面,将文件拷贝到相应的文件目录,具体如下:

程序在运行时,根据文件创建时间对文件进行处理,在上一次运行时间后创建的文件都会被处理,输出结果如下:

2. Spark Streaming相关核心类

1. DStream(discretized stream)

Spark Streaming提供了对数据流的抽象,它就是DStream,它可以通过前述的 Kafka, Flume等数据源创建,DStream本质上是由一系列的RDD构成。各个RDD中的数据为对应时间间隔( interval)中流入的数据,如下图所示:

对DStream的所有操作最终都要转换为对RDD的操作,例如前面的StreamingWordCount程序,flatMap操作将作用于DStream中的所有RDD,如下图所示:

2.StreamingContext 在Spark Streaming当中,StreamingContext是整个程序的入口,其创建方式有多种,最常用的是通过SparkConf来创建:

import org.apache.spark._import org.apache.spark.streaming._val conf = new SparkConf().setAppName(appName).setMaster(master)val ssc = new StreamingContext(conf, Seconds(1))

创建StreamingContext对象时会根据SparkConf创建SparkContext

/** * Create a StreamingContext by providing the configuration necessary for a new SparkContext. * @param conf a org.apache.spark.SparkConf object specifying Spark parameters * @param batchDuration the time interval at which streaming data will be divided into batches */ def this(conf: SparkConf, batchDuration: Duration) = {this(StreamingContext.createNewSparkContext(conf), null, batchDuration) }

也就是说StreamingContext是对SparkContext的封装,StreamingContext还有其它几个构造方法,感兴趣的可以了解,后期在源码解析时会对它进行详细的讲解,创建StreamingContext时会指定batchDuration,它用于设定批处理时间间隔,需要根据应用程序和集群资源情况去设定。当创建完成StreamingContext之后,再按下列步骤进行:

通过输入源创建InputDStreaim

对DStreaming进行transformation和output操作,这样操作构成了后期流式计算的逻辑

通过StreamingContext.start()方法启动接收和处理数据的流程

使用streamingContext.awaitTermination()方法等待程序处理结束(手动停止或出错停止)

也可以调用streamingContext.stop()方法结束程序的运行

关于StreamingContext有几个值得注意的地方:

累死累活不说,走马观花反而少了真实体验,

Spark修炼之道(进阶篇)——Spark入门到精通:第十节 Spark Streaming(一)

相关文章:

你感兴趣的文章:

标签云: