TL;博士; spark、parquet 和 s3(和 mesos)的组合是一个强大、灵活且具有成本效益的分析平台(顺便说一下,它是 hadoop 的替代品)。然而,让所有这些技术融合在一起并很好地发挥作用并不是一项简单的任务。这篇文章描述了我们 (appsflyer) 在基于这些技术构建我们的分析平台时所面临的挑战,以及我们为缓解这些挑战并使其发挥作用所采取的步骤。
spark 正在成为 map/reduce 的领先替代品, 原因有很多,包括被不同的 hadoop 发行版广泛采用,在单一平台上结合批处理和流处理,以及不断增长的机器学习集成库(包括算法)以及与机器学习语言(即 r 和 python)的集成)。在 appsflyer,我们使用 spark 作为 etl(提取、转换和加载)和分析的主要框架已经有一段时间了。最近的一个例子是 我们最近发布的新版留存报告 ,它利用 spark 处理多个数据流(每天 > 1tb)和 etl(主要是数据清理)和分析(全面点击欺诈检测的垫脚石)生成报告。
我们在本报告中介绍的主要变化之一是从基于 序列文件 构建到使用 parquet 文件 。 parquet 是一种柱状数据格式,这可能是当今为分析目的存储长期大数据的最佳选择(除非您大量投资于 hive,其中 orc 是更合适的格式)。 parquet 与序列文件的优势在于性能和压缩,同时又不失大数据工具(spark、hive、drill、tajo、presto 等)广泛支持的优势。
我们的大数据基础架构的一个相对独特的方面是我们不使用 hadoop(也许这是另一篇文章的主题)。我们使用 mesos 作为资源管理器而不是 yarn,我们使用 amazon s3 而不是 hdfs 作为分布式存储解决方案。 hdfs 与 s3 相比有几个优势,但是,与使用 s3 相比,在 aws 上维护长期运行的 hdfs 集群的成本/收益压倒性地支持 s3。
也就是说,spark、parquet 和 s3 的结合给我们带来了一些挑战,这篇文章将列出主要挑战以及我们为应对这些挑战而提出的解决方案。
镶木地板和火花
parquet 和 spark 似乎已经爱恨交加了一段时间。一方面,spark 文档将 parquet 吹捧为大数据分析的最佳格式之一(确实如此),另一方面,spark 对 parquet 的支持不完整且使用起来很烦人。事情肯定在朝着正确的方向发展,但仍有一些怪癖和陷阱需要注意。
从积极的方面开始,Spark 和 Parquet 的集成在过去几个月中取得了长足的进步。以前, 为了能够将现有数据转换为 parquet,人们不得不跳过重重障碍 。将数据帧引入 spark 使这个过程变得非常非常简单。当数据框 api 支持输入格式时,例如输入是 json(内置)或 avro(尚未内置在 spark 中,但您 可以使用库 来读取它)转换为 parquet 只是一个问题在一侧读取输入格式,在另一侧将其保存为镶木地板。例如考虑以下 Scala 中的代码片段:
val inputpath = "../data/json"
val outputpath = "../data/parquet"
val data = sqlcontext.read.json(inputpath)
date.write.parquet(outputpath)
即使您处理的格式不是数据的一部分,转换过程也非常简单,因为 spark 允许您以编程方式指定模式。 spark 文档非常简单, 包含 scala、java 和 python 中的示例 。此外,用其他语言定义模式并不太复杂。例如,在这里 (appsflyer),我们使用 clojure 作为我们的主要开发语言,因此我们开发了几个辅助函数来做到这一点。下面的示例代码提供了详细信息:
第一件事是从我们拥有的任何结构中提取数据并指定我们喜欢的模式。下面的代码采用事件记录并将各种数据点从中提取到形式为 [:column_name value optional_data_type] 的向量中。请注意,数据类型是可选的,因为如果未指定,它默认为字符串。
val inputpath = "../data/json"
val outputpath = "../data/parquet"
val data = sqlcontext.read.json(inputpath)
date.write.parquet(outputpath)
下一步是使用上面提到的结构来提取模式并转换为数据帧行:
val inputpath = "../data/json"
val outputpath = "../data/parquet"
val data = sqlcontext.read.json(inputpath)
date.write.parquet(outputpath)
最后我们将这些函数应用于 rdd ,将其转换为数据框并保存为镶木地板:
val inputpath = "../data/json"
val outputpath = "../data/parquet"
val data = sqlcontext.read.json(inputpath)
date.write.parquet(outputpath)
如上所述,parquet 和 spark 的发展势头良好,但道路尚不明朗。我们遇到的一些问题包括:
- 1.4 版本中的一个严重错误, 写入镶木地板文件时的竞争条件 导致作业大量数据丢失(此错误已在 1.4.1 中修复 - 所以如果你昨天使用 spark 1.4 和镶木地板升级!)
- 过滤器下推优化, 默认情况下关闭,因为 spark 仍然使用 parquet 1.6.0rc3 - 即使 1.6.0 已经推出一段时间(似乎 spark 1.5 将使用 parquet 1.7.0 所以问题将得到解决)
- spark 不“原生”支持 parquet,相反,spark 依赖于 hadoop 对 parquet 格式的支持——这本身不是问题,但对我们来说,当我们尝试将 spark 和 parquet 与 s3 一起使用时,它导致了主要的性能问题——更多在下一节中
镶木地板、火花和 s3
amazon s3(简单存储服务)是一种使用起来相对便宜的对象存储解决方案。与“真正的”文件系统相比,它确实有一些缺点;主要的是最终一致性,即一个进程所做的更改不会立即被其他应用程序看到。 (如果您使用的是亚马逊的 emr,则可以使用 emrfs“一致视图” 来克服此问题。)但是,如果您了解此限制,s3 仍然是一个可行的输入和输出源,至少对于批处理作业而言。
如上所述,spark 没有原生的 s3 实现,而是依赖 hadoop 类来抽象对 parquet 的数据访问。 hadoop 为 s3 提供了 3 个文件系统客户端:
- 似乎不适用于 spark 的 s3 块文件系统(“s3://..”形式的 uri 模式)
- s3 本机文件系统(“s3n://..”uris)——下载支持 hadoop 2 的 spark 发行版。* 如果你想使用它(tl;dr——你不需要)
- s3a——s3n 的替代品,消除了 s3n 的一些限制和问题。下载“spark with hadoop 2.6 and up”支持以使用这个(tl;dr——你想要这个,但它需要一些工作才能使用)
当我们使用 spark 1.3 时,我们在尝试使用 s3 时遇到了很多问题,所以我们开始使用 s3n——它在大部分情况下都有效,即我们让作业运行并完成,但其中很多作业因各种读取超时和主机未知而失败例外。查看作业中的任务,画面甚至更加严峻,失败率很高,这促使我们将超时和重试次数增加到荒谬的水平。当我们转移到 spark 1.4.1 时,我们又尝试了 s3a。这一次我们成功了。我们要做的第一件事是将 spark.executor.extraclasspath 和 spark.executor.extradriverpath 设置为指向 aws-java-sdk 和 hadoop-aws 罐子,因为 显然这两个罐子都在“spark with hadoop 2.6”中丢失了建立 。自然地,我们使用了这些文件的 2.6 版本,但后来我们遇到了这个 小问题 。 hadoop 2.6 aws 实现有一个错误,导致它以意想不到的方式拆分 s3 文件(例如,400 个文件作业运行 1800 万个任务)幸运的是,使用 hadoop aws jar 版本 2.7.0 而不是 2.6 版本解决了这个问题 - 所以,所有设置的 s3a 前缀都可以正常工作(并且提供比 s3n 更好的性能)。
找到合适的 s3 hadoop 库有助于我们工作的稳定性,但无论 s3 库(s3n 或 s3a)如何,使用 parquet 文件的 spark 作业的性能仍然很糟糕。当查看 spark ui 时,处理数据的实际工作似乎很合理,但 spark 在实际开始工作之前和工作“完成”之后才真正终止之前花费了大量时间。我们喜欢将这种现象称为“镶木地板税”。
显然我们无法忍受“镶木地板税”,因此我们深入研究了作业的日志文件并发现了几个问题。第一个与镶木地板作业的启动时间有关。构建 spark 的人明白模式可以随着时间的推移而发展,并为数据框提供了一个很好的功能,称为 “模式合并”。 如果您查看大数据湖/水库(或今天所谓的任何东西)中的模式,您肯定可以期望模式会随着时间的推移而发展。但是,如果您查看一个目录,该目录是单个作业的结果,则模式没有区别……事实证明,当 spark 初始化作业时,它会读取所有 parquet 文件的页脚以执行模式合并。所有这些工作都是在将任何任务分配给执行程序之前由驱动程序完成的,并且可能需要很长时间,甚至数小时(例如,我们有回顾半年安装数据的作业)。它没有记录,但查看 spark 代码,您可以通过将 mergeschema 指定为 false 来覆盖此行为:
在标度中:
val inputpath = "../data/json"
val outputpath = "../data/parquet"
val data = sqlcontext.read.json(inputpath)
date.write.parquet(outputpath)
在 clojure 中:
val inputpath = "../data/json"
val outputpath = "../data/parquet"
val data = sqlcontext.read.json(inputpath)
date.write.parquet(outputpath)
请注意,这在 spark 1.3 中不起作用。在 spark 1.4 中,它按预期工作,在 spark 1.4.1 中,它导致 spark 只查看 _common_metadata 文件,这不是世界末日,因为它是一个小文件,每个目录只有一个。然而,这给我们带来了“镶木地板税”的另一个方面——“工作结束”的延迟。
关闭 schema 合并和控制 spark 使用的 schema 有助于缩短作业启动时间,但是,如前所述,我们仍然在作业结束时遇到长时间的延迟。我们已经知道一个在使用文本文件时与 hadoop<->s3 相关的问题。 hadoop 是不可变的,首先将文件写入临时目录,然后将它们复制过来。使用 s3 这不是问题,但复制操作非常非常昂贵。对于文本文件,databricks 创建了 directoutputcommitter (可能是为了他们的 spark saas 产品)。替换文本文件的输出提交器相当容易——您只需要在 spark 配置上设置“spark.hadoop.mapred.output.committer.class”,例如:
val inputpath = "../data/json"
val outputpath = "../data/parquet"
val data = sqlcontext.read.json(inputpath)
date.write.parquet(outputpath)
parquet 存在类似的解决方案,与文本文件的解决方案不同,它甚至是 spark 分布的一部分。但是,为了使事情变得复杂,您必须在 hadoop 配置上而不是在 spark 配置上配置它。要获取 hadoop 配置,您首先需要从 spark 配置创建一个 spark 上下文,对其调用 hadoopconfiguration,然后设置“spark.sql.parquet.output.committer.class”,如下所示:
val inputpath = "../data/json"
val outputpath = "../data/parquet"
val data = sqlcontext.read.json(inputpath)
date.write.parquet(outputpath)
使用 directparquetoutputcommitter 可以显着减少“parquet tax”,但我们仍然发现一些工作需要很长时间才能完成。同样,问题是文件系统假设 spark 和 hadoop hold 是罪魁祸首。请记住,“_common_metadata”spark 着眼于作业的开始 - 好吧,spark 在作业结束时花费大量时间创建此文件和一个额外的元数据文件,其中包含来自目录中文件的额外信息。同样,这一切都是从一个地方(驱动程序)完成的,而不是由执行者处理的。当作业产生小文件时(即使有几千个),该过程需要合理的时间。然而,当作业产生更大的文件时(例如,当我们摄取一整天的应用程序启动时)这需要一个多小时。与 mergeschema 一样,解决方案是手动管理元数据,因此我们将“parquet.enable.summary-metadata”设置为 false(再次在 hadoop 配置上并自己生成 _common_metadata 文件(用于大型作业)。
总而言之,parquet,尤其是 spark 正在开发中——让尖端技术为您所用可能是一项挑战,需要大量挖掘。文档有时远非完美,但幸运的是所有相关技术都是开源的(甚至 是 amazon sdk ), 因此您可以随时深入研究错误报告、代码等 。了解事情的实际运作方式并找到您需要的解决方案。此外,您还可以不时找到解释如何克服所用技术中的常见问题的文章和博客文章。我希望这篇文章能够解决集成 spark、parquet 和 s3 的一些复杂问题,归根结底,这些都是具有巨大潜力的伟大技术。
(这篇文章的一个版本最初发布在 appsflyer 的博客 中。还要特别感谢来自 appsflyer 数据团队的 morri feldman 和 michael spector ,他们解决了本文中讨论的大部分问题)