Browse Source

Add message handling

Owen Schwartz 8 tháng trước cách đây
mục cha
commit
d223d4fcee
3 tập tin đã thay đổi với 195 bổ sung64 xóa
  1. 22 0
      server/routers/newt/handleNewtMessage.ts
  2. 2 1
      server/routers/newt/index.ts
  3. 171 63
      server/routers/ws.ts

+ 22 - 0
server/routers/newt/handleNewtMessage.ts

@@ -0,0 +1,22 @@
+// messageHandlers/chat.ts
+import { MessageHandler } from "../ws";
+
+export const handleNewtMessage: MessageHandler = async (context) => {
+    const { message, senderNewtId, sendToClient } = context;
+    
+    // Process chat message
+    // ... your chat logic here ...
+
+    // Example response
+    return {
+        message: {
+            type: 'newt_response',
+            data: {
+                originalMessage: message.data,
+                timestamp: new Date().toISOString()
+            }
+        },
+        broadcast: false,  // Send to all clients
+        excludeSender: false  // Include sender in broadcast
+    };
+};

+ 2 - 1
server/routers/newt/index.ts

@@ -1,2 +1,3 @@
 export * from "./createNewt";
-export * from "./getToken";
+export * from "./getToken";
+export * from "./handleNewtMessage";

+ 171 - 63
server/routers/ws.ts

