Getting "Task not serializable" when trying to commit Kafka offsets in Spark with broadcast variables

I wrote a Spark job that reads from Kafka and commits the offsets manually. It worked fine, but since I introduced broadcast variables I get a serialization exception, because Spark tries to serialize the KafkaInputDStream. Below is a minimal example that shows the problem (the code is in Kotlin, but I believe the same thing would happen in Java):

    fun testBroadCast(jsc: JavaStreamingContext, kafkaStream: JavaInputDStream<ConsumerRecord>) {
        val keyPrefix = jsc.sparkContext().broadcast("EVENT:")
        kafkaStream.foreachRDD { rdd ->
            val offsetRanges = (rdd.rdd() as HasOffsetRanges).offsetRanges()
            val prefixedIds = rdd.map { "${keyPrefix.value()}:$it" }.collect()
            (kafkaStream.dstream() as CanCommitOffsets).commitAsync(offsetRanges)
        }
    }

    fun main(args: Array<String>) {
        val jsc = JavaStreamingContext(
            SparkConf().setAppName("test simple prefixer").setMaster("local[*]"),
            Duration(5000)
        )
        val stream = makeStreamFromSerializableEventTopic(jsc)
        testBroadCast(jsc, stream)
        jsc.start()
        jsc.awaitTermination()
    }

If I remove keyPrefix and hard-code "EVENT:" in the map function, it works as expected (I have included that working variant right after the stack trace below). Otherwise I get:

    java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.
        at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:525)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:512)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply(DStream.scala:512)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303)
        at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:512)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
        at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
        at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.map(RDD.scala:369)
        at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:93)
        at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:45)
        at ir.pegahtech.tapsell.brain.engine.jobs.Test$testBroadCast$1.call(Test.kt:226)
        at ir.pegahtech.tapsell.brain.engine.jobs.Test$testBroadCast$1.call(Test.kt)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
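
For completeness, this is the variant that works, as described above: the same code, but with the broadcast variable removed and the literal "EVENT:" taking the place of keyPrefix.value() inside the map lambda (the function name here is mine, just for illustration; imports are the same as in the snippet above):

    fun testHardCodedPrefix(jsc: JavaStreamingContext, kafkaStream: JavaInputDStream<ConsumerRecord>) {
        kafkaStream.foreachRDD { rdd ->
            val offsetRanges = (rdd.rdd() as HasOffsetRanges).offsetRanges()
            // Same transformation as before, but the literal replaces keyPrefix.value(),
            // so this lambda does not capture the broadcast variable at all.
            val prefixedIds = rdd.map { "EVENT::$it" }.collect()
            // Committing the offsets works fine in this variant.
            (kafkaStream.dstream() as CanCommitOffsets).commitAsync(offsetRanges)
        }
    }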

How does using or not using a broadcast variable have anything to do with serializing the KafkaInputDStream? The Spark version is 2.2.0.
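
In case it is relevant: one workaround I have been considering (just a guess on my part, assuming the inner map lambda is somehow dragging in the enclosing scope that also references kafkaStream) is to copy the broadcast value into a plain local String right before the map, so the lambda only captures that String (again, the function name is just for illustration):

    fun testBroadCastWithLocalCopy(jsc: JavaStreamingContext, kafkaStream: JavaInputDStream<ConsumerRecord>) {
        val keyPrefix = jsc.sparkContext().broadcast("EVENT:")
        kafkaStream.foreachRDD { rdd ->
            val offsetRanges = (rdd.rdd() as HasOffsetRanges).offsetRanges()
            // Copy the broadcast value into a plain local String on the driver, so the
            // map lambda below only captures this String and nothing that refers to kafkaStream.
            val prefix = keyPrefix.value()
            val prefixedIds = rdd.map { "$prefix:$it" }.collect()
            (kafkaStream.dstream() as CanCommitOffsets).commitAsync(offsetRanges)
        }
    }

But reading the value on the driver and shipping the String with the closure seems to defeat the point of the broadcast, so I would still like to understand why referencing the broadcast variable makes Spark try to serialize the DStream.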