为什么不Spark并行randomSplit

我有一些Spark代码可以创建一系列的RDD。 最后我调用randomSplit把它分成3组,然后我把它们写到磁盘上。 所以第一阶段是:

  1. 获取一些数据
  2. 做一些转变
  3. 缓存结果
  4. 通过randomSplit分割
  5. 将所有分割写入磁盘

因为步骤(4)将事物分成3组,所以在这里有3个不同的Spark阶段。 在第一阶段结束时,我们开始用尽第一阶段的任务,但是有执行者可用:

在这里输入图像描述

此时已经计算出了几个分区的数据集。 据我所知randomSplit按分区运行在一个分区上; 换句话说,它不需要洗牌或收集 – 它只是在每个分区的基础上随机选择行。 如果这是正确的,那么没有任何理由说明阶段2的一些任务不能在可用的执行程序上运行 – 它们的RDD的分区已经被计算和缓存。 为什么不启动一些阶段2的任务来利用可用的资源。

注意:显然,“他们可以,但他们没有”在这里回答是完全有效的。 我想我真正要问的是,是否有一些技术上的原因,我没有想到,这使得这不可能(或非常困难),或者这只是一个执行监督?

以下是代码的简化版本(在Kotlin中):

 fun run(sc: JavaSparkContext, opts: Options) { val allData = fetchABunchOfData() val allDataRdd = sc.parallelize(allData) val taggedAndTokenized = allDataRdd.mapPartitions { addTagsAndTokens(it) } // Convert each ResponseData to a JSON String val jsonStrings = taggedAndTokenized.map { val mapper = AnxJsonUtils.getMapper() mapper.writeValueAsString(it) } // the randomSplit below creates 3 distinct RDD lineags so if we don't cache the parsing stuff we'll parse the // entire document set twice. jsonStrings.cache() val trainValidTest = jsonStrings.randomSplit(doubleArrayOf(opts.trainFrac, opts.validFrac, opts.testFrac), splitSeed) trainValidTest[0].saveAsTextFile(opts.outPath + "/" + TRAIN_NAME) trainValidTest[1].saveAsTextFile(opts.outPath + "/" + VALID_NAME) trainValidTest[2].saveAsTextFile(opts.outPath + "/" + TEST_NAME) } 

由于多种原因, saveAsTextFile是一个阻塞调用。 这意味着Spark主机将不会收到第二个保存指令,直到第一个完成。

也就是说,如果你想利用这些可用的资源,你可以做的是在三个独立的线程中调用saveAsTextFile并等待它们的未来。 一旦工作人员完成第一个任务的分区,就可以从第二个任务开始。