用于 Spark 的番石榴表

一则或许对你有用的小广告

欢迎加入小哈的星球 ,你将获得:专属的项目实战 / 1v1 提问 / Java 学习路线 / 学习打卡 / 每月赠书 / 社群讨论

  • 新项目:《从零手撸:仿小红书(微服务架构)》 正在持续爆肝中,基于 Spring Cloud Alibaba + Spring Boot 3.x + JDK 17...点击查看项目介绍 ;
  • 《从零手撸:前后端分离博客项目(全栈开发)》 2 期已完结,演示链接: http://116.62.199.48/ ;

截止目前, 星球 内专栏累计输出 63w+ 字,讲解图 2808+ 张,还在持续爆肝中.. 后续还会上新更多项目,目标是将 Java 领域典型的项目都整一波,如秒杀系统, 在线商城, IM 即时通讯,权限管理,Spring Cloud Alibaba 微服务等等,已有 2200+ 小伙伴加入学习 ,欢迎点击围观

上次我们介绍了 Spark 中的 二次排序 。我们获取了航空公司的绩效数据,并按航空公司、目的地机场和延误量对结果进行了排序。我们对所有数据都使用了 id。虽然这种方法有利于提高性能,但以这种格式查看结果会失去意义。幸运的是, 交通局 网站提供了可供下载的参考文件。参考文件为 CSV 格式,每一行由键值对组成。我们的目标是将参考数据存储在哈希图中并利用 广播变量 ,以便不同分区上的所有操作都可以轻松访问相同的数据。我们有四个带代码的字段:航空公司、始发城市机场、始发城市、目的地机场和目的地城市。我们的两个代码字段使用相同的参考文件(机场 ID),因此我们需要下载 3 个文件。但是,是否有更简单的方法将 3 个文件加载到哈希图中并拥有 3 个单独的广播变量?有,通过使用 Guava Tables

番石榴表简介

虽然对 Guava Table 的完整讨论超出了本文的范围,但简短的描述会有所帮助。它基本上是“哈希图的哈希图”的抽象,使样板不再添加或检索数据。例如:


 Map<String,Map<String,String>> outerMap = new HashMap<>();

Map<String,String> inner = outerMap.get("key"); //getting a value if(inner == null){ inner = new HashMap<>(); outerMap.put("key",inner); return null; }else{ String value = inner.get("innerKey"); return value; }

//adding a value if(inner == null){ inner = new HashMap<>(); outerMap.put("key",inner); } inner.put("innerKey","innerValue");

Table<String,String,String> table = HashBasedTable.create(); //expected behavior if not found - returns null String innerValue = table.get("key","innerKey"); //if no value exists for "key" hashmap is created. table.put("key","innerKey","value")

Guava Table 与 HashMaps

希望这个例子足以说明为什么我们想要使用 Guava 表而不是“hashmap of hashmaps”方法。

载入表格

我们有 3 个文件要加载到我们的表中以供查找。这样做的代码很简单:


 Map<String,Map<String,String>> outerMap = new HashMap<>();

Map<String,String> inner = outerMap.get("key"); //getting a value if(inner == null){ inner = new HashMap<>(); outerMap.put("key",inner); return null; }else{ String value = inner.get("innerKey"); return value; }

//adding a value if(inner == null){ inner = new HashMap<>(); outerMap.put("key",inner); } inner.put("innerKey","innerValue");

Table<String,String,String> table = HashBasedTable.create(); //expected behavior if not found - returns null String innerValue = table.get("key","innerKey"); //if no value exists for "key" hashmap is created. table.put("key","innerKey","value")

加载番石榴表

load 方法采用引用文件所在的基本路径和文件名列表(还有另一种 load 方法接受以逗号分隔的文件名列表)。我们遍历文件名列表,重新使用基本名称作为“行键”,然后遍历在文件中找到的键值对,将它们存储在表中。在这里,我们在“#”字符上拆分行。参考数据中的值包含逗号并用引号括起来。通过删除双引号并将分隔符更改为“#”来清理文件。

将 Guava 表设置为广播变量

现在我们需要将表对象作为广播变量集成到我们的 Spark 作业中。为此,我们将重新使用上一篇文章中的 SecondarySort 对象:


 Map<String,Map<String,String>> outerMap = new HashMap<>();

Map<String,String> inner = outerMap.get("key"); //getting a value if(inner == null){ inner = new HashMap<>(); outerMap.put("key",inner); return null; }else{ String value = inner.get("innerKey"); return value; }

//adding a value if(inner == null){ inner = new HashMap<>(); outerMap.put("key",inner); } inner.put("innerKey","innerValue");

Table<String,String,String> table = HashBasedTable.create(); //expected behavior if not found - returns null String innerValue = table.get("key","innerKey"); //if no value exists for "key" hashmap is created. table.put("key","innerKey","value")

添加表作为广播变量

我们添加了两个参数,参考文件的基本路径和参考文件名的逗号分隔列表。加载表后,我们使用 sc.broadcast 方法调用创建一个广播变量。

查找参考数据

现在我们剩下的就是获取排序结果并将所有 id 转换为更有意义的名称。


 Map<String,Map<String,String>> outerMap = new HashMap<>();

Map<String,String> inner = outerMap.get("key"); //getting a value if(inner == null){ inner = new HashMap<>(); outerMap.put("key",inner); return null; }else{ String value = inner.get("innerKey"); return value; }

//adding a value if(inner == null){ inner = new HashMap<>(); outerMap.put("key",inner); } inner.put("innerKey","innerValue");

Table<String,String,String> table = HashBasedTable.create(); //expected behavior if not found - returns null String innerValue = table.get("key","innerKey"); //if no value exists for "key" hashmap is created. table.put("key","innerKey","value")

将排序结果映射到参考数据

这里我们通过 createDelayedFlight 方法将排序后的结果映射到 DelayedFlight 对象中。这里有两点需要注意:

  1. 要使用表对象,我们需要先将其从 Broadcast 对象中“解包”出来。
  2. 到达机场 ID 需要转换为 String ,因为它是 FlightKey 类中的一个整数,但我们的参考表仅包含字符串。

结果

现在结果如下所示:


 Map<String,Map<String,String>> outerMap = new HashMap<>();

Map<String,String> inner = outerMap.get("key"); //getting a value if(inner == null){ inner = new HashMap<>(); outerMap.put("key",inner); return null; }else{ String value = inner.get("innerKey"); return value; }

//adding a value if(inner == null){ inner = new HashMap<>(); outerMap.put("key",inner); } inner.put("innerKey","innerValue");

Table<String,String,String> table = HashBasedTable.create(); //expected behavior if not found - returns null String innerValue = table.get("key","innerKey"); //if no value exists for "key" hashmap is created. table.put("key","innerKey","value")

映射航班结果

快速浏览并一直滚动到右侧,我们可以看到当天飞往德克萨斯州达拉斯的航班出现了相当大的延误。

结论

这总结了我们如何在 Spark 作业中使用 Guava 表作为广播变量。希望读者可以看到使用这种方法的好处。谢谢你的时间。

资源

相关文章