使用 Apache Spark 和 MySQL 进行数据分析

一则或许对你有用的小广告

欢迎加入小哈的星球 ,你将获得:专属的项目实战 / 1v1 提问 / Java 学习路线 / 学习打卡 / 每月赠书 / 社群讨论

  • 新项目:《从零手撸:仿小红书(微服务架构)》 正在持续爆肝中,基于 Spring Cloud Alibaba + Spring Boot 3.x + JDK 17...点击查看项目介绍 ;
  • 《从零手撸:前后端分离博客项目(全栈开发)》 2 期已完结,演示链接: http://116.62.199.48/ ;

截止目前, 星球 内专栏累计输出 63w+ 字,讲解图 2808+ 张,还在持续爆肝中.. 后续还会上新更多项目,目标是将 Java 领域典型的项目都整一波,如秒杀系统, 在线商城, IM 即时通讯,权限管理,Spring Cloud Alibaba 微服务等等,已有 2200+ 小伙伴加入学习 ,欢迎点击围观

Apache Spark 是一个集群计算框架,类似于 Apache Hadoop。 维基百科 对此有很好的描述:

Apache Spark 是一个开源集群计算框架,最初由加州大学伯克利分校的 AMPLab 开发,但后来捐赠给了 Apache 软件基金会,该基金会今天仍然存在。与 Hadoop 的两级基于磁盘的 MapReduce 范例相比,Spark 的多级内存原语为某些应用程序提供高达 100 倍的性能。通过允许用户程序将数据加载到集群的内存中并反复查询,Spark 非常适合机器学习算法。

与流行的看法相反,Spark 不需要所有数据都适合内存,而是会使用缓存来加速操作(就像 MySQL 一样)。 Spark也可以独立运行,不需要Hadoop;它也可以在单个服务器(甚至笔记本电脑或台式机)上运行,并利用您所有的 CPU 内核。

以分布式模式启动它真的很容易。先启动“master”。您可以在同一节点上运行“slave”:


 root@thor:~/spark# ./sbin/start-master.sh
less ../logs/spark-root-org.apache.spark.deploy.master.Master-1-thor.out
15/08/25 11:21:21 INFO Master: Starting Spark master at spark://thor:7077
15/08/25 11:21:21 INFO Master: Running Spark version 1.4.1
15/08/25 11:21:21 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/08/25 11:21:21 INFO MasterWebUI: Started MasterWebUI at http://10.60.23.188:8080
root@thor:~/spark# ./sbin/start-slave.sh spark://thor:7077

然后在任何其他节点上运行 Spark Worker(确保将主机名添加到 /etc/hosts 或使用 DNS):


 root@thor:~/spark# ./sbin/start-master.sh
less ../logs/spark-root-org.apache.spark.deploy.master.Master-1-thor.out
15/08/25 11:21:21 INFO Master: Starting Spark master at spark://thor:7077
15/08/25 11:21:21 INFO Master: Running Spark version 1.4.1
15/08/25 11:21:21 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/08/25 11:21:21 INFO MasterWebUI: Started MasterWebUI at http://10.60.23.188:8080
root@thor:~/spark# ./sbin/start-slave.sh spark://thor:7077

为什么选择 Spark 而不是 MySQL?

MySQL(开箱即用)在许多任务中表现不佳。 MySQL 的限制之一是:1 个查询 = 1 个 cpu 核心。这意味着即使您有 48 个快速核心和一个大型数据集要处理(即分组、排序等),它也不会利用全部计算能力。相反,Spark 将能够利用您所有的 CPU 内核。

MySQL 和 Spark 的另一个区别:

  • MySQL 使用所谓的“写入模式”——它需要将数据转换为 MySQL。如果我们的数据不在 MySQL 中,则不能使用“sql”来查询它。
  • Spark(以及 Hadoop/Hive)使用“读取模式”——它可以在压缩文本文件(或任何其他支持的输入格式)之上应用表结构,并将其视为表;那么我们就可以使用SQL来查询这个“表”了。

