• 欢迎访问蜷缩的蜗牛博客 蜷缩的蜗牛
  • 微信搜索: 蜷缩的蜗牛 | 联系站长 kbsonlong@qq.com
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

【转载】从Hadoop到Spark的架构实践

Linux 蜷缩的蜗牛 1年前 (2017-07-13) 15次浏览 已收录 0个评论

当下,Spark 已经在国内得到了广泛的认可和支持:2014 年,Spark Summit China 在北京召开,场面火爆;同年,Spark Meetup 在北京、上海、深圳和杭州四个城市举办,其中仅北京就成功举办了 5 次,内容更涵盖 Spark Core、Spark Streaming、Spark MLlib、Spark SQL 等众多领域。而作为较早关注和引入 Spark 的移动互联网大数据综合服务公司,TalkingData 也积极地参与到国内 Spark 社区的各种活动,并多次在 Meetup 中分享公司的 Spark 使用经验。本文则主要介绍 TalkingData 在大数据平台建设过程中,逐渐引入 Spark,并且以 Hadoop YARN 和 Spark 为基础来构建移动大数据平台的过程。

初识 Spark

作为一家在移动互联网大数据领域创业的公司,时刻关注大数据技术领域的发展和进步是公司技术团队必做的功课。而在整理 Strata 2013 公开的讲义时,一篇主题为《An Introduction on the Berkeley Data Analytics Stack_BDAS_Featuring Spark,Spark Streaming,and Shark》的教程引起了整个技术团队的关注和讨论,其中 Spark 基于内存的 RDD 模型、对机器学习算法的支持、整个技术栈中实时处理和离线处理的统一模型以及 Shark 都让人眼前一亮。同时期我们关注的还有 Impala,但对比 Spark,Impala 可以理解为对 Hive 的升级,而 Spark 则尝试围绕 RDD 建立一个用于大数据处理的生态系统。对于一家数据量高速增长,业务又是以大数据处理为核心并且在不断变化的创业公司而言,后者无疑更值得进一步关注和研究。

Spark 初探

2013 年中期,随着业务高速发展,越来越多的移动设备侧数据被各个不同的业务平台收集。那么这些数据除了提供不同业务所需要的业务指标,是否还蕴藏着更多的价值?为了更好地挖掘数据潜在价值,我们决定建造自己的数据中心,将各业务平台的数据汇集到一起,对覆盖设备的相关数据进行加工、分析和挖掘,从而探索数据的价值。初期数据中心主要功能设置如下所示:

1. 跨市场聚合的安卓应用排名;

2. 基于用户兴趣的应用推荐。

基于当时的技术掌握程度和功能需求,数据中心所采用的技术架构如图 1。

图 1 基于 Hadoop 2.0 的数据中心技术架构

整个系统构建基于 Hadoop 2.0(Cloudera CDH4.3),采用了最原始的大数据计算架构。通过日志汇集程序,将不同业务平台的日志汇集到数据中心,并通过 ETL 将数据进行格式化处理,储存到 HDFS。其中,排名和推荐算法的实现都采用了 MapReduce,系统中只存在离线批量计算,并通过基于 Azkaban 的调度系统进行离线任务的调度。

第一个版本的数据中心架构基本上是以满足“最基本的数据利用”这一目的进行设计的。然而,随着对数据价值探索得逐渐加深,越来越多的实时分析需求被提出。与此同时,更多的机器学习算法也亟需添加,以便支持不同的数据挖掘需求。对于实时数据分析,显然不能通过“对每个分析需求单独开发 MapReduce 任务”来完成,因此引入 Hive 是一个简单而直接的选择。鉴于传统的 MapReduce 模型并不能很好地支持迭代计算,我们需要一个更好的并行计算框架来支持机器学习算法。而这些正是我们一直在密切关注的 Spark 所擅长的领域——凭借其对迭代计算的友好支持,Spark 理所当然地成为了不二之选。2013 年 9 月底,随着 Spark 0.8.0 发布,我们决定对最初的架构进行演进,引入 Hive 作为即时查询的基础,同时引入 Spark 计算框架来支持机器学习类型的计算,并且验证 Spark 这个新的计算框架是否能够全面替代传统的以 MapReduce 为基础的计算框架。图 2 为整个系统的架构演变。

