快速入门 Spark
[TOC]
⛳︎ 1. 开始 Spark
Spark官网:Apache Spark™ - Unified Engine for large-scale data analytics
1.1 什么是Spark
Spark官网的解释:Apache Spark™ is a unified analytics engine for large-scale data processing.
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark
是加州大学伯克利分校的 AMP实验室 所开源的类 Hadoop MapReduce
的通用并行计算框架,Spark
拥有 Hadoop MapReduce
所具有的优点,但不同于 MapReduce
的是:Job
中间输出结果可以缓存在内存中,从而不再需要读写 HDFS
,减少磁盘数据交互,因此 spark
能更好地适用于数据挖掘与机器学习等需要迭代的算法。
Spark
是 Scala
编写,方便快速编程。
其特点是:高速、使用简单、通用、可以在多处运行。
1.2 总体技术栈讲解
Spark
提供了 Sparkcore RDD
、Spark SQL
、Spark Streaming
、Spark MLlib
、Spark GraphX
等技术组件,可以一站式的完成大数据领域的离线批处理、交互式查询、流式计算、机器学习、图计算等常见的任务。这就是Spark
一站式开发的特点。
1.3 Spark
和 MapReduce
的区别
1.3.1 MapReduce 的原理
MapReduce
在运算时需要多次进行磁盘 I/O。下面是一个简单的 MapReduce
过程:
视频链接:https://www.bilibili.com/video/BV1TB4y1i7kk/
在这个视频中,可以看出MapReduce
过程中需要多次磁盘 I/O,落地到HDFS
上。
1.3.2 Spark 是如何做的
可以看到,MapReduce
的多个 Job
之间相互独立,每个 Job
完成后的数据都需要存储到文件系统中。每个 Job
中也可能会存在大量的磁盘 I/O ,这样会使得 MapReduce
的速度很慢。相比于 MapReduce
,Spark
使用了 DAG
有向无环图。使多个任务串联起来,将结果存储在内存中(当然内存不够还是要将数据缓存在磁盘中)直接进行运算,避免了大量的磁盘I/O。
1.3.3 Spark
和 MapReduce
的一些联系
Spark
和 MapReduce
都是分布式计算框架,Spark
计算中间结果基于内存缓存,MapReduce
基于HDFS
存储。也正因此,Spark
处理数据的能力一般是 MapReduce
的三到五倍以上,Spark
中除了基于内存计算这一个计算快的原因,还有DAG(DAG Schedule)
有向无环图来切分任务的执行先后顺序。
1.4 Spark API
Spark API 有多种语言支持,分别包括:Scala
、Java
、Python
、R
、SQL
等。
1.5 Spark 的运行模式
Local
:多用于本地测试,如在:Eclipse
、IDEA
中编写测试程序等。Standalone
:Spark
自带的资源调度框架,它支持完全分布式。Standalone
模式也叫作独立模式,其自带完整的服务,可单独部署到一个集群中,无序依赖任何其他资源管理系统。 从一定程度上来说,该模式是Local
模式和Yarn
模式的基础。Yarn
: Hadoop 生态圈里的一种资源调度框架,Spark
也是可以基于Yarn
来计算的。 若要使用Yarn
来进行资源调度,必须实现ApplicationMaster
接口,Spark
实现了这个接口,所以可以基于Yarn
来进行资源调度。Mesos
:也是一种资源调度框架(了解即可)。
🥑 2. SparkCore
2.1 RDD
2.1.1 RDD 的概念
RDD(Resilient Distribute Dataset)
:弹性分布式数据集。
RDD
→ 算子
→ Other RDD
,RDD
经过算子的运算会变成其他的 RDD
。
(重点)RDD
的特点:① 分区的;② 并行操作的;③ 不可变的。
2.1.2 RDD 的五大特性
- 每个
RDD
由一系列的Partition
组成。 - 函数是作用在每一个
Partition (Split)
上的。 RDD
中有一系列的依赖关系,或者说每个RDD
都会依赖其他一系列的RDD
。- 分区器是作用在
<K, V>
格式的RDD
上,即:<K, V>
的RDD
可以通过Partitioner
进行自定义分区。 RDD
提供一系列的最佳计算位置。数据在哪里,计算就在哪里,移动数据不如移动计算。
2.1.3 RDD 理解图
Spark
中读取文件是使用 SparkContext
对象调用 textFile
方法,实际上底层和 MapReduce
读取 HDFS
文件的方式是相同的,读取之前先要进行 split
切片。默认情况下 Split
的大小和 Block
块的大小相同。
一些问题:
RDD
的分布式体现在那些方面?RDD
由一系列的Partition
构成,并且Partition
是分布在不同的节点上的。这就体现了RDD
的分布式。哪里体现了
RDD
的弹性?RDD
由一系列的Partition
组成,其大小和数量都是可以改变的。默认情况下,Partition
的个数和Block
块的个数相同。哪里体现了
RDD
的容错?RDD
之间存在一系列的依赖关系,子RDD
可以找到对应的父RDD
,然后通过一系列计算得到得出响应的结果,这就是容错的体现。RDD
提供计算最佳位置,体现了数据本地化,体现了大数据中”移动数据不如移动计算“的理念。
一些注意事项:
textFile
方法底层封装的是MapReduce
读取文件的方式,读取文件之前先进行Split
切片,默认Split
大小是一个Block
的大小。RDD
实际上不存储数据,但是为了方便理解,可以理解为存储数据。- 什么是
<K, V>
格式的RDD
,如果RDD
里面存储的数据都是二元组对象,那么这个RDD
我们就叫做<K, V>
格式的RDD
。
2.1.4 SparkRDD 编程模型
创建
SparkContext
对象创建
RDD
计算
RDD
输出结果(如控制台打印测试,存储等)
关闭
SparkContext
2.1.5 WordCount
案例
1 | object WordCount { |
2.2 Spark 任务执行原理
从下图中,我们可以看到 Spark 的主要角色:
一些名词的解释:
Master Node
主节点Worker Node
从节点Driver
驱动程序Executor
执行节点Cluster Manager
集群管理者
2.2.1 Spark 架构的类比
我们可以简单的将这个架构和 YARN 对比一下:Master
就相当于 YARN
中的 ResourceManager
,Worker
就相当于 YARN
中的 NodeManager
,Driver
相当于 YARN
中的 Application
。
2.2.2 Spark 执行原理详细说明
Master
和Worker
节点搭建 Spark 集群的时候我们就已经设置好了
Master
节点和Worker
节点,一个集群有多个Master
节点和多个Worker
节点。Master
节点常驻Master
守护进程,负责管理Worker
节点,我们从Master
节点提交应用。Worker
节点常驻Worker
守护进程,与Master
节点通信,并且管理Executor
进程。
一台机器可以同时作为
Master
和Worker
节点(e.g. 有四台机器,可以选择一台设置为Master
节点,然后剩下三台设为Worker
节点,也可以把四台都设为Worker
节点,这种情况下,有一个机器既是Master
节点又是Worker
节点)。一个
Spark
应用程序分为一个驱动程序Driver
和多个执行程序Executors
两种。Driver
和Executor
进程Driver
进程就是应用的main()
函数并且构建SparkContext
对象,当我们提交了应用之后,便会启动一个对应的Driver
进程,Driver
本身会根据我们设置的参数占有一定的资源(主要指 CPU Core 和 Memory)。根据部署模式的不同,
Driver
可以运行在Master
上,也可以运行Worker
上,Driver
与集群节点之间有频繁的通信。上图展示了Driver
在Master
上的部署的情况。如上图所示,
Driver
首先会向**集群管理者Cluster Manager
**,如Standalone
、Yarn
、Mesos
申请 Spark 应用所需的资源,也就是Executor
,然后集群管理者会根据 Spark 应用所设置的参数在各个Worker
上分配一定数量的Executor
,每个Executor
都占用一定数量的CPU
和Memory
。在申请到应用所需的资源以后,
Driver
就开始调度和执行我们编写的应用代码了。Driver
进程会将我们编写的Spark
应用代码拆分成多个Stage
,每个Stage
执行一部分代码片段,并为每个Stage
创建一批Tasks
,然后将这些Tasks
分配到各个Executor
中执行。这一步即Driver ---Task--> Worker
。Executor
进程宿主在Worker
节点上,一个Worker
可以有多个Executor
。每个Executor
持有一个线程池,每个线程可以执行一个Task
,Executor
执行完Task
以后将结果返回给Driver
,每个Executor
执行的Task
都属于同一个应用。此外Executor
还有一个功能就是为应用程序中要求缓存的RDD
提供内存式存储,RDD
是直接缓存在Executor
进程内的,因此任务可以在运行时充分利用缓存数据加速运算。这一步即Worker ---Result--> Driver
。Driver
负责任务Tasks
的分发和结果Results
的回收,即任务的调度。如果Task
的计算结果非常大就不要回收了,会造成OOM
(我们在执行程序时可以通过参数指定Driver
的内存大小,如1G
,如果一个Worker
的结果是510M
,那么两个接节点上的结果就会超过1G
,导致OOM
)。
2.2 RDD 算子
RDD有两种操作算子:分别为转换算子(Transformation) 和 **行动算子(Action)**。算子其实就是函数,只不过在 Scala 中称为算子。
下面表格列出了部分 RDD 算子,完整内容可以查看 [SparkRDD](RDD Programming Guide - Spark 3.0.1 Documentation (apache.org)) 文档。
算子类型 | 算子方法 | 算子转换 |
---|---|---|
Transformations |
map(f: T=>U)
|
RDD[T] => RDD[U]
|
filter(f: T=>Bool)
|
RDD[T] => RDD[T]
|
|
flatMap(f: T=>Seq[U])
|
RDD[T] => RDD[U]
|
|
sample(fraction: Float)
|
RDD[T] => RDD[T](Deterministic sampling)
|
|
groupByKey()
|
RDD[(K, V)] => RDD[(K, Seq[V])]
|
|
reduceByKey(f: (V, V)=>V)
|
RDD[(K, V)] => RDD[(K, V)]
|
|
union()
|
(RDD[T], RDD[T]) => RDD[T]
|
|
join()
|
(RDD[(K, V)], RDD[(K, W)]) => RDD[(K, (V, W))]
|
|
cogroup()
|
(RDD[(K, V)], RDD[(K, W)]) => RDD([K, (Seq[V], Seq[W])])
|
|
crossProduct()
|
(RDD[T], RDD[U]) => (RDD[(T, U)])
|
|
mapValues(f: V=>W)
|
RDD[(K, V)] => RDD[(K, W)](Preserves Partitioning)
|
|
sort(c: Comparator[K])
|
RDD[(K, V)] => RDD[(K, V)]
|
|
partitionBy(p: Partitioner[K])
|
RDD[(K, V)] => RDD[(K, V)]
|
|
... | ... | |
Actions |
count()
|
RDD[T] => Long
|
collect()
|
RDD[T] => Seq[T]
|
|
reduce(f: (T, T)=>T)
|
RDD[T] => T
|
|
lookup(k: K)
|
RDD[(K, V)] => Seq[V](On hash/range partitioned RDDs)
|
|
save(path: String)
|
Outputs RDD to a Storage System, e.g. HDFS
|
|
foreach(func)
|
- | |
... | ... |
2.2.1 Transformation 转换算子
点击快速跳转到转换算子列表:Transformations
Transformation 转换算子有延迟执行的特点,具有**懒加载(Lazy)**的特性。
下面列出常用的行动算子及用法:
map(func)
返回一个新的分布式数据集,由每个原元素经过func
函数转换后组成。
1 | object MapDemo { |
mapPartition(func)
将函数用在每个RDD
的分区上
1 | object MapPartitionDemo { |
filter(func)
返回一个新的数据集,由经过func
函数后返回值为true
的元素组成。
1 | object FilterDemo { |
flatMap(func)
类似于map
,但是每一个输入元素,会被映射为0
到多个输出元素(因此,func
函数的返回值是一个Seq
,而不是单一元素)。
1 | object FlatMapDemo { |
sample(withReplacement, frac, seed)
根据给定的随机种子seed
,随机抽样出数量为frac
的数据。
1 |
union(otherDataset)
返回一个新的数据集,由原数据集和参数联合而成。
1 |
groupByKey([numTasks])
在一个由<K, V>
对组成的数据集上调用,返回一个<K, Seq[V]>
对的数据集。注意:默认情况下,使用8
个并行任务进行分组,你可以传入numTask
可选参数,根据数据量设置不同数目的Task
。使用该算子可以将相同Key
的元素聚集到一起,最终把所有相同Key
的元素合并成一个元素,该元素的Key
不变,Value
则聚集到一个集合中。
1 |
reduceByKey(func, [numTasks])
在一个<K, V>
对的数据集上使用,返回一个<K, V>
对的数据集,key
相同的值,都被使用指定的reduce
函数聚合到一起。和groupByKey
类似,任务的个数是可以通过第二个可选参数来配置的。
1 |
join(otherDataset, [numTasks])
在类型为<K, V>
和<K, W>
类型的数据集上调用,返回一个<K, <V, W>>
对,每个key
中的所有元素都在一起的数据集。sortByKey()
和sortBy()
,sortByKey()
函数需要在类型为<K, V>
类型的数据集上调用。
1 | object SortByKeyDemo { |
2.2.2 Action 行动算子
Action 行动算子具有触发执行的特点,一个 Application
应用程序有几个 Action
类算子执行,就有几个 Job
运行。
点击快速跳转到 SparkRDD Action 列表: Actions
下面列出常用的行动算子及用法:
reduce(func)
通过函数func
聚集数据集中的所有元素。func
函数接受2
个参数,返回1
个值。这个函数必须是关联性的,确保可以被正确的并发执行。关于reduce
的执行过程,可以对比 Scala 中类似的reduce
函数。不同于
Transformation
算子,执行后结果是RDD
,执行Action
算子之后,其结果不再是RDD
,而是一个标量。
1 | object Action_Reduce { |
1 | 输出如下: |
collect
在Driver
的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter
或者其它操作后,返回一个足够小的数据子集再使用,直接将整个RDD
集Collect
返回,很可能会让Driver
程序OOM
,这点尤其需要注意。
1 |
count
1 |
take
1 |
first
1 |
saveAsTextFile
1 |
foreach
1 |
saveAsNewAPIHadoopFile
1 |
2.2.3 其他算子
Spark中还有许多其他种类的算子,体现了 Spark
算子的灵活性,其中包括将数据进行持久化的算子。
cache
:懒加载执行的,必须有一个Action
触发算子触发执行。persist
:懒加载执行的,必须有一个Action
触发算子触发执行。checkpoint
:算子不仅能将RDD
持久化到磁盘,还能切断RDD
之间的依赖关系。
2.2.3 宽依赖和窄依赖
宽依赖
① 子
RDD
的每个分区依赖于所有的父RDD
分区② 对单个
RDD
基于Key
进行重组和Reduce
,如groupByKey
、reduceByKey
③ 对两个
RDD
基于Key
进行join
和重组,如join
④ 经过大量
shuffle
生成的RDD
,建议进行缓存。这样避免失败后重新计算带来的开销。窄依赖
① 子
RDD
的每个分区依赖于常数个父分区(与数据规模无关)② 输入输出一对一的算子,且结果
RDD
的分区结构不变。主要是map/flatmap
③ 输入输出一对一的算子,但结果
RDD
的分区结构发生了变化,如union/coalesce
④ 从输入中选择部分元素的算子,如
filter
、distinct
、substract
、sample