# Events Condition
## 原理
conditions 会对 `input.name` 执行逻辑运算,根据运算结果决定是否触发订阅者
## 实现设计
### 结构体
**TriggerEnvConfig** 用于接收、处理由事件框架传递来的信息。
**TriggerMgr** 用于存储事件触发过程中的关键信息,提供 condition 判断的能力。
```go
type TriggerEnvConfig struct {
BusComponent string `json:"busComponent"`
Inputs []*Input `json:"busTopic,omitempty"`
Subscribers map[string]*Subscriber `json:"subscribers,omitempty"`
Port string `json:"port,omitempty"`
}
type Input struct {
Name string `json:"name"`
Namespace string `json:"namespace,omitempty"`
EventSource string `json:"eventSource"`
Event string `json:"event"`
}
type Subscriber struct {
SinkComponent string `json:"sinkComponent,omitempty"`
DLSinkComponent string `json:"deadLetterSinkComponent,omitempty"`
Topic string `json:"topic,omitempty"`
DLTopic string `json:"deadLetterTopic,omitempty"`
}
type TriggerMgr struct {
// 以 topic 为 key,使用该 topic 的 inputs 列表为 value 的映射关系
TopicMap map[string][]*Input
// 以 inputs 名称列表组成的 cel 环境变量,用于 condition 条件判断
CelEnv *cel.Env
// 以 input 名称为 key,InputStatus 为 value 的映射关系
InputStatuses map[string]*InputStatus
// 以 condition 内容为 key,condition 的逻辑运算值为 value 的映射关系
Conditions *sync.Map
}
type InputStatus struct {
LastMsgTime int64
LastEvent *common.TopicEvent
Status bool
}
```
以下面的 Trigger 配置为例:
```yaml
apiVersion: events.openfunction.io/v1alpha1
kind: Trigger
metadata:
name: trigger-a
spec:
eventBus: "default"
inputs:
- name: "A"
eventSourceName: "my-es-a"
eventName: "event-a"
- name: "B"
eventSourceName: "my-es-b"
eventName: "event-b"
subscribers:
- condition: A || B
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: function-sample-serving-ksvc
namespace: default
topic: "metrics"
```
根据事件总线中的 topic 命名规则({namespace}-{eventSourceName}-{eventName}),两个 input 所使用的 topic 名称如下:
A: default-my-es-a-event-a
B: default-my-es-b-event-b
将这些信息保存到 **TriggerMgr.TopicMap**中
```go
triggerManager.TopicMap = map[string][]*Input{}
triggerManager.TopicMap["default-my-es-a-event-a"] = []{<input A>}
triggerManager.TopicMap["default-my-es-b-event-b"] = []{<input B>}
```
初始状态下,<input A> 的 **InputStatus** 为:
```go
inputA := &InputStatus{
LastEvent: nil,
LastMsgTime: 0,
Status: false,
}
```
当 topic "default-my-es-a-event-a" 中获取到事件后,将执行:
1. 根据 topic name 找到并更新 TriggerMgr.TopicMap 中的 input 的状态
```go
func (t *TriggerMgr) setInputActive(e *common.TopicEvent) {
if inputs, ok := t.TopicMap[e.Topic]; ok {
for _, input := range inputs {
if is, ok := t.InputStatuses[input.Name]; ok {
is.Status = true
is.LastMsgTime = time.Now().Unix()
is.LastEvent = e
}
}
}
}
```
此时的 inputA 为:
```go
inputA = &InputStatus{
LastEvent: *event,
LastMsgTime: <当前时间>,
Status: true,
}
```
2. 触发一次对 condition 的判断,然后获取逻辑运算值为真的 conditions
```go
func (t *TriggerMgr) getMatches(env *cel.Env) []string {
var matches []string
t.Conditions.Range(func(k, v interface{}) bool {
ast := compile(env, k.(string), decls.Bool)
program, _ := env.Program(ast)
statuses := t.genStatus()
out, _, _ := program.Eval(statuses)
res := boolToPointer(out)
if isTrue(res) {
matches = append(matches, k.(string))
}
t.Conditions.Store(k.(string), boolToPointer(out))
return true
})
return matches
}
```
3. 根据映射关系找到满足 condition 的 subscriber 配置,将事件发送给 subscriber(此处 condition 为 "A || B",即当 inputA 的状态为 true 即可以触发)
4. 给指定 channel 发送一个消息,关闭同一个 topic 之前启动的 goroutine
5. 启动一个 goroutine,作用为:
1. 当收到停止消息时关闭自己,并重置 inputA
```go
inputA = &InputStatus{
LastEvent: nil,
LastMsgTime: 0,
Status: false,
}
```
2. 在 60s 后关闭自己,并重置 inputA
```go
inputA = &InputStatus{
LastEvent: nil,
LastMsgTime: 0,
Status: false,
}
```