# Build an Ethereum Portal Network Go client from scratch

[TOC]

## Motivation

[EthStorage](https://ethstorage.io/#/), a decentralized storage protocol based on Ethereum, hopes to use the Portal Network to support its proposed web3:// protocol. Because EthStorage is built on the Go technology stack, and at the time there were only three Portal Network clients (Rust, Nim, and JavaScript), we started to build a [Go client](https://github.com/zen-eth/shisui) from scratch. Read this [blog](https://blog.ethportal.net/posts/shisui-story) for further details.

## Inspection

By reading the Portal Network specification, we learned that its core mechanisms are as follows.

- The wire protocol is a DHT network protocol based on discv5; if a single piece of content exceeds the size limit of a single discv5 packet, it must be transferred with the uTP protocol across multiple discv5 packets.
- Content is encoded and decoded with SSZ.
- On top of the decentralized storage layer, a JSON-RPC interface is exposed to serve external requests.
- Custom sub-protocols can be implemented on top of the wire protocol; there are currently three sub-networks: history, beacon, and state.

## Design & Implementation

### Technical Decision

After understanding the core mechanisms of the Portal Network, we designed the Go client around the existing Go ecosystem projects in the Ethereum community.

- [go-ethereum](https://github.com/ethereum/go-ethereum) was chosen as the main dependency because it already implements the discv5 protocol, provides a JSON-RPC framework, and also provides basic packages such as log, metrics, etc.
- [fastssz](https://github.com/ferranbt/fastssz) and [zrnt](https://github.com/protolambda/zrnt) are used to define the portal wire types.
- [storj's uTP library](https://github.com/storj/utp-go) was forked, and its underlying UDP transport was extended to run over discv5 packets.
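Two spec ideas recur throughout the design: content keys are hashed into the node-ID keyspace, and the DHT measures closeness with an XOR distance metric. A minimal sketch in Go, assuming the spec's SHA-256 content-ID derivation (the function names `contentID` and `distance` are ours for illustration, not shisui's):

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// contentID maps a content key into the 32-byte node-ID keyspace,
// per the portal wire spec's SHA-256 derivation.
func contentID(contentKey []byte) [32]byte {
	return sha256.Sum256(contentKey)
}

// distance is the XOR metric the DHT uses to decide which nodes are
// "close" to a content ID (and whether it falls inside a node's radius).
func distance(a, b [32]byte) [32]byte {
	var d [32]byte
	for i := range a {
		d[i] = a[i] ^ b[i]
	}
	return d
}

func main() {
	key := []byte{0x00, 0xd1, 0xc2} // hypothetical content key bytes
	id := contentID(key)
	var nodeID [32]byte // all-zero node ID, for illustration only
	fmt.Printf("content id: %x\n", id)
	fmt.Printf("distance:   %x\n", distance(id, nodeID))
}
```

Because the metric is a plain XOR, the distance from a node to itself is zero, which is what lets the routing table sort candidates by proximity.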
### Overall Architecture ```go /* Overall Architecture +-------------------------------------------------------------------------------------------------+ | External Interface (JSON-RPC) | | (e.g., history_getContent, beacon_offer, state_addEnr, discv5_nodeInfo) | +-------------------------------------------------------------------------------------------------+ | | | | Calls specific methods | Calls specific methods | Calls specific methods ▼ ▼ ▼ +-----------------------+ +-----------------------+ +-----------------------+ | history.API | | beacon.API | | state.API | |-----------------------| |-----------------------| |-----------------------| | Embeds PortalProtoAPI | | Embeds PortalProtoAPI | | Embeds PortalProtoAPI | | | | Uses ConsensusClient | | | +-----------┬-----------+ +-----------┬-----------+ +-----------┬-----------+ | | | | Delegates generic calls | Delegates generic calls | Delegates generic calls └───────────────────┬─────────────────┴───────────────────┬─────────────────┘ ▼ │ Uses specific data +-------------------------------------------------+ │ (e.g., finalized root) | portalwire.PortalProtocolAPI | │ | (Generic RPC wrapper for PortalProtocol methods)| ▼ | (e.g., AddEnr, FindNodes, RecursiveFindContent) | <-----------------+-----------------+ +-----------------------┬-------------------------+ | beacon.ConsensusClient| | +-----------------------+ | Calls core protocol methods ▼ +-------------------------------------------------------------------------------------------------+ | portalwire.PortalProtocol (Core Logic) | |-------------------------------------------------------------------------------------------------| | - Manages DHT (Table) & Node Discovery (via DiscV5) | | - Handles Wire Protocol Messages (PING, FINDNODES, FINDCONTENT, OFFER, ACCEPT) via TalkReq/Resp | | - Facilitates Content Transfer via uTP (ZenEthUtp) | | - Interacts with Local Storage (storage.ContentStorage) | | - Provides contentQueue for received OFFERed data | | - 
Provides core methods like ContentLookup, offer, findNodes, ping, etc. | +-------┬---------------┬---------------┬---------------┬---------------┬---------------┬---------+ | | | | | | Uses │ Uses │ Uses │ Uses │ Uses │ Uses │ Uses ▼ ▼ ▼ ▼ ▼ ▼ +---------------+ +-------------+ +-------------+ +-------------+ +-------------+ +-------------+ | history.Network | beacon.Network| state.Network | discover.UDPv5| ZenEthUtp | storage.Store | | (Validation, | (Validation, | (Validation, | (Underlying | (uTP Transfer)| (Local Disk) | | Processing) | Processing) | Processing) | Transport) | | | +---------------+ +-------------+ +-------------+ +-------------+ +-------------+ +-------------+ ▲ ▲ ▲ | Reads from | Reads from | Reads from └---------------┴---------------┴--------------------------------------------------------- contentQueue */ ``` - The underlying core data structure 'PortalProtocol' wraps Discv5 services, Utp services and Storage services to implement the core portal wire protocol, including PING, FINDNODES, FINDCONTENT, OFFER/ACCEPT. - After the sub-network receives the content, it needs to be validated before it can store and gossip. - The API receives external requests and converts them into underlay network requests, which are proxied to the 'PortalProtocol' for processing. ### Data Distribution Mode Pull vs Push ```go /* Content Mechanisms: Pull vs Push --------------------------------------------------------------------- Pull Mechanism (FINDCONTENT / CONTENT) - Initiated by Requester --------------------------------------------------------------------- Goal: Node A wants content with key K. +-------------+ +-------------+ | Node A | | Node B | | (Requester) | | (Potential | | | | Holder) | +-------------+ +-------------+ | | | 1. Needs content K | | (e.g., via `RecursiveFindContent`) | | | | 2. Sends FINDCONTENT(K) -----------------------> | | (via `findContent` -> discv5.TalkRequest) | | | | | 3. 
Receives FINDCONTENT(K) | | (via `handleTalkRequest`) | | | | 4. Checks local storage for K | | (via `storage.Get`) | | | |---- YES (Found K) ------>| 5a. Prepare CONTENT Response | | | | - Raw Data (if small) | | | | - uTP ID (if large) | | | | (Start uTP Acceptor) | | | +----------------------------+ | | | | |<--------------------------------------| 6a. Send CONTENT(Data/uTP ID) | | (via TalkResponse) | | | |---- NO (Not Found K) --->| 5b. Find closest nodes to K | | | | (via `table.FindClosest`) | | | +-----------------------------+ | | | | |<--------------------------------------| 6b. Send CONTENT(ENRs) | | (via TalkResponse) | | | 7. Receives CONTENT response | | (via `processContent`) | | | |---- Response = Raw Data ---->| 8a. Use Content | | +-------------------+ | |---- Response = ENRs -------->| 8b. Add nodes to lookup queue | | | (Continue lookup process) | | +-------------------------------+ | |---- Response = uTP ID ------>| 8c. Initiate uTP Download | | | (`Utp.DialWithCid`) | | +-------------------------------+ | | | | |--------------------->| 9c. uTP Connection Established | | | (via `Utp.AcceptWithCid`) | |<---------------------| 10c. uTP Data Transfer | | | | 11c. Use Content | | +---------------------------------------+ +---------------------------------------+ --------------------------------------------------------------------- Push Mechanism (OFFER / ACCEPT) - Initiated by Holder/Offerer --------------------------------------------------------------------- Goal: Node A has content K1, K2 and wants to offer it to Node B. +-------------+ +-------------+ | Node A | | Node B | | (Offerer) | | (Receiver) | +-------------+ +-------------+ | | | 1. Has content K1, K2 | | (e.g., recently received or stored) | | | | 2. Sends OFFER(K1, K2) ------------------------> | | (via `offer` -> discv5.TalkRequest) | | | | | 3. Receives OFFER(K1, K2) | | (via `handleTalkRequest`) | | | | 4. 
Filters Keys (K1, K2): | | - Check radius (`storage.Radius`) | | - Check if already stored (`storage.Has`) | | - Check rate limits / ongoing transfers | | (via `filterContentKeys`) | | | |---- Keys Accepted (e.g., K2) -->| 5a. Generate uTP ID | | | | (`Utp.CidWithAddr`) | | | | Start uTP Acceptor | | | +---------------------+ | | | | |<--------------------------------------| 6a. Send ACCEPT(Bitlist/Codes=[K2], uTP ID) | | (via TalkResponse) | | | |---- No Keys Accepted ---------->| 5b. Prepare ACCEPT (Empty) | | | +--------------------------+ | | | | |<--------------------------------------| 6b. Send ACCEPT(Bitlist/Codes=[], uTP ID=0) | | (via TalkResponse) | | | 7. Receives ACCEPT response | | (via `processOffer`) | | | |---- Keys Accepted (K2) ----->| 8a. Initiate uTP Upload (K2) | | | (`Utp.DialWithCid`) | | +-------------------------------+ | | | | |--------------------->| 9a. uTP Connection Established | | | (via `Utp.AcceptWithCid`) | | | | |<---------------------| 10a. uTP Data Transfer (K2) | | | | 11a. Upload Complete | | 11b. Receive Data (K2) | | | Put on `contentQueue` | | | (Sub-protocol validates/stores) | | | |---- No Keys Accepted ------>| 8b. Do Nothing | | | +-------------------+ | | | +-------------------------------------------------------------+ */ ``` Supports both Pull and Push data distribution modes for portal wire protocol. ### Queue Strategy ```go /* Queue Mechanisms: contentQueue & offerQueue --------------------------------------------------------------------- contentQueue: Inbound Content Flow (Post-uTP from OFFER/ACCEPT) --------------------------------------------------------------------- Purpose: Decouples network reception/uTP download from sub-protocol processing. Allows the network layer to quickly accept data and hand it off. 
+--------------------------------+ +--------------------------------+ +--------------------------------+ | Network Listener | | portalwire.PortalProtocol | | Sub-Protocol Network | | (Receives OFFER, Sends ACCEPT) | | | | (e.g., history.Network) | +--------------------------------+ +--------------------------------+ +--------------------------------+ | | | | 1. Receives OFFER | | | (via handleOffer) | | | | | | 2. Filters Keys, Accepts some | | | | | | 3. Sends ACCEPT (with uTP ID) | | | Starts uTP Acceptor Goroutine --> | 4. Goroutine: Accepts uTP Conn | | | (via Utp.AcceptWithCid) | | | | | | 5. Goroutine: Reads uTP Data | | | | | | 6. Goroutine: Packages data into | | | *ContentElement (keys, content) | | | | | | 7. Goroutine: Pushes onto queue ---> | contentQueue (chan *ContentElement) | | | | | | | | <--- 8. Reads from queue | | | | (in processContentLoop) | | | | | | | | 9. Validates Content | | | | (e.g., history.validateContent) | | | | | | | | 10. Stores Content (if valid) | | | | (via portalProtocol.Put -> storage)| +--------------------------------------+ +--------------------------------------+ --------------------------------------------------------------------- offerQueue: Outbound Offer Flow (e.g., Gossip) --------------------------------------------------------------------- Purpose: Buffers outgoing OFFER requests, manages concurrency of sending offers. +--------------------------------+ +--------------------------------+ +--------------------------------+ | Initiator (e.g., Gossip) | | portalwire.PortalProtocol | | Network Sender | | | | | | (Sends OFFER via discv5) | +--------------------------------+ +--------------------------------+ +--------------------------------+ | | | | 1. Decides to offer content K | | | to Node B | | | (e.g., based on radius/events) | | | | | | 2. Creates *OfferRequestWithNode | | | (contains Node B, keys [K]) | | | | | | 3. Pushes request onto queue ------> | offerQueue (chan *OfferRequest...) | <--- 4. 
Reads from queue | | | | (by offerWorker goroutine) | | | | | | | | 5. Calls p.offer(req.Node, req.Req)| | | | | | | <------------------------------------+ | | | 6. p.offer: Marshals OFFER message | | | Sends via discv5.TalkRequest ---> | 7. Sends OFFER(K) to Node B | | | | (Handles response internally) | | | | | +--------------------------------------+ +--------------------------------------+ */ ``` In order to ensure system stability and performance, the internal queue is used to provide the capabilities of ratelimit and backpressure. ### Concurrency Design ```go /* Goroutine Design Key: [Component] - A major struct or module. (Goroutine: X) - An independent, concurrently running function/loop X. ----> - Function call / Data flow / Spawning goroutine. {Channel} - A Go channel used for communication. +-----------------------------------------------------------------------------------+ | Main Application Thread | | (Initializes components, calls Start()) | +-----------------------------------------------------------------------------------+ | | Calls Start() on PortalProtocol, DiscV5, UTP, SubProtocols ▼ +-----------------------------------------------------------------------------------+ | [portalwire.PortalProtocol] | |-----------------------------------------------------------------------------------| | | | (Goroutine: table.loop) <-------------------------------------- Reads timer/events| | - Manages routing table updates, pings, refreshes. | | | | (Goroutine: offerWorker) x N <---- {offerQueue} <---- [Gossip/Other Logic] | | - Reads outgoing offer requests. | | - Calls p.offer() -> Sends OFFER via [DiscV5]. | | | | (Goroutine: handleOffer Download) <---- Spawned by handleOffer() | | - Accepts uTP connection via [ZenEthUtp]. | | - Reads data from uTP stream. | | - Pushes received *ContentElement ----> {contentQueue} | | | | (Goroutine: ContentLookup) <---- Spawned by RecursiveFindContent() call | | - Manages recursive lookup state. 
| | - Spawns temporary query goroutines -> p.findContent() -> [DiscV5] | | - Processes responses (Data, ENRs, uTP ID). | | - If uTP ID, may spawn temporary goroutine -> [ZenEthUtp].DialWithCid() | | | +-----------------------------------------------------------------------------------+ | | | | | | Registers | Interacts with | Interacts with | Reads from | Pushes to | Talk Handler | | | | ▼ ▼ ▼ ▼ ▼ +---------------+ +---------------+ +---------------+ +-----------------+ +-----------------+ | [DiscV5] | | [ZenEthUtp] | | [SubProtocol] | | {contentQueue} | | {offerQueue} | |---------------| |---------------| |---------------| +-----------------+ +-----------------+ | (Goroutine: | | (Goroutine: | | (Goroutine: | | ListenLoop) | | ListenLoop) | | processLoop) | | - Reads UDP. | | - Listens for | | - Reads from | | - Handles | | uTP SYN. | | {contentQueue}| | discv5 msgs.| | - Spawns conn | | - Validates | | - If TALKREQ, | | handlers. | | content. | | calls | | | | - Stores via | | p.handle... | | (Goroutine: | | p.Put(). | | | | uTP Conn) | | | | | | - Read/Write | | | | | | on socket. | | | +---------------+ +---------------+ +-----------------+ */ ``` Simplified Flow Descriptions: 1. **Startup:** The main thread initializes everything and calls `Start()` on the relevant components. 2. **Core Loops:** * `PortalProtocol` starts its `table.loop` and `offerWorker` goroutines. * `DiscV5` starts its UDP listening loop. * `ZenEthUtp` starts its uTP listening loop. * Each `SubProtocol` (History, Beacon, State) starts its `processContentLoop` goroutine. 3. **Incoming Portal Message (via DiscV5):** * `DiscV5`'s `ListenLoop` receives a packet. * If it's a TALKREQ for a portal protocol, it calls `PortalProtocol.handleTalkRequest` (likely within a goroutine managed by discv5). * `handleTalkRequest` dispatches to `handlePing`, `handleFindNodes`, `handleFindContent`, or `handleOffer`. 
* `handleOffer`: If content is accepted, it *spawns a new goroutine* (`handleOffer Download`) to manage the uTP download. This goroutine interacts with `ZenEthUtp` and eventually pushes data to `contentQueue`.
4. **Incoming Content Processing:**
   * The `SubProtocol`'s `processContentLoop` blocks reading from `contentQueue`.
   * When data arrives (from the `handleOffer Download` goroutine), it wakes up, validates, and stores the content.
5. **Outgoing Offer (Gossip):**
   * Some logic (e.g., a timed gossip goroutine, not explicitly shown) decides to offer content.
   * It pushes an `OfferRequestWithNode` onto `offerQueue`.
   * An `offerWorker` goroutine picks up the request and calls `p.offer()`, which uses `DiscV5` to send the TALKREQ.
6. **Content Lookup:**
   * An external call (e.g., JSON-RPC `HistoryGetContent`) triggers `PortalProtocol.RecursiveFindContent`.
   * This method likely starts its own `ContentLookup` goroutine (or uses an internal lookup structure that manages concurrency).
   * The lookup process involves sending FINDCONTENT messages (via `p.findContent` -> `DiscV5`), potentially in parallel query goroutines. If a uTP response is received, it might trigger a temporary goroutine to handle the download via `ZenEthUtp`.

This design uses goroutines and channels extensively to handle network I/O, background tasks (table maintenance, content processing), and concurrent operations (lookups, offer processing) without blocking primary threads.

### Storage Design

```go
/*
Portal Network Storage Design
---------------------------------------------------------------------
Storage Mechanism (using pebble.ContentStorage as example)
---------------------------------------------------------------------
Purpose: Store content locally based on distance to the node ID,
manage capacity, and enforce data radius.
+---------------------------+ +---------------------------+ +---------------------------+ | portalwire.PortalProtocol | | storage.ContentStorage | | pebble.ContentStorage | | (e.g., via Put/Get) | | (Interface) | | (Implements Interface) | +---------------------------+ +-------------^-------------+ +-------------^-------------+ | | | | Calls Put(key, id, content) | | |---------------------------------------->| Put(key, id, content) | | |-------------------------->| Put(key, id, content) | | | |-----------------------+ | | | 1. distance = xor(id, nodeID) | | | | 2. inRadius(distance)? | | | | (Compare distance < radius)| | | | | | | |<--------------------------+<--NO---- | Return ErrInsufficientRadius | | | | | | | YES ---> | 3. size.Add(len(content)) | | | | | 4. batch = db.NewBatch() | | | | | 5. batch.Set(distance, content) | | | | | 6. batch.Set(SizeKey, newSize)| | | | | 7. batch.Commit() | | | | | 8. If newSize > capacity: | | | | | prune() | | | | +-----------------------+ |<----------------------------------------+<--------------------------+<---------| Return nil/error | | | | | Calls Get(key, id) | | |---------------------------------------->| Get(key, id) | | |-------------------------->| Get(key, id) | | | |-----------------------+ | | | 1. distance = xor(id, nodeID) | | | | 2. data, _, err = db.Get(distance) | | | | 3. If err == pebble.ErrNotFound: | | | | Return ErrContentNotFound | | | +-----------------------+ |<----------------------------------------+<--------------------------+<---------| Return data/error | Pruning Mechanism (within pebble.ContentStorage.prune): [prune() triggered by Put when size > capacity] | ▼ [1. Calculate target deletion size (e.g., 5% of capacity)] | ▼ [2. iter = db.NewIter()] | ▼ [3. Loop iter.Last() -> iter.Prev()] | |--> [4a. If key == SizeKey: continue] | |--> [4b. If deleted_size < target_size:] | |-> [batch.Delete(iter.Key())] | |-> [deleted_size += len(key) + len(value)] | |--> [4c. 
Else (target_size reached):] | |-> [newRadius = distance_from_key(iter.Key())] | |-> [radius.Store(newRadius)] <-- Update Radius! | |-> [Break Loop] | ▼ [5. newSize = currentSize - deleted_size] [6. batch.Set(SizeKey, newSize)] [7. batch.Commit(Sync=true)] | ▼ [8. (Optional) db.Compact() in background goroutine] */ ``` ### Validation Flow ```go /* Portal Network Validation Design --------------------------------------------------------------------- Validation Mechanism (within Sub-Protocols) --------------------------------------------------------------------- Purpose: Ensure received content (via OFFER/ACCEPT) is valid according to the specific sub-protocol rules before storing and gossiping. +--------------------------------+ +--------------------------------+ +--------------------------------+ | portalwire.PortalProtocol | | Sub-Protocol Network | | External Services / Data | | (handleOffer Download Goroutine)| | (e.g., history.Network) | | (e.g., History API via RPC) | +--------------------------------+ +--------------------------------+ +--------------------------------+ | | | | 1. Receives content via uTP | | | | | | 2. Pushes *ContentElement onto ---> | {contentQueue} <---------------------| 3. Reads from queue | | contentQueue | | (in processContentLoop) | | | | | | | 4. Calls validateContents(keys, data)| | | (Loops through each item) | | | | | | | ▼ | | | validateContent(key, content) | | | | | | | | Switch key[0] (ContentType) | | | ▼ | | | [Specific Validation Logic] | | | - Decode SSZ/RLP | | | - Check Hashes (content vs key) | | | - Verify Merkle Proofs | | | (May need related data) | | | | | | | | Needs Header? | | | |-------------------------->| 5. Request Header via RPC | | | | | (e.g., historyGetContent) | | | |<--------------------------| 6. Receive Header | | | | | | | | | Perform Proof Verification | | | | | | | ▼ | | | [Validation Result] | | | | | | |---FAIL-->| 7a. Log Error, Discard | | | | | | |---PASS-->| 7b. 
Call PortalProtocol.Put | | | | (key, id, content) | |<-------------------------------------+<---------+ | | 8. Put() interacts with Storage | | | | (See Storage Mechanism diagram) | | | | | | | | | | 9. Call PortalProtocol.Gossip| | | | (node, keys, contents) | |<-------------------------------------+<---------+ | | 10. Gossip() pushes to offerQueue | | | | (See Queue Mechanism diagram) | | | +--------------------------------------+ +---------------------------+ */ ``` Key Validation Steps by Sub-Protocol: * **History:** * `BlockHeader`: Decode RLP, check hash against key, verify accumulator/beacon proof. * `BlockBody`: Decode SSZ/RLP, check Tx/Uncle hashes against header (requires fetching header). * `Receipts`: Decode SSZ/RLP, check derived root against header (requires fetching header). * **Beacon:** * `LightClientUpdate`: Decode SSZ, check period range against key. * `LightClientBootstrap`: Decode SSZ, check timestamp against current time. * `*Finality/OptimisticUpdate`: Decode SSZ, check fork digest, check slot against key. * `HistoricalSummaries`: Decode SSZ, check epoch against key, verify Merkle proof against finalized state root (requires fetching root from local light client). * **State:** * `AccountTrieNode`: Decode SSZ, check node hash against key, verify Merkle proof against state root (requires fetching header from History network via RPC). * `ContractStorageTrieNode`: Decode SSZ, check node hash against key, verify account proof and storage proof against state root (requires fetching header). * `ContractBytecode`: Decode SSZ, check code hash against key, verify account proof against state root (requires fetching header). ### High-Level Flow ```go /* High-Level Flow Architecture: +--------------------------+ +--------------------------+ +--------------------------+ | External Interface |----->| history.API |----->| portalwire.PortalProtoAPI| | (JSON-RPC: history_...) 
| | (Embeds PortalProtoAPI) | | (Generic RPC Wrapper) | +--------------------------+ +-------------^------------+ +-------------^------------+ | | +--------------------------+ +-------------+------------+ +-------------+------------+ | External Interface |----->| beacon.API |----->| (Shared Instance) | | (JSON-RPC: beacon_...) | | (Embeds PortalProtoAPI) | | | | | | (Uses ConsensusClient) | | | +--------------------------+ +-------------^------------+ +-------------+------------+ | | +--------------------------+ +-------------+------------+ +-------------+------------+ | External Interface |----->| state.API |----->| (Shared Instance) | | (JSON-RPC: state_...) | | (Embeds PortalProtoAPI) | | | +--------------------------+ +--------------------------+ +-------------v------------+ | Calls Methods | +-----------------------------------------------------+ | portalwire.PortalProtocol (Core) | +-----------------------------------------------------+ | | | | | | ▼ ▼ ▼ ▼ ▼ ▼ +---+ +---+ +---+ +---+ +---+ +---+ |His| |Bea| |Sta| |Dis| |UTP| |Sto| |Net| |Net| |Net| |cv5| | | |rag| +---+ +---+ +---+ +---+ +---+ +---+ ▲ ▲ ▲ └--------┴--------┴--------- Reads contentQueue */ ``` The sub-network API provides corresponding sub-network services by wrapping the portal wire API, and the request is ultimately proxied to the underlying portal wire protocol for processing. 
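The wrapping described above can be sketched with Go struct embedding: each sub-network API embeds the generic portal wire API and delegates generic calls to it. The type and method names below mirror the diagram but are simplified stand-ins, not shisui's real signatures:

```go
package main

import "fmt"

// PortalProtocolAPI stands in for the generic RPC wrapper that all
// sub-network APIs share. In the real client its methods proxy to
// portalwire.PortalProtocol.
type PortalProtocolAPI struct {
	name string
}

// RecursiveFindContent is a placeholder for the generic lookup entry point.
func (p *PortalProtocolAPI) RecursiveFindContent(hexKey string) (string, error) {
	return fmt.Sprintf("<content for %s via %s>", hexKey, p.name), nil
}

// HistoryAPI embeds the generic wrapper; its history-specific methods
// delegate to the embedded type without re-implementing the core logic.
type HistoryAPI struct {
	*PortalProtocolAPI
}

func (h *HistoryAPI) HistoryGetContent(hexKey string) (string, error) {
	// Delegates straight to the embedded generic wrapper.
	return h.RecursiveFindContent(hexKey)
}

func main() {
	api := &HistoryAPI{&PortalProtocolAPI{name: "history"}}
	got, _ := api.HistoryGetContent("0xabc")
	fmt.Println(got)
}
```

Embedding keeps the beacon and state APIs equally thin: each adds only its sub-network-specific methods (e.g., the beacon API's use of a consensus client) while the shared instance handles the generic calls.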
#### Detail Flow Examples ```go /* Flow: history_getContent("0xabc...") [Start: JSON-RPC] | ▼ [history.API.HistoryGetContent("0xabc...")] --> Delegates Call | ▼ [portalwire.PortalProtocolAPI.RecursiveFindContent("0xabc...")] --> Delegates Call | ▼ [portalwire.PortalProtocol.ContentLookup(key, id)] | |--> [Lookup Process Start] -----> [Query Closest Nodes] -----> [p.findContent(node, key)] | ▲ | | | | | ▼ | [Process ENRs]<-+ Sends FINDCONTENT -> [discover.UDPv5.TalkRequest] | | | | | ▼ |<---------------------------------------+ [Network Transmission] | | | ▼ | [Peer Receives FINDCONTENT] | | | ▼ | [Peer Sends CONTENT Resp] | | | ▼ |<---- Receives CONTENT <------------------------------------ [Network Transmission] | (Raw Data / ENRs / uTP ID) | | | ▼ | [p.processContent(resp)] | | | |-----> [Content Found (Raw)] ---> [Return Content]--------------------+-----> [Result to API] -> [JSON-RPC Resp] | |-----> [Content Found (uTP)] ---> [ZenEthUtp.DialWithCid] -> [Read Data] -> [Return Content] -> [Result to API] -> [JSON-RPC Resp] | |-----> [ENRs Received] ---------> [Add to Lookup Queue] ---> [Continue Lookup Process] Flow: OFFER Received (Node B from Node A) [Node A: p.offer()] -> [Sends OFFER via discv5] --> [Network] --> [Node B: Receives TALKREQ] | ▼ [Node B: p.handleTalkRequest()] | ▼ [Node B: p.handleOffer()] | | Checks Radius/Storage/etc. ▼ [Content Wanted?] 
--YES--> [Generate uTP ID (ZenEthUtp)] | | NO | Start Goroutine | ▼ | [Goroutine: ZenEthUtp.AcceptWithCid()] | | Waits for connection ▼ | [Node B: Sends ACCEPT (No Keys)] <---------------------------------------+ | | | |<-- [Network] <--------------------------------------------------------------------+ Sends ACCEPT (with uTP ID) | [Node A: Receives TALKRESP] | ▼ [Node A: p.processOffer()] | | Sees Accepted Keys ▼ [Node A: Start Goroutine] -> [ZenEthUtp.DialWithCid(uTP ID)] -> [Connects] -> [Write Content Data] --> [Network] | ▼ [Node B: Goroutine Accepts Connection] | Reads Data ▼ [Node B: Goroutine Puts ContentElement on contentQueue] | ▼ [Sub-Protocol: Reads from contentQueue] -> [Validate] -> [Store] */ ``` **High-Level Flow (Example: history_getContent("0xabc..."))** 1. JSON-RPC request arrives. 2. `history.API.HistoryGetContent("0xabc...")` is called. 3. It delegates to the embedded `portalwire.PortalProtocolAPI.RecursiveFindContent("0xabc...")`. 4. `RecursiveFindContent` calls `portalwire.PortalProtocol.ContentLookup(keyBytes, idBytes)`. 5. `ContentLookup` performs a DHT lookup: a. Queries nodes closest to `idBytes` using `portalwire.PortalProtocol.findContent`. b. `findContent` sends FINDCONTENT message via `discover.UDPv5.TalkRequest`. c. Peer responds with CONTENT message (containing raw data, ENRs, or uTP ID). d. `portalwire.PortalProtocol.processContent` handles the response. e. If ENRs received, `ContentLookup` queries the next set of nodes. f. If uTP ID received, `processContent` initiates download via `ZenEthUtp`. g. If raw content received, `ContentLookup` returns it. 6. The found content (or error) propagates back up to the JSON-RPC response. **High-Level Flow (Example: Node B receives OFFER from Node A)** 1. Node A calls `portalwire.PortalProtocol.offer` -> sends OFFER via `discover.UDPv5.TalkRequest`. 2. Node B receives TALKREQ -> `portalwire.PortalProtocol.handleTalkRequest` -> `handleOffer`. 3. 
`handleOffer` checks whether the content is wanted (radius, storage, etc.).
4. If wanted:
   a. Generates a uTP connection ID via `ZenEthUtp`.
   b. Starts a goroutine to accept the uTP connection (`ZenEthUtp.AcceptWithCid`).
   c. Sends an ACCEPT message (with the uTP ID) back via TALKRESP.
5. Node A receives the TALKRESP -> `portalwire.PortalProtocol.processOffer`.
6. `processOffer` sees the accepted keys -> starts a goroutine to connect and upload via `ZenEthUtp.DialWithCid`.
7. Node B's goroutine accepts the connection and reads the data from `ZenEthUtp`.
8. Node B puts the received `ContentElement` onto the `portalwire.PortalProtocol.contentQueue`.
9. The specific sub-protocol's (`history.Network`, `beacon.Network`, or `state.Network`) `processContentLoop` reads from the queue.
10. The sub-protocol validates the content (`validateContent`) and stores it via `portalwire.PortalProtocol.Put` (which uses `storage.ContentStorage`).

## Testing

- Unit tests cover the core functionality internally.
- Portal interop tests against the other clients are run through [Hive](https://github.com/ethereum/hive).

## Next

Integration with go-ethereum to support EIP-4444 is the next step; progress can be tracked [here](https://github.com/ethereum/go-ethereum/issues/31092).