# Week 12 - Update

## TL;DR

Implemented the gossipsub "message cache" behavior in zig-libp2p.

PR: https://github.com/zen-eth/eth-p2p-z/pull/83/commits/438b697d6ced03c50a481ab8df789b4a72bade45

---

## Data model & core concepts

- MessageID: canonical ID used by gossipsub (e.g., hash of message).
- Message: the full message payload stored so it can be returned on IWANT.
- Gossip window: a sliding window of N buckets representing recent time slices (e.g., heartbeat slots). Messages live in the bucket corresponding to the time they were accepted; the window advances on heartbeat (mcache.shift()).
- Buckets: each bucket is a map: Topic -> set of MessageIDs (or small vector) and a map MessageID -> Message to allow retrieval.
- Global index: a map MessageID -> (bucket_index, pointer-to-message) for O(1) has/get and to allow quick removal on eviction.
- Size limits: per-message and global limits are enforced to avoid unbounded memory.

Illustration (logical):

- circular buffer of B buckets (B is gossip window size)
- each bucket holds Topic => [msg_id1, msg_id2, ...]
- one global map: msg_id -> (bucket, message_record)

---

## API

- mcache.init(B, per_msg_limit, global_limit)
  - B = gossip window size in buckets (e.g., 3–5)
- mcache.put(msg_id, topic, message) -> bool
  - Insert a message if not present. Returns true if inserted, false if already present.
  - Store message payload so we can answer IWANT.
  - Add msg_id under bucket[current_slot][topic].
  - Update global index.
  - Enforce size bounds: evict oldest if needed during shift (not on put).
- mcache.has(msg_id) -> bool
  - True if present in global index.
- mcache.get(msg_id) -> ?Message
  - Return stored message payload, or null if not present.
- mcache.get_gossip_ids(topic, max_ids) -> []MessageID
  - Return up to max_ids recent message IDs for the topic drawn from all buckets in the window (ordered newest -> oldest).
  - Used by heartbeat/IHAVE emission.
- mcache.shift() -> void
  - Advance the gossip window by one bucket.
  - Evict messages in the oldest bucket:
    - Remove msg_id from global index and free message payloads for those IDs.
    - Clear oldest bucket and reuse it as the new head.
- mcache.stats() -> {size, buckets_used, hits, misses, evictions}

Notes:

- All operations should handle concurrent readers and writers with minimal contention (sharded locks or per-bucket locks).
- get_gossip_ids must avoid returning IDs older than the gossip window.

---

## Invariants & behaviors

- Deduplication: Every incoming message should check mcache.has(msg_id) before validate/deliver. If true, do not re-process or re-add.
- IWANT responses: If a peer requests a msg_id in an IWANT and mcache.get(msg_id) returns a Message, include the Message in reply RPC.
- IHAVE emission: heartbeat will call get_gossip_ids(topic, D_lazy) to choose IDs to advertise via IHAVE. IHAVE should not advertise IDs that have been evicted.
- Eviction: mcache.shift() deterministically evicts the oldest bucket. This keeps retention bounded by B heartbeats.
- Per-message memory: store a compact representation (owned bytes or zero-copy reference where safe). Avoid duplicating large payloads unnecessarily.
- Limits enforcement: if a single bucket receives an abnormal number of messages, use per-bucket caps and drop or deprioritize extras to avoid DoS memory spikes.
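
To make the invariants above easier to check by hand, here is a minimal single-threaded sketch of the sliding-window cache in Python. It is not the zig-libp2p code from the PR: the names `MessageCache`, `MessageRecord` and `per_topic_cap` are hypothetical, locking and stats are left out, and the cap is applied per topic within a bucket as one possible reading of the limits described above.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional


@dataclass
class MessageRecord:
    slot: int       # bucket the message was accepted into
    topic: str
    payload: bytes  # kept so an IWANT can be answered


class MessageCache:
    """Illustrative sliding-window cache; an assumption-laden sketch only."""

    def __init__(self, window: int, per_topic_cap: int = 1024) -> None:
        if window < 1:
            raise ValueError("window must be at least 1")
        self.window = window
        self.per_topic_cap = per_topic_cap  # hypothetical cap, per topic per bucket
        # One bucket per heartbeat slot: topic -> insertion-ordered set of IDs
        # (a dict with None values stands in for an ordered set).
        self.buckets = [{} for _ in range(window)]
        self.index = {}  # msg_id -> MessageRecord
        self.current = 0

    def put(self, msg_id: bytes, topic: str, payload: bytes) -> bool:
        if msg_id in self.index:
            return False  # deduplication: already cached
        bucket = self.buckets[self.current]
        if len(bucket.get(topic, {})) >= self.per_topic_cap:
            return False  # cap reached: skip caching this message
        bucket.setdefault(topic, {})[msg_id] = None
        self.index[msg_id] = MessageRecord(self.current, topic, payload)
        return True

    def has(self, msg_id: bytes) -> bool:
        return msg_id in self.index

    def get(self, msg_id: bytes) -> Optional[bytes]:
        record = self.index.get(msg_id)
        return record.payload if record is not None else None

    def get_gossip_ids(self, topic: str, max_ids: int) -> list:
        ids = []
        # Walk buckets newest -> oldest; inside a bucket, IDs come out in
        # insertion order.
        for i in range(self.window):
            slot = (self.current - i) % self.window
            for msg_id in self.buckets[slot].get(topic, {}):
                if len(ids) >= max_ids:
                    return ids
                ids.append(msg_id)
        return ids

    def shift(self) -> int:
        """Advance one heartbeat; return how many messages were evicted."""
        oldest = (self.current + 1) % self.window
        evicted = 0
        for ids in self.buckets[oldest].values():
            for msg_id in ids:
                del self.index[msg_id]  # drops the payload reference too
                evicted += 1
        self.buckets[oldest].clear()
        self.current = oldest  # the emptied bucket becomes the new head
        return evicted


if __name__ == "__main__":
    cache = MessageCache(window=3)
    cache.put(b"id-1", "blocks", b"payload-1")
    cache.shift()
    cache.put(b"id-2", "blocks", b"payload-2")
    print(cache.get_gossip_ids("blocks", 10))  # [b'id-2', b'id-1']
    cache.shift()
    cache.shift()  # third shift reuses slot 0 and evicts id-1
    print(cache.has(b"id-1"), cache.get_gossip_ids("blocks", 10))  # False [b'id-2']
```

With a window of 3, a message accepted in slot 0 survives two shifts and is evicted on the third, which matches the "retention bounded by B heartbeats" invariant. Because `get_gossip_ids` only reads the live buckets, it cannot return an ID that `shift` has already evicted.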
---

## Pseudocode

Initialization:

```pseudo
function init(B, per_bucket_cap, global_cap):
    buckets = [map() for i in 0..B-1]   // each: topic -> set(msg_id)
    global_index = hashmap<msg_id, MessageRecord>
    current_slot = 0
    stats = {hits:0, misses:0, evictions:0}
```

Put:

```pseudo
function put(msg_id, topic, message):
    if global_index.contains(msg_id):
        return false
    // insert into current slot
    bucket = buckets[current_slot]
    set_for_topic = bucket.get_or_create(topic)
    if set_for_topic.size >= per_bucket_cap:
        // backpressure: drop or skip storing this msg in mcache
        return false
    set_for_topic.add(msg_id)
    global_index[msg_id] = {slot: current_slot, topic: topic, msg: message}
    return true
```

Has / Get:

```pseudo
function has(msg_id):
    return global_index.contains(msg_id)

function get(msg_id):
    if global_index.contains(msg_id):
        stats.hits += 1
        return global_index[msg_id].msg
    else:
        stats.misses += 1
        return null
```

Get gossip ids:

```pseudo
function get_gossip_ids(topic, max_ids):
    ids = []
    // iterate buckets newest -> oldest
    for i in 0..B-1:
        // adding B keeps the index non-negative where mod can return negatives
        slot = (current_slot - i + B) mod B
        for id in buckets[slot].get(topic, empty):
            ids.append(id)
            if ids.length >= max_ids:
                return ids
    return ids
```

Shift:

```pseudo
function shift():
    // the slot after current_slot is the oldest one, matching the
    // newest -> oldest walk in get_gossip_ids
    oldest_slot = (current_slot + 1) mod B
    for each topic, set in buckets[oldest_slot]:
        for id in set:
            remove global_index[id]
            free message payload
            stats.evictions += 1
    buckets[oldest_slot].clear()
    current_slot = oldest_slot
```

---

## Next

- Wire mcache.put/has/get into the receive RPC path (mark seen / mcache.put on valid messages).
- Use mcache.get in IWANT responder to include full messages.
- Use mcache.get_gossip_ids(topic, max_ids) within heartbeat IHAVE emission.
- Add unit and concurrency tests, then run interop tests to verify IHAVE/IWANT behavior with other implementations.

---

## References

- Gossipsub v1.0, message cache & gossip windows: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.0.md#message-cache