# New Block Protocol - Summary
**Why did we need a new protocol?** Our previous block protocol was based off of IPFS and inherited many of its design complexities, despite the fact that Logos storage aimed at a much simpler data model.
Data blocks in IPFS can be arbitrarily linked as DAGs, and a block might be shared by multiple DAGs. There is no _explicit_ notion of a dataset - any DAG or subdag is potentially a dataset, and IPFS cannot make any a priori assumptions on which of those need to be findable. The implication is that, by default, IPFS must index _every_ 64KB block in the DHT.
Logos Storage initially followed a similar design, and files were represented simply as sets of blocks. Each block was indexed separately in the DHT, and a special `Manifest` block bound those together by means of a `blocks: [CID]` list. The block list also provided the total order over the blocks which is required to corectly represent a file.
Apart from manifest sizes growing with the number of blocks in a file, this design has a more nefarious consequence: since IPFS (and hence Logos Storage) _also_ used a DHT with provider indirection, indexing traffic also grew proportionally to the size of each file. We have known this since at least 2023.
The lack of explicit datasets also meant that creating explicit swarms was impossible, and locality was implied. IPFS provides the notion [sessions](https://github.com/ipfs/boxo/blob/2f225c407b2a2701598ceb2cffef5e2953fadab9/bitswap/client/internal/session/session.go#L292) which allows some form of swarm building, but it still has severe shortcomings.
This design also made simple things like tracking dataset completion difficult - you cannot, in principle, know the full size of a DAG in IPFS, and Codex inherited the same limitation by design. Internally, there was no representation of a dataset, just blocks. This also made decisions like when to disconnect from a peer difficult, as in principle any peer could be holding any block.
Finally, without first-class datasets things like efficient block scheduling become extremely difficult, as do things like efficient block knowledge updates. All this changes with the new protocol.
## Code Review
### Download Manager (downloadmanager.nim)
The new protocol introduces a download manager. This tracks active downloads.
```nim=
const
DefaultBlockRetries* = 200
DefaultRetryInterval* = 2.seconds
type DownloadManager* = ref object of RootObj
nextDownloadId*: uint64 = 1 # 0 is invalid
blockRetries*: int
retryInterval*: Duration
downloads*: Table[Cid, Table[uint64, ActiveDownload]]
```
**Listing 1.**
I think the first surprising thing to me is that `downloads` is `Table[Cid, Table[uint64, ActiveDownload]]`, and not `Table[Cid, ActiveDownload]`. Looks like we can have multiple downloads per `Cid`, which in principle does not seem to make that much sense to me.
We then have two flavours of downloads: regular, and "background" downloads:
```nim=
proc getDownload*(self: DownloadManager, treeCid: Cid): Option[ActiveDownload] =
self.downloads.withValue(treeCid, innerTable):
for _, download in innerTable[]:
return some(download)
return none(ActiveDownload)
proc getBackgroundDownload*(
self: DownloadManager, treeCid: Cid
): Option[ActiveDownload] =
self.downloads.withValue(treeCid, innerTable):
for _, download in innerTable[]:
if download.isBackground:
return some(download)
return none(ActiveDownload)
```
**Listing 2.**
Again, it is not immediately clear what the difference is. `cancelDownload` looks straightforward: lines $4$ -- $8$ cancels any pending block request batches by cancelling their timeouts and completion futures, where the latter will cascade cancellations to anyone waiting on a batch. Lines $10$--$15$ then cancel individual block handles, which I suppose are still required by the block API that we chose to maintain.
Finally, lines $17$--$18$ cancel completion futures for the download itself, and lines $20$--$23$ remove it from the internal state table.
**ENSURE:** I don't see anything in the way of disconnecting peers belonging to the download's swarm, but I suppose this must happen indirectly somewhere else.
```nim=
proc cancelDownload*(self: DownloadManager, download: ActiveDownload) =
download.cancelled = true
for _, batch in download.pendingBatches:
if not batch.timeoutFuture.isNil and not batch.timeoutFuture.finished:
batch.timeoutFuture.cancelSoon()
if not batch.requestFuture.isNil and not batch.requestFuture.finished:
batch.requestFuture.cancelSoon()
for address, req in download.blocks:
if not req.handle.finished:
req.handle.fail(newException(CancelledError, "Download cancelled"))
if not req.opaqueHandle.finished:
req.opaqueHandle.fail(newException(CancelledError, "Download cancelled"))
download.blocks.clear()
if not download.completionFuture.finished:
download.completionFuture.fail(newException(CancelledError, "Download cancelled"))
self.downloads.withValue(download.treeCid, innerTable):
innerTable[].del(download.id)
if innerTable[].len == 0:
self.downloads.del(download.treeCid)
```
**Listing 3.**
As before, there's a flavour of `cancelDownload` specific for "background" downloads, but it does not do much other than check if the download is indeed a background download, and turn into a no-op if it's not:
```nim=
proc cancelBackgroundDownload*(
self: DownloadManager, downloadId: uint64, treeCid: Cid
): bool =
let download = self.getDownload(downloadId, treeCid)
if download.isSome and download.get().isBackground:
self.cancelDownload(download.get())
return true
return false
```
**Listing 4.**
`getDownloadProgress` is not that interesting, except that we apparently indeed track multiple downloads per file, and can even tell the completion state for each (which is weird - intuitively they should all have the same completion state!)
COMMENT: this stuff is quite weird. I think it does not make a lot of sense to have multiple downloads for a single dataset, let alone track different completion states - the completion for all downloads should be the same as it refers to the same underlying dataset.
Finally, the most interesting proc here and the thread that we pull is `startDownload`:
```nim=
proc startDownload*(
self: DownloadManager, desc: DownloadDesc, missingBlocks: seq[uint64] = @[]
): ActiveDownload =
let
ctx = DownloadContext.new(desc, missingBlocks)
downloadId = self.nextDownloadId
self.nextDownloadId += 1
let download = ActiveDownload(
id: downloadId,
treeCid: desc.treeCid,
ctx: ctx,
blocks: initTable[BlockAddress, BlockReq](),
pendingBatches: initTable[uint64, PendingBatch](),
inFlightBatches: initTable[PeerId, seq[Future[void]]](),
exhaustedBlocks: initHashSet[BlockAddress](),
blockRetries: self.blockRetries,
retryInterval: self.retryInterval,
isBackground: desc.isBackground,
fetchLocal: desc.fetchLocal,
completionFuture:
Future[?!void].Raising([CancelledError]).init("ActiveDownload.completion"),
)
self.downloads.mgetOrPut(desc.treeCid, initTable[uint64, ActiveDownload]())[
downloadId
] = download
trace "Started download",
treeCid = desc.treeCid,
startIndex = desc.startIndex,
count = desc.count,
batchSize = ctx.scheduler.batchSizeCount
return download
```
**Listing 5.**
This creates two critical objects: `DownloadContext` and `ActiveDownload`. Other than that, however, it does not seem to actually start anything?
COMMENT: Is this actually starting anything? If not, then it should probably not be called "start" download, but maybe create download?
## How are Downloads Actually Started
Turn out downloads are not started in `ActiveDownload.startDownload`. Instead, they are started in `BlockExcEngine.startDownload`. Let's track this all the way up the call chain and try to reconstruct it.
The entrypoint for a download remains `StorageNodeRef.retrieve`:
```nim=
self: StorageNodeRef, cid: Cid, local: bool = true
): Future[?!LPStream] {.async: (raises: [CancelledError]).} =
## Retrieve by Cid a single block or an entire dataset described by manifest
##
if local and not await (cid in self.networkStore):
return failure((ref BlockNotFoundError)(msg: "Block not found in local store"))
without manifest =? (await self.fetchManifest(cid)), err:
if err of AsyncTimeoutError:
return failure(err)
return await self.streamSingleBlock(cid)
await self.streamEntireDataset(
ManifestDescriptor(manifest: manifest, manifestCid: cid)
)
```
**Listing 6.**
It remains largely unchanged - we try to retrieve the block and, if it decodes successfully as a manifest, we dispatch to `streamEntireDataset`; otherwise, we dispatch to `streamSingleBlock`.
NOTES: I see we kept `streamSingleBlock`, but as a local-only version of it. I feel like stripping this out completely, as I feel this is now a confusing API that is not that useful. As for the manifests, although we still store manifests as blocks, I'd probably try modify the API so that clients can no longer make that assumption.
`streamEntireDataset` is the interesting bit, so we zoom into that:
```nim=
proc streamEntireDataset(
self: StorageNodeRef, desc: ManifestDescriptor, fetchLocal: bool = false
): Future[?!LPStream] {.async: (raises: [CancelledError]).} =
## Streams the contents of the entire dataset described by the manifest.
##
trace "Retrieving blocks from manifest", manifestCid = desc.manifestCid
var jobs: seq[Future[void]]
let stream = LPStream(StoreStream.new(self.networkStore, desc.manifest, pad = false))
proc fetchTask(): Future[void] {.async: (raises: []).} =
try:
if err =? (await self.fetchDatasetAsync(desc, fetchLocal = fetchLocal)).errorOption:
error "Dataset fetch failed during streaming",
manifestCid = desc.manifestCid, err = err.msg
await stream.close()
except CancelledError:
trace "Dataset fetch cancelled during streaming", manifestCid = desc.manifestCid
jobs.add(fetchTask())
# Monitor stream completion and cancel background jobs when done
proc monitorStream() {.async: (raises: []).} =
try:
await stream.join()
except CancelledError as exc:
warn "Stream cancelled", exc = exc.msg
finally:
await noCancel allFutures(jobs.mapIt(it.cancelAndWait))
self.trackedFutures.track(monitorStream())
# Retrieve all blocks of the dataset sequentially from the local store or network
trace "Creating store stream for manifest", manifestCid = desc.manifestCid
stream.success
```
**Listing 7.**
This also still works largely as before: we create a `StoreStream` which will attempt to sequentially call `NetworkStore.getBlock` under the hood, while launching a "prefetching" task (the actual download) over `fetchDatasetAsync`.
NOTE: I find that calling this "prefetching" is now confusing. This not a mere prefetch - it starts the one and only download for the whole dataset. Without "prefetch" the dataset won't end up in the machine at all.
Yet, one key aspect of the new protocol is that scheduling is centralized in the downloads, so how does this work? In the old version of `getBlock`, failure to find a block locally would result in a call to `requestBlock`, which would begin the process of requesting that block (and possibly ONLY that block) to other peers immediately (i.e., `getBlock` was a de facto scheduling action):
```nim=
method getBlock*(
self: NetworkStore, address: BlockAddress
): Future[?!Block] {.async: (raises: [CancelledError]).} =
without blk =? (await self.localStore.getBlock(address)), err:
if not (err of BlockNotFoundError):
error "Error getting block from local store", address, err = err.msg
return failure err
without newBlock =? (await self.engine.requestBlock(address)), err:
error "Unable to get block from exchange engine", address, err = err.msg
return failure err
return success newBlock
return success blk
```
**Listing 8.**
in the new version, we instead obtain a completion handle from the download manager itself (line $6$):
```nim=
method getBlock*(
self: NetworkStore, address: BlockAddress
): Future[?!Block]:
let downloadOpt = self.engine.downloadManager.getDownload(address.treeCid)
if downloadOpt.isSome:
let handle = downloadOpt.get().getWantHandle(address)
without blk =? (await self.localStore.getBlock(address)), err:
if not (err of BlockNotFoundError):
handle.cancel()
return failure err
return await handle
discard downloadOpt.get().completeWantHandle(address, some(blk))
return success blk
return await self.localStore.getBlock(address)
```
**Listing 9.**
My expectation is that `getWantHandle` obtains a handle but does not influence scheduling in any way - the file will be downloaded in its entirety (all blocks) to the local machine following the heuristics stipulated by the download manger, regardless of which blocks get requested via `getBlock`.
This is otherwise a strange API, however: if there is no active download, we will revert this into a `getBlock` call onto the local store, which will simply fail if the block is not available locally. It would make more sense to start a download in that case - or to get rid of `getBlock` entirely.
Another point that is not so clear to me is why we need line $12$ and all the complexity it brings: if the block is in the local store, can't we just return it? Why are we getting and then completing a handle as part of `getBlock`? This would look more natural to me:
```nim=
method getBlock*(
self: NetworkStore, address: BlockAddress
): Future[?!Block] {.async: (raises: [CancelledError]).} =
# Local lookup.
without blk =? (await self.localStore.getBlock(address)), err:
if not (err of BlockNotFoundError):
return failure err
return blk # block found, just return
# If block not available locally, return completion handle. The
# download process will eventually get to it.
let downloadOpt = self.engine.downloadManager.getDownload(address.treeCid)
if downloadOpt.isSome:
return await downloadOpt.get().getWantHandle(address)
# If we got thus far, it means the block is not available
# locally and there's no active download.
failure (ref BlockNotFoundError)("Not found"); # or something like this
```
**Listing 10.**
Also, another question comes to mind: this is the `BlockAddress` variant of `getBlock`. What if someone calls `getBlock(cid: Cid)`, how do we figure out which download/dataset to map it to?
Well, turns out that `networkstore` no longer supports `getBlock(cid: Cid)`, or at least not the way it used to. Indeed, the old version of `getBlock` used to be:
```nim=
method getBlock*(
self: NetworkStore, cid: Cid
): Future[?!Block] {.async: (raw: true, raises: [CancelledError]).} =
## Get a block from the blockstore
## Get a block from the local blockstore only.
##
self.getBlock(BlockAddress.init(cid))
```
**Listing 11.**
now, it basically delegates to the local store:
```nim=
method getBlock*(
self: NetworkStore, cid: Cid
): Future[?!Block] {.async: (raw: true, raises: [CancelledError]).} =
## Get a block from the local blockstore only.
##
self.localStore.getBlock(cid)
```
**Listing 12.**
NOTE: This again strikes me as a confusing API - the "stacked store" abstraction no longer works here. I'd rather get rid of getting blocks by CID altogether (another reason to completely remove `getBlock`), since this is not actually supported by all "stores" anymore. I think that networkstore is making very little sense to me at this point.
OK so now we roughly know how `StoreStream` interacts with the engine to implement `getBlock`. Let us now look at the interesting part, which is the download process itself. This is disguised under `fetchDatasetAsync`, which, apart from the weird `startTreeDownloadOpaque` name, is refreshingly simple; i.e. it starts a download and waits for it to complete.
```nim=
proc fetchDatasetAsync*(
self: StorageNodeRef,
manifest: Manifest,
fetchLocal = true,
selectionPolicy: SelectionPolicy = spSequential,
): Future[?!void] {.async: (raises: [CancelledError]).} =
let
treeCid = manifest.treeCid
download = ?self.engine.startTreeDownloadOpaque(
treeCid,
manifest.blockSize.uint32,
manifest.blocksCount.uint64,
selectionPolicy = selectionPolicy,
fetchLocal = fetchLocal,
)
try:
trace "Starting tree download",
treeCid = treeCid, totalBlocks = manifest.blocksCount
return await download.waitForComplete()
finally:
self.engine.releaseDownload(download)
```
**Listing 13.**
NOTE: Chrys reverted back to announcing treeCids as I was reviewing this, so part of the snippets will refer to the old version and part to the new.
## The Download Process
We'll now dig into `startTreeDownloadOpaque`, which seems to be the API call that actually starts a download:
```nim
proc startTreeDownloadOpaque*(
self: BlockExcEngine,
treeCid: Cid,
blockSize: uint32,
totalBlocks: uint64,
selectionPolicy: SelectionPolicy = spSequential,
isBackground: bool = false,
fetchLocal: bool = false,
): ?!DownloadHandleOpaque =
startTreeDownloadGeneric[void](
self,
treeCid,
blockSize,
totalBlocks,
selectionPolicy = selectionPolicy,
isBackground = isBackground,
fetchLocal = fetchLocal,
)
```
**Listing 14.**
This dispatches to `startTreeDownlaodGeneric[void]`. What's up with that, which returns a `DownloadHandleOpaque`. What's up with that? Looks like we have two flavours of download handles:
```nim=
DownloadHandleGeneric*[T] = object
cid*: Cid
downloadId*: uint64
iter*: SafeAsyncIter[T]
completionFuture*: Future[?!void].Raising([CancelledError])
DownloadHandle* = DownloadHandleGeneric[Block]
DownloadHandleOpaque* = DownloadHandleGeneric[void]
```
**Listing 15.**
Let's have a look at the API for `DownloadHandleGeneric`:
```nim=
proc finished*[T](h: DownloadHandleGeneric[T]): bool =
h.iter.finished
proc next*[T](
h: DownloadHandleGeneric[T]
): Future[?!T] {.async: (raw: true, raises: [CancelledError]).} =
h.iter.next()
proc waitForComplete*[T](
h: DownloadHandleGeneric[T]
): Future[?!void] {.async: (raises: [CancelledError]).} =
return await h.completionFuture
proc nextBlock*(
h: DownloadHandle
): Future[?!Block] {.async: (raw: true, raises: [CancelledError]).} =
h.iter.next()
```
**Listing 16.**
So this seems to allow one to consume the blocks in the download as an iterator - this makes sense for `DownloadHandle`, but not a lot of sense for `DownloadHandleOpaque`. Yet, we get an `AsyncIterator` in `DownloadHandleOpaque` too.
NOTE: Scanning through the code, I can't find any usage of `nextBlock`. Even iterating over download handles seems to be done sparingly, and only in test code. I'd extirpate all of this API and the whole complexity of having async iters here.
We now look into `startTreeDownloadGeneric`, which actually starts the download:
```nim=
proc startTreeDownloadGeneric[T: Block | void](
self: BlockExcEngine,
treeCid: Cid,
blockSize: uint32,
totalBlocks: uint64,
selectionPolicy: SelectionPolicy = spSequential,
isBackground: bool = false,
fetchLocal: bool = false,
): ?!DownloadHandleGeneric[T] =
## - T = Block: Returns actual block data (for streaming)
## - T = void: Returns success/failure only (for prefetching)
let
desc = toDownloadDesc(
treeCid,
totalBlocks,
blockSize,
selectionPolicy = selectionPolicy,
isBackground = isBackground,
fetchLocal = fetchLocal,
)
activeDownload = self.startDownload(desc)
# ... inlined implementation of SafeAsyncIter
success DownloadHandleGeneric[T](
cid: treeCid,
downloadId: activeDownload.id,
iter: SafeAsyncIter[T].new(genNext, isFinished),
completionFuture: activeDownload.completionFuture,
)
```
**Listing 17.**
Most of the code here is implementing the iterator returned as part of `DownloadHandleGeneric` (the code I want to remove :-)). The key bits are `toDownloadDesc`, and, or course, `startDownload`. Let's look at what DownloadDescriptor, the return type of toDownloadDesc, actually is:
```nim=
DownloadDesc* = object
cid*: Cid
blockSize*: uint32
startIndex*: uint64
count*: uint64
selectionPolicy*: SelectionPolicy
isBackground*: bool
fetchLocal*: bool
SelectionPolicy* = enum
spSequential
spRandomWindow
```
**Listing 18.**
This describes the parameters for the download - from the `CID` to the block size, to whether or not this is a "background" download, to the block scheduling policy. Some of these things do not make immediate sense to me (e.g. `fetchLocal`) but probably will as we read more. The two selection policies, sequential and random, make sense - one for streaming, another for faster downloads.
Let us get to `startDownload`:
```nim=
proc startDownload(
self: BlockExcEngine, desc: DownloadDesc
): ActiveDownload {.gcsafe, raises: [].} =
result = self.downloadManager.startDownload(desc)
self.ensureDownloadWorker(result)
```
**Listing 19.**
This just calls `DownloadManager.startDownload`, and then eventually gets to the most interesting part, `downloadWorker`. This is quite the long function, and we will spend some time talking about it.
```nim=
proc downloadWorker(
self: BlockExcEngine, download: ActiveDownload
) {.async: (raises: []).} =
## Continuously schedules batches to peers until download completes.
## Supports concurrent batch requests per peer based on BDP pipeline depth.
## When block size is unknown (0), BDP optimizations are disabled - first batch
## is used to discover block size, then BDP calculations start.
let
cid = download.cid
retryInterval = self.downloadManager.retryInterval
logScope:
cid = cid
try:
let
(windowStart, windowCount) = download.ctx.currentPresenceWindow()
connectedPeers = self.peers.toSeq()
if connectedPeers.len > 0:
trace "Initial presence window broadcast",
cid = cid,
windowStart = windowStart,
windowCount = windowCount,
totalBlocks = download.ctx.totalBlocks
await self.broadcastWantHave(
download, cid, windowStart, windowCount, connectedPeers
)
trace "Initial broadcast sent, proceeding to batch loop"
else:
trace "No connected peers for initial broadcast, triggering discovery"
self.searchForNewPeers(cid)
```
**Listing 20.**
The first point worthy of attention is in line $26$ above, where we do a broadcast of want-haves to all connected peers. There does not seem to be a per-dataset swarm -- we will blindly broadcast those to everybody. This is definitely not what I was expecting. We then have the main loop which runs as long as the download is not cancelled and not complete:
```nim=33
while not download.cancelled and not download.isDownloadComplete():
for peerId in download.inFlightBatches.keys.toSeq:
var remaining: seq[Future[void]] = @[]
for fut in download.inFlightBatches[peerId]:
if not fut.finished:
remaining.add(fut)
if remaining.len > 0:
download.inFlightBatches[peerId] = remaining
else:
download.inFlightBatches.del(peerId)
```
**Listing 21.**
The first loop contained in it ($33$-$42$) goes through each "in flight" batch for each peer. What are those? Looking further down, we see:
```nim
let
peer = selection.peer
batchFuture = self.sendWantBlocksRequest(download, start, count, peer)
if peer.id notin download.inFlightBatches:
download.inFlightBatches[peer.id] = @[]
download.inFlightBatches[peer.id].add(batchFuture)
```
**Listing 22.**
So in-flight batches are pending pending want-block requests towards a given peer. The loop is therefore simply pruning the `inFlightBatches` table of peers with no active in-flight batches. We will later delve into `sendWantBlocksRequest` as it has a lot of interesting stuff going on inside. Next, we look at the if statement in $44$--$58$.
```nim=43
let ctx = download.ctx
if not download.fetchLocal and ctx.needsNextPresenceWindow():
let (newStart, newCount) = ctx.advancePresenceWindow()
ctx.trimPresenceBeforeWatermark()
# Broadcast want-have for the new window
var swarmPeers: seq[PeerContext] = @[]
for peerId in ctx.swarm.connectedPeers():
let peerCtx = self.peers.get(peerId)
if not peerCtx.isNil:
swarmPeers.add(peerCtx)
trace "Advancing presence window",
cid = cid,
newWindowStart = newStart,
newWindowCount = newCount,
watermark = ctx.scheduler.completedWatermark()
await self.broadcastWantHave(download, cid, newStart, newCount, connectedPeers)
```
**Listing 23.**
The first part in the condition (`not download.fetchLocal`) indicates, I believe, that this is not a local fetch. The idea of a local fetch is quite awkward to me though as `StoreStream` can probably stream a local dataset while skipping the download manager entirely, so I still need to understand what this does/why is it needed.
NOTE: ask what's the intent behind `fetchLocal`.
The other condition, `ctx.needsNextPresenceWindow()`, defines if we want to add more blocks to the set of blocks we are currently considering for download. This is part of the "windowing" approach of the new protocol which attempts to cap resource usage (networking, CPU and memory) by splitting large files into fixed-size windows. We will take a short detour to look at those:
```nim=
proc needsNextPresenceWindow*(ctx: DownloadContext): bool =
ctx.scheduler.needsNextPresenceWindow()
proc needsNextPresenceWindow*(self: Scheduler): bool =
case self.windowCursor.policy
of spSequential:
let (windowStart, windowCount) = self.windowCursor.currentWindow()
self.nextBatchStart >= windowStart + windowCount and
self.windowCursor.(self.completedWatermark)
of spRandomWindow:
let (start, count) = self.windowCursor.currentWindow()
not self.windowCursor.isDone and
self.nextBatchStart >= start + count and
self.requeued.len == 0 and self.inFlight.len == 0
```
**Listing 24.**
We see there are two policies: sequential and random. Sequential looks like this:
```nim=
let (windowStart, windowCount) = self.windowCursor.currentWindow()
self.nextBatchStart >= windowStart + windowCount and
self.windowCursor.needsAdvance(self.completedWatermark)
```
**Listing 25.**
This looks simple enough: `windowStart` is the block index at which the current window starts, whereas `windowCount` - I assume - is the window's length, which the code for `WindowCursor` appears to confirm:
```nim=
proc currentWindow(p: WindowCursor): tuple[start: uint64, count: uint64] =
(start: p.windowStart, count: min(p.windowSize, p.totalBlocks - p.windowStart))
```
**Listing 26.**
The `min` above truncates the last window at the last block index, otherwise the size is `(windowStart, windowSize)`. We then have the call to `needsAdvance(completedWatermark)`, which implements a manual dispatch table:
```nim=
proc needsAdvance(p: WindowCursor, watermark: uint64): bool =
case p.policy
of spSequential:
p.sequential.needsAdvance(
p.windowStart, p.windowSize, p.totalBlocks, watermark)
of spRandomWindow:
false
```
**Listing 27.**
The `SequentialWindowCursor` version of `needsAdvance` starts by checking if there is room for another window in line $4$: the condition in `canAdvance` will only evaluate to false when we are at the last window - the last window itself will be smaller than `windowSize`, but the condition will be false nevertheless.
`needsAdvance` then evaluates to true when the watermark is above `windowSize` * `advanceThreshold`, where `0 < advanceThreshold <= 1`. All of those are fixed - the only varying parameter is the watermark.
```nim=
proc needsAdvance(
p: SequentialWindowCursor, windowStart, windowSize, totalBlocks, watermark: uint64
): bool =
if not p.canAdvance(windowStart, windowSize, totalBlocks):
return false
let thresholdPos = windowStart + (windowSize.float * p.advanceThreshold).uint64
watermark >= thresholdPos
proc canAdvance(
p: SequentialWindowCursor, windowStart, windowSize, totalBlocks: uint64
): bool =
windowStart + windowSize < totalBlocks
```
**Listing 28.**
The watermark in principle indicates the index of the last complete segment of blocks that we possess; i.e., we are in principle assured to have all blocks in `[0, watermark]`. The watermark is taken from `scheduler.completedWatermark()`. Looking at when the watermark gets advanced, we see:
```nim=
proc onBatchCompleted(self: Scheduler, batchStart: uint64) =
case self.windowCursor.policy
of spSequential:
self.advanceWatermark(batchStart)
of spRandomWindow:
discard
proc advanceWatermark(self: Scheduler, batchStart: uint64) =
if batchStart == self.completedWatermark:
self.completedWatermark = batchStart + self.batchSize
while self.completedWatermark in self.completedOutOfOrder:
self.completedOutOfOrder.excl(self.completedWatermark)
self.completedWatermark += self.batchSize
elif batchStart > self.completedWatermark:
self.completedOutOfOrder.incl(batchStart)
```
**Listing 29.**
This tells us that the watermark is actually not used in "random". It also shows that when a batch is completed out of order, the watermark will only advance after the batches preceding it complete (lines $12$-$13$).
We can therefore assert that:
1. the watermark contains the block index for the the last completed batch in case of sequential downloads;
2. it contains no useful information in the case of random downloads.
We can now get back to Listing 27 and interpret what `needsNextPresenceWindow` actually does:
```nim
self.nextBatchStart >= windowStart + windowCount and
self.windowCursor.needsAdvance(self.completedWatermark)
```
**Recap of Listing 27.**
it returns `true` if the start of the next batch falls off of the current window and the watermark is beyond a certain threshold in the current window. This ensures that we do not move onto the next presence window before all batches for the current window are in flight.
We now get back to the download process (Listing 23). After checking if we need a new presence window, we do: `ctx.advancePresenceWindow()`. Let's look at that one.
```nim=
proc advancePresenceWindow*(ctx: DownloadContext): tuple[start: uint64, count: uint64] =
ctx.availabilityTracker.addPendingRange(ctx.scheduler.currentPresenceWindow())
discard ctx.scheduler.advancePresenceWindow()
ctx.scheduler.currentPresenceWindow()
```
**Listing 30.**
The first line already throws me off: why would I add the current window to something called "pending ranges"? Worse, `currentPresenceWindow` does not return a range as we've seen before. Let's look at what `addPendingRange` does.
```nim=
proc addPendingRange(
t: var BroadcastAvailabilityTracker, range: tuple[start: uint64, count: uint64]
) =
case t.policy
of spRandomWindow:
t.pendingRanges.add(range)
of spSequential:
discard
```
**Listing 31.**
QUESTION: why do we add "expired" windows to pendingRanges? That doesn't seem to make a lot of sense.
Let's look at what `pendingRanges` actually is. It use in `shouldBroadcast` gives us some hint:
```nim=
proc shouldBroadcast(t: BroadcastAvailabilityTracker, watermark: uint64): bool =
case t.policy
of spRandomWindow:
t.pendingRanges.len > 0
of spSequential:
let newBlocks = watermark - t.lastBroadcastedWatermark
if newBlocks == 0:
return false
let timeSinceLast = Moment.now() - t.lastBroadcastTime
newBlocks >= PresenceBroadcastBlockThreshold or timeSinceLast >= t.broadcastInterval
proc shouldBroadcastAvailability*(ctx: DownloadContext): bool =
ctx.availabilityTracker.shouldBroadcast(ctx.scheduler.completedWatermark())
```
**Listing 32.**
When there are pending ranges, `shouldBroadcast` (and by extension `shouldBroadcastAvailability`) evaluates to true. This means `pendingRanges` maintains unbroadcasted block ranges, which makes even less sense: why would you add a window to the set of broadcast ranges only after you're ready to advance it?
OK so apparently this broadcasts the blocks we HAVE, but only for `spRandomWindow`. For `spSequential` we follow a different rule in which we broadcast either if the watermark changes, or a timeout elapses.
The next line in Listing 23, `ctx.trimPresenceBeforeWatermark()` garbage collects presence information that we no longer need. This cleans up peer context information. Again, this will probably only make sense for `spSequential`:
```nim=
proc trimPresenceBeforeWatermark*(ctx: DownloadContext) =
let watermark = ctx.scheduler.completedWatermark()
for peerId in ctx.swarm.connectedPeers():
let peerOpt = ctx.swarm.getPeer(peerId)
if peerOpt.isSome:
let peer = peerOpt.get()
# only trim range-based availability
if peer.availability.kind == bakRanges:
var newRanges: seq[tuple[start: uint64, count: uint64]] = @[]
for (start, count) in peer.availability.ranges:
let rangeEnd = start + count
if rangeEnd > watermark:
# keep ranges not entirely below watermark
newRanges.add((start, count))
peer.availability = BlockAvailability.fromRanges(newRanges)
```
**Listing 33.**
If a peer has many disjoint intervals occupying memory, those will be thrown away. For peers with, say, a single contiguous block range over the whole window, this will simply clip the interval and keep memory usage the same. Lines $49$--$62$ in Listing 23 then updates currently connected peers with our wants by broadcasting that we now want the blocks in the new window. Note that this is a single, small message range broadcast that covers the whole window.
Next, in lines $59$--$88$ in Listing 23 broadcast any new presence information we find relevant to peers; i.e., we update peers on blocks that we currently HAVE. This is part of the push presence updates, and is similar in spirit to [Bittorent HAVE messages](https://github.com/arvidn/bittorrent-multi-have):
```nim=59
# Broadcast availability to peers
if not download.fetchLocal and ctx.shouldBroadcastAvailability():
let broadcastRanges = ctx.getAvailabilityBroadcast()
if broadcastRanges.len > 0:
trace "Broadcasting availability to swarm",
cid = cid,
rangeCount = broadcastRanges.len,
swarmPeers = ctx.swarm.peerCount()
let presence = BlockPresence(
address: BlockAddress(treeCid: cid, index: broadcastRanges[0].start.int),
kind: BlockPresenceType.HaveRange,
ranges: broadcastRanges,
)
for peerId in ctx.swarm.connectedPeers():
let peerOpt = ctx.swarm.getPeer(peerId)
if peerOpt.isSome and peerOpt.get().availability.kind == bakComplete:
continue
try:
await self.network.request.sendPresence(peerId, @[presence]).wait(
DefaultWantHaveSendTimeout
)
except AsyncTimeoutError:
trace "Availability broadcast send timed out", peer = peerId
except CatchableError as err:
trace "Failed to broadcast availability", peer = peerId, error = err.msg
ctx.markAvailabilityBroadcasted()
```
**Listing 34.**
One point to mention is that the granularity of pushes is VERY different across spSequential and spRandom. for `spSequential`, we push at every `PresenceBroadcastIntervalMax = 10.seconds`, or if we manage to fill up more than `50%` of a window before that time. With `spRandom`, we only push once we complete a FULL window. This will lead to very different knowledge propagation characteristics, and will likely impact performance/latency, particularly for fast peers.
Next, in lines $89$--$98$, we try to generate a new request batch. Note that if we cannot generate a batch, we will either `continue`, or `break` if download is complete or there are no pending batches. I'm not sure what the latter mean - if batches are pending but not yet complete, breaking means we stop this loop before the download completes, which looks odd.
```nim=89
let batchOpt = self.downloadManager.getNextBatch(download)
if batchOpt.isNone:
let pendingBatchCount = download.pendingBatchCount()
if pendingBatchCount == 0 and download.isDownloadComplete():
break
await sleepAsync(100.milliseconds)
continue
```
**Listing 35.**
We will now dig into `getNextBatch`. As we can see, it delegates the task to the scheduler:
```nim=
proc getNextBatch*(
self: DownloadManager, download: ActiveDownload
): Option[tuple[start: uint64, count: uint64]] =
let batch = download.ctx.scheduler.take()
if batch.isSome:
return some((start: batch.get().start, count: batch.get().count))
none(tuple[start: uint64, count: uint64])
```
**Listing 36.**
Let's dig into `take`:
```nim=
proc take*(self: Scheduler): Option[BlockBatch] =
while self.requeued.len > 0:
let batch = self.requeued.popFirst()
if batch.start < self.completedWatermark:
continue
if batch.start in self.completedOutOfOrder:
continue
self.inFlight[batch.start] = batch.count
return some(batch)
let batchOpt = self.generateNextBatchInternal()
if batchOpt.isSome:
let batch = batchOpt.get()
self.inFlight[batch.start] = batch.count
return batchOpt
```
**Listing 37.**
This attempts to get a batch from a "requeued" list first. Requeued batches are failed/timed out batches, so these get rescheduled, except if we got the blocks in this batch from somewhere else in the meantime. This boils down to two conditions:
1. `batch.start` is below the watermark. Since the watermark advances a batch at a time, if `start` is below the watermark then we already have this whole batch;
2. `batch.start` is in `completedOutOfOrder`. Same as before - this means we got the blocks in this batch.
I'm in general not fond of sprinkling those conditionals all over, and would rather guarantee that deciding that a batch has failed and processing a batch get somehow linearized (i.e. once we requeue a batch, we do not accept it anymore, and once we accept a batch, we cannot requeue it anymore). But this is valid too, I suppose.
If there are no requeued batches, then we _generate_ a new batch:
```nim=
proc generateNextBatchInternal(self: Scheduler): Option[BlockBatch] {.inline.} =
## does NOT add to inFlight - we must do that
let (windowStart, windowCount) = self.windowCursor.currentWindow()
while self.nextBatchStart < windowStart + windowCount:
let
start = self.nextBatchStart
count = min(self.batchSize, windowStart + windowCount - start)
self.nextBatchStart = start + count
if start < self.completedWatermark:
continue
if start in self.inFlight:
continue
if start in self.completedOutOfOrder:
continue
return some((start: start, count: count))
return none(BlockBatch)
```
**Listing 38.**
As we can see, batches are always generated sequentially within a window. This means that even when we're randomizing window selection, we're still streaming within windows. I suppose this adds some block diversity across the entire dataset, but it's hard for me to understand if it makes sense.
Getting back to Listing 37, `take` will then set the batch as `inFlight` and return it. Simple enough. Assuming we got a valid batch, download proceeds to line $99$, where there's a curious snippet of code:
```nim=99
let (start, count) = batchOpt.get()
logScope:
batchStart = start
batchCount = count
block localCheck:
var lastIdle = Moment.now()
let runtimeQuota = 100.milliseconds
for i in start ..< start + count:
if (Moment.now() - lastIdle) >= runtimeQuota:
await idleAsync()
lastIdle = Moment.now()
let address = download.makeBlockAddress(i)
if download.isBlockExhausted(address):
break localCheck
let exists =
try:
await address in self.localStore
except CatchableError:
false
if not exists:
if download.fetchLocal:
download.failLocalMissing(address)
break localCheck
break localCheck
for i in start ..< start + count:
if (Moment.now() - lastIdle) >= runtimeQuota:
await idleAsync()
lastIdle = Moment.now()
let address = download.makeBlockAddress(i)
if address in download.blocks:
without blk =? (await self.localStore.getBlock(address)), err:
break localCheck
discard download.completeWantHandle(address, some(blk))
download.completeBatchLocal(start, count)
continue
if download.cancelled or download.fetchLocal:
continue
```
**Listing 39.**
This seems to be doing a bunch of things in one. The first loop checks that all blocks in the batch exist and none have their retries exhausted (why are we doing this here??). It then calls `failLocalMissing` in case a block does not exist (which I suppose means that the file is not locally available).
We then run a **second** loop in which we retrieve the actual content for the blocks and complete their handles. I suppose this is what will fast-forward a download when part of it is available locally. If we get to the end of the block, it means that the batch is complete so we `continue`. This will restart the whole long loop again.
If we break, it means there is an incomplete batch. This can be either a batch we need to fetch, or it can be a broken batch - a batch containing a block for which the retry count exceeds the maximum. I'd argue it doesn't make sense to continue a download when that happens, as a dataset missing a block is not a dataset we'd likely want to have.
After we find an incomplete batch, the snippet starting in line $143$ performs swarm maintenance:
```nim=143
let swarm = download.ctx.swarm
var shouldBroadcast = false
let peersNeeded = swarm.peersNeeded()
if peersNeeded > 0:
trace "Swarm below target, triggering discovery",
active = swarm.activePeerCount(), needed = peersNeeded
self.searchForNewPeers(cid)
```
**Listing 40.**
In case there aren't enough peers in the swarm, we issue a lookup, and then broadcast our want-haves again:
```nim=152
if swarm.peersWithRange(start, count).len == 0:
shouldBroadcast = true
if shouldBroadcast:
let connectedPeers = self.peers.toSeq()
if connectedPeers.len > 0:
trace "Broadcasting want-have for batch range",
cid = cid, start = start, count = count, peerCount = connectedPeers.len
await self.broadcastWantHave(download, cid, start, count, connectedPeers)
# Give peers a short time to respond with presence
await sleepAsync(50.milliseconds)
else:
trace "No connected peers, searching for new peers"
self.searchForNewPeers(cid)
await self.handleBatchRetry(download, cid, start, count, retryInterval)
continue
```
**Listing 41.**
This reads like a Hail Mary. We already have push updates from peers to keep us informed, and we already send our list on window switches, why are we doing this again here?
The if statement in lines $170$--$174$ also seems like it could be shifted to before `if shouldBroadcast`, eliminating the need for the else clause in $165$--$169$:
```nim=170
if self.peers.len == 0:
trace "No connected peers available for batch, searching"
self.searchForNewPeers(cid)
await self.handleBatchRetry(download, cid, start, count, 100.milliseconds)
continue
```
**Listing 42.**
Next, we look at "staleUnknownPeers". Stale peers, apparently, are peers with whom we have not interacted in a while.
```nim=
proc staleUnknownPeers*(swarm: Swarm): seq[PeerId] =
for peerId, peer in swarm.peers:
if peer.isStale and peer.availability.kind == bakUnknown:
result.add(peerId)
proc isStale*(peer: SwarmPeer): bool =
Moment.now() - peer.lastSeen > PeerStaleTimeout
proc touch*(peer: SwarmPeer) =
peer.lastSeen = Moment.now()
```
**Listing 43.**
Looking at where `touch` is called, we see those are in code called from message handlers:
```nim=
proc updateAvailability*(peer: SwarmPeer, availability: BlockAvailability) =
peer.availability = peer.availability.merge(availability)
peer.availabilityUpdated = Moment.now()
peer.touch()
proc recordPeerSuccess*(swarm: Swarm, peerId: PeerId) =
swarm.peers.withValue(peerId, peer):
peer[].resetFailures()
peer[].touch()
proc recordBatchSuccess*(
swarm: Swarm, peer: PeerContext, rttMicros: uint64, totalBytes: uint64
) =
swarm.peers.withValue(peer.id, swarmPeer):
swarmPeer[].resetFailures()
swarmPeer[].touch()
peer.stats.recordRequest(rttMicros, totalBytes)
```
**Listing 44.**
`updateAvailability` is called from `blockPresenceHandler`, `recordBatchSuccess` is called from `sendWantBlocksRequest` when we get a batch of blocks back (we'll look into `sendWantBlocksRequest` later) and `recordPeerSuccess` is called from... nowhere, apparently. Regardless, those are peers we have not interacted with in a while.
```nim=175
let staleUnknown = swarm.staleUnknownPeers()
if staleUnknown.len > 0:
let rangeAddress = download.makeBlockAddress(start)
trace "Re-querying stale unknown peers",
cid = cid,
staleUnknownCount = staleUnknown.len,
batchStart = start,
batchCount = count
for peerId in staleUnknown:
try:
await self.network.request
.sendWantList(
peerId,
@[rangeAddress],
priority = 0,
cancel = false,
wantType = WantType.WantHave,
full = false,
sendDontHave = false,
rangeCount = count,
downloadId = download.id,
)
.wait(DefaultWantHaveSendTimeout)
except AsyncTimeoutError:
trace "Re-query stale unknown peer send timed out", peer = peerId
except CatchableError as err:
trace "Failed to re-query stale unknown peer",
peer = peerId, error = err.msg
await sleepAsync(50.milliseconds)
```
**Listing 45.**
I don't understand, however, why we are singling those out here - the broadcasts we do in principle should hit these peers as well, and if they fail to respond we should time them out, which we already do during the regular broadcasts. What's the assumption here? What is this trying to cover?
Next, we finally process the batch. Let's start with the actual scheduling code in lines $207$--$219$: we call `selectPeerForBatch` in line $209$.
```nim=207
let
batchBytes = download.ctx.batchBytes
selection = swarm.selectPeerForBatch(
self.peers, start, count, batchBytes, download.inFlightBatches
)
if selection.kind == pskNoPeers:
trace "No peer with range, searching for new peers"
let
hasActivePeers = swarm.activePeerCount() > 0
waitTime = if hasActivePeers: retryInterval else: 100.milliseconds
await self.handleBatchRetry(download, cid, start, count, waitTime)
continue
```
**Listing 46.**
Turns out this is quite the complicated piece of code, so let's unpack it. Lines $10$--$15$ start by calculating a timeout penalty for peers. Let's denote this by $w_t \times c_t$, where $c_t$ is the number of timeout events for the peer, and $w_t$ the weight for timeouts. In the current implementation, this is set to `TimeoutPenaltyWeight* = 3.0`.
```nim=
proc selectPeerForBatch*(
swarm: Swarm,
peers: PeerContextStore,
start: uint64,
count: uint64,
batchBytes: uint64,
inFlightBatches: var Table[PeerId, seq[Future[void]]],
): PeerSelection =
var penalties: Table[PeerId, float]
for peerId, swarmPeer in swarm.peers:
if swarmPeer.timeoutCount > 0:
penalties[peerId] = swarmPeer.timeoutCount.float * TimeoutPenaltyWeight
let candidates = swarm.peersWithRange(start, count)
```
Candidates are then selected by checking their overlap with the batch we're trying to fetch, based on block presence information we've collected earlier. `peersWithRange` will filter out any peers that do not contain the full range of blocks for the batch.
If there are no candidates with the full range of blocks, we go for peers with any partially overlapping ranges (lines $15$--$20$):
```nim=15
if candidates.len == 0:
let partialCandidates = swarm.peersWithAnyInRange(start, count)
trace "No full range peers, checking partial",
start = start, count = count, partialPeers = partialCandidates.len
if partialCandidates.len == 0:
return PeerSelection(kind: pskNoPeers)
var peerCtxs: seq[PeerContext]
for peerId in partialCandidates:
let peer = peers.get(peerId)
if peer.isNil:
# peer disconnected, remove from swarm immediately
discard swarm.removePeer(peerId)
continue
let currentInFlight = inFlightBatches.getOrDefault(peerId, @[]).len
if currentInFlight < peer.optimalPipelineDepth(batchBytes):
peerCtxs.add(peer)
if peerCtxs.len == 0:
return PeerSelection(kind: pskAtCapacity)
let selected = selectByBDP(peerCtxs, batchBytes, inFlightBatches, penalties)
if selected.isSome:
return PeerSelection(kind: pskFound, peer: selected.get())
return PeerSelection(kind: pskNoPeers)
```
A few things stand out. The first one is lines $25$--$28$, where we seem to be embedding garbage collection of the peer array in the swarm within peer selection. This strikes me as an odd choice, particularly since libp2p provides disconnection events, and those could be used to clear up the swarm in a more timely manner.
The second thing is line $30$, where we'll only consider peers for which the number of inflight batches is smaller than the optimal pipeline depth. The calculation for optimal pipeline depth however merits its own analysis.
```nim=
proc optimalPipelineDepth*(self: PeerPerfStats, batchBytes: uint64): int =
if batchBytes == 0:
return DefaultPipelineDepth
let rttMicrosOpt = self.avgRttMicros()
if rttMicrosOpt.isNone:
return DefaultRequestsPerPeer
let throughputOpt = self.throughputBps()
if throughputOpt.isNone:
return DefaultRequestsPerPeer
let
rttMicros = rttMicrosOpt.get()
throughput = throughputOpt.get()
rttSecs = rttMicros.float64 / 1_000_000.0
bdpBytes = throughput.float64 * rttSecs
optimalRequests = ceil(bdpBytes / batchBytes.float64).int
return clamp(optimalRequests, MinRequestsPerPeer, MaxRequestsPerPeer)
```
We have to go through each of these in detail.
**avgRttMicros.** From the name I'd guess this is the average round trip delay in microsseconds. How this is calculated however is key. The average is simple enough:
```nim=
proc avgRttMicros*(self: PeerPerfStats): Option[uint64] =
if self.rttSamples.len == 0:
return none(uint64)
var total: uint64 = 0
for sample in self.rttSamples:
total += sample
some(total div self.rttSamples.len.uint64)
```
Measurements are inserted in `recordRequest`, which is called from `recordBatchSuccess`:
```nim=
proc recordRequest*(self: var PeerPerfStats, rttMicros: uint64, bytes: uint64) =
if self.rttSamples.len >= RttSampleCount:
discard self.rttSamples.popFirst()
self.rttSamples.addLast(rttMicros)
let now = Moment.now()
if self.firstByteTime.isNone:
self.firstByteTime = some(now)
self.lastByteTime = some(now)
self.totalBytes += bytes
proc recordBatchSuccess*(
swarm: Swarm, peer: PeerContext, rttMicros: uint64, totalBytes: uint64
) =
swarm.peers.withValue(peer.id, swarmPeer):
swarmPeer[].resetFailures()
swarmPeer[].touch()
peer.stats.recordRequest(rttMicros, totalBytes)
```
`recordBatchSuccess` is instead called by `sendWantBlocksRequest`. The relevant snippet is:
```nim=
let
requestStartTime = Moment.now()
requestResult =
await self.requestWantBlocks(peer.id, BlockRange(cid: cid, ranges: ranges))
rttMicros = (Moment.now() - requestStartTime).microseconds.uint64
```
We therefore seem to be using `requestWantBlocks` as a measurement of round trip time. `requestWantBlocks` seems dominated by `NetworkPeer#sendWantBlocksRequest`:
```nim=
proc sendWantBlocksRequest*(
self: NetworkPeer, blockRange: BlockRange
): Future[WantBlocksResult[WantBlocksResponse]] {.async: (raises: [CancelledError]).} =
let requestId = self.nextRequestId
self.nextRequestId += 1
let responseFuture = WantBlocksResponseFuture.init("wantBlocksRequest")
self.pendingWantBlocksRequests[requestId] = responseFuture
try:
let conn = await self.connect()
if isNil(conn):
self.pendingWantBlocksRequests.del(requestId)
return err(wantBlocksError(NoConnection, "No connection available"))
let req = WantBlocksRequest(
requestId: requestId, cid: blockRange.cid, ranges: blockRange.ranges
)
await writeWantBlocksRequest(conn, req)
return await responseFuture
except CancelledError as exc:
self.pendingWantBlocksRequests.del(requestId)
raise exc
except CatchableError as err:
self.pendingWantBlocksRequests.del(requestId)
return err(wantBlocksError(RequestFailed, "WantBlocks request failed: " & err.msg))
```
This is not RTT, but we can see what it is: it's the time it takes to run a request for a range of blocks - reply included. Looking at when `responseFuture` gets completed, we see it's inside of the read loop, right after reading the blocks to memory:
```nim=
proc readLoop*(self: NetworkPeer, conn: Connection) {.async: (raises: []).} =
# ...
case msgType
of mtWantBlocksResponse:
let respResult = await readWantBlocksResponse(conn, dataLen)
if respResult.isErr:
warn "Failed to read WantBlocks response",
peer = self.id, error = respResult.error.msg
return
let response = respResult.get
self.pendingWantBlocksRequests.withValue(response.requestId, fut):
if not fut[].finished:
fut[].complete(WantBlocksResult[WantBlocksResponse].ok(response))
self.pendingWantBlocksRequests.del(response.requestId)
do:
warn "Received WantBlocks response for unknown request ID",
peer = self.id, requestId = response.requestId
```
This means our measurement is pretty much uncontaminated, which is great.
**throughputBps.** Next, we look at `throughputBps`:
```nim=
proc throughputBps*(self: PeerPerfStats): Option[uint64] =
if self.firstByteTime.isNone or self.lastByteTime.isNone:
return none(uint64)
let
first = self.firstByteTime.get()
last = self.lastByteTime.get()
duration = last - first
if duration < MinThroughputDuration:
return none(uint64)
let secs = duration.nanoseconds.float64 / 1_000_000_000.0
some((self.totalBytes.float64 / secs).uint64)
```
This is just a duration, so the name `bps` already looks weird as this does not seems like a ratio of bits/bytes and seconds. Browsing the code, we see that `firstByteTime` `lastByteTime` get set in `recordRequest`:
```nim=
proc recordRequest*(self: var PeerPerfStats, rttMicros: uint64, bytes: uint64) =
if self.rttSamples.len >= RttSampleCount:
discard self.rttSamples.popFirst()
self.rttSamples.addLast(rttMicros)
let now = Moment.now()
if self.firstByteTime.isNone:
self.firstByteTime = some(now)
self.lastByteTime = some(now)
self.totalBytes += bytes
```
As `recordRequest` gets repeatedly called, it will record:
1. the total number of bytes received from a peer;
2. the interval of time over which such bytes were received.
`recordRequest` is called as part of `recordBatchSucess`:
```nim=
proc recordBatchSuccess*(
swarm: Swarm, peer: PeerContext, rttMicros: uint64, totalBytes: uint64
) =
swarm.peers.withValue(peer.id, swarmPeer):
swarmPeer[].resetFailures()
swarmPeer[].touch()
peer.stats.recordRequest(rttMicros, totalBytes)
```
This means "throughput" doesn't depend on the peer alone - if we take a long time to issue the next request, throughput will drop. We can now formalize the optimal pipeline depth.
Let $p$ be a peer we are connected to. Assume we are at some time instant $t$. Let $t_i < t$ be the time at which we got the first byte from $p$ over _any_ request, and let $t_f \leq t$ be the time at which we got the latest byte from $p$. Let $b$ be the total amount of bytes received from $p$ over $[t_i, t_f]$.
We're defining the transfer rate of $p$ as:
$$
r_p = \frac{b}{t_f - t_i}
$$
We then measure delay as the average time it takes to complete a batch. We take $k$ such measurements and compute an average, which I'll denote as $\bar{d}$. Finally, let $s$ be the batch size.
The optimal pipeline depth is then defined as:
$$
o = \frac{r_p \times \bar{d}}{s} = \frac{\frac{b}{t_f - t_i}\times{\bar{d}}}{s} = \frac{b \times \bar{d}}{(t_f - t_i) \times s}
$$
This is then _clamped_ between a minimum value $o_{\text{min}}$, and a maximum value $o_{\text{max}}$, so that the actual depth $o_c$ is given by $\min(o_{\text{max}}, \max(o, o_{\text{min}}))$.
I am not convinced this is the right way to measure this, fundamentally because $\bar{d}$ is not a delay measurement and already factors in bandwidth indirectly; i.e., it depends on the batch size - if your batch is large and your connection is slow, then your "RTT" measurement will be bigger.
I am also bothered by the fact that we already use the "RTT" measurement in the transfer rate calculation, so it seems like we're dividing by a quantity then multplying by it again later.
What I think would be more correct would be to measure batch completion time stability as a function of the number pipelined batches size - i.e., we fire $k$ pipelined batches and measure the average completion time. Then we fire $2k$, $4k$, and so on. As soon as $d$ starts oscillating, we know we've saturated the capacity for the peer.
NOTE: Chrys made changes to the calculation, and I have [summarised those here](https://hackmd.io/2brFqqXzS9GQ4Ei5xe0gLw#Estimation-of-the-Number-of-in-flight-Batches). I am now reasonably convinced that we are doing something that makes sense, even though I haven't proved all parts of it formally.
Getting back to peer selection:
```nim=15
if candidates.len == 0:
let partialCandidates = swarm.peersWithAnyInRange(start, count)
trace "No full range peers, checking partial",
start = start, count = count, partialPeers = partialCandidates.len
if partialCandidates.len == 0:
return PeerSelection(kind: pskNoPeers)
var peerCtxs: seq[PeerContext]
for peerId in partialCandidates:
let peer = peers.get(peerId)
if peer.isNil:
# peer disconnected, remove from swarm immediately
discard swarm.removePeer(peerId)
continue
let currentInFlight = inFlightBatches.getOrDefault(peerId, @[]).len
if currentInFlight < peer.optimalPipelineDepth(batchBytes):
peerCtxs.add(peer)
if peerCtxs.len == 0:
return PeerSelection(kind: pskAtCapacity)
let selected = selectByBDP(peerCtxs, batchBytes, inFlightBatches, penalties)
if selected.isSome:
return PeerSelection(kind: pskFound, peer: selected.get())
return PeerSelection(kind: pskNoPeers)
```
After singling out peers which can partially complete a batch, we test if the peer still has capacity for an extra batch request (line $31$). If all peers are full, we return `PeerSelection(kind: pskAtCapacity)` to signal that we cannot schedule to any peer. After filtering for capacity and overlap, we finally pass on to `selectByBDP` for the actual selection. The first part filters out (again) peers that are above capacity:
```nim=
proc selectByBDP*(
peers: seq[PeerContext],
batchBytes: uint64,
inFlightBatches: var Table[PeerId, seq[Future[void]]],
penalties: var Table[PeerId, float],
explorationProb: float = ExplorationProbability,
): Option[PeerContext] {.gcsafe, raises: [].} =
if peers.len == 0:
return none(PeerContext)
if peers.len == 1:
return some(peers[0])
var untriedPeers: seq[PeerContext]
for peer in peers:
if peer.stats.throughputBps().isNone:
let
pipelineDepth = peer.optimalPipelineDepth(batchBytes)
currentLoad = inFlightBatches.getOrDefault(peer.id, @[]).len
if currentLoad < pipelineDepth:
untriedPeers.add(peer)
```
Next, we get the least loaded peer. This feels strange though cause we're just counting the requests in flight and not actually looking at pipeline utilization:
```nim=21
if untriedPeers.len > 0:
var
bestPeer = untriedPeers[0]
bestLoad = inFlightBatches.getOrDefault(bestPeer.id, @[]).len
for i in 1 ..< untriedPeers.len:
let load = inFlightBatches.getOrDefault(untriedPeers[i].id, @[]).len
if load < bestLoad:
bestLoad = load
bestPeer = untriedPeers[i]
return some(bestPeer)
```
Then, with probability $1/5$, we pick a random peer (exploration). This allows us to build knowledge about peers which would otherwise never be selected, allowing them an opportunity to score better.
```nim=31
let exploreRoll = rand(1.0)
if exploreRoll < explorationProb:
var peersWithCapacity: seq[PeerContext]
for peer in peers:
let
pipelineDepth = peer.optimalPipelineDepth(batchBytes)
currentLoad = inFlightBatches.getOrDefault(peer.id, @[]).len
if currentLoad < pipelineDepth:
peersWithCapacity.add(peer)
if peersWithCapacity.len > 0:
let idx = rand(peersWithCapacity.len - 1)
return some(peersWithCapacity[idx])
```
Finally, we rank peers by "BDP score" and return the best one, or break ties randomly:
```nim=44
var
bestPeers: seq[PeerContext] = @[peers[0]]
bestScore = peers[0].evalBDPScore(
batchBytes,
inFlightBatches.getOrDefault(peers[0].id, @[]).len,
penalties.getOrDefault(peers[0].id, 0.0),
)
for i in 1 ..< peers.len:
let score = peers[i].evalBDPScore(
batchBytes,
inFlightBatches.getOrDefault(peers[i].id, @[]).len,
penalties.getOrDefault(peers[i].id, 0.0),
)
if score < bestScore:
bestScore = score
bestPeers = @[peers[i]]
elif score == bestScore:
bestPeers.add(peers[i])
if bestPeers.len > 1:
let idx = rand(bestPeers.len - 1)
return some(bestPeers[idx])
else:
return some(bestPeers[0])
```
Let's look at what BDP score is:
```nim=
proc evalBDPScore*(
self: PeerContext, batchBytes: uint64, currentLoad: int, penalty: float
): float =
let
pipelineDepth = self.optimalPipelineDepth(batchBytes)
capacityScore =
if currentLoad >= pipelineDepth:
100.0
else:
(currentLoad.float / pipelineDepth.float) * 10.0
throughputScore =
if self.stats.throughputBps().isSome:
let bps = self.stats.throughputBps().get().float
if bps > 0:
ThroughputScoreBaseline / bps
else:
50.0
else:
25.0 # normalization fallback
rttScore =
if self.stats.avgRttMicros().isSome:
self.stats.avgRttMicros().get().float / 10000.0
else:
5.0 # normalization fallback
return capacityScore + throughputScore + rttScore + penalty
```
`capacityScore` is the fraction of the pipeline that's occupied, so a value between $0$ and $1$, with idle pipelines getting a lower score. Overflowing pipelines get $100$.
I've added some thoughts to the github review that these normalization factors look somewhat like magic numbers and that we're probably better off using something like z-scores and then reweighting if we want to keep these things dimensionless.
Regardless, BDP score is a sum across pipeline occupation rate (load), throughput (observed transfer rates) and rttScore. It is clear now that the exploration part of the selection is needed so we can build those metrics towards new peers. This in some sense plays the part of optimistic unchokes in bittorrent.
```nim=40
var peerCtxs: seq[PeerContext]
for peerId in candidates:
let peer = peers.get(peerId)
if peer.isNil:
# peer disconnected - remove from swarm immediately
discard swarm.removePeer(peerId)
continue
let currentInFlight = inFlightBatches.getOrDefault(peerId, @[]).len
if currentInFlight < peer.optimalPipelineDepth(batchBytes):
peerCtxs.add(peer)
if peerCtxs.len == 0:
return PeerSelection(kind: pskAtCapacity)
let selected = selectByBDP(peerCtxs, batchBytes, inFlightBatches, penalties)
if selected.isSome:
return PeerSelection(kind: pskFound, peer: selected.get())
return PeerSelection(kind: pskNoPeers)
```
We can finally end our digression on `selectPeerForBatch`, and get back to download worker. Once a peer for the current batch been selected, we either lookup for new peers if none is found, or we requeue if all peers are at capacity:
```nim=220
if selection.kind == pskNoPeers:
trace "No peer with range, searching for new peers"
let
hasActivePeers = swarm.activePeerCount() > 0
waitTime = if hasActivePeers: retryInterval else: 100.milliseconds
await download.handleBatchRetry(start, count, waitTime)
continue
if selection.kind == pskAtCapacity:
download.requeueBatch(start, count, front = false)
await sleepAsync(10.milliseconds)
continue
```
Unlike what the log says, we are not doing any peer lookup here:
```nim=
proc handleBatchRetry*(
download: ActiveDownload, start: uint64, count: uint64, waitTime: Duration
) {.async: (raises: [CancelledError]).} =
let
addresses = download.getBlockAddressesForRange(start, count)
exhausted = download.decrementBlockRetries(addresses)
if exhausted.len > 0:
warn "Block retries exhausted",
treeCid = download.treeCid, exhaustedCount = exhausted.len
download.failExhaustedBlocks(exhausted)
download.requeueBatch(start, count, front = false)
await sleepAsync(waitTime)
```
So what appears to happen here is that we have two different retry paths depending on whether we got `pskNoPeers` or `pskAtCapacity`. For some reason, we only decrement the retry count for the former. I'm not sure why but, then again, I never liked those retry counters anyway.
In case we do have a viable peer, we proceed to `sendWantBlockRequest`:
```nim=232
let peer = selection.peer
download.markBatchInFlight(start, count, localBlockCount, peer.id)
let batchFuture =
self.sendWantBlocksRequest(download, start, count, missingIndices, peer)
self.downloadManager.peerTracker.track(peer.id, batchFuture)
download.setBatchRequestFuture(start, batchFuture)
```
This is also a quite large piece of code which we'll need to digress into.
```nim=
proc sendWantBlocksRequest(
self: BlockExcEngine,
download: ActiveDownload,
start: uint64,
count: uint64,
missingIndices: seq[uint64],
peer: PeerContext,
): Future[void] {.async: (raises: [CancelledError]).} =
if download.cancelled:
return
let treeCid = download.treeCid
var ranges: seq[tuple[start: uint64, count: uint64]] = @[]
if missingIndices.len > 0:
var
rangeStart = missingIndices[0]
rangeCount: uint64 = 1
for i in 1 ..< missingIndices.len:
if missingIndices[i] == rangeStart + rangeCount:
rangeCount += 1
else:
ranges.add((rangeStart, rangeCount))
rangeStart = missingIndices[i]
rangeCount = 1
ranges.add((rangeStart, rangeCount))
trace "Requesting missing blocks",
treeCid = treeCid,
originalRange = $(start, count),
missing = missingIndices.len,
ranges = ranges.len,
peer = peer.id
let
requestStartTime = Moment.now()
requestResult = await self.requestWantBlocks(
peer.id, BlockRange(treeCid: treeCid, ranges: ranges)
)
rttMicros = (Moment.now() - requestStartTime).microseconds.uint64
if download.cancelled:
return
```
The first part is straightforward and builds a set of ranges based on the contents of `missingIndices`. `missingIndices` contains the blocks we actually want (would probably be better off as `wantedBlockIndices` or something). If it contains all the indices, we will end up with a single range. Otherwise we will end up with a disjoint set of intervals. The code assumes `missingIndices` is sorted but this is not described/enforced anywhere.
Next, we have the actual call to `requestWantBlocks`. We have discussed this at length already and will not do it again here.
We then spot check the state of download in line $44$, which is awkward as the state might change after our check at the first call to await -- and we have several, as we will see -- but we do not seem to care about that.
```nim=45
# request might have timed-out and have been requeued to another peer
# if yes, then discard response, it's already handled.
download.pendingBatches.withValue(start, pending):
if pending[].peerId != peer.id:
# discard it, was reassigned
return
do:
# either completed or requeued
return
if requestResult.isErr:
warn "Batch request failed", peer = peer.id, error = requestResult.error.msg
let swarm = download.ctx.swarm
if swarm.recordPeerFailure(peer.id):
warn "Peer exceeded max failures, removing from swarm", peer = peer.id
if swarm.removePeer(peer.id).isNone:
trace "Peer was not in swarm", peer = peer.id
download.handlePeerFailure(peer.id)
if swarm.needsPeers():
self.searchForNewPeers(download.manifestCid)
else:
# we can requeue immediately (cancels timeout), no benefit waiting for timeout.
download.requeueBatch(start, count, front = true)
return
```
Next, we check if the request has timed out before running the retry handlers ourselves in case of failure. This to some extent assumes knowledge of what `batchTimeoutHandler` does in here, which makes me uncomfortable.
We then handle batch request failures by dropping the peer and performing swarm maintenance - feels like the wrong place to do this, I'd be more confortable if we had swarm maintenance logic within the swarm itself, and fed the network reconfiguration events into it instead of having logic spread and slightly duplicated (and missing, e.g. in peerDepartHandler).
The next part processes the response for a successful batch request. Lines $71$--$74$ do special treatment for the case in which the response is empty, requeueing the batch. If there is even a single valid block, we process it in lines $88$--$130$. Receiving unrequested blocks is treated like a minor fault and those are simply skipped, but I am not sure why.
```nim=71
if allBlockViews.len == 0:
trace "Peer responded with zero blocks", peer = peer.id, treeCid = treeCid
download.requeueBatch(start, count, front = false)
return
trace "Received batch response",
treeCid = treeCid,
originalRange = $(start, count),
received = allBlockViews.len,
requested = missingIndices.len,
peer = peer.id
var
totalBytes: uint64 = 0
validCount: int = 0
receivedIndices: HashSet[uint64]
for view in allBlockViews:
if not bexutils.isIndexInRanges(
view.address.index.uint64, ranges, sortedRanges = true
):
warn "Received unrequested block", index = view.address.index, ranges = ranges.len
continue
if view.address.index.uint64 >= download.ctx.totalBlocks:
warn "Received block with out-of-bounds index - banning peer",
index = view.address.index,
totalBlocks = download.ctx.totalBlocks,
peer = peer.id
await self.banAndDropPeer(download, peer.id)
return
if err =? self.validateBlockDeliveryView(view).errorOption:
error "Block validation failed - corrupted data from peer",
address = view.address, msg = err.msg, peer = peer.id
warn "Banning peer for sending corrupted block data", peer = peer.id
await self.banAndDropPeer(download, peer.id)
return
let
bd = view.toBlockDelivery()
putResult = await self.localStore.putBlock(bd.blk)
if putResult.isErr:
warn "Failed to store block", address = bd.address, error = putResult.error.msg
continue
let proofResult = await self.localStore.putCidAndProof(
bd.address.treeCid, bd.address.index, bd.blk.cid, bd.proof.get
)
if proofResult.isErr:
warn "Failed to store proof", address = bd.address
discard await self.localStore.delBlock(bd.blk.cid)
continue
totalBytes += bd.blk.data[].len.uint64
validCount += 1
receivedIndices.incl(bd.address.index.uint64)
if bd.address in download.blocks:
discard download.completeWantHandle(bd.address, some(bd.blk))
storage_block_exchange_blocks_received.inc(validCount.int64)
download.ctx.swarm.recordBatchSuccess(peer, rttMicros, totalBytes)
```
Lines $103$--$130$ then validate block data (indexes and Merkle proofs) and ban peers which send invalid data. Finally, data is stored and the corresponding block want handles are completed. Batch success is recorded in line $134$. The naming is a bit misleading as the response might be incomplete -- in fact, as we pointed out earlier, it might contain a single block.
The last part deals with partial batch completions:
```nim=135
if validCount < missingIndices.len:
trace "Peer delivered partial batch, computing missing ranges",
peer = peer.id, requested = missingIndices.len, received = validCount
var stillMissing: seq[uint64]
for idx in missingIndices:
if idx notin receivedIndices:
stillMissing.add(idx)
if stillMissing.len > 0:
var penaltyAddresses: seq[BlockAddress]
let peerAvail = download.ctx.swarm.getPeer(peer.id)
for idx in stillMissing:
if peerAvail.isSome and peerAvail.get().availability.hasRange(idx, 1):
penaltyAddresses.add(download.makeBlockAddress(idx))
let exhausted = download.decrementBlockRetries(penaltyAddresses)
if exhausted.len > 0:
warn "Blocks exhausted retries after partial delivery",
treeCid = treeCid, exhaustedCount = exhausted.len
download.failExhaustedBlocks(exhausted)
let exhaustedIndices = exhausted.mapIt(it.index.uint64).toHashSet
stillMissing = stillMissing.filterIt(it notin exhaustedIndices)
var missingRanges: seq[tuple[start: uint64, count: uint64]] = @[]
if stillMissing.len > 0:
stillMissing.sort()
var
rangeStart = stillMissing[0]
rangeCount: uint64 = 1
for i in 1 ..< stillMissing.len:
if stillMissing[i] == rangeStart + rangeCount:
rangeCount += 1
else:
missingRanges.add((rangeStart, rangeCount))
rangeStart = stillMissing[i]
rangeCount = 1
missingRanges.add((rangeStart, rangeCount))
trace "Partial batch completion - requeuing missing ranges",
treeCid = treeCid,
originalStart = start,
originalCount = count,
received = validCount,
missingRanges = missingRanges.len
download.partialCompleteBatch(
start, count, validCount.uint64, missingRanges, totalBytes
)
else:
download.completeBatch(start, validCount.uint64, totalBytes)
```
Lines $139$--$142$ collect the blocks that are still missing. It then does something quite weird, which is to decrement the retry counter for the block only if the peer reported having the block. Does not make sense to me.
Finally, we collect the missing indices into ranges again and submit those to `partialCompleteBatch`.