换句话说,MySQL是存储+处理,而Spark的工作只是处理,它可以直接从/到 外部数据集 管道数据,即Hadoop、Amazon S3、本地文件、JDBC(MySQL/其他数据库)。 Spark 支持文本文件(压缩)、SequenceFiles 和任何其他 Hadoop InputFormat 以及 Parquet Columnar 存储。与 Hadoop 相比,Spark 在这方面更加灵活:例如,Spark 可以直接从 MySQL 读取数据。

将外部数据加载到 MySQL 的典型管道是:

  1. 解压缩 (通常外部数据在压缩的文本文件中)
  2. 使用“LOAD DATA INFILE” 将其加载到 MySQL 的暂存表中
  3. 只有这样我们才能过滤/分组并将 结果保存在另一个表中

这可能会导致额外的开销。在许多情况下,我们不需要“原始”数据,但我们仍然必须将其加载到 MySQL 中。

为什么 Spark 与 MySQL 一起使用

相反, 我们的分析结果 (即聚合数据)应该在MySQL中。不一定非得如此,但将分析结果存储在 MySQL 中会方便得多。假设您要分析一个大数据集(即年度销售额比较),您需要以表格或图表的形式呈现它。结果集将 明显变小,因为它会被聚合 ,并且将它存储在 MySQL 中会容易得多,因为许多标准应用程序都可以使用它。

真实世界测试用例

一个有趣的免费数据集是 维基百科页面计数 。 (>1TB 压缩,自 2008 年起可用)。此数据可以下载(作为 gzip 空格分隔的文本文件),也可以在 AWS 上使用(有限数据集)。数据按小时聚合,具有以下字段:

  • 项目(即“en”、“fr”等,通常是一种语言)
  • 页面标题(uri),urlencoded
  • 请求数
  • 返回内容的大小

(日期字段在文件名中编码,每小时 1 个文件)

我们的目标是根据英文维基百科中每天的请求数找到前 10 个页面,同时也支持搜索任意词,这样我们就可以展示如何,例如,关于“Myspace”的维基百科文章的请求数”将与有关“Facebook”(2008 年至 2015 年)的文章进行比较。

要在 MySQL 中执行此操作,我们必须将其按原样加载到 MySQL 中。这些文件是用编码的日期部分分发的。所有文件的未压缩大小 > 10TB。以下是可能的步骤(根据我们典型的 MySQL 管道):

  1. 解压缩文件并运行“LOAD DATA INFILE”到暂存(临时)表中:
    
     root@thor:~/spark# ./sbin/start-master.sh
    less ../logs/spark-root-org.apache.spark.deploy.master.Master-1-thor.out
    15/08/25 11:21:21 INFO Master: Starting Spark master at spark://thor:7077
    15/08/25 11:21:21 INFO Master: Running Spark version 1.4.1
    15/08/25 11:21:21 INFO Utils: Successfully started service 'MasterUI' on port 8080.
    15/08/25 11:21:21 INFO MasterWebUI: Started MasterWebUI at http://10.60.23.188:8080
    root@thor:~/spark# ./sbin/start-slave.sh spark://thor:7077
    
  2. 聚合并“插入”最终表
    
     root@thor:~/spark# ./sbin/start-master.sh
    less ../logs/spark-root-org.apache.spark.deploy.master.Master-1-thor.out
    15/08/25 11:21:21 INFO Master: Starting Spark master at spark://thor:7077
    15/08/25 11:21:21 INFO Master: Running Spark version 1.4.1
    15/08/25 11:21:21 INFO Utils: Successfully started service 'MasterUI' on port 8080.
    15/08/25 11:21:21 INFO MasterWebUI: Started MasterWebUI at http://10.60.23.188:8080
    root@thor:~/spark# ./sbin/start-slave.sh spark://thor:7077
    
  3. 以某种方式 url 解码标题(可能使用 UDF)。

这是很大的开销。我们将解压缩数据并将其转换为 MySQL,以丢弃大部分数据。

