本文共 801 字,大约阅读时间需要 2 分钟。
由于Spark的distinct算子默认实现效率较低,需要自行优化以提升性能。
具体实现方式非常简单,主要基于集合的特性。
def mydistinct(iter: Iterator[(String, Int)]: Iterator[String] = { iter.foldLeft(Set[String]())((curS, item) => curS + item._1).toIterator}
使用mydistinct的方式如下:
val rdd2 = rdd1.map(x => (x._1 + SPLIT + x._2 + SPLIT + x._3 + SPLIT + x._4, 1)).partitionBy(new org.apache.spark.HashPartitioner(100)).mapPartitions(SetProcess.mydistinct).map(key => { val strs = key.split(SPLIT) (strs(0), strs(1), strs(2), strs(3))
说明:
这种方法充分利用了Spark的高效分区机制和集合的去重特性,实现了高效的去重操作。
转载地址:http://xdig.baihongyu.com/