# Sensor Logic in Argo Events

## Sensor CRD

The Sensor CRD has two parts that apply logic to event sources. The first one filters events:

> A `filters` block is set on each dependency (i.e. each event source). It screens CloudEvents-formatted events by their content and metadata.

```yaml
spec:
  dependencies:
    - name: test-dep
      eventSourceName: webhook
      eventName: example
      filters:
        data:
          - path: bucket
            type: string
            value:
              - argo-workflow-input
              - argo-workflow-input1
```

The second one filters on event sources:

> A `conditions` field is set on each trigger. The trigger logic runs only when the expression over the event sources listed in `conditions` evaluates to true (an event source counts as true once an event from it has been received); otherwise the trigger is not entered.

```yaml
spec:
  dependencies:
    - name: test-dep
      eventSourceName: webhook
      eventName: example
    - name: test-dep-foo
      eventSourceName: webhook
      eventName: example-foo
  triggers:
    - template:
        conditions: "test-dep && test-dep-foo"
        name: webhook-workflow-trigger
```

## Implementation

The **listenEvents** function in argo-events/sensors/listener.go listens for and processes events.

1. It first processes each trigger's `.template.conditions` with **getDependencyExpression**, which returns a dependency expression (a string built from dependency names) that tells the trigger which event sources to listen to. Here, a dependency simply means an event source.

The logical operators in `conditions` are handled like this:

```go
// Translate original expression which might contain group names
// to an expression only contains dependency names
translate := func(originalExpr string, parameters map[string]string) (string, error) {
	// Turn the condition into a string-concatenation expression, roughly
	// `a && (b || c)` -> `a + "&&" + "("+b + "||" + c+")"`.
	// "-" becomes "_" because expr would otherwise read it as subtraction.
	originalExpr = strings.ReplaceAll(originalExpr, "&&", " + \"&&\" + ")
	originalExpr = strings.ReplaceAll(originalExpr, "||", " + \"||\" + ")
	originalExpr = strings.ReplaceAll(originalExpr, "-", "_")
	originalExpr = strings.ReplaceAll(originalExpr, "(", "\"(\"+")
	originalExpr = strings.ReplaceAll(originalExpr, ")", "+\")\"")
	// Evaluating it looks every name up in parameters and yields a plain string.
	program, err := expr.Compile(originalExpr, expr.Env(parameters))
	if err != nil {
		logger.Errorw("Failed to compile original dependency expression", zap.Error(err))
		return "", err
	}
	result, err := expr.Run(program, parameters)
	if err != nil {
		logger.Errorw("Failed to parse original dependency expression", zap.Error(err))
		return "", err
	}
	newExpr := fmt.Sprintf("%v", result)
	newExpr = strings.ReplaceAll(newExpr, "\"(\"", "(")
	newExpr = strings.ReplaceAll(newExpr, "\")\"", ")")
	return newExpr, nil
}
```

The resulting logical expression is then simplified:

```go
// NewBoolExpression returns a Minifier instance
// It is used to simplify boolean expressions.
// For example, "(a || b || c) && (a && b)" can be simplified as "a && b"
// It is achieved by using Quine–McCluskey algorithm.
// See https://en.wikipedia.org/wiki/Quine%E2%80%93McCluskey_algorithm
func NewBoolExpression(expression string) (Minifier, error) {...}
```

2. A **triggerMapping** is built that maps each dependencyExpression to its triggers, so triggers sharing the same dependency expression end up in one entry. A dependencyExpression can be read as the combination of event sources obtained by applying the logical operators to the dependencies.

```go
triggerMapping := make(map[string][]v1alpha1.Trigger)
for _, trigger := range sensor.Spec.Triggers {
	depExpr, err := sensorCtx.getDependencyExpression(ctx, trigger)
	if err != nil {
		logger.Errorw("failed to get dependency expression", zap.Error(err))
		return err
	}
	triggers, ok := triggerMapping[depExpr]
	if !ok {
		triggers = []v1alpha1.Trigger{}
	}
	triggers = append(triggers, trigger)
	triggerMapping[depExpr] = triggers
}
```

3. For each item in **triggerMapping**, a **subscribeFunc** handles the trigger logic.

> The purpose and parameters of `ebDriver.SubscribeEventSources` are:
>
> ```go
> // SubscribeEventSources is used to subscribe multiple event source dependencies
> // Parameter - ctx, context
> // Parameter - conn, eventbus connection
> // Parameter - group, NATS Streaming queue group or Kafka consumer group
> // Parameter - closeCh, channel to indicate to close the subscription
> // Parameter - dependencyExpr, example: "(dep1 || dep2) && dep3"
> // Parameter - dependencies, array of dependencies information
> // Parameter - filter, a function used to filter the message
> // Parameter - action, a function to be triggered after all conditions meet
> SubscribeEventSources(ctx context.Context, conn Connection, group string, closeCh <-chan struct{}, dependencyExpr string, dependencies []Dependency, filter func(string, cloudevents.Event) bool, action func(map[string]cloudevents.Event)) error
> ```
>
> The `filter` argument passed here is `filterFunc`, which screens events according to `.spec.dependencies.filters`:
>
> ```go
> filterFunc := func(depName string, event cloudevents.Event) bool {
> 	dep, ok := depMapping[depName]
> 	if !ok {
> 		return false
> 	}
> 	if dep.Filters == nil {
> 		return true
> 	}
> 	e := convertEvent(event)
> 	result, err := sensordependencies.Filter(e, dep.Filters)
> 	if err != nil {
> 		logger.Errorw("failed to apply filters", zap.Error(err))
> 		return false
> 	}
> 	return result
> }
> ```
>
> The `action` argument is `actionFunc`, which runs the actual trigger logic:
>
> ```go
> actionFunc := func(events map[string]cloudevents.Event) {
> 	if err := sensorCtx.triggerActions(cctx, sensor, events, triggers); err != nil {
> 		logger.Errorw("failed to trigger actions", zap.Error(err))
> 	}
> }
> ```

```go
subscribeFunc := func() {
	wg1.Add(1)
	go func() {
		defer wg1.Done()
		// release the lock when goroutine exits
		defer atomic.StoreUint32(&subLock, 0)
		logger.Infof("started subscribing to events for triggers %s with client %s", fmt.Sprintf("[%s]", strings.Join(triggerNames, " ")), clientID)
		err = ebDriver.SubscribeEventSources(cctx, conn, group, closeSubCh, depExpression, deps, filterFunc, actionFunc)
		if err != nil {
			logger.Errorw("failed to subscribe to eventbus", zap.Any("clientID", clientID), zap.Error(err))
			return
		}
	}()
}
```
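`filterFunc` delegates the actual check to `sensordependencies.Filter`. To illustrate roughly what the `data` filter from the first YAML example checks, here is a minimal, self-contained sketch. `dataFilter` and `matchData` are hypothetical names, not Argo Events APIs. The sketch assumes a JSON payload, treats `path` as a top-level key only, models only string-typed filters, compares values by exact equality and requires every filter entry to match. The real filter supports nested paths and its matching rules may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// dataFilter is a hypothetical, simplified stand-in for one entry of
// .spec.dependencies[].filters.data with type: string.
type dataFilter struct {
	Path  string   // assumption: a top-level JSON key, not a nested path
	Value []string // allowed values; any one of them may match
}

// matchData reports whether the JSON payload satisfies every filter.
func matchData(payload []byte, filters []dataFilter) (bool, error) {
	var data map[string]any
	if err := json.Unmarshal(payload, &data); err != nil {
		return false, err
	}
	for _, f := range filters {
		s, ok := data[f.Path].(string)
		if !ok {
			return false, nil // key missing or not a string
		}
		matched := false
		for _, v := range f.Value {
			if s == v { // assumption: exact string equality
				matched = true
				break
			}
		}
		if !matched {
			return false, nil
		}
	}
	return true, nil
}

func main() {
	filters := []dataFilter{{Path: "bucket", Value: []string{"argo-workflow-input", "argo-workflow-input1"}}}
	ok, err := matchData([]byte(`{"bucket":"argo-workflow-input"}`), filters)
	fmt.Println(ok, err) // true <nil>
	ok, err = matchData([]byte(`{"bucket":"other"}`), filters)
	fmt.Println(ok, err) // false <nil>
}
```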
4. In **SubscribeEventSources**, a subscriber is created to receive events from NATS Streaming. Because the subscription starts at `pb.StartPosition_NewOnly`, only events published after this subscriber registers are processed.

```go
sub, err := nsc.stanConn.QueueSubscribe(n.subject, group, func(m *stan.Msg) {
	n.processEventSourceMsg(m, msgHolder, filter, action, log)
}, stan.DurableName(durableName),
	stan.SetManualAckMode(),
	stan.StartAt(pb.StartPosition_NewOnly),
	stan.AckWait(1*time.Second),
	stan.MaxInflight(len(msgHolder.depNames)+2))
```

This brings in an important struct, **eventSourceMessageHolder**. It caches the events received from the event sources and carries the related metadata (the comment on each field explains its meaning).

```go
// eventSourceMessageHolder is a struct used to hold the message information of subscribed dependencies
type eventSourceMessageHolder struct {
	// time that all conditions meet
	lastMeetTime int64
	// timestamp of last msg when all the conditions meet
	latestGoodMsgTimestamp int64
	expr                   *govaluate.EvaluableExpression
	depNames               []string
	// Mapping of [eventSourceName + eventName]dependencyName
	sourceDepMap map[string]string
	parameters   map[string]interface{}
	msgs         map[string]*eventSourceMessage
	// A sync map used to cache the message IDs, it is used to guarantee Exact Once triggering
	smap *sync.Map
}
```

In **processEventSourceMsg**, eventSourceMessageHolder is used to process each event in a series of steps.

If a message is already held for this dependency, the incoming message is compared with it by timestamp. An equal timestamp means the held message was redelivered, so only its `lastDeliveredTime` is refreshed to the current time. An earlier timestamp means an older message was redelivered; it is acknowledged and dropped, because a later message for the same dependency already satisfies the condition.

```go
if existingMsg, ok := msgHolder.msgs[depName]; ok {
	if m.Timestamp == existingMsg.timestamp {
		// Re-delivered latest message, update delivery timestamp and return
		existingMsg.lastDeliveredTime = now
		msgHolder.msgs[depName] = existingMsg
		return
	} else if m.Timestamp < existingMsg.timestamp {
		// Re-delivered old message, ack and return
		msgHolder.ackAndCache(m, event.ID())
		log.Debugw("Dropping this message because later ones also satisfy", "eventID", event.ID())
		return
	}
}
```

Otherwise (no message is held yet, or the incoming one is newer), a new eventSourceMessage is created to store the event, and the dependency is marked as received in `parameters`.

```go
msgHolder.msgs[depName] = &eventSourceMessage{seq: m.Sequence, timestamp: m.Timestamp, event: event, lastDeliveredTime: now}
msgHolder.parameters[depName] = true
```

Held messages that have not been redelivered within the last 10 minutes are then cleaned up. Since unacknowledged messages keep being redelivered by the server, one that stops arriving is treated as stale (presumably already removed on the NATS side). If anything stale was found, processing stops here.

```go
hasStale := false
for k, v := range msgHolder.msgs {
	// Since the message is not acked, the server will keep re-sending it.
	// If a message being held didn't get re-delivered in the last 10 minutes, treat it as stale.
	if (now - v.lastDeliveredTime) > 10*60 {
		msgHolder.reset(k)
		hasStale = true
	}
}
if hasStale {
	return
}
```

Next, the dependency expression (the trigger conditions) is evaluated against `parameters`. If it is true, this set of events will fire the trigger: `eventSourceMessageHolder.latestGoodMsgTimestamp` is set to the timestamp of the current message, `eventSourceMessageHolder.lastMeetTime` is set to the current Unix time, and the held events are collected per dependency.

```go
result, err := msgHolder.expr.Evaluate(msgHolder.parameters)
if err != nil {
	log.Errorf("failed to evaluate dependency expression: %v", err)
	// TODO: how to handle this situation?
	return
}
if result != true {
	return
}
msgHolder.latestGoodMsgTimestamp = m.Timestamp
msgHolder.lastMeetTime = time.Now().Unix()
// Trigger actions
messages := make(map[string]cloudevents.Event)
for k, v := range msgHolder.msgs {
	messages[k] = *v.event
}
```

Finally, the trigger logic runs asynchronously in a new goroutine:

```go
go action(messages)
```
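To make the flow of processEventSourceMsg more concrete, here is a heavily simplified sketch of the message-holder idea using only the Go standard library. `holder`, `heldMsg`, `newHolder`, `process` and `eval` are hypothetical names. Instead of govaluate, the sketch parses the dependency expression with `go/parser`, which is why dependency names use `_` rather than `-` (mirroring the `-` to `_` replacement in `translate`). It keeps the latest message per dependency, ignores redelivered or older messages, and returns the held payloads once the expression becomes true. It does not model acking, the 10-minute stale cleanup, exactly-once ID caching, or what happens to held messages after a trigger fires.

```go
package main

import (
	"fmt"
	"go/ast"
	"go/parser"
	"go/token"
)

// eval walks a boolean expression such as "test_dep && (a || b)" and looks up
// each identifier in params; identifiers missing from params count as false.
func eval(e ast.Expr, params map[string]bool) (bool, error) {
	switch n := e.(type) {
	case *ast.Ident:
		return params[n.Name], nil
	case *ast.ParenExpr:
		return eval(n.X, params)
	case *ast.BinaryExpr:
		if n.Op != token.LAND && n.Op != token.LOR {
			break // only && and || are supported
		}
		l, err := eval(n.X, params)
		if err != nil {
			return false, err
		}
		r, err := eval(n.Y, params)
		if err != nil {
			return false, err
		}
		if n.Op == token.LAND {
			return l && r, nil
		}
		return l || r, nil
	}
	return false, fmt.Errorf("unsupported expression node %T", e)
}

// heldMsg is a hypothetical stand-in for eventSourceMessage.
type heldMsg struct {
	timestamp int64
	payload   string
}

// holder is a hypothetical, simplified stand-in for eventSourceMessageHolder.
type holder struct {
	expr   ast.Expr
	params map[string]bool    // dependency name -> a message has been received
	msgs   map[string]heldMsg // dependency name -> latest held message
}

func newHolder(depExpr string) (*holder, error) {
	e, err := parser.ParseExpr(depExpr)
	if err != nil {
		return nil, err
	}
	return &holder{expr: e, params: map[string]bool{}, msgs: map[string]heldMsg{}}, nil
}

// process records a message for dep. It returns the held payloads keyed by
// dependency name when the expression is satisfied, and nil otherwise.
func (h *holder) process(dep string, ts int64, payload string) (map[string]string, error) {
	if old, ok := h.msgs[dep]; ok && ts <= old.timestamp {
		return nil, nil // redelivery of the held message or an older one: ignore
	}
	h.msgs[dep] = heldMsg{timestamp: ts, payload: payload}
	h.params[dep] = true
	ok, err := eval(h.expr, h.params)
	if err != nil || !ok {
		return nil, err
	}
	out := make(map[string]string, len(h.msgs))
	for k, v := range h.msgs {
		out[k] = v.payload
	}
	return out, nil
}

func main() {
	h, err := newHolder("test_dep && test_dep_foo")
	if err != nil {
		panic(err)
	}
	out, _ := h.process("test_dep", 1, "a")
	fmt.Println(out) // map[] (condition not met yet)
	out, _ = h.process("test_dep_foo", 2, "b")
	fmt.Println(out) // map[test_dep:a test_dep_foo:b]
	out, _ = h.process("test_dep", 1, "a")
	fmt.Println(out) // map[] (redelivery ignored)
}
```

`go/parser` is used here because Go's own grammar already covers `&&`, `||` and parentheses with the usual precedence, so no hand-written parser is needed for this sketch.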