根据我的计算,完成 6 年数据的整个管道需要 > 1 个月的时间(这个时间不包括解压缩时间,也不包括加载时间折旧,因为表变得越来越大,索引需要更新).当然,我们可以在这里做很多事情来加快速度,即加载到不同的 MySQL 实例中,首先加载到 MEMORY 表中,然后分组到 InnoDB 中,等等。

但这里最简单的方法之一是使用 Apache Spark 和 Python 脚本 (pyspark)。 Pyspark 可以读取原始的 gzip 文本文件,用 SQL 查询这些文本文件,应用任何过滤器,函数,即 urldecode,按天分组并将结果集保存到 MySQL 中。

下面是执行这些操作的 Python 脚本:


 root@thor:~/spark# ./sbin/start-master.sh
less ../logs/spark-root-org.apache.spark.deploy.master.Master-1-thor.out
15/08/25 11:21:21 INFO Master: Starting Spark master at spark://thor:7077
15/08/25 11:21:21 INFO Master: Running Spark version 1.4.1
15/08/25 11:21:21 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/08/25 11:21:21 INFO MasterWebUI: Started MasterWebUI at http://10.60.23.188:8080
root@thor:~/spark# ./sbin/start-slave.sh spark://thor:7077

在脚本中,我使用 Spark 读取原始 gzip 文件(一次 1 天)。我们可以使用目录作为“输入”或文件列表。然后我将使用 弹性数据集 (RDD) 转换 ; Python 有 lambda 函数 map 和 filter,它们允许我们拆分“输入文件”并过滤它们。

下一步将是应用模式(声明字段);在这里我们还可以应用任何其他功能;即,我使用 urllib.unquote 来解码标题 (urldecode)。最后,我们可以注册临时表,然后使用熟悉的 SQL 进行分组。

该脚本通常会利用所有 cpu 内核。此外,即使没有 Hadoop,也很容易以分布式模式运行它:只需将文件复制到 Spark 集群中的所有机器或使用 NFS/外部存储。

该脚本在 3 个盒子上花费了大约一个小时来处理 1 个月的数据并将 聚合数据 加载到 MySQL(单实例)。我们可以估计将所有 6 年(汇总)加载到 MySQL 大约需要 3 天。

您可能会问为什么它现在明显更快(而且我们仍然将结果加载到同一个 MySQL 实例)?答案是它是一种不同且更高效的管道。在我们最初的 MySQL 管道(可能需要几个月)中,我们将原始数据加载到 MySQL。这里我们对读取进行过滤和分组,只将我们需要的写入MySQL。

这里可能还会出现一个问题:我们真的需要这一整条“管道”吗?我们可以简单地在“原始”数据之上运行我们的分析查询吗?好吧,这是可能的,但可能需要 1000 个节点的 Spark Cluster 才能有效地完成它,因为它需要扫描 5TB 的数据(参见下面的“更多阅读”)。

MySQL 插入的多线程性能

当使用 group_res.write.jdbc(url=mysql_url, table=”wikistats.wikistats_by_day_spark”, mode=”append”) 时,Spark 将使用多线程插入 MySQL。


 root@thor:~/spark# ./sbin/start-master.sh
less ../logs/spark-root-org.apache.spark.deploy.master.Master-1-thor.out
15/08/25 11:21:21 INFO Master: Starting Spark master at spark://thor:7077
15/08/25 11:21:21 INFO Master: Running Spark version 1.4.1
15/08/25 11:21:21 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/08/25 11:21:21 INFO MasterWebUI: Started MasterWebUI at http://10.60.23.188:8080
root@thor:~/spark# ./sbin/start-slave.sh spark://thor:7077

监控你的工作

Spark 为您提供了一个 Web 界面来监控和管理您的作业。这是示例:我正在运行 wikistats.py 应用程序:

结果:使用 Parquet 列格式与 MySQL InnoDB 表

Spark 支持 Apache Parquet Columnar 格式,所以我们可以将 RDD 保存为 parquet 文件(可以保存到目录到 HDFS):


 root@thor:~/spark# ./sbin/start-master.sh