图 2 在原始架构中测试 Spark

在这个架构中,我们将 Spark 0.8.1 部署在 YARN 上,通过分 Queue,来隔离基于 Spark 的机器学习任务,计算排名的日常 MapReduce 任务和基于 Hive 的即时分析任务。

想要引入 Spark,第一步需要做的就是要取得支持我们 Hadoop 环境的 Spark 包。我们的 Hadoop 环境是 Cloudera 发布的 CDH 4.3,默认的 Spark 发布包并不包含支持 CDH 4.3 的版本,因此只能自己编译。Spark 官方文档推荐用 Maven 进行编译,可是编译却不如想象中顺利。各种包依赖由于众所周知的原因,不能顺利地从某些依赖中心库下载。于是我们采取了最简单直接的绕开办法,利用 AWS 云主机进行编译。需要注意的是,编译前一定要遵循文档的建议,设置:

【转载】从 Hadoop 到 Spark 的架构实践

否则,编译过程中就会遇到内存溢出的问题。针对 CDH 4.3,mvn build 的参数为:

【转载】从 Hadoop 到 Spark 的架构实践

在编译成功所需要的 Spark 包后,部署和在 Hadoop 环境中运行 Spark 则是非常简单的事情。将编译好的 Spark 目录打包压缩后,在可以运行 Hadoop Client 的机器上解压缩,就可以运行 Spark 了。想要验证 Spark 是否能够正常在目标 Hadoop 环境上运行,可以参照 Spark 的官方文档,运行 example 中的 SparkPi 来验证:

【转载】从 Hadoop 到 Spark 的架构实践

完成 Spark 部署之后,剩下的就是开发基于 Spark 的程序了。虽然 Spark 支持 Java、Python,但最合适开发 Spark 程序的语言还是 Scala。经过一段时间的摸索实践,我们掌握了 Scala 语言的函数式编程语言特点后,终于体会了利用 Scala 开发 Spark 应用的巨大好处。同样的功能,用 MapReduce 几百行才能实现的计算,在 Spark 中,Scala 通过短短的数十行代码就能完成。而在运行时,同样的计算功能,Spark 上执行则比 MapReduce 有数十倍的提高。对于需要迭代的机器学习算法来讲,Spark 的 RDD 模型相比 MapReduce 的优势则更是明显,更何况还有基本的 MLlib 的支持。经过几个月的实践,数据挖掘相关工作被完全迁移到 Spark,并且在 Spark 上实现了适合我们数据集的更高效的 LR 等等算法。

全面拥抱 Spark

进入 2014 年,公司的业务有了长足的发展,对比数据中心平台建立时,每日处理的数据量亦翻了几番。每日的排名计算所花的时间越来越长,而基于 Hive 的即时计算只能支持日尺度的计算,如果到周这个尺度,计算所花的时间已经很难忍受,到月这个尺度则基本上没办法完成计算。基于在 Spark 上的认知和积累,是时候将整个数据中心迁移到 Spark 上了。

2014 年 4 月,Spark Summit China 在北京举行。抱着学习的目的,我们技术团队也参加了在中国举行的这一次 Spark 盛会。通过这次盛会,我们了解到国内的很多同行已经开始采用 Spark 来建造自己的大数据平台,而 Spark 也变成了在 ASF 中最为活跃的项目之一。另外,越来越多的大数据相关的产品也逐渐在和 Spark 相融合或者在向 Spark 迁移。Spark 无疑将会变为一个相比 Hadoop MapReduce 更好的生态系统。通过这次大会,我们更加坚定了全面拥抱 Spark 的决心。

基于 YARN 和 Spark,我们开始重新架构数据中心依赖的大数据平台。整个新的数据平台应该能够承载:

