由卢克洛维特撰写。
自 MongoDB Connector for Hadoop 的最后一个功能发布以来已经快一年了。我们非常高兴地宣布 1.4 版的发布,其中包含一些出色的改进和许多错误修正。在本文中,我想特别关注一项真正提高连接器性能的改进:支持 Hadoop 的 推测执行 。
什么是推测执行?
在像 Hadoop 这样的分布式系统中处理数据时,某些处理工作负载可能会受到一两个速度特别慢的节点的瓶颈。节点变慢的原因可能有很多,例如软件配置错误或硬件故障。也许这些节点周围的网络特别饱和。无论是什么原因,Hadoop 作业都无法完成,直到这些慢速节点赶上它们的工作。
推测执行 允许 Hadoop 在作业结束时冗余地重新安排一些任务。这意味着多个节点最终可能会处理完全相同的数据段,即使这些节点中只有一个节点需要成功完成该工作才能完成作业。它变成了一场争分夺秒的比赛:如果 Hadoop 集群中的一个或两个节点比其余节点慢,那么任何其他更快的节点,给定相同的任务,可能首先完成它。在这种情况下,计划在较慢节点上的任务将被取消,并且作业比它必须在较慢节点上等待时更早完成。
到目前为止,这似乎是一项巧妙的优化,但它在从 Hadoop 连接器写入 MongoDB 时导致了一些严重的问题。让我们更深入地探讨导致此问题的原因......
问题
MongoOutputFormat 创建一个 MongoRecordWriter ,它在收到数据后立即使用 MongoDB Java 驱动程序将数据直接发送到 MongoDB。回想一下,允许在 Hadoop 集群上进行推测性执行允许 ResourceManager 冗余地安排任务。如果所有这些冗余任务同时运行,而且很有可能是,那么它们中的每一个都在写入 MongoDB,而不管它最终是否会完成。
如果作业发出的文档已经具有 _id 字段,这可能会导致 DuplicateKeyErrors。其中一个冗余任务首先完成了比赛,但失败者仍会尝试插入 ID 已存在的文档,因为它们是由获胜者插入的!如果作业发出没有 _id 的文档,则 Java 驱动程序会自动添加它们。如果驱动程序没有生成重复的 ID,我们就会避免 DuplicateKeyErrors,但现在我们的数据库中有重复的文档,具有不同的 ID!无论哪种方式,这都是不可取的。
以前,我们建议用户关闭推测执行。这避免了这种讨厌的行为,但关闭了一个有用的功能。我查看了 其他 Hadoop 连接器 ,它们都面临类似的问题并提出了相同的建议。这个问题似乎是写入实时运行系统的地方病。
有没有 Hadoop 推测执行可以避免输出中重复记录的情况?答案是肯定的,当将输出写入文件时,解决方案非常简单。每个任务都会创建一个临时目录,它可以在其中写入临时文件。当每个任务开始生成输出时,此输出将写入一个临时文件。另一个难题是称为 OutputCommitter 的类。 OutputCommitter 定义了在作业或任务即将开始、已中止或完成时调用的方法。通常,每个 OutputFormat 都定义了一种要使用的 OutputCommitter。例如,如果您使用的是 FileOutputFormat ,那么您也在使用 FileOutputCommitter 。
FileOutputCommitter 只是立即删除已中止任务的所有临时目录。对于我们的慢速节点,它们的任务被重新安排在其他更快的节点上并在慢速节点完成之前完成,所以现在慢速节点被清理干净了。在快速节点上完成的任务会将它们的临时文件收集到一个目录中,该目录代表整个作业的输出。由于输出仅来自成功完成的任务,因此输出中没有重复记录。
我们采用了类似的方法来支持写入 MongoDB 的推测执行。 MongoRecordWriter 不是直接写入 MongoDB,而是写入一个临时目录。每个插入或更新操作都有一个特殊的序列化格式被写入。当任务中止时,这些文件将被删除。当任务完成时,MongoOutputCommitter 读取文件并执行每个操作。
这足以让 Hadoop 连接器很好地处理推测执行。然而,在这一点上,我们可以更进一步,允许另一个优化。
另一个优化
近一年来,MongoDB 驱动程序一直支持批量操作 API。 MongoDB 服务器版本 2.6 及更高版本支持批量操作,其完成速度往往比串行发送的相同操作快得多。 Hadoop 连接器从未利用过批量 API。但是,既然每个任务都会以临时文件的形式生成一批冻结的操作,那么使用批量 API 将这些操作发送到 MongoDB 就相当简单了。
在可以处理和生成 TB 甚至 PB 级文档的 Hadoop 作业中使用批量 API 会对性能产生巨大的积极影响。我们尽最大努力准确衡量这提供了什么样的好处。我们编写了一个“身份”MapReduce 作业(即输出与输入相同且中间没有处理的作业)。该作业的输入是一个大型 BSON 文件,类似于“ mongodump ”程序可能生成的文件。
我们比较了运行 CDH4 的 5 节点 Hadoop 集群上 MongoOutputCommitter 和批量写入更改前后“身份”作业的性能。该作业的输入是 “enron emails”数据集 ,其中包含 501,513 个文档,每个文档的大小约为 4k。在 MongoOutputCommitter 和批量写入更改之前,Hadoop 作业需要 147 分钟 才能完成。当然,其中一些测量表示在 Hadoop 集群中的节点之间移动拆分所花费的时间,但大部分时间是 Hadoop 连接器开销,因为此作业不需要处理。批量写入更改后,同样的工作需要 6 分钟!如果我们假设剩余 6 分钟 的大部分执行时间也是连接器开销(将数据移动到 MongoDB 仍然需要花费一些时间),那么这几乎是 96% 的改进!
我们不仅修复了一个错误,而且在一个非常常见的用例(即使用 MongoDB 作为 Hadoop 作业的接收器)中对连接器的性能进行了巨大的改进。我们希望这个改进和 1.4 中的其他改进让我们的用户非常高兴,并且我们的用户继续成为围绕这个项目的强大支持社区。要利用此处讨论的改进和许多其他改进,请通过将以下内容添加到您的 pom.xml 下载 MongoDB Hadoop 连接器 1.4.0 版:
<dependency>
<groupId>org.mongodb.mongo-hadoop</groupId>
<artifactId>mongo-hadoop-core</artifactId>
<version>1.4.0</version>
</dependency>
或者如果你使用 Gradle ,试试这个:
<dependency>
<groupId>org.mongodb.mongo-hadoop</groupId>
<artifactId>mongo-hadoop-core</artifactId>
<version>1.4.0</version>
</dependency>
您还可以访问 Github 上的项目主页,并 直接从“发布”页面下载 jar 。
最后,您可以 在此处阅读所有发行说明 。
谢谢。
JVM 驱动程序团队
想要更多 MongoDB?了解 Apache Spark 和 MongoDB 如何协同工作以将分析转化为实时分析。