**Design document – 2021-10-27**
**Goal**: Provide lmpd data to Energy Acuity
Desired behavior:
- Lmpd data will be sent to a Kafka topic with the data directly inside the message.
- We will send one message per timestamp.
- If any part of the data is modified, we will resend the full message for each impacted timestamp. Each message will carry a “published timestamp” field. For each timestamp, the message with the latest “published timestamp” is authoritative.
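On the consumer side, the "latest published timestamp wins" rule could be applied roughly as follows. This is only a sketch: `latest_per_timestamp` is a hypothetical helper, and it assumes messages arrive as decoded dicts with ISO 8601 strings, as in the message shape below.

```python
from datetime import datetime
from typing import Dict, Iterable


def latest_per_timestamp(messages: Iterable[dict]) -> Dict[datetime, dict]:
    """Keep, for each timestamp, the message with the latest published_timestamp."""
    latest: Dict[datetime, dict] = {}
    for message in messages:
        # Parse to timezone-aware datetimes so that equal instants written
        # with different UTC offsets end up under the same key.
        key = datetime.fromisoformat(message["timestamp"])
        published = datetime.fromisoformat(message["published_timestamp"])
        current = latest.get(key)
        if current is None or published >= datetime.fromisoformat(
            current["published_timestamp"]
        ):
            latest[key] = message
    return latest
```

Because the rule depends only on the field values, it holds even if messages are delivered more than once or out of order.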
**Message shape**:
```
{
    "timestamp": "2021-01-01T00:00:00+00:00",
    "published_timestamp": "2021-01-01T01:23:00+00:00",
    "lmpd": [
        {
            "monitored_uid": "A",
            "contingency_uid": "C",
            "pnode_uid": "P1",
            "gsf": 0.3,
            "shadow_price": 1000,
            # should we directly provide the impact here?
        },
        {
            "monitored_uid": "A",
            "contingency_uid": "C",
            "pnode_uid": "P2",
            "gsf": -0.4,
            "shadow_price": 1000,
        },
    ],
    "meta": None,  # could be used for constraint label mapping?
}
```
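A minimal sketch of how a message of this shape could be serialized, assuming JSON on the wire (the encoding is not fixed by this document). `build_message` is a hypothetical helper, and the `LmpdRecord` fields simply mirror the example above.

```python
import json
from datetime import datetime, timezone
from typing import List, NamedTuple


class LmpdRecord(NamedTuple):
    # Field names are taken from the example message; types are assumptions.
    monitored_uid: str
    contingency_uid: str
    pnode_uid: str
    gsf: float
    shadow_price: float


def build_message(
    timestamp: datetime,
    published_timestamp: datetime,
    records: List[LmpdRecord],
) -> bytes:
    """Serialize one timestamp's records into a single JSON payload."""
    payload = {
        "timestamp": timestamp.isoformat(),
        "published_timestamp": published_timestamp.isoformat(),
        "lmpd": [record._asdict() for record in records],
        "meta": None,  # becomes JSON null
    }
    return json.dumps(payload).encode("utf-8")


# Usage: one record for a single timestamp.
example = build_message(
    datetime(2021, 1, 1, tzinfo=timezone.utc),
    datetime(2021, 1, 1, 1, 23, tzinfo=timezone.utc),
    [LmpdRecord("A", "C", "P1", 0.3, 1000)],
)
```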
**Implementation**:
1st loop:
- A process listens to the lmpd output topic with raw constraints.
- For each message, we update a state that materializes all the records we have seen so far.
- We pass the message downstream, applying the latest available raw_constraint_uid -> logical_constraint_uid mapping.
- If it’s a backfill message, we send the full content of the timestamp.
2nd loop:
- The same process listens to a mapping topic that provides the raw_constraint_uid -> logical_constraint_uid mapping (plus other metadata such as labels).
- Find out which mappings changed.
- For each changed mapping, find the timestamps that contain constraints impacted by the update.
- Resend messages for those impacted timestamps, with the latest version of the mapping.
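Since both loops run in the same process, they can be viewed as one dispatch over two input topics. The sketch below leaves the Kafka client out and takes an iterable of `(topic, payload)` pairs instead. The topic names, `StateLike`, and `route` are hypothetical, chosen only for illustration.

```python
from typing import Any, Iterable, Iterator, List, Protocol, Tuple

LMPD_TOPIC = "lmpd-raw"  # hypothetical topic name
MAPPING_TOPIC = "constraint-mapping"  # hypothetical topic name


class StateLike(Protocol):
    """The two entry points the loops need from the shared state."""

    def add_raw_message(self, raw_message: Any) -> Any: ...

    def update_mapping(self, mapping_event: Any) -> List[Any]: ...


def route(events: Iterable[Tuple[str, Any]], state: StateLike) -> Iterator[Any]:
    """Yield the outgoing messages produced by each incoming event."""
    for topic, payload in events:
        if topic == LMPD_TOPIC:
            # 1st loop: one raw message in, one mapped message out.
            yield state.add_raw_message(payload)
        elif topic == MAPPING_TOPIC:
            # 2nd loop: one mapping event may trigger several resends.
            yield from state.update_mapping(payload)
        else:
            raise ValueError(f"unexpected topic: {topic}")
```

Handling both topics in one thread means the state is never updated concurrently, so no locking is needed in this sketch.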
**State implementation**:
```
from datetime import datetime
from typing import Dict, List, NamedTuple

# Placeholder alias (assumption): LocalizedDateTime stands for a timezone-aware datetime.
LocalizedDateTime = datetime


class Message(NamedTuple):
    pass


class RawMessage(NamedTuple):
    pass


class MappingEvent(NamedTuple):
    pass


class LmpdRecord(NamedTuple):
    pass


class RawConstraintUid(NamedTuple):
    pass


class LogicalConstraintUid(NamedTuple):
    pass


class State:
    def __init__(self) -> None:
        self._mappings: Dict[RawConstraintUid, LogicalConstraintUid] = {}
        # could that be an arrow table instead?
        # the raw constraint uids could be interned
        self._db: Dict[LocalizedDateTime, Dict[RawConstraintUid, List[LmpdRecord]]] = {}

    def update_mapping(self, mapping_event: MappingEvent) -> List[Message]:
        # update mappings
        # get the impacted raw constraint uids
        # get the timestamps impacted by the mapping changes
        # resend those timestamps with new mappings
        pass

    def add_raw_message(self, raw_message: RawMessage) -> Message:
        # add raw message to in memory db
        # if backfill: add // if not backfill: overwrite
        # use the mapping to convert raw_constraint uids -> logical constraint uids
        # aggregate by logical uids if necessary
        pass
```
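To make the skeleton above more concrete, here is one way the two methods could be filled in. All field names on the tuples (`raw_constraint_uid`, `is_backfill`, `mappings`, ...) are assumptions made for this sketch. Uids are plain strings, and both the "published timestamp" tagging and the aggregation by logical uid are left out for brevity.

```python
from datetime import datetime, timezone
from typing import Dict, List, NamedTuple


class LmpdRecord(NamedTuple):
    raw_constraint_uid: str
    pnode_uid: str
    gsf: float
    shadow_price: float


class RawMessage(NamedTuple):
    timestamp: datetime
    records: List[LmpdRecord]
    is_backfill: bool


class MappingEvent(NamedTuple):
    mappings: Dict[str, str]  # raw_constraint_uid -> logical_constraint_uid


class Message(NamedTuple):
    timestamp: datetime
    lmpd: List[dict]


class State:
    def __init__(self) -> None:
        self._mappings: Dict[str, str] = {}
        self._db: Dict[datetime, Dict[str, List[LmpdRecord]]] = {}

    def _render(self, timestamp: datetime) -> Message:
        """Build the full outgoing message for one timestamp."""
        rows = []
        for raw_uid, records in self._db[timestamp].items():
            logical_uid = self._mappings.get(raw_uid)  # None if not mapped yet
            for record in records:
                rows.append({
                    "logical_constraint_uid": logical_uid,
                    "pnode_uid": record.pnode_uid,
                    "gsf": record.gsf,
                    "shadow_price": record.shadow_price,
                })
        return Message(timestamp, rows)

    def add_raw_message(self, raw_message: RawMessage) -> Message:
        if not raw_message.is_backfill:
            # Not a backfill: overwrite whatever was stored for this timestamp.
            self._db[raw_message.timestamp] = {}
        by_uid = self._db.setdefault(raw_message.timestamp, {})
        for record in raw_message.records:
            by_uid.setdefault(record.raw_constraint_uid, []).append(record)
        return self._render(raw_message.timestamp)

    def update_mapping(self, mapping_event: MappingEvent) -> List[Message]:
        # Only raw uids whose logical uid actually changed trigger a resend.
        changed = {
            raw for raw, logical in mapping_event.mappings.items()
            if self._mappings.get(raw) != logical
        }
        self._mappings.update(mapping_event.mappings)
        impacted = [
            ts for ts, by_uid in self._db.items()
            if not changed.isdisjoint(by_uid)
        ]
        return [self._render(ts) for ts in sorted(impacted)]


# Usage: a raw message, a backfill for the same timestamp, then a mapping update.
ts = datetime(2021, 1, 1, tzinfo=timezone.utc)
state = State()
first = state.add_raw_message(RawMessage(ts, [LmpdRecord("r1", "P1", 0.3, 1000.0)], False))
second = state.add_raw_message(RawMessage(ts, [LmpdRecord("r2", "P2", -0.4, 1000.0)], True))
resent = state.update_mapping(MappingEvent({"r1": "L1"}))
```

Scanning every timestamp in `update_mapping` is linear in the size of the state. A reverse index from raw constraint uid to timestamps would avoid the scan at the cost of extra memory.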
**Notes**:
- As usual, we need to pay attention to the ordering of commits, state storage, and producer flushes (…).
- We can store the state on a persistent volume for a faster restart.
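One possible ordering, shown only as an illustration: produce, flush, persist the state, and commit the input offset last. The callables are hypothetical stand-ins for the real producer, consumer, and state storage. The sketch assumes at-least-once delivery is acceptable, which fits the design since duplicates are resolved by the latest "published timestamp".

```python
from typing import Callable, List


def process_event(
    handle: Callable[[], List[bytes]],
    produce: Callable[[bytes], None],
    flush: Callable[[], None],
    persist_state: Callable[[], None],
    commit_offset: Callable[[], None],
) -> None:
    """Process one input event with the input offset committed last."""
    # 1. Update the in-memory state and collect the outgoing payloads.
    for payload in handle():
        produce(payload)
    # 2. Make sure every outgoing message has left the producer buffer.
    flush()
    # 3. Store the state, so that a restart sees the event as applied.
    persist_state()
    # 4. Only now acknowledge the input; a crash before this point
    #    causes the event to be read and processed again.
    commit_offset()
```

A crash between steps 3 and 4 replays an event against a state that already contains it. That is harmless for an overwrite, but a backfill that adds records could be applied twice unless the stored state also records the last applied offset.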