1. 准实时的数据汇集和 ETL;

2. 支持流式的数据加工;

3. 更高效的离线计算能力;

4. 高速的多维分析能力;

5. 更高效的即时分析能力;

6. 高效的机器学习能力;

7. 统一的数据访问接口;

8. 统一的数据视图;

9. 灵活的任务调度.

整个新的架构充分地利用 YARN 和 Spark,并且融合公司的一些技术积累,架构如图 3 所示。

在新的架构中,引入了 Kafka 作为日志汇集的通道。几个业务系统收集的移动设备侧的日志,实时地写入到 Kafka 中,从而方便后续的数据消费。

利用 Spark Streaming,可以方便地对 Kafka 中的数据进行消费处理。在整个架构中,Spark Streaming 主要完成了以下工作。

1. 原始日志的保存。将 Kafka 中的原始日志以 JSON 格式无损的保存在 HDFS 中。

2. 数据清洗和转换,清洗和标准化之后,转变为 Parquet 格式,存储在 HDFS 中,方便后续的各种数据计算任务。

3. 定义好的流式计算任务,比如基于频次规则的标签加工等等,计算结果直接存储在 MongoDB 中。

图 3 合了 YARN 和 Spark 的最新数据中心架构

排名计算任务则在 Spark 上做了重新实现,借力 Spark 带来的性能提高,以及 Parquet 列式存储带来的高效数据访问。同样的计算任务,在数据量提高到原来 3 倍的情况下,时间开销只有原来的 1/6。

同时,在利用 Spark 和 Parquet 列式存储带来的性能提升之外,曾经很难满足业务需求的即时多维度数据分析终于成为了可能。曾经利用 Hive 需要小时级别才能完成日尺度的多维度即时分析,在新架构上,只需要 2 分钟就能够顺利完成。而周尺度上也不过十分钟就能够算出结果。曾经在 Hive 上无法完成的月尺度多维度分析计算,则在两个小时内也可以算出结果。另外 Spark SQL 的逐渐完善也降低了开发的难度。

利用 YARN 提供的资源管理能力,用于多维度分析,自主研发的 Bitmap 引擎也被迁移到了 YARN 上。对于已经确定好的维度,可以预先创建 Bitmap 索引。而多维度的分析,如果所需要的维度已经预先建立了 Bitmap 索引,则通过 Bitmap 引擎由 Bitmap 计算来实现,从而可以提供实时的多维度的分析能力。

在新的架构中,为了更方便地管理数据,我们引入了基于 HCatalog 的元数据管理系统,数据的定义、存储、访问都通过元数据管理系统,从而实现了数据的统一视图,方便了数据资产的管理。

YARN 只提供了资源的调度能力,在一个大数据平台,分布式的任务调度系统同样不可或缺。在新的架构中,我们自行开发了一个支持 DAG 的分布式任务调度系统,结合 YARN 提供的资源调度能力,从而实现定时任务、即时任务以及不同任务构成的 pipeline。

基于围绕 YARN 和 Spark 的新的架构,一个针对数据业务部门的自服务大数据平台得以实现,数据业务部门可以方便地利用这个平台对进行多维度的分析、数据的抽取,以及进行自定义的标签加工。自服务系统提高了数据利用的能力,同时也大大提高了数据利用的效率。

使用 Spark 遇到的一些坑

任何新技术的引入都会历经陌生到熟悉,从最初新技术带来的惊喜,到后来遇到困难时的一筹莫展和惆怅,再到问题解决后的愉悦,大数据新贵 Spark 同样不能免俗。下面就列举一些我们遇到的坑。

【坑一:跑很大的数据集的时候,会遇到 org.apache.spark.SparkException: Error communicating with MapOutputTracker】

