Spark学习笔记-入门(1)-Spark概述以及环境搭建

Spark概述

Spark是一种基于内存的、快速、可扩展的大数据分析计算引擎。

Hadoop的MapReduce框架和Spark框架都是数据处理引擎,它们之间当然存在一些不同之处。Hadoop的MapReduce框架设计初衷并不是为了满足循环迭代式的数据流处理,因此在许多并行运行的数据可复用场景(例如机器学习、图挖掘算法、交互式数据挖掘算法等)中存在计算效率问题。Spark在传统的MapReduce计算框架的基础上,利用其计算过程的优化大大加快了数据分析、挖掘的运行和读写速度,并将数据单元缩小到更适合并行计算和重复使用的RDD计算模型上。

  • Spark和Hadoop的根本差异式多个作业之间的数据通信问题,Spark多个作业之间的数据通信式基于内存的,而Hadoop是基于磁盘的。
  • Spark Task的启动时间快,它采用的是fork线程的方式,而Hadoop采用的是创建新进程的方式。
  • Spark只有在shuffle的时候将数据写入磁盘,而Hadoop中多个作业之间的数据交互需要依赖于磁盘交互

在绝大多数的数据计算场景中,Spark确实会比MapReduce更有优势,但是Spark是基于内存的,所以在实际的生产环境中, 内存资源是一大限制条件。可能会由于内存资源不够而导致Job执行失败

Spark由以下核心模块构成:

  • Spark Core: Spark Core中提供了Spark中最基础与最核心的功能,Spark其他的功能如Spark SQL、Spark Streaming、Spark MLlib和Spark GraphX都是在Spark Core的基础上进行扩展的
  • Spark SQL: Spark SQL是Spark用来操作结构化数据的组件,通过Spark SQL,用户可以使用SQL或者Apache Hive版本的HQL来查询数据
  • Spark Streaming: Spark Streaming是Spark平台上针对实时数据进行流式计算的组件,提供了丰富的处理数据流的API
  • Spark MLlib: MLlib是Spark提供的一个机器学习算法库。其中不仅提供了模型评估、数据导入等额外的功能,还提供了一些更底层的机器学习原语
  • Spark GraphX: GraphX是Spark面向图计算提供的框架与算法库

Word Count项目搭建

上面是简要介绍了一些Spark的相关知识,下面可以通过一个入门案例Word Count来开启Spark的学习之路。

项目环境搭建

首先我们需要搭建项目的环境。使用IDEA创建对应的Maven环境。由于Spark的底层编写使用的是Scala,后续我们也会使用Scala来进行相关的开发,所以创建项目的时候需要实现Scala的相关配置,详情可以参考另一篇有关Scala的学习笔记 Scala学习笔记-入门(1)-Scala简介

配置完Scala相关环境之后,还需要引入对应的Maven依赖。这里使用的版本是Spark-3.1.3,依赖如下:

1
2
3
4
5
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.3</version>
</dependency>

这里在选择版本的时候需要注意Scala和Spark之间的版本匹配,我们可以参考Maven中的相关说明: Maven Repository: org.apache.spark » spark-core (mvnrepository.com)

这里使用的版本是Spark-3.1.3,对应使用的Scala版本为2.12.15

Word Count实现

Word Count是大数据学科中最常见的案例,我们在项目目录下的data目录中准备了一些文件,其中是以空格分隔的单词。

下面是Word Count的一种实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 创建Spark运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

// 创建Spark上下文环境对象
val context = new SparkContext(sparkConf)

// 业务操作
// 1.读取文件,获取的是一行一行的信息,得到多行lines,每行line是字符串
val lines = context.textFile("data")

// 2.将每个字符串进行拆分并扁平化,得到words
val words = lines.flatMap(_.split(" "))

// 3.将words按照内容进行分组,便于统计
val wordsGroup = words.groupBy(word => word)

// 4.统计每个分组内的个数
val resList1 = wordsGroup.map(kv => {
val word = kv._1
val list = kv._2
(word, list.size)
})
// 还可以使用模式匹配
val resList2 = wordsGroup.map({
case (str, strings) => (str, strings.size)
})

// 5.打印内容
resList2.foreach(println)

// 关闭Spark连接
context.stop()

这里使用的Spark环境是临时创建的,使用后会被删除。在使用Spark的时候,首先需要获取对应的配置对象,创建Spark上下文对象,之后利用相关API执行业务逻辑,最后关闭Spark连接。

这里在运行的时候可能会出现下面的报错信息:

1
Scala signature package has wrong version expected: 5.0 found: 5.2 in packag

如果出现这种情况,可能是由于Scala的版本问题。引入的Spark-Core依赖可能同时支持Scala 2.12和2.13,如果本地Scala的版本是2.12,那么选择一个只支持2.12的版本例如Spark-Core 3.1.3可以解决该问题

