Spark PairRDDFunctions—AggregateByKey

一则或许对你有用的小广告

欢迎加入小哈的星球 ,你将获得:专属的项目实战 / 1v1 提问 / Java 学习路线 / 学习打卡 / 每月赠书 / 社群讨论

  • 新项目:《从零手撸:仿小红书(微服务架构)》 正在持续爆肝中,基于 Spring Cloud Alibaba + Spring Boot 3.x + JDK 17...点击查看项目介绍 ;
  • 《从零手撸:前后端分离博客项目(全栈开发)》 2 期已完结,演示链接: http://116.62.199.48/ ;

截止目前, 星球 内专栏累计输出 63w+ 字,讲解图 2808+ 张,还在持续爆肝中.. 后续还会上新更多项目,目标是将 Java 领域典型的项目都整一波,如秒杀系统, 在线商城, IM 即时通讯,权限管理,Spring Cloud Alibaba 微服务等等,已有 2200+ 小伙伴加入学习 ,欢迎点击围观

Spark Framework 的一大优点是提供了大量开箱即用的功能。有一个类专门用于处理键值对,即 PairRDDFunctions 类。当以键值格式处理数据时,最常见的操作之一是按键对值进行分组。 PairRDDFunctions 类提供了一个 groupByKey 函数,它使按键分组变得微不足道。但是, groupByKey 非常昂贵,并且根据用例,可以使用更好的替代方案。在 groupByKey 调用中,所有键值对都将通过网络洗牌到一个 reducer,在这个 reducer 中将值收集在一起。在某些情况下, groupByKey 只是通过键执行其他操作(求和、平均)的起点。在其他情况下,我们需要将值收集在一起以返回不同的值类型。 Spark 提供了一些分组的替代方法,可以提高性能或简化将值组合成不同类型的能力。这篇文章的重点是考虑这些备用分组功能之一。

替代分组函数

虽然 PairRDDFunctions 类中有许多函数,但今天我们将重点关注 aggregateByKey aggregateByKey 函数用于聚合每个键的值并增加返回不同值类型的可能性。

聚合按键

aggregateByKey 函数需要 3 个参数:

  1. 不会影响要收集的总值的初始“零”值。例如,如果我们要添加数字,则初始值将为 0。或者在每个键收集唯一元素的情况下,初始值将为空集。
  2. 接受两个参数的组合函数。第二个参数合并到第一个参数中。此函数组合/合并分区内的值。
  3. 接受两个参数的合并函数函数。在这种情况下,参数合并为一个。此步骤跨分区合并值。

例如,让我们收集每个键的唯一值。将此视为调用 someRDD.groupByKey().distinct() 的替代方法这是代码:


 val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()

val initialSet = mutable.HashSet.empty[String] val addToSet = (s: mutable.HashSet[String], v: String) => s += v val mergePartitionSets = (p1: mutable.HashSet[String], p2: mutable.HashSet[String]) => p1 ++= p2

val uniqueByKey = kv.aggregateByKey(initialSet)(addToSet, mergePartitionSets)

您会注意到我们在示例中使用了 可变 哈希集。使用可变集合的原因是为了避免每次我们向集合添加值或合并集合时返回新集合相关的额外内存开销。 (这在 PairRDDFunctions 文档中明确说明)。虽然使用 aggregateByKey 比较冗长,但如果您的数据有很多值但只有少数是唯一的,则此方法可能会提高性能。

对于我们的第二个示例,我们将通过键对值求和,这应该有助于提高性能,因为更少的数据将在网络中洗牌。我们为 aggregateByKey 函数提供了 3 个不同的参数。这次我们想通过键计算我们有多少个值,而不考虑重复项。


 val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()

val initialSet = mutable.HashSet.empty[String] val addToSet = (s: mutable.HashSet[String], v: String) => s += v val mergePartitionSets = (p1: mutable.HashSet[String], p2: mutable.HashSet[String]) => p1 ++= p2

val uniqueByKey = kv.aggregateByKey(initialSet)(addToSet, mergePartitionSets)

对于使用过 hadoop 的任何人来说,此功能类似于使用组合器。

结果

运行我们的示例会产生以下结果:


 val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()

val initialSet = mutable.HashSet.empty[String] val addToSet = (s: mutable.HashSet[String], v: String) => s += v val mergePartitionSets = (p1: mutable.HashSet[String], p2: mutable.HashSet[String]) => p1 ++= p2

val uniqueByKey = kv.aggregateByKey(initialSet)(addToSet, mergePartitionSets)

结论

我们对 aggregateByKey 函数的快速浏览到此结束。虽然使用 3 个函数可能有点笨拙,但它无疑是一个可供您使用的好工具。在后续文章中,我们将继续介绍 Spark 的 PairRDDFunctions 类中的方法

资源

相关文章