SparkQuickIN

快速入门 Spark

[TOC]

⛳︎ 1. 开始 Spark

Spark官网Apache Spark™ - Unified Engine for large-scale data analytics

1.1 什么是Spark

Spark官网的解释:Apache Spark™ is a unified analytics engine for large-scale data processing.

Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是加州大学伯克利分校的 AMP实验室 所开源的类 Hadoop MapReduce 的通用并行计算框架,Spark 拥有 Hadoop MapReduce 所具有的优点,但不同于 MapReduce 的是:Job 中间输出结果可以缓存在内存中,从而不再需要读写 HDFS,减少磁盘数据交互,因此 spark 能更好地适用于数据挖掘与机器学习等需要迭代的算法。

SparkScala 编写,方便快速编程。

其特点是:高速、使用简单、通用、可以在多处运行。

1.2 总体技术栈讲解

Spark 提供了 Sparkcore RDDSpark SQLSpark StreamingSpark MLlibSpark GraphX等技术组件,可以一站式的完成大数据领域的离线批处理、交互式查询、流式计算、机器学习、图计算等常见的任务。这就是Spark一站式开发的特点。

1.3 SparkMapReduce 的区别

1.3.1 MapReduce 的原理

MapReduce 在运算时需要多次进行磁盘 I/O。下面是一个简单的 MapReduce 过程:

视频链接:https://www.bilibili.com/video/BV1TB4y1i7kk/

在这个视频中,可以看出MapReduce 过程中需要多次磁盘 I/O,落地到HDFS上。

1.3.2 Spark 是如何做的

可以看到,MapReduce 的多个 Job 之间相互独立,每个 Job 完成后的数据都需要存储到文件系统中。每个 Job 中也可能会存在大量的磁盘 I/O ,这样会使得 MapReduce 的速度很慢。相比于 MapReduceSpark使用了 DAG 有向无环图。使多个任务串联起来,将结果存储在内存中(当然内存不够还是要将数据缓存在磁盘中)直接进行运算,避免了大量的磁盘I/O。

1.3.3 SparkMapReduce 的一些联系

SparkMapReduce 都是分布式计算框架,Spark 计算中间结果基于内存缓存,MapReduce 基于HDFS存储。也正因此,Spark处理数据的能力一般是 MapReduce的三到五倍以上,Spark 中除了基于内存计算这一个计算快的原因,还有DAG(DAG Schedule)有向无环图来切分任务的执行先后顺序。

1.4 Spark API

Spark API 有多种语言支持,分别包括:ScalaJavaPythonRSQL 等。

1.5 Spark 的运行模式

  • Local:多用于本地测试,如在:EclipseIDEA 中编写测试程序等。
  • StandaloneSpark 自带的资源调度框架,它支持完全分布式。Standalone模式也叫作独立模式其自带完整的服务,可单独部署到一个集群中,无序依赖任何其他资源管理系统。 从一定程度上来说,该模式是 Local 模式和 Yarn 模式的基础。
  • Yarn: Hadoop 生态圈里的一种资源调度框架,Spark也是可以基于 Yarn 来计算的。 若要使用 Yarn 来进行资源调度,必须实现ApplicationMaster 接口,Spark 实现了这个接口,所以可以基于 Yarn 来进行资源调度。
  • Mesos:也是一种资源调度框架(了解即可)。

🥑 2. SparkCore

2.1 RDD

2.1.1 RDD 的概念

RDD(Resilient Distribute Dataset):弹性分布式数据集。

RDD算子Other RDDRDD 经过算子的运算会变成其他的 RDD

(重点)RDD的特点:① 分区的;② 并行操作的;③ 不可变的。

2.1.2 RDD 的五大特性
  1. 每个RDD 由一系列的 Partition 组成。
  2. 函数是作用在每一个 Partition (Split) 上的。
  3. RDD 中有一系列的依赖关系,或者说每个RDD都会依赖其他一系列的RDD
  4. 分区器是作用在 <K, V> 格式的 RDD 上,即:<K, V>RDD 可以通过 Partitioner 进行自定义分区。
  5. RDD提供一系列的最佳计算位置。数据在哪里,计算就在哪里,移动数据不如移动计算。
2.1.3 RDD 理解图

Spark 中读取文件是使用 SparkContext 对象调用 textFile 方法,实际上底层和 MapReduce 读取 HDFS 文件的方式是相同的,读取之前先要进行 split 切片。默认情况下 Split 的大小和 Block 块的大小相同。

