The Way of Spark Mastery (Advanced Series): Spark from Beginner to Expert, Section 13: Spark Streaming and Spark SQL

1. Spark SQL, DataFrame, and Spark Streaming

The source code is taken directly from: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel

object SqlNetworkWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    // StreamingExamples is a helper object in the Spark examples package
    // (org.apache.spark.examples.streaming); drop this call when compiling elsewhere.
    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 2 second batch size
    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount").setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    // Use a socket as the data source
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    // The words DStream
    val words = lines.flatMap(_.split(" "))

    // Convert RDDs of the words DStream to DataFrame and run SQL query
    // foreachRDD visits the RDD of every batch in the DStream
    words.foreachRDD((rdd: RDD[String], time: Time) => {
      // Get the singleton instance of SQLContext
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Register as table
      wordsDataFrame.registerTempTable("words")

      // Do word count on table using SQL and print it
      val wordCountsDataFrame =
        sqlContext.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

/** Case class for converting RDD to DataFrame */
case class Record(word: String)

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
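The SQLContextSingleton object in the listing is called once per batch from inside foreachRDD, yet it should build the context only once. The following is a minimal sketch of that lazy-singleton pattern in plain Scala, with no Spark dependency. FakeContext, FakeContextSingleton, the created counter and SingletonDemo are hypothetical names introduced for illustration, and the synchronized guard is an addition that the listing does not have.

```scala
// Hypothetical stand-in for an expensive context object such as SQLContext.
final class FakeContext(val name: String)

object FakeContextSingleton {
  // Counts constructor calls so the effect of the pattern can be observed.
  var created: Int = 0

  private var instance: FakeContext = null

  // Build the context on first use only; later calls return the same object.
  // The synchronized guard is an assumption added here, not part of the listing.
  def getInstance(name: String): FakeContext = synchronized {
    if (instance == null) {
      created += 1
      instance = new FakeContext(name)
    }
    instance
  }
}

object SingletonDemo {
  def main(args: Array[String]): Unit = {
    // Simulate three batches, each asking for the context as foreachRDD does.
    val contexts = (1 to 3).map(_ => FakeContextSingleton.getInstance("demo"))
    println(s"instances created: ${FakeContextSingleton.created}")
    println(s"same object every time: ${contexts.forall(_ eq contexts.head)}")
  }
}
```

Every call after the first returns the object created earlier, so the arguments of later calls are ignored. The same holds for the sparkContext argument in the listing.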

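After starting the program, run the following command to open a Netcat server on the port passed to the program (9999 here), then type some lines of text. The header comment of the upstream example suggests starting Netcat first and then the program.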

root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data

Output:

========= 1448783840000 ms =========
+---------+-----+
|     word|total|
+---------+-----+
|    Spark|   12|
|   system|   12|
|  general|   12|
|     fast|   12|
|      and|   12|
|computing|   12|
|        a|   12|
|       is|   12|
|      for|   12|
|      Big|   12|
|  cluster|   12|
|     Data|   12|
+---------+-----+
========= 1448783842000 ms =========
+----+-----+
|word|total|
+----+-----+
+----+-----+
========= 1448783844000 ms =========
+----+-----+
|word|total|
+----+-----+
+----+-----+
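The empty tables for the later timestamps follow from how the program works: the query runs once per 2-second batch, over that batch's words only, so a batch that received no input produces no rows. The sketch below mimics this with plain Scala collections instead of Spark. BatchWordCountDemo, countWords and the sample input are hypothetical and serve only to illustrate the per-batch behaviour.

```scala
object BatchWordCountDemo {
  // Plain-collections counterpart of
  //   select word, count(*) as total from words group by word
  // applied to the lines of a single batch.
  def countWords(batch: Seq[String]): Map[String, Int] =
    batch
      .flatMap(_.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }

  def main(args: Array[String]): Unit = {
    // Hypothetical input: lines arrive in the first interval, none afterwards.
    val batches: Seq[Seq[String]] = Seq(
      Seq("Spark is fast", "Spark is general"),
      Seq.empty,
      Seq.empty
    )
    // Each batch is counted on its own; nothing carries over between batches.
    batches.zipWithIndex.foreach { case (batch, i) =>
      println(s"batch $i: ${countWords(batch)}")
    }
  }
}
```

Keeping a running total across batches would need a stateful operation on the DStream, which the program in this section does not use.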
