**Design document – 2021-10-27**

**Goal**: Provide lmpd data to Energy Acuity.

Desired behavior:
- Lmpd data will be sent to a Kafka topic with the data directly inside the message.
- We will send one message per timestamp.
- If some part of the data is modified, we will resend the full message for each impacted timestamp. Each message will be tagged with a “published timestamp” field. For each timestamp, the message with the latest “published timestamp” should be authoritative.

**Message shape**:
```
{
    "timestamp": "2021-01-01T00:00:00+00:00",
    "published_timestamp": "2021-01-01T01:23:00+00:00",
    "lmpd": [
        {
            "monitored_uid": "A",
            "contingency_uid": "C",
            "pnode_uid": "P1",
            "gsf": 0.3,
            "shadow_price": 1000,  # should we directly provide the impact here
        },
        {
            "monitored_uid": "A",
            "contingency_uid": "C",
            "pnode_uid": "P2",
            "gsf": -0.4,
            "shadow_price": 1000,
        },
    ],
    "meta": None,  # could be used for constraint label mapping??
}
```

**Implementation**:

1st loop:
- A process listens to the lmpd output topic with raw constraints.
- For each message, we update a state that materializes all the records that we’ve seen so far.
- We pass down the message with the latest available mapping raw_constraint_uid -> logical_constraint_uid.
- If it’s a backfill message, we send the full content of the timestamp.

2nd loop:
- The same process listens to a mapping topic that provides the raw_constraint_uid -> logical_constraint_uid (+ other metadata such as labels).
- Find out what mappings changed.
- For each changed mapping, find the timestamps that contain constraints impacted by the update.
- Resend messages for those impacted timestamps, with the latest version of the mapping.

**State implementation**:
```
from datetime import datetime
from typing import Dict, List, NamedTuple

# Assumption: LocalizedDateTime is a timezone-aware datetime.
# It is aliased here only so that the skeleton imports cleanly.
LocalizedDateTime = datetime


# Placeholder types; their fields are still to be defined.
class Message(NamedTuple):
    pass


class RawMessage(NamedTuple):
    pass


class MappingEvent(NamedTuple):
    pass


class LmpdRecord(NamedTuple):
    pass


class RawConstraintUid(NamedTuple):
    pass


class LogicalConstraintUid(NamedTuple):
    pass


class State:
    def __init__(self) -> None:
        self._mappings: Dict[RawConstraintUid, LogicalConstraintUid] = {}
        # could that be an arrow table instead?
        # the raw constraint uids could be interned
        self._db: Dict[LocalizedDateTime, Dict[RawConstraintUid, List[LmpdRecord]]] = {}

    def update_mapping(self, mapping_event: MappingEvent) -> List[Message]:
        # update mappings
        # get the impacted raw constraint uids
        # get the timestamps impacted by the mapping changes
        # resend those timestamps with new mappings
        raise NotImplementedError

    def add_raw_message(self, raw_message: RawMessage) -> Message:
        # add raw message to in memory db
        # if backfill: add // if not backfill: overwrite
        # use the mapping to convert raw_constraint uids -> logical constraint uids
        # aggregate by logical uids if necessary
        raise NotImplementedError
```

**Notes**:
- As usual, we need to pay attention to the order between commits, state storage, flush of the producer (…)
- We can store the state on a persisted volume for a faster restart.
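
**State sketch**:

To make the two loops concrete, here is one possible in-memory version of `State`. It is a sketch under several assumptions, not a settled design: uids are plain strings, and the fields on `LmpdRecord`, `RawMessage`, `MappingEvent` and `Message` are hypothetical. The `published` argument and the `constraint_uid` output key are also made up for illustration. Two behaviors are guesses as well: a backfill adds or replaces records per raw constraint uid within a timestamp, and a raw uid with no mapping is passed through unchanged. Aggregation by logical uid is left out.

```python
from datetime import datetime, timezone
from typing import Dict, List, NamedTuple


class LmpdRecord(NamedTuple):  # hypothetical fields
    raw_constraint_uid: str
    pnode_uid: str
    gsf: float
    shadow_price: float


class RawMessage(NamedTuple):  # hypothetical fields
    timestamp: datetime
    records: List[LmpdRecord]
    is_backfill: bool


class MappingEvent(NamedTuple):  # hypothetical fields
    mappings: Dict[str, str]  # raw_constraint_uid -> logical_constraint_uid


class Message(NamedTuple):  # hypothetical fields
    timestamp: datetime
    published_timestamp: datetime
    lmpd: List[dict]


class State:
    def __init__(self) -> None:
        self._mappings: Dict[str, str] = {}
        self._db: Dict[datetime, Dict[str, List[LmpdRecord]]] = {}

    def _build_message(self, timestamp: datetime, published: datetime) -> Message:
        # Emit the full content of one timestamp with the latest mapping.
        rows = []
        for raw_uid, records in self._db[timestamp].items():
            # Assumption: an unmapped raw uid is passed through unchanged.
            logical_uid = self._mappings.get(raw_uid, raw_uid)
            for record in records:
                rows.append({
                    "constraint_uid": logical_uid,
                    "pnode_uid": record.pnode_uid,
                    "gsf": record.gsf,
                    "shadow_price": record.shadow_price,
                })
        return Message(timestamp, published, rows)

    def add_raw_message(self, raw_message: RawMessage, published: datetime) -> Message:
        # Group the incoming records by raw constraint uid.
        by_uid: Dict[str, List[LmpdRecord]] = {}
        for record in raw_message.records:
            by_uid.setdefault(record.raw_constraint_uid, []).append(record)
        if raw_message.is_backfill:
            # Backfill: add to what is already stored for this timestamp.
            self._db.setdefault(raw_message.timestamp, {}).update(by_uid)
        else:
            # Regular message: overwrite the whole timestamp.
            self._db[raw_message.timestamp] = by_uid
        return self._build_message(raw_message.timestamp, published)

    def update_mapping(self, mapping_event: MappingEvent, published: datetime) -> List[Message]:
        # Raw uids whose logical uid actually changed.
        changed = {
            raw_uid
            for raw_uid, logical_uid in mapping_event.mappings.items()
            if self._mappings.get(raw_uid) != logical_uid
        }
        self._mappings.update(mapping_event.mappings)
        # Timestamps that contain at least one impacted raw uid.
        impacted = sorted(
            ts for ts, by_uid in self._db.items() if not changed.isdisjoint(by_uid)
        )
        return [self._build_message(ts, published) for ts in impacted]
```

Passing `published` in explicitly, rather than reading the clock inside `State`, keeps the sketch deterministic and leaves the choice of “published timestamp” to the caller. Scanning every timestamp in `update_mapping` is the simplest option; a reverse index from raw uid to timestamps could replace it if the scan turns out to be too slow.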