# Pub/Sub server Centralized pub/sub server dedicated to the coordination of buyers and suppliers in the WindingTree Market protocol. ## Architecture ```mermaid graph LR dapp[Buyer<br>Dapp] node[Supplier<br>node] nginx[Nginx<br>reverse<br>proxy] style container stroke-dasharray:5 5; style app2 fill:transparent; style app3 fill:transparent; style appN fill:transparent; subgraph container[ ] subgraph node.js app subgraph libp2p socket[CenterSub<br><small>GossipSub</small>] handlers[Workflow<br>handlers] end subgraph <br>Storage<br>abstraction db[DB<br>connector] end end app2[App instance #2] app3[App instance #3] appN[App instance #n] end subgraph Database requests[(Requests<br>cache)] offers[(Offers<br>cache)] end node --- |wss|nginx dapp --- |wss|nginx nginx --- |ws|socket nginx --- |ws|app2 nginx --- |ws|app3 nginx --- |ws|appN socket --- handlers handlers --- db db --- requests db --- offers ``` > MVP of the Pub/Sub server will not authenticate connections. > Next version of the server will use JWT-based authorization schema. To the architecture will be added special `Authorization` server that will validate suppliers identity and generates a secured JWT ## Workflow ```mermaid sequenceDiagram participant dapp as Dapp (Buyer) participant server as Server participant node as Supplier server->>server: Setup server node->>node: Setup client node->>node: client.sunscribe(`request/{subject}`) loop Every connect/re-connect node->>server: client.connect(`${addr}`, subject) activate server server->>server: getCachedRequests(subject) loop Every cached request server-->>node: socket.emit(`request/{subject}`, cachedRequest) end deactivate server end dapp->>dapp: Setup client dapp->>dapp: client.subscribe('ready') loop Every connect/re-connect dapp->>server: client.connect(`${addr}`) activate server server-->>dapp: socket.emit('ready') deactivate server end dapp->>dapp: Handles 'ready' event loop Every 'ready' event activate dapp dapp->>dapp: getCachedRequests() loop through cached requests dapp->>dapp: client.subscribe(`offer/${request.id}`) end deactivate dapp end dapp->>dapp: Handles users requests via UI loop Every user request dapp->>dapp: Builds a request dapp->>server: client.emit(`request/{subject}`, request) activate server server->>server: getCachedOffers(subject, request.id) alt if cached offers found loop Every cached offer server-->>dapp: socket.emit(`offer/${request.id}`, offer) end server->>server: Stop processing end server->>server: saveRequestToCache(subject, request) server->>server: server.emit(`request/{subject}`, request) deactivate server end node->>node: Handles 'request' events loop Every request server->>node: request activate node node->>node: getCachedOffer(request.id) alt if the offer found node->>node: Re-sign offer with updated exiration time node->>node: saveOffer(request, offer) node-->>server: server.emit(`offer/${request.id}`, offer) node->>node: Stop processing end node->>node: Validate request node->>node: Builds an offer node->>node: saveOffer(request, offer) node-->>server: server.emit(`offer/${request.id}`, offer) deactivate node activate server server->>server: saveOfferToCache(request.id, offer) server->>server: server.emit(`offer/${request.id}`) deactivate server end dapp->>dapp: Handles 'offer' events loop Every offer dapp->>dapp: Validates offer dapp->>dapp: Process offer end ``` ## Storage abstraction - key-value storage - Must support the following options: in-memory, Redis - Every record must have TTL. Expired records must be removed from storage ## Glossary - Transaction: a completed interaction between a buyer and supplier that includes the following steps: - Publication of the request on the pub/sub server - Handling of the request on the server - Obtaining the request by the supplier - Processing the request by the supplier and generation of the offer - Publishing of the offer by the supplier - Handling of the offer on the server - Obtaining the offer by the buyer ## Performance and quality metrics - ATT: Average transaction time, the average time of the transaction in the measured time interval with one buyer and one supplier connected. - BCP: The buyers capacity, the maximum number of connected buyers clients that can be handled by the server, in the measured time interval, without decreasing of ATT by more than 50% with the ONE supplier node connected. During the calculation of the metric, every connected buyer client is generating unique requests in the amount of 1 per second (this value can be adjustable). - SCP: The suppliers capacity, the maximum number of suppliers clients connected to the server, in the measured time interval, without decreasing of ATT by more than 50% with the ONE supplier node connected. During the calculation of the metric, one buyer client must be connected to the server and generates 1 unique request per second (this value can be adjustable). - TPS: The number of transactions per second at BCP. - MEM: The average server memory consumption at BCP and SCP. ## Scalability The solution performance can be increased in the following ways: - By upgrading of the server host parameters: CPU type and number of cores, memory size - By using of the reverse proxy as a load balancer that routes connections and traffic to the multiple instances of the server node.js application. Number of the CPU cores of the server host must be = node.js app instances number + 1 - By using a scalable database on the backend. For example Redis DB clusterisation and shards. - The more radical solution may be transforming the server into a multi-server p2p network. This solution may increase BCP and SCP but will reduce general workflow performance specifically the ATT will increase.