# Events Condition ## 原理 conditions 会对 `input.name` 执行逻辑运算,根据运算结果决定是否触发订阅者 ## 实现设计 ### 结构体 **TriggerEnvConfig** 用于接收、处理由事件框架传递来的信息。 **TriggerMgr** 用于存储事件触发过程中的关键信息,提供 condition 判断的能力。 ```go type TriggerEnvConfig struct { BusComponent string `json:"busComponent"` Inputs []*Input `json:"busTopic,omitempty"` Subscribers map[string]*Subscriber `json:"subscribers,omitempty"` Port string `json:"port,omitempty"` } type Input struct { Name string `json:"name"` Namespace string `json:"namespace,omitempty"` EventSource string `json:"eventSource"` Event string `json:"event"` } type Subscriber struct { SinkComponent string `json:"sinkComponent,omitempty"` DLSinkComponent string `json:"deadLetterSinkComponent,omitempty"` Topic string `json:"topic,omitempty"` DLTopic string `json:"deadLetterTopic,omitempty"` } type TriggerMgr struct { // 以 topic 为 key,使用该 topic 的 inputs 列表为 value 的映射关系 TopicMap map[string][]*Input // 以 inputs 名称列表组成的 cel 环境变量,用于 condition 条件判断 CelEnv *cel.Env // 以 input 名称为 key,InputStatus 为 value 的映射关系 InputStatuses map[string]*InputStatus // 以 condition 内容为 key,condition 的逻辑运算值为 value 的映射关系 Conditions *sync.Map } type InputStatus struct { LastMsgTime int64 LastEvent *common.TopicEvent Status bool } ``` 以下面的 Trigger 配置为例: ```yaml apiVersion: events.openfunction.io/v1alpha1 kind: Trigger metadata: name: trigger-a spec: eventBus: "default" inputs: - name: "A" eventSourceName: "my-es-a" eventName: "event-a" - name: "B" eventSourceName: "my-es-b" eventName: "event-b" subscribers: - condition: A || B sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: function-sample-serving-ksvc namespace: default topic: "metrics" ``` 根据事件总线中的 topic 命名规则({namespace}-{eventSourceName}-{eventName}),两个 input 所使用的 topic 名称如下: A: default-my-es-a-event-a B: default-my-es-b-event-b 将这些信息保存到 **TriggerMgr.TopicMap**中 ```go triggerManager.TopicMap = map[string][]*Input{} triggerManager.TopicMap["default-my-es-a-event-a"] = []{<input A>} triggerManager.TopicMap["default-my-es-b-event-b"] = []{<input B>} ``` 初始状态下,<input A> 的 **InputStatus** 为: ```go inputA := &InputStatus{ LastEvent: nil, LastMsgTime: 0, Status: false, } ``` 当 topic "default-my-es-a-event-a" 中获取到事件后,将执行: 1. 根据 topic name 找到并更新 TriggerMgr.TopicMap 中的 input 的状态 ```go func (t *TriggerMgr) setInputActive(e *common.TopicEvent) { if inputs, ok := t.TopicMap[e.Topic]; ok { for _, input := range inputs { if is, ok := t.InputStatuses[input.Name]; ok { is.Status = true is.LastMsgTime = time.Now().Unix() is.LastEvent = e } } } } ``` 此时的 inputA 为: ```go inputA = &InputStatus{ LastEvent: *event, LastMsgTime: <当前时间>, Status: true, } ``` 2. 触发一次对 condition 的判断,然后获取逻辑运算值为真的 conditions ```go func (t *TriggerMgr) getMatches(env *cel.Env) []string { var matches []string t.Conditions.Range(func(k, v interface{}) bool { ast := compile(env, k.(string), decls.Bool) program, _ := env.Program(ast) statuses := t.genStatus() out, _, _ := program.Eval(statuses) res := boolToPointer(out) if isTrue(res) { matches = append(matches, k.(string)) } t.Conditions.Store(k.(string), boolToPointer(out)) return true }) return matches } ``` 3. 根据映射关系找到满足 condition 的 subscriber 配置,将事件发送给 subscriber(此处 condition 为 "A || B",即当 inputA 的状态为 true 即可以触发) 4. 给指定 channel 发送一个消息,关闭同一个 topic 之前启动的 goroutine 5. 启动一个 goroutine,作用为: 1. 当收到停止消息时关闭自己,并重置 inputA ```go inputA = &InputStatus{ LastEvent: nil, LastMsgTime: 0, Status: false, } ``` 2. 在 60s 后关闭自己,并重置 inputA ```go inputA = &InputStatus{ LastEvent: nil, LastMsgTime: 0, Status: false, } ```