上次我们介绍了 Spark 中的 二次排序 。我们获取了航空公司的绩效数据,并按航空公司、目的地机场和延误量对结果进行了排序。我们对所有数据都使用了 id。虽然这种方法有利于提高性能,但以这种格式查看结果会失去意义。幸运的是, 交通局 网站提供了可供下载的参考文件。参考文件为 CSV 格式,每一行由键值对组成。我们的目标是将参考数据存储在哈希图中并利用 广播变量 ,以便不同分区上的所有操作都可以轻松访问相同的数据。我们有四个带代码的字段:航空公司、始发城市机场、始发城市、目的地机场和目的地城市。我们的两个代码字段使用相同的参考文件(机场 ID),因此我们需要下载 3 个文件。但是,是否有更简单的方法将 3 个文件加载到哈希图中并拥有 3 个单独的广播变量?有,通过使用 Guava Tables 。
番石榴表简介
虽然对 Guava
Table
的完整讨论超出了本文的范围,但简短的描述会有所帮助。它基本上是“哈希图的哈希图”的抽象,使样板不再添加或检索数据。例如:
Map<String,Map<String,String>> outerMap = new HashMap<>();
Map<String,String> inner = outerMap.get("key");
//getting a value
if(inner == null){
inner = new HashMap<>();
outerMap.put("key",inner);
return null;
}else{
String value = inner.get("innerKey");
return value;
}
//adding a value
if(inner == null){
inner = new HashMap<>();
outerMap.put("key",inner);
}
inner.put("innerKey","innerValue");
Table<String,String,String> table = HashBasedTable.create();
//expected behavior if not found - returns null
String innerValue = table.get("key","innerKey");
//if no value exists for "key" hashmap is created.
table.put("key","innerKey","value")
Guava Table 与 HashMaps
希望这个例子足以说明为什么我们想要使用 Guava 表而不是“hashmap of hashmaps”方法。
载入表格
我们有 3 个文件要加载到我们的表中以供查找。这样做的代码很简单:
Map<String,Map<String,String>> outerMap = new HashMap<>();
Map<String,String> inner = outerMap.get("key");
//getting a value
if(inner == null){
inner = new HashMap<>();
outerMap.put("key",inner);
return null;
}else{
String value = inner.get("innerKey");
return value;
}
//adding a value
if(inner == null){
inner = new HashMap<>();
outerMap.put("key",inner);
}
inner.put("innerKey","innerValue");
Table<String,String,String> table = HashBasedTable.create();
//expected behavior if not found - returns null
String innerValue = table.get("key","innerKey");
//if no value exists for "key" hashmap is created.
table.put("key","innerKey","value")
加载番石榴表
load
方法采用引用文件所在的基本路径和文件名列表(还有另一种
load
方法接受以逗号分隔的文件名列表)。我们遍历文件名列表,重新使用基本名称作为“行键”,然后遍历在文件中找到的键值对,将它们存储在表中。在这里,我们在“#”字符上拆分行。参考数据中的值包含逗号并用引号括起来。通过删除双引号并将分隔符更改为“#”来清理文件。
将 Guava 表设置为广播变量
现在我们需要将表对象作为广播变量集成到我们的 Spark 作业中。为此,我们将重新使用上一篇文章中的
SecondarySort
对象:
Map<String,Map<String,String>> outerMap = new HashMap<>();
Map<String,String> inner = outerMap.get("key");
//getting a value
if(inner == null){
inner = new HashMap<>();
outerMap.put("key",inner);
return null;
}else{
String value = inner.get("innerKey");
return value;
}
//adding a value
if(inner == null){
inner = new HashMap<>();
outerMap.put("key",inner);
}
inner.put("innerKey","innerValue");
Table<String,String,String> table = HashBasedTable.create();
//expected behavior if not found - returns null
String innerValue = table.get("key","innerKey");
//if no value exists for "key" hashmap is created.
table.put("key","innerKey","value")
添加表作为广播变量
我们添加了两个参数,参考文件的基本路径和参考文件名的逗号分隔列表。加载表后,我们使用
sc.broadcast
方法调用创建一个广播变量。
查找参考数据
现在我们剩下的就是获取排序结果并将所有 id 转换为更有意义的名称。
Map<String,Map<String,String>> outerMap = new HashMap<>();
Map<String,String> inner = outerMap.get("key");
//getting a value
if(inner == null){
inner = new HashMap<>();
outerMap.put("key",inner);
return null;
}else{
String value = inner.get("innerKey");
return value;
}
//adding a value
if(inner == null){
inner = new HashMap<>();
outerMap.put("key",inner);
}
inner.put("innerKey","innerValue");
Table<String,String,String> table = HashBasedTable.create();
//expected behavior if not found - returns null
String innerValue = table.get("key","innerKey");
//if no value exists for "key" hashmap is created.
table.put("key","innerKey","value")
将排序结果映射到参考数据
这里我们通过
createDelayedFlight
方法将排序后的结果映射到
DelayedFlight
对象中。这里有两点需要注意:
-
要使用表对象,我们需要先将其从
Broadcast
对象中“解包”出来。 -
到达机场 ID 需要转换为
String
,因为它是FlightKey
类中的一个整数,但我们的参考表仅包含字符串。
结果
现在结果如下所示:
Map<String,Map<String,String>> outerMap = new HashMap<>();
Map<String,String> inner = outerMap.get("key");
//getting a value
if(inner == null){
inner = new HashMap<>();
outerMap.put("key",inner);
return null;
}else{
String value = inner.get("innerKey");
return value;
}
//adding a value
if(inner == null){
inner = new HashMap<>();
outerMap.put("key",inner);
}
inner.put("innerKey","innerValue");
Table<String,String,String> table = HashBasedTable.create();
//expected behavior if not found - returns null
String innerValue = table.get("key","innerKey");
//if no value exists for "key" hashmap is created.
table.put("key","innerKey","value")
映射航班结果
快速浏览并一直滚动到右侧,我们可以看到当天飞往德克萨斯州达拉斯的航班出现了相当大的延误。
结论
这总结了我们如何在 Spark 作业中使用 Guava 表作为广播变量。希望读者可以看到使用这种方法的好处。谢谢你的时间。