# P2P Messaging ###### tags: `Kizuna-architecture` <style> .ui-infobar, #doc.markdown-body { max-width: 1000px; } </style> <img src="https://i.imgur.com/fVyoNsV.png" width="150" style="padding-right:5px"/> <img src="https://i.imgur.com/ht3KLk4.png" width="150"/> #### Reply <img src="https://i.imgur.com/qX84J6o.png" width="150" style="padding:5px"/> <img src="https://i.imgur.com/WnfFxXX.png" width="150" style="padding:5px"/> <img src="https://i.imgur.com/j8LOqxO.png" width="150" style="padding:5px"/> ### TODO/Comments - Encryption for messages ## Entries ### Message ```rust= #[hdk_entry(id = "p2pmessage", visibility = "private")] pub struct P2PMessage { author: AgentPubKey, receiver: AgentPubKey, payload: Payload, time_sent: Timestamp, reply_to: Option<EntryHash> } ``` ### Receipt ```rust= #[hdk_entry(id = "p2pmessagereceipt", visibility = "private")] pub struct P2PMessageReceipt { id: Vec<EntryHash>, time_received: Option<Timestamp>, status: Status } ``` ### Pin ```rust= #[hdk_entry(id = "p2pmessagereceipt", visibility = "private")] pub struct P2PMessagePin { id: Vec<EntryHash>, conversants: Vec<AgentPubKey> status: PinStatus } ``` ### File Bytes ```rust= #[hdk_entry(id = "p2pfilebytes", visibility = "private")] pub struct P2PFileBytes (SerializedBytes) ``` ## Entry Relationship Diagram ```mermaid graph LR subgraph p2pmessage zome subgraph agents alice bobby end subgraph messages message_1 receipt_1 message_2 receipt_2 alice.->|has in source chain|message_1 bobby.->|has in source chain|message_1 alice.->|has in source chain|message_2 bobby.->|has in source chain|message_2 alice.->|has in source chain|receipt_1 bobby.->|has in source chain|receipt_1 alice.->|has in source chain|receipt_2 bobby.->|has in source chain|receipt_2 receipt_1.->message_1 receipt_2.->message_2 end subgraph async messages in DHT async_1 bobby-->|async_messages|async_1 message_2 ==> async_1 end end ``` ## Data Structures ### Message Receipt Status ```rust= enum Status { Sent, Received {timestamp: Timestamp}, Read {timestamp: Timestamp} } ``` ### PinStatus ```rust= pub struct PinStatus { pinned: { timestamp: Timestamp }, unpinnned: { timestamp: Timestamp } } ``` ### File Metadata ```rust= struct FileMetadata { file_name: String, file_size: usize, file_type: String, file_hash: EntryHash // entryhash of File??? } ``` ### File Type ```rust= enum FileType { Image { thumbnail: serializedBytes }, Video { thumbnail: SerializedBytes }, Others } ``` ### Payload ```rust= enum Payload { Text {payload: String}, File { metadata: FileMetadata, file_type: FileType } } ``` <!-- Input Structures --> ### Message Input ```rust= struct MessageInput { receiver: AgentPubKey, payload: PayloadInput, file_bytes: Option<SerializedBytes>, reply_to: Option<EntryHash> } ``` ### Read Receipt Input ```rust= struct ReadReceiptInput { sender: AgentPubKey, receipt: P2PMessageReceipt, } ``` ### File Metadata Input ```rust= /* This will be defined in a /commons crate that will be used for both p2pmessage and group zome */ struct FileMetadataInput { file_name: String, file_size: usize, file_type: String, } ``` ### Payload Input ```rust= struct PayloadInput { Text {payload: String}, File { metadata: FileMetadataInput file_type: FileType } } ``` ### PinInput ```rust= pub struct PinMessageInput { id: Vec<EntryHash>, conversants: Vec<AgentPubkey>, status: PinStatus, } ``` <!-- Output Structures --> ### Messaage Output ```rust= pub struct P2PMessagesOutput( AgentMessages, MessageContents, ReceiptContents ) ``` ### Agent - Message Hash Map ```rust= pub struct AgentMessages(HashMap<AgentPubKey, Vec<Entryhash>>) ``` ### Message Hash - Message Hash Map ```rust= pub struct MessageContents(HashMap<EntryHash, MessageBundle>) ``` ### Receipt Hash - Receipt Hash Map ```rust= pub struct ReceiptContents(HashMap<EntryHash, P2PMessageReceipt>) ``` ### Message Bundle ```rust= pub struct MessageBundle(P2PMessageElement, Vec<ReceiptHash>) ``` ### PinContents ```rust= pub struct PinMessageOutput(HashMap<String, P2PMessagePin>) ``` ## Zome Functions ### `init` ```mermaid sequenceDiagram participant ACC as Alice_Conversation_Cell participant BCC as Bobby_Conversation_Cell par alice init ACC-->>ACC: set unrestricted access to receive_message ACC-->>ACC: set unrestricted access to notify_delivery ACC-->>ACC: set unrestricted access to recv_remote_signal and bob init BCC-->>BCC: set unrestricted access to receive_message BCC-->>BCC: set unrestricted acesss to notify_delivery BCC-->>BCC: set unrestricted access to recv_remote_signal end ``` ### `send_message` ```rust= fn send_message(message_input: MessageInput) -> ExternResult<MessageBundle> struct MessageInput { receiver: AgentPubKey, payload: PayloadInput, file_bytes: Option<SerializedBytes>, reply_to: Option<EntryHash> } struct MessageBundle(P2PMessageElement, Vec<P2PMessageReceiptElement>) ``` ```mermaid sequenceDiagram participant AUI as Alice_UI participant ACC as Alice_Conversation_Cell participant BCC as Bobby_Conversation_Cell participant BUI as Bobby_UI participant NH as Neighborhood AUI-->>ACC: call `send_message` function rect rgba(255,0,0,0.3) ACC-->>ACC: check if receiver is blocked by calling `in_blocked` <br> in Contacts zome end opt receiver is blocked ACC-->>AUI: return error (cannot send to blocked) end opt payload is File ACC-->>ACC: construct P2PFileBytes ACC-->>ACC: commit P2PFileBytes end ACC-->>ACC: construct P2PMessage from input ACC-->>ACC: construct 'Sent' P2PMesageReceipt ACC-->>ACC: commit P2PMessage and P2PMessageReceipt to source chain <br> while calling post_commit callback ACC-->>AUI: return MessageBundle <br> (skip this until post_commit becomes available) Note over ACC: yellow box will be moved to post commit rect rgba(255,255,0,0.2) ACC-->>BCC: call_remote Bobby's `receive_message` function alt call_remote ok (receiver is online) rect rgba(0, 0, 255, 0.4) BCC-->>BCC: invoke `receive_message` end BCC-->>ACC: return P2PMessage and P2PMessageReceipt opt payload is File ACC-->>ACC: commit P2PFileBytes end ACC-->>ACC: commit P2PMessageReceipt to source chain ACC-->>AUI: return MessageBundle else call_remote timeout (receiver is offline) Note over ACC, BCC: timeout can also mean that the peer was not found in which case the receiver needs to poll the message from the neighborhood rect rgba(0,255,0,0.8) ACC-->>NH: call `send_to_neighbor` with P2PMessage as payload end ACC-->>ACC: commit P2PMessageReceipt to source chain ACC-->>AUI: return MessageBundle else call_remote unauthorized Note over ACC, BCC: case should not exist end end ``` ### `receive_message` ```rust fn receive_message(message: P2PMessage) -> ExternResult<MessageBundle> ``` ```mermaid sequenceDiagram participant ACC as Alice_Conversation_Cell participant BCC as Bobby_Conversation_Cell participant BUI as Bobby_UI rect rgba(255,0,0,0.3) BCC-->>BCC: check if author is blocked by calling `in_blocked` <br> in Contacts zome end opt author is not blocked BCC-->>BCC: generate timestamp for 'Delivered' receipt BCC-->>BCC: construct P2PMessage <br> and 'Delivered' P2PMessageReceipt BCC-->>BCC: commit P2PMessage and P2PMessageReceipt to source chain BCC-->>BUI: emit_signal MessageBundle BCC-->>ACC: return MessageBundle end ``` ### `notify_delivery` ```rust fn notify_delivery(message_receipt: P2PMessageReceipt) -> ExternResult<BooleanWrapper> ``` ```mermaid sequenceDiagram participant AUI as Alice_UI participant ACC as Alice_Conversation_Cell participant BCC as Bobby_Conversation_Cell ACC-->>ACC: commit P2PMessageReceipt to source chain ACC-->>AUI: emit signal P2PReceiptElement AUI-->>AUI: display message (flag as Delivered) ACC-->>BCC: return true ``` ### `get_messages_by_agent_by_timestamp` ```rust= pub struct P2PAgentDateFilter { conversant: AgentPubKey, date: Timestamp, payload_type: String } fn get_messages_by_agent_by_timestamp(filter: P2PAgentDateFilter) -> ExternResult<P2PMessagesOutput> ``` ```mermaid sequenceDiagram participant AUI as Alice_UI participant ACC as Alice_Conversation_Cell AUI-->>ACC: call `get_messages_by_agent_by_timestamp` ACC-->>ACC: query source chain for P2PMessage elements ACC-->>ACC: query source chain for P2PMessageReceipt elements ACC-->>ACC: initialize AgentMessages (conversant as single key) ACC-->>ACC: initialize MessageContents (empty) ACC-->>ACC: initialize ReceiptContents (empty) loop for every P2PMessage alt P2PMessage.payload == Payload::Text && payload_type == Text opt P2PMessage.author == conversant or P2PMessage.receiver == conversant opt date_end >= P2PMessage.header.timestamp >= date_start ACC-->>ACC: call `process_message` end end else P2PMessage.payload == Payload::File && payload_type == File opt P2PMessage.author == conversant or P2PMessage.receiver == conversant opt date_end >= P2PMessage.header.timestamp >= date_start ACC-->>ACC: call `process_message` end end else payload_type == All opt P2PMessage.author == conversant or P2PMessage.receiver == conversant opt date_end >= P2PMessage.header.timestamp >= date_start ACC-->>ACC: call `process_message` end end end end loop for every P2PMessageReceipt loop for every id in P2PMessageReceipt.ids opt id in MessageContents.keys ACC-->>ACC: add P2PMessageReceipt.entry_hash to the MessageBundle in MessageContents[P2PMessageReceipt.id] ACC-->>ACC: insert P2PMessageReceipt.entry_hash:P2PMessageReceipt to ReceiptContents end end end ACC-->>AUI: construct and return P2PMessagesOutput ``` ### `get_latest_messages` ```rust= pub struct BatchSize(u8) fn get_latest_messages(batch_size: BatchSize) -> ExternResult<P2PMessagesOutput> ``` ```mermaid sequenceDiagram participant AGG as Aggregator_Zome participant ACC as Alice_Conversation_Cell AGG-->>ACC: call `get_messages_by_agent_by_timestamp` ACC-->>ACC: query source chain for P2PMessage elements ACC-->>ACC: query source chain for P2PMessageReceipt elements ACC-->>ACC: initialize AgentMessages (empty) ACC-->>ACC: initialize MessageContents (empty) ACC-->>ACC: initialize ReceiptContents (empty) loop for every P2PMessage opt P2PMessage.author != self && P2PMessage.author not in AgentMessages.keys ACC-->>ACC: insert P2PMessage.author:<empty entryhash vector> to AgentMessages end opt AgentMessages[P2PMessage.author].length >= batch_size ACC-->>ACC: continue with next item in loop end alt author == self ACC-->>ACC: add P2PMessage.entry_hash to AgentMessages[P2PMessage.receiver] else author != self ACC-->>ACC: add P2PMessage.entry_hash to AgentMessages[P2PMessage.author] end ACC-->>ACC: construct MessageBundle(P2PMessage, <mutable empty vector>) ACC-->>ACC: insert P2PMessage.entry_hash:MessageBundle to MessageContents end loop for every P2PMessageReceipt loop for every id in P2PMessageReceipt.ids opt id in MessageContents.keys ACC-->>ACC: add P2PMessageReceipt.entry_hash to the MessageBundle in MessageContents[P2PMessageReceipt.id] ACC-->>ACC: insert P2PMessageReceipt.entry_hash:P2PMessageReceipt to ReceiptContents end end end ACC-->>AGG: construct and return P2PMessagesOutput ``` ### `get_next_batch_messages` ```rust= pub struct P2PMessageFilterBatch { conversant: AgentPubKey, batch_size: u8, payload_type: String, last_fetched_timestamp: Option<Timestamp>, // header timestamp; oldest message in the last fetched message last_fetched_message_id: Option<EntryHash> } fn get_next_batch_messages(filter: P2PChatFilterBatch) -> ExternResult<P2PMessageHashTables> ``` ```mermaid sequenceDiagram participant AUI as Alice_UI participant ACC as Alice_Conversation_Cell AUI-->>ACC: call `get_next_batch_messages` ACC-->>ACC: query source chain for P2PMessage elements ACC-->>ACC: query source chain for P2PMessageReceipt elements ACC-->>ACC: initialize AgentMessages (conversant as single key) ACC-->>ACC: initialize MessageContents (empty) ACC-->>ACC: initialize ReceiptContents (empty) opt last_fetched_timestamp == None && last_fetched_id == None ACC-->>ACC: set last_fetched timestamp to current date end loop for every P2PMessage alt P2PMessage.payload == Payload::Text && payload_type == Text opt P2PMessage.header.timestamp <= last_fetched_timestamp <br> && P2PMessage.entry_hash != last_fetched_message_id <br> && (P2PMessage.author == conversant || P2PMessage.receiver == conversant) opt AgentMessages[conversant].length >= batch_size ACC-->>ACC: break the loop end ACC-->>ACC: call `process_message` end else P2PMessage.payload == Payload::File && payload_type == File opt P2PMessage.header.timestamp <= last_fetched_timestamp <br> && P2PMessage.entry_hash != last_fetched_message_id <br> && (P2PMessage.author == conversant || P2PMessage.receiver == conversant) opt AgentMessages[conversant].length >= batch_size ACC-->>ACC: break the loop end ACC-->>ACC: call `process_message` end else payload_type == All opt P2PMessage.header.timestamp <= last_fetched_timestamp <br> && P2PMessage.entry_hash != last_fetched_message_id <br> && (P2PMessage.author == conversant || P2PMessage.receiver == conversant) opt AgentMessages[conversant].length >= batch_size ACC-->>ACC: break the loop end ACC-->>ACC: call `process_message` end end end loop for every P2PMessageReceipt loop for every id in P2PMessageReceipt.ids opt id in MessageContents.keys ACC-->>ACC: add P2PMessageReceipt.entry_hash to the MessageBundle in MessageContents[P2PMessageReceipt.id] ACC-->>ACC: insert P2PMessageReceipt.entry_hash:P2PMessageReceipt to ReceiptContents end end end ACC-->>AUI: consruct and return P2PMessagesOutput ``` ### `get_file_bytes` ```rust= fn get_file_bytes(entry_hash: EntryHash) -> ExternResult(P2PFileBytes) ``` ```mermaid sequenceDiagram participant AUI as Alice_UI participant ACC as Alice_Conversation_Cell AUI-->>ACC: call `get_files_bytes` Note over ACC: what happens if two private entries result in the same entry hash? ACC-->>ACC: get() using entry_hash and content() option ACC-->>AUI: return P2PFileBytes ``` ### `process_message` ```rust= fn process_message(agent_messages: &AgentMessages, message_contents: &MessageContents, message: P2PMessage) { ``` ```mermaid sequenceDiagram participant ACC as Alice_Conversation_Cell ACC-->>ACC: add P2PMessage.entry_hash to AgentMessages[conversant] ACC-->>ACC: construct MessageBundle(P2PMessage, <mutable empty vector>) ACC-->>ACC: insert P2PMessage.entry_hash:MessageBundle to MessageContents ACC-->>ACC: return ``` ### `typing` ```rust= #[derive(Serialize, Deserialize, SerializedBytes, Clone, Debug)] struct P2PTypingDetailIO { agent: AgentPubKey, is_typing: bool, } enum Signal { P2PTypingDetailSignal { agent: AgentPubKey, is_typing: bool } } #[hdk_extern] fn typing(typing: P2PTypingDetailIO) -> ExternResult<()> ``` ```mermaid sequenceDiagram participant AUI as Alice_UI participant ALD as Alice_Preference_Cell participant BLD as Bobby_Preference_C participant BUI as Bobby_UI AUI-->>ALD: sets preference ALD-->>AUI: fetch preference AUI-->>AUI: starts or stops typing alt typing preference is on alt If Alice allows Bob to see if she's typing AUI-->>ALD: call typing ALD-->>BLD: remote_signal P2PTypingDetailIO alt User is online BLD-->>BUI: emit_signal P2PTypingDetailIO alt if is_typing is true BUI-->>BUI: show that alice is typing else if is_typing is false BUI-->>BUI: remove typing indicator from UI end else User is offline Note left of BUI: Do nothing end end end ``` ### `read_message` ```rust= fn read_message(receipt: P2PMessageReceipt) -> ExternResult<ReceiptContents> ``` ```mermaid sequenceDiagram participant AUI as Alice_UI participant ALD as Alice_Cell participant BLD as Bobby_Cell participant BUI as Bobby_UI AUI-->>ALD: sets preference ALD-->>AUI: fetch preference AUI-->>ALD: sets receipt preference for bob ALD-->>AUI: fetch ReceiptPreference BUI-->>BLD: send message payload rect rgba(100,100,200,.5) BLD-->>ALD: call_remote `receive_message` end ALD-->>AUI: emit_signal message AUI-->>AUI: reads the message alt if read_receipts is on alt if Alice did not include Bob in her read preference AUI-->>ALD: call `read_message` ALD-->>ALD: commit to source chain the receipt entry with status being read ALD-->>AUI: return ReceiptContents <br> (skip this until post_commit becomes available) rect rgba(255,255,0,0.2) Note over ALD: yellow box will be moved to post_commit ALD-->>BLD: call_remote `receive_read_receipt` alt call_remote ok rect rgba(200,100,100,.5) BLD-->>BLD: invoke `receive_read_receipt` BLD-->>ALD: ReceiptContents end ALD-->>AUI: return ReceiptContents <br> (do this for now until post_commit is available) else call_remote timeout (receiver is offline) rect rgba(100,100,200,.5) ALD-->>ALD: call `send_to_neighbor` with the receipt as payload end ALD-->>AUI: return ReceiptContents <br> (do this for now until post_commit is available) else call_remote unauthorized Note over BLD, AUI: this case should not exist end end end end ``` ### `receive_read_receipt` ```rust= fn receive_read_receipt(receipt: P2PMessageReceipt) -> ExternResult<ReceiptContents> ``` ```mermaid sequenceDiagram participant BC as Bobby_Cell participant BUI as Bobby_UI rect rgba(200,100,100,.5) BC-->>BC: invoke `commit_receipts` end BC-->>BUI: emit signal ReceiptContents BUI-->>BUI: update status of relevant message BC-->>BC: return ReceiptContents ``` ### `commit_receipts` ```rust= fn commit_receipts(receipts: Vec<P2PMessageReceipt>) -> ExternResult<ReceiptContents> ``` ```mermaid sequenceDiagram participant BLD as Bobby_Cell BLD-->>BLD: query receipts from source chain BLD-->>BLD: create empty HashMap ReceiptsContents BLD-->>BLD: create empty HashSet loop for each P2PMessageReceipt in arg BLD-->>BLD: construct EntryHash from P2PMessageReceipt BLD-->>BLD: insert EntryHash in HashSet end loop for each P2PMessageReceipt Element in source chain BLD-->>BLD: check whether EntryHash is in HashSet opt EntryHash is found in HashSet BLD-->>BLD: remove P2PMessageReceipt with found EntryHash from Vec<P2PMessageReceipt> end opt if HashSet empty BLD-->>BLD: break the loop end end opt if Vec<P2PMessageReceipt> is not empty BLD-->>BLD: commit P2PMessageReceipt remaining in Vec<P2PMessageReceipt> BLD-->>BLD: insert EntryHash:P2PMessageReceipt in ReceiptContents HashMap end BLD-->>BLD: return ReceiptContents ``` ### `fetch_read_receipts` ```rust= fn fetch_read_receipts(_: ()) -> ExterResult<ReceiptContents> ``` ```mermaid sequenceDiagram participant BUI as Bobby_UI participant BCC as Bobby_Conversation_Cell participant N as Neighbors BCC-->>N: fetch read receipts rect rgba(200,100,100,.5) BCC-->>BCC: invoke 'commit_receipts' with Vec<P2PMessageReceipt> end BCC-->>BUI: emit_signal ReceiptContents to UI BUI-->>BUI: update relevant messages' status ``` ### `pin_message` ```rust= fn pin_message(receipt: PinMessageInput) -> ExternResult<PinMessageOutput> pub struct PinMessageInput { message_hashes: Vec<EntryHash>, conversants: Vec<AgentPubKey>, status: PinMessage } ``` ```mermaid sequenceDiagram participant AUI as Alice_UI participant ALD as Alice_Cell participant BLD as Bobby_Cell participant BUI as Bobby_UI AUI-->>ALD: call `pin_message` ALD-->>ALD: construct P2PMessagePin entry ALD-->>ALD: commit to source chain the P2PMessagePin entry rect rgba(255,255,0,0.2) Note over ALD: yellow box will be moved to post_commit ALD-->>BLD: call_remote `sync_pin_message` alt call_remote ok rect rgba(200,100,100,.5) ALD-->>ALD: return PinContents end else call_remote timeout (receiver is offline) ALD-->>AUI: return error else call_remote unauthorized Note over BLD, AUI: this case should not exist ALD-->>AUI: return error end end ``` ### `sync_pin_message` ```rust= fn sync_pin_message(pin_status: P2PMessagePin) -> ExternResult<PinContents> ``` ```mermaid sequenceDiagram participant BCC as Bobby_Conversation_Cell participant BUI as Bobby_UI BCC-->>BCC: commit P2PMessagPin to source chain BCC-->>BCC: construct HashMap <String, P2PMessagePin> BCC-->>BCC: insert P2PMessagePin to HashMap BCC-->>BUI: emit signal with the HashMap as payload BCC-->>BCC: return HashMap ``` ### `get_pinned_messages` ```rust= fn get_pinned_messages(conversant: AgentPubKey) -> ExternResult<P2PMessageHashTables> ``` ```mermaid sequenceDiagram participant AUI as Alice_UI participant ACC as Alice_Conversation_Cell AUI-->>ACC: call `get_pinned_messages` ACC-->>ACC: initialize AgentMessages (conversant as single key) ACC-->>ACC: initialize MessageContents (empty) ACC-->>ACC: initialize ReceiptContents (empty) ACC-->>ACC: initialize HashMap message_pins <String, PinStatus> ACC-->>ACC: initialize HashMap pinned_message_hashes <MessageHash, P2PMessagePin> ACC-->>ACC: initialize for unpinned_message_hashes <MessageHash, P2PMessagePin> ACC-->>ACC: query source chain for P2PMessage elements ACC-->>ACC: query source chain for P2PMessagePin elements loop for every P2PMessagePin in query results opt if P2PMessagePin.status == Pinned <br> && P2PMessagePin.conversant <br> == input.conversant loop for every hash in pin.hashes alt if unpinned_message_hashes[hash] exists ACC-->>ACC: remove hash key from unpinned_message_hashes else if unpinned_message_hashes[hash] does not exist ACC-->>ACC: add hash to pinned_message_hashes end end end opt if P2PMessagePin.status == Unpinned <br> && P2PMessagePin.conversant <br> == input.conversant loop for every hash in pin.hashes alt if pinned_message_hashes[hash] exists ACC-->>ACC: do nothing else if pinned_message_hashes[hash] does not exist ACC-->>ACC: add hash to unpinned_message_hashes end end end end loop for every P2PMessage in query results opt if pinned_message_hashes[P2PMessageHash] exists ACC-->>ACC: call `process_message` end end loop for every P2PMessageReceipt loop for every id in P2PMessageReceipt.ids opt id in MessageContents.keys ACC-->>ACC: add P2PMessageReceipt.entry_hash to the MessageBundle in MessageContents[P2PMessageReceipt.id] ACC-->>ACC: insert P2PMessageReceipt.entry_hash:P2PMessageReceipt to ReceiptContents end end end ACC-->>AUI: consruct and return P2PMessageHashTables ``` ### `get_previous_messages` *formerly get_next_batch_messages* ```rust= pub struct P2PMessageFilterBatch { conversant: AgentPubKey, batch_size: u8, payload_type: String, last_fetched_timestamp: Option<Timestamp>, // header timestamp; oldest message in the last fetched message last_fetched_message_id: Option<EntryHash> } fn get_previous_messages(filter: P2PChatFilterBatch) -> ExternResult<P2PMessageHashTables> ``` ```mermaid sequenceDiagram participant AUI as Alice_UI participant ACC as Alice_Conversation_Cell AUI-->>ACC: call `get_next_batch_messages` ACC-->>ACC: query source chain for P2PMessage elements ACC-->>ACC: query source chain for P2PMessageReceipt elements ACC-->>ACC: initialize AgentMessages (conversant as single key) ACC-->>ACC: initialize MessageContents (empty) ACC-->>ACC: initialize ReceiptContents (empty) opt last_fetched_timestamp == None && last_fetched_id == None ACC-->>ACC: set last_fetched timestamp to current date end loop for every P2PMessage alt P2PMessage.payload == Payload::Text && payload_type == Text opt P2PMessage.header.timestamp <br> <= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant) opt AgentMessages[conversant].length >= batch_size ACC-->>ACC: break the loop end ACC-->>ACC: call `process_message` end else P2PMessage.payload == Payload::File && payload_type == File opt P2PMessage.header.timestamp <br> <= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant) opt AgentMessages[conversant].length >= batch_size ACC-->>ACC: break the loop end ACC-->>ACC: call `process_message` end else payload_type == All opt P2PMessage.header.timestamp <br> <= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant) opt AgentMessages[conversant].length >= batch_size ACC-->>ACC: break the loop end ACC-->>ACC: call `process_message` end end end loop for every P2PMessageReceipt loop for every id in P2PMessageReceipt.ids opt id in MessageContents.keys ACC-->>ACC: add P2PMessageReceipt.entry_hash to the MessageBundle in MessageContents[P2PMessageReceipt.id] ACC-->>ACC: insert P2PMessageReceipt.entry_hash:P2PMessageReceipt to ReceiptContents end end end ACC-->>AUI: consruct and return P2PMessagesOutput ``` TODO: can we break if we reach earlier timestamps? ### `get_next_messages` ```rust= pub struct P2PMessageFilterBatch { conversant: AgentPubKey, batch_size: u8, // per direction (10 previous and 10 next) payload_type: String, last_fetched_timestamp: Option<Timestamp>, // header timestamp; oldest message in the last fetched message last_fetched_message_id: Option<EntryHash> } fn get_next_messages(filter: P2PChatFilterBatch) -> ExternResult<P2PMessageHashTables> ``` ```mermaid sequenceDiagram participant AUI as Alice_UI participant ACC as Alice_Conversation_Cell AUI-->>ACC: call `get_next_batch_messages` ACC-->>ACC: query source chain for P2PMessage elements ACC-->>ACC: query source chain for P2PMessageReceipt elements ACC-->>ACC: initialize AgentMessages (conversant as single key) ACC-->>ACC: initialize MessageContents (empty) ACC-->>ACC: initialize ReceiptContents (empty) Note over ACC: empty vector will hold messages with timestamps later than the target message <br> since we don't know in advance if that message falls within the batch size <br> directly adjacent the target ACC-->>ACC: initialize empty vector [later_messages] opt last_fetched_timestamp == None && last_fetched_id == None ACC-->>ACC: set last_fetched timestamp to current date end loop for every P2PMessage alt P2PMessage.payload == Payload::Text && payload_type == Text opt P2PMessage.header.timestamp <br> >= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant) ACC-->>ACC: add message to later_messages end else P2PMessage.payload == Payload::File && payload_type == File opt P2PMessage.header.timestamp <br> >= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant) ACC-->>ACC: add message to later_messages end else payload_type == All opt P2PMessage.header.timestamp <br> >= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant) ACC-->>ACC: add message to later_messages end end end ACC-->>ACC: initialize start_index = 0 opt later_messages.size > batch_size ACC-->>ACC: start_index = later_messages.size - batch_size end loop for every P2PMessage in later_messages[start_index:end] ACC-->>ACC: call `process_message` end loop for every P2PMessageReceipt loop for every id in P2PMessageReceipt.ids opt id in MessageContents.keys ACC-->>ACC: add P2PMessageReceipt.entry_hash to the MessageBundle in MessageContents[P2PMessageReceipt.id] ACC-->>ACC: insert P2PMessageReceipt.entry_hash:P2PMessageReceipt to ReceiptContents end end end ACC-->>AUI: consruct and return P2PMessagesOutput ``` ### `get_adjacent_messages` ```rust= pub struct P2PMessageFilterBatch { conversant: AgentPubKey, batch_size: u8, // per direction (10 previous and 10 next) payload_type: String, last_fetched_timestamp: Option<Timestamp>, // header timestamp; oldest message in the last fetched message last_fetched_message_id: Option<EntryHash> } fn get_adjacent_messages(filter: P2PChatFilterBatch) -> ExternResult<P2PMessageHashTables> ``` ```mermaid sequenceDiagram participant AUI as Alice_UI participant ACC as Alice_Conversation_Cell AUI-->>ACC: call `get_next_batch_messages` ACC-->>ACC: query source chain for P2PMessage elements ACC-->>ACC: query source chain for P2PMessageReceipt elements ACC-->>ACC: initialize AgentMessages (conversant as single key) ACC-->>ACC: initialize MessageContents (empty) ACC-->>ACC: initialize ReceiptContents (empty) Note over ACC: empty vector will hold messages with timestamps earlier than the target message <br> since we don't know in advance if that message falls within the batch size <br> directly adjacent the target ACC-->>ACC: initialize empty vector [later_messages] opt last_fetched_timestamp == None && last_fetched_id == None ACC-->>ACC: set last_fetched timestamp to current date end loop for every P2PMessage alt P2PMessage.payload == Payload::Text && payload_type == Text opt P2PMessage.header.timestamp <br> >= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant) ACC-->>ACC: add message to later_messages end opt P2PMessage.header.timestamp <br> <= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant) opt AgentMessages[conversant].length >= batch_size ACC-->>ACC: break the loop end ACC-->>ACC: call `process_message` end else P2PMessage.payload == Payload::File && payload_type == File opt P2PMessage.header.timestamp <br> >= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant) ACC-->>ACC: add message to later_messages end opt P2PMessage.header.timestamp <br> <= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant) opt AgentMessages[conversant].length >= batch_size ACC-->>ACC: break the loop end ACC-->>ACC: call `process_message` end else payload_type == All opt P2PMessage.header.timestamp <br> >= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant) ACC-->>ACC: add message to later_messages end opt P2PMessage.header.timestamp <br> <= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant) opt AgentMessages[conversant].length >= batch_size ACC-->>ACC: break the loop end ACC-->>ACC: call `process_message` end end end ACC-->>ACC: initialize start_index = 0 opt later_messages.size > batch_size ACC-->>ACC: start_index = later_messages.size - batch_size end loop for every P2PMessage in later_messages[start_index:end] ACC-->>ACC: call `process_message` end loop for every P2PMessageReceipt loop for every id in P2PMessageReceipt.ids opt id in MessageContents.keys ACC-->>ACC: add P2PMessageReceipt.entry_hash to the MessageBundle in MessageContents[P2PMessageReceipt.id] ACC-->>ACC: insert P2PMessageReceipt.entry_hash:P2PMessageReceipt to ReceiptContents end end end ACC-->>AUI: consruct and return P2PMessagesOutput ``` ## Pin Functions TODO: check for inconsistencies in source chains before pinning (rely on receipts) ### `pin_message` ```rust= fn pin_message(receipt: PinMessageInput) -> ExternResult<PinMessageOutput> pub struct PinMessageInput { message_hashes: Vec<EntryHash>, conversants: Vec<AgentPubKey>, status: PinMessage } ``` ```mermaid sequenceDiagram participant AUI as Alice_UI participant ALD as Alice_Cell participant BLD as Bobby_Cell participant BUI as Bobby_UI AUI-->>ALD: call `pin_message` ALD-->>ALD: construct P2PMessagePin entry ALD-->>ALD: commit to source chain the P2PMessagePin entry rect rgba(255,255,0,0.2) Note over ALD: yellow box will be moved to post_commit ALD-->>BLD: call_remote `sync_pin_message` alt call_remote ok rect rgba(200,100,100,.5) ALD-->>ALD: return PinContents end else call_remote timeout (receiver is offline) ALD-->>AUI: return error else call_remote unauthorized Note over BLD, AUI: this case should not exist ALD-->>AUI: return error end end ``` ### `sync_pin_message` ```rust= fn sync_pin_message(pin_status: P2PMessagePin) -> ExternResult<PinContents> ``` ```mermaid sequenceDiagram participant BCC as Bobby_Conversation_Cell participant BUI as Bobby_UI BCC-->>BCC: commit P2PMessagPin to source chain BCC-->>BCC: construct HashMap <String, P2PMessagePin> BCC-->>BCC: insert P2PMessagePin to HashMap BCC-->>BUI: emit signal with the HashMap as payload BCC-->>BCC: return HashMap ``` ### `get_pinned_messages` ```rust= fn get_pinned_messages(conversant: AgentPubKey) -> ExternResult<P2PMessageHashTables> ``` ```mermaid sequenceDiagram participant AUI as Alice_UI participant ACC as Alice_Conversation_Cell AUI-->>ACC: call `get_pinned_messages` ACC-->>ACC: initialize AgentMessages (conversant as single key) ACC-->>ACC: initialize MessageContents (empty) ACC-->>ACC: initialize ReceiptContents (empty) ACC-->>ACC: initialize HashMap message_pins <String, PinStatus> ACC-->>ACC: initialize HashMap pinned_message_hashes <MessageHash, P2PMessagePin> ACC-->>ACC: initialize for unpinned_message_hashes <MessageHash, P2PMessagePin> ACC-->>ACC: query source chain for P2PMessage elements ACC-->>ACC: query source chain for P2PMessagePin elements loop for every P2PMessagePin in query results opt if P2PMessagePin.status == Pinned <br> && P2PMessagePin.conversant <br> == input.conversant loop for every hash in pin.hashes alt if unpinned_message_hashes[hash] exists ACC-->>ACC: remove hash key from unpinned_message_hashes else if unpinned_message_hashes[hash] does not exist ACC-->>ACC: add hash to pinned_message_hashes end end end opt if P2PMessagePin.status == Unpinned <br> && P2PMessagePin.conversant <br> == input.conversant loop for every hash in pin.hashes alt if pinned_message_hashes[hash] exists ACC-->>ACC: do nothing else if pinned_message_hashes[hash] does not exist ACC-->>ACC: add hash to unpinned_message_hashes end end end end loop for every P2PMessage in query results opt if pinned_message_hashes[P2PMessageHash] exists ACC-->>ACC: call `process_message` end end loop for every P2PMessageReceipt loop for every id in P2PMessageReceipt.ids opt id in MessageContents.keys ACC-->>ACC: add P2PMessageReceipt.entry_hash to the MessageBundle in MessageContents[P2PMessageReceipt.id] ACC-->>ACC: insert P2PMessageReceipt.entry_hash:P2PMessageReceipt to ReceiptContents end end end ACC-->>AUI: consruct and return P2PMessageHashTables ``` ### `send_receipt_to_neighbor` ```rust= fn send_receipt_to_neighbor(receipt: P2PMessageReceipt) -> ExternResult<P2PMessageReceipt> ``` ```mermaid sequenceDiagram participant ACC as Alice_Conversation_Cell participant N as Neighborhood participant BCC as Bob_Conversation_Cell ACC-->>N: send message receipt via ephemeral delivery system N-->>ACC: return true Note over BCC: When bobby goes back online rect rgba(200,100,100,.5) BCC-->>BCC: invoke 'fetch_read_receipts' end ``` ### `fetch_undelivered_items` ```rust= pub struct P2PMessagesOutput( AgentMessages, MessageContents, ReceiptContents ) fn fetch_undelivered_items() -> ExternResult<P2PMessagesOutput> ``` ```mermaid sequenceDiagram participant ACC as Alice_Conversation_Cell participant BCC as Bobby_Conversation_Cell participant NH as Neighborhood participant BUI as Bobby_UI Note over BCC, NH: This process will be elaborated when the ephemeral delivery system is implemented rect rgba(200, 200, 200, 0.5) BCC-->>NH: ask for P2PMessage/P2PMessageReceipts intended for Bobby NH-->>BCC: return P2PMessage/P2PMessageReceipts end BCC-->>BCC: initialize empty vector "ids" of type EntryHash BCC-->>BCC: initialize empty vector "messages" of type P2PMessage loop for every P2PMessage/P2PMessageReceipts rect rgba(255,0,0,0.3) BCC-->>BCC: check if author is blocked by calling `in_blocked` <br> in Contacts zome end opt author is not blocked alt entry type is P2PMessageReceipt rect rgba(0, 255, 0, .5) BCC-->>BCC: call `notify_delivery` end else entry type is P2PMessage BCC-->>BCC: commit P2PMessage entry to source chain BCC-->>BCC: append P2PMessage.entry_hash to vector "ids" BCC-->>BCC: append P2PMessage to vector "messages" end end end BCC-->>BCC: generate timestamp for P2PMessageReceipt BCC-->>BCC: construct 'Delivered' P2PMessageReceipt with the vector "ids" as id BCC-->>BCC: commit P2PMessageReceipt entry to source chain rect rgba(0, 0, 255, .3) BCC-->>ACC: call_remote `notify_delivery` function end ACC-->>BCC: return call_remote result alt call_remote ok else call_remote timeout rect rgba(0, 255, 0, .5) BCC-->>NH: call `send_to_neighbor` with the P2PMessageReceipt as payload end else call_remote unauthorized Note over BCC, NH: case should not exist end BCC-->>BUI: return true ``` ## Validations - no validation since all entris are private ## Redux states ```typescript= type ProfileID = string; type P2PMessageReceiptID = string; type GroupFileBytesID = string; // GroupFileBytes' EntryHash type P2PMessageStatus = "Received" | "Read" | "Sent"; type Payload = TextPayload | FilePayload type AgentPubKeyBase64 = string; type HoloHashBase64 = string; // for reference interface Profile { agentPubKey: ProfileID; username: string; } interface TextPayload { type: "TEXT"; payload: { payload: string }; } interface FilePayload { type: "FILE"; fileName: string; fileSize: number; fileType: "IMAGE" | "VIDEO" | "OTHER"; fileHash: FileBytesID; thumbnail?: Uint8Array; } // AgentMessages(HashMap<AgentPubKey, Vec<MessageHash>>) interface P2PConversation { messages: MessageID[]; } // MessageContents(HashMap<MessageHash, MessageBundle>) interface P2PMessage { p2pMessageEntryHash: MessageID; author: ProfileID; receiver: ProfileID; payload: Payload; timestamp: Date; replyTo?: MessageID; receipts: P2PMessageReceiptID[]; } // pub struct ReceiptContents(HashMap<ReceiptHash, P2PMessageReceipt>) interface P2PMessageReceipt { p2pMessageReceiptEntryHash: P2PMessageReceiptID; p2pMessageEntryHashes: MessageID[]; timestamp: Date; status: P2PMessageStatus; } interface P2PFile { fileHash: FileBytesID; fileBytes: Uint8Array; } interface P2PHashMap { conversations: { [key: string]: P2PConversation; }; messages: { [key: string]: P2PMessage; }; receipts: { [key: string]: P2PMessageReceipt; }; } interface P2PMessageConversationState { conversations: { [key: string]: P2PConversation; }; messages: { [key: string]: P2PMessage; }; receipts: { [key: string]: P2PMessageReceipt; }; files: { [key: string]: Uint8Array; }; typing: { [key: string]: Profile; }; } const initialState: P2PMessageConversationState = { conversations: {}, messages: {}, receipts: {}, files: {}, typing: {} }; ```