@@ -1,12 +1,13 @@
-import { Router, Request, Response } from 'express';
-import { Server as HttpServer } from 'http';
-import { WebSocket, WebSocketServer } from 'ws';
-import { IncomingMessage } from 'http';
-import { Socket } from 'net';
-import { Newt, newts, NewtSession } from '@server/db/schema';
-import { eq } from 'drizzle-orm';
-import db from '@server/db';
-import { validateNewtSessionToken } from '@server/auth/newt';
+import { Router, Request, Response } from "express";
+import { Server as HttpServer } from "http";
+import { WebSocket, WebSocketServer } from "ws";
+import { IncomingMessage } from "http";
+import { Socket } from "net";
+import { Newt, newts, NewtSession } from "@server/db/schema";
+import { eq } from "drizzle-orm";
+import db from "@server/db";
+import { validateNewtSessionToken } from "@server/auth/newt";
+import { handleNewtMessage } from "./newt";
 
 // Custom interfaces
 interface WebSocketRequest extends IncomingMessage {
@@ -23,16 +24,93 @@ interface TokenPayload {
     session: NewtSession;
 }
 
+interface WSMessage {
+    type: string;
+    data: any;
+}
+
+interface HandlerResponse {
+    message: WSMessage;
+    broadcast?: boolean;
+    excludeSender?: boolean;
+    targetNewtId?: string;
+}
+
+interface HandlerContext {
+    message: WSMessage;
+    senderWs: WebSocket;
+    senderNewtId: string;
+    sendToClient: (newtId: string, message: WSMessage) => boolean;
+    broadcastToAllExcept: (message: WSMessage, excludeNewtId?: string) => void;
+    connectedClients: Map<string, WebSocket[]>;
+}
+
+export type MessageHandler = (context: HandlerContext) => Promise<HandlerResponse | void>;
+
+const messageHandlers: Record<string, MessageHandler> = {
+    "newt": handleNewtMessage,
+};
+
 const router: Router = Router();
 const wss: WebSocketServer = new WebSocketServer({ noServer: true });
 
-// Token verification middleware
+// Client tracking map
+let connectedClients: Map<string, AuthenticatedWebSocket[]> = new Map();
+
+// Helper functions for client management
+const addClient = (newtId: string, ws: AuthenticatedWebSocket): void => {
+    const existingClients = connectedClients.get(newtId) || [];
+    existingClients.push(ws);
+    connectedClients.set(newtId, existingClients);
+    console.log(`Client added to tracking - Newt ID: ${newtId}, Total connections: ${existingClients.length}`);
+};
+
+const removeClient = (newtId: string, ws: AuthenticatedWebSocket): void => {
+    const existingClients = connectedClients.get(newtId) || [];
+    const updatedClients = existingClients.filter(client => client !== ws);
+    
+    if (updatedClients.length === 0) {
+        connectedClients.delete(newtId);
+        console.log(`All connections removed for Newt ID: ${newtId}`);
+    } else {
+        connectedClients.set(newtId, updatedClients);
+        console.log(`Connection removed - Newt ID: ${newtId}, Remaining connections: ${updatedClients.length}`);
+    }
+};
+
+// Helper functions for sending messages
+const sendToClient = (newtId: string, message: WSMessage): boolean => {
+    const clients = connectedClients.get(newtId);
+    if (!clients || clients.length === 0) {
+        console.log(`No active connections found for Newt ID: ${newtId}`);
+        return false;
+    }
+
+    const messageString = JSON.stringify(message);
+    clients.forEach(client => {
+        if (client.readyState === WebSocket.OPEN) {
+            client.send(messageString);
+        }
+    });
+    return true;
+};
+
+const broadcastToAllExcept = (message: WSMessage, excludeNewtId?: string): void => {
+    connectedClients.forEach((clients, newtId) => {
+        if (newtId !== excludeNewtId) {
+            clients.forEach(client => {
+                if (client.readyState === WebSocket.OPEN) {
+                    client.send(JSON.stringify(message));
+                }
+            });
+        }
+    });
+};
+
+// Token verification middleware (unchanged)
 const verifyToken = async (token: string): Promise<TokenPayload | null> => {
     try {
-
-        const { session, newt } = await validateNewtSessionToken(
-            token
-        );
+        const { session, newt } = await validateNewtSessionToken(token);
 
         if (!session || !newt) {
             return null;
@@ -49,115 +127,145 @@ const verifyToken = async (token: string): Promise<TokenPayload | null> => {
 
         return { newt: existingNewt[0], session };
     } catch (error) {
-        console.error('Token verification failed:', error);
+        console.error("Token verification failed:", error);
         return null;
     }
 };
 
-// Handle WebSocket upgrade requests
-router.get('/ws', (req: Request, res: Response) => {
-    // WebSocket upgrade will be handled by the server
-    res.status(200).send('WebSocket endpoint');
+// Router endpoint (unchanged)
+router.get("/ws", (req: Request, res: Response) => {
+    res.status(200).send("WebSocket endpoint");
 });
 
-// Set up WebSocket server handling
+// WebSocket upgrade handler
 const handleWSUpgrade = (server: HttpServer): void => {
-    server.on('upgrade', async (request: WebSocketRequest, socket: Socket, head: Buffer) => {
+    server.on("upgrade", async (request: WebSocketRequest, socket: Socket, head: Buffer) => {
         try {
-            // Extract token from query parameters or headers
-            const token = request.url?.includes('?')
-                ? new URLSearchParams(request.url.split('?')[1]).get('token') || ''
-                : request.headers['sec-websocket-protocol'];
+            const token = request.url?.includes("?")
+                ? new URLSearchParams(request.url.split("?")[1]).get("token") || ""
+                : request.headers["sec-websocket-protocol"];
 
             if (!token) {
-                socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
+                socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n");
                 socket.destroy();
                 return;
             }
 
-            // Verify the token
             const tokenPayload = await verifyToken(token);
             if (!tokenPayload) {
-                socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n');
+                socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n");
                 socket.destroy();
                 return;
             }
 
-            // Store token payload data in the request for later use
             request.token = token;
 
             wss.handleUpgrade(request, socket, head, (ws: AuthenticatedWebSocket) => {
-                // Attach newt data to the WebSocket instance
                 ws.newt = tokenPayload.newt;
                 ws.isAlive = true;
-                wss.emit('connection', ws, request);
+                wss.emit("connection", ws, request);
             });
         } catch (error) {
-            console.error('Upgrade error:', error);
-            socket.write('HTTP/1.1 500 Internal Server Error\r\n\r\n');
+            console.error("Upgrade error:", error);
+            socket.write("HTTP/1.1 500 Internal Server Error\r\n\r\n");
             socket.destroy();
         }
     });
 };
 
-// WebSocket message interface
-interface WSMessage {
-    type: string;
-    data: any;
-}
-
 // WebSocket connection handler
-wss.on('connection', (ws: AuthenticatedWebSocket, request: WebSocketRequest) => {
-    console.log(`Client connected - Newt ID: ${ws.newt?.newtId}`);
+wss.on("connection", (ws: AuthenticatedWebSocket, request: WebSocketRequest) => {
+    const newtId = ws.newt?.newtId;
+    if (!newtId) {
+        console.error("Connection attempt without newt ID");
+        return ws.terminate();
+    }
+
+    // Add client to tracking
+    addClient(newtId, ws);
 
     // Set up ping-pong for connection health check
     const pingInterval = setInterval(() => {
         if (ws.isAlive === false) {
             clearInterval(pingInterval);
+            removeClient(newtId, ws);
             return ws.terminate();
         }
         ws.isAlive = false;
         ws.ping();
     }, 30000);
 
-    // Handle pong response
-    ws.on('pong', () => {
+    ws.on("pong", () => {
         ws.isAlive = true;
     });
 
-    // Set up message handler
-    ws.on('message', (data) => {
+    ws.on("message", async (data) => {
         try {
             const message: WSMessage = JSON.parse(data.toString());
-            console.log('Received:', message);
-
-            // Echo the message back
-            ws.send(JSON.stringify({
-                type: 'echo',
-                data: message
-            }));
+            console.log(`Message received from Newt ID ${newtId}:`, message);
+    
+            // Validate message format
+            if (!message.type || typeof message.type !== "string") {
+                throw new Error("Invalid message format: missing or invalid type");
+            }
+    
+            // Get the appropriate handler for the message type
+            const handler = messageHandlers[message.type];
+            if (!handler) {
+                throw new Error(`Unsupported message type: ${message.type}`);
+            }
+    
+            // Process the message and get response
+            const response = await handler({
+                message,
+                senderWs: ws,
+                senderNewtId: newtId,
+                sendToClient,
+                broadcastToAllExcept,
+                connectedClients
+            });
+    
+            // Send response if one was returned
+            if (response) {
+                if (response.broadcast) {
+                    // Broadcast to all clients except sender if specified
+                    broadcastToAllExcept(response.message, response.excludeSender ? newtId : undefined);
+                } else if (response.targetNewtId) {
+                    // Send to specific client if targetNewtId is provided
+                    sendToClient(response.targetNewtId, response.message);
+                } else {
+                    // Send back to sender
+                    ws.send(JSON.stringify(response.message));
+                }
+            }
+    
         } catch (error) {
-            console.error('Message parsing error:', error);
+            console.error("Message handling error:", error);
             ws.send(JSON.stringify({
-                type: 'error',
-                data: 'Invalid message format'
+                type: "error",
+                data: {
+                    message: error instanceof Error ? error.message : "Unknown error occurred",
+                    originalMessage: data.toString()
+                }
             }));
         }
-    });
+    });    
 
-    // Handle client disconnect
-    ws.on('close', () => {
+    ws.on("close", () => {
         clearInterval(pingInterval);
-        console.log(`Client disconnected - Newt ID: ${ws.newt?.newtId}`);
+        removeClient(newtId, ws);
+        console.log(`Client disconnected - Newt ID: ${newtId}`);
     });
 
-    // Handle errors
-    ws.on('error', (error: Error) => {
-        console.error('WebSocket error:', error);
+    ws.on("error", (error: Error) => {
+        console.error(`WebSocket error for Newt ID ${newtId}:`, error);
     });
 });
 
 export {
     router,
-    handleWSUpgrade
+    handleWSUpgrade,
+    sendToClient,
+    broadcastToAllExcept,
+    connectedClients
 };