Giuliano Mega
# New Block Protocol - Summary

**Why did we need a new protocol?** Our previous block protocol was based on IPFS and inherited many of its design complexities, despite the fact that Logos Storage aimed at a much simpler data model. Data blocks in IPFS can be arbitrarily linked as DAGs, and a block might be shared by multiple DAGs. There is no _explicit_ notion of a dataset - any DAG or subdag is potentially a dataset, and IPFS cannot make any a priori assumptions on which of those need to be findable. The implication is that, by default, IPFS must index _every_ 64KB block in the DHT.

Logos Storage initially followed a similar design, and files were represented simply as sets of blocks. Each block was indexed separately in the DHT, and a special `Manifest` block bound those together by means of a `blocks: [CID]` list. The block list also provided the total order over the blocks which is required to correctly represent a file.

Apart from manifest sizes growing with the number of blocks in a file, this design has a more nefarious consequence: since IPFS (and hence Logos Storage) _also_ used a DHT with provider indirection, indexing traffic also grew proportionally to the size of each file. We have known this since at least 2023.

The lack of explicit datasets also meant that creating explicit swarms was impossible, and locality was implied. IPFS provides the notion of [sessions](https://github.com/ipfs/boxo/blob/2f225c407b2a2701598ceb2cffef5e2953fadab9/bitswap/client/internal/session/session.go#L292), which allows some form of swarm building, but it still has severe shortcomings.

This design also made simple things like tracking dataset completion difficult - you cannot, in principle, know the full size of a DAG in IPFS, and Codex inherited the same limitation by design. Internally, there was no representation of a dataset, just blocks. This also made decisions like when to disconnect from a peer difficult, as in principle any peer could be holding any block.
Finally, without first-class datasets things like efficient block scheduling become extremely difficult, as do things like efficient block knowledge updates. All this changes with the new protocol.

## Code Review

### Download Manager (downloadmanager.nim)

The new protocol introduces a download manager. This tracks active downloads.

```nim=
const
  DefaultBlockRetries* = 200
  DefaultRetryInterval* = 2.seconds

type DownloadManager* = ref object of RootObj
  nextDownloadId*: uint64 = 1 # 0 is invalid
  blockRetries*: int
  retryInterval*: Duration
  downloads*: Table[Cid, Table[uint64, ActiveDownload]]
```
**Listing 1.**

I think the first surprising thing to me is that `downloads` is `Table[Cid, Table[uint64, ActiveDownload]]`, and not `Table[Cid, ActiveDownload]`. Looks like we can have multiple downloads per `Cid`, which in principle does not seem to make that much sense to me.

We then have two flavours of downloads: regular, and "background" downloads:

```nim=
proc getDownload*(self: DownloadManager, treeCid: Cid): Option[ActiveDownload] =
  self.downloads.withValue(treeCid, innerTable):
    for _, download in innerTable[]:
      return some(download)
  return none(ActiveDownload)

proc getBackgroundDownload*(
    self: DownloadManager, treeCid: Cid
): Option[ActiveDownload] =
  self.downloads.withValue(treeCid, innerTable):
    for _, download in innerTable[]:
      if download.isBackground:
        return some(download)
  return none(ActiveDownload)
```
**Listing 2.**

Again, it is not immediately clear what the difference is.

`cancelDownload` looks straightforward: lines $4$--$8$ cancel any pending block request batches by cancelling their timeouts and completion futures, where the latter will cascade cancellations to anyone waiting on a batch. Lines $10$--$15$ then cancel individual block handles, which I suppose are still required by the block API that we chose to maintain.
Finally, lines $17$--$18$ cancel completion futures for the download itself, and lines $20$--$23$ remove it from the internal state table.

**ENSURE:** I don't see anything in the way of disconnecting peers belonging to the download's swarm, but I suppose this must happen indirectly somewhere else.

```nim=
proc cancelDownload*(self: DownloadManager, download: ActiveDownload) =
  download.cancelled = true

  for _, batch in download.pendingBatches:
    if not batch.timeoutFuture.isNil and not batch.timeoutFuture.finished:
      batch.timeoutFuture.cancelSoon()
    if not batch.requestFuture.isNil and not batch.requestFuture.finished:
      batch.requestFuture.cancelSoon()

  for address, req in download.blocks:
    if not req.handle.finished:
      req.handle.fail(newException(CancelledError, "Download cancelled"))
    if not req.opaqueHandle.finished:
      req.opaqueHandle.fail(newException(CancelledError, "Download cancelled"))
  download.blocks.clear()

  if not download.completionFuture.finished:
    download.completionFuture.fail(newException(CancelledError, "Download cancelled"))

  self.downloads.withValue(download.treeCid, innerTable):
    innerTable[].del(download.id)
    if innerTable[].len == 0:
      self.downloads.del(download.treeCid)
```
**Listing 3.**

As before, there's a flavour of `cancelDownload` specific for "background" downloads, but it does not do much other than check if the download is indeed a background download, and turn into a no-op if it's not:

```nim=
proc cancelBackgroundDownload*(
    self: DownloadManager, downloadId: uint64, treeCid: Cid
): bool =
  let download = self.getDownload(downloadId, treeCid)
  if download.isSome and download.get().isBackground:
    self.cancelDownload(download.get())
    return true
  return false
```
**Listing 4.**

`getDownloadProgress` is not that interesting, except that we apparently indeed track multiple downloads per file, and can even tell the completion state for each (which is weird - intuitively they should all have the same completion state!).

COMMENT: this stuff is quite weird.
I think it does not make a lot of sense to have multiple downloads for a single dataset, let alone track different completion states - the completion for all downloads should be the same as it refers to the same underlying dataset.

Finally, the most interesting proc here and the thread that we pull is `startDownload`:

```nim=
proc startDownload*(
    self: DownloadManager, desc: DownloadDesc, missingBlocks: seq[uint64] = @[]
): ActiveDownload =
  let
    ctx = DownloadContext.new(desc, missingBlocks)
    downloadId = self.nextDownloadId
  self.nextDownloadId += 1

  let download = ActiveDownload(
    id: downloadId,
    treeCid: desc.treeCid,
    ctx: ctx,
    blocks: initTable[BlockAddress, BlockReq](),
    pendingBatches: initTable[uint64, PendingBatch](),
    inFlightBatches: initTable[PeerId, seq[Future[void]]](),
    exhaustedBlocks: initHashSet[BlockAddress](),
    blockRetries: self.blockRetries,
    retryInterval: self.retryInterval,
    isBackground: desc.isBackground,
    fetchLocal: desc.fetchLocal,
    completionFuture:
      Future[?!void].Raising([CancelledError]).init("ActiveDownload.completion"),
  )

  self.downloads.mgetOrPut(desc.treeCid, initTable[uint64, ActiveDownload]())[
    downloadId
  ] = download

  trace "Started download",
    treeCid = desc.treeCid,
    startIndex = desc.startIndex,
    count = desc.count,
    batchSize = ctx.scheduler.batchSizeCount

  return download
```
**Listing 5.**

This creates two critical objects: `DownloadContext` and `ActiveDownload`. Other than that, however, it does not seem to actually start anything?

COMMENT: Is this actually starting anything? If not, then it should probably not be called "start" download, but maybe "create" download?

## How are Downloads Actually Started

Turns out downloads are not started in `DownloadManager.startDownload`. Instead, they are started in `BlockExcEngine.startDownload`. Let's track this all the way up the call chain and try to reconstruct it.
The entrypoint for a download remains `StorageNodeRef.retrieve`:

```nim=
proc retrieve*(
    self: StorageNodeRef, cid: Cid, local: bool = true
): Future[?!LPStream] {.async: (raises: [CancelledError]).} =
  ## Retrieve by Cid a single block or an entire dataset described by manifest
  ##
  if local and not await (cid in self.networkStore):
    return failure((ref BlockNotFoundError)(msg: "Block not found in local store"))

  without manifest =? (await self.fetchManifest(cid)), err:
    if err of AsyncTimeoutError:
      return failure(err)
    return await self.streamSingleBlock(cid)

  await self.streamEntireDataset(
    ManifestDescriptor(manifest: manifest, manifestCid: cid)
  )
```
**Listing 6.**

It remains largely unchanged - we try to retrieve the block and, if it decodes successfully as a manifest, we dispatch to `streamEntireDataset`; otherwise, we dispatch to `streamSingleBlock`.

NOTES: I see we kept `streamSingleBlock`, but as a local-only version of it. I feel like stripping this out completely, as I feel this is now a confusing API that is not that useful. As for the manifests, although we still store manifests as blocks, I'd probably try to modify the API so that clients can no longer make that assumption.

`streamEntireDataset` is the interesting bit, so we zoom into that:

```nim=
proc streamEntireDataset(
    self: StorageNodeRef, desc: ManifestDescriptor, fetchLocal: bool = false
): Future[?!LPStream] {.async: (raises: [CancelledError]).} =
  ## Streams the contents of the entire dataset described by the manifest.
  ##
  trace "Retrieving blocks from manifest", manifestCid = desc.manifestCid

  var jobs: seq[Future[void]]
  let stream = LPStream(StoreStream.new(self.networkStore, desc.manifest, pad = false))

  proc fetchTask(): Future[void] {.async: (raises: []).} =
    try:
      if err =? (await self.fetchDatasetAsync(desc, fetchLocal = fetchLocal)).errorOption:
        error "Dataset fetch failed during streaming",
          manifestCid = desc.manifestCid, err = err.msg
        await stream.close()
    except CancelledError:
      trace "Dataset fetch cancelled during streaming", manifestCid = desc.manifestCid

  jobs.add(fetchTask())

  # Monitor stream completion and cancel background jobs when done
  proc monitorStream() {.async: (raises: []).} =
    try:
      await stream.join()
    except CancelledError as exc:
      warn "Stream cancelled", exc = exc.msg
    finally:
      await noCancel allFutures(jobs.mapIt(it.cancelAndWait))

  self.trackedFutures.track(monitorStream())

  # Retrieve all blocks of the dataset sequentially from the local store or network
  trace "Creating store stream for manifest", manifestCid = desc.manifestCid

  stream.success
```
**Listing 7.**

This also still works largely as before: we create a `StoreStream` which will attempt to sequentially call `NetworkStore.getBlock` under the hood, while launching a "prefetching" task (the actual download) over `fetchDatasetAsync`.

NOTE: I find that calling this "prefetching" is now confusing. This is not a mere prefetch - it starts the one and only download for the whole dataset. Without "prefetch" the dataset won't end up in the machine at all.

Yet, one key aspect of the new protocol is that scheduling is centralized in the downloads, so how does this work? In the old version of `getBlock`, failure to find a block locally would result in a call to `requestBlock`, which would begin the process of requesting that block (and possibly ONLY that block) from other peers immediately (i.e., `getBlock` was a de facto scheduling action):

```nim=
method getBlock*(
    self: NetworkStore, address: BlockAddress
): Future[?!Block] {.async: (raises: [CancelledError]).} =
  without blk =? (await self.localStore.getBlock(address)), err:
    if not (err of BlockNotFoundError):
      error "Error getting block from local store", address, err = err.msg
      return failure err

    without newBlock =? (await self.engine.requestBlock(address)), err:
      error "Unable to get block from exchange engine", address, err = err.msg
      return failure err

    return success newBlock

  return success blk
```
**Listing 8.**

In the new version, we instead obtain a completion handle from the download manager itself (line $6$):

```nim=
method getBlock*(
    self: NetworkStore, address: BlockAddress
): Future[?!Block] {.async: (raises: [CancelledError]).} =
  let downloadOpt = self.engine.downloadManager.getDownload(address.treeCid)
  if downloadOpt.isSome:
    let handle = downloadOpt.get().getWantHandle(address)
    without blk =? (await self.localStore.getBlock(address)), err:
      if not (err of BlockNotFoundError):
        handle.cancel()
        return failure err
      return await handle
    discard downloadOpt.get().completeWantHandle(address, some(blk))
    return success blk
  return await self.localStore.getBlock(address)
```
**Listing 9.**

My expectation is that `getWantHandle` obtains a handle but does not influence scheduling in any way - the file will be downloaded in its entirety (all blocks) to the local machine following the heuristics stipulated by the download manager, regardless of which blocks get requested via `getBlock`.

This is otherwise a strange API, however: if there is no active download, we will revert this into a `getBlock` call onto the local store, which will simply fail if the block is not available locally. It would make more sense to start a download in that case - or to get rid of `getBlock` entirely.

Another point that is not so clear to me is why we need line $12$ and all the complexity it brings: if the block is in the local store, can't we just return it? Why are we getting and then completing a handle as part of `getBlock`? This would look more natural to me:

```nim=
method getBlock*(
    self: NetworkStore, address: BlockAddress
): Future[?!Block] {.async: (raises: [CancelledError]).} =
  # Local lookup.
  without blk =? (await self.localStore.getBlock(address)), err:
    if not (err of BlockNotFoundError):
      return failure err

    # If block not available locally, return completion handle. The
    # download process will eventually get to it.
    let downloadOpt = self.engine.downloadManager.getDownload(address.treeCid)
    if downloadOpt.isSome:
      return await downloadOpt.get().getWantHandle(address)

    # If we got thus far, it means the block is not available
    # locally and there's no active download.
    return failure((ref BlockNotFoundError)(msg: "Not found")) # or something like this

  return success blk # block found, just return
```
**Listing 10.**

Also, another question comes to mind: this is the `BlockAddress` variant of `getBlock`. What if someone calls `getBlock(cid: Cid)`, how do we figure out which download/dataset to map it to? Well, turns out that `networkstore` no longer supports `getBlock(cid: Cid)`, or at least not the way it used to. Indeed, the old version of `getBlock` used to be:

```nim=
method getBlock*(
    self: NetworkStore, cid: Cid
): Future[?!Block] {.async: (raw: true, raises: [CancelledError]).} =
  ## Get a block from the blockstore
  ## Get a block from the local blockstore only.
  ##
  self.getBlock(BlockAddress.init(cid))
```
**Listing 11.**

Now, it basically delegates to the local store:

```nim=
method getBlock*(
    self: NetworkStore, cid: Cid
): Future[?!Block] {.async: (raw: true, raises: [CancelledError]).} =
  ## Get a block from the local blockstore only.
  ##
  self.localStore.getBlock(cid)
```
**Listing 12.**

NOTE: This again strikes me as a confusing API - the "stacked store" abstraction no longer works here. I'd rather get rid of getting blocks by CID altogether (another reason to completely remove `getBlock`), since this is not actually supported by all "stores" anymore. I think that networkstore is making very little sense to me at this point.

OK so now we roughly know how `StoreStream` interacts with the engine to implement `getBlock`.
Let us now look at the interesting part, which is the download process itself. This is disguised under `fetchDatasetAsync`, which, apart from the weird `startTreeDownloadOpaque` name, is refreshingly simple; i.e. it starts a download and waits for it to complete.

```nim=
proc fetchDatasetAsync*(
    self: StorageNodeRef,
    manifest: Manifest,
    fetchLocal = true,
    selectionPolicy: SelectionPolicy = spSequential,
): Future[?!void] {.async: (raises: [CancelledError]).} =
  let
    treeCid = manifest.treeCid
    download =
      ?self.engine.startTreeDownloadOpaque(
        treeCid,
        manifest.blockSize.uint32,
        manifest.blocksCount.uint64,
        selectionPolicy = selectionPolicy,
        fetchLocal = fetchLocal,
      )

  try:
    trace "Starting tree download",
      treeCid = treeCid, totalBlocks = manifest.blocksCount
    return await download.waitForComplete()
  finally:
    self.engine.releaseDownload(download)
```
**Listing 13.**

NOTE: Chrys reverted back to announcing treeCids as I was reviewing this, so part of the snippets will refer to the old version and part to the new.

## The Download Process

We'll now dig into `startTreeDownloadOpaque`, which seems to be the API call that actually starts a download:

```nim
proc startTreeDownloadOpaque*(
    self: BlockExcEngine,
    treeCid: Cid,
    blockSize: uint32,
    totalBlocks: uint64,
    selectionPolicy: SelectionPolicy = spSequential,
    isBackground: bool = false,
    fetchLocal: bool = false,
): ?!DownloadHandleOpaque =
  startTreeDownloadGeneric[void](
    self,
    treeCid,
    blockSize,
    totalBlocks,
    selectionPolicy = selectionPolicy,
    isBackground = isBackground,
    fetchLocal = fetchLocal,
  )
```
**Listing 14.**

This dispatches to `startTreeDownloadGeneric[void]`, which returns a `DownloadHandleOpaque`. What's up with that?
Looks like we have two flavours of download handles:

```nim=
type
  DownloadHandleGeneric*[T] = object
    cid*: Cid
    downloadId*: uint64
    iter*: SafeAsyncIter[T]
    completionFuture*: Future[?!void].Raising([CancelledError])

  DownloadHandle* = DownloadHandleGeneric[Block]
  DownloadHandleOpaque* = DownloadHandleGeneric[void]
```
**Listing 15.**

Let's have a look at the API for `DownloadHandleGeneric`:

```nim=
proc finished*[T](h: DownloadHandleGeneric[T]): bool =
  h.iter.finished

proc next*[T](
    h: DownloadHandleGeneric[T]
): Future[?!T] {.async: (raw: true, raises: [CancelledError]).} =
  h.iter.next()

proc waitForComplete*[T](
    h: DownloadHandleGeneric[T]
): Future[?!void] {.async: (raises: [CancelledError]).} =
  return await h.completionFuture

proc nextBlock*(
    h: DownloadHandle
): Future[?!Block] {.async: (raw: true, raises: [CancelledError]).} =
  h.iter.next()
```
**Listing 16.**

So this seems to allow one to consume the blocks in the download as an iterator - this makes sense for `DownloadHandle`, but not a lot of sense for `DownloadHandleOpaque`. Yet, we get a `SafeAsyncIter` in `DownloadHandleOpaque` too.

NOTE: Scanning through the code, I can't find any usage of `nextBlock`. Even iterating over download handles seems to be done sparingly, and only in test code. I'd extirpate all of this API and the whole complexity of having async iters here.
We now look into `startTreeDownloadGeneric`, which actually starts the download:

```nim=
proc startTreeDownloadGeneric[T: Block | void](
    self: BlockExcEngine,
    treeCid: Cid,
    blockSize: uint32,
    totalBlocks: uint64,
    selectionPolicy: SelectionPolicy = spSequential,
    isBackground: bool = false,
    fetchLocal: bool = false,
): ?!DownloadHandleGeneric[T] =
  ## - T = Block: Returns actual block data (for streaming)
  ## - T = void: Returns success/failure only (for prefetching)
  let
    desc = toDownloadDesc(
      treeCid,
      totalBlocks,
      blockSize,
      selectionPolicy = selectionPolicy,
      isBackground = isBackground,
      fetchLocal = fetchLocal,
    )
    activeDownload = self.startDownload(desc)

  # ... inlined implementation of SafeAsyncIter

  success DownloadHandleGeneric[T](
    cid: treeCid,
    downloadId: activeDownload.id,
    iter: SafeAsyncIter[T].new(genNext, isFinished),
    completionFuture: activeDownload.completionFuture,
  )
```
**Listing 17.**

Most of the code here is implementing the iterator returned as part of `DownloadHandleGeneric` (the code I want to remove :-)). The key bits are `toDownloadDesc`, and, of course, `startDownload`. Let's look at what `DownloadDesc`, the return type of `toDownloadDesc`, actually is:

```nim=
type
  DownloadDesc* = object
    cid*: Cid
    blockSize*: uint32
    startIndex*: uint64
    count*: uint64
    selectionPolicy*: SelectionPolicy
    isBackground*: bool
    fetchLocal*: bool

  SelectionPolicy* = enum
    spSequential
    spRandomWindow
```
**Listing 18.**

This describes the parameters for the download - from the `CID` to the block size, to whether or not this is a "background" download, to the block scheduling policy. Some of these things do not make immediate sense to me (e.g. `fetchLocal`) but probably will as we read more. The two selection policies, sequential and random, make sense - one for streaming, another for faster downloads.
Let us get to `startDownload`:

```nim=
proc startDownload(
    self: BlockExcEngine, desc: DownloadDesc
): ActiveDownload {.gcsafe, raises: [].} =
  result = self.downloadManager.startDownload(desc)
  self.ensureDownloadWorker(result)
```
**Listing 19.**

This just calls `DownloadManager.startDownload`, and then eventually gets to the most interesting part, `downloadWorker`. This is quite the long function, and we will spend some time talking about it.

```nim=
proc downloadWorker(
    self: BlockExcEngine, download: ActiveDownload
) {.async: (raises: []).} =
  ## Continuously schedules batches to peers until download completes.
  ## Supports concurrent batch requests per peer based on BDP pipeline depth.
  ## When block size is unknown (0), BDP optimizations are disabled - first batch
  ## is used to discover block size, then BDP calculations start.
  let
    cid = download.cid
    retryInterval = self.downloadManager.retryInterval

  logScope:
    cid = cid

  try:
    let
      (windowStart, windowCount) = download.ctx.currentPresenceWindow()
      connectedPeers = self.peers.toSeq()

    if connectedPeers.len > 0:
      trace "Initial presence window broadcast",
        cid = cid,
        windowStart = windowStart,
        windowCount = windowCount,
        totalBlocks = download.ctx.totalBlocks
      await self.broadcastWantHave(
        download, cid, windowStart, windowCount, connectedPeers
      )
      trace "Initial broadcast sent, proceeding to batch loop"
    else:
      trace "No connected peers for initial broadcast, triggering discovery"
      self.searchForNewPeers(cid)
```
**Listing 20.**

The first point worthy of attention is in line $26$ above, where we do a broadcast of want-haves to all connected peers. There does not seem to be a per-dataset swarm -- we will blindly broadcast those to everybody. This is definitely not what I was expecting.
We then have the main loop which runs as long as the download is not cancelled and not complete:

```nim=33
    while not download.cancelled and not download.isDownloadComplete():
      for peerId in download.inFlightBatches.keys.toSeq:
        var remaining: seq[Future[void]] = @[]
        for fut in download.inFlightBatches[peerId]:
          if not fut.finished:
            remaining.add(fut)
        if remaining.len > 0:
          download.inFlightBatches[peerId] = remaining
        else:
          download.inFlightBatches.del(peerId)
```
**Listing 21.**

The first loop contained in it ($33$--$42$) goes through each "in flight" batch for each peer. What are those? Looking further down, we see:

```nim
let
  peer = selection.peer
  batchFuture = self.sendWantBlocksRequest(download, start, count, peer)

if peer.id notin download.inFlightBatches:
  download.inFlightBatches[peer.id] = @[]
download.inFlightBatches[peer.id].add(batchFuture)
```
**Listing 22.**

So in-flight batches are pending want-block requests towards a given peer. The loop is therefore simply dropping finished futures from `inFlightBatches`, and removing peers with no active in-flight batches from the table. We will later delve into `sendWantBlocksRequest` as it has a lot of interesting stuff going on inside.

Next, we look at the if statement in $44$--$58$.

```nim=43
      let ctx = download.ctx
      if not download.fetchLocal and ctx.needsNextPresenceWindow():
        let (newStart, newCount) = ctx.advancePresenceWindow()
        ctx.trimPresenceBeforeWatermark()
        # Broadcast want-have for the new window
        var swarmPeers: seq[PeerContext] = @[]
        for peerId in ctx.swarm.connectedPeers():
          let peerCtx = self.peers.get(peerId)
          if not peerCtx.isNil:
            swarmPeers.add(peerCtx)
        trace "Advancing presence window",
          cid = cid,
          newWindowStart = newStart,
          newWindowCount = newCount,
          watermark = ctx.scheduler.completedWatermark()
        await self.broadcastWantHave(download, cid, newStart, newCount, connectedPeers)
```
**Listing 23.**

The first part in the condition (`not download.fetchLocal`) indicates, I believe, that this is not a local fetch.
The idea of a local fetch is quite awkward to me though, as `StoreStream` can probably stream a local dataset while skipping the download manager entirely, so I still need to understand what this does and why it is needed.

NOTE: ask what's the intent behind `fetchLocal`.

The other condition, `ctx.needsNextPresenceWindow()`, defines if we want to add more blocks to the set of blocks we are currently considering for download. This is part of the "windowing" approach of the new protocol which attempts to cap resource usage (networking, CPU and memory) by splitting large files into fixed-size windows. We will take a short detour to look at those:

```nim=
proc needsNextPresenceWindow*(ctx: DownloadContext): bool =
  ctx.scheduler.needsNextPresenceWindow()

proc needsNextPresenceWindow*(self: Scheduler): bool =
  case self.windowCursor.policy
  of spSequential:
    let (windowStart, windowCount) = self.windowCursor.currentWindow()
    self.nextBatchStart >= windowStart + windowCount and
      self.windowCursor.needsAdvance(self.completedWatermark)
  of spRandomWindow:
    let (start, count) = self.windowCursor.currentWindow()
    not self.windowCursor.isDone and self.nextBatchStart >= start + count and
      self.requeued.len == 0 and self.inFlight.len == 0
```
**Listing 24.**

We see there are two policies: sequential and random.
Sequential looks like this:

```nim=
let (windowStart, windowCount) = self.windowCursor.currentWindow()
self.nextBatchStart >= windowStart + windowCount and
  self.windowCursor.needsAdvance(self.completedWatermark)
```
**Listing 25.**

This looks simple enough: `windowStart` is the block index at which the current window starts, whereas `windowCount` - I assume - is the window's length, which the code for `WindowCursor` appears to confirm:

```nim=
proc currentWindow(p: WindowCursor): tuple[start: uint64, count: uint64] =
  (start: p.windowStart, count: min(p.windowSize, p.totalBlocks - p.windowStart))
```
**Listing 26.**

The `min` above truncates the last window at the last block index; otherwise the window is `(windowStart, windowSize)`. We then have the call to `needsAdvance(completedWatermark)`, which implements a manual dispatch table:

```nim=
proc needsAdvance(p: WindowCursor, watermark: uint64): bool =
  case p.policy
  of spSequential:
    p.sequential.needsAdvance(
      p.windowStart, p.windowSize, p.totalBlocks, watermark)
  of spRandomWindow:
    false
```
**Listing 27.**

The `SequentialWindowCursor` version of `needsAdvance` starts by checking if there is room for another window in line $4$: the condition in `canAdvance` will only evaluate to false when we are at the last window - the last window itself may be smaller than `windowSize`, but the condition will be false nevertheless. `needsAdvance` then evaluates to true when the watermark is at or above `windowStart + windowSize * advanceThreshold`, where `0 < advanceThreshold <= 1`. Within a given window all of those are fixed - the only varying parameter is the watermark.
```nim=
proc needsAdvance(
    p: SequentialWindowCursor, windowStart, windowSize, totalBlocks, watermark: uint64
): bool =
  if not p.canAdvance(windowStart, windowSize, totalBlocks):
    return false
  let thresholdPos = windowStart + (windowSize.float * p.advanceThreshold).uint64
  watermark >= thresholdPos

proc canAdvance(
    p: SequentialWindowCursor, windowStart, windowSize, totalBlocks: uint64
): bool =
  windowStart + windowSize < totalBlocks
```
**Listing 28.**

The watermark in principle marks the end of the contiguous prefix of blocks that we possess; i.e., we are in principle assured to have all blocks in `[0, watermark)`. The watermark is taken from `scheduler.completedWatermark()`. Looking at when the watermark gets advanced, we see:

```nim=
proc onBatchCompleted(self: Scheduler, batchStart: uint64) =
  case self.windowCursor.policy
  of spSequential:
    self.advanceWatermark(batchStart)
  of spRandomWindow:
    discard

proc advanceWatermark(self: Scheduler, batchStart: uint64) =
  if batchStart == self.completedWatermark:
    self.completedWatermark = batchStart + self.batchSize
    while self.completedWatermark in self.completedOutOfOrder:
      self.completedOutOfOrder.excl(self.completedWatermark)
      self.completedWatermark += self.batchSize
  elif batchStart > self.completedWatermark:
    self.completedOutOfOrder.incl(batchStart)
```
**Listing 29.**

This tells us that the watermark is actually not used in "random". It also shows that when a batch is completed out of order, the watermark will only advance after the batches preceding it complete (lines $12$--$13$). We can therefore assert that:

1. the watermark contains the block index right after the last contiguously completed batch in the case of sequential downloads;
2. it contains no useful information in the case of random downloads.
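
To make the interplay between Listings 28 and 29 concrete, here is a minimal standalone sketch that replays the two procs on small numbers. `MiniScheduler`, the free-standing `needsAdvance` signature, and all the values (batch size 4, window size 16, 40 blocks, threshold 0.75) are assumptions made up for illustration; they are not the real types or defaults.

```nim
import std/sets

# Hypothetical, stripped-down stand-in for the real Scheduler.
type MiniScheduler = object
  batchSize: uint64
  completedWatermark: uint64
  completedOutOfOrder: HashSet[uint64]

proc advanceWatermark(s: var MiniScheduler, batchStart: uint64) =
  ## Same logic as Listing 29, minus the policy dispatch.
  if batchStart == s.completedWatermark:
    s.completedWatermark = batchStart + s.batchSize
    # Absorb batches that completed earlier, out of order.
    while s.completedWatermark in s.completedOutOfOrder:
      s.completedOutOfOrder.excl(s.completedWatermark)
      s.completedWatermark += s.batchSize
  elif batchStart > s.completedWatermark:
    s.completedOutOfOrder.incl(batchStart)

proc needsAdvance(
    windowStart, windowSize, totalBlocks, watermark: uint64, advanceThreshold: float
): bool =
  ## Same logic as Listing 28, with the cursor fields passed explicitly.
  if windowStart + windowSize >= totalBlocks:
    return false # already at the last window
  let thresholdPos = windowStart + (windowSize.float * advanceThreshold).uint64
  watermark >= thresholdPos

var s = MiniScheduler(batchSize: 4, completedOutOfOrder: initHashSet[uint64]())

# Batches starting at 4 and 8 finish before the batch starting at 0.
s.advanceWatermark(4)
s.advanceWatermark(8)
doAssert s.completedWatermark == 0'u64 # still stuck behind batch 0
doAssert not needsAdvance(0, 16, 40, s.completedWatermark, 0.75)

# Batch 0 finishes: the watermark jumps over the two parked batches.
s.advanceWatermark(0)
doAssert s.completedWatermark == 12'u64
doAssert needsAdvance(0, 16, 40, s.completedWatermark, 0.75) # threshold is 12

# Last window (32 + 16 >= 40): never advances, whatever the watermark.
doAssert not needsAdvance(32, 16, 40, 40, 0.75)
```

With these numbers the watermark stays at 0 while only out-of-order batches have completed, then jumps straight to 12 once batch 0 lands, which is exactly the threshold position for the first window.
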
We can now get back to Listing 25 and interpret what `needsNextPresenceWindow` actually does:

```nim
self.nextBatchStart >= windowStart + windowCount and
  self.windowCursor.needsAdvance(self.completedWatermark)
```

**Recap of Listing 25.**

It returns `true` if the start of the next batch falls off of the current window and the watermark is beyond a certain threshold in the current window. This ensures that we do not move onto the next presence window before all batches for the current window are in flight.

We now get back to the download process (Listing 23). After checking if we need a new presence window, we do: `ctx.advancePresenceWindow()`. Let's look at that one.

```nim=
proc advancePresenceWindow*(ctx: DownloadContext): tuple[start: uint64, count: uint64] =
  ctx.availabilityTracker.addPendingRange(ctx.scheduler.currentPresenceWindow())
  discard ctx.scheduler.advancePresenceWindow()
  ctx.scheduler.currentPresenceWindow()
```

**Listing 30.**

The first line already throws me off: why would I add the current window to something called "pending ranges"? Worse, `currentPresenceWindow` does not return a range as we've seen before. Let's look at what `addPendingRange` does.

```nim=
proc addPendingRange(
    t: var BroadcastAvailabilityTracker, range: tuple[start: uint64, count: uint64]
) =
  case t.policy
  of spRandomWindow:
    t.pendingRanges.add(range)
  of spSequential:
    discard
```

**Listing 31.**

QUESTION: why do we add "expired" windows to `pendingRanges`? That doesn't seem to make a lot of sense. Let's look at what `pendingRanges` actually is.
Its use in `shouldBroadcast` gives us some hint:

```nim=
proc shouldBroadcast(t: BroadcastAvailabilityTracker, watermark: uint64): bool =
  case t.policy
  of spRandomWindow:
    t.pendingRanges.len > 0
  of spSequential:
    let newBlocks = watermark - t.lastBroadcastedWatermark
    if newBlocks == 0:
      return false
    let timeSinceLast = Moment.now() - t.lastBroadcastTime
    newBlocks >= PresenceBroadcastBlockThreshold or timeSinceLast >= t.broadcastInterval

proc shouldBroadcastAvailability*(ctx: DownloadContext): bool =
  ctx.availabilityTracker.shouldBroadcast(ctx.scheduler.completedWatermark())
```

**Listing 32.**

When there are pending ranges, `shouldBroadcast` (and by extension `shouldBroadcastAvailability`) evaluates to true. This means `pendingRanges` maintains unbroadcasted block ranges, which makes even less sense: why would you add a window to the set of broadcast ranges only after you're ready to advance it? OK so apparently this broadcasts the blocks we HAVE, but only for `spRandomWindow`. For `spSequential` we follow a different rule: we broadcast only if the watermark has advanced since the last broadcast, and then either when at least `PresenceBroadcastBlockThreshold` new blocks have accumulated or when `broadcastInterval` has elapsed.

The next line in Listing 23, `ctx.trimPresenceBeforeWatermark()`, garbage collects presence information that we no longer need. This cleans up peer context information.
Again, this will probably only make sense for `spSequential`:

```nim=
proc trimPresenceBeforeWatermark*(ctx: DownloadContext) =
  let watermark = ctx.scheduler.completedWatermark()

  for peerId in ctx.swarm.connectedPeers():
    let peerOpt = ctx.swarm.getPeer(peerId)
    if peerOpt.isSome:
      let peer = peerOpt.get()
      # only trim range-based availability
      if peer.availability.kind == bakRanges:
        var newRanges: seq[tuple[start: uint64, count: uint64]] = @[]
        for (start, count) in peer.availability.ranges:
          let rangeEnd = start + count
          if rangeEnd > watermark: # keep ranges not entirely below watermark
            newRanges.add((start, count))
        peer.availability = BlockAvailability.fromRanges(newRanges)
```

**Listing 33.**

If a peer has many disjoint intervals occupying memory, those that lie entirely below the watermark will be thrown away. For peers with, say, a single contiguous block range over the whole window, the range is kept as it is (a range that straddles the watermark is not clipped), so memory usage stays the same.

Lines $49$--$62$ in Listing 23 then update currently connected peers with our wants by broadcasting that we now want the blocks in the new window. Note that this is a single, small range message that covers the whole window.

Next, lines $59$--$88$ in Listing 23 broadcast any new presence information we find relevant to peers; i.e., we update peers on blocks that we currently HAVE.
This is part of the push presence updates, and is similar in spirit to [Bittorent HAVE messages](https://github.com/arvidn/bittorrent-multi-have): ```nim=59 # Broadcast availability to peers if not download.fetchLocal and ctx.shouldBroadcastAvailability(): let broadcastRanges = ctx.getAvailabilityBroadcast() if broadcastRanges.len > 0: trace "Broadcasting availability to swarm", cid = cid, rangeCount = broadcastRanges.len, swarmPeers = ctx.swarm.peerCount() let presence = BlockPresence( address: BlockAddress(treeCid: cid, index: broadcastRanges[0].start.int), kind: BlockPresenceType.HaveRange, ranges: broadcastRanges, ) for peerId in ctx.swarm.connectedPeers(): let peerOpt = ctx.swarm.getPeer(peerId) if peerOpt.isSome and peerOpt.get().availability.kind == bakComplete: continue try: await self.network.request.sendPresence(peerId, @[presence]).wait( DefaultWantHaveSendTimeout ) except AsyncTimeoutError: trace "Availability broadcast send timed out", peer = peerId except CatchableError as err: trace "Failed to broadcast availability", peer = peerId, error = err.msg ctx.markAvailabilityBroadcasted() ``` **Listing 34.** One point to mention is that the granularity of pushes is VERY different across spSequential and spRandom. for `spSequential`, we push at every `PresenceBroadcastIntervalMax = 10.seconds`, or if we manage to fill up more than `50%` of a window before that time. With `spRandom`, we only push once we complete a FULL window. This will lead to very different knowledge propagation characteristics, and will likely impact performance/latency, particularly for fast peers. Next, in lines $89$--$98$, we try to generate a new request batch. Note that if we cannot generate a batch, we will either `continue`, or `break` if download is complete or there are no pending batches. I'm not sure what the latter mean - if batches are pending but not yet complete, breaking means we stop this loop before the download completes, which looks odd. 
```nim=89 let batchOpt = self.downloadManager.getNextBatch(download) if batchOpt.isNone: let pendingBatchCount = download.pendingBatchCount() if pendingBatchCount == 0 and download.isDownloadComplete(): break await sleepAsync(100.milliseconds) continue ``` **Listing 35.** We will now dig into `getNextBatch`. As we can see, it delegates the task to the scheduler: ```nim= proc getNextBatch*( self: DownloadManager, download: ActiveDownload ): Option[tuple[start: uint64, count: uint64]] = let batch = download.ctx.scheduler.take() if batch.isSome: return some((start: batch.get().start, count: batch.get().count)) none(tuple[start: uint64, count: uint64]) ``` **Listing 36.** Let's dig into `take`: ```nim= proc take*(self: Scheduler): Option[BlockBatch] = while self.requeued.len > 0: let batch = self.requeued.popFirst() if batch.start < self.completedWatermark: continue if batch.start in self.completedOutOfOrder: continue self.inFlight[batch.start] = batch.count return some(batch) let batchOpt = self.generateNextBatchInternal() if batchOpt.isSome: let batch = batchOpt.get() self.inFlight[batch.start] = batch.count return batchOpt ``` **Listing 37.** This attempts to get a batch from a "requeued" list first. Requeued batches are failed/timed out batches, so these get rescheduled, except if we got the blocks in this batch from somewhere else in the meantime. This boils down to two conditions: 1. `batch.start` is below the watermark. Since the watermark advances a batch at a time, if `start` is below the watermark then we already have this whole batch; 2. `batch.start` is in `completedOutOfOrder`. Same as before - this means we got the blocks in this batch. I'm in general not fond of sprinkling those conditionals all over, and would rather guarantee that deciding that a batch has failed and processing a batch get somehow linearized (i.e. once we requeue a batch, we do not accept it anymore, and once we accept a batch, we cannot requeue it anymore). 
But this is valid too, I suppose. If there are no requeued batches, then we _generate_ a new batch: ```nim= proc generateNextBatchInternal(self: Scheduler): Option[BlockBatch] {.inline.} = ## does NOT add to inFlight - we must do that let (windowStart, windowCount) = self.windowCursor.currentWindow() while self.nextBatchStart < windowStart + windowCount: let start = self.nextBatchStart count = min(self.batchSize, windowStart + windowCount - start) self.nextBatchStart = start + count if start < self.completedWatermark: continue if start in self.inFlight: continue if start in self.completedOutOfOrder: continue return some((start: start, count: count)) return none(BlockBatch) ``` **Listing 38.** As we can see, batches are always generated sequentially within a window. This means that even when we're randomizing window selection, we're still streaming within windows. I suppose this adds some block diversity across the entire dataset, but it's hard for me to understand if it makes sense. Getting back to Listing 37, `take` will then set the batch as `inFlight` and return it. Simple enough. 
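To see the "sequential within a window" behaviour in isolation, here is a minimal sketch of the batch generation in Listing 38. `ToyCursor`, `nextBatch` and `allBatches` are hypothetical names made up for this note, and the skip conditions (completed, in-flight and out-of-order batches) are deliberately left out.

```nim
import std/options

type
  BlockBatch = tuple[start: uint64, count: uint64]
  ToyCursor = object
    ## Hypothetical, stripped-down view of scheduler + window cursor.
    windowStart, windowCount: uint64
    batchSize: uint64
    nextBatchStart: uint64

proc nextBatch(c: var ToyCursor): Option[BlockBatch] =
  let windowEnd = c.windowStart + c.windowCount
  if c.nextBatchStart >= windowEnd:
    return none(BlockBatch) # window exhausted; a new window is needed
  let
    start = c.nextBatchStart
    count = min(c.batchSize, windowEnd - start) # last batch may be short
  c.nextBatchStart = start + count
  some((start: start, count: count))

proc allBatches(c: var ToyCursor): seq[BlockBatch] =
  ## Drains the current window, collecting batches in generation order.
  while true:
    let b = c.nextBatch()
    if b.isNone:
      break
    result.add(b.get())

var cursor = ToyCursor(
  windowStart: 100'u64, windowCount: 10'u64, batchSize: 4'u64, nextBatchStart: 100'u64
)
let batches = cursor.allBatches()
doAssert batches.len == 3
doAssert batches[0] == (start: 100'u64, count: 4'u64)
doAssert batches[2] == (start: 108'u64, count: 2'u64) # truncated at the window end
```

Regardless of how the window itself was picked, batches inside it come out in ascending order, with only the last one truncated.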
Assuming we got a valid batch, download proceeds to line $99$, where there's a curious snippet of code: ```nim=99 let (start, count) = batchOpt.get() logScope: batchStart = start batchCount = count block localCheck: var lastIdle = Moment.now() let runtimeQuota = 100.milliseconds for i in start ..< start + count: if (Moment.now() - lastIdle) >= runtimeQuota: await idleAsync() lastIdle = Moment.now() let address = download.makeBlockAddress(i) if download.isBlockExhausted(address): break localCheck let exists = try: await address in self.localStore except CatchableError: false if not exists: if download.fetchLocal: download.failLocalMissing(address) break localCheck break localCheck for i in start ..< start + count: if (Moment.now() - lastIdle) >= runtimeQuota: await idleAsync() lastIdle = Moment.now() let address = download.makeBlockAddress(i) if address in download.blocks: without blk =? (await self.localStore.getBlock(address)), err: break localCheck discard download.completeWantHandle(address, some(blk)) download.completeBatchLocal(start, count) continue if download.cancelled or download.fetchLocal: continue ``` **Listing 39.** This seems to be doing a bunch of things in one. The first loop checks that all blocks in the batch exist and none have their retries exhausted (why are we doing this here??). It then calls `failLocalMissing` in case a block does not exist (which I suppose means that the file is not locally available). We then run a **second** loop in which we retrieve the actual content for the blocks and complete their handles. I suppose this is what will fast-forward a download when part of it is available locally. If we get to the end of the block, it means that the batch is complete so we `continue`. This will restart the whole long loop again. If we break, it means there is an incomplete batch. This can be either a batch we need to fetch, or it can be a broken batch - a batch containing a block for which the retry count exceeds the maximum. 
I'd argue it doesn't make sense to continue a download when that happens, as a dataset missing a block is not a dataset we'd likely want to have. After we find an incomplete batch, the snippet starting in line $143$ performs swarm maintenance: ```nim=143 let swarm = download.ctx.swarm var shouldBroadcast = false let peersNeeded = swarm.peersNeeded() if peersNeeded > 0: trace "Swarm below target, triggering discovery", active = swarm.activePeerCount(), needed = peersNeeded self.searchForNewPeers(cid) ``` **Listing 40.** In case there aren't enough peers in the swarm, we issue a lookup, and then broadcast our want-haves again: ```nim=152 if swarm.peersWithRange(start, count).len == 0: shouldBroadcast = true if shouldBroadcast: let connectedPeers = self.peers.toSeq() if connectedPeers.len > 0: trace "Broadcasting want-have for batch range", cid = cid, start = start, count = count, peerCount = connectedPeers.len await self.broadcastWantHave(download, cid, start, count, connectedPeers) # Give peers a short time to respond with presence await sleepAsync(50.milliseconds) else: trace "No connected peers, searching for new peers" self.searchForNewPeers(cid) await self.handleBatchRetry(download, cid, start, count, retryInterval) continue ``` **Listing 41.** This reads like a Hail Mary. We already have push updates from peers to keep us informed, and we already send our list on window switches, why are we doing this again here? The if statement in lines $170$--$174$ also seems like it could be shifted to before `if shouldBroadcast`, eliminating the need for the else clause in $165$--$169$: ```nim=170 if self.peers.len == 0: trace "No connected peers available for batch, searching" self.searchForNewPeers(cid) await self.handleBatchRetry(download, cid, start, count, 100.milliseconds) continue ``` **Listing 42.** Next, we look at "staleUnknownPeers". Stale peers, apparently, are peers with whom we have not interacted in a while. 
```nim= proc staleUnknownPeers*(swarm: Swarm): seq[PeerId] = for peerId, peer in swarm.peers: if peer.isStale and peer.availability.kind == bakUnknown: result.add(peerId) proc isStale*(peer: SwarmPeer): bool = Moment.now() - peer.lastSeen > PeerStaleTimeout proc touch*(peer: SwarmPeer) = peer.lastSeen = Moment.now() ``` **Listing 43.** Looking at where `touch` is called, we see those are in code called from message handlers: ```nim= proc updateAvailability*(peer: SwarmPeer, availability: BlockAvailability) = peer.availability = peer.availability.merge(availability) peer.availabilityUpdated = Moment.now() peer.touch() proc recordPeerSuccess*(swarm: Swarm, peerId: PeerId) = swarm.peers.withValue(peerId, peer): peer[].resetFailures() peer[].touch() proc recordBatchSuccess*( swarm: Swarm, peer: PeerContext, rttMicros: uint64, totalBytes: uint64 ) = swarm.peers.withValue(peer.id, swarmPeer): swarmPeer[].resetFailures() swarmPeer[].touch() peer.stats.recordRequest(rttMicros, totalBytes) ``` **Listing 44.** `updateAvailability` is called from `blockPresenceHandler`, `recordBatchSuccess` is called from `sendWantBlocksRequest` when we get a batch of blocks back (we'll look into `sendWantBlocksRequest` later) and `recordPeerSuccess` is called from... nowhere, apparently. Regardless, those are peers we have not interacted with in a while. 
```nim=175 let staleUnknown = swarm.staleUnknownPeers() if staleUnknown.len > 0: let rangeAddress = download.makeBlockAddress(start) trace "Re-querying stale unknown peers", cid = cid, staleUnknownCount = staleUnknown.len, batchStart = start, batchCount = count for peerId in staleUnknown: try: await self.network.request .sendWantList( peerId, @[rangeAddress], priority = 0, cancel = false, wantType = WantType.WantHave, full = false, sendDontHave = false, rangeCount = count, downloadId = download.id, ) .wait(DefaultWantHaveSendTimeout) except AsyncTimeoutError: trace "Re-query stale unknown peer send timed out", peer = peerId except CatchableError as err: trace "Failed to re-query stale unknown peer", peer = peerId, error = err.msg await sleepAsync(50.milliseconds) ``` **Listing 45.** I don't understand, however, why we are singling those out here - the broadcasts we do in principle should hit these peers as well, and if they fail to respond we should time them out, which we already do during the regular broadcasts. What's the assumption here? What is this trying to cover? Next, we finally process the batch. Let's start with the actual scheduling code in lines $207$--$219$: we call `selectPeerForBatch` in line $209$. ```nim=207 let batchBytes = download.ctx.batchBytes selection = swarm.selectPeerForBatch( self.peers, start, count, batchBytes, download.inFlightBatches ) if selection.kind == pskNoPeers: trace "No peer with range, searching for new peers" let hasActivePeers = swarm.activePeerCount() > 0 waitTime = if hasActivePeers: retryInterval else: 100.milliseconds await self.handleBatchRetry(download, cid, start, count, waitTime) continue ``` **Listing 46.** Turns out this is quite the complicated piece of code, so let's unpack it. Lines $10$--$15$ start by calculating a timeout penalty for peers. Let's denote this by $w_t \times c_t$, where $c_t$ is the number of timeout events for the peer, and $w_t$ the weight for timeouts. 
In the current implementation, this is set to `TimeoutPenaltyWeight* = 3.0`. ```nim= proc selectPeerForBatch*( swarm: Swarm, peers: PeerContextStore, start: uint64, count: uint64, batchBytes: uint64, inFlightBatches: var Table[PeerId, seq[Future[void]]], ): PeerSelection = var penalties: Table[PeerId, float] for peerId, swarmPeer in swarm.peers: if swarmPeer.timeoutCount > 0: penalties[peerId] = swarmPeer.timeoutCount.float * TimeoutPenaltyWeight let candidates = swarm.peersWithRange(start, count) ``` Candidates are then selected by checking their overlap with the batch we're trying to fetch, based on block presence information we've collected earlier. `peersWithRange` will filter out any peers that do not contain the full range of blocks for the batch. If there are no candidates with the full range of blocks, we go for peers with any partially overlapping ranges (lines $15$--$20$): ```nim=15 if candidates.len == 0: let partialCandidates = swarm.peersWithAnyInRange(start, count) trace "No full range peers, checking partial", start = start, count = count, partialPeers = partialCandidates.len if partialCandidates.len == 0: return PeerSelection(kind: pskNoPeers) var peerCtxs: seq[PeerContext] for peerId in partialCandidates: let peer = peers.get(peerId) if peer.isNil: # peer disconnected, remove from swarm immediately discard swarm.removePeer(peerId) continue let currentInFlight = inFlightBatches.getOrDefault(peerId, @[]).len if currentInFlight < peer.optimalPipelineDepth(batchBytes): peerCtxs.add(peer) if peerCtxs.len == 0: return PeerSelection(kind: pskAtCapacity) let selected = selectByBDP(peerCtxs, batchBytes, inFlightBatches, penalties) if selected.isSome: return PeerSelection(kind: pskFound, peer: selected.get()) return PeerSelection(kind: pskNoPeers) ``` A few things stand out. The first one is lines $25$--$28$, where we seem to be embedding garbage collection of the peer array in the swarm within peer selection. 
This strikes me as an odd choice, particularly since libp2p provides disconnection events, and those could be used to clear up the swarm in a more timely manner. The second thing is line $30$, where we'll only consider peers for which the number of inflight batches is smaller than the optimal pipeline depth. The calculation for optimal pipeline depth however merits its own analysis.

```nim=
proc optimalPipelineDepth*(self: PeerPerfStats, batchBytes: uint64): int =
  if batchBytes == 0:
    return DefaultPipelineDepth

  let rttMicrosOpt = self.avgRttMicros()
  if rttMicrosOpt.isNone:
    return DefaultRequestsPerPeer

  let throughputOpt = self.throughputBps()
  if throughputOpt.isNone:
    return DefaultRequestsPerPeer

  let
    rttMicros = rttMicrosOpt.get()
    throughput = throughputOpt.get()
    rttSecs = rttMicros.float64 / 1_000_000.0
    bdpBytes = throughput.float64 * rttSecs
    optimalRequests = ceil(bdpBytes / batchBytes.float64).int

  return clamp(optimalRequests, MinRequestsPerPeer, MaxRequestsPerPeer)
```

We have to go through each of these in detail.

**avgRttMicros.** From the name I'd guess this is the average round trip delay in microseconds. How this is calculated however is key.
The average is simple enough: ```nim= proc avgRttMicros*(self: PeerPerfStats): Option[uint64] = if self.rttSamples.len == 0: return none(uint64) var total: uint64 = 0 for sample in self.rttSamples: total += sample some(total div self.rttSamples.len.uint64) ``` Measurements are inserted in `recordRequest`, which is called from `recordBatchSuccess`: ```nim= proc recordRequest*(self: var PeerPerfStats, rttMicros: uint64, bytes: uint64) = if self.rttSamples.len >= RttSampleCount: discard self.rttSamples.popFirst() self.rttSamples.addLast(rttMicros) let now = Moment.now() if self.firstByteTime.isNone: self.firstByteTime = some(now) self.lastByteTime = some(now) self.totalBytes += bytes proc recordBatchSuccess*( swarm: Swarm, peer: PeerContext, rttMicros: uint64, totalBytes: uint64 ) = swarm.peers.withValue(peer.id, swarmPeer): swarmPeer[].resetFailures() swarmPeer[].touch() peer.stats.recordRequest(rttMicros, totalBytes) ``` `recordBatchSuccess` is instead called by `sendWantBlocksRequest`. The relevant snippet is: ```nim= let requestStartTime = Moment.now() requestResult = await self.requestWantBlocks(peer.id, BlockRange(cid: cid, ranges: ranges)) rttMicros = (Moment.now() - requestStartTime).microseconds.uint64 ``` We therefore seem to be using `requestWantBlocks` as a measurement of round trip time. 
`requestWantBlocks` seems dominated by `NetworkPeer#sendWantBlocksRequest`: ```nim= proc sendWantBlocksRequest*( self: NetworkPeer, blockRange: BlockRange ): Future[WantBlocksResult[WantBlocksResponse]] {.async: (raises: [CancelledError]).} = let requestId = self.nextRequestId self.nextRequestId += 1 let responseFuture = WantBlocksResponseFuture.init("wantBlocksRequest") self.pendingWantBlocksRequests[requestId] = responseFuture try: let conn = await self.connect() if isNil(conn): self.pendingWantBlocksRequests.del(requestId) return err(wantBlocksError(NoConnection, "No connection available")) let req = WantBlocksRequest( requestId: requestId, cid: blockRange.cid, ranges: blockRange.ranges ) await writeWantBlocksRequest(conn, req) return await responseFuture except CancelledError as exc: self.pendingWantBlocksRequests.del(requestId) raise exc except CatchableError as err: self.pendingWantBlocksRequests.del(requestId) return err(wantBlocksError(RequestFailed, "WantBlocks request failed: " & err.msg)) ``` This is not RTT, but we can see what it is: it's the time it takes to run a request for a range of blocks - reply included. Looking at when `responseFuture` gets completed, we see it's inside of the read loop, right after reading the blocks to memory: ```nim= proc readLoop*(self: NetworkPeer, conn: Connection) {.async: (raises: []).} = # ... 
case msgType of mtWantBlocksResponse: let respResult = await readWantBlocksResponse(conn, dataLen) if respResult.isErr: warn "Failed to read WantBlocks response", peer = self.id, error = respResult.error.msg return let response = respResult.get self.pendingWantBlocksRequests.withValue(response.requestId, fut): if not fut[].finished: fut[].complete(WantBlocksResult[WantBlocksResponse].ok(response)) self.pendingWantBlocksRequests.del(response.requestId) do: warn "Received WantBlocks response for unknown request ID", peer = self.id, requestId = response.requestId ``` This means our measurement is pretty much uncontaminated, which is great. **throughputBps.** Next, we look at `throughputBps`: ```nim= proc throughputBps*(self: PeerPerfStats): Option[uint64] = if self.firstByteTime.isNone or self.lastByteTime.isNone: return none(uint64) let first = self.firstByteTime.get() last = self.lastByteTime.get() duration = last - first if duration < MinThroughputDuration: return none(uint64) let secs = duration.nanoseconds.float64 / 1_000_000_000.0 some((self.totalBytes.float64 / secs).uint64) ``` This is just a duration, so the name `bps` already looks weird as this does not seems like a ratio of bits/bytes and seconds. Browsing the code, we see that `firstByteTime` `lastByteTime` get set in `recordRequest`: ```nim= proc recordRequest*(self: var PeerPerfStats, rttMicros: uint64, bytes: uint64) = if self.rttSamples.len >= RttSampleCount: discard self.rttSamples.popFirst() self.rttSamples.addLast(rttMicros) let now = Moment.now() if self.firstByteTime.isNone: self.firstByteTime = some(now) self.lastByteTime = some(now) self.totalBytes += bytes ``` As `recordRequest` gets repeatedly called, it will record: 1. the total number of bytes received from a peer; 2. the interval of time over which such bytes were received. 
`recordRequest` is called as part of `recordBatchSuccess`:

```nim=
proc recordBatchSuccess*(
    swarm: Swarm, peer: PeerContext, rttMicros: uint64, totalBytes: uint64
) =
  swarm.peers.withValue(peer.id, swarmPeer):
    swarmPeer[].resetFailures()
    swarmPeer[].touch()
  peer.stats.recordRequest(rttMicros, totalBytes)
```

This means "throughput" doesn't depend on the peer alone - if we take a long time to issue the next request, throughput will drop.

We can now formalize the optimal pipeline depth. Let $p$ be a peer we are connected to. Assume we are at some time instant $t$. Let $t_i < t$ be the time at which we got the first byte from $p$ over _any_ request, and let $t_f \leq t$ be the time at which we got the latest byte from $p$. Let $b$ be the total amount of bytes received from $p$ over $[t_i, t_f]$. We're defining the transfer rate of $p$ as:

$$
r_p = \frac{b}{t_f - t_i}
$$

We then measure delay as the average time it takes to complete a batch. We take $k$ such measurements and compute an average, which I'll denote as $\bar{d}$. Finally, let $s$ be the batch size. The optimal pipeline depth is then defined as:

$$
o = \frac{r_p \times \bar{d}}{s} = \frac{\frac{b}{t_f - t_i} \times \bar{d}}{s} = \frac{b \times \bar{d}}{(t_f - t_i) \times s}
$$

This is rounded up to an integer in the code (`ceil`) and then _clamped_ between a minimum value $o_{\text{min}}$, and a maximum value $o_{\text{max}}$, so that the actual depth $o_c$ is given by $\min(o_{\text{max}}, \max(o, o_{\text{min}}))$.

I am not convinced this is the right way to measure this, fundamentally because $\bar{d}$ is not a delay measurement and already factors in bandwidth indirectly; i.e., it depends on the batch size - if your batch is large and your connection is slow, then your "RTT" measurement will be bigger. I am also bothered by the fact that we already use the "RTT" measurement in the transfer rate calculation, so it seems like we're dividing by a quantity then multiplying by it again later.
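To make the formula concrete, here is a small worked sketch. `pipelineDepth` is a hypothetical helper written for this note, and the clamp bounds are made-up values, not the project's `MinRequestsPerPeer`/`MaxRequestsPerPeer`; only the arithmetic follows `optimalPipelineDepth`.

```nim
import std/math

const
  ToyMinRequests = 1 # hypothetical clamp bounds, for illustration only
  ToyMaxRequests = 8

proc pipelineDepth(
    totalBytes: uint64, spanSecs, avgBatchSecs: float64, batchBytes: uint64
): int =
  let
    rate = totalBytes.float64 / spanSecs # r_p = b / (t_f - t_i)
    bdpBytes = rate * avgBatchSecs       # r_p * d-bar
    depth = ceil(bdpBytes / batchBytes.float64).int
  clamp(depth, ToyMinRequests, ToyMaxRequests)

const MiB = 1024'u64 * 1024'u64

# 100 MiB over 10 s gives 10 MiB/s; at 0.5 s per batch the "BDP" is 5 MiB,
# and with 4 MiB batches we get ceil(1.25) = 2.
doAssert pipelineDepth(100'u64 * MiB, 10.0, 0.5, 4'u64 * MiB) == 2

# Same bytes, but we idled between requests so the span is 20 s:
# the rate halves to 5 MiB/s and the depth drops to ceil(0.625) = 1.
doAssert pipelineDepth(100'u64 * MiB, 20.0, 0.5, 4'u64 * MiB) == 1
```

The second case illustrates the concern above: the peer did nothing differently, yet the computed depth dropped only because of how we spaced out our own requests.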
What I think would be more correct would be to measure batch completion time stability as a function of the number pipelined batches size - i.e., we fire $k$ pipelined batches and measure the average completion time. Then we fire $2k$, $4k$, and so on. As soon as $d$ starts oscillating, we know we've saturated the capacity for the peer. NOTE: Chrys made changes to the calculation, and I have [summarised those here](https://hackmd.io/2brFqqXzS9GQ4Ei5xe0gLw#Estimation-of-the-Number-of-in-flight-Batches). I am now reasonably convinced that we are doing something that makes sense, even though I haven't proved all parts of it formally. Getting back to peer selection: ```nim=15 if candidates.len == 0: let partialCandidates = swarm.peersWithAnyInRange(start, count) trace "No full range peers, checking partial", start = start, count = count, partialPeers = partialCandidates.len if partialCandidates.len == 0: return PeerSelection(kind: pskNoPeers) var peerCtxs: seq[PeerContext] for peerId in partialCandidates: let peer = peers.get(peerId) if peer.isNil: # peer disconnected, remove from swarm immediately discard swarm.removePeer(peerId) continue let currentInFlight = inFlightBatches.getOrDefault(peerId, @[]).len if currentInFlight < peer.optimalPipelineDepth(batchBytes): peerCtxs.add(peer) if peerCtxs.len == 0: return PeerSelection(kind: pskAtCapacity) let selected = selectByBDP(peerCtxs, batchBytes, inFlightBatches, penalties) if selected.isSome: return PeerSelection(kind: pskFound, peer: selected.get()) return PeerSelection(kind: pskNoPeers) ``` After singling out peers which can partially complete a batch, we test if the peer still has capacity for an extra batch request (line $31$). If all peers are full, we return `PeerSelection(kind: pskAtCapacity)` to signal that we cannot schedule to any peer. After filtering for capacity and overlap, we finally pass on to `selectByBDP` for the actual selection. 
The first part filters out (again) peers that are above capacity: ```nim= proc selectByBDP*( peers: seq[PeerContext], batchBytes: uint64, inFlightBatches: var Table[PeerId, seq[Future[void]]], penalties: var Table[PeerId, float], explorationProb: float = ExplorationProbability, ): Option[PeerContext] {.gcsafe, raises: [].} = if peers.len == 0: return none(PeerContext) if peers.len == 1: return some(peers[0]) var untriedPeers: seq[PeerContext] for peer in peers: if peer.stats.throughputBps().isNone: let pipelineDepth = peer.optimalPipelineDepth(batchBytes) currentLoad = inFlightBatches.getOrDefault(peer.id, @[]).len if currentLoad < pipelineDepth: untriedPeers.add(peer) ``` Next, we get the least loaded peer. This feels strange though cause we're just counting the requests in flight and not actually looking at pipeline utilization: ```nim=21 if untriedPeers.len > 0: var bestPeer = untriedPeers[0] bestLoad = inFlightBatches.getOrDefault(bestPeer.id, @[]).len for i in 1 ..< untriedPeers.len: let load = inFlightBatches.getOrDefault(untriedPeers[i].id, @[]).len if load < bestLoad: bestLoad = load bestPeer = untriedPeers[i] return some(bestPeer) ``` Then, with probability $1/5$, we pick a random peer (exploration). This allows us to build knowledge about peers which would otherwise never be selected, allowing them an opportunity to score better. 
```nim=31
let exploreRoll = rand(1.0)
if exploreRoll < explorationProb:
  var peersWithCapacity: seq[PeerContext]
  for peer in peers:
    let
      pipelineDepth = peer.optimalPipelineDepth(batchBytes)
      currentLoad = inFlightBatches.getOrDefault(peer.id, @[]).len
    if currentLoad < pipelineDepth:
      peersWithCapacity.add(peer)
  if peersWithCapacity.len > 0:
    let idx = rand(peersWithCapacity.len - 1)
    return some(peersWithCapacity[idx])
```

Finally, we rank peers by "BDP score" and return the best one (the lowest score wins), breaking ties randomly:

```nim=44
var
  bestPeers: seq[PeerContext] = @[peers[0]]
  bestScore = peers[0].evalBDPScore(
    batchBytes,
    inFlightBatches.getOrDefault(peers[0].id, @[]).len,
    penalties.getOrDefault(peers[0].id, 0.0),
  )

for i in 1 ..< peers.len:
  let score = peers[i].evalBDPScore(
    batchBytes,
    inFlightBatches.getOrDefault(peers[i].id, @[]).len,
    penalties.getOrDefault(peers[i].id, 0.0),
  )
  if score < bestScore:
    bestScore = score
    bestPeers = @[peers[i]]
  elif score == bestScore:
    bestPeers.add(peers[i])

if bestPeers.len > 1:
  let idx = rand(bestPeers.len - 1)
  return some(bestPeers[idx])
else:
  return some(bestPeers[0])
```

Let's look at what BDP score is:

```nim=
proc evalBDPScore*(
    self: PeerContext, batchBytes: uint64, currentLoad: int, penalty: float
): float =
  let
    pipelineDepth = self.optimalPipelineDepth(batchBytes)
    capacityScore =
      if currentLoad >= pipelineDepth:
        100.0
      else:
        (currentLoad.float / pipelineDepth.float) * 10.0

    throughputScore =
      if self.stats.throughputBps().isSome:
        let bps = self.stats.throughputBps().get().float
        if bps > 0:
          ThroughputScoreBaseline / bps
        else:
          50.0
      else:
        25.0 # normalization fallback

    rttScore =
      if self.stats.avgRttMicros().isSome:
        self.stats.avgRttMicros().get().float / 10000.0
      else:
        5.0 # normalization fallback

  return capacityScore + throughputScore + rttScore + penalty
```

`capacityScore` is the fraction of the pipeline that's occupied, scaled by $10$, so a value in $[0, 10)$, with idle pipelines getting a lower score. Overflowing pipelines get $100$.
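To get a feel for how the terms trade off against each other, here is a minimal sketch that reimplements just the arithmetic of `evalBDPScore` on plain numbers. `toyScore` is a hypothetical helper, and the value used for the baseline constant is an assumption picked purely for illustration (the real `ThroughputScoreBaseline` is not shown in this note); the fallback branches for missing stats are omitted.

```nim
const ToyThroughputBaseline = 1_000_000.0 # assumed value, NOT the project's constant

proc toyScore(load, depth: int, bps, rttMicros, penalty: float): float =
  ## Lower is better, as in the selection loop above.
  let capacityScore =
    if load >= depth: 100.0
    else: (load.float / depth.float) * 10.0
  capacityScore + ToyThroughputBaseline / bps + rttMicros / 10_000.0 + penalty

let
  # idle but slow peer: 0 + 1.0 + 2.0 + 0
  idleSlow = toyScore(load = 0, depth = 4, bps = 1e6, rttMicros = 20_000.0, penalty = 0.0)
  # half-loaded but 10x faster peer: 5.0 + 0.1 + 1.0 + 0
  busyFast = toyScore(load = 2, depth = 4, bps = 1e7, rttMicros = 10_000.0, penalty = 0.0)
  # saturated peer: the 100.0 capacity term dominates everything else
  saturated = toyScore(load = 4, depth = 4, bps = 1e7, rttMicros = 10_000.0, penalty = 0.0)

doAssert idleSlow < busyFast
doAssert busyFast < saturated
```

Under this assumed baseline, the load term dominates: an idle peer that is ten times slower still beats a half-loaded fast one, which is the kind of effect that makes the unnormalized weights hard to reason about.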
I've added some thoughts to the GitHub review that these normalization factors look somewhat like magic numbers and that we're probably better off using something like z-scores and then reweighting if we want to keep these things dimensionless. Regardless, BDP score is a sum across pipeline occupation rate (load), throughput (observed transfer rates), `rttScore`, and the timeout penalty. It is clear now that the exploration part of the selection is needed so we can build those metrics for new peers. This in some sense plays the part of optimistic unchokes in BitTorrent.

When there are candidates with the full range, they go through the same capacity filter and selection:

```nim=40
var peerCtxs: seq[PeerContext]
for peerId in candidates:
  let peer = peers.get(peerId)
  if peer.isNil:
    # peer disconnected - remove from swarm immediately
    discard swarm.removePeer(peerId)
    continue
  let currentInFlight = inFlightBatches.getOrDefault(peerId, @[]).len
  if currentInFlight < peer.optimalPipelineDepth(batchBytes):
    peerCtxs.add(peer)

if peerCtxs.len == 0:
  return PeerSelection(kind: pskAtCapacity)

let selected = selectByBDP(peerCtxs, batchBytes, inFlightBatches, penalties)
if selected.isSome:
  return PeerSelection(kind: pskFound, peer: selected.get())
return PeerSelection(kind: pskNoPeers)
```

We can finally end our digression on `selectPeerForBatch`, and get back to the download worker.
Once peer selection for the current batch has run, we handle the two failure outcomes: if no peer was found we (supposedly) look up new peers, and if all peers are at capacity we requeue:

```nim=220
    if selection.kind == pskNoPeers:
      trace "No peer with range, searching for new peers"
      let
        hasActivePeers = swarm.activePeerCount() > 0
        waitTime = if hasActivePeers: retryInterval else: 100.milliseconds
      await download.handleBatchRetry(start, count, waitTime)
      continue

    if selection.kind == pskAtCapacity:
      download.requeueBatch(start, count, front = false)
      await sleepAsync(10.milliseconds)
      continue
```

Unlike what the log says, we are not doing any peer lookup here:

```nim=
proc handleBatchRetry*(
    download: ActiveDownload, start: uint64, count: uint64, waitTime: Duration
) {.async: (raises: [CancelledError]).} =
  let
    addresses = download.getBlockAddressesForRange(start, count)
    exhausted = download.decrementBlockRetries(addresses)
  if exhausted.len > 0:
    warn "Block retries exhausted",
      treeCid = download.treeCid, exhaustedCount = exhausted.len
    download.failExhaustedBlocks(exhausted)

  download.requeueBatch(start, count, front = false)
  await sleepAsync(waitTime)
```

So what appears to happen here is that we have two different retry paths depending on whether we got `pskNoPeers` or `pskAtCapacity`. For some reason, we only decrement the retry count for the former. I'm not sure why but, then again, I never liked those retry counters anyway.

In case we do have a viable peer, we proceed to `sendWantBlocksRequest`:

```nim=232
    let peer = selection.peer
    download.markBatchInFlight(start, count, localBlockCount, peer.id)

    let batchFuture =
      self.sendWantBlocksRequest(download, start, count, missingIndices, peer)
    self.downloadManager.peerTracker.track(peer.id, batchFuture)
    download.setBatchRequestFuture(start, batchFuture)
```

This is also quite a large piece of code, which we'll need to digress into.
```nim=
proc sendWantBlocksRequest(
    self: BlockExcEngine,
    download: ActiveDownload,
    start: uint64,
    count: uint64,
    missingIndices: seq[uint64],
    peer: PeerContext,
): Future[void] {.async: (raises: [CancelledError]).} =
  if download.cancelled:
    return

  let treeCid = download.treeCid

  var ranges: seq[tuple[start: uint64, count: uint64]] = @[]
  if missingIndices.len > 0:
    var
      rangeStart = missingIndices[0]
      rangeCount: uint64 = 1
    for i in 1 ..< missingIndices.len:
      if missingIndices[i] == rangeStart + rangeCount:
        rangeCount += 1
      else:
        ranges.add((rangeStart, rangeCount))
        rangeStart = missingIndices[i]
        rangeCount = 1
    ranges.add((rangeStart, rangeCount))

  trace "Requesting missing blocks",
    treeCid = treeCid,
    originalRange = $(start, count),
    missing = missingIndices.len,
    ranges = ranges.len,
    peer = peer.id

  let
    requestStartTime = Moment.now()
    requestResult = await self.requestWantBlocks(
      peer.id, BlockRange(treeCid: treeCid, ranges: ranges)
    )
    rttMicros = (Moment.now() - requestStartTime).microseconds.uint64

  if download.cancelled:
    return
```

The first part is straightforward and builds a set of ranges based on the contents of `missingIndices`. `missingIndices` contains the blocks we actually want (it would probably be better off named `wantedBlockIndices` or something). If it contains all the indices, we will end up with a single range. Otherwise we will end up with a disjoint set of intervals. The code assumes `missingIndices` is sorted, but this is not described or enforced anywhere.

Next, we have the actual call to `requestWantBlocks`. We have discussed this at length already and will not do it again here. We then spot check the state of the download again (the second `download.cancelled` check, line $44$), which is awkward: the state might change right after our check, at the next call to `await` (and we have several, as we will see), but we do not seem to care about that.

```nim=45
  # request might have timed-out and have been requeued to another peer
  # if yes, then discard response, it's already handled.
  download.pendingBatches.withValue(start, pending):
    if pending[].peerId != peer.id:
      # discard it, was reassigned
      return
  do:
    # either completed or requeued
    return

  if requestResult.isErr:
    warn "Batch request failed", peer = peer.id, error = requestResult.error.msg
    let swarm = download.ctx.swarm
    if swarm.recordPeerFailure(peer.id):
      warn "Peer exceeded max failures, removing from swarm", peer = peer.id
      if swarm.removePeer(peer.id).isNone:
        trace "Peer was not in swarm", peer = peer.id
      download.handlePeerFailure(peer.id)
      if swarm.needsPeers():
        self.searchForNewPeers(download.manifestCid)
    else:
      # we can requeue immediately (cancels timeout), no benefit waiting for timeout.
      download.requeueBatch(start, count, front = true)
    return
```

Next, we check whether the request has already timed out and been handled elsewhere before running the retry handlers ourselves in case of failure. This to some extent assumes knowledge of what `batchTimeoutHandler` does, which makes me uncomfortable.

We then handle batch request failures by dropping the peer and performing swarm maintenance. This feels like the wrong place to do it: I'd be more comfortable if we had the swarm maintenance logic within the swarm itself, and fed the network reconfiguration events into it, instead of having logic spread around and slightly duplicated (and missing, e.g. in `peerDepartHandler`).

The next part processes the response for a successful batch request. Lines $71$--$74$ give special treatment to the case in which the response is empty, requeueing the batch. If there is even a single valid block, we process it in lines $88$--$130$. Receiving unrequested blocks is treated like a minor fault and those are simply skipped, but I am not sure why.
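The "unrequested block" check boils down to a membership test against the requested ranges. The implementation of `bexutils.isIndexInRanges` is not shown here; the following is only a sketch of what such a test could look like for sorted, disjoint ranges, under the hypothetical name `inSortedRanges`.

```nim
type IndexRange = tuple[start: uint64, count: uint64]

proc inSortedRanges(index: uint64, ranges: openArray[IndexRange]): bool =
  ## Binary search over disjoint ranges sorted by `start`.
  ## Hypothetical helper; not the actual bexutils implementation.
  var
    lo = 0
    hi = ranges.len - 1
  while lo <= hi:
    let mid = lo + (hi - lo) div 2
    if index < ranges[mid].start:
      hi = mid - 1 # index lies before this range
    elif index - ranges[mid].start >= ranges[mid].count:
      lo = mid + 1 # index lies past the end of this range
    else:
      return true # start <= index < start + count
  false
```

Like the range construction earlier, this is only correct if the ranges really are sorted, which is another reason to enforce the ordering of `missingIndices` rather than assume it.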
```nim=71
  if allBlockViews.len == 0:
    trace "Peer responded with zero blocks", peer = peer.id, treeCid = treeCid
    download.requeueBatch(start, count, front = false)
    return

  trace "Received batch response",
    treeCid = treeCid,
    originalRange = $(start, count),
    received = allBlockViews.len,
    requested = missingIndices.len,
    peer = peer.id

  var
    totalBytes: uint64 = 0
    validCount: int = 0
    receivedIndices: HashSet[uint64]

  for view in allBlockViews:
    if not bexutils.isIndexInRanges(
      view.address.index.uint64, ranges, sortedRanges = true
    ):
      warn "Received unrequested block",
        index = view.address.index, ranges = ranges.len
      continue

    if view.address.index.uint64 >= download.ctx.totalBlocks:
      warn "Received block with out-of-bounds index - banning peer",
        index = view.address.index,
        totalBlocks = download.ctx.totalBlocks,
        peer = peer.id
      await self.banAndDropPeer(download, peer.id)
      return

    if err =? self.validateBlockDeliveryView(view).errorOption:
      error "Block validation failed - corrupted data from peer",
        address = view.address, msg = err.msg, peer = peer.id
      warn "Banning peer for sending corrupted block data", peer = peer.id
      await self.banAndDropPeer(download, peer.id)
      return

    let
      bd = view.toBlockDelivery()
      putResult = await self.localStore.putBlock(bd.blk)
    if putResult.isErr:
      warn "Failed to store block", address = bd.address, error = putResult.error.msg
      continue

    let proofResult = await self.localStore.putCidAndProof(
      bd.address.treeCid, bd.address.index, bd.blk.cid, bd.proof.get
    )
    if proofResult.isErr:
      warn "Failed to store proof", address = bd.address
      discard await self.localStore.delBlock(bd.blk.cid)
      continue

    totalBytes += bd.blk.data[].len.uint64
    validCount += 1
    receivedIndices.incl(bd.address.index.uint64)

    if bd.address in download.blocks:
      discard download.completeWantHandle(bd.address, some(bd.blk))

  storage_block_exchange_blocks_received.inc(validCount.int64)
  download.ctx.swarm.recordBatchSuccess(peer, rttMicros, totalBytes)
```

Lines $103$--$130$ then validate block data (indexes and Merkle proofs) and ban peers which send invalid data. Finally, data is stored and the corresponding block want handles are completed. Batch success is recorded in line $134$. The naming is a bit misleading as the response might be incomplete: in fact, as we pointed out earlier, it might contain a single block.

The last part deals with partial batch completions:

```nim=135
  if validCount < missingIndices.len:
    trace "Peer delivered partial batch, computing missing ranges",
      peer = peer.id, requested = missingIndices.len, received = validCount

    var stillMissing: seq[uint64]
    for idx in missingIndices:
      if idx notin receivedIndices:
        stillMissing.add(idx)

    if stillMissing.len > 0:
      var penaltyAddresses: seq[BlockAddress]
      let peerAvail = download.ctx.swarm.getPeer(peer.id)
      for idx in stillMissing:
        if peerAvail.isSome and peerAvail.get().availability.hasRange(idx, 1):
          penaltyAddresses.add(download.makeBlockAddress(idx))

      let exhausted = download.decrementBlockRetries(penaltyAddresses)
      if exhausted.len > 0:
        warn "Blocks exhausted retries after partial delivery",
          treeCid = treeCid, exhaustedCount = exhausted.len
        download.failExhaustedBlocks(exhausted)

        let exhaustedIndices = exhausted.mapIt(it.index.uint64).toHashSet
        stillMissing = stillMissing.filterIt(it notin exhaustedIndices)

    var missingRanges: seq[tuple[start: uint64, count: uint64]] = @[]
    if stillMissing.len > 0:
      stillMissing.sort()
      var
        rangeStart = stillMissing[0]
        rangeCount: uint64 = 1
      for i in 1 ..< stillMissing.len:
        if stillMissing[i] == rangeStart + rangeCount:
          rangeCount += 1
        else:
          missingRanges.add((rangeStart, rangeCount))
          rangeStart = stillMissing[i]
          rangeCount = 1
      missingRanges.add((rangeStart, rangeCount))

    trace "Partial batch completion - requeuing missing ranges",
      treeCid = treeCid,
      originalStart = start,
      originalCount = count,
      received = validCount,
      missingRanges = missingRanges.len
    download.partialCompleteBatch(
      start, count, validCount.uint64, missingRanges, totalBytes
    )
  else:
    download.completeBatch(start, validCount.uint64, totalBytes)
```

Lines $139$--$142$ collect the blocks that are still missing. The code then does something quite weird, which is to decrement the retry counter for a block only if the peer reported having that block. This does not make sense to me. Finally, we collect the missing indices into ranges again and submit those to `partialCompleteBatch`.
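The index-to-range coalescing appears twice in `sendWantBlocksRequest`, once assuming sorted input and once sorting explicitly. A minimal sketch of a shared helper that removes both the duplication and the implicit assumption could look as follows; the names `IndexRange` and `toRanges` are hypothetical and not part of the reviewed code.

```nim
import std/[algorithm, sequtils]

type IndexRange = tuple[start: uint64, count: uint64]

proc toRanges(indices: openArray[uint64]): seq[IndexRange] =
  ## Coalesces block indices into maximal runs of consecutive values.
  ## Works on a sorted, deduplicated copy, so callers need not pre-sort.
  if indices.len == 0:
    return @[]
  let sortedIdx = deduplicate(sorted(indices), isSorted = true)
  var
    rangeStart = sortedIdx[0]
    rangeCount = 1'u64
  for i in 1 ..< sortedIdx.len:
    if sortedIdx[i] == rangeStart + rangeCount:
      rangeCount += 1 # extends the current run
    else:
      result.add((rangeStart, rangeCount)) # close the run, start a new one
      rangeStart = sortedIdx[i]
      rangeCount = 1
  result.add((rangeStart, rangeCount))
```

With something like this, both call sites reduce to `toRanges(missingIndices)` and `toRanges(stillMissing)`, at the cost of an extra sort on input that may already be ordered.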