一些问题:

  1. RDD的分布式体现在那些方面?

    RDD 由一系列的 Partition 构成,并且 Partition 是分布在不同的节点上的。这就体现了 RDD 的分布式。

  2. 哪里体现了 RDD 的弹性?

    RDD 由一系列的 Partition 组成,其大小和数量都是可以改变的。默认情况下,Partition 的个数和 Block 块的个数相同。

  3. 哪里体现了 RDD 的容错?

    RDD 之间存在一系列的依赖关系,子RDD 可以找到对应的父RDD ,然后通过一系列计算得到得出响应的结果,这就是容错的体现。

    RDD 提供计算最佳位置,体现了数据本地化,体现了大数据中”移动数据不如移动计算“的理念。

一些注意事项:

  1. textFile 方法底层封装的是 MapReduce 读取文件的方式,读取文件之前先进行 Split 切片,默认 Split 大小是一个 Block 的大小。
  2. RDD 实际上不存储数据,但是为了方便理解,可以理解为存储数据。
  3. 什么是 <K, V> 格式的 RDD,如果 RDD 里面存储的数据都是二元组对象,那么这个 RDD 我们就叫做 <K, V> 格式的 RDD
2.1.4 SparkRDD 编程模型
  1. 创建 SparkContext 对象

  2. 创建 RDD

  3. 计算 RDD

  4. 输出结果(如控制台打印测试,存储等)

  5. 关闭 SparkContext

2.1.5 WordCount 案例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
object WordCount {
def main(args: Array[String]): Unit = {
// 第一步: 创建 SparkContext 对象
// 对于每个 Spark 程序来说, 最重要的就是两个对象: SparkConf 和 SparkContext
val conf = SparkConf()
conf.setAppName("WordCount").setMaster("Local")
val sparkContext = SparkContext(conf)

// 创建 RDD
val line: RDD[String] = sc.textFile("files/order.csv")

// 计算 RDD
val word: RDD[String] = sc.flatMap(x => x.split(" "))
val pair: RDD[(String, Int)] = word.map(x => (x, 1))
// val result: RDD[(String, Int)] = pair.reduceByKey((x, y) => {x + y})
val result: RDD[(String, Int)] = pair.reduceByKey((x, y) => {
println(x + ":" + y)
x + y
})

// 输出 RDD
result.foreach(println)

// 关闭 SparkContext
sparkContext.stop()
}
}

2.2 Spark 任务执行原理

从下图中,我们可以看到 Spark 的主要角色:

SparkExecutor

一些名词的解释:

  • Master Node 主节点
  • Worker Node 从节点
  • Driver 驱动程序
  • Executor 执行节点
  • Cluster Manager 集群管理者
2.2.1 Spark 架构的类比

我们可以简单的将这个架构和 YARN 对比一下:Master 就相当于 YARN 中的 ResourceManagerWorker 就相当于 YARN 中的 NodeManagerDriver 相当于 YARN 中的 Application

