Learning K8S: Installing and Configuring Spark on K8S

1. Introduction to spark-on-k8s-operator

spark-on-k8s-operator: Kubernetes operator for managing the lifecycle of Apache Spark applications on Kubernetes.
The Kubernetes Operator for Apache Spark aims to make specifying and running Spark applications as easy and idiomatic as running other workloads on Kubernetes. It uses Kubernetes custom resources for specifying, running, and surfacing status of Spark applications.

How spark-on-k8s-operator works:
1. A SparkApplication request is submitted to the api-server.
2. The SparkApplication object (a CRD instance) is persisted to etcd.
3. The operator, watching for SparkApplications, picks it up and runs spark-submit via its submission runner; the resulting request to the api-server creates the corresponding driver/executor pods.
4. The spark pod monitor tracks the application's execution state (which is why sparkctl list and status can show it).
5. The mutating admission webhook creates a svc, through which the Spark web UI can be viewed.

In short, spark-on-k8s-operator changes the traditional way of running Spark jobs, improving resource utilization and saving resources. Kubernetes only creates the pods needed to run a Spark job after the user submits the CRD object, so the job can draw on the resources of the whole k8s cluster; once the job finishes, the pods are reclaimed and the resources are freed.
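
To illustrate (a sketch; sparkctl is the CLI that ships with spark-on-k8s-operator, spark-pi is the example application used later in this post, and flags may differ slightly between versions), the state surfaced by the spark pod monitor can be inspected with:

sparkctl list
sparkctl status spark-pi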

References:

2. Installing spark-on-k8s-operator

1. Install the operator

helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
helm install my-release spark-operator/spark-operator --namespace spark-operator --create-namespace

Or:

git clone https://github.com/GoogleCloudPlatform/spark-on-k8s-operator.git
cd spark-on-k8s-operator/charts/spark-operator-chart
helm install my-release . --namespace spark-operator --create-namespace
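
A quick way to confirm the operator came up (a sketch; the release name and namespace follow the install commands above, and the CRD group matches the apiVersion used later):

helm status my-release -n spark-operator
kubectl get pods -n spark-operator
kubectl get crd | grep sparkoperator.k8s.io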

2. Handling image pull errors
Pulling the image ghcr.io/googlecloudplatform/spark-operator:v1beta2-1.3.7-3.1.1 fails with ErrImagePull, because ghcr.io is blocked (from mainland China), so a workaround is needed to download it.
Option 1: pull the image locally over a proxy, push it to an internal image registry, and change the yaml to use the internal registry.
Option 2: use GitHub Actions to pull the image from ghcr.io and push it to Docker Hub; see togettoyou/hub-mirror for details.

Here we go with option 1 and mirror the image internally:

docker pull ghcr.io/googlecloudplatform/spark-operator:v1beta2-1.3.7-3.1.1
docker tag ghcr.io/googlecloudplatform/spark-operator:v1beta2-1.3.7-3.1.1 harbor.voidking.com/ghcr.io/googlecloudplatform/spark-operator:v1beta2-1.3.7-3.1.1
docker login harbor.voidking.com
docker push harbor.voidking.com/ghcr.io/googlecloudplatform/spark-operator:v1beta2-1.3.7-3.1.1

3. Replace the image

kubectl edit deployment.apps/my-release-spark-operator -n spark-operator

Change the image to the internal registry address.
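
Alternatively (a sketch; the container name spark-operator is an assumption and depends on the Helm chart version), the image can be swapped without opening an editor:

# assumes the deployment's container is named "spark-operator"
kubectl set image deployment/my-release-spark-operator \
  spark-operator=harbor.voidking.com/ghcr.io/googlecloudplatform/spark-operator:v1beta2-1.3.7-3.1.1 \
  -n spark-operator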

3. Using spark-on-k8s-operator

1. Download spark-on-k8s-operator/examples/spark-pi.yaml

2. Mirror the image to the internal registry:

docker pull gcr.io/spark-operator/spark:v3.1.1
docker tag gcr.io/spark-operator/spark:v3.1.1 harbor.voidking.com/gcr.io/spark-operator/spark:v3.1.1
docker push harbor.voidking.com/gcr.io/spark-operator/spark:v3.1.1

3. Replace the image
Change the image in spark-pi.yaml to the internal registry address.
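
The field to edit is spec.image (a sketch based on the layout of examples/spark-pi.yaml; other fields stay as they are):

spec:
  type: Scala
  mode: cluster
  image: "harbor.voidking.com/gcr.io/spark-operator/spark:v3.1.1"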

4. Apply the yaml file

kubectl apply -f spark-pi.yaml

5. View the Spark application

kubectl get sparkapplications spark-pi -o=yaml -n default
kubectl describe sparkapplication spark-pi -n default

This errors out with: error looking up service account default/spark: serviceaccount "spark" not found.

The official documentation also explains this: the spark service account must be replaced with one that has the necessary permissions, i.e. it must be allowed to create, get, list, and delete executor pods, and to create the Kubernetes headless service for the driver.

Note: do not simply replace it with default, otherwise spark-pi-driver will fail at runtime with insufficient permissions.
You can either grant the permissions to default or create a new service account; here we choose the latter.

6. Create the service account
Based on spark-application-rbac.yaml, modified as follows:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark
  namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: default
  name: spark-role
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["*"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["*"]
- apiGroups: [""]
  resources: ["configmaps"]
  verbs: ["*"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: spark-role-binding
  namespace: default
subjects:
- kind: ServiceAccount
  name: spark
  namespace: default
roleRef:
  kind: Role
  name: spark-role
  apiGroup: rbac.authorization.k8s.io

Note: permission to manage configmaps is also required.

Apply it to create the service account:

kubectl apply -f spark-application-rbac.yaml

If the service account you created is not named spark, you also need to edit the spark-pi spec:

kubectl edit sparkapplication spark-pi -n default
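
The field to change is the driver's serviceAccount (a sketch showing only the relevant part of the spec):

spec:
  driver:
    serviceAccount: spark   # change to the name of the service account you created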

7. Check the Spark resources

kubectl get all -n default | grep spark


The above is as expected. If an Error status shows up, troubleshoot via the logs.
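
For example (a sketch; the operator deployment name follows the Helm release installed earlier), submission failures usually surface in the SparkApplication events and the operator's own log, and pod-level failures in the driver pod's events:

kubectl describe sparkapplication spark-pi -n default
kubectl logs deployment/my-release-spark-operator -n spark-operator
kubectl describe pod spark-pi-driver -n default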

8. View the result (check the logs)

kubectl logs pod/spark-pi-driver -n default


The log contains: Pi is roughly 3.145515727578638

4. Writing a SparkApplication yaml

References:

5. Calling Hive

5.1. Basic Hive operations

1. Log in to the hiveserver
kubectl exec into the hive pod and run the login command:

hive 
# or
beeline -u jdbc:hive2://localhost:10000 username password

2. View the contents of the hive_client_test table:

show databases;
use default;
show tables;
select * from hive_client_test;

5.2. Calling Hive from Spark

When Spark calls Hive, the Hive settings can be configured in two ways: written in the code, or taken from the environment.
Demo with the configuration written in code:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config("hive.metastore.uris", "thrift://hive-metastore.bigdata.svc:9083") \
    .appName("test") \
    .enableHiveSupport() \
    .getOrCreate()
read_df = spark.sql("select * from default.hive_client_test limit 1")
read_df.show()

Without the config call, the configuration is read from the environment by default:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("test") \
    .enableHiveSupport() \
    .getOrCreate()
read_df = spark.sql("select * from default.hive_client_test limit 1")
read_df.show()

Which environment variable does hive.metastore.uris correspond to?

5.3. SparkApplication definition

References:

What is convenient here is that the gcr.io/spark-operator/spark-py:v3.1.1 image contains all the Spark examples, which makes testing easy.

Assume Hive already exists in the k8s cluster, with the hive-metastore svc at hive-metastore.bigdata.svc.
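
A quick sanity check that the service exists (a sketch; the bigdata namespace is inferred from the svc address above):

kubectl get svc hive-metastore -n bigdata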

1. Modify hive.py to the following:

"""
A simple example demonstrating Spark SQL Hive integration.
Run with:
./bin/spark-submit examples/src/main/python/sql/hive.py
"""
# $example on:spark_hive$
from os.path import abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row
# $example off:spark_hive$


if __name__ == "__main__":
# $example on:spark_hive$
# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

spark = SparkSession \
.builder \
.config("hive.metastore.uris","thrift://hive-metastore.bigdata.svc:9083") \
.appName("Python Spark SQL Hive integration example") \
.config("spark.sql.warehouse.dir", warehouse_location) \
.enableHiveSupport() \
.getOrCreate()

# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH '/opt/spark/examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key| value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...

# Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# | 500 |
# +--------+

# The results of SQL queries are themselves DataFrames and support all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

# The items in DataFrames are of type Row, which allows you to access each column by ordinal.
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...

# You can also use DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")

# Queries can then join DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
# +---+------+---+------+
# |key| value|key| value|
# +---+------+---+------+
# | 2| val_2| 2| val_2|
# | 4| val_4| 4| val_4|
# | 5| val_5| 5| val_5|
# ...
# $example off:spark_hive$

spark.stop()

2. Use docker commit to build a new image:
harbor.voidking.com/gcrmirror/spark-operator/spark-py:v3.1.1.0
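
Roughly like this (a sketch; the container name spark-py-custom is arbitrary, the base image is assumed to provide sleep, and the key step is copying the modified hive.py over the copy shipped in the image before committing):

docker run -d --name spark-py-custom --entrypoint sleep gcr.io/spark-operator/spark-py:v3.1.1 infinity
docker cp hive.py spark-py-custom:/opt/spark/examples/src/main/python/sql/hive.py
docker commit spark-py-custom harbor.voidking.com/gcrmirror/spark-operator/spark-py:v3.1.1.0
docker push harbor.voidking.com/gcrmirror/spark-operator/spark-py:v3.1.1.0
docker rm -f spark-py-custom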

3. Create spark-hive.yaml with the following content:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: pyspark-hive
namespace: default
spec:
type: Python
pythonVersion: "3"
mode: cluster
image: "harbor.voidking.com/gcrmirror/spark-operator/spark-py:v3.1.1.0"
imagePullPolicy: Always
mainApplicationFile: local:///opt/spark/examples/src/main/python/sql/hive.py
sparkVersion: "3.1.1"
restartPolicy:
type: OnFailure
onFailureRetries: 3
onFailureRetryInterval: 10
onSubmissionFailureRetries: 5
onSubmissionFailureRetryInterval: 20
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: 3.1.1.0
serviceAccount: spark
executor:
cores: 1
instances: 1
memory: "512m"
labels:
version: 3.1.1.0

4. Create the SparkApplication

kubectl apply -f spark-hive.yaml

5. View the result

kubectl get all | grep hive
kubectl logs pod/pyspark-hive-driver