Word Count还可以利用下面的逻辑实现,这种逻辑更加类似于MapRuduce的二阶段实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 创建Spark运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")

// 创建Spark上下文环境对象
val context = new SparkContext(sparkConf)

// 业务操作
// 1.读取文件,获取的是一行一行的信息,得到多行lines,每行line是字符串
val lines = context.textFile("data")

// 2.将每个字符串进行拆分并扁平化,得到words
val words = lines.flatMap(_.split(" "))

// 3.将words转换数据结构 word => (word, 1)
val wordsAndOne = words.map((_, 1))

// 4.将转换后的数据按照相同的单词进行分组聚合
val wordsGroup = wordsAndOne.groupBy(_._1)
val wordsAndCount = wordsGroup.map(kv=>{
val list = kv._2
list.reduce((v1, v2)=>{
(v1._1, v1._2 + v2._2)
})
})

// 5.将数据聚合结果采集到内存当中
val resList = wordsAndCount.collect()

// 6.打印内容
resList.foreach(println)

// 关闭Spark连接
context.stop()

可以看到,在上面的业务逻辑中,我们使用的是在原生Scala中也存在的一些集合高级操作。而Spark提供给我们一些更加方便好用的API,例如上面的步骤4,将集合按照相同单词进行聚合的操作可以调用如下的API:

1
val wordsAndCount = wordsAndOne.reduceByKey(_ + _)

该操作可以按照key对后续的value进行操作,类似于MapReduce框架中Reducer进行的操作

日志输出

在执行过程中,会产生大量的执行日志。为了更好地查看程序的执行结果,可以在项目的resources目录中创建log4j.properties文件,并添加日志配置信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to ERROR. When running the spark-shell,
the
# log level for this class is used to overwrite the root logger's log level, so
that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent
UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

环境

Spark作为一个数据处理框架和计算引擎,被设计成能够在常见的集群环境中运行。在工作中主流的环境为Yarn,不过容器式环境也逐渐流行。

Local模式

配置

该Local模式不同于之前在IDEA中使用的模式。在IDEA中,我们的环境在使用完成之后会被删除,被称为开发环境。而这个Local模式的资源始终存在。这里,Local模式指的是不需要其他任何节点资源就可以在本地执行Spark代码的环境。

首先从官网下载对应的Spark文件,这里对应版本下载得到文件:spark-3.1.3-bin-hadoop3.2.tgz,然后将文件上传到集群当中,解压到对应路径中

1
tar -zxvf spark-3.1.3-bin-hadoop3.2.tgz -C /opt/module

为了方便,可以将对应文件夹的名称改为spark-3.1.3

命令行工具

之后,进入对应下载的Spark路径下,执行如下指令:

1
bin/spark-shell

可以进入命令行工具中。在启用命令行的时候,可以输入网址进行Web UI监控页面的访问,使用端口为4040。(hadoop102:4040)

可以在data目录下准备对应的word.txt文件,之后用一个简单的word count代码来进行测试