2.2.2 Spark 执行原理详细说明
  1. MasterWorker 节点

    搭建 Spark 集群的时候我们就已经设置好了 Master 节点和 Worker 节点,一个集群有多个Master节点和多个Worker节点。

    • Master 节点常驻 Master 守护进程,负责管理 Worker 节点,我们从 Master 节点提交应用。

    • Worker 节点常驻 Worker 守护进程,与 Master 节点通信,并且管理 Executor 进程。

    一台机器可以同时作为 MasterWorker 节点(e.g. 有四台机器,可以选择一台设置为 Master节点,然后剩下三台设为 Worker节点,也可以把四台都设为 Worker 节点,这种情况下,有一个机器既是 Master 节点又是 Worker 节点)。

    一个 Spark 应用程序分为一个驱动程序 Driver 和多个执行程序 Executors 两种。

  2. DriverExecutor 进程
    Driver 进程就是应用的 main() 函数并且构建 SparkContext 对象,当我们提交了应用之后,便会启动一个对应的 Driver 进程,Driver 本身会根据我们设置的参数占有一定的资源(主要指 CPU CoreMemory)。

    根据部署模式的不同,Driver 可以运行在 Master 上,也可以运行 Worker上,Driver 与集群节点之间有频繁的通信。上图展示了 DriverMaster 上的部署的情况。

    • 如上图所示,Driver首先会向**集群管理者Cluster Manager**,如StandaloneYarnMesos 申请 Spark 应用所需的资源,也就是Executor,然后集群管理者会根据 Spark 应用所设置的参数在各个 Worker 上分配一定数量的 Executor,每个 Executor 都占用一定数量的 CPUMemory

    • 在申请到应用所需的资源以后,Driver 就开始调度和执行我们编写的应用代码了。Driver 进程会将我们编写的 Spark 应用代码拆分成多个 Stage,每个Stage 执行一部分代码片段,并为每个 Stage 创建一批 Tasks,然后将这些 Tasks分配到各个 Executor中执行。这一步即 Driver ---Task--> Worker

    • Executor 进程宿主在 Worker 节点上,一个 Worker可以有多个 Executor。每个 Executor 持有一个线程池,每个线程可以执行一个 TaskExecutor 执行完 Task 以后将结果返回给 Driver,每个 Executor 执行的 Task 都属于同一个应用。此外 Executor 还有一个功能就是为应用程序中要求缓存的 RDD 提供内存式存储,RDD 是直接缓存在 Executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。这一步即 Worker ---Result--> Driver

    • Driver 负责任务 Tasks 的分发结果Results的回收,即任务的调度。如果 Task 的计算结果非常大就不要回收了,会造成OOM(我们在执行程序时可以通过参数指定 Driver 的内存大小,如 1G,如果一个 Worker 的结果是 510M,那么两个接节点上的结果就会超过 1G,导致 OOM)。

2.2 RDD 算子

RDD有两种操作算子:分别为转换算子(Transformation) 和 **行动算子(Action)**。算子其实就是函数,只不过在 Scala 中称为算子。

下面表格列出了部分 RDD 算子,完整内容可以查看 [SparkRDD](RDD Programming Guide - Spark 3.0.1 Documentation (apache.org)) 文档。

算子类型 算子方法 算子转换
Transformations map(f: T=>U) RDD[T] => RDD[U]
filter(f: T=>Bool) RDD[T] => RDD[T]
flatMap(f: T=>Seq[U]) RDD[T] => RDD[U]
sample(fraction: Float) RDD[T] => RDD[T](Deterministic sampling)
groupByKey() RDD[(K, V)] => RDD[(K, Seq[V])]
reduceByKey(f: (V, V)=>V) RDD[(K, V)] => RDD[(K, V)]
union() (RDD[T], RDD[T]) => RDD[T]
join() (RDD[(K, V)], RDD[(K, W)]) => RDD[(K, (V, W))]
cogroup() (RDD[(K, V)], RDD[(K, W)]) => RDD([K, (Seq[V], Seq[W])])
crossProduct() (RDD[T], RDD[U]) => (RDD[(T, U)])
mapValues(f: V=>W) RDD[(K, V)] => RDD[(K, W)](Preserves Partitioning)
sort(c: Comparator[K]) RDD[(K, V)] => RDD[(K, V)]
partitionBy(p: Partitioner[K]) RDD[(K, V)] => RDD[(K, V)]
... ...
Actions count() RDD[T] => Long
collect() RDD[T] => Seq[T]
reduce(f: (T, T)=>T) RDD[T] => T
lookup(k: K) RDD[(K, V)] => Seq[V](On hash/range partitioned RDDs)
save(path: String) Outputs RDD to a Storage System, e.g. HDFS
foreach(func) -
... ...
2.2.1 Transformation 转换算子

点击快速跳转到转换算子列表:Transformations

Transformation 转换算子有延迟执行的特点,具有**懒加载(Lazy)**的特性。

下面列出常用的行动算子及用法:

  • map(func) 返回一个新的分布式数据集,由每个原元素经过func函数转换后组成。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
object MapDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("MapDemo")
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)

val list = List(0, 10, 15, 20, 25, 30, 35, 40, 45, 50)
val listRDD = sc.parallelize(list, 5)
listRDD.foreach(println)
println()
val retRDD = listRDD.map(num => num * 7)
retRDD.foreach(num => println(num))

sc.stop()
}
}
  • mapPartition(func) 将函数用在每个RDD的分区上
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
object MapPartitionDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("MapPartitionDemo")
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)

val list = List(0, 10, 15, 20, 25, 30, 35, 40, 45, 50)
val listRDD = sc.parallelize(list, 3) // 设置三个分区

// 对比 map 和 mapPartition 的区别
listRDD.map( x => {
println("Map 执行一次")
x + 1
}).foreach(println)

