Tag: apache spark

在尝试提交广播variables的Spark中的Kafka偏移量时获取任务不可序列化

我写了从Kafka读取并且手动提交偏移量的 Spark作业。 它工作正常,但自从我引入广播variables,我得到序列化exception,因为它试图序列化KafkaInputDStream。 下面是一个显示问题的简单代码(代码是用Kotlin编写的,但我相信它也会在Java中发生): fun testBroadCast(jsc: JavaStreamingContext, kafkaStream: JavaInputDStream<ConsumerRecord>) { val keyPrefix = jsc.sparkContext().broadcast(“EVENT:”) kafkaStream.foreachRDD { rdd -> val offsetRanges = (rdd.rdd() as HasOffsetRanges).offsetRanges() val prefixedIds = rdd.map { “${keyPrefix.value}:$it” }.collect() (kafkaStream.dstream() as CanCommitOffsets).commitAsync(offsetRanges) } } fun main(args: Array) { val jsc = JavaStreamingContext(SparkConf().setAppName(“test simple prefixer”).setMaster(“local[*]”), Duration(5000)) val stream = makeStreamFromSerializableEventTopic(jsc) testBroadCast(jsc, stream) jsc.start() jsc.awaitTermination() […]

匕首2:无法在Intellij Idea(Kotlin)中find生成的类

我试图写一个应用程序来理解火花和匕首2.但无法使用生成的匕首文件。 这个问题有很多类似的问题,但是我不能用这些问题来解决问题。 我的项目可以在这里findgithub build.gradle文件看起来像这样 … apply plugin: ‘kotlin-kapt’ dependencies { compile “org.jetbrains.kotlin:kotlin-stdlib-jre8:$kotlin_version” compile “com.sparkjava:spark-kotlin:$spark_kotlin_version” compile “org.slf4j:slf4j-log4j12:$slf4j_version” compile “com.google.dagger:dagger:$dagger_version” kapt “com.google.dagger:dagger-compiler:$dagger_version” testCompile group: ‘junit’, name: ‘junit’, version: ‘4.12’ } …. 这是我试图注入的SparkSetup.kt类。 这个模块和组件存在于co.pissarra.util.dagger包中 SetupModule.kt的内容如下所示 @Module class SetUpModule { @Provides @Singleton fun provideSparkSetup() : SparkSetup { return SparkSetup() } } 这是AppComponent.kt类 @Singleton @Component(modules = arrayOf(SetUpModule::class)) interface AppComponent { […]