1
2
scala> sc.textFile("data/word.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).collect()
res0: Array[(String, Int)] = Array((Hello, 2), (Scala, 1), (Spark, 1))

提交应用

也可以采用jar包的形式提交应用运行,下面的命令提交了存在于examples文件夹中的一个示例程序,完成的功能是圆周率Π的计算

1
2
3
4
5
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_2.12-3.1.3.jar\
10
  • --class:表示要执行程序主类

  • --master:表示部署模式,默认为本地模式,其中的数字表示分配的虚拟CPU核数量

  • ./examples/jars/spark-examples_2.12-3.13.jar:运行的应用类所在的jar包位置

  • 10:程序的入口参数,用于设定当前应用的任务数量

Standalone模式

配置

Standalone指的是独立部署模式,即只使用Spark自身节点运行的集群模式。Spark的Standalone模式是经典的主从模式,规划如下:

hadoop102 hadoop103 hadoop104
角色(进程) Worker、Master Worker Worker

使用独立部署模式需要修改对应的配置文件。

Spark的配置文件都存在于$SPARK_HOME下的conf文件夹中,并且在初始情况下,所有的配置文件的最后后缀都是.template,如果需要对应的配置文件生效,需要删除该后缀

修改workers文件,将其中的localhost修改为Worker节点:

1
2
3
hadoop102
hadoop103
hadoop104

修改spark-env.sh文件,添加JAVA_HOME环境变量以及集群对应的Master节点,默认采用7077端口进行通信

1
2
3
4
export JAVA_HOME=/opt/module/jdk1.8.0_212

SPARK_MASTER_HOST=hadoop102
SPARK_MASTER_PORT=7077

将完成的配置文件目录conf进行分发

之后可以启动集群查看效果

1
sbin/start-all.sh

可以查看Master资源监控Web UI界面(hadoop102:8080)

提交应用

与上面的命令类似,唯一需要改动的地方就是部署模式

1
2
3
4
5
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop102:7077 \
./examples/jars/spark-examples_2.12-3.1.3.jar \
10

配置历史服务

前面会看到,如果spark-shell停掉之后,集群监控的4040页面就看不到历史任务的运行情况,所以在开发的时候都需要配置历史服务器来记录任务运行情况。

修改spark-defaults.conf文件,配置日志存储路径

1
2
spark.eventLog.enabled	true
spark.eventLog.dir hdfs://hadoop102:8020/spark-history

这里配置的是HDFS文件系统中的位置,需要保证该处的spark-history目录存在,因此需要启动HDFS集群后进行创建

之后修改spark-env.sh文件,添加日志配置:

1
2
3
4
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/spark-history
-Dspark.history.retainedApplications=30"
  • 参数1:指定Web UI的访问端口为18080
  • 参数2:指定历史服务器的日志存储路径
  • 参数3:指定保存Application历史记录的个数。如果超过这个值,旧的应用程序信息将被删除。(内存中的应用数,而不是页面上显示的应用数)

修改完成之后分发配置文件目录conf

重启集群和历史服务查看效果(HDFS集群也要启动)

1
2
# HADOOP_HOME
sbin/start-dfs.sh
1
2
3
# SPARK_HOME
sbin/start-all.sh
sbin/start-history-server.sh

配置高可用

在没有高可用的情况下,Master节点仅有一个,存在单点故障问题。为了解决单点故障问题,需要在集群中配置多个Master节点,一旦处于活动状态的Master发生故障,由备用的Master提供服务,保证作业继续执行。高可用一般采用Zookeeper设置

hadoop102 hadoop103 hadoop104
角色 Master、Zookeeper、Worker Master、Zookeeper、Worker Zookeeper、Worker

修改spark-env.sh

注释如下内容:

1
2
#SPARK_MASTER_HOST=hadoop102
#SPARK_MASTER_PORT=7077

添加如下内容:

1
2
3
4
5
6
SPARK_MASTER_WEBUI_PORT=8989

export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=hadoop102,hadoop103,hadoop104
-Dspark.deploy.zookeeper.dir=/spark"

分发配置文件目录conf

之后启动集群,同时还要启动Zookeeper。此时仍然只有一个Master。需要在另一台非Master的机器例如hadoop103上单独启动Master

1
2
# SPARK_HOME
sbin/start-master.sh

在高可用情况下的应用提交

1
2
3
4
5
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop102:7077, hadoop103:7077 \
./examples/jars/spark-examples_2.12-3.13.jar \
10

Yarn模式

配置

独立部署(Standalone)模式由 Spark 自身提供计算资源,无需其他框架提供资源。这种方式降低了和其他第三方资源框架的耦合性,独立性非常强。但是Spark主要是计算框架,而不是资源调度框架,Spark本身提供的资源调度并不是它的强项,因此可以选择继承其他专业的资源调度框架,例如Yarn

在学习Hadoop的过程中,我们已经完成了Yarn集群的配置,这里就不再赘述,参考之前的配置流程即可。

修改spark-env.sh,添加Yarn配置文件的位置

1
YARN_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop

提交应用

提交应用同样需要修改部署模式

1
2
3
4
5
6
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.1.3.jar \
10

(这里的deploy-mode还可以选择 client,可能需要配置历史服务,配置方式同前面一致)

提交完成之后可以在对应的Web页面查看日志(hadoop103:8088)

容器模式

(待补充...)

Windows模式

Spark提供了在Windows系统下启动本地集群的方式,可以直接在Windows中使用。

将文件解压到对应的位置,里面的目录同虚拟机中完全一致。sbin/start-shell.cmd可以启动Spark本地环境

提交应用也是类似的操作

1
2
spark-submit --class org.apache.spark.examples.SparkPi --master 
local[2] ../examples/jars/spark-examples_2.12-3.0.0.jar 10

部署模式对比

模式 Spark安装机器数 需要启动的进程 所属者 应用场景
Local 1 Spark 测试
Standalone 3 Master以及Worker Spark 单独部署
Yarn 1 Yarn以及HDFS Hadoop 混合部署

使用端口说明

  • Spark查看当前Spark-shell运行任务情况端口号:4040
  • Spark Master内部通信服务端口号:7077
  • Standalone模式下,Spark Master Web端口号:8080
  • Spark历史服务器端口号:18080
  • Hadoop Yarn任务运行情况查看端口号:8088

参考文章

  1. spark 与 scala 的对应版本查看_孙砚秋的博客-CSDN博客_spark与scala版本对应
  2. scala signature package has wrong version expected: 5.0 found: 5.2 in package.class问题记录_盐水鱼的博客-CSDN博客

Spark学习笔记-入门(1)-Spark概述以及环境搭建
http://example.com/2022/05/06/Spark学习笔记-入门-1-Spark概述以及环境搭建/
作者
EverNorif
发布于
2022年5月6日
许可协议