整体框架

如上图,主要以Kafka Channel为例,整体分为控制面和数据面。控制面负责根据Broker或者Trigger生成一系列的其他资源,包括运行的pod和channel类的配置等。

控制面

整体流程介绍

有两个流程可以触发配置的生成,即broker的创建流程和trigger的创建流程,均为外部添加,分别定义生产和消费的流程。结合上图,每种颜色的虚线代表一种触发流程。

涉及模块与资源

介绍流程前,先介绍涉及的资源。

  • 各类Controller
    • mt-broker-controller
      多租户broker的controller,watch broker和trigger,当有添加或者变更时,分别生成channel和subscription(简称sub);
    • event-controller
      负责channel backup、sub同步、顺序投递等多项内容。
      核心流程是sub的流程,根据mt-broker-controller生成的sub,同步到channel中。
    • eventing-kafka-channel-controller
      watch kafka channel,有变更后负责kafka相关的资源的创建,包含topic、receiver、dispatcher、channel service等等,这里相对较复杂。
  • 各类配置(k8s自定义资源)
    1. Broker
      是使用方添加,核心是spec.config,表示是哪种类型的broker,后续由哪种类型的controller处理。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      metadata:
      name: e-0fa923f63c58387a
      namespace: console-integration-test
      spec:
      config:
      apiVersion: v1
      kind: ConfigMap
      name: config-br-default-channel
      namespace: knative-eventing
      status:
      address:
      url: http://broker-ingress.knative-eventing.svc.cluster.local/console-integration-test/e-0fa923f63c58387a
      annotations:
      knative.dev/channelAPIVersion: messaging.knative.dev/v1beta1
      knative.dev/channelAddress: http://e-0fa923f63c58387a-kne-trigger-kn-channel.console-integration-test.svc.cluster.local
      knative.dev/channelKind: KafkaChannel
      knative.dev/channelName: e-0fa923f63c58387a-kne-trigger
    2. Channel
      包含大部分信息,channel定义具体的实现类型,如下边的kafka topic、订阅信息等,由mt-broker-controller、eventing-kafka-channel-controller和event-controller共同维护
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      apiVersion: messaging.knative.dev/v1beta1
      kind: KafkaChannel
      metadata:
      labels:
      eventing.knative.dev/broker: e-0fa923f63c58387a
      eventing.knative.dev/brokerEverything: "true"
      kafkaTopic: console-integration-test.e-0fa923f63c58387a-kne-trigger
      name: e-0fa923f63c58387a-kne-trigger
      namespace: console-integration-test
      ownerReferences:
      - apiVersion: eventing.knative.dev/v1
      blockOwnerDeletion: true
      controller: true
      kind: Broker
      name: e-0fa923f63c58387a
      uid: f4397394-5256-444f-986a-3d30494b9eb3
      resourceVersion: "1256200358"
      uid: a72f806c-4447-486a-a66f-f581272792db
      spec:
      numPartitions: 6
      replicationFactor: 1
      retentionDuration: PT168H
      subscribers:
      - generation: 1
      replyUri: http://broker-ingress.knative-eventing.svc.cluster.local/console-integration-test/e-0fa923f63c58387a
      subscriberUri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/console-integration-test/e-0fa923f63c58387a/7c2d2fb2-17e6-4358-8690-78e994a94926
      uid: ee29f447-4d1b-480c-88d6-a9a34ec425e0
      status:
      address:
      url: http://e-0fa923f63c58387a-kne-trigger-kn-channel.console-integration-test.svc.cluster.local
    3. channel-service
      维护channel对应的service,控制着流量从ingress出来,发送到哪里
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      apiVersion: v1
      kind: Service
      metadata:
      labels:
      k8s-app: eventing-kafka-channels
      kafkachannel-name: e-0fa923f63c58387a-kne-trigger
      kafkachannel-namespace: console-integration-test
      kafkachannel-receiver: "true"
      name: e-0fa923f63c58387a-kne-trigger-kn-channel
      namespace: console-integration-test
      ownerReferences:
      - apiVersion: messaging.knative.dev/v1beta1
      blockOwnerDeletion: true
      controller: true
      kind: KafkaChannel
      name: e-0fa923f63c58387a-kne-trigger
      uid: a72f806c-4447-486a-a66f-f581272792db
      resourceVersion: "1250797977"
      uid: 9f147779-0bd3-490c-9d14-f4717e14b300
      spec:
      externalName: kafka-cluster-32603413-receiver.knative-eventing.svc.cluster.local
      internalTrafficPolicy: Cluster
      sessionAffinity: None
      type: ExternalName
      status:
      loadBalancer: {}
    4. Trigger
      trigger是使用方添加的,会触发消费相关的流程,指定了broker的信息,mt-broker-controller会根据broker生成对应的sub
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      apiVersion: eventing.knative.dev/v1
      kind: Trigger
      metadata:
      labels:
      eventing.knative.dev/broker: e-0fa923f63c58387a
      name: e-0fa923f63c58387a
      namespace: console-integration-test
      ownerReferences:
      - apiVersion: eventing.knative.dev/v1
      blockOwnerDeletion: true
      kind: Broker
      name: e-0fa923f63c58387a
      uid: f4397394-5256-444f-986a-3d30494b9eb3
      resourceVersion: "1254842424"
      uid: 7c2d2fb2-17e6-4358-8690-78e994a94926
      spec:
      broker: e-0fa923f63c58387a
      filter: {}
      subscriber:
      uri: http://**.**.net/event/save?busId=e-0fa923f63c58387a&busProject=console
    5. subscription
      sub为订阅信息,涉及channel,相关核心的部分,会有event-controller同步到channel中
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      apiVersion: messaging.knative.dev/v1
      kind: Subscription
      metadata:
      labels:
      eventing.knative.dev/broker: e-0fa923f63c58387a
      eventing.knative.dev/trigger: e-0fa923f63c58387a
      name: e-0fa923f63c58387a-e-0fa923f63caf5783cb75857ee6af2dda00ad7e4e29
      namespace: console-integration-test
      ownerReferences:
      - apiVersion: eventing.knative.dev/v1
      blockOwnerDeletion: true
      controller: true
      kind: Trigger
      name: e-0fa923f63c58387a
      uid: 7c2d2fb2-17e6-4358-8690-78e994a94926
      resourceVersion: "1254842422"
      uid: ee29f447-4d1b-480c-88d6-a9a34ec425e0
      spec:
      channel:
      apiVersion: messaging.knative.dev/v1beta1
      kind: KafkaChannel
      name: e-0fa923f63c58387a-kne-trigger
      reply:
      ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: e-0fa923f63c58387a
      namespace: console-integration-test
      subscriber:
      uri: http://broker-filter.knative-eventing.svc.cluster.local/triggers/console-integration-test/e-0fa923f63c58387a/7c2d2fb2-17e6-4358-8690-78e994a94926
      status:

