一个计算机技术爱好者与学习者

0%

好好学K8S:K8S中的Leader Election机制

1. 需求描述

假设有一个基于client-go的程序,叫做watcher,会监听k8s集群中pod被删除的消息,当pod被删除时,会触发执行一个动作。
当watcher只有一个副本时,程序运行符合预期。但是当watcher有多个副本时,多个watcher副本都监听到pod被删除的消息,都会触发执行一个动作。而这个动作,我们希望只执行一次。
有什么办法,可以让多个watcher具备多个副本,但是当监听到pod被删除时,只会触发一次执行动作?

2. 实现思路

要实现这个需求,有三个思路:

  • 思路一:基于K8S ValidatingAdmissionWebhook机制。实现一个ValidatingAdmissionWebhook,限制watcher服务只能有一个副本。watcher服务只有一个副本,自然就只会执行一次动作,但是,与需求不相符,不能高可用。
  • 思路二:基于分布式锁。常见的实现方式包括使用 redis、zookeeper 或 etcd 等工具。这种实现方式简单,但是需要额外维护一个中间件,不够优雅。
  • 思路三:基于K8S Leader Election机制。K8S 提供了 Leader Election 机制,可以通过 client-go 库实现,只有被选为 Leader 的 watcher 才会处理 pod 删除事件。

本文中选择思路三(K8S Leader Election机制)来实现需求:服务多个副本但是只执行一次动作。

参考文档:

3. Leader Election机制简介

3.1. Leader Election原理

Leader Election 是一种分布式系统中的机制,旨在确保在多个候选者中选出一个“领导者”进程,以负责执行特定的操作。

1、候选者识别:在 Leader Election 中,首先需要识别出一组候选者,这些候选者可能是运行在同一集群中的多个实例(如 Pods)。这些候选者会竞争成为领导者。

2、竞选过程:候选者通过某种方式(如心跳信号)宣告自己为领导者。通常,所有候选者会尝试同时声明自己为领导者。其中一个候选者成功地获得领导权,而其他候选者则进入待命状态,准备在当前领导者失效时进行新的竞选。

3、心跳机制:一旦某个实例成为领导者,它会定期发送心跳信号以维持其领导地位。如果领导者未能在预定时间内发送心跳信号,其他候选者将启动新的竞选过程,以确保始终有一个活跃的领导者。

4、故障恢复:当当前领导者发生故障或被终止时,其他候选者会迅速重新进行竞选,以确定新的领导者。这种机制保证了系统的高可用性。

4. K8S 中的 Leader Election 实现

Kubernetes 提供了一种简化的方式来实现 Leader Election,相关概念包括资源锁和Lease API。

  • 资源锁:Kubernetes 使用 ConfigMapLease 等资源作为锁来管理领导权。每个候选者尝试更新这个资源以声明自己为领导者。例如,通过更新 ConfigMap 中的某个字段来表示当前的领导者。
  • Lease API:Kubernetes 从 v1.14 开始引入了 coordination.k8s.io API,允许更高效地管理领导权。使用 Lease 对象可以减少对 API 的调用频率,并避免过多的事件通知。

选举过程:
1、候选者创建或获取 Lease 对象,并尝试更新其内容以表明其身份。
2、只有第一个成功更新 Lease 的实例能够获得领导权。
3、其他实例在发现 Lease 被更新后,将停止尝试并进入待命状态。

选举机制保证了控制器的高可用,同时只有一个控制器为主,其他为从,防止同个事件被多次重复监听,重复执行相关的业务逻辑。

5. Leader Election示例代码

示例代码地址:examples - leader-election

1、创建示例项目

1
2
mkdir leader-election-demo && cd leader-election-demo
go mod init leader-election-demo

2、粘贴示例代码
创建文件 main.go ,并且把示例代码粘贴进去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"flag"
"os"
"os/signal"
"syscall"
"time"

"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
)

func buildConfig(kubeconfig string) (*rest.Config, error) {
if kubeconfig != "" {
cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return nil, err
}
return cfg, nil
}

cfg, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
return cfg, nil
}

func main() {
klog.InitFlags(nil)

var kubeconfig string
var leaseLockName string
var leaseLockNamespace string
var id string

flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name")
flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")
flag.Parse()

if leaseLockName == "" {
klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).")
}
if leaseLockNamespace == "" {
klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).")
}

// leader election uses the Kubernetes API by writing to a
// lock object, which can be a LeaseLock object (preferred),
// a ConfigMap, or an Endpoints (deprecated) object.
// Conflicting writes are detected and each client handles those actions
// independently.
config, err := buildConfig(kubeconfig)
if err != nil {
klog.Fatal(err)
}
client := clientset.NewForConfigOrDie(config)

run := func(ctx context.Context) {
// complete your controller loop here
klog.Info("Controller loop...")

select {}
}

// use a Go context so we can tell the leaderelection code when we
// want to step down
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// listen for interrupts or the Linux SIGTERM signal and cancel
// our context, which the leader election code will observe and
// step down
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
go func() {
<-ch
klog.Info("Received termination, signaling shutdown")
cancel()
}()

// we use the Lease lock type since edits to Leases are less common
// and fewer objects in the cluster watch "all Leases".
lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: leaseLockName,
Namespace: leaseLockNamespace,
},
Client: client.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: id,
},
}

// start the leader election code loop
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
// IMPORTANT: you MUST ensure that any code you have that
// is protected by the lease must terminate **before**
// you call cancel. Otherwise, you could have a background
// loop still running and another process could
// get elected before your background loop finished, violating
// the stated goal of the lease.
ReleaseOnCancel: true,
LeaseDuration: 60 * time.Second,
RenewDeadline: 15 * time.Second,
RetryPeriod: 5 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
// we're notified when we start - this is where you would
// usually put your code
run(ctx)
},
OnStoppedLeading: func() {
// we can do cleanup here
klog.Infof("leader lost: %s", id)
os.Exit(0)
},
OnNewLeader: func(identity string) {
// we're notified when new leader elected
if identity == id {
// I just got the lock
return
}
klog.Infof("new leader elected: %s", identity)
},
},
})
}

3、安装依赖

1
go mod tidy

4、运行代码

1
2
3
4
5
6
7
8
# first terminal
go run main.go -kubeconfig=/path/to/kubeconfig -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1

# second terminal
go run main.go -kubeconfig=/path/to/kubeconfig -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=2

# third terminal
go run main.go -kubeconfig=/path/to/kubeconfig -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=3

5、查看lease

1
kubectl get lease

看到如下内容:

1
2
NAME      HOLDER   AGE
example 1 104s

6、结束1号进程,再次查看lease
看到如下内容:

1
2
NAME      HOLDER   AGE
example 3 3m12s

上面的示例,可以证明Leader Election机制已经生效了。当1号进程不可用时,另外的两个进程其中之一会成为领导者。