# Sensor Logic in Argo Events
## Sensor CRD
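The Sensor CRD has two parts that apply logic to event sources. The first filters individual events:
> For each dependency (that is, an event source), you can set `filters`. They screen CloudEvents-formatted events by their content and metadata.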
```yaml
spec:
  dependencies:
    - name: test-dep
      eventSourceName: webhook
      eventName: example
      filters:
        data:
          - path: bucket
            type: string
            value:
              - argo-workflow-input
              - argo-workflow-input1
```
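To make the effect of the `data` filter above concrete, here is a minimal sketch of the check it describes. `matchStringFilter` is a hypothetical helper, not the Argo Events implementation: it assumes the payload is JSON, looks only at a top-level key, and compares values by exact equality, whereas the real filter supports richer paths and matching rules.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// matchStringFilter reports whether the top-level field `path` of the JSON
// payload is a string equal to one of the allowed values.
// Hypothetical helper for illustration only (exact match, top-level key).
func matchStringFilter(payload []byte, path string, allowed []string) bool {
	var data map[string]interface{}
	if err := json.Unmarshal(payload, &data); err != nil {
		return false // a payload that is not a JSON object never matches
	}
	got, ok := data[path].(string)
	if !ok {
		return false // field missing or not a string
	}
	for _, v := range allowed {
		if got == v {
			return true
		}
	}
	return false
}

func main() {
	allowed := []string{"argo-workflow-input", "argo-workflow-input1"}
	fmt.Println(matchStringFilter([]byte(`{"bucket":"argo-workflow-input1"}`), "bucket", allowed)) // true
	fmt.Println(matchStringFilter([]byte(`{"bucket":"other"}`), "bucket", allowed))                // false
}
```

An event whose `bucket` field is neither of the two listed values is dropped before it can count toward any trigger.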
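The second filters at the level of event sources:
> For each trigger, you can set `conditions`. The trigger logic runs only when the boolean expression over the event sources in `conditions` evaluates to true (a dependency counts as true once an event from it has been received); otherwise the trigger logic is skipped.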
```yaml
spec:
  dependencies:
    - name: test-dep
      eventSourceName: webhook
      eventName: example
    - name: test-dep-foo
      eventSourceName: webhook
      eventName: example-foo
  triggers:
    - template:
        conditions: "test-dep && test-dep-foo"
        name: webhook-workflow-trigger
```
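The semantics of `conditions` can be sketched with a tiny evaluator: every dependency name is a boolean that becomes true once an event has arrived, and the trigger fires when the whole expression is true. The code below is an illustrative recursive-descent evaluator written for this post; `evalConditions` and the `parser` type are made-up names, and Argo Events itself relies on an expression library rather than anything like this.

```go
package main

import (
	"fmt"
	"strings"
)

// parser evaluates expressions built from dependency names, "&&", "||"
// and parentheses. Hypothetical type, for illustration only.
type parser struct {
	tokens   []string
	pos      int
	received map[string]bool // dependency name -> event received?
}

func (p *parser) peek() string {
	if p.pos < len(p.tokens) {
		return p.tokens[p.pos]
	}
	return ""
}

// parseOr handles "||", the lowest-precedence operator.
func (p *parser) parseOr() (bool, error) {
	left, err := p.parseAnd()
	for err == nil && p.peek() == "||" {
		p.pos++
		var right bool
		right, err = p.parseAnd()
		left = left || right
	}
	return left, err
}

// parseAnd handles "&&", which binds tighter than "||".
func (p *parser) parseAnd() (bool, error) {
	left, err := p.parseOperand()
	for err == nil && p.peek() == "&&" {
		p.pos++
		var right bool
		right, err = p.parseOperand()
		left = left && right
	}
	return left, err
}

// parseOperand handles a dependency name or a parenthesized sub-expression.
func (p *parser) parseOperand() (bool, error) {
	tok := p.peek()
	switch tok {
	case "", "&&", "||", ")":
		return false, fmt.Errorf("unexpected token %q", tok)
	case "(":
		p.pos++
		v, err := p.parseOr()
		if err != nil {
			return false, err
		}
		if p.peek() != ")" {
			return false, fmt.Errorf("missing closing parenthesis")
		}
		p.pos++
		return v, nil
	}
	p.pos++
	return p.received[tok], nil // unknown or not-yet-received names are false
}

// evalConditions evaluates a conditions string against received dependencies.
func evalConditions(expr string, received map[string]bool) (bool, error) {
	r := strings.NewReplacer("(", " ( ", ")", " ) ", "&&", " && ", "||", " || ")
	p := &parser{tokens: strings.Fields(r.Replace(expr)), received: received}
	v, err := p.parseOr()
	if err != nil {
		return false, err
	}
	if p.pos != len(p.tokens) {
		return false, fmt.Errorf("unexpected token %q", p.peek())
	}
	return v, nil
}

func main() {
	received := map[string]bool{"test-dep": true}
	ok, _ := evalConditions("test-dep && test-dep-foo", received)
	fmt.Println(ok) // false: test-dep-foo has not arrived yet
	received["test-dep-foo"] = true
	ok, _ = evalConditions("test-dep && test-dep-foo", received)
	fmt.Println(ok) // true: both dependencies have an event
}
```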
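## Implementation
The **listenEvents** function in argo-events/sensors/listener.go listens for events and processes them.
1. First, **getDependencyExpression** processes the trigger's `.template.conditions`. It returns a dependency expression that tells the trigger which event sources to listen to (a dependency here means an event source).
It handles the logical operators in `conditions`: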
```go
// Translate original expression which might contain group names
// to an expression only contains dependency names
translate := func(originalExpr string, parameters map[string]string) (string, error) {
	originalExpr = strings.ReplaceAll(originalExpr, "&&", " + \"&&\" + ")
	originalExpr = strings.ReplaceAll(originalExpr, "||", " + \"||\" + ")
	originalExpr = strings.ReplaceAll(originalExpr, "-", "_")
	originalExpr = strings.ReplaceAll(originalExpr, "(", "\"(\"+")
	originalExpr = strings.ReplaceAll(originalExpr, ")", "+\")\"")
	program, err := expr.Compile(originalExpr, expr.Env(parameters))
	if err != nil {
		logger.Errorw("Failed to compile original dependency expression", zap.Error(err))
		return "", err
	}
	result, err := expr.Run(program, parameters)
	if err != nil {
		logger.Errorw("Failed to parse original dependency expression", zap.Error(err))
		return "", err
	}
	newExpr := fmt.Sprintf("%v", result)
	newExpr = strings.ReplaceAll(newExpr, "\"(\"", "(")
	newExpr = strings.ReplaceAll(newExpr, "\")\"", ")")
	return newExpr, nil
}
```
It then simplifies the boolean expression:
```go
// NewBoolExpression returns a Minifier instance
// It is used to simplify boolean expressions.
// For example, "(a || b || c) && (a && b)" can be simplified as "a && b"
// It is achieved by using Quine–McCluskey algorithm.
// See https://en.wikipedia.org/wiki/Quine%E2%80%93McCluskey_algorithm
func NewBoolExpression(expression string) (Minifier, error) {...}
```
2. Build a **triggerMapping** that maps each dependencyExpression to its corresponding triggers.
A dependencyExpression can be understood as the set of event sources obtained by combining dependencies with logical operators.
```go
triggerMapping := make(map[string][]v1alpha1.Trigger)
for _, trigger := range sensor.Spec.Triggers {
	depExpr, err := sensorCtx.getDependencyExpression(ctx, trigger)
	if err != nil {
		logger.Errorw("failed to get dependency expression", zap.Error(err))
		return err
	}
	triggers, ok := triggerMapping[depExpr]
	if !ok {
		triggers = []v1alpha1.Trigger{}
	}
	triggers = append(triggers, trigger)
	triggerMapping[depExpr] = triggers
}
```
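The point of this mapping is that triggers sharing the same simplified expression also share one subscription. A self-contained sketch of the grouping follows; the `trigger` struct and `groupByDepExpr` are hypothetical stand-ins for `v1alpha1.Trigger` and the loop above, and the expression is assumed to be already computed.

```go
package main

import "fmt"

// trigger is a hypothetical stand-in for v1alpha1.Trigger.
type trigger struct {
	Name    string
	DepExpr string // assumed: the already-simplified dependency expression
}

// groupByDepExpr groups triggers by their dependency expression, so that
// every distinct expression needs only one subscription.
func groupByDepExpr(triggers []trigger) map[string][]trigger {
	mapping := make(map[string][]trigger)
	for _, t := range triggers {
		// append on a missing key starts from a nil slice, which is fine in Go.
		mapping[t.DepExpr] = append(mapping[t.DepExpr], t)
	}
	return mapping
}

func main() {
	mapping := groupByDepExpr([]trigger{
		{Name: "workflow-a", DepExpr: "test-dep && test-dep-foo"},
		{Name: "workflow-b", DepExpr: "test-dep"},
		{Name: "workflow-c", DepExpr: "test-dep && test-dep-foo"},
	})
	for expr, ts := range mapping {
		fmt.Printf("%q -> %d trigger(s)\n", expr, len(ts))
	}
}
```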
3. For each item in **triggerMapping**, a **subscribeFunc** closure handles the trigger logic.
> The purpose and parameters of ebDriver.SubscribeEventSources are as follows:
>
> ```go
> // SubscribeEventSources is used to subscribe multiple event source dependencies
> // Parameter - ctx, context
> // Parameter - conn, eventbus connection
> // Parameter - group, NATS Streaming queue group or Kafka consumer group
> // Parameter - closeCh, channel to indicate to close the subscription
> // Parameter - dependencyExpr, example: "(dep1 || dep2) && dep3"
> // Parameter - dependencies, array of dependencies information
> // Parameter - filter, a function used to filter the message
> // Parameter - action, a function to be triggered after all conditions meet
> SubscribeEventSources(ctx context.Context, conn Connection, group string, closeCh <-chan struct{}, dependencyExpr string, dependencies []Dependency, filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error
> ```
>
> The filterFunc passed here screens events according to `.spec.dependencies.filters`:
>
> ```go
> filterFunc := func(depName string, event cloudevents.Event) bool {
> 	dep, ok := depMapping[depName]
> 	if !ok {
> 		return false
> 	}
> 	if dep.Filters == nil {
> 		return true
> 	}
> 	e := convertEvent(event)
> 	result, err := sensordependencies.Filter(e, dep.Filters)
> 	if err != nil {
> 		logger.Errorw("failed to apply filters", zap.Error(err))
> 		return false
> 	}
> 	return result
> }
> ```
>
> The actionFunc runs the actual trigger logic:
>
> ```go
> actionFunc := func(events map[string]cloudevents.Event) {
> 	if err := sensorCtx.triggerActions(cctx, sensor, events, triggers); err != nil {
> 		logger.Errorw("failed to trigger actions", zap.Error(err))
> 	}
> }
> ```
```go
subscribeFunc := func() {
	wg1.Add(1)
	go func() {
		defer wg1.Done()
		// release the lock when goroutine exits
		defer atomic.StoreUint32(&subLock, 0)
		logger.Infof("started subscribing to events for triggers %s with client %s", fmt.Sprintf("[%s]", strings.Join(triggerNames, " ")), clientID)
		err = ebDriver.SubscribeEventSources(cctx, conn, group, closeSubCh, depExpression, deps, filterFunc, actionFunc)
		if err != nil {
			logger.Errorw("failed to subscribe to eventbus", zap.Any("clientID", clientID), zap.Error(err))
			return
		}
	}()
}
```
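4. Inside **SubscribeEventSources**, a subscriber is created to receive events from NATS Streaming.
Only events that appear after this subscriber has registered are processed, because the subscription starts at `StartPosition_NewOnly`.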
```go
sub, err := nsc.stanConn.QueueSubscribe(n.subject, group, func(m *stan.Msg) {
	n.processEventSourceMsg(m, msgHolder, filter, action, log)
}, stan.DurableName(durableName),
	stan.SetManualAckMode(),
	stan.StartAt(pb.StartPosition_NewOnly),
	stan.AckWait(1*time.Second),
	stan.MaxInflight(len(msgHolder.depNames)+2))
```
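An important struct appears at this point: **eventSourceMessageHolder**. It caches the events coming from the event sources and carries the related metadata (the comments on each field explain what it means).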
```go
// eventSourceMessageHolder is a struct used to hold the message information of subscribed dependencies
type eventSourceMessageHolder struct {
	// time that all conditions meet
	lastMeetTime int64
	// timestamp of last msg when all the conditions meet
	latestGoodMsgTimestamp int64
	expr                   *govaluate.EvaluableExpression
	depNames               []string
	// Mapping of [eventSourceName + eventName]dependencyName
	sourceDepMap map[string]string
	parameters   map[string]interface{}
	msgs         map[string]*eventSourceMessage
	// A sync map used to cache the message IDs, it is used to guarantee Exact Once triggering
	smap *sync.Map
}
```
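The **processEventSourceMsg** function uses eventSourceMessageHolder to run each event through a series of steps:
If the timestamp of the event being processed equals the timestamp of the message already held for the same dependency, the event is a re-delivery of the latest message, so only its lastDeliveredTime is refreshed to the current time. If the timestamp is earlier, the event is a re-delivery of an old message, so it is acked and dropped.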
```go
if existingMsg, ok := msgHolder.msgs[depName]; ok {
	if m.Timestamp == existingMsg.timestamp {
		// Re-delivered latest message, update delivery timestamp and return
		existingMsg.lastDeliveredTime = now
		msgHolder.msgs[depName] = existingMsg
		return
	} else if m.Timestamp < existingMsg.timestamp {
		// Re-delivered old message, ack and return
		msgHolder.ackAndCache(m, event.ID())
		log.Debugw("Dropping this message because later ones also satisfy", "eventID", event.ID())
		return
	}
}
```
If the timestamp of the event being processed is later than that of the held message (or nothing is held yet for this dependency), a new eventSourceMessage is created to store the event.
```go
msgHolder.msgs[depName] = &eventSourceMessage{seq: m.Sequence, timestamp: m.Timestamp, event: event, lastDeliveredTime: now}
msgHolder.parameters[depName] = true
```
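The three timestamp cases can be condensed into a small model. The sketch below is an assumption-laden simplification: `heldMsg`, `outcome` and `offer` are hypothetical names, and acking, caching of message IDs and the expression parameters are left out on purpose.

```go
package main

import "fmt"

// heldMsg is a hypothetical, reduced version of eventSourceMessage.
type heldMsg struct {
	timestamp         int64
	lastDeliveredTime int64
}

type outcome string

const (
	redelivered outcome = "redelivered-latest" // same timestamp: refresh delivery time
	droppedOld  outcome = "dropped-old"        // older timestamp: ack and drop
	stored      outcome = "stored"             // newer (or first) message: hold it
)

// offer applies the per-dependency timestamp rules to an incoming message.
func offer(held map[string]*heldMsg, dep string, ts, now int64) outcome {
	if existing, ok := held[dep]; ok {
		if ts == existing.timestamp {
			existing.lastDeliveredTime = now
			return redelivered
		}
		if ts < existing.timestamp {
			return droppedOld
		}
	}
	held[dep] = &heldMsg{timestamp: ts, lastDeliveredTime: now}
	return stored
}

func main() {
	held := map[string]*heldMsg{}
	fmt.Println(offer(held, "test-dep", 100, 1)) // stored
	fmt.Println(offer(held, "test-dep", 100, 5)) // redelivered-latest
	fmt.Println(offer(held, "test-dep", 90, 6))  // dropped-old
	fmt.Println(offer(held, "test-dep", 110, 7)) // stored
}
```

Holding only the newest message per dependency keeps the holder small: at most one entry per dependency name is ever live.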
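Reset held events that have not been re-delivered in the last 10 minutes. They are treated as stale, on the assumption that they have already been deleted on the NATS side.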
```go
hasStale := false
for k, v := range msgHolder.msgs {
	// Since the message is not acked, the server will keep re-sending it.
	// If a message being held didn't get re-delivered in the last 10 minutes, treat it as stale.
	if (now - v.lastDeliveredTime) > 10*60 {
		msgHolder.reset(k)
		hasStale = true
	}
}
if hasStale {
	return
}
```
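As a rough illustration of the staleness rule, the sketch below keeps only a map from dependency name to last delivery time (in Unix seconds) and removes entries older than the window. `sweepStale` and `staleAfterSeconds` are hypothetical names, and the real `reset` may clear more state than a single map entry.

```go
package main

import "fmt"

// staleAfterSeconds mirrors the 10-minute window used in the snippet above.
const staleAfterSeconds = 10 * 60

// sweepStale removes every dependency whose held message was last delivered
// more than staleAfterSeconds ago and reports whether anything was removed.
// Hypothetical helper for illustration only.
func sweepStale(lastDelivered map[string]int64, now int64) bool {
	hasStale := false
	for dep, t := range lastDelivered {
		if now-t > staleAfterSeconds {
			delete(lastDelivered, dep) // deleting while ranging is safe in Go
			hasStale = true
		}
	}
	return hasStale
}

func main() {
	lastDelivered := map[string]int64{"test-dep": 0, "test-dep-foo": 500}
	fmt.Println(sweepStale(lastDelivered, 601)) // true: test-dep is 601s old
	fmt.Println(len(lastDelivered))             // 1
}
```

Because an unacked message keeps being re-sent by the server, a long silence is a reasonable signal that the message no longer exists there.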
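Evaluate the dependency expression against the dependencies that currently hold an event. If it evaluates to true, this event will fire the final target: eventSourceMessageHolder.latestGoodMsgTimestamp is set to the timestamp of this event, and eventSourceMessageHolder.lastMeetTime is set to the current time.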
```go
result, err := msgHolder.expr.Evaluate(msgHolder.parameters)
if err != nil {
	log.Errorf("failed to evaluate dependency expression: %v", err)
	// TODO: how to handle this situation?
	return
}
if result != true {
	return
}
msgHolder.latestGoodMsgTimestamp = m.Timestamp
msgHolder.lastMeetTime = time.Now().Unix()
// Trigger actions
messages := make(map[string]cloudevents.Event)
for k, v := range msgHolder.msgs {
	messages[k] = *v.event
}
```
Run the trigger logic.
```go
go action(messages)
```