println()

listRDD.mapPartitions( x => {
println("MapPartition 执行一次")
x.map(x => {println("mapPartition 里的map"); x + 1}) // 这个 map 不是 RDD 里的 map, 而是 Iterator 中的 map
}).foreach(println)
}
}
  • filter(func)返回一个新的数据集,由经过func函数后返回值为true的元素组成。
1
2
3
4
5
6
7
8
9
10
11
12
13
object FilterDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("FilterDemo")
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)

val list = List(0, 10, 15, 20, 25, 30, 35, 40, 45, 50)
val listRDD = sc.parallelize(list)
val retRDD = listRDD.filter(num => num % 2 == 0)
retRDD.foreach(println)
sc.stop()
}
}
  • flatMap(func)类似于map,但是每一个输入元素,会被映射为 0 到多个输出元素(因此,func 函数的返回值是一个 Seq,而不是单一元素)。
1
2
3
4
5
6
7
8
9
10
11
12
13
object FlatMapDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("FlatMapDemo")
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)

val list = List("hello you", "hello he", "hello me")
val listRDD = sc.parallelize(list)
val wordsRDD = listRDD.flatMap(line => line.split(" "))
wordsRDD.foreach(println)
sc.stop()
}
}
  • sample(withReplacement, frac, seed)根据给定的随机种子 seed,随机抽样出数量为 frac 的数据。
1

  • union(otherDataset)返回一个新的数据集,由原数据集和参数联合而成。
1

  • groupByKey([numTasks])在一个由 <K, V> 对组成的数据集上调用,返回一个 <K, Seq[V]> 对的数据集。注意:默认情况下,使用 8 个并行任务进行分组,你可以传入 numTask 可选参数,根据数据量设置不同数目的 Task。使用该算子可以将相同Key的元素聚集到一起,最终把所有相同Key的元素合并成一个元素,该元素的Key不变,Value则聚集到一个集合中。
1

  • reduceByKey(func, [numTasks])在一个<K, V>对的数据集上使用,返回一个<K, V>对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。和 groupByKey类似,任务的个数是可以通过第二个可选参数来配置的。
1

  • join(otherDataset, [numTasks])在类型为<K, V><K, W>类型的数据集上调用,返回一个<K, <V, W>>对,每个key中的所有元素都在一起的数据集。
  • sortByKey()sortBy()sortByKey() 函数需要在类型为 <K, V> 类型的数据集上调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
object SortByKeyDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("SortByKeyDemo")
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)

val list = List(
"1,李 磊,22,175",
"2,刘银鹏,23,175",
"3,齐彦鹏,22,180",
"4,杨 柳,22,168",
"5,敦 鹏,20,175"
)
val listRDD: RDD[String] = sc.parallelize(list)
listRDD.foreach(println)
println()

// 3,齐彦鹏,22,180
// 1,李磊,22,175
// 2,刘银鹏,23,175
val resultRDD = listRDD.map(x => {
val fields = x.split(",")
println(fields(0) + " " + fields(1) + " " + fields(2) + " " + fields(3) + " ")
(fields(0), fields(1), fields(2), fields(3))
}).map(x => {(x._3, x._1)})

// 分区数会影响最终打印结果, 设置分区为 2, 所有分区最终会聚合为两个分区, 打印时显示每个分区的排序
// 这就可能导致打印输出时出现不同分区排序数据交叉的情况。
// 所以我们一般设置分区为 1, 表示排序结果聚合到一个分区
resultRDD.sortByKey(true, 2).foreach(println)

println()
val sortBy_RDD = listRDD.map(x => {
val fields = x.split(",")
// println(fields(0) + " " + fields(1) + " " + fields(2) + " " + fields(3) + " ")
(fields(0), fields(1), fields(2), fields(3))
})
println("SortBy_001")
sortBy_RDD.sortBy(_._1, true, 1).foreach(println)

println("SortBy_002")
sortBy_RDD.sortBy(_._2, true, 1).foreach(println)

println("SortBy_003")
sortBy_RDD.sortBy(_._3, true, 1).foreach(println)

println("SortBy_004")
sortBy_RDD.sortBy(_._4, true, 1).foreach(println)

sc.stop()
}
}
2.2.2 Action 行动算子

Action 行动算子具有触发执行的特点,一个 Application 应用程序有几个 Action 类算子执行,就有几个 Job 运行。

点击快速跳转到 SparkRDD Action 列表: Actions