less ../logs/spark-root-org.apache.spark.deploy.master.Master-1-thor.out
15/08/25 11:21:21 INFO Master: Starting Spark master at spark://thor:7077
15/08/25 11:21:21 INFO Master: Running Spark version 1.4.1
15/08/25 11:21:21 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/08/25 11:21:21 INFO MasterWebUI: Started MasterWebUI at http://10.60.23.188:8080
root@thor:~/spark# ./sbin/start-slave.sh spark://thor:7077

在这里,我们将管道的结果(聚合数据)保存到 Spark 中。我还使用按天分区(“mydate=20080101”),Spark 可以自动发现这种格式的分区。

当我得到结果时,我想查询它。假设我想在 2018 年 1 月找到前 10 个最常查询的 wiki 页面。我可以使用 MySQL 执行此查询(我需要过滤掉主页和搜索页面):


 root@thor:~/spark# ./sbin/start-master.sh
less ../logs/spark-root-org.apache.spark.deploy.master.Master-1-thor.out
15/08/25 11:21:21 INFO Master: Starting Spark master at spark://thor:7077
15/08/25 11:21:21 INFO Master: Running Spark version 1.4.1
15/08/25 11:21:21 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/08/25 11:21:21 INFO MasterWebUI: Started MasterWebUI at http://10.60.23.188:8080
root@thor:~/spark# ./sbin/start-slave.sh spark://thor:7077

请注意,这里我们使用的是已经聚合(按数据汇总)的表格,而不是“原始”数据。

正如我们所见,查询耗时 1 小时 22 分钟。我还将相同的结果保存到 Parquet(参见脚本),所以现在我可以将它与 Spark-SQL 一起使用:


 root@thor:~/spark# ./sbin/start-master.sh
less ../logs/spark-root-org.apache.spark.deploy.master.Master-1-thor.out
15/08/25 11:21:21 INFO Master: Starting Spark master at spark://thor:7077
15/08/25 11:21:21 INFO Master: Running Spark version 1.4.1
15/08/25 11:21:21 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/08/25 11:21:21 INFO MasterWebUI: Started MasterWebUI at http://10.60.23.188:8080
root@thor:~/spark# ./sbin/start-slave.sh spark://thor:7077

这将使用本地版本的 spark-sql,仅使用 1 个主机。


 root@thor:~/spark# ./sbin/start-master.sh
less ../logs/spark-root-org.apache.spark.deploy.master.Master-1-thor.out
15/08/25 11:21:21 INFO Master: Starting Spark master at spark://thor:7077
15/08/25 11:21:21 INFO Master: Running Spark version 1.4.1
15/08/25 11:21:21 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/08/25 11:21:21 INFO MasterWebUI: Started MasterWebUI at http://10.60.23.188:8080
root@thor:~/spark# ./sbin/start-slave.sh spark://thor:7077

这花了大约 20 分钟,这要快得多。

结论

Apache Spark 提供了一种非常简单的方法来分析和聚合数据。与其他大数据和分析框架相比,我喜欢 Spark 的地方:

  • 开源并积极开发
  • 不依赖于工具,即输入数据和输出数据不必在 Hadoop 中
  • 单机模式,快速启动,易于部署
  • 大规模并行,易于添加节点
  • 支持多种输入输出格式;即,它可以读取/写入 MySQL(相对于 JDBC 驱动程序)和 Parquet Columnar 格式

但是,有许多缺点:

  • 它仍然是新的,因此您可以预料到一些错误和未记录的行为。许多错误很难解释。
  • 它需要Java; Spark 1.5 仅支持 Java 7 及更高版本。这也意味着它将需要额外的内存,这在当今是合理的。
  • 您将需要通过“spark-submit”运行作业

我相信 Apache Spark 是一个很棒的工具,可以补充 MySQL 的数据分析和 BI 目的。

最初由 Alexander Rubin 为 Percona 编写。

相关文章