Spark Framework
的一大优点是提供了大量开箱即用的功能。有一个类专门用于处理键值对,即
PairRDDFunctions
类。当以键值格式处理数据时,最常见的操作之一是按键对值进行分组。 PairRDDFunctions 类提供了一个
groupByKey
函数,它使按键分组变得微不足道。但是,
groupByKey
非常昂贵,并且根据用例,可以使用更好的替代方案。在
groupByKey
调用中,所有键值对都将通过网络洗牌到一个 reducer,在这个 reducer 中将值收集在一起。在某些情况下,
groupByKey
只是通过键执行其他操作(求和、平均)的起点。在其他情况下,我们需要将值收集在一起以返回不同的值类型。 Spark 提供了一些分组的替代方法,可以提高性能或简化将值组合成不同类型的能力。这篇文章的重点是考虑这些备用分组功能之一。
替代分组函数
虽然
PairRDDFunctions
类中有许多函数,但今天我们将重点关注
aggregateByKey
。
aggregateByKey
函数用于聚合每个键的值并增加返回不同值类型的可能性。
聚合按键
aggregateByKey 函数需要 3 个参数:
- 不会影响要收集的总值的初始“零”值。例如,如果我们要添加数字,则初始值将为 0。或者在每个键收集唯一元素的情况下,初始值将为空集。
- 接受两个参数的组合函数。第二个参数合并到第一个参数中。此函数组合/合并分区内的值。
- 接受两个参数的合并函数函数。在这种情况下,参数合并为一个。此步骤跨分区合并值。
例如,让我们收集每个键的唯一值。将此视为调用
someRDD.groupByKey().distinct()
的替代方法这是代码:
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialSet = mutable.HashSet.empty[String]
val addToSet = (s: mutable.HashSet[String], v: String) => s += v
val mergePartitionSets = (p1: mutable.HashSet[String], p2: mutable.HashSet[String]) => p1 ++= p2
val uniqueByKey = kv.aggregateByKey(initialSet)(addToSet, mergePartitionSets)
您会注意到我们在示例中使用了
可变
哈希集。使用可变集合的原因是为了避免每次我们向集合添加值或合并集合时返回新集合相关的额外内存开销。 (这在 PairRDDFunctions 文档中明确说明)。虽然使用
aggregateByKey
比较冗长,但如果您的数据有很多值但只有少数是唯一的,则此方法可能会提高性能。
对于我们的第二个示例,我们将通过键对值求和,这应该有助于提高性能,因为更少的数据将在网络中洗牌。我们为
aggregateByKey
函数提供了 3 个不同的参数。这次我们想通过键计算我们有多少个值,而不考虑重复项。
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialSet = mutable.HashSet.empty[String]
val addToSet = (s: mutable.HashSet[String], v: String) => s += v
val mergePartitionSets = (p1: mutable.HashSet[String], p2: mutable.HashSet[String]) => p1 ++= p2
val uniqueByKey = kv.aggregateByKey(initialSet)(addToSet, mergePartitionSets)
对于使用过 hadoop 的任何人来说,此功能类似于使用组合器。
结果
运行我们的示例会产生以下结果:
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialSet = mutable.HashSet.empty[String]
val addToSet = (s: mutable.HashSet[String], v: String) => s += v
val mergePartitionSets = (p1: mutable.HashSet[String], p2: mutable.HashSet[String]) => p1 ++= p2
val uniqueByKey = kv.aggregateByKey(initialSet)(addToSet, mergePartitionSets)
结论
我们对
aggregateByKey
函数的快速浏览到此结束。虽然使用 3 个函数可能有点笨拙,但它无疑是一个可供您使用的好工具。在后续文章中,我们将继续介绍 Spark 的
PairRDDFunctions
类中的方法