Просмотр исходного кода

TCPProxy seems to work as intended using link(). New test creating a mock socket as inbound connection to google.

oscar 2 месяцев назад
Родитель
Сommit
422457abb8
3 измененных файлов с 129 добавлено и 73 удалено
  1. 66 49
      src/lib/TCPProxy.js
  2. 27 10
      src/lib/TCPSocketClient.js
  3. 36 14
      src/lib/WebVM.svelte

+ 66 - 49
src/lib/TCPProxy.js

@@ -3,8 +3,8 @@ import { TCPSocketClient  } from "./TCPSocketClient";
 export class TCPProxy {
   constructor() {
     this.server = null;
-    this.inboundConnections = new Map();  // clientID -> TCPSocketClient
-    this.outboundConnections = new Map(); // clientID -> TCPSocketClient
+    this.inboundConnections = new Map();  // inboundID -> TCPSocketClient
+    this.outboundConnections = new Map(); // outboundID -> TCPSocketClient
     this.forwardingPairs = new Map();     // inboundID -> outboundID
     this.reverseForwardingPairs = new Map(); // outboundID -> inboundID
     this._reader = null;
@@ -28,13 +28,17 @@ export class TCPProxy {
 
   async start(localAddress, options = {}) {
     try {
-      this.server = new TCPServerSocket(localAddress, options); // localAddress instead of ::1 but testing for now
+      this.server = new TCPServerSocket(localAddress, options);
       console.log(`Server created: `, this.server);
-      // console.log(`opened properties: `, Object.getOwnPropertyDescriptor(this.server, 'opened'));
-      // console.log(`server properties: `, Object.getOwnPropertyDescriptor(this.server));
+
+
       const openInfo = await this.server.opened;
       const { readable, localAddress: boundAddress, localPort } = openInfo;
 
+      // this logs undefined, not sure what to think of that... but seems to work anyways?
+      console.log(`opened properties: `, Object.getOwnPropertyDescriptor(this.server, 'opened'));
+      console.log(`server properties: `, Object.getOwnPropertyDescriptor(this.server));
+
       console.log(`TCPProxy started on ${boundAddress}:${localPort}`);
       if (this.onServerStart) {
         this.onServerStart(openInfo);
@@ -51,7 +55,7 @@ export class TCPProxy {
               console.log(`Server stopped accepting connections`);
               break;
             }
-            this._handleIncomingConnection(incomingSocket);
+            this._handleIncomingConnection(incomingSocket); // can get client ID here if we have use for that later
           }
         } catch (e) {
           console.error(`Error accepting connections: `, e);
@@ -89,7 +93,7 @@ export class TCPProxy {
       inboundClient.onClose = () => {
         this._handleInboundClose(connectionID);
       };
-      inboundClient.onError = (error) => {
+      inboundClient.onError = (e) => {
         if (this.onError) {
           this.onError('inbound', connectionID, e);
         }
@@ -120,17 +124,20 @@ export class TCPProxy {
     }
 
     const outboundID = this.forwardingPairs.get(inboundID);
-    if (outboundID) {
-      const outboundClient = this.outboundConnections.get(outboundID);
-      if (outboundClient) {
-        outboundClient.send(data).catch((e) => {
-          console.error(`Error forwarding data to outbound connection ${outboundID}`);
-          if (this.onError) {
-            this.onError(`outbound`, outboundID, e);
-          }
-        });
-      }
+    if (!outboundID) {
+      return;
+    }
+    const outboundClient = this.outboundConnections.get(outboundID);
+    if (!outboundClient) {
+      return;
     }
+
+    outboundClient.send(data).catch((e) => {
+      console.error(`Error forwarding data to outbound connection ${outboundID}`);
+      if (this.onError) {
+        this.onError(`outbound`, outboundID, e);
+      }
+    });
   }
 
   _handleOutboundData(outboundID, data) {
@@ -142,21 +149,23 @@ export class TCPProxy {
     }
 
     const inboundID = this.reverseForwardingPairs.get(outboundID);
-    if (inboundID) {
-      const inboundClient = this.inboundConnections.get(inboundID);
-      if (inboundClient) {
-        inboundClient.send(data).catch((e) => {
-          console.error(`Error forward data to inbound connection ${inboundID}`);
-          if (this.onError) {
-            this.onError(`inbound`, inboundID, e);
-          }
-        });
-      }
+    if (!inboundID) {
+      return;
     }
+    const inboundClient = this.inboundConnections.get(inboundID);
+    if (!inboundClient) {
+      return;
+    }
+
+    inboundClient.send(data).catch((e) => {
+      console.error(`Error forward data to inbound connection ${inboundID}`);
+      if (this.onError) {
+        this.onError(`inbound`, inboundID, e);
+      }
+    });
   }
 
   _handleInboundClose(inboundID) {
-    const inboundClient = this.inboundConnections.get(inboundID);
     this.inboundConnections.delete(inboundID);
     console.log(`Inbond connection ${inboundID} closed`);
 
@@ -166,21 +175,22 @@ export class TCPProxy {
 
     // close pair
     const outboundID = this.forwardingPairs.get(inboundID);
-    if (outboundID) {
-      const outboundClient = this.outboundConnections.get(outboundID);
-      if (outboundClient) {
-        outboundClient.close().catch((e) => {
-          console.error(`Error closing outbound connection ${outboundID}`);
-        });
-      }
-
-      this.forwardingPairs.delete(inboundID);
-      this.reverseForwardingPairs.delete(outboundID);
+    if (!outboundID) {
+      return;
+    }
+    const outboundClient = this.outboundConnections.get(outboundID);
+    if (!outboundClient) {
+      return;
     }
+    outboundClient.close().catch((e) => {
+      console.error(`Error closing outbound connection ${outboundID}`);
+    });
+
+    this.forwardingPairs.delete(inboundID);
+    this.reverseForwardingPairs.delete(outboundID);
   }
 
   _handleOutboundClose(outboundID) {
-    const outboundClient = this.outboundConnections.get(outboundID);
     this.outboundConnections.delete(outboundID);
     console.log(`Outbound connection ${outboundID} closed`);
 
@@ -191,16 +201,18 @@ export class TCPProxy {
     // close pair
     const inboundID = this.reverseForwardingPairs.get(outboundID);
     if (inboundID) {
-      const inboundClient = this.inboundConnections.get(inboundID);
-      if (inboundClient) {
-        inboundClient.close().catch((e) => {
-          console.error(`Error closing inbound connection ${inboundID}`, e);
-        });
-      }
-
-      this.reverseForwardingPairs.delete(outboundID);
-      this.forwardingPairs.delete(inboundID);
+      return;
     }
+    const inboundClient = this.inboundConnections.get(inboundID);
+    if (!inboundClient) {
+      return;
+    }
+    inboundClient.close().catch((e) => {
+      console.error(`Error closing inbound connection ${inboundID}`, e);
+    });
+
+    this.reverseForwardingPairs.delete(outboundID);
+    this.forwardingPairs.delete(inboundID);
   }
 
   async connect(remoteAddress, remotePort, options = {}) {
@@ -275,7 +287,6 @@ export class TCPProxy {
     return Promise.resolve();
   }
 
-  // need to rework so that we releaseLock() before closing() server reader. Current structure sucks for that
   async close() {
     console.log(`Closing TCP proxy`);
     for (const [id, client] of this.inboundConnections.entries()) {
@@ -293,6 +304,7 @@ export class TCPProxy {
       }
     }
 
+    // clear maps
     this.inboundConnections.clear();
     this.outboundConnections.clear();
     this.forwardingPairs.clear();
@@ -309,5 +321,10 @@ export class TCPProxy {
 
     console.log("TCP Proxy closed");
   }
+
+  // helper function for testing mockSocket (avoids using _prefixed internal functions in test)
+  acceptConnection(socket) {
+    return this._handleIncomingConnection(socket);
+  }
   
 }

+ 27 - 10
src/lib/TCPSocketClient.js

@@ -11,6 +11,7 @@ export class TCPSocketClient {
     this._reader = null;
     this._writer;
 
+    // open and closed promise as fields
     this._isOpenedSettled = false;
     this._isClosedSettled = false;
     this._openedPromise = new Promise((resolve, reject) => {
@@ -53,6 +54,7 @@ export class TCPSocketClient {
   async connect(timeoutMs = 5000) {
     try {
       let timeoutID;
+      // added timeout cause it seems to be standard
       const timeoutPromise = new Promise((_, reject) => {
         timeoutID = setTimeout(() => {
           console.log(`Connection to ${this.remoteAddress}:${this.remotePort} timed out after ${timeoutMs}ms`);
@@ -61,9 +63,9 @@ export class TCPSocketClient {
       });
 
       this.socket = new TCPSocket(this.remoteAddress, this.remotePort, this.options);
+      // race between socket.opened and timeout
       const openInfo = await Promise.race([this.socket.opened, timeoutPromise]);
       clearTimeout(timeoutID);
-      // const openInfo = await this.socket.opened;
 
       this._readable = openInfo.readable;
       this._writable = openInfo.writable;
@@ -96,6 +98,12 @@ export class TCPSocketClient {
         const {value, done} = await this._reader.read();
 
         if (done) {
+          // releaseLock() here
+          this._reader.releaseLock();
+          this._reader = null;
+          if (this.onClose) {
+            this.onClose();
+          }
           break;
         }
         if (value && value.byteLength > 0) {
@@ -109,6 +117,9 @@ export class TCPSocketClient {
         this._reader.releaseLock();
         this._reader = null;
       }
+      if (this.onClose) {
+        this.onClose();
+      }
     }
   }
 
@@ -120,6 +131,7 @@ export class TCPSocketClient {
     try {
       this._writer = this._writable.getWriter();
       let buffer = data;
+      // old: for text exchange test, can probably be removed
       if (typeof data === `string`) {
         const encoder = new TextEncoder();
         buffer = encoder.encode(data);
@@ -134,23 +146,28 @@ export class TCPSocketClient {
       if (this.onError) {
         this.onError(e);
       }
-      throw error;
+      throw e;
     }
   }
 
   async close() {
-    try {
-      if (!this.socket) {
-        throw new Error(`Socket is not connected`);
-      }
+    if (!this.socket) {
+      throw new Error(`Socket is not connected`);
+    }
 
+    try {
       if (this._isClosedSettled) {
         return this._closedPromise;
       }
 
-      // if streams are locked err
-      if ((this._readable && this._readable.locked) || (this._writable && this._writable.locked)) {
-        throw new Error(`Cannot close socket while streams are locked`);
+      // try to handle leftover locks if necessary, tho should have been handled in startReading's loop and send()
+      if (this._reader) {
+        this._reader.releaseLock();
+        this._reader = null;
+      }
+      if (this._writer) {
+        this._writer.releaseLock();
+        this._writer = null;
       }
 
       await this.socket.close();
@@ -166,7 +183,7 @@ export class TCPSocketClient {
         this.onError(e);
       }
       
-      throw error;
+      throw e;
     }
   }
 }

+ 36 - 14
src/lib/WebVM.svelte

@@ -12,6 +12,7 @@
 	import { displayConfig, handleToolImpl } from '$lib/anthropic.js'
 	import { tryPlausible } from '$lib/plausible.js'
 	import { TCPProxy } from '$lib/TCPProxy.js'
+	import { TCPSocketClient } from '$lib/TCPSocketClient.js'
 
 	export let configObj = null;
 	export let processCallback = null;
@@ -193,17 +194,20 @@
 	}
 	async function initProxy() {
 		const proxy = new TCPProxy();
-		proxy.onServerStart = (openInfo) => {
-			console.log(`Server started on ${openInfo.localAddress}:${openInfo.localPort}`);
-		};
 		proxy.onInboundConnect = (id, openInfo) => {
+		// 
+		// ideally link to destination here. Could be hard set or dynamically based on parameters
+		// 
 			console.log(`New inbound connection ${id} from ${openInfo.remoteAddress}:${openInfo.remotePort}`);
 		};
 		proxy.onOutboundConnect = (id, openInfo) => {
 			console.log(`New Outbound connection ${id} to ${openInfo.remoteAddress}:${openInfo.remotePort}`);
 		};
 		proxy.onInboundData = (id, data) => {
-			console.log(`Received data from ${id}: `, data);
+			console.log(`[INBOUND] received from client ${id}: `, data);
+		};
+		proxy.onOutboundData = (id, data) => {
+			console.log(`[OUTBOUND] Received data from server ${id}: `, data);
 		};
 		proxy.onInboundClose = (id) => {
 			console.log(`Closed inbound connection ${id}`);
@@ -217,25 +221,43 @@
 		await proxy.start('0.0.0.0', {localPort: 33000});
 		return proxy;
 	}
+	async function createMockInbound(proxy) {
+		const { readable, writable } = new TransformStream();
+		const mockSocket = {
+			opened: Promise.resolve({
+				readable,
+				writable,
+				remoteAddress: '127.0.0.1',
+				remotePort: 12345,
+				localAddress: '127.0.0.1',
+				localPort: 33000
+			})
+		};
+
+		// need to manually trigger event cause its fake so startReading's loop won't catch this as real connection
+		return proxy.acceptConnection(mockSocket);
+	}
 	async function initTerminal()
 	{
 		const proxy = await initProxy();
 		console.log(`We have proxy`);
-		const outboundID = await proxy.connect('google.com', 80);
-		console.log(`Created direct outbound connection ${outboundID}`);
-		const outboundClient = proxy.outboundConnections.get(outboundID);
-		if (outboundClient) {
+
+		const mockInboundID = await createMockInbound(proxy);
+		console.log(`Created mock inbound connection ${mockInboundID}`);
+
+		const link = await proxy.link(mockInboundID, `google.com`, 80);
+		console.log(`Linked mock client ${mockInboundID} to google ${link.outboundID}`);
+
+		const inboundClient = proxy.inboundConnections.get(mockInboundID);
+		if (inboundClient) {
 		 console.log(`We have a client`);
-		 const httpRequest =
+		 const someData =
 		 				"GET / HTTP/1.1\r\n" +
 		 				"Host: google.com\r\n" +
 		 				"Connection: close\r\n\r\n";
-		 await outboundClient.send(httpRequest);
-		 console.log(`Sent data to ${outboundID}`);
+		 await inboundClient.send(someData);
+		 console.log(`Sent data to ${link.outboundID}`);
 		}
-		outboundClient._reader.releaseLock();
-		console.log(`Released Lock`);
-		await outboundClient.close();
 		// await proxy.close();