Overview

This document describes a word count task written in Scala, packaged with sbt, and submitted as a Spark Streaming job. It implements several representative scenarios: reading/writing the local file system and HDFS, reading/writing a TCP socket, reading/writing Kafka, plus stateless, stateful, sliding-window, and failure-recovery processing.

Test environment

ssh: root@10.2.35.117 password 123456
ssh: hadoop@10.2.35.117 password 123456

Spark install directory: /usr/local/spark/
Run Spark jobs as the hadoop account (recommended).

Spark project directory: /data/scala-test/hw
Run sbt packaging as the root account (recommended).

Note: avoid hard-coding the Spark master via setMaster in the code. Pass it with spark-submit --master spark://10.2.35.117:7077 instead, including when restarting from a checkpoint.
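A minimal sketch of that recommendation (the object and app names are just placeholders): build the SparkConf without setMaster so that the --master flag passed to spark-submit decides where the job runs, on the first launch as well as on a restart from a checkpoint.

    import org.apache.spark._
    import org.apache.spark.streaming._

    object NoMasterInCode {
      def main(args: Array[String]): Unit = {
        // no setMaster here: the master comes from `spark-submit --master ...`
        val conf = new SparkConf().setAppName("NetworkWordCount")
        val ssc = new StreamingContext(conf, Seconds(1))
        // ... build the DStreams, then ssc.start(); ssc.awaitTermination()
      }
    }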

Scala installation

  1. Download sbt from the official site. As a project management tool, sbt can manage a specific Scala version per project (Spark is built against Scala 2.11 by default) and handles compiling, testing, and packaging.
  2. After unpacking, add sbt to the PATH environment variable.
  3. Create an hw directory and run sbt inside it.
  4. touch build.sbt to create the project definition file by hand; it needs at least the two fields name := "helloworld" and version := "0.0.1-tag".
  5. mkdir -p src/main/scala/
  6. touch hw.scala in that directory.
  7. The concrete directory structure (after a build) is:

         .
         ├── build.sbt
         ├── project
         ├── src
         │   └── main
         │       └── scala
         │           └── hw.scala
         └── target
             └── scala-2.11
                 ├── classes
                 └── hello_2.11-0.0.1-SNAPSHOT.jar

  8. A minimal hw.scala:

         object Hi { def main(args: Array[String]) = println("Hi!") }

  9. Run sbt in the project root to enter the sbt console, then run, compile, test, or package; you can also chain several subcommands in the shell: sbt compile test package

  10. In the sbt console, the ~compile command watches the source files and recompiles automatically whenever a file is saved

  11. In sbt clean compile "testOnly TestA TestB", testOnly takes two arguments, TestA and TestB; the commands run in sequence (clean, compile, then testOnly).
  12. Common commands: https://www.scala-sbt.org/release/docs/zh-cn/Running.html

Spark + Scala: first steps

  1. API doc: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset
  2. Spark streaming programming guide (Chinese): https://spark-reference-doc-cn.readthedocs.io/zh_CN/latest/programming-guide/quick-start.html
  3. Reading a local file into a Dataset with SparkSession:

         val textFile = spark.read.textFile("README.md")                       // create a Dataset from a local file via SparkSession's read
         val linesWithSpark = textFile.filter(line => line.contains("Spark"))  // derive a new Dataset; this is a transformation, which is lazy
         linesWithSpark.count()                                                // number of lines; this is an action, which starts the computation
  4. A standalone Scala program

    1. Set up the project directory as above.
    2. Edit build.sbt:

           name := "hello"              // project name
           version := "0.0.1-SNAPSHOT"  // version
           scalaVersion := "2.11.8"
           libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.1"

    3. hw.scala:

    ```
    import org.apache.spark.sql.SparkSession

    object SimpleApp {
      def main(args: Array[String]) {
        val logFile = "hdfs://10.2.35.117:9000/user/hadoop/hello.go"  // read from HDFS
        val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
        val logData = spark.read.textFile(logFile).cache()
        val numAs = logData.filter(line => line.contains("func")).count()
        val numBs = logData.filter(line => line.contains("string")).count()
        println(s"Lines with func: $numAs, Lines with string: $numBs")
        spark.stop()
      }
    }
    ```

  5. If the default repositories are blocked or slow, switch to a domestic mirror in ~/.sbt/repositories:

         [repositories]
         local
         aliyun-nexus: http://maven.aliyun.com/nexus/content/groups/public/
         typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
         sonatype-oss-releases
         maven-central
         sonatype-oss-snapshots
  6. sbt package builds the jar: target/<scala version>/<name>_<scala version>-<version>.jar
  7. Submit the job with spark-submit: ./spark-submit --class SimpleApp --master local[4] /data/scala-test/hw/target/scala-2.11/hello_2.11-0.0.1-SNAPSHOT.jar // compute locally
  8. ./spark-submit --class SimpleApp --master spark://10.2.35.117:7077 /data/scala-test/hw/target/scala-2.11/hello_2.11-0.0.1-SNAPSHOT.jar // submit to the Spark cluster for computation; note the environment differences between the root and hadoop accounts
  9. $SPARK_HOME/examples/ contains many samples, e.g. ./bin/run-example SparkPi
  10. spark-submit usage: https://spark-reference-doc-cn.readthedocs.io/zh_CN/latest/deploy-guide/submitting-applications.html
  11. spark-submit parameter tuning: https://www.cnblogs.com/camilla/p/8301750.html
    1. https://stackoverflow.com/questions/24622108/apache-spark-the-number-of-cores-vs-the-number-of-executors
    2. --num-executors: total number of executors the job runs with
    3. --executor-cores: number of threads per executor (possibly constrained by the open-file-descriptor limit; the cases found all use a higher executor count and a lower executor-cores value, which raises the total number of handles that can be open)
    4. --executor-memory: memory per executor, at most (node memory / executors per node) minus about 10%

Streaming: real-time analysis of a data stream

  1. The directory structure is the same as above

    1. build.sbt:

           name := "hello"              // project name
           version := "0.0.1-SNAPSHOT"  // version
           scalaVersion := "2.11.8"     // must match the Scala inside the installed sbt; check with: [hadoop@xhb-master bin]$ sbt scalaVersion
           libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.1"  // 2.4.1 must match the installed Spark: [hadoop@xhb-master bin]$ ll ../jars/ | grep streaming
           // this pulls in spark-streaming_2.11-2.4.1; Kafka is supported from 2.2.1 onwards

    2. hw.scala:

    ```
    import org.apache.spark._
    import org.apache.spark.streaming._

    object SimpleApp {
      def main(args: Array[String]) {
        // Create a local StreamingContext with 2 worker threads and a 1-second batch interval.
        // The master needs at least 2 CPU cores, otherwise the tasks starve.
        val conf = new SparkConf().setMaster(if (args.length >= 1) args(0) else "local[2]").setAppName("NetworkWordCount")
        val ssc = new StreamingContext(conf, Seconds(1))  // the batch interval depends on the business and strongly affects Spark performance
        val sc = ssc.sparkContext
        sc.setLogLevel("WARN")
        // Create a DStream connected to hostname:port
        val lines = ssc.socketTextStream("10.2.35.117", 9999)
        val words = lines.flatMap(_.split(" "))
        val pairs = words.map((_, 1))
        // For a source DStream of (K, V) pairs, reduceByKey returns a new DStream of (K, V) pairs where V is aggregated by the given function.
        // https://spark-reference-doc-cn.readthedocs.io/zh_CN/latest/programming-guide/streaming-guide.html
        val wordCounts = pairs.reduceByKey(_ + _)
        wordCounts.print()
        ssc.start()             // start the streaming computation
        ssc.awaitTermination()  // wait until the computation terminates
      }
    }
    ```

  2. First start the TCP stream server: [root@xhb-master hw]# nc -lkv 9999
  3. Start the Spark Streaming job: ./spark-submit --class SimpleApp /data/scala-test/hw/target/scala-2.11/hello_2.11-0.0.1-SNAPSHOT.jar spark://10.2.35.117:7077 // the Spark master address argument is optional; if absent, the job runs locally
  4. The job reads data from the stream server and counts the words once per second
  5. Job status web UI: http://10.2.35.117:4040, cluster status: http://10.2.35.117:8080/
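For illustration only (not captured from the test cluster): typing hello world hello into the nc session should make wordCounts.print() show something like the following on the driver for that batch.

    -------------------------------------------
    Time: 1559284470000 ms
    -------------------------------------------
    (hello,2)
    (world,1)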

Streaming: global state

  1. build.sbt is unchanged

  2. hw.scala:

    ```
    import org.apache.spark._
    import org.apache.spark.streaming._

    object SimpleApp {
      def main(args: Array[String]) {
        val conf = new SparkConf().setMaster(if (args.length >= 1) args(0) else "local[2]").setAppName("NetworkWordCount")
        val ssc = new StreamingContext(conf, Seconds(1))
        ssc.checkpoint(if (args.length >= 1) "hdfs://10.2.35.117:9000/spark" else "./spark-cp")
        val sc = ssc.sparkContext
        sc.setLogLevel("WARN")
        val lines = ssc.socketTextStream("10.2.35.117", 9999)
        val words = lines.flatMap(_.split(" "))
        val pairs = words.map((_, 1))

        val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

        val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
          val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
          val output = (word, sum)
          state.update(sum)
          output
        }
        val stateCounter = pairs.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))
        stateCounter.checkpoint(Seconds(60))  // appears to have no effect
        stateCounter.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
    ```

  3. ssc.checkpoint(if(args.length >= 1) "hdfs://10.2.35.117:9000/spark" else "./spark") enables checkpointing, which exists for fault recovery and for persisting complex computation lineage. Stateful transformations such as mapWithState require a checkpoint directory, and it must be a path reachable from every node, e.g. an NFS mount or an HDFS address. https://blog.csdn.net/Anbang713/article/details/82047980
    https://spark-reference-doc-cn.readthedocs.io/zh_CN/latest/programming-guide/streaming-guide.html#id13

  4. The job above counts, for each input word, its total number of occurrences from the start of the job until now
  5. Browse the HDFS directory at http://10.2.35.117:50070/explorer.html

Streaming: sliding window https://blog.csdn.net/legotime/article/details/51836040

  1. hw.scala:

    ```
    import org.apache.spark._
    import org.apache.spark.streaming._

    object SimpleApp {
      def main(args: Array[String]) {
        val conf = new SparkConf().setMaster(if (args.length >= 1) args(0) else "local[2]").setAppName("NetworkWordCount")
        val ssc = new StreamingContext(conf, Seconds(1))
        ssc.checkpoint(if (args.length >= 1) "hdfs://10.2.35.117:9000/spark" else "./spark-checkpoint")
        val sc = ssc.sparkContext
        sc.setLogLevel("WARN")
        val lines = ssc.socketTextStream("10.2.35.117", 9999)
        val words = lines.flatMap(_.split(" "))
        val pairs = words.map((_, 1))

        val last_10_counter = pairs.reduceByKeyAndWindow(_ + _, Seconds(10))  // last 10 seconds
        last_10_counter.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
    ```

  2. Counts how many times each word appeared within the last 10 seconds; the window duration must be a multiple of the streaming context's batch interval (see the sketch below)
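A minimal sketch of the same operator with an explicit slide interval, reusing the pairs DStream from the example above (the durations are illustrative): with a 1-second batch interval, both the window length and the slide interval must be multiples of 1 second.

    // window = the last 30 seconds of data, recomputed every 10 seconds;
    // both durations are multiples of the 1-second batch interval
    val last_30_every_10 = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
    last_30_every_10.print()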

rdd -> dataframe -> dataset

Streaming output

  1. DStream.print() prints the results on the job's driver
  2. DStream.saveAsTextFiles(prefix, suffix) creates a prefix-TIMEINMS.suffix directory per batch under the given path, which can be an HDFS path. If the path is local while the master is a cluster, the driver node locally only gets the job's success marker; the data stays on the worker nodes. The data is stored in discrete parts: each part-000* file is one partition of the RDD, and the number of partitions depends on the DStream operations. http://10.2.35.117:50070/explorer.html#/spark/20190531143358/test-1559284470000
  3. foreachRDD writes the data of the RDDs inside a DStream to an external system such as a TCP service (see the sketch below)
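A minimal sketch of the last two output styles, reusing the wordCounts DStream from the TCP word count above (the HDFS prefix and the receiver host/port are illustrative). The connection-pool section below shows a more efficient version of the TCP output.

    // one output directory per batch, e.g. .../test-1559284470000 on HDFS
    wordCounts.saveAsTextFiles("hdfs://10.2.35.117:9000/spark/test", "")

    // push each record to an external TCP service, one connection per partition
    wordCounts.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        val socket = new java.net.Socket("10.2.35.118", 10000)
        val out = new java.io.PrintStream(socket.getOutputStream)
        records.foreach(r => out.println(r))
        out.flush()
        socket.close()
      }
    }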

Failure recovery

  1. hw.scala:

    ```
    import org.apache.spark._
    import org.apache.spark.streaming._

    object SimpleApp {
      val checkpointDir = "./spark-cp"

      def functionToCreateContext(): StreamingContext = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
        val ssc = new StreamingContext(conf, Seconds(2))   // new context
        val sc = ssc.sparkContext
        sc.setLogLevel("WARN")
        ssc.checkpoint(checkpointDir)

        val lines = ssc.socketTextStream("10.2.35.117", 9999)  // create DStreams
        lines.checkpoint(Seconds(10))

        val words = lines.flatMap(_.split(" "))
        val pairs = words.map((_, 1))

        val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))  // a hand-made RDD, useful for testing without reading external data

        val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
          val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
          val output = (word, sum)
          state.update(sum)
          output
        }
        val stateCounter = pairs.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))

        stateCounter.print()

        ssc
      }

      def main(args: Array[String]) {
        val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }
    ```

  2. When the driver process exits and is restarted, it recovers to the state it had before the crash. https://blog.csdn.net/yjgithub/article/details/78792616
  3. At present the log level falls back to the default after recovering from a checkpoint (see the sketch below)
  4. Note: for a cluster job, the restart also needs the cluster address argument, e.g. --master spark://10.2.35.117:7077
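A minimal sketch for point 3 (an assumption based on the note above, not an official recipe): the configuration done inside functionToCreateContext only runs when the context is created for the first time, so re-apply the log level on the recovered context in main. The Kafka examples further down already follow this pattern.

    def main(args: Array[String]) {
      val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
      ssc.sparkContext.setLogLevel("WARN")  // re-apply after checkpoint recovery
      ssc.start()
      ssc.awaitTermination()
    }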

Sending data through a connection pool (currently the most efficient approach)

    import org.apache.spark._
    import org.apache.spark.streaming._

    import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
    import org.apache.commons.pool2.{ObjectPool, PooledObject, BasePooledObjectFactory}

    import java.text.SimpleDateFormat
    import java.util.Date
    import java.io.PrintStream
    import java.net.Socket

    class ManagedPrintStream(private val pool: ObjectPool[PrintStream], val printStream: PrintStream) {
        def release() = pool.returnObject(printStream)
    }

    object PrintStreamPool {

        var hostPortPool: Map[(String, Int), ObjectPool[PrintStream]] = Map()
        sys.addShutdownHook {
            hostPortPool.values.foreach { pool =>  pool.close() }
        }

        // factory method
        def apply(host: String, port: Int): ManagedPrintStream = {

            val pool = hostPortPool.getOrElse((host, port), {
                val p = new GenericObjectPool[PrintStream](new SocketStreamFactory(host, port))
                hostPortPool += (host, port) -> p
                p
            })

            new ManagedPrintStream(pool, pool.borrowObject())
        }
    }

    class SocketStreamFactory(host: String, port: Int) extends BasePooledObjectFactory[PrintStream] {
        override def create() = new PrintStream(new Socket(host, port).getOutputStream)
        override def wrap(stream: PrintStream) = new DefaultPooledObject[PrintStream](stream)
        override def validateObject(po: PooledObject[PrintStream]) = ! po.getObject.checkError()
        override def destroyObject(po: PooledObject[PrintStream]) = po.getObject.close()
        override def passivateObject(po: PooledObject[PrintStream]) = po.getObject.flush()
    }


    object TimeHelper {
        def strTimeNow():String={
            val now = new Date()
            val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
            dateFormat.format(now)
        }
    }

    object SimpleApp {
        val checkpointDir = "./checkpoint"
        def functionToCreateContext(): StreamingContext = {
            val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
            val ssc = new StreamingContext(conf, Seconds(2))   // new context
            val sc = ssc.sparkContext
            sc.setLogLevel("WARN")
            ssc.checkpoint(checkpointDir)  

            val lines = ssc.socketTextStream("10.2.35.117", 9999) // create DStreams     
            lines.checkpoint(Seconds(10))

            val words = lines.flatMap(_.split(" "))
            val pairs = words.map((_, 1))

            val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

            val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
                val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
                val output = (word, sum)
                state.update(sum)
                output
            }
            val stateCounter = pairs.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))

            // foreachPartition uses one connection per RDD partition; the number of partitions depends on several factors (how the data is read, which transformations are applied).
            // Example: at 20K messages/sec with 64 partitions and a 2-second batch interval, a naive connect-per-partition approach opens and closes the connection to the service 64 times every two seconds, each connection sending only about 600 records.
            // With a connection pool, each JVM may end up connecting only once for its whole lifetime.
            stateCounter.foreachRDD { rdd =>
                rdd.foreachPartition { partitionOfRecords =>
                    val connection = PrintStreamPool("10.2.35.118", 10000)
                    partitionOfRecords.foreach(record => connection.printStream.println(record))
                    connection.release()   
                }
            }
            ssc
        }

        def main(args: Array[String]) {
            val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
            ssc.start()             
            ssc.awaitTermination()   
        }
    }

Start another TCP server to simulate the receiving end: nc -lkv 10000

build.sbt:

    name := "hello"              // project name
    version := "0.0.1-SNAPSHOT"  // version
    scalaVersion := "2.11.8"
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.1"
    libraryDependencies += "org.apache.commons" % "commons-pool2" % "2.6.2"

Submit command: the dependency jars are added with the --jars parameter; this can be avoided by building a fat jar with sbt (see sbt assembly below).

    ./spark-submit --jars /root/.ivy2/cache/org.apache.commons/commons-pool2/jars/commons-pool2-2.6.2.jar  --class SimpleApp /data/scala-test/hw/target/scala-2.11/hello_2.11-0.0.1-SNAPSHOT.jar

sbt assembly: bundling the dependencies into the main jar

  1. touch project/assembly.sbt
  2. echo "addSbtPlugin(\"com.eed3si9n\" % \"sbt-assembly\" % \"0.14.9\")" > project/assembly.sbt
  3. Add a merge strategy to build.sbt to avoid duplicate-definition errors:

         assemblyMergeStrategy in assembly := {
           case PathList("org", "aopalliance", xs @ _*)      => MergeStrategy.last
           case PathList("javax", "inject", xs @ _*)         => MergeStrategy.last
           case PathList("javax", "servlet", xs @ _*)        => MergeStrategy.last
           case PathList("javax", "activation", xs @ _*)     => MergeStrategy.last
           case PathList("org", "apache", xs @ _*)           => MergeStrategy.last
           case PathList("com", "google", xs @ _*)           => MergeStrategy.last
           case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
           case PathList("com", "codahale", xs @ _*)         => MergeStrategy.last
           case PathList("com", "yammer", xs @ _*)           => MergeStrategy.last
           case "about.html"                                 => MergeStrategy.rename
           case "META-INF/ECLIPSEF.RSA"                      => MergeStrategy.last
           case "META-INF/mailcap"                           => MergeStrategy.last
           case "META-INF/mimetypes.default"                 => MergeStrategy.last
           case "plugin.properties"                          => MergeStrategy.last
           case "log4j.properties"                           => MergeStrategy.last
           case x =>
             val oldStrategy = (assemblyMergeStrategy in assembly).value
             oldStrategy(x)
         }
  4. sbt assembly produces the fat jar, 80+ MB, roughly 50 times the size of the previous package
  5. ./spark-submit --class SimpleApp /data/scala-test/hw/target/scala-2.11/hello-assembly-0.0.1-SNAPSHOT.jar

Reading from Kafka with the Direct Stream approach

Kafka installation and common operations are described in kafka.md

https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html

build.sbt

name := "hello"              // project name
version := "0.0.1-SNAPSHOT"  // version
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.1"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.4.1"
// note: 2.4.1 must match the installed Spark version

Submit command (sbt package downloads the dependencies into ~/.ivy2/cache, which is where --jars points):

./spark-submit --jars /root/.ivy2/cache/org.apache.kafka/kafka-clients/jars/kafka-clients-0.10.0.1.jar,/root/.ivy2/cache/org.apache.spark/spark-streaming-kafka-0-10_2.11/jars/spark-streaming-kafka-0-10_2.11-2.4.1.jar  --class SimpleApp --master spark://10.2.35.117:7077 /data/scala-test/hw/target/scala-2.11/hello_2.11-0.0.1-SNAPSHOT.jar

hw.scala:

import org.apache.spark._  
import org.apache.spark.streaming._  
import org.apache.spark.streaming.kafka010._

import org.apache.kafka.clients.consumer.ConsumerConfig  
import org.apache.kafka.common.serialization.StringDeserializer

object SimpleApp {  
    def main(args: Array[String]) {
        val checkpointDir =  "hdfs://10.2.35.117:9000/spark-cp"

        def functionToCreateContext(): StreamingContext = {
            val conf = new SparkConf().setAppName("NetworkWordCount")

            val ssc = new StreamingContext(conf, Seconds(2))   // new context
            ssc.checkpoint(checkpointDir)

            val brokers = "10.2.35.117:9092"
            val groupId = "gtest"
            val topics = "test31"
            // Create direct kafka stream with brokers and topics
            val topicsSet = topics.split(",").toSet

            val kafkaParams = Map[String, Object](
              ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
              ConsumerConfig.GROUP_ID_CONFIG -> groupId,
              ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
              ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])

            val messages = KafkaUtils.createDirectStream[String, String](
              ssc,
              LocationStrategies.PreferConsistent,
              ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
            val lines = messages.map(_.value)           
            val words = lines.flatMap(_.split(" "))
            val pairs = words.map((_, 1))

            val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

            val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
                val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
                val output = (word, sum)
                state.update(sum)
                output
            }
            val stateCounter = pairs.mapWithState(StateSpec.function(mappingFunc)/*.initialState(initialRDD)*/)

            stateCounter.print
            ssc
        }
        val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
        val sc = ssc.sparkContext
        sc.setLogLevel("WARN")

        ssc.start()             
        ssc.awaitTermination()   
    }
}

Writing to Kafka

Read data from test31, process it, and write the results into the test31-out topic. If the broker is not configured to auto-create new topics, create it manually first:

[root@xhb-master kafka_2.11-2.2.1]# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic test31-out

build.sbt

name := "hello"              // project name  
version := "0.0.1-SNAPSHOT"  // version  
scalaVersion := "2.11.8"  
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.1"  
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.4.1"  

hw.scala

import java.util.Properties

import org.apache.spark._  
import org.apache.spark.streaming._  
import org.apache.spark.streaming.kafka010._

import org.apache.kafka.clients.consumer.ConsumerConfig  
import org.apache.kafka.common.serialization.{StringDeserializer,StringSerializer}  
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}

class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

  lazy val producer = createProducer()

  def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
}

object KafkaSink {  
  def apply(config: Properties): KafkaSink = {
    val f = () => {
      val producer = new KafkaProducer[String, String](config)

      sys.addShutdownHook {
        producer.close()
      }

      producer
    }
    new KafkaSink(f)
  }
}

object SimpleApp {  
    def main(args: Array[String]) {
        val checkpointDir = if(args.length >= 1) "hdfs://10.2.35.117:9000/spark-cp" else "./spark-cp"

        def functionToCreateContext(): StreamingContext = {
            val conf = new SparkConf().setMaster(if(args.length >= 1) args(0) else "local[2]").setAppName("NetworkWordCount")

            val ssc = new StreamingContext(conf, Seconds(2))   // new context
            ssc.checkpoint(checkpointDir)

            val brokers = "localhost:9092"
            val groupId = "gtest"
            val topics = "test31"
            // Create direct kafka stream with brokers and topics
            val topicsSet = topics.split(",").toSet

            val kafkaParams = Map[String, Object](
              ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
              ConsumerConfig.GROUP_ID_CONFIG -> groupId,
              ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
              ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])

            val messages = KafkaUtils.createDirectStream[String, String](
              ssc,
              LocationStrategies.PreferConsistent,
              ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
            val lines = messages.map(_.value)           
            val words = lines.flatMap(_.split(" "))
            val pairs = words.map((_, 1))

            val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

            val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
                val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
                val output = (word, sum)
                state.update(sum)
                output
            }
            val stateCounter = pairs.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))

            val serializer = "org.apache.kafka.common.serialization.StringSerializer"
            val props = new Properties()
            props.put("bootstrap.servers", "localhost:9092")
            props.put("key.serializer", serializer)
            props.put("value.serializer", serializer)
            val kafkaSink = ssc.sparkContext.broadcast(KafkaSink(props)) 

            stateCounter.foreachRDD { rdd =>
                rdd.foreach { message =>
                val (word, num) = message
                    kafkaSink.value.send("test31-out", f"$word%s->$num%d")
                }
            }
            ssc
        }
        val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
        val sc = ssc.sparkContext
        sc.setLogLevel("WARN")

        ssc.start()             
        ssc.awaitTermination()   
    }
}

At the moment every repackaged build is incompatible with the checkpoint written by the previous version (it can be made compatible by deleting a few files inside the directory); for convenience, just delete the whole checkpoint directory

./spark-submit --jars /root/.ivy2/cache/org.apache.kafka/kafka-clients/jars/kafka-clients-0.10.0.1.jar,/root/.ivy2/cache/org.apache.spark/spark-streaming-kafka-0-10_2.11/jars/spark-streaming-kafka-0-10_2.11-2.4.1.jar  --class SimpleApp /data/scala-test/hw/target/scala-2.11/hello_2.11-0.0.1-SNAPSHOT.jar

Start a producer and insert some new messages:

[root@xhb-master kafka_2.11-2.2.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test31
>hello
>a
>a
>a
>hello

Start a consumer to see what ended up in the new topic after the Scala job processed the input:

[root@xhb-master kafka_2.11-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test31-out --from-beginning
hello->2  
a->1  
a->2  
a->3  
hello->3  

When the job is restarted, the code above fails with "java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to KafkaSink". The cause is that Streaming checkpointing doesn't support Accumulators and Broadcast values, so, following the official example https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala,

it is rewritten as:

import java.util.Properties

import org.apache.spark._  
import org.apache.spark.streaming._  
import org.apache.spark.streaming.kafka010._  
import org.apache.spark.broadcast.Broadcast

import org.apache.kafka.clients.consumer.ConsumerConfig  
import org.apache.kafka.common.serialization.{StringDeserializer,StringSerializer}  
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}

class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

  lazy val producer = createProducer()

  def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
}


object KafkaSinkInstance {  
    @volatile private var instance: Broadcast[KafkaSink] = null

    private def conf(config: Properties): KafkaSink = {
        val f = () => {
            val producer = new KafkaProducer[String, String](config)

            sys.addShutdownHook {
                producer.close()
            }   

            producer
        }
        new KafkaSink(f)
    }

    def getInstance(sc: SparkContext, config: Properties): Broadcast[KafkaSink] = {
    if (instance == null) {
        synchronized {
            if (instance == null) {

                instance = sc.broadcast(conf(config))
            }
          }
        }
        instance
    }
}
object SimpleApp {  
    def main(args: Array[String]) {
        val checkpointDir = "hdfs://10.2.35.117:9000/spark-cp"

        def functionToCreateContext(): StreamingContext = {
            val conf = new SparkConf().setAppName("NetworkWordCount")

            val ssc = new StreamingContext(conf, Seconds(2))   // new context
            ssc.checkpoint(checkpointDir)

            val brokers = "10.2.35.117:9092"
            val groupId = "gtest"
            val topics = "test31"
            // Create direct kafka stream with brokers and topics
            val topicsSet = topics.split(",").toSet

            val kafkaParams = Map[String, Object](
              ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
              ConsumerConfig.GROUP_ID_CONFIG -> groupId,
              ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
              ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])

            val messages = KafkaUtils.createDirectStream[String, String](
              ssc,
              LocationStrategies.PreferConsistent,
              ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)) // Subscribe takes three parameters: the topic list, the consumer config, and the starting offset per topic+partition; fromOffsets is optional. If fromOffsets is non-empty, consumption starts from those positions; otherwise Kafka's auto.offset.reset setting applies (default: latest)
            val lines = messages.map(_.value)           
            val words = lines.flatMap(_.split(" "))
            val pairs = words.map((_, 1))

            val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

            val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
                val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
                val output = (word, sum)
                state.update(sum)
                output
            }
            val stateCounter = pairs.mapWithState(StateSpec.function(mappingFunc)/*.initialState(initialRDD)*/)

            val serializer = "org.apache.kafka.common.serialization.StringSerializer"
            val props = new Properties()
            props.put("bootstrap.servers", "10.2.35.117:9092")
            props.put("key.serializer", serializer)
            props.put("value.serializer", serializer)


            stateCounter.foreachRDD { rdd =>
                val kafkaSink = KafkaSinkInstance.getInstance(rdd.sparkContext, props)
                rdd.foreach { message =>
                val (word, num) = message
                    kafkaSink.value.send("test31-out", f"$word%s->$num%d")
                }
            }
            ssc
        }
        val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
        val sc = ssc.sparkContext
        sc.setLogLevel("WARN")

        ssc.start()             
        ssc.awaitTermination()   
    }
}

Problem: under abnormal conditions, how do we avoid Kafka message backlog, loss, and duplication?

  1. To avoid a backlog, Spark's consumption rate should be higher than the Kafka producers' rate; adjust the spark-submit performance parameters
  2. On first start or after failure recovery there may be a large amount of unconsumed messages, possibly too much to fit in memory, crashing the job. Enable backpressure: SparkConf.set("spark.streaming.backpressure.enabled", "true") (see the sketch after this list)
  3. Choose a sensible batch interval: val ssc = new StreamingContext(conf, Seconds(2)) https://www.jianshu.com/p/5c20e5bc402c
  4. Cap the consumption rate: SparkConf.set("spark.streaming.kafka.maxRatePerPartition", ...)
  5. https://www.jianshu.com/p/c0b724137416
  6. Read data via Kafka's timestamp index (e.g. after a restart, only consume messages from the last half hour) https://blog.csdn.net/weixin_34379433/article/details/87253473
  7. https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
  8. Store the offsets in the checkpoint (may produce duplicate data up to one streaming batch), in Kafka itself (enabled by default; it has no transactional tie to the streaming business logic, so duplicates are also possible), or in an external system (handle the transaction yourself to achieve exactly-once)
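A minimal sketch combining points 2 to 4 above (the rate value is an illustrative assumption, not a tested setting):

    import org.apache.spark._
    import org.apache.spark.streaming._

    val conf = new SparkConf()
      .setAppName("NetworkWordCount")
      .set("spark.streaming.backpressure.enabled", "true")       // point 2: enable backpressure
      .set("spark.streaming.kafka.maxRatePerPartition", "1000")  // point 4: max records per second per Kafka partition
    val ssc = new StreamingContext(conf, Seconds(2))              // point 3: batch interval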

Why .initialState(initialRDD) is commented out in the Kafka examples: when a Spark Streaming job recovers from a checkpoint, an exception is hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, see SPARK-13758.

sbt + jni + cpp

  1. ./project/plugins.sbt
    addSbtPlugin("ch.jodersky" % "sbt-jni" % "1.3.2")
  2. sbt compile // downloads the dependencies
  3. sbt javah // generates the header files
  4. Implement the header (CppCls.cpp)
  5. Build the shared library:

    ```
    #!/bin/sh
    g++ -fPIC -shared -O3 \
        -I/usr/include \
        -I$JAVA_HOME/include \
        -I$JAVA_HOME/include/linux \
        CppCls.cpp -o libCppCls.so
    ```

  6. Deploy the .so to all worker nodes and add its directory to LD_LIBRARY_PATH, or copy it into a directory such as /usr/lib or /usr/lib64
  7. Run the Spark job as usual:

    ```
    ./spark-submit --jars /root/.ivy2/cache/org.apache.kafka/kafka-clients/jars/kafka-clients-0.10.0.1.jar,/root/.ivy2/cache/org.apache.spark/spark-streaming-kafka-0-10_2.11/jars/spark-streaming-kafka-0-10_2.11-2.4.1.jar --class SimpleApp --master spark://10.2.35.117:7077 /data/scala-test/hw/target/scala-2.11/hello_2.11-0.0.1-SNAPSHOT.jar
    ```

hw.scala:

import java.util.Properties

import org.apache.spark._  
import org.apache.spark.streaming._  
import org.apache.spark.streaming.kafka010._  
import org.apache.spark.broadcast.Broadcast

import org.apache.kafka.clients.consumer.ConsumerConfig  
import org.apache.kafka.common.serialization.{StringDeserializer,StringSerializer}  
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}

class CppCls {  
  // --- Native methods
  @native def intMethod(n: Int): Int
  @native def booleanMethod(b: Boolean): Boolean
  @native def stringMethod(s: String): String
  @native def intArrayMethod(a: Array[Int]): Int
}

class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

  lazy val producer = createProducer()

  def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
}


object KafkaSinkInstance {  
    @volatile private var instance: Broadcast[KafkaSink] = null

    private def conf(config: Properties): KafkaSink = {
        val f = () => {
            val producer = new KafkaProducer[String, String](config)

            sys.addShutdownHook {
                producer.close()
            }   

            producer
        }
        new KafkaSink(f)
    }

    def getInstance(sc: SparkContext, config: Properties): Broadcast[KafkaSink] = {
    if (instance == null) {
        synchronized {
            if (instance == null) {

                instance = sc.broadcast(conf(config))
            }
          }
        }
        instance
    }
}
/*
object LibraryLoader {  
    lazy val load = System.load(SparkFiles.get("libCppCls.so"))
}
*/
object SimpleApp {  
    def main(args: Array[String]) {
        val checkpointDir = "hdfs://10.2.35.117:9000/spark-cp"

        def functionToCreateContext(): StreamingContext = {
            val conf = new SparkConf().setAppName("NetworkWordCount")

            val ssc = new StreamingContext(conf, Seconds(2))   // new context
            ssc.checkpoint(checkpointDir)

            val brokers = "10.2.35.117:9092"
            val groupId = "gtest"
            val topics = "test31"
            // Create direct kafka stream with brokers and topics
            val topicsSet = topics.split(",").toSet

            val kafkaParams = Map[String, Object](
              ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
              ConsumerConfig.GROUP_ID_CONFIG -> groupId,
              ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
              ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])

            val messages = KafkaUtils.createDirectStream[String, String](
              ssc,
              LocationStrategies.PreferConsistent,
              ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)) // Subscribe takes three parameters: the topic list, the consumer config, and the starting offset per topic+partition; fromOffsets is optional. If fromOffsets is non-empty, consumption starts from those positions; otherwise Kafka's auto.offset.reset setting applies (default: latest)
            val lines = messages.map(_.value)           
            val words = lines.flatMap(_.split(" "))
            val pairs = words.map((_, 1))

            val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

            val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
                val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
                val output = (word, sum)
                state.update(sum)
                output
            }
            val stateCounter = pairs.mapWithState(StateSpec.function(mappingFunc)/*.initialState(initialRDD)*/)


            // run in driver.
            val sample = new CppCls
            val square = sample.intMethod(5)
            val bool = sample.booleanMethod(true)
            val sum = sample.intArrayMethod(Array(1, 1, 2, 3, 5, 8, 13))
            println(s"intMethod: $square")
            println(s"booleanMethod: $bool")
            println(s"intArrayMethod: $sum")


            val serializer = "org.apache.kafka.common.serialization.StringSerializer"
            val props = new Properties()
            props.put("bootstrap.servers", "10.2.35.117:9092")
            props.put("key.serializer", serializer)
            props.put("value.serializer", serializer)


            stateCounter.foreachRDD { rdd =>
                val kafkaSink = KafkaSinkInstance.getInstance(rdd.sparkContext, props)
                rdd.foreach { message =>
                    val (word, num) = message
                    System.loadLibrary("CppCls")
                    // LibraryLoader.load
                    val remote = new CppCls
                    kafkaSink.value.send("test31-out", remote.stringMethod(f"$word%s->$num%d"))
                }
            }
            ssc
        }
        System.loadLibrary("CppCls")
        val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
        val sc = ssc.sparkContext
        sc.setLogLevel("WARN")

        ssc.start()             
        ssc.awaitTermination()   
    }
}