# P2P Messaging
###### tags: `Kizuna-architecture`
<style> .ui-infobar, #doc.markdown-body { max-width: 1000px; } </style>
<img src="https://i.imgur.com/fVyoNsV.png" width="150" style="padding-right:5px"/>
<img src="https://i.imgur.com/ht3KLk4.png" width="150"/>
#### Reply
<img src="https://i.imgur.com/qX84J6o.png" width="150" style="padding:5px"/>
<img src="https://i.imgur.com/WnfFxXX.png" width="150" style="padding:5px"/>
<img src="https://i.imgur.com/j8LOqxO.png" width="150" style="padding:5px"/>
### TODO/Comments
- Encryption for messages
## Entries
### Message
```rust=
#[hdk_entry(id = "p2pmessage", visibility = "private")]
pub struct P2PMessage {
author: AgentPubKey,
receiver: AgentPubKey,
payload: Payload,
time_sent: Timestamp,
reply_to: Option<EntryHash>
}
```
### Receipt
```rust=
#[hdk_entry(id = "p2pmessagereceipt", visibility = "private")]
pub struct P2PMessageReceipt {
id: Vec<EntryHash>,
time_received: Option<Timestamp>,
status: Status
}
```
### Pin
```rust=
#[hdk_entry(id = "p2pmessagepin", visibility = "private")]
pub struct P2PMessagePin {
id: Vec<EntryHash>,
conversants: Vec<AgentPubKey>,
status: PinStatus
}
```
### File Bytes
```rust=
#[hdk_entry(id = "p2pfilebytes", visibility = "private")]
pub struct P2PFileBytes(SerializedBytes);
```
## Entry Relationship Diagram
```mermaid
graph LR
subgraph p2pmessage zome
subgraph agents
alice
bobby
end
subgraph messages
message_1
receipt_1
message_2
receipt_2
alice.->|has in source chain|message_1
bobby.->|has in source chain|message_1
alice.->|has in source chain|message_2
bobby.->|has in source chain|message_2
alice.->|has in source chain|receipt_1
bobby.->|has in source chain|receipt_1
alice.->|has in source chain|receipt_2
bobby.->|has in source chain|receipt_2
receipt_1.->message_1
receipt_2.->message_2
end
subgraph async messages in DHT
async_1
bobby-->|async_messages|async_1
message_2 ==> async_1
end
end
```
## Data Structures
### Message Receipt Status
```rust=
enum Status {
Sent,
Received {timestamp: Timestamp},
Read {timestamp: Timestamp}
}
```
### PinStatus
```rust=
pub enum PinStatus {
Pinned { timestamp: Timestamp },
Unpinned { timestamp: Timestamp }
}
```
### File Metadata
```rust=
struct FileMetadata {
file_name: String,
file_size: usize,
file_type: String,
file_hash: EntryHash // entryhash of File???
}
```
### File Type
```rust=
enum FileType {
Image {
thumbnail: SerializedBytes
},
Video {
thumbnail: SerializedBytes
},
Others
}
```
### Payload
```rust=
enum Payload {
Text {payload: String},
File {
metadata: FileMetadata,
file_type: FileType
}
}
```
<!-- Input Structures -->
### Message Input
```rust=
struct MessageInput {
receiver: AgentPubKey,
payload: PayloadInput,
file_bytes: Option<SerializedBytes>,
reply_to: Option<EntryHash>
}
```
### Read Receipt Input
```rust=
struct ReadReceiptInput {
sender: AgentPubKey,
receipt: P2PMessageReceipt,
}
```
### File Metadata Input
```rust=
/* This will be defined in a /commons crate that will be used for both p2pmessage and group zome */
struct FileMetadataInput {
file_name: String,
file_size: usize,
file_type: String,
}
```
### Payload Input
```rust=
enum PayloadInput {
Text {payload: String},
File {
metadata: FileMetadataInput,
file_type: FileType
}
}
```
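`PayloadInput` and `Payload` differ only in the `file_hash` carried by `FileMetadata`. A minimal sketch of that conversion, assuming the hash comes from the committed `P2PFileBytes` entry. `to_payload` is a hypothetical helper name, and `String` / `Vec<u8>` are stand-ins for the HDK's `EntryHash` and `SerializedBytes`:

```rust
// Stand-ins for the HDK types; assumptions made only for this sketch.
type EntryHash = String;
type SerializedBytes = Vec<u8>;

#[derive(Debug, Clone, PartialEq)]
enum FileType {
    Image { thumbnail: SerializedBytes },
    Video { thumbnail: SerializedBytes },
    Others,
}

#[derive(Debug, Clone, PartialEq)]
struct FileMetadataInput {
    file_name: String,
    file_size: usize,
    file_type: String,
}

#[derive(Debug, Clone, PartialEq)]
struct FileMetadata {
    file_name: String,
    file_size: usize,
    file_type: String,
    file_hash: EntryHash,
}

#[derive(Debug, Clone, PartialEq)]
enum PayloadInput {
    Text { payload: String },
    File { metadata: FileMetadataInput, file_type: FileType },
}

#[derive(Debug, Clone, PartialEq)]
enum Payload {
    Text { payload: String },
    File { metadata: FileMetadata, file_type: FileType },
}

/// Hypothetical helper: attach the hash of the committed file bytes to a file payload.
/// Text payloads ignore `file_hash`; file payloads require it.
fn to_payload(input: PayloadInput, file_hash: Option<EntryHash>) -> Result<Payload, String> {
    match input {
        PayloadInput::Text { payload } => Ok(Payload::Text { payload }),
        PayloadInput::File { metadata, file_type } => {
            let file_hash =
                file_hash.ok_or_else(|| "file payload without file hash".to_string())?;
            Ok(Payload::File {
                metadata: FileMetadata {
                    file_name: metadata.file_name,
                    file_size: metadata.file_size,
                    file_type: metadata.file_type,
                    file_hash,
                },
                file_type,
            })
        }
    }
}
```

Returning an error for a file payload without a hash is a choice made for this sketch, not something the design above specifies.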
### PinInput
```rust=
pub struct PinMessageInput {
id: Vec<EntryHash>,
conversants: Vec<AgentPubKey>,
status: PinStatus,
}
```
<!-- Output Structures -->
### Message Output
```rust=
pub struct P2PMessagesOutput(
AgentMessages,
MessageContents,
ReceiptContents
);
```
### Agent - Message Hash Map
```rust=
pub struct AgentMessages(HashMap<AgentPubKey, Vec<EntryHash>>);
```
### Message Hash - Message Bundle Map
```rust=
pub struct MessageContents(HashMap<EntryHash, MessageBundle>);
```
### Receipt Hash - Receipt Map
```rust=
pub struct ReceiptContents(HashMap<EntryHash, P2PMessageReceipt>);
```
### Message Bundle
```rust=
pub struct MessageBundle(P2PMessageElement, Vec<ReceiptHash>);
```
### PinContents
```rust=
pub struct PinMessageOutput(HashMap<String, P2PMessagePin>);
```
## Zome Functions
### `init`
```mermaid
sequenceDiagram
participant ACC as Alice_Conversation_Cell
participant BCC as Bobby_Conversation_Cell
par alice init
ACC-->>ACC: set unrestricted access to receive_message
ACC-->>ACC: set unrestricted access to notify_delivery
ACC-->>ACC: set unrestricted access to recv_remote_signal
and bobby init
BCC-->>BCC: set unrestricted access to receive_message
BCC-->>BCC: set unrestricted access to notify_delivery
BCC-->>BCC: set unrestricted access to recv_remote_signal
end
```
### `send_message`
```rust=
fn send_message(message_input: MessageInput) -> ExternResult<MessageBundle>
struct MessageInput {
receiver: AgentPubKey,
payload: PayloadInput,
file_bytes: Option<SerializedBytes>,
reply_to: Option<EntryHash>
}
struct MessageBundle(P2PMessageElement, Vec<P2PMessageReceiptElement>)
```
```mermaid
sequenceDiagram
participant AUI as Alice_UI
participant ACC as Alice_Conversation_Cell
participant BCC as Bobby_Conversation_Cell
participant BUI as Bobby_UI
participant NH as Neighborhood
AUI-->>ACC: call `send_message` function
rect rgba(255,0,0,0.3)
ACC-->>ACC: check if receiver is blocked by calling `in_blocked` <br> in Contacts zome
end
opt receiver is blocked
ACC-->>AUI: return error (cannot send to blocked)
end
opt payload is File
ACC-->>ACC: construct P2PFileBytes
ACC-->>ACC: commit P2PFileBytes
end
ACC-->>ACC: construct P2PMessage from input
ACC-->>ACC: construct 'Sent' P2PMessageReceipt
ACC-->>ACC: commit P2PMessage and P2PMessageReceipt to source chain <br> while calling post_commit callback
ACC-->>AUI: return MessageBundle <br> (skip this until post_commit becomes available)
Note over ACC: yellow box will be moved to post commit
rect rgba(255,255,0,0.2)
ACC-->>BCC: call_remote Bobby's `receive_message` function
alt call_remote ok (receiver is online)
rect rgba(0, 0, 255, 0.4)
BCC-->>BCC: invoke `receive_message`
end
BCC-->>ACC: return P2PMessage and P2PMessageReceipt
opt payload is File
ACC-->>ACC: commit P2PFileBytes
end
ACC-->>ACC: commit P2PMessageReceipt to source chain
ACC-->>AUI: return MessageBundle
else call_remote timeout (receiver is offline)
Note over ACC, BCC: timeout can also mean that the peer was not found in which case the receiver needs to poll the message from the neighborhood
rect rgba(0,255,0,0.8)
ACC-->>NH: call `send_to_neighbor` with P2PMessage as payload
end
ACC-->>ACC: commit P2PMessageReceipt to source chain
ACC-->>AUI: return MessageBundle
else call_remote unauthorized
Note over ACC, BCC: case should not exist
end
end
```
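The branching in the diagram above can be read as a small decision function. The following is only a sketch under stated assumptions: `RemoteOutcome`, `NextStep`, and `next_step` are hypothetical names introduced here, and `RemoteOutcome` is a stand-in for whatever `call_remote` actually returns, not the HDK type.

```rust
/// Hypothetical stand-in for the result of `call_remote`; not the HDK type.
#[derive(Debug, Clone, PartialEq)]
enum RemoteOutcome {
    Ok,
    Timeout,
    Unauthorized,
}

/// What the sender does next, following the `alt` branches of the diagram.
#[derive(Debug, Clone, PartialEq)]
enum NextStep {
    /// Receiver answered: commit the receipt it returned.
    CommitReturnedReceipt,
    /// Receiver unreachable: hand the message to the neighborhood.
    SendToNeighbor,
    /// Blocked receiver, or the "should not exist" unauthorized case.
    Error(String),
}

fn next_step(receiver_blocked: bool, outcome: RemoteOutcome) -> NextStep {
    // The block check happens before anything is committed or sent.
    if receiver_blocked {
        return NextStep::Error("cannot send to blocked".to_string());
    }
    match outcome {
        RemoteOutcome::Ok => NextStep::CommitReturnedReceipt,
        RemoteOutcome::Timeout => NextStep::SendToNeighbor,
        RemoteOutcome::Unauthorized => NextStep::Error("unauthorized".to_string()),
    }
}
```

For example, `next_step(false, RemoteOutcome::Timeout)` yields `NextStep::SendToNeighbor`, which corresponds to the offline branch.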
### `receive_message`
```rust
fn receive_message(message: P2PMessage) -> ExternResult<MessageBundle>
```
```mermaid
sequenceDiagram
participant ACC as Alice_Conversation_Cell
participant BCC as Bobby_Conversation_Cell
participant BUI as Bobby_UI
rect rgba(255,0,0,0.3)
BCC-->>BCC: check if author is blocked by calling `in_blocked` <br> in Contacts zome
end
opt author is not blocked
BCC-->>BCC: generate timestamp for 'Delivered' receipt
BCC-->>BCC: construct P2PMessage <br> and 'Delivered' P2PMessageReceipt
BCC-->>BCC: commit P2PMessage and P2PMessageReceipt to source chain
BCC-->>BUI: emit_signal MessageBundle
BCC-->>ACC: return MessageBundle
end
```
### `notify_delivery`
```rust
fn notify_delivery(message_receipt: P2PMessageReceipt) -> ExternResult<BooleanWrapper>
```
```mermaid
sequenceDiagram
participant AUI as Alice_UI
participant ACC as Alice_Conversation_Cell
participant BCC as Bobby_Conversation_Cell
ACC-->>ACC: commit P2PMessageReceipt to source chain
ACC-->>AUI: emit signal P2PReceiptElement
AUI-->>AUI: display message (flag as Delivered)
ACC-->>BCC: return true
```
### `get_messages_by_agent_by_timestamp`
```rust=
pub struct P2PAgentDateFilter {
conversant: AgentPubKey,
date: Timestamp,
payload_type: String
}
fn get_messages_by_agent_by_timestamp(filter: P2PAgentDateFilter) -> ExternResult<P2PMessagesOutput>
```
```mermaid
sequenceDiagram
participant AUI as Alice_UI
participant ACC as Alice_Conversation_Cell
AUI-->>ACC: call `get_messages_by_agent_by_timestamp`
ACC-->>ACC: query source chain for P2PMessage elements
ACC-->>ACC: query source chain for P2PMessageReceipt elements
ACC-->>ACC: initialize AgentMessages (conversant as single key)
ACC-->>ACC: initialize MessageContents (empty)
ACC-->>ACC: initialize ReceiptContents (empty)
loop for every P2PMessage
alt P2PMessage.payload == Payload::Text && payload_type == Text
opt P2PMessage.author == conversant or P2PMessage.receiver == conversant
opt date_end >= P2PMessage.header.timestamp >= date_start
ACC-->>ACC: call `process_message`
end
end
else P2PMessage.payload == Payload::File && payload_type == File
opt P2PMessage.author == conversant or P2PMessage.receiver == conversant
opt date_end >= P2PMessage.header.timestamp >= date_start
ACC-->>ACC: call `process_message`
end
end
else payload_type == All
opt P2PMessage.author == conversant or P2PMessage.receiver == conversant
opt date_end >= P2PMessage.header.timestamp >= date_start
ACC-->>ACC: call `process_message`
end
end
end
end
loop for every P2PMessageReceipt
loop for every id in P2PMessageReceipt.ids
opt id in MessageContents.keys
ACC-->>ACC: add P2PMessageReceipt.entry_hash to the MessageBundle in MessageContents[P2PMessageReceipt.id]
ACC-->>ACC: insert P2PMessageReceipt.entry_hash:P2PMessageReceipt to ReceiptContents
end
end
end
ACC-->>AUI: construct and return P2PMessagesOutput
```
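The three `alt` branches in the loop above apply the same conversant and date checks and differ only in the payload test, so they can be folded into one predicate. A minimal sketch, assuming `payload_type` is parsed into an enum (the filter struct declares it as a `String`) and that `date_start` and `date_end` are derived from the filter's `date`. `Message`, `PayloadKind`, `PayloadFilter`, and `matches_filter` are hypothetical names:

```rust
// Stand-ins for the HDK types; assumptions made only for this sketch.
type AgentPubKey = String;
type Timestamp = u64;

#[derive(Debug, Clone, Copy, PartialEq)]
enum PayloadKind {
    Text,
    File,
}

/// Assumed parsed form of the `payload_type` string.
#[derive(Debug, Clone, Copy, PartialEq)]
enum PayloadFilter {
    Text,
    File,
    All,
}

struct Message {
    author: AgentPubKey,
    receiver: AgentPubKey,
    kind: PayloadKind,
    timestamp: Timestamp,
}

/// True when the message passes the payload, conversant, and date checks.
fn matches_filter(
    msg: &Message,
    conversant: &str,
    filter: PayloadFilter,
    date_start: Timestamp,
    date_end: Timestamp,
) -> bool {
    let kind_ok = match filter {
        PayloadFilter::All => true,
        PayloadFilter::Text => msg.kind == PayloadKind::Text,
        PayloadFilter::File => msg.kind == PayloadKind::File,
    };
    let agent_ok = msg.author == conversant || msg.receiver == conversant;
    let date_ok = date_start <= msg.timestamp && msg.timestamp <= date_end;
    kind_ok && agent_ok && date_ok
}
```

Messages that pass the predicate would then be handed to `process_message`.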
### `get_latest_messages`
```rust=
pub struct BatchSize(u8)
fn get_latest_messages(batch_size: BatchSize) -> ExternResult<P2PMessagesOutput>
```
```mermaid
sequenceDiagram
participant AGG as Aggregator_Zome
participant ACC as Alice_Conversation_Cell
AGG-->>ACC: call `get_latest_messages`
ACC-->>ACC: query source chain for P2PMessage elements
ACC-->>ACC: query source chain for P2PMessageReceipt elements
ACC-->>ACC: initialize AgentMessages (empty)
ACC-->>ACC: initialize MessageContents (empty)
ACC-->>ACC: initialize ReceiptContents (empty)
loop for every P2PMessage
opt P2PMessage.author != self && P2PMessage.author not in AgentMessages.keys
ACC-->>ACC: insert P2PMessage.author:<empty entryhash vector> to AgentMessages
end
opt AgentMessages[P2PMessage.author].length >= batch_size
ACC-->>ACC: continue with next item in loop
end
alt author == self
ACC-->>ACC: add P2PMessage.entry_hash to AgentMessages[P2PMessage.receiver]
else author != self
ACC-->>ACC: add P2PMessage.entry_hash to AgentMessages[P2PMessage.author]
end
ACC-->>ACC: construct MessageBundle(P2PMessage, <mutable empty vector>)
ACC-->>ACC: insert P2PMessage.entry_hash:MessageBundle to MessageContents
end
loop for every P2PMessageReceipt
loop for every id in P2PMessageReceipt.ids
opt id in MessageContents.keys
ACC-->>ACC: add P2PMessageReceipt.entry_hash to the MessageBundle in MessageContents[P2PMessageReceipt.id]
ACC-->>ACC: insert P2PMessageReceipt.entry_hash:P2PMessageReceipt to ReceiptContents
end
end
end
ACC-->>AGG: construct and return P2PMessagesOutput
```
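The grouping step above can be sketched as follows. It assumes the messages arrive newest first, so the first `batch_size` hashes kept per conversant are the latest ones, and it keys every message by the other party (the receiver when the author is self). The diagram applies the batch-size check to the author key only, so this is a simplification. `Message` and `latest_per_conversant` are hypothetical names with `String` standing in for the HDK hash types:

```rust
use std::collections::HashMap;

// Stand-ins for the HDK types; assumptions made only for this sketch.
type AgentPubKey = String;
type EntryHash = String;

struct Message {
    hash: EntryHash,
    author: AgentPubKey,
    receiver: AgentPubKey,
}

/// Groups message hashes by conversant, keeping at most `batch_size` per conversant.
/// Assumes `messages` is ordered newest first.
fn latest_per_conversant(
    me: &str,
    messages: &[Message],
    batch_size: usize,
) -> HashMap<AgentPubKey, Vec<EntryHash>> {
    let mut agent_messages: HashMap<AgentPubKey, Vec<EntryHash>> = HashMap::new();
    for msg in messages {
        // The conversant is whoever is on the other side of the message.
        let conversant = if msg.author == me { &msg.receiver } else { &msg.author };
        let hashes = agent_messages.entry(conversant.clone()).or_default();
        if hashes.len() >= batch_size {
            continue; // this conversation already has a full batch
        }
        hashes.push(msg.hash.clone());
    }
    agent_messages
}
```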
### `get_next_batch_messages`
```rust=
pub struct P2PMessageFilterBatch {
conversant: AgentPubKey,
batch_size: u8,
payload_type: String,
last_fetched_timestamp: Option<Timestamp>, // header timestamp of the oldest message in the last fetched batch
last_fetched_message_id: Option<EntryHash>
}
fn get_next_batch_messages(filter: P2PMessageFilterBatch) -> ExternResult<P2PMessagesOutput>
```
```mermaid
sequenceDiagram
participant AUI as Alice_UI
participant ACC as Alice_Conversation_Cell
AUI-->>ACC: call `get_next_batch_messages`
ACC-->>ACC: query source chain for P2PMessage elements
ACC-->>ACC: query source chain for P2PMessageReceipt elements
ACC-->>ACC: initialize AgentMessages (conversant as single key)
ACC-->>ACC: initialize MessageContents (empty)
ACC-->>ACC: initialize ReceiptContents (empty)
opt last_fetched_timestamp == None && last_fetched_message_id == None
ACC-->>ACC: set last_fetched_timestamp to current time
end
loop for every P2PMessage
alt P2PMessage.payload == Payload::Text && payload_type == Text
opt P2PMessage.header.timestamp <= last_fetched_timestamp <br> && P2PMessage.entry_hash != last_fetched_message_id <br> && (P2PMessage.author == conversant || P2PMessage.receiver == conversant)
opt AgentMessages[conversant].length >= batch_size
ACC-->>ACC: break the loop
end
ACC-->>ACC: call `process_message`
end
else P2PMessage.payload == Payload::File && payload_type == File
opt P2PMessage.header.timestamp <= last_fetched_timestamp <br> && P2PMessage.entry_hash != last_fetched_message_id <br> && (P2PMessage.author == conversant || P2PMessage.receiver == conversant)
opt AgentMessages[conversant].length >= batch_size
ACC-->>ACC: break the loop
end
ACC-->>ACC: call `process_message`
end
else payload_type == All
opt P2PMessage.header.timestamp <= last_fetched_timestamp <br> && P2PMessage.entry_hash != last_fetched_message_id <br> && (P2PMessage.author == conversant || P2PMessage.receiver == conversant)
opt AgentMessages[conversant].length >= batch_size
ACC-->>ACC: break the loop
end
ACC-->>ACC: call `process_message`
end
end
end
loop for every P2PMessageReceipt
loop for every id in P2PMessageReceipt.ids
opt id in MessageContents.keys
ACC-->>ACC: add P2PMessageReceipt.entry_hash to the MessageBundle in MessageContents[P2PMessageReceipt.id]
ACC-->>ACC: insert P2PMessageReceipt.entry_hash:P2PMessageReceipt to ReceiptContents
end
end
end
ACC-->>AUI: construct and return P2PMessagesOutput
```
### `get_file_bytes`
```rust=
fn get_file_bytes(entry_hash: EntryHash) -> ExternResult<P2PFileBytes>
```
```mermaid
sequenceDiagram
participant AUI as Alice_UI
participant ACC as Alice_Conversation_Cell
AUI-->>ACC: call `get_file_bytes`
Note over ACC: what happens if two private entries result in the same entry hash?
ACC-->>ACC: call get() with entry_hash and GetOptions::content()
ACC-->>AUI: return P2PFileBytes
```
### `process_message`
```rust=
fn process_message(agent_messages: &mut AgentMessages, message_contents: &mut MessageContents, message: P2PMessage)
```
```mermaid
sequenceDiagram
participant ACC as Alice_Conversation_Cell
ACC-->>ACC: add P2PMessage.entry_hash to AgentMessages[conversant]
ACC-->>ACC: construct MessageBundle(P2PMessage, <mutable empty vector>)
ACC-->>ACC: insert P2PMessage.entry_hash:MessageBundle to MessageContents
ACC-->>ACC: return
```
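A minimal sketch of `process_message` together with the receipt loop that its callers run afterwards. It uses plain `HashMap`s in place of `AgentMessages` and `MessageContents`, and `String` in place of the HDK hash types. The explicit `conversant` parameter and the `attach_receipt` helper are assumptions introduced for the sketch:

```rust
use std::collections::HashMap;

// Stand-ins for the HDK types; assumptions made only for this sketch.
type AgentPubKey = String;
type EntryHash = String;

#[derive(Debug, Clone, PartialEq)]
struct Message {
    hash: EntryHash,
    text: String,
}

/// Mirrors `MessageBundle`: the message plus the hashes of its receipts.
#[derive(Debug, Clone, PartialEq)]
struct MessageBundle(Message, Vec<EntryHash>);

fn process_message(
    agent_messages: &mut HashMap<AgentPubKey, Vec<EntryHash>>,
    message_contents: &mut HashMap<EntryHash, MessageBundle>,
    conversant: &str,
    message: Message,
) {
    // Record the hash under the conversant, then store the bundle with no receipts yet.
    agent_messages
        .entry(conversant.to_string())
        .or_default()
        .push(message.hash.clone());
    message_contents.insert(message.hash.clone(), MessageBundle(message, Vec::new()));
}

/// Hypothetical helper for the receipt loop: attach a receipt hash to every
/// bundle it refers to and return how many bundles were updated.
fn attach_receipt(
    message_contents: &mut HashMap<EntryHash, MessageBundle>,
    receipt_hash: &str,
    message_ids: &[EntryHash],
) -> usize {
    let mut updated = 0;
    for id in message_ids {
        if let Some(bundle) = message_contents.get_mut(id) {
            bundle.1.push(receipt_hash.to_string());
            updated += 1;
        }
    }
    updated
}
```

Both maps are taken by mutable reference because the function inserts into them.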
### `typing`
```rust=
#[derive(Serialize, Deserialize, SerializedBytes, Clone, Debug)]
struct P2PTypingDetailIO {
agent: AgentPubKey,
is_typing: bool,
}
enum Signal {
P2PTypingDetailSignal {
agent: AgentPubKey,
is_typing: bool
}
}
#[hdk_extern]
fn typing(typing: P2PTypingDetailIO) -> ExternResult<()>
```
```mermaid
sequenceDiagram
participant AUI as Alice_UI
participant ALD as Alice_Preference_Cell
participant BLD as Bobby_Preference_Cell
participant BUI as Bobby_UI
AUI-->>ALD: sets preference
ALD-->>AUI: fetch preference
AUI-->>AUI: starts or stops typing
alt typing preference is on
alt If Alice allows Bob to see if she's typing
AUI-->>ALD: call typing
ALD-->>BLD: remote_signal P2PTypingDetailIO
alt User is online
BLD-->>BUI: emit_signal P2PTypingDetailIO
alt if is_typing is true
BUI-->>BUI: show that alice is typing
else if is_typing is false
BUI-->>BUI: remove typing indicator from UI
end
else User is offline
Note left of BUI: Do nothing
end
end
end
```
### `read_message`
```rust=
fn read_message(receipt: P2PMessageReceipt) -> ExternResult<ReceiptContents>
```
```mermaid
sequenceDiagram
participant AUI as Alice_UI
participant ALD as Alice_Cell
participant BLD as Bobby_Cell
participant BUI as Bobby_UI
AUI-->>ALD: sets preference
ALD-->>AUI: fetch preference
AUI-->>ALD: sets receipt preference for bob
ALD-->>AUI: fetch ReceiptPreference
BUI-->>BLD: send message payload
rect rgba(100,100,200,.5)
BLD-->>ALD: call_remote `receive_message`
end
ALD-->>AUI: emit_signal message
AUI-->>AUI: reads the message
alt if read_receipts is on
alt if Alice did not include Bob in her read preference
AUI-->>ALD: call `read_message`
ALD-->>ALD: commit to source chain the receipt entry with status being read
ALD-->>AUI: return ReceiptContents <br> (skip this until post_commit becomes available)
rect rgba(255,255,0,0.2)
Note over ALD: yellow box will be moved to post_commit
ALD-->>BLD: call_remote `receive_read_receipt`
alt call_remote ok
rect rgba(200,100,100,.5)
BLD-->>BLD: invoke `receive_read_receipt`
BLD-->>ALD: ReceiptContents
end
ALD-->>AUI: return ReceiptContents <br> (do this for now until post_commit is available)
else call_remote timeout (receiver is offline)
rect rgba(100,100,200,.5)
ALD-->>ALD: call `send_to_neighbor` with the receipt as payload
end
ALD-->>AUI: return ReceiptContents <br> (do this for now until post_commit is available)
else call_remote unauthorized
Note over BLD, AUI: this case should not exist
end
end
end
end
```
### `receive_read_receipt`
```rust=
fn receive_read_receipt(receipt: P2PMessageReceipt) -> ExternResult<ReceiptContents>
```
```mermaid
sequenceDiagram
participant BC as Bobby_Cell
participant BUI as Bobby_UI
rect rgba(200,100,100,.5)
BC-->>BC: invoke `commit_receipts`
end
BC-->>BUI: emit signal ReceiptContents
BUI-->>BUI: update status of relevant message
BC-->>BC: return ReceiptContents
```
### `commit_receipts`
```rust=
fn commit_receipts(receipts: Vec<P2PMessageReceipt>) -> ExternResult<ReceiptContents>
```
```mermaid
sequenceDiagram
participant BLD as Bobby_Cell
BLD-->>BLD: query receipts from source chain
BLD-->>BLD: create empty HashMap ReceiptContents
BLD-->>BLD: create empty HashSet
loop for each P2PMessageReceipt in arg
BLD-->>BLD: construct EntryHash from P2PMessageReceipt
BLD-->>BLD: insert EntryHash in HashSet
end
loop for each P2PMessageReceipt Element in source chain
BLD-->>BLD: check whether EntryHash is in HashSet
opt EntryHash is found in HashSet
BLD-->>BLD: remove P2PMessageReceipt with found EntryHash from Vec<P2PMessageReceipt>
end
opt if HashSet empty
BLD-->>BLD: break the loop
end
end
opt if Vec<P2PMessageReceipt> is not empty
BLD-->>BLD: commit P2PMessageReceipt remaining in Vec<P2PMessageReceipt>
BLD-->>BLD: insert EntryHash:P2PMessageReceipt in ReceiptContents HashMap
end
BLD-->>BLD: return ReceiptContents
```
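The deduplication above amounts to "commit only the receipts whose hash is not already in the source chain". A minimal sketch under stated assumptions: `Receipt`, `hash_of`, and `receipts_to_commit` are hypothetical names, and `hash_of` is a toy stand-in for real entry hashing. The sketch keeps pending receipts in a map keyed by hash, which plays the role of both the `HashSet` and the `Vec` in the diagram:

```rust
use std::collections::HashMap;

// Stand-in for the HDK type; an assumption made only for this sketch.
type EntryHash = String;

#[derive(Debug, Clone, PartialEq)]
struct Receipt {
    message_ids: Vec<EntryHash>,
    status: String,
}

/// Toy stand-in for hashing an entry; real code would compute the entry hash.
fn hash_of(receipt: &Receipt) -> EntryHash {
    format!("{}:{}", receipt.message_ids.join(","), receipt.status)
}

/// Returns the receipts that still need to be committed, keyed by hash.
/// `committed` holds the hashes of receipts already in the source chain.
fn receipts_to_commit(
    incoming: Vec<Receipt>,
    committed: &[EntryHash],
) -> HashMap<EntryHash, Receipt> {
    let mut pending: HashMap<EntryHash, Receipt> = incoming
        .into_iter()
        .map(|receipt| (hash_of(&receipt), receipt))
        .collect();
    for hash in committed {
        pending.remove(hash);
        if pending.is_empty() {
            break; // nothing left to check
        }
    }
    pending
}
```

The returned map has the same shape as `ReceiptContents`, so the caller can commit each value and return the map as is.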
### `fetch_read_receipts`
```rust=
fn fetch_read_receipts(_: ()) -> ExternResult<ReceiptContents>
```
```mermaid
sequenceDiagram
participant BUI as Bobby_UI
participant BCC as Bobby_Conversation_Cell
participant N as Neighbors
BCC-->>N: fetch read receipts
rect rgba(200,100,100,.5)
BCC-->>BCC: invoke 'commit_receipts' with Vec<P2PMessageReceipt>
end
BCC-->>BUI: emit_signal ReceiptContents to UI
BUI-->>BUI: update relevant messages' status
```
### `get_previous_messages`
*formerly get_next_batch_messages*
```rust=
pub struct P2PMessageFilterBatch {
conversant: AgentPubKey,
batch_size: u8,
payload_type: String,
last_fetched_timestamp: Option<Timestamp>, // header timestamp of the oldest message in the last fetched batch
last_fetched_message_id: Option<EntryHash>
}
fn get_previous_messages(filter: P2PMessageFilterBatch) -> ExternResult<P2PMessagesOutput>
```
```mermaid
sequenceDiagram
participant AUI as Alice_UI
participant ACC as Alice_Conversation_Cell
AUI-->>ACC: call `get_previous_messages`
ACC-->>ACC: query source chain for P2PMessage elements
ACC-->>ACC: query source chain for P2PMessageReceipt elements
ACC-->>ACC: initialize AgentMessages (conversant as single key)
ACC-->>ACC: initialize MessageContents (empty)
ACC-->>ACC: initialize ReceiptContents (empty)
opt last_fetched_timestamp == None && last_fetched_message_id == None
ACC-->>ACC: set last_fetched_timestamp to current time
end
loop for every P2PMessage
alt P2PMessage.payload == Payload::Text && payload_type == Text
opt P2PMessage.header.timestamp <br> <= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant)
opt AgentMessages[conversant].length >= batch_size
ACC-->>ACC: break the loop
end
ACC-->>ACC: call `process_message`
end
else P2PMessage.payload == Payload::File && payload_type == File
opt P2PMessage.header.timestamp <br> <= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant)
opt AgentMessages[conversant].length >= batch_size
ACC-->>ACC: break the loop
end
ACC-->>ACC: call `process_message`
end
else payload_type == All
opt P2PMessage.header.timestamp <br> <= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant)
opt AgentMessages[conversant].length >= batch_size
ACC-->>ACC: break the loop
end
ACC-->>ACC: call `process_message`
end
end
end
loop for every P2PMessageReceipt
loop for every id in P2PMessageReceipt.ids
opt id in MessageContents.keys
ACC-->>ACC: add P2PMessageReceipt.entry_hash to the MessageBundle in MessageContents[P2PMessageReceipt.id]
ACC-->>ACC: insert P2PMessageReceipt.entry_hash:P2PMessageReceipt to ReceiptContents
end
end
end
ACC-->>AUI: construct and return P2PMessagesOutput
```
TODO: can we break if we reach earlier timestamps?
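The paging rule for older messages can be sketched as below. It assumes the queried messages are ordered newest first, which is what lets the loop stop once the batch is full, and it leaves out the conversant and payload checks. `Message` and `previous_batch` are hypothetical names, `now` stands in for the current time, and `String` / `u64` stand in for the HDK types:

```rust
// Stand-ins for the HDK types; assumptions made only for this sketch.
type EntryHash = String;
type Timestamp = u64;

#[derive(Debug, Clone, PartialEq)]
struct Message {
    hash: EntryHash,
    timestamp: Timestamp,
}

/// Returns up to `batch_size` messages at or before the cutoff, skipping the
/// message that was fetched last. Assumes `messages` is ordered newest first.
fn previous_batch<'a>(
    messages: &'a [Message],
    last_fetched_timestamp: Option<Timestamp>,
    last_fetched_message_id: Option<&str>,
    now: Timestamp,
    batch_size: usize,
) -> Vec<&'a Message> {
    // With no cursor, start from the current time.
    let cutoff = last_fetched_timestamp.unwrap_or(now);
    let mut batch = Vec::new();
    for msg in messages {
        if batch.len() >= batch_size {
            break;
        }
        if msg.timestamp <= cutoff && Some(msg.hash.as_str()) != last_fetched_message_id {
            batch.push(msg);
        }
    }
    batch
}
```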
### `get_next_messages`
```rust=
pub struct P2PMessageFilterBatch {
conversant: AgentPubKey,
batch_size: u8, // per direction (10 previous and 10 next)
payload_type: String,
last_fetched_timestamp: Option<Timestamp>, // header timestamp of the oldest message in the last fetched batch
last_fetched_message_id: Option<EntryHash>
}
fn get_next_messages(filter: P2PMessageFilterBatch) -> ExternResult<P2PMessagesOutput>
```
```mermaid
sequenceDiagram
participant AUI as Alice_UI
participant ACC as Alice_Conversation_Cell
AUI-->>ACC: call `get_next_messages`
ACC-->>ACC: query source chain for P2PMessage elements
ACC-->>ACC: query source chain for P2PMessageReceipt elements
ACC-->>ACC: initialize AgentMessages (conversant as single key)
ACC-->>ACC: initialize MessageContents (empty)
ACC-->>ACC: initialize ReceiptContents (empty)
Note over ACC: empty vector will hold messages with timestamps later than the target message <br> since we don't know in advance if that message falls within the batch size <br> directly adjacent the target
ACC-->>ACC: initialize empty vector [later_messages]
opt last_fetched_timestamp == None && last_fetched_message_id == None
ACC-->>ACC: set last_fetched_timestamp to current time
end
loop for every P2PMessage
alt P2PMessage.payload == Payload::Text && payload_type == Text
opt P2PMessage.header.timestamp <br> >= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant)
ACC-->>ACC: add message to later_messages
end
else P2PMessage.payload == Payload::File && payload_type == File
opt P2PMessage.header.timestamp <br> >= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant)
ACC-->>ACC: add message to later_messages
end
else payload_type == All
opt P2PMessage.header.timestamp <br> >= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant)
ACC-->>ACC: add message to later_messages
end
end
end
ACC-->>ACC: initialize start_index = 0
opt later_messages.size > batch_size
ACC-->>ACC: start_index = later_messages.size - batch_size
end
loop for every P2PMessage in later_messages[start_index:end]
ACC-->>ACC: call `process_message`
end
loop for every P2PMessageReceipt
loop for every id in P2PMessageReceipt.ids
opt id in MessageContents.keys
ACC-->>ACC: add P2PMessageReceipt.entry_hash to the MessageBundle in MessageContents[P2PMessageReceipt.id]
ACC-->>ACC: insert P2PMessageReceipt.entry_hash:P2PMessageReceipt to ReceiptContents
end
end
end
ACC-->>AUI: construct and return P2PMessagesOutput
```
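The `start_index` step at the end of the diagram keeps only the tail of `later_messages`. A minimal sketch, assuming the messages were collected newest first so that the tail holds the ones closest to the target. `closest_later` is a hypothetical helper name:

```rust
/// Returns the last `batch_size` items of `later_messages`, or all of them
/// when there are fewer than `batch_size`.
fn closest_later<T>(later_messages: &[T], batch_size: usize) -> &[T] {
    // saturating_sub keeps start_index at 0 when the list is shorter than the batch.
    let start_index = later_messages.len().saturating_sub(batch_size);
    &later_messages[start_index..]
}
```

Using `saturating_sub` covers the `later_messages.size > batch_size` check in a single expression.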
### `get_adjacent_messages`
```rust=
pub struct P2PMessageFilterBatch {
conversant: AgentPubKey,
batch_size: u8, // per direction (10 previous and 10 next)
payload_type: String,
last_fetched_timestamp: Option<Timestamp>, // header timestamp of the oldest message in the last fetched batch
last_fetched_message_id: Option<EntryHash>
}
fn get_adjacent_messages(filter: P2PMessageFilterBatch) -> ExternResult<P2PMessagesOutput>
```
```mermaid
sequenceDiagram
participant AUI as Alice_UI
participant ACC as Alice_Conversation_Cell
AUI-->>ACC: call `get_adjacent_messages`
ACC-->>ACC: query source chain for P2PMessage elements
ACC-->>ACC: query source chain for P2PMessageReceipt elements
ACC-->>ACC: initialize AgentMessages (conversant as single key)
ACC-->>ACC: initialize MessageContents (empty)
ACC-->>ACC: initialize ReceiptContents (empty)
Note over ACC: empty vector will hold messages with timestamps later than the target message <br> since we don't know in advance if that message falls within the batch size <br> directly adjacent the target
ACC-->>ACC: initialize empty vector [later_messages]
opt last_fetched_timestamp == None && last_fetched_message_id == None
ACC-->>ACC: set last_fetched_timestamp to current time
end
loop for every P2PMessage
alt P2PMessage.payload == Payload::Text && payload_type == Text
opt P2PMessage.header.timestamp <br> >= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant)
ACC-->>ACC: add message to later_messages
end
opt P2PMessage.header.timestamp <br> <= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant)
opt AgentMessages[conversant].length >= batch_size
ACC-->>ACC: break the loop
end
ACC-->>ACC: call `process_message`
end
else P2PMessage.payload == Payload::File && payload_type == File
opt P2PMessage.header.timestamp <br> >= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant)
ACC-->>ACC: add message to later_messages
end
opt P2PMessage.header.timestamp <br> <= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant)
opt AgentMessages[conversant].length >= batch_size
ACC-->>ACC: break the loop
end
ACC-->>ACC: call `process_message`
end
else payload_type == All
opt P2PMessage.header.timestamp <br> >= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant)
ACC-->>ACC: add message to later_messages
end
opt P2PMessage.header.timestamp <br> <= last_fetched_timestamp <br> && P2PMessage.entry_hash <br>!= last_fetched_message_id <br> && (P2PMessage.author == conversant <br> || P2PMessage.receiver == conversant)
opt AgentMessages[conversant].length >= batch_size
ACC-->>ACC: break the loop
end
ACC-->>ACC: call `process_message`
end
end
end
ACC-->>ACC: initialize start_index = 0
opt later_messages.size > batch_size
ACC-->>ACC: start_index = later_messages.size - batch_size
end
loop for every P2PMessage in later_messages[start_index:end]
ACC-->>ACC: call `process_message`
end
loop for every P2PMessageReceipt
loop for every id in P2PMessageReceipt.ids
opt id in MessageContents.keys
ACC-->>ACC: add P2PMessageReceipt.entry_hash to the MessageBundle in MessageContents[P2PMessageReceipt.id]
ACC-->>ACC: insert P2PMessageReceipt.entry_hash:P2PMessageReceipt to ReceiptContents
end
end
end
ACC-->>AUI: construct and return P2PMessagesOutput
```
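Fetching in both directions combines the two previous ideas: collect every later message and keep the tail, and collect earlier messages until the batch is full. A sketch under the same newest-first ordering assumption, with the conversant and payload checks left out. `Message` and `adjacent_messages` are hypothetical names. Unlike the diagram, a message sharing the target's timestamp is counted only as later, to avoid listing it twice:

```rust
// Stand-ins for the HDK types; assumptions made only for this sketch.
type EntryHash = String;
type Timestamp = u64;

#[derive(Debug, Clone, PartialEq)]
struct Message {
    hash: EntryHash,
    timestamp: Timestamp,
}

/// Returns `(later, earlier)`, each holding at most `batch_size` messages
/// around the target. Assumes `messages` is ordered newest first.
fn adjacent_messages<'a>(
    messages: &'a [Message],
    target_hash: &str,
    target_timestamp: Timestamp,
    batch_size: usize,
) -> (Vec<&'a Message>, Vec<&'a Message>) {
    let mut later = Vec::new();
    let mut earlier = Vec::new();
    for msg in messages {
        if msg.hash == target_hash {
            continue; // the target itself is already on screen
        }
        if msg.timestamp >= target_timestamp {
            later.push(msg);
        } else {
            if earlier.len() >= batch_size {
                break; // newest-first order: every later message was already seen
            }
            earlier.push(msg);
        }
    }
    // Keep only the later messages closest to the target.
    let start_index = later.len().saturating_sub(batch_size);
    (later.split_off(start_index), earlier)
}
```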
## Pin Functions
TODO: check for inconsistencies in source chains before pinning (rely on receipts)
### `pin_message`
```rust=
fn pin_message(pin_input: PinMessageInput) -> ExternResult<PinMessageOutput>
pub struct PinMessageInput {
message_hashes: Vec<EntryHash>,
conversants: Vec<AgentPubKey>,
status: PinStatus
}
```
```mermaid
sequenceDiagram
participant AUI as Alice_UI
participant ALD as Alice_Cell
participant BLD as Bobby_Cell
participant BUI as Bobby_UI
AUI-->>ALD: call `pin_message`
ALD-->>ALD: construct P2PMessagePin entry
ALD-->>ALD: commit to source chain the P2PMessagePin entry
rect rgba(255,255,0,0.2)
Note over ALD: yellow box will be moved to post_commit
ALD-->>BLD: call_remote `sync_pin_message`
alt call_remote ok
rect rgba(200,100,100,.5)
ALD-->>ALD: return PinContents
end
else call_remote timeout (receiver is offline)
ALD-->>AUI: return error
else call_remote unauthorized
Note over BLD, AUI: this case should not exist
ALD-->>AUI: return error
end
end
```
### `sync_pin_message`
```rust=
fn sync_pin_message(pin_status: P2PMessagePin) -> ExternResult<PinContents>
```
```mermaid
sequenceDiagram
participant BCC as Bobby_Conversation_Cell
participant BUI as Bobby_UI
BCC-->>BCC: commit P2PMessagePin to source chain
BCC-->>BCC: construct HashMap <String, P2PMessagePin>
BCC-->>BCC: insert P2PMessagePin to HashMap
BCC-->>BUI: emit signal with the HashMap as payload
BCC-->>BCC: return HashMap
```
### `get_pinned_messages`
```rust=
fn get_pinned_messages(conversant: AgentPubKey) -> ExternResult<P2PMessageHashTables>
```
```mermaid
sequenceDiagram
participant AUI as Alice_UI
participant ACC as Alice_Conversation_Cell
AUI-->>ACC: call `get_pinned_messages`
ACC-->>ACC: initialize AgentMessages (conversant as single key)
ACC-->>ACC: initialize MessageContents (empty)
ACC-->>ACC: initialize ReceiptContents (empty)
ACC-->>ACC: initialize HashMap message_pins <String, PinStatus>
ACC-->>ACC: initialize HashMap pinned_message_hashes <MessageHash, P2PMessagePin>
ACC-->>ACC: initialize HashMap unpinned_message_hashes <MessageHash, P2PMessagePin>
ACC-->>ACC: query source chain for P2PMessage elements
ACC-->>ACC: query source chain for P2PMessagePin elements
loop for every P2PMessagePin in query results
opt if P2PMessagePin.status == Pinned <br> && P2PMessagePin.conversants <br> contains input.conversant
loop for every hash in pin.hashes
alt if unpinned_message_hashes[hash] exists
ACC-->>ACC: remove hash key from unpinned_message_hashes
else if unpinned_message_hashes[hash] does not exist
ACC-->>ACC: add hash to pinned_message_hashes
end
end
end
opt if P2PMessagePin.status == Unpinned <br> && P2PMessagePin.conversants <br> contains input.conversant
loop for every hash in pin.hashes
alt if pinned_message_hashes[hash] exists
ACC-->>ACC: do nothing
else if pinned_message_hashes[hash] does not exist
ACC-->>ACC: add hash to unpinned_message_hashes
end
end
end
end
loop for every P2PMessage in query results
opt if pinned_message_hashes[P2PMessageHash] exists
ACC-->>ACC: call `process_message`
end
end
loop for every P2PMessageReceipt
loop for every id in P2PMessageReceipt.ids
opt id in MessageContents.keys
ACC-->>ACC: add P2PMessageReceipt.entry_hash to the MessageBundle in MessageContents[id]
ACC-->>ACC: insert P2PMessageReceipt.entry_hash:P2PMessageReceipt to ReceiptContents
end
end
end
ACC-->>AUI: construct and return P2PMessageHashTables
```
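
The pin/unpin loop in the diagram above only gives the intended result if pin entries are visited newest first, so that a later `Unpinned` entry shadows an earlier `Pinned` one. The sketch below assumes that ordering. It is an illustration rather than the zome code: `String` stands in for the HDK types, `currently_pinned` is a hypothetical name, and a `HashSet` replaces the diagram's `HashMap<MessageHash, P2PMessagePin>` because only the keys are consulted.

```rust
use std::collections::HashSet;

// String stand-ins for the HDK types (assumption for this sketch).
type EntryHash = String;
type AgentPubKey = String;

#[derive(Clone, Debug, PartialEq)]
enum PinStatus {
    Pinned,
    Unpinned,
}

struct P2PMessagePin {
    id: Vec<EntryHash>,
    conversants: Vec<AgentPubKey>,
    status: PinStatus,
}

/// Hypothetical helper mirroring the loop in the diagram.
/// `pins_newest_first` is assumed to be ordered from newest to oldest.
fn currently_pinned(
    pins_newest_first: &[P2PMessagePin],
    conversant: &AgentPubKey,
) -> HashSet<EntryHash> {
    let mut pinned: HashSet<EntryHash> = HashSet::new();
    let mut unpinned: HashSet<EntryHash> = HashSet::new();
    for pin in pins_newest_first {
        // Skip pins that belong to other conversations.
        if !pin.conversants.contains(conversant) {
            continue;
        }
        for hash in &pin.id {
            match pin.status {
                PinStatus::Pinned => {
                    // A newer Unpinned entry cancels this pin; otherwise keep it.
                    if !unpinned.remove(hash) {
                        pinned.insert(hash.clone());
                    }
                }
                PinStatus::Unpinned => {
                    // A newer Pinned entry wins; otherwise remember the unpin.
                    if !pinned.contains(hash) {
                        unpinned.insert(hash.clone());
                    }
                }
            }
        }
    }
    pinned
}
```

The returned set plays the role of `pinned_message_hashes` in the final loop, where only `P2PMessage` elements whose hash is in the set are passed to `process_message`.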
### `send_receipt_to_neighbor`
```rust=
fn send_receipt_to_neighbor(receipt: P2PMessageReceipt) -> ExternResult<P2PMessageReceipt>
```
```mermaid
sequenceDiagram
participant ACC as Alice_Conversation_Cell
participant N as Neighborhood
participant BCC as Bobby_Conversation_Cell
ACC-->>N: send message receipt via ephemeral delivery system
N-->>ACC: return true
Note over BCC: When Bobby goes back online
rect rgba(200,100,100,.5)
BCC-->>BCC: invoke `fetch_read_receipts`
end
```
### `fetch_undelivered_items`
```rust=
pub struct P2PMessagesOutput(
AgentMessages,
MessageContents,
ReceiptContents
)
fn fetch_undelivered_items() -> ExternResult<P2PMessagesOutput>
```
```mermaid
sequenceDiagram
participant ACC as Alice_Conversation_Cell
participant BCC as Bobby_Conversation_Cell
participant NH as Neighborhood
participant BUI as Bobby_UI
Note over BCC, NH: This process will be elaborated when the ephemeral delivery system is implemented
rect rgba(200, 200, 200, 0.5)
BCC-->>NH: ask for P2PMessage/P2PMessageReceipts intended for Bobby
NH-->>BCC: return P2PMessage/P2PMessageReceipts
end
BCC-->>BCC: initialize empty vector "ids" of type EntryHash
BCC-->>BCC: initialize empty vector "messages" of type P2PMessage
loop for every P2PMessage/P2PMessageReceipts
rect rgba(255,0,0,0.3)
BCC-->>BCC: check if author is blocked by calling `in_blocked` <br> in Contacts zome
end
opt author is not blocked
alt entry type is P2PMessageReceipt
rect rgba(0, 255, 0, .5)
BCC-->>BCC: call `notify_delivery`
end
else entry type is P2PMessage
BCC-->>BCC: commit P2PMessage entry to source chain
BCC-->>BCC: append P2PMessage.entry_hash to vector "ids"
BCC-->>BCC: append P2PMessage to vector "messages"
end
end
end
BCC-->>BCC: generate timestamp for P2PMessageReceipt
BCC-->>BCC: construct 'Delivered' P2PMessageReceipt with the vector "ids" as id
BCC-->>BCC: commit P2PMessageReceipt entry to source chain
rect rgba(0, 0, 255, .3)
BCC-->>ACC: call_remote `notify_delivery` function
end
ACC-->>BCC: return call_remote result
alt call_remote ok
else call_remote timeout
rect rgba(0, 255, 0, .5)
BCC-->>NH: call `send_to_neighbor` with the P2PMessageReceipt as payload
end
else call_remote unauthorized
Note over BCC, NH: case should not exist
end
BCC-->>BUI: return true
```
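
The per-item loop in the diagram above (skip blocked authors, hand receipts to `notify_delivery`, commit messages and collect their hashes for a single 'Delivered' receipt) can be sketched as a pure function. This is a sketch under stated assumptions: `Item`, `Processed`, and `process_undelivered` are hypothetical names, `String` stands in for the HDK types, the block check is modelled as a list lookup instead of a call to `in_blocked`, and commits and remote calls are left out.

```rust
// String stand-ins for the HDK types (assumption for this sketch).
type EntryHash = String;
type AgentPubKey = String;

// Hypothetical model of what the neighborhood returns for Bobby.
#[derive(Clone, Debug, PartialEq)]
enum Item {
    Message { entry_hash: EntryHash, author: AgentPubKey },
    Receipt { author: AgentPubKey },
}

// Hypothetical summary of one pass over the undelivered items.
#[derive(Debug, Default, PartialEq)]
struct Processed {
    // Hashes of accepted messages; becomes `id` of the 'Delivered' receipt.
    ids: Vec<EntryHash>,
    // Number of receipts that would be passed to `notify_delivery`.
    receipts_to_notify: usize,
}

/// Hypothetical helper mirroring the loop in the diagram.
fn process_undelivered(items: &[Item], blocked: &[AgentPubKey]) -> Processed {
    let mut out = Processed::default();
    for item in items {
        match item {
            Item::Receipt { author } if !blocked.contains(author) => {
                out.receipts_to_notify += 1;
            }
            Item::Message { entry_hash, author } if !blocked.contains(author) => {
                out.ids.push(entry_hash.clone());
            }
            // Author is blocked: the item is ignored.
            _ => {}
        }
    }
    out
}
```

Collecting all hashes into one vector is what lets Bobby acknowledge a whole batch with a single `P2PMessageReceipt` instead of one receipt per message.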
## Validations
- No validation, since all entries are private.
## Redux states
```typescript=
type ProfileID = string;
type P2PMessageReceiptID = string;
type MessageID = string; // P2PMessage's EntryHash
type FileBytesID = string; // P2PFileBytes' EntryHash
type P2PMessageStatus = "Received" | "Read" | "Sent";
type Payload = TextPayload | FilePayload
type AgentPubKeyBase64 = string;
type HoloHashBase64 = string;
// for reference
interface Profile {
agentPubKey: ProfileID;
username: string;
}
interface TextPayload {
type: "TEXT";
payload: { payload: string };
}
interface FilePayload {
type: "FILE";
fileName: string;
fileSize: number;
fileType: "IMAGE" | "VIDEO" | "OTHER";
fileHash: FileBytesID;
thumbnail?: Uint8Array;
}
// AgentMessages(HashMap<AgentPubKey, Vec<MessageHash>>)
interface P2PConversation {
messages: MessageID[];
}
// MessageContents(HashMap<MessageHash, MessageBundle>)
interface P2PMessage {
p2pMessageEntryHash: MessageID;
author: ProfileID;
receiver: ProfileID;
payload: Payload;
timestamp: Date;
replyTo?: MessageID;
receipts: P2PMessageReceiptID[];
}
// pub struct ReceiptContents(HashMap<ReceiptHash, P2PMessageReceipt>)
interface P2PMessageReceipt {
p2pMessageReceiptEntryHash: P2PMessageReceiptID;
p2pMessageEntryHashes: MessageID[];
timestamp: Date;
status: P2PMessageStatus;
}
interface P2PFile {
fileHash: FileBytesID;
fileBytes: Uint8Array;
}
interface P2PHashMap {
conversations: {
[key: string]: P2PConversation;
};
messages: {
[key: string]: P2PMessage;
};
receipts: {
[key: string]: P2PMessageReceipt;
};
}
interface P2PMessageConversationState {
conversations: {
[key: string]: P2PConversation;
};
messages: {
[key: string]: P2PMessage;
};
receipts: {
[key: string]: P2PMessageReceipt;
};
files: {
[key: string]: Uint8Array;
};
typing: {
[key: string]: Profile;
};
}
const initialState: P2PMessageConversationState = {
conversations: {},
messages: {},
receipts: {},
files: {},
typing: {}
};
```