Knative-Eventing 结合Kafka Channel实现事件总线
整体框架
如上图,主要以Kafka Channel为例,整体分为控制面和数据面。控制面负责根据Broker或者Trigger生成一系列的其他资源,包括运行的pod和channel类的配置等。
控制面
整体流程介绍
有两个流程可以触发配置的生成,即broker的创建流程和trigger的创建流程,均为外部添加,分别定义生产和消费的流程。结合上图,每种颜色的虚线代表一种触发流程。
涉及模块与资源
介绍流程前,先介绍涉及的资源。
- 各类Controller
- mt-broker-controller
多租户broker的controller,watch broker和trigger,当有添加或者变更时,分别生成channel和subscription(简称sub); - event-controller
负责channel backup、sub同步、顺序投递等多项内容。
核心流程是sub的流程,根据mt-broker-controller生成的sub,同步到channel中。 - eventing-kafka-channel-controller
watch kafka channel,有变更后负责kafka相关的资源的创建,包含topic、receiver、dispatcher、channel service等等,这里相对较复杂。
- mt-broker-controller
- 各类配置(k8s自定义资源)
- Broker
是使用方添加,核心是spec.config,表示是哪种类型的broker,后续由哪种类型的controller处理。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
name: e-0fa923f63c58387a
namespace: console-integration-test
spec:
config:
apiVersion: v1
kind: ConfigMap
name: config-br-default-channel
namespace: knative-eventing
status:
address:
url: http://broker-ingress.knative-eventing.svc.cluster.local/console-integration-test/e-0fa923f63c58387a
annotations:
knative.dev/channelAPIVersion: messaging.knative.dev/v1beta1
knative.dev/channelAddress: http://e-0fa923f63c58387a-kne-trigger-kn-channel.console-integration-test.svc.cluster.local
knative.dev/channelKind: KafkaChannel
knative.dev/channelName: e-0fa923f63c58387a-kne-trigger - Channel
包含大部分信息,channel定义具体的实现类型,如下边的kafka topic、订阅信息等,由mt-broker-controller、eventing-kafka-channel-controller和event-controller共同维护1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30apiVersion: messaging.knative.dev/v1beta1
kind: KafkaChannel
metadata:
labels:
eventing.knative.dev/broker: e-0fa923f63c58387a
eventing.knative.dev/brokerEverything: "true"
kafkaTopic: console-integration-test.e-0fa923f63c58387a-kne-trigger
name: e-0fa923f63c58387a-kne-trigger
namespace: console-integration-test
ownerReferences:
- apiVersion: eventing.knative.dev/v1
blockOwnerDeletion: true
controller: true
kind: Broker
name: e-0fa923f63c58387a
uid: f4397394-5256-444f-986a-3d30494b9eb3
resourceVersion: "1256200358"
uid: a72f806c-4447-486a-a66f-f581272792db
spec:
numPartitions: 6
replicationFactor: 1
retentionDuration: PT168H
subscribers:
- generation: 1
replyUri: http://broker-ingress.knative-eventing.svc.cluster.local/console-integration-test/e-0fa923f63c58387a
subscriberUri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/console-integration-test/e-0fa923f63c58387a/7c2d2fb2-17e6-4358-8690-78e994a94926
uid: ee29f447-4d1b-480c-88d6-a9a34ec425e0
status:
address:
url: http://e-0fa923f63c58387a-kne-trigger-kn-channel.console-integration-test.svc.cluster.local - channel-service
维护channel对应的service,控制着流量从ingress出来,发送到哪里1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26apiVersion: v1
kind: Service
metadata:
labels:
k8s-app: eventing-kafka-channels
kafkachannel-name: e-0fa923f63c58387a-kne-trigger
kafkachannel-namespace: console-integration-test
kafkachannel-receiver: "true"
name: e-0fa923f63c58387a-kne-trigger-kn-channel
namespace: console-integration-test
ownerReferences:
- apiVersion: messaging.knative.dev/v1beta1
blockOwnerDeletion: true
controller: true
kind: KafkaChannel
name: e-0fa923f63c58387a-kne-trigger
uid: a72f806c-4447-486a-a66f-f581272792db
resourceVersion: "1250797977"
uid: 9f147779-0bd3-490c-9d14-f4717e14b300
spec:
externalName: kafka-cluster-32603413-receiver.knative-eventing.svc.cluster.local
internalTrafficPolicy: Cluster
sessionAffinity: None
type: ExternalName
status:
loadBalancer: {} - Trigger
trigger是使用方添加的,会触发消费相关的流程,指定了broker的信息,mt-broker-controller会根据broker生成对应的sub1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
labels:
eventing.knative.dev/broker: e-0fa923f63c58387a
name: e-0fa923f63c58387a
namespace: console-integration-test
ownerReferences:
- apiVersion: eventing.knative.dev/v1
blockOwnerDeletion: true
kind: Broker
name: e-0fa923f63c58387a
uid: f4397394-5256-444f-986a-3d30494b9eb3
resourceVersion: "1254842424"
uid: 7c2d2fb2-17e6-4358-8690-78e994a94926
spec:
broker: e-0fa923f63c58387a
filter: {}
subscriber:
uri: http://**.**.net/event/save?busId=e-0fa923f63c58387a&busProject=console - subscription
sub为订阅信息,涉及channel,相关核心的部分,会有event-controller同步到channel中1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31apiVersion: messaging.knative.dev/v1
kind: Subscription
metadata:
labels:
eventing.knative.dev/broker: e-0fa923f63c58387a
eventing.knative.dev/trigger: e-0fa923f63c58387a
name: e-0fa923f63c58387a-e-0fa923f63caf5783cb75857ee6af2dda00ad7e4e29
namespace: console-integration-test
ownerReferences:
- apiVersion: eventing.knative.dev/v1
blockOwnerDeletion: true
controller: true
kind: Trigger
name: e-0fa923f63c58387a
uid: 7c2d2fb2-17e6-4358-8690-78e994a94926
resourceVersion: "1254842422"
uid: ee29f447-4d1b-480c-88d6-a9a34ec425e0
spec:
channel:
apiVersion: messaging.knative.dev/v1beta1
kind: KafkaChannel
name: e-0fa923f63c58387a-kne-trigger
reply:
ref:
apiVersion: eventing.knative.dev/v1
kind: Broker
name: e-0fa923f63c58387a
namespace: console-integration-test
subscriber:
uri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/console-integration-test/e-0fa923f63c58387a/7c2d2fb2-17e6-4358-8690-78e994a94926
status:
- Broker
Broker创建流程
- 通过接口或者cli生成broker
- mt-broker-controller 监听 borker
有broker添加后根据broker生成channel,并检查相关的ingress、dispatcher等状态,更新到broker的status中。
比如channel的url会更新到Annotations中,ingress会从这里获取去,其格式:http://{channel.name}.{namespace}.svc.cluster.local - eventing-kafka-channel-controller 监听 channel,有变更后,会进行多很多具体资源的创建。
- Kafka topic创建
- Receiver如果不存在,进行service和deployment的创建
- channel对应的service的创建,建立channel到receiver的关联关系,即确定上一步中url的具体服务地址,这样可以适配多种channel
- Dispather的service和deployment的创建,一个channel对应一个dispatcher的service
- 更新channel自己的信息,如Annotations和Labels
Trigger创建流程
- 通过接口或者cli生成trigger
- mt-broker-controller 监听 trigger,生成sub,主要是添加channel信息、生成对应的trigger url等,并更新trigger的状态
- eventing-controller 监听 sub,将变更同步到channel中,eventing-kafka-channel-controller将会根据channel的变更做相关的调整,这块同broker的创建流程中的相关部分
主要流程源码分析
根据上面的内容,流程上主要是:
- broker创建后channel的生成
其中涉及mt-broker-controller和eventing-kafka-channel-controller - trigger创建后sub的生成
其中涉及mt-broker-controller
依赖的框架
基于knative.dev/pkg/injection/sharedmain
https://github.com/knative/pkg/blob/main/injection/sharedmain/main.go
mt-broker-controller
1 | package main |
mt-broker-controller主要watch两类变更,broker和trigger的变更。
eventing-kafka-channel-controller
主要监听channel的变更。
webhook
数据面
整体流程介绍
一条消息发送和接受的整个流程:
- 消息先进入ingress,在ingress中完成CloudEvent的转换和相关校验,然后根据broker中的channel的svc地址,发送过去;
- receiver收到ingress发送过来的消息,从request.Host中解析出namespace,channel name,然后调用kafka的producer发送,此处除了config-map config-kafka和secret kafka-cluster基本不和外部模块交互;
- dispatcher根据订阅启动对应的消费者,消费到消息后,根据sub中的url,推送到filter;配置变更来源于channel,一个channel对应一个服务,所有的订阅都在一个服务中;
- filter主要实现了过滤,收到dispatcher的请求后,进行过滤,然后发送到接收端;
ingress
ingress主要完成了两个功能:
- message校验以及转换成CloudEvent
- 从broker annotations中的knative.dev/channelAddress查到地址,并发送到相应的地址
receiver
recevier主要负责将event发送到broker中,主要内容如下:
- 维护kafkaclient,这里配置从cm中获取(timeout、KeepAlive、RequiredAcks(默认配置是-1)),但是不支持动态更新,secret的变更支持动态更新
- 接收http请求,从host中获取到namespace和channel name,其格式:http://{channel.name}.{namespace}.svc.cluster.local
- 调用kafkaclient发送event
dispatcher
dispatcher跟其他模块不同,它是一个channel一个service,此外相对其他模块复杂些,其中包含控制和数据两个方面的变更,主要内容如下:
- 控制面
- Reconcile流程,感知channel的变更,根据sub部分的信息调整consumer,关闭减少的,启动新增的,并且会根据启动状态更新channel中的status
- 根据channel的Secret Watcher变更,同receiver,涉及kafka client的变更,此类变更比较重,会重启所有的consumer。这个过程也会触发reconcile流程。
- 数据面
相对简单,拿到消息后,根据配置,推送到filter,主要工作由channel.MessageDispatcher
filter
filter代码结构类似ingress,相对其他模块,较为简单,主要是接收http请求,处理后,在发送http请求,具体流程如下:
- 接收请求,并校验,从路径中解析出当前属于哪个trigger,格式如
/triggers/namespace/name/uid
,可以得到namespace和name - 根据namespace和name获取trigger的具体配置
- 根据配置获取filter,如果配置了,按其进行过滤
- 未过滤掉的发送给接收端
其他
模块间主要通过http通信,其地址是其对应的标识,各模块负责服务的资源地址格式如下:
资源 | 所在服务 | uri |
---|---|---|
broker | ingress | http://broker-ingress.knative-eventing.svc.cluster.local/{namespace}/{brokername} |
channel | receiver | http://{channel.name}.{namespace}.svc.cluster.local |
sub | dispatcher | 无 |
trigger | filter | http://broker-filter.knative-eventing.svc.cluster.local/triggers/{namespace}/{trigger.name}/{trigger.uid} |