Broker创建流程

  1. 通过接口或者cli生成broker
  2. mt-broker-controller 监听 borker
    有broker添加后根据broker生成channel,并检查相关的ingress、dispatcher等状态,更新到broker的status中。
    比如channel的url会更新到Annotations中,ingress会从这里获取去,其格式:http://{channel.name}.{namespace}.svc.cluster.local
  3. eventing-kafka-channel-controller 监听 channel,有变更后,会进行多很多具体资源的创建。
    • Kafka topic创建
    • Receiver如果不存在,进行service和deployment的创建
    • channel对应的service的创建,建立channel到receiver的关联关系,即确定上一步中url的具体服务地址,这样可以适配多种channel
    • Dispather的service和deployment的创建,一个channel对应一个dispatcher的service
    • 更新channel自己的信息,如Annotations和Labels

Trigger创建流程

  1. 通过接口或者cli生成trigger
  2. mt-broker-controller 监听 trigger,生成sub,主要是添加channel信息、生成对应的trigger url等,并更新trigger的状态
  3. eventing-controller 监听 sub,将变更同步到channel中,eventing-kafka-channel-controller将会根据channel的变更做相关的调整,这块同broker的创建流程中的相关部分

主要流程源码分析

根据上面的内容,流程上主要是:

  • broker创建后channel的生成
    其中涉及mt-broker-controller和eventing-kafka-channel-controller
  • trigger创建后sub的生成
    其中涉及mt-broker-controller

