Setting Up a Spark Cluster on Linux

1. Introduction

The plan is to build a Spark 3.3.1 cluster on three Linux hosts, each with 4 CPU cores and 8 GB of RAM, running CentOS 7.
The IP addresses of the three hosts are:

192.168.56.101
192.168.56.102
192.168.56.103

Host 101 is used as the master node; the other two are worker nodes.
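
Optionally, hostnames can be mapped in /etc/hosts on all three machines so the nodes can be referred to by name rather than by IP; the log output later in this post suggests the master here is known as spark-master, while the worker names below are purely assumptions:

# run on every node; the hostnames are assumptions
cat >> /etc/hosts <<'EOF'
192.168.56.101 spark-master
192.168.56.102 spark-worker1
192.168.56.103 spark-worker2
EOF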


2. Install Java

See the companion post 《全平台安装JDK》 (Installing the JDK on All Platforms).

3. Install Spark

3.1. Master node configuration

1) Download Spark and extract it

wget https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3-scala2.13.tgz
mkdir -p /usr/local/spark
tar -xzvf spark-3.3.1-bin-hadoop3-scala2.13.tgz -C /usr/local/spark

The spark-3.3.1-bin-hadoop3-scala2.13.tgz package is chosen here because it bundles Scala 2.13.
spark-3.3.1-bin-hadoop3.tgz also ships with Scala, but its Scala version seems to have some issues, so it is not used here.

More Spark releases can be found on the Download Apache Spark page and in the Spark release archives.

2) Create the configuration files

cd /usr/local/spark/spark-3.3.1-bin-hadoop3-scala2.13/conf
cp workers.template workers
cp spark-defaults.conf.template spark-defaults.conf
cp spark-env.sh.template spark-env.sh

3) Edit the configuration
(1) In workers, delete localhost and add:

#192.168.56.101
192.168.56.102
192.168.56.103

(2) Leave spark-defaults.conf unchanged for now.

(3) Add the following to spark-env.sh:

export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_161
export SPARK_MASTER_HOST=192.168.56.101
export SPARK_MASTER_PORT=7077
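
Optionally, the resources each worker offers to the cluster can also be capped in spark-env.sh via the standard standalone-mode settings; a minimal sketch, with values that are only assumptions for the 4C8G hosts used here:

# optional: cap what each worker advertises to the cluster
# (values below are assumptions for 4-core / 8 GB hosts)
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=6g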

4) Add the environment variables to /etc/profile:

export SPARK_HOME=/usr/local/spark/spark-3.3.1-bin-hadoop3-scala2.13
export PATH=$SPARK_HOME/bin:$PATH

5) Apply the changes:

source /etc/profile
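
To confirm that the PATH change has taken effect, the Spark version banner can be printed from any directory:

spark-submit --version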

3.2. Sync the master configuration to the workers

1) Package the Spark installation and its configuration

cd /usr/local/spark
tar -czvf spark-3.3.1-bin-hadoop3-scala2.13.tgz spark-3.3.1-bin-hadoop3-scala2.13

2) Copy the package and /etc/profile to the worker nodes

scp spark-3.3.1-bin-hadoop3-scala2.13.tgz 192.168.56.102:~
scp /etc/profile 192.168.56.102:/etc/profile

scp spark-3.3.1-bin-hadoop3-scala2.13.tgz 192.168.56.103:~
scp /etc/profile 192.168.56.103:/etc/profile
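
As an alternative to packaging and scp, the extracted directory can be synced directly with rsync (a sketch, assuming rsync is installed on all three hosts); this also removes the need to extract the tarball again on the workers:

# create the target directory on each worker, then sync the installation
ssh 192.168.56.102 "mkdir -p /usr/local/spark"
rsync -az /usr/local/spark/spark-3.3.1-bin-hadoop3-scala2.13 192.168.56.102:/usr/local/spark/
ssh 192.168.56.103 "mkdir -p /usr/local/spark"
rsync -az /usr/local/spark/spark-3.3.1-bin-hadoop3-scala2.13 192.168.56.103:/usr/local/spark/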

3.3. Worker node configuration

1) Extract Spark

mkdir -p /usr/local/spark
tar -xzvf spark-3.3.1-bin-hadoop3-scala2.13.tgz -C /usr/local/spark
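
Since /etc/profile was copied over from the master in the previous step, re-source it (or open a new shell) on each worker so that SPARK_HOME and the updated PATH are available there as well:

source /etc/profile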

3.4. Running Spark

1) Start Spark (run on the master node)

cd /usr/local/spark/spark-3.3.1-bin-hadoop3-scala2.13/sbin
./start-all.sh

When prompted, enter the passwords of the two worker nodes in turn. (It is best to set up passwordless SSH login here; see the sketch below.)
With that, Spark is started on all three nodes.
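
A minimal sketch of setting up that passwordless login, run on the master and assuming the default key path and the same login user on all nodes:

# generate a key pair on the master (accept the defaults)
ssh-keygen -t rsa
# push the public key to each worker
ssh-copy-id 192.168.56.102
ssh-copy-id 192.168.56.103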

2) Verify the installation

jps

The master node should show the Master and Jps processes; the worker nodes should show Worker and Jps.

4. Using Spark

4.1. Single-node test

1) Run the following on the master node or on a worker node:

cd /usr/local/spark/spark-3.3.1-bin-hadoop3-scala2.13/

./bin/run-example SparkPi 10

Output:

22/11/09 20:38:29 INFO DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 1.118 s
22/11/09 20:38:29 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
22/11/09 20:38:29 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
22/11/09 20:38:29 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 1.175116 s
Pi is roughly 3.138907138907139
22/11/09 20:38:29 INFO SparkUI: Stopped Spark web UI at http://spark-master:4040

2) Check the execution progress
Open http://192.168.56.101:8080 in a browser.

4.2. Submitting jobs with spark-submit

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://192.168.56.101:7077 \
./examples/jars/spark-examples_2.13-3.3.1.jar 1000

Jobs submitted with spark-submit run very, very slowly, and I have not figured out why yet; I will leave this as an open question for now.
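
One thing worth checking (a sketch of a diagnostic attempt, not a confirmed fix) is whether the job is actually granted executor resources; the request can be made explicit at submit time:

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://192.168.56.101:7077 \
--executor-memory 2g \
--total-executor-cores 4 \
./examples/jars/spark-examples_2.13-3.3.1.jar 100

If the web UI at http://192.168.56.101:8080 shows the application stuck in the WAITING state, the cluster is not handing out the requested resources.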

4.3. Connecting to MySQL

1) Create the test file test_mysql.py

from pyspark.sql import SparkSession

def test_sql():
    # Local-mode session, just for testing the JDBC connection
    spark = SparkSession.builder.appName('testmysql').master('local').getOrCreate()

    # Read a MySQL table over JDBC using the MariaDB driver
    df = spark.read.format("jdbc")\
        .option("url", "jdbc:mysql://192.168.56.101:3306/testdb")\
        .option("driver", "org.mariadb.jdbc.Driver")\
        .option("dbtable", "testtable")\
        .option("user", "root")\
        .option("password", "xxxxxx")\
        .load()
    df.printSchema()
    df.select(df['testcolumn']).show()

if __name__ == "__main__":
    test_sql()

2) Submit the test

spark-submit test_mysql.py
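
Note that the JDBC driver class used above (org.mariadb.jdbc.Driver) must be on Spark's classpath. If the driver jar has not been placed in the jars directory, it can be supplied at submit time instead (a sketch; the actual jar file name depends on the driver version downloaded):

spark-submit --jars /path/to/mariadb-java-client.jar test_mysql.py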

4.4. Connecting to Kafka

1) Create the test file test_kafka.py

from pyspark.sql import SparkSession


def read_data():
    spark = SparkSession.builder.appName('testkafka').getOrCreate()
    # Create a Kafka source for streaming queries
    df = spark.readStream\
        .format('kafka')\
        .option("kafka.bootstrap.servers", "192.168.56.101:9092")\
        .option("subscribe", "customer_volume")\
        .load()
    print(">>>> print schema:")
    df.printSchema()
    # Cast key/value to strings; a real job would feed this into writeStream
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")


if __name__ == "__main__":
    read_data()

2) Submit the test

spark-submit test_kafka.py
This fails with:
py4j.protocol.Py4JJavaError: An error occurred while calling o29.load.
java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer

The message indicates that a Kafka serialization class is missing; the root cause is that the kafka-clients jar is not on the classpath.

The fix is to download the kafka-clients jar and place it in the /usr/local/spark/spark-3.3.1-bin-hadoop3-scala2.13/jars directory.

Similarly, Spark's Kafka integration relies on a few other jars that may need to be added in the same way, notably spark-sql-kafka-0-10, spark-token-provider-kafka-0-10, and commons-pool2; the sketch below shows a way to pull them all in automatically.
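
Instead of copying jars by hand, the Kafka connector and its transitive dependencies can also be fetched at submit time with --packages (a sketch; it needs network access to a Maven repository on first use):

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.13:3.3.1 test_kafka.py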

Reference: Spark 3.0.1 Structured Streaming 提交程序异常解决 (on resolving a Structured Streaming submission error).

5. Afterword

For any missing-jar problem, the required jar can be found on MVNREPOSITORY, which has a very complete catalogue.