这个错误报得很隐晦,从错误日志看,是 Spark 集群 partition 了,但如果观察物理机器的运行情况,会发现磁盘 I/O 非常高。进一步分析会发现原因是 Spark 在处理大数据集时的 shuffle 过程中生成了太多的临时文件,造成了操作系统磁盘 I/O 负载过大。找到原因后,解决起来就很简单了,设置 spark.shuffle.consolidateFiles 为 true。这个参数在默认的设置中是 false 的,对于 linux 的 ext4 文件系统,建议大家还是默认设置为 true 吧。Spark 官方文档的描述也建议 ext4 文件系统设置为 true 来提高性能。

【坑二:运行时报 Fetch failure 错】

在大数据集上,运行 Spark 程序,在很多情况下会遇到 Fetch failure 的错。由于 Spark 本身设计是容错的,大部分的 Fetch failure 会经过重试后通过,因此整个 Spark 任务会正常跑完,不过由于重试的影响,执行时间会显著增长。造成 Fetch failure 的根本原因则不尽相同。从错误本身看,是由于任务不能从远程的节点读取 shuffle 的数据,具体原因则需要利用:

【转载】从 Hadoop 到 Spark 的架构实践

查看 Spark 的运行日志,从而找到造成 Fetch failure 的根本原因。其中大部分的问题都可以通过合理的参数配置以及对程序进行优化来解决。2014 年 Spark Summit China 上陈超的那个专题,对于如何对 Spark 性能进行优化,有非常好的建议。

当然,在使用 Spark 过程中还遇到过其他不同的问题,不过由于 Spark 本身是开源的,通过源代码的阅读,以及借助开源社区的帮助,大部分问题都可以顺利解决。

下一步的计划

Spark 在 2014 年取得了长足的发展,围绕 Spark 的大数据生态系统也逐渐的完善。Spark 1.3 引入了一个新的 DataFrame API,这个新的 DataFrame API 将会使得 Spark 对于数据的处理更加友好。同样出自于 AMPLab 的分布式缓存系统 Tachyon 因为其与 Spark 的良好集成也逐渐引起了人们的注意。鉴于在业务场景中,很多基础数据是需要被多个不同的 Spark 任务重复使用,下一步,我们将会在架构中引入 Tachyon 来作为缓存层。另外,随着 SSD 的日益普及,我们后续的计划是在集群中每台机器都引入 SSD 存储,配置 Spark 的 shuffle 的输出到 SSD,利用 SSD 的高速随机读写能力,进一步提高大数据处理效率。

在机器学习方面,H2O 机器学习引擎也和 Spark 有了良好的集成从而产生了 Sparkling-water。相信利用 Sparking-water,作为一家创业公司,我们也可以利用深度学习的力量来进一步挖掘数据的价值。

结语

2004 年,Google 的 MapReduce 论文揭开了大数据处理的时代,Hadoop 的 MapReduce 在过去接近 10 年的时间成了大数据处理的代名词。而 Matei Zaharia 2012 年关于 RDD 的一篇论文“Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing”则揭示了大数据处理技术一个新时代的到来。伴随着新的硬件技术的发展、低延迟大数据处理的广泛需求以及数据挖掘在大数据领域的日益普及,Spark 作为一个崭新的大数据生态系统,逐渐取代传统的 MapReduce 而成为新一代大数据处理技术的热门。我们过去两年从 MapReduce 到 Spark 架构的演变过程,也基本上代表了相当一部分大数据领域从业者的技术演进的历程。相信随着 Spark 生态的日益完善,会有越来越多的企业将自己的数据处理迁移到 Spark 上来。而伴随着越来越多的大数据工程师熟悉和了解 Spark,国内的 Spark 社区也会越来越活跃,Spark 作为一个开源的平台,相信也会有越来越多的华人变成 Spark 相关项目的 Contributor,Spark 也会变得越来越成熟和强大。

 

 

 

本文转载于:http://www.csdn.net/article/2015-06-08/2824889


蜷缩的蜗牛 , 版权所有丨如未注明 , 均为原创丨 转载请注明【转载】从 Hadoop 到 Spark 的架构实践
喜欢 (0)
[]
分享 (0)

您必须 登录 才能发表评论!