# MailStream - Complete Architecture & Implementation Documentation

## 🎯 Project Overview

**MailStream** is an intelligent email management system that syncs multiple IMAP accounts, categorizes emails using AI, provides smart reply suggestions with RAG (Retrieval-Augmented Generation), and maintains contact memory for personalized responses.

---

## 🏗️ System Architecture

### **Tech Stack**

- **Backend**: Node.js + Express + TypeScript
- **Database**: PostgreSQL (via Prisma ORM)
- **AI/ML**: Google Gemini 2.5 Flash (categorization, embeddings, RAG)
- **Search**: Elasticsearch (full-text email search)
- **Vectors**: Pinecone vector database
- **Email**: IMAP (node-imap) + SMTP (nodemailer)
- **Authentication**: JWT + Google OAuth
- **Real-time**: IMAP IDLE for live email sync
- **Notifications**: Slack webhooks + custom webhooks

### **High-Level Architecture Diagram**

```
┌─────────────┐
│  Frontend   │
│  (React/    │
│  Next.js)   │
└──────┬──────┘
       │ HTTP/REST
       │ JWT Auth
       ▼
┌─────────────────────────────────────────────────────────┐
│                  Express.js API Server                  │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐               │
│  │   Auth   │  │ Account  │  │  Email   │               │
│  │  Routes  │  │  Routes  │  │  Routes  │               │
│  └──────────┘  └──────────┘  └──────────┘               │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐               │
│  │   RAG    │  │  Search  │  │   Sync   │               │
│  │  Routes  │  │  Routes  │  │  Routes  │               │
│  └──────────┘  └──────────┘  └──────────┘               │
│                                                         │
│  Middleware: JWT Verification, CORS, Body Parser        │
└────────┬────────────────────────────────────────────────┘
         │
    ┌────┴────┬─────────┬──────────┬────────────┬──────────┐
    │         │         │          │            │          │
    ▼         ▼         ▼          ▼            ▼          ▼
┌────────┐ ┌────────┐ ┌────────┐ ┌──────┐ ┌─────────┐ ┌─────────┐
│Prisma/ │ │ IMAP   │ │Elastic │ │Gemini│ │ Slack/  │ │Pinecone │
│Postgre │ │Service │ │ search │ │  AI  │ │Webhooks │ │ Vector  │
│  SQL   │ │(IDLE)  │ │        │ │      │ │         │ │   DB    │
└────────┘ └────┬───┘ └────────┘ └──────┘ └─────────┘ └─────────┘
                │
                └──► Gmail/Outlook/Yahoo IMAP Servers
                     (Real-time email sync via IDLE)
```

---

## 📂 Project Structure

```
backend/
├── src/
│   ├── config/
│   │   └── elasticsearch.ts          # ES client config
│   ├── controllers/
│   │   ├── account.controller.ts     # Account CRUD
│   │   ├── auth.controller.ts        # Login/OAuth
│   │   ├── contactMemory.controller.ts
│   │   ├── email.controller.ts       # Send/get replies
│   │   ├── rag.controller.ts         # AI suggestions
│   │   ├── search.controller.ts      # ES search
│   │   └── sync.controller.ts        # Sync management
│   ├── middleware/
│   │   └── auth.middleware.ts        # JWT verification
│   ├── routes/
│   │   ├── account.route.ts
│   │   ├── auth.route.ts
│   │   ├── contactMemory.routes.ts
│   │   ├── email.routes.ts
│   │   ├── rag.routes.ts
│   │   ├── search.routes.ts
│   │   └── sync.routes.ts
│   ├── services/
│   │   ├── ai.service.ts             # Gemini AI calls
│   │   ├── contactMemory.service.ts
│   │   ├── imap.service.ts           # Email sync (IDLE)
│   │   ├── notification.service.ts
│   │   └── rag.service.ts            # RAG implementation
│   ├── utils/
│   │   ├── db.ts                     # Prisma client
│   │   └── vector.ts                 # Cosine similarity
│   └── index.ts                      # Express server
├── prisma/
│   └── schema.prisma                 # Database schema
└── package.json
```

---

## 🔌 API Routes & Implementation

### **1. Authentication Routes** (`/api/auth`)

| Method | Endpoint | Description | Auth | Controller |
|--------|----------|-------------|------|------------|
| POST | `/login` | OAuth login (Google) | ❌ | `AuthController.login` |

**Implementation Flow:**

```
1. Frontend sends OAuth data (name, email, provider, oauth_id)
2. Check if user exists in DB
3. If new → Create User + OAuthAccount
4. If exists → Link new provider if needed
5. Generate JWT token (365-day expiry)
6. Auto-connect user's IMAP accounts
7. Return user data + Bearer token
```

**Response:**

```json
{
  "message": "Login successful",
  "user": {
    "id": "cuid",
    "email": "user@example.com",
    "name": "John Doe",
    "provider": "google",
    "token": "Bearer eyJhbGc..."
  }
}
```

---

### **2. Account Management Routes** (`/api/accounts`)

| Method | Endpoint | Description | Auth | Controller |
|--------|----------|-------------|------|------------|
| GET | `/status?userId=X` | Check if user has accounts | ❌ | Inline handler |
| POST | `/add` | Add new IMAP account | ✅ | `AccountController.addAccount` |
| GET | `/` | Get user's accounts | ✅ | `AccountController.getUserAccounts` |
| GET | `/allDetails` | Get all emails grouped by account | ✅ | `AccountController.getAllDetailsOfEmailsByUserId` |

**Add Account Flow (`POST /add`):**

```
1. Extract email & IMAP password from request
2. Auto-detect provider (Gmail, Outlook, Yahoo, Zoho)
3. Set IMAP host based on domain:
   - gmail.com → imap.gmail.com:993
   - outlook/office365 → outlook.office365.com:993
   - yahoo → imap.mail.yahoo.com:993
   - zoho → imap.zoho.com:993
   - custom → imap.{domain}:993
4. Save account to PostgreSQL
5. Immediately connect via IMAP and start syncing
6. Return account details
```

**Request:**

```json
{
  "email": "user@gmail.com",
  "imapPassword": "app-password-here"
}
```

**Response:**

```json
{
  "message": "Account connected successfully",
  "account": {
    "id": "uuid",
    "email": "user@gmail.com",
    "provider": "gmail",
    "imapHost": "imap.gmail.com",
    "isActive": true
  }
}
```

---

### **3. Email Routes** (`/api/email`)

| Method | Endpoint | Description | Auth | Controller |
|--------|----------|-------------|------|------------|
| POST | `/reply` | Send email reply via SMTP | ✅ | `EmailController.sendReply` |
| GET | `/:emailId/replies` | Get all replies to an email | ✅ | `EmailController.getReplies` |

**Send Reply Flow (`POST /reply`):**

```
1. Fetch original email from DB (with account details)
2. Create SMTP transporter:
   - Replace "imap" with "smtp" in host
   - Port: 465 (secure)
   - Auth: account IMAP credentials
3. Send reply email (nodemailer)
4. Save reply to `Reply` table
5. Update contact memory with reply summary (AI-generated)
6. Return saved reply
```

**Request:**

```json
{
  "emailId": "uuid",
  "replyText": "Thank you for your interest! Let's schedule a call."
}
```

---

### **4. RAG/AI Routes** (`/api/rag`)

| Method | Endpoint | Description | Auth | Controller |
|--------|----------|-------------|------|------------|
| POST | `/suggest-reply` | Generate AI reply suggestions | ✅ | `AIController.suggestReply` |
| POST | `/add` | Add custom RAG context | ✅ | `AIController.addContext` |
| GET | `/list` | List user's RAG contexts | ✅ | `AIController.listContexts` |
| DELETE | `/:id` | Delete/deactivate context | ✅ | `AIController.deleteContext` |

**Suggest Reply Flow (`POST /suggest-reply`):**

```
1. Fetch email by emailId
2. Call RAG service:
   a. Generate embedding for email (subject + body)
   b. Retrieve top 5 similar RAG contexts (cosine similarity)
   c. Build prompt with user contexts
   d. Call Gemini AI for reply suggestions
   e. Parse JSON response (2 suggestions with tone)
3. Save suggestions to `SuggestedReply` table
4. Return suggestions + retrieved contexts
```

**Request:**

```json
{
  "emailId": "uuid"
}
```

**Response:**

```json
{
  "success": true,
  "suggestions": [
    {
      "text": "Thank you for your interest! I'd be happy to schedule a demo. Here's my calendar link: calendly.com/...",
      "tone": "friendly"
    },
    {
      "text": "I appreciate your inquiry. Let me send you our product deck and we can discuss further.",
      "tone": "formal"
    }
  ],
  "reasoning": "Lead shows strong interest, offering next steps",
  "retrievedContexts": [
    {"id": "ctx1", "contextType": "product_info", "score": 0.85}
  ]
}
```

**Add RAG Context Flow (`POST /add`):**

```
1. Receive content & contextType (e.g., "product_info", "pricing")
2. Generate embedding using Gemini (text-embedding-004)
3. Save to the RAGContext table and store a copy with its vector in the Pinecone vector database
4. Return saved context
```

---

### **5. Search Routes** (`/api/search`)

| Method | Endpoint | Description | Auth | Controller |
|--------|----------|-------------|------|------------|
| GET | `/elastic?q=keyword` | Search emails via Elasticsearch | ✅ | `SearchController.searchEmails` |

**Search Implementation:**

```
1. Build Elasticsearch query:
   - Multi-match across: subject^4, from^3, body, bodyPreview
   - Fuzzy matching (AUTO fuzziness)
   - Optional filters: accountId, folder
2. Sort by date (descending)
3. Highlight matching text (<mark> tags)
4. Return top 50 results with scores
```

**Query Parameters:**

- `q` (required): Search keyword
- `accountId` (optional): Filter by account
- `folder` (optional): Filter by folder (e.g., INBOX)

**Response:**

```json
{
  "success": true,
  "total": 12,
  "results": [
    {
      "id": "email-uuid",
      "score": 8.5,
      "subject": "Meeting follow-up",
      "from": "client@example.com",
      "date": "2025-01-15T10:30:00Z",
      "highlight": {
        "subject": ["<mark>Meeting</mark> follow-up"],
        "body": ["discussing the <mark>meeting</mark> agenda"]
      }
    }
  ]
}
```

---

### **6. Sync Routes** (`/api/sync`)

| Method | Endpoint | Description | Auth | Controller |
|--------|----------|-------------|------|------------|
| POST | `/start` | Start syncing all accounts | ❌ | `SyncController.startSync` |
| POST | `/account/:accountId` | Sync specific account | ❌ | `SyncController.syncAccount` |
| GET | `/status` | Get sync status | ❌ | `SyncController.getSyncStatus` |
| POST | `/stop` | Stop all syncing | ❌ | `SyncController.stopSync` |

**Sync Status Response:**

```json
{
  "success": true,
  "accounts": [
    {
      "id": "acc-uuid",
      "email": "user@gmail.com",
      "isActive": true,
      "syncStatus": "completed",
      "lastSyncedAt": "2025-01-15T12:00:00Z",
      "idleActive": true,
      "idleLastPing": "2025-01-15T12:05:00Z",
      "_count": { "emails": 1250 }
    }
  ],
  "recentLogs": [...]
}
```

---

### **7. Contact Memory Routes** (`/api/memory`)

| Method | Endpoint | Description | Auth | Controller |
|--------|----------|-------------|------|------------|
| GET | `/mira` | List all contacts with summaries | ✅ | `ContactMemoryController.listContacts` |
| GET | `/mira/:email` | Get conversation history with contact | ✅ | `ContactMemoryController.getContactSummaries` |

**"Mira" Feature:**

- Tracks all email interactions with each contact
- Auto-generates AI summaries for:
  - Received emails (what they said)
  - Sent replies (what you said)
- Maintains chronological conversation history

**Response (`GET /mira`):**

```json
{
  "contacts": [
    {
      "id": "contact-uuid",
      "emailAddress": "client@example.com",
      "lastUpdated": "2025-01-15T14:30:00Z",
      "summaries": [
        {
          "summaryText": "Requested product demo and pricing info",
          "createdAt": "2025-01-15T14:30:00Z"
        }
      ]
    }
  ]
}
```

---

## 🤖 Core Services Implementation

### **1. IMAP Service** (`imap.service.ts`)

**Responsibilities:**

- Connect to multiple IMAP accounts
- Fetch last 30 days of emails
- Real-time sync via IMAP IDLE
- Save emails to PostgreSQL + Elasticsearch
- Auto-reconnect on disconnection

**Key Methods:**

```typescript
connectAccount(accountId)      // Connect a single account
connectAllAccounts(userId?)    // Connect all active accounts
fetchRecentEmails()            // Fetch the last 30 days
startIdleMode()                // Enable real-time notifications
processMessage()               // Parse and save an individual email
saveEmail()                    // Save to DB + ES and trigger AI categorization
searchEmails()                 // Query Elasticsearch
disconnectAll()                // Graceful shutdown
```

**IDLE Mode Flow:**

```
1. Open INBOX
2. Fetch recent emails (last 30 days)
3. Enter IDLE state
4. On "mail" event (new email):
   a. Exit IDLE
   b. Disconnect
   c. Reconnect (fetches new emails)
   d. Re-enter IDLE
5. On disconnect: Auto-reconnect after 5s
```

**Email Processing Pipeline:**

```
Email received → Parse (mailparser) → Save to PostgreSQL
                                            ↓
                                 AI Categorization (Gemini)
                                            ↓
                                  Index to Elasticsearch
                                            ↓
                               Notification (if INTERESTED)
                                            ↓
                                  Update Contact Memory
```

---

### **2. AI Service** (`ai.service.ts`)

**Gemini Models Used:**

- `gemini-2.5-flash`: Categorization, summaries, reply generation
- `text-embedding-004`: Vector embeddings for RAG

**Key Functions:**

**a) Email Categorization:**

```typescript
categorizeEmailAI(subject, body) → EmailCategory
```

Returns: `INTERESTED`, `MEETING_BOOKED`, `NOT_INTERESTED`, `SPAM`, `OUT_OF_OFFICE`, `UNCATEGORIZED`

**b) Email Summary:**

```typescript
generateEmailSummary(subject, body) → string
```

Returns: 20-word summary of received email

**c) Reply Summary:**

```typescript
generateReplySummary(body) → string
```

Returns: 20-word summary of sent reply (past tense)

**d) Embeddings:**

```typescript
generateEmbedding(content) → number[]
```

Returns: 768-dimensional vector for RAG

---

### **3. RAG Service** (`rag.service.ts`)

**RAG (Retrieval-Augmented Generation) Implementation:**

```
User adds context → Generate embedding → Store in DB and Pinecone
                                                  ↓
When generating reply:
1. Generate query embedding (email subject + body)
2. Retrieve top K contexts (cosine similarity)
3. Build prompt with retrieved contexts
4. Call Gemini for reply suggestions
5. Return suggestions + reasoning
```

**Prompt Structure:**

```
You are an AI assistant...

Relevant user-specific knowledge:
Context 1 (score=0.85): [Product pricing: $99/month...]
Context 2 (score=0.72): [Meeting link: calendly.com/...]

Email subject: "Interested in your product"
Email body: "Can you tell me more about pricing?"

Generate 2 reply suggestions...
Return JSON only: {suggestions: [...], reasoning: "..."}
```

---

### **4. Contact Memory Service** (`contactMemory.service.ts`)

**Purpose:** Track conversation history with each contact

**Flow:**

```
Email received/sent → Generate AI summary
                              ↓
                  Find/create ContactMemory
                              ↓
                  Add ContactSummary entry
                              ↓
                Update lastUpdated timestamp
```

**Data Structure:**

```
ContactMemory (per contact)
├── emailAddress: "client@example.com"
├── lastUpdated: DateTime
└── summaries: [
      {text: "Requested demo", isReply: false, createdAt: ...},
      {text: "Sent pricing deck", isReply: true, createdAt: ...}
    ]
```

---

### **5. Notification Service** (`notification.service.ts`)

**Triggers:** Only fires for `INTERESTED` category emails

**Actions:**

1. Send Slack webhook with email details
2. Trigger external webhook (e.g., webhook.site)
3. Mark email as `slackNotified: true`

**Slack Message Format:**

```
*New Interested Lead Detected!*
From: client@example.com
Subject: Inquiry about your service
Account: user@gmail.com
```

---

## 🔐 Authentication & Security

### **JWT Middleware** (`auth.middleware.ts`)

```typescript
verifyToken(req, res, next) {
  // 1. Extract the Authorization header
  // 2. Check the Bearer token format
  // 3. Verify the JWT signature (JWT_SECRET_KEY)
  // 4. Attach the decoded user to req.user
  // 5. Call next() or return 401/403
}
```

**Token Payload:**

```json
{
  "id": "user-cuid",
  "email": "user@example.com",
  "name": "John Doe",
  "provider": "google",
  "iat": 1736950000,
  "exp": 1768486000
}
```

`exp` is `iat` plus 365 days (31,536,000 seconds).
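
The middleware steps listed above can be sketched roughly as follows. This is a minimal illustration, not the project's actual `auth.middleware.ts`: the names `makeVerifyToken`, `Verifier`, `AuthRequest`, `AuthResponse`, and `TokenPayload` are hypothetical, the request/response types are small stand-ins for Express's own, and the choice of 401 for a missing or malformed header versus 403 for a failed verification is an assumption (the source only says "401/403"). Signature checking is injected as a function so the sketch stays dependency-free; in the real service this would presumably wrap something like `jwt.verify(token, process.env.JWT_SECRET_KEY)` from the `jsonwebtoken` package.

```typescript
// Hypothetical payload shape, mirroring the token payload documented above.
interface TokenPayload {
  id: string;
  email: string;
  name: string;
  provider: string;
  iat: number;
  exp: number;
}

// Minimal stand-ins for Express's Request/Response (assumption, not the real types).
interface AuthRequest {
  headers: { authorization?: string };
  user?: TokenPayload;
}
interface AuthResponse {
  status(code: number): AuthResponse;
  json(body: unknown): AuthResponse;
}

// A verifier returns the decoded payload or throws on a bad signature or expiry.
type Verifier = (token: string) => TokenPayload;

function makeVerifyToken(verify: Verifier) {
  return function verifyToken(req: AuthRequest, res: AuthResponse, next: () => void): void {
    // Steps 1-2: extract the Authorization header and check the Bearer format.
    const header = req.headers.authorization;
    if (!header || !header.startsWith("Bearer ")) {
      res.status(401).json({ message: "Missing or malformed Authorization header" });
      return;
    }
    const token = header.slice("Bearer ".length).trim();

    // Steps 3-4: verify the token and attach the decoded user to the request.
    try {
      req.user = verify(token);
    } catch {
      res.status(403).json({ message: "Invalid or expired token" });
      return;
    }

    // Step 5: hand control to the route handler.
    next();
  };
}
```

Injecting the verifier also makes the header handling easy to unit-test without a signing key, since a stub that accepts one known token string is enough to exercise every branch.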