依赖的框架

基于knative.dev/pkg/injection/sharedmain
https://github.com/knative/pkg/blob/main/injection/sharedmain/main.go

mt-broker-controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"context"

"knative.dev/pkg/injection/sharedmain"

"knative.dev/eventing/pkg/reconciler/broker"
mttrigger "knative.dev/eventing/pkg/reconciler/broker/trigger"
)

const (
component = "mt-broker-controller"
)

func main() {
sharedmain.Main(
component,
broker.NewController,
mttrigger.NewController,
)
broker.Tracer.Shutdown(context.Background())
}

mt-broker-controller主要watch两类变更,broker和trigger的变更

eventing-kafka-channel-controller

主要监听channel的变更。

webhook

数据面

整体流程介绍

一条消息发送和接受的整个流程:

  1. 消息先进入ingress,在ingress中完成CloudEvent的转换和相关校验,然后根据broker中的channel的svc地址,发送过去;
  2. receiver收到ingress发送过来的消息,从request.Host中解析出namespace,channel name,然后调用kafka的producer发送,此处除了config-map config-kafka和secret kafka-cluster基本不和外部模块交互;
  3. dispatcher根据订阅启动对应的消费者,消费到消息后,根据sub中的url,推送到filter;配置变更来源于channel,一个channel对应一个服务,所有的订阅都在一个服务中;
  4. filter主要实现了过滤,收到dispatcher的请求后,进行过滤,然后发送到接收端;

ingress

ingress主要完成了两个功能:

  1. message校验以及转换成CloudEvent
  2. 从broker annotations中的knative.dev/channelAddress查到地址,并发送到相应的地址

receiver

recevier主要负责将event发送到broker中,主要内容如下:

  1. 维护kafkaclient,这里配置从cm中获取(timeout、KeepAlive、RequiredAcks(默认配置是-1)),但是不支持动态更新,secret的变更支持动态更新
  2. 接收http请求,从host中获取到namespace和channel name,其格式:http://{channel.name}.{namespace}.svc.cluster.local
  3. 调用kafkaclient发送event

dispatcher

dispatcher跟其他模块不同,它是一个channel一个service,此外相对其他模块复杂些,其中包含控制和数据两个方面的变更,主要内容如下:

  • 控制面
    1. Reconcile流程,感知channel的变更,根据sub部分的信息调整consumer,关闭减少的,启动新增的,并且会根据启动状态更新channel中的status
    2. 根据channel的Secret Watcher变更,同receiver,涉及kafka client的变更,此类变更比较重,会重启所有的consumer。这个过程也会触发reconcile流程。
  • 数据面
    相对简单,拿到消息后,根据配置,推送到filter,主要工作由channel.MessageDispatcher

filter

filter代码结构类似ingress,相对其他模块,较为简单,主要是接收http请求,处理后,在发送http请求,具体流程如下:

  1. 接收请求,并校验,从路径中解析出当前属于哪个trigger,格式如/triggers/namespace/name/uid,可以得到namespace和name
  2. 根据namespace和name获取trigger的具体配置
  3. 根据配置获取filter,如果配置了,按其进行过滤
  4. 未过滤掉的发送给接收端

其他

模块间主要通过http通信,其地址是其对应的标识,各模块负责服务的资源地址格式如下:

资源 所在服务 uri
broker ingress http://broker-ingress.knative-eventing.svc.cluster.local/{namespace}/{brokername}
channel receiver http://{channel.name}.{namespace}.svc.cluster.local
sub dispatcher
trigger filter http://broker-filter.knative-eventing.svc.cluster.local/triggers/{namespace}/{trigger.name}/{trigger.uid}