# Week 12 — Update
## TL;DR
Implemented the gossipsub "message cache" behavior in zig-libp2p.
PR: https://github.com/zen-eth/eth-p2p-z/pull/83/commits/438b697d6ced03c50a481ab8df789b4a72bade45

---
## Data model & core concepts
- MessageID: canonical ID used by gossipsub (e.g., a hash of the message).
- Message: the full message payload, stored so it can be returned on IWANT.
- Gossip window: a sliding window of B buckets representing recent time slices (e.g., one heartbeat each). A message lives in the bucket that was current when it was accepted; the window advances by one bucket on each heartbeat (mcache.shift()).
- Buckets: each bucket maps Topic -> set (or small vector) of MessageIDs. Payloads are not stored per bucket; they are reached through the global index.
- Global index: a map MessageID -> (bucket_index, topic, pointer-to-message) for O(1) has/get and quick removal on eviction.
- Size limits: per-bucket and global caps are enforced to avoid unbounded memory growth.
Illustration (logical):
- circular buffer of B buckets (B is gossip window size)
- each bucket holds Topic => [msg_id1, msg_id2, ...]
- one global map: msg_id -> (bucket, message_record)
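A minimal Python sketch of the slot arithmetic behind the circular buffer, assuming the head advances forward (the next head is `(current_slot + 1) mod B`, as in the shift pseudocode further down). The helper names `window_order` and `oldest_slot` are hypothetical, not part of zig-libp2p.

```python
# Slot arithmetic for a ring of B buckets, assuming the head moves forward:
# after shift(), the new head is (current_slot + 1) % B.

def window_order(current_slot: int, num_buckets: int) -> list[int]:
    """Slots ordered newest -> oldest (the order get_gossip_ids walks them)."""
    return [(current_slot - i) % num_buckets for i in range(num_buckets)]


def oldest_slot(current_slot: int, num_buckets: int) -> int:
    """The slot evicted by the next shift(): the one right after the head."""
    return (current_slot + 1) % num_buckets


if __name__ == "__main__":
    print(window_order(1, 4))  # [1, 0, 3, 2]
    print(oldest_slot(1, 4))   # 2, i.e. the last entry of window_order
```

Python's `%` already yields a non-negative result for a positive modulus. With unsigned indices in Zig, `current_slot - i` would underflow, so an implementation would compute `(current_slot + B - i) % B` instead.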
---
## API
- mcache.init(B, per_bucket_cap, global_cap)
  - B = gossip window size in buckets (e.g., 3–5).
  - per_bucket_cap = maximum IDs per topic within one bucket; global_cap = maximum messages held overall.
- mcache.put(msg_id, topic, message) -> bool
  - Insert a message if not present. Returns true if inserted, false if it was already present or a size cap rejected it.
  - Store the message payload so we can answer IWANT.
  - Add msg_id under bucket[current_slot][topic].
  - Update the global index.
  - Never evict on put: when a cap is reached the new message is rejected; eviction happens only in shift().
- mcache.has(msg_id) -> bool
  - True if msg_id is present in the global index.
- mcache.get(msg_id) -> ?Message
  - Return the stored message payload, or null if not present.
- mcache.get_gossip_ids(topic, max_ids) -> []MessageID
  - Return up to max_ids recent message IDs for the topic, drawn from the buckets in the window and ordered newest -> oldest. (The spec gossips only about the most recent mcache_gossip windows, which can be fewer than the mcache_len windows retained.)
  - Used by heartbeat IHAVE emission.
- mcache.shift() -> void
  - Advance the gossip window by one bucket and evict the messages in the oldest bucket:
    - remove each msg_id from the global index and free its message payload;
    - clear the oldest bucket and reuse it as the new head.
- mcache.stats() -> {size, buckets_used, hits, misses, evictions}
Notes:
- All operations should handle concurrent readers and writers with minimal contention (sharded locks or per-bucket locks).
- get_gossip_ids must avoid returning IDs older than the gossip window.
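To make the API concrete, here is a single-threaded Python sketch of it. It is not the zig-libp2p code: IDs and payloads are assumed to be `bytes` and topics `str`, the cap parameters follow the pseudocode's `per_bucket_cap` (IDs per topic within one bucket) and `global_cap`, and there is no locking, so the concurrency note above is out of scope here.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class MessageRecord:
    slot: int    # bucket the message was inserted into
    topic: str
    msg: bytes   # owned payload, returned on IWANT


class MessageCache:
    """Single-threaded sketch of the mcache API (no locking)."""

    def __init__(self, num_buckets: int, per_bucket_cap: int, global_cap: int) -> None:
        if num_buckets < 1:
            raise ValueError("num_buckets must be >= 1")
        self.num_buckets = num_buckets
        self.per_bucket_cap = per_bucket_cap
        self.global_cap = global_cap
        # Each bucket: topic -> insertion-ordered IDs (dict keys used as an ordered set).
        self.buckets: list[dict[str, dict[bytes, None]]] = [{} for _ in range(num_buckets)]
        self.index: dict[bytes, MessageRecord] = {}  # global index
        self.current_slot = 0
        self.hits = self.misses = self.evictions = 0

    def put(self, msg_id: bytes, topic: str, msg: bytes) -> bool:
        """Insert into the current bucket; False if duplicate or a cap is hit."""
        if msg_id in self.index or len(self.index) >= self.global_cap:
            return False
        ids = self.buckets[self.current_slot].setdefault(topic, {})
        if len(ids) >= self.per_bucket_cap:
            return False  # reject instead of evicting: eviction only happens in shift()
        ids[msg_id] = None
        self.index[msg_id] = MessageRecord(self.current_slot, topic, msg)
        return True

    def has(self, msg_id: bytes) -> bool:
        return msg_id in self.index

    def get(self, msg_id: bytes) -> Optional[bytes]:
        rec = self.index.get(msg_id)
        if rec is None:
            self.misses += 1
            return None
        self.hits += 1
        return rec.msg

    def get_gossip_ids(self, topic: str, max_ids: int) -> list[bytes]:
        """Up to max_ids IDs for topic, newest bucket first (newest first within a bucket)."""
        out: list[bytes] = []
        for i in range(self.num_buckets):
            slot = (self.current_slot - i) % self.num_buckets
            for msg_id in reversed(self.buckets[slot].get(topic, {})):
                if len(out) >= max_ids:
                    return out
                out.append(msg_id)
        return out

    def shift(self) -> None:
        """Evict the oldest bucket and reuse it as the new head."""
        oldest = (self.current_slot + 1) % self.num_buckets
        for ids in self.buckets[oldest].values():
            for msg_id in ids:
                del self.index[msg_id]  # dropping the record releases the payload
                self.evictions += 1
        self.buckets[oldest].clear()
        self.current_slot = oldest

    def stats(self) -> dict[str, int]:
        used = sum(1 for b in self.buckets if any(b.values()))
        return {"size": len(self.index), "buckets_used": used,
                "hits": self.hits, "misses": self.misses, "evictions": self.evictions}
```

Using dict keys as an insertion-ordered set keeps each bucket's IDs in arrival order, so `get_gossip_ids` can walk them newest first. A `put` that hits a cap returns `False` rather than evicting, so memory is reclaimed only by `shift()`.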
---
## Invariants & behaviors
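- Deduplication: every incoming message should be checked with mcache.has(msg_id) before validation/delivery. If it is already present, do not re-process or re-add it. Because mcache only remembers the last B heartbeats, a longer-lived seen cache (the spec's seen_ttl) is still needed to catch duplicates that arrive after eviction.
- IWANT responses: if a peer requests a msg_id in an IWANT and mcache.get(msg_id) returns a Message, include that Message in the reply RPC.
- IHAVE emission: on each heartbeat, call get_gossip_ids(topic, max_ids) and advertise the returned IDs via IHAVE to D_lazy randomly selected peers (in the spec, D_lazy is a peer count, not a limit on IDs). IHAVE must not advertise IDs that have been evicted.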
- Eviction: mcache.shift() deterministically evicts the oldest bucket. This keeps retention bounded by B heartbeats.
- Per-message memory: store a compact representation (owned bytes or zero-copy reference where safe). Avoid duplicating large payloads unnecessarily.
- Limits enforcement: if a single bucket receives an abnormal number of messages, use per-bucket caps and drop or deprioritize extras to avoid DoS memory spikes.
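The deduplication, IWANT, and IHAVE rules can be sketched as three small Python functions. The callables (`already_seen`, `validate`, `mcache_put`, `mcache_get`) are hypothetical hooks standing in for the router and the cache; `already_seen` can be `mcache.has` or a longer-lived seen cache. The IHAVE helper assumes, as in the gossipsub spec, that D_lazy is the number of peers that receive gossip for a topic and that `candidate_peers` already excludes mesh peers.

```python
import random
from typing import Callable, Optional, Sequence


def on_message(msg_id: bytes, topic: str, payload: bytes,
               already_seen: Callable[[bytes], bool],
               validate: Callable[[bytes], bool],
               mcache_put: Callable[[bytes, str, bytes], bool]) -> bool:
    """Return True if the message is new and valid, i.e. should be delivered."""
    if already_seen(msg_id):
        return False  # duplicate: do not re-validate, re-deliver, or re-add
    if not validate(payload):
        return False  # invalid messages are never cached
    mcache_put(msg_id, topic, payload)
    return True


def answer_iwant(requested: Sequence[bytes],
                 mcache_get: Callable[[bytes], Optional[bytes]]) -> list[bytes]:
    """Payloads for the requested IDs that are still cached; evicted IDs are skipped."""
    return [m for m in (mcache_get(i) for i in requested) if m is not None]


def emit_ihave(topic: str, gossip_ids: list[bytes], candidate_peers: Sequence[str],
               d_lazy: int, rng: random.Random) -> dict[str, tuple[str, list[bytes]]]:
    """Map each of up to d_lazy randomly chosen peers to one IHAVE (topic, ids)."""
    if not gossip_ids:
        return {}  # nothing cached for this topic: send no IHAVE
    targets = rng.sample(list(candidate_peers), min(d_lazy, len(candidate_peers)))
    return {peer: (topic, list(gossip_ids)) for peer in targets}
```

Returning a peer-to-IHAVE map instead of sending keeps the sketch free of networking assumptions; the router would serialize each entry into its outgoing RPC.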
---
## Pseudocode
Initialization:
```pseudo
function init(B, per_bucket_cap, global_cap):
    buckets = [map() for i in 0..B-1]   // each: topic -> set(msg_id)
    global_index = hashmap<msg_id, MessageRecord>
    current_slot = 0
    stats = {hits: 0, misses: 0, evictions: 0}
```
Put:
```pseudo
function put(msg_id, topic, message):
    if global_index.contains(msg_id):
        return false                      // already cached
    if global_index.size >= global_cap:
        return false                      // global cap reached: reject, never evict on put
    // insert into the current slot
    bucket = buckets[current_slot]
    set_for_topic = bucket.get_or_create(topic)
    if set_for_topic.size >= per_bucket_cap:
        // backpressure: skip storing this msg in mcache
        return false
    set_for_topic.add(msg_id)
    global_index[msg_id] = {slot: current_slot, topic: topic, msg: message}
    return true
```
Has / Get:
```pseudo
function has(msg_id):
    return global_index.contains(msg_id)

function get(msg_id):
    if global_index.contains(msg_id):
        stats.hits += 1
        return global_index[msg_id].msg
    else:
        stats.misses += 1
        return null
```
Get gossip ids:
```pseudo
function get_gossip_ids(topic, max_ids):
    ids = []
    // iterate buckets newest -> oldest
    for i in 0..B-1:
        slot = (current_slot + B - i) mod B   // + B keeps the operand non-negative
        for id in buckets[slot].get(topic, empty):
            if ids.length >= max_ids:
                return ids                    // check first so max_ids = 0 returns nothing
            ids.append(id)
    return ids
```
Shift:
```pseudo
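function shift():
    // slots advance forward, so the slot after the head is the oldest
    // (it is also the last slot visited by get_gossip_ids)
    oldest_slot = (current_slot + 1) mod B
    for each (topic, set) in buckets[oldest_slot]:
        for id in set:
            remove global_index[id]       // also frees the message payload
            stats.evictions += 1
    buckets[oldest_slot].clear()
    current_slot = oldest_slot            // reuse the cleared bucket as the new head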
```
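As a worked check of the retention bound, this Python snippet replays the shift rule on a bare ring of sets (a hypothetical stand-in for the buckets, with no index or payloads) and counts how many shifts it takes to evict an ID inserted at the head. The answer is B for every B: a message survives B - 1 shifts and is dropped on the B-th, so it is retained for at most B heartbeats.

```python
def shifts_until_evicted(num_buckets: int) -> int:
    """Count shift() calls until an ID inserted at the current head is evicted."""
    buckets = [set() for _ in range(num_buckets)]
    current = 0
    buckets[current].add("m")  # insert one message at the head
    shifts = 0
    while True:
        oldest = (current + 1) % num_buckets  # same rule as shift()
        shifts += 1
        if "m" in buckets[oldest]:
            return shifts  # this shift evicts the message
        buckets[oldest].clear()
        current = oldest


for B in (1, 3, 5):
    print(B, shifts_until_evicted(B))
```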
---
## Next
- Wire mcache.put/has/get into the receive RPC path (mark seen / mcache.put on valid messages).
- Use mcache.get in IWANT responder to include full messages.
- Use mcache.get_gossip_ids(topic, max_ids) within heartbeat IHAVE emission.
- Add unit and concurrency tests for the cache, then run interop tests against other implementations to verify IHAVE/IWANT behavior.
---
## References
- Gossipsub v1.0 — message cache & gossip windows: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.0.md#message-cache