下面列出常用的行动算子及用法:

  • reduce(func) 通过函数 func 聚集数据集中的所有元素。func 函数接受 2 个参数,返回 1 个值。这个函数必须是关联性的,确保可以被正确的并发执行。关于 reduce 的执行过程,可以对比 Scala 中类似的 reduce函数。

    不同于 Transformation 算子,执行后结果是RDD,执行 Action 算子之后,其结果不再是 RDD,而是一个标量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
object Action_Reduce {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("Action_Reduce")
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sc = new SparkContext(conf)
// val list = List(0, 5, 15, 20, 25, 30) // 如果用这个 list 输出为: 95
val list = List(
"1,李 磊,22,175",
"2,刘银鹏,23,175",
"3,齐彦鹏,22,180",
"4,杨 柳,22,168",
"5,敦 鹏,20,175"
)
val listRDD = sc.parallelize(list)
listRDD.foreach(println)
println()
val ret = listRDD.reduce(
(v1, v2) => {
println("[LOGS] v1 > " + v1 + " ")
println("[LOGS] v2 > " + v2 + " ")
println("[LOGS] v1 + v2 > " + (v1 + v2))
v1 + v2
}
)

println("ret: " + ret)
sc.stop()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
输出如下:
3,齐彦鹏,22,180
1,李 磊,22,175
4,杨 柳,22,168
2,刘银鹏,23,175
5,敦 鹏,20,175

[LOGS] v1 > 3,齐彦鹏,22,180
[LOGS] v2 > 4,杨 柳,22,168
[LOGS] v1 + v2 > 3,齐彦鹏,22,1804,杨 柳,22,168
[LOGS] v1 > 3,齐彦鹏,22,1804,杨 柳,22,168
[LOGS] v2 > 5,敦 鹏,20,175
[LOGS] v1 + v2 > 3,齐彦鹏,22,1804,杨 柳,22,1685,敦 鹏,20,175
[LOGS] v1 > 1,李 磊,22,175
[LOGS] v2 > 2,刘银鹏,23,175
[LOGS] v1 + v2 > 1,李 磊,22,1752,刘银鹏,23,175
[LOGS] v1 > 1,李 磊,22,1752,刘银鹏,23,175
[LOGS] v2 > 3,齐彦鹏,22,1804,杨 柳,22,1685,敦 鹏,20,175
[LOGS] v1 + v2 > 1,李 磊,22,1752,刘银鹏,23,1753,齐彦鹏,22,1804,杨 柳,22,1685,敦 鹏,20,175
ret: 1,李 磊,22,1752,刘银鹏,23,1753,齐彦鹏,22,1804,杨 柳,22,1685,敦 鹏,20,175
  • collectDriver 的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用 filter 或者其它操作后,返回一个足够小的数据子集再使用,直接将整个 RDDCollect返回,很可能会让 Driver 程序 OOM,这点尤其需要注意。
1

  • count
1

  • take
1

  • first
1

  • saveAsTextFile
1

  • foreach
1

  • saveAsNewAPIHadoopFile
1

2.2.3 其他算子

Spark中还有许多其他种类的算子,体现了 Spark 算子的灵活性,其中包括将数据进行持久化的算子。

  • cache:懒加载执行的,必须有一个 Action 触发算子触发执行。
  • persist:懒加载执行的,必须有一个 Action 触发算子触发执行。
  • checkpoint:算子不仅能将 RDD 持久化到磁盘,还能切断 RDD 之间的依赖关系。
2.2.3 宽依赖和窄依赖
  1. 宽依赖

    ① 子 RDD 的每个分区依赖于所有的父 RDD 分区

    ② 对单个 RDD 基于 Key进行重组和 Reduce,如 groupByKeyreduceByKey

    ③ 对两个 RDD 基于 Key 进行 join 和重组,如 join

    ④ 经过大量 shuffle 生成的 RDD,建议进行缓存。这样避免失败后重新计算带来的开销。

  2. 窄依赖

    ① 子RDD的每个分区依赖于常数个父分区(与数据规模无关)

    ② 输入输出一对一的算子,且结果RDD的分区结构不变。主要是map/flatmap

    ③ 输入输出一对一的算子,但结果RDD的分区结构发生了变化,如union/coalesce

    ④ 从输入中选择部分元素的算子,如 filterdistinctsubstractsample

作者

NilEra

发布于

2024-06-12

更新于

2024-06-30

许可协议

评论