diff --git a/server/routers/gerbil/receiveBandwidth.ts b/server/routers/gerbil/receiveBandwidth.ts index b2063c0..b883957 100644 --- a/server/routers/gerbil/receiveBandwidth.ts +++ b/server/routers/gerbil/receiveBandwidth.ts @@ -30,12 +30,13 @@ export const receiveBandwidth = async ( const { publicKey, bytesIn, bytesOut } = peer; // Find the site by public key - const site = await trx.query.sites.findFirst({ - where: eq(sites.pubKey, publicKey) - }); + const [site] = await trx + .select() + .from(sites) + .where(eq(sites.pubKey, publicKey)) + .limit(1); if (!site) { - logger.warn(`Site not found for public key: ${publicKey}`); continue; } let online = site.online; diff --git a/server/routers/messageHandlers.ts b/server/routers/messageHandlers.ts index bf8f357..f23ea0a 100644 --- a/server/routers/messageHandlers.ts +++ b/server/routers/messageHandlers.ts @@ -1,4 +1,4 @@ -import { handleNewtRegisterMessage } from "./newt"; +import { handleNewtRegisterMessage, handleReceiveBandwidthMessage } from "./newt"; import { handleOlmRegisterMessage } from "./olm"; import { handleGetConfigMessage } from "./newt/handleGetConfigMessage"; import { MessageHandler } from "./ws"; @@ -7,4 +7,5 @@ export const messageHandlers: Record = { "newt/wg/register": handleNewtRegisterMessage, "olm/wg/register": handleOlmRegisterMessage, "newt/wg/get-config": handleGetConfigMessage, + "newt/receive-bandwidth": handleReceiveBandwidthMessage }; diff --git a/server/routers/newt/handleReceiveBandwidthMessage.ts b/server/routers/newt/handleReceiveBandwidthMessage.ts new file mode 100644 index 0000000..1e1642a --- /dev/null +++ b/server/routers/newt/handleReceiveBandwidthMessage.ts @@ -0,0 +1,68 @@ +import db from "@server/db"; +import { MessageHandler } from "../ws"; +import { clients, Newt } from "@server/db/schema"; +import { eq } from "drizzle-orm"; +import logger from "@server/logger"; + +interface PeerBandwidth { + publicKey: string; + bytesIn: number; + bytesOut: number; +} + +export const handleReceiveBandwidthMessage: MessageHandler = async (context) => { + const { message, client, sendToClient } = context; + const newt = client as Newt; + + const bandwidthData: PeerBandwidth[] = message.data; + + if (!Array.isArray(bandwidthData)) { + throw new Error("Invalid bandwidth data"); + } + + await db.transaction(async (trx) => { + for (const peer of bandwidthData) { + const { publicKey, bytesIn, bytesOut } = peer; + + // Find the site by public key + const [client] = await trx + .select() + .from(clients) + .where(eq(clients.pubKey, publicKey)) + .limit(1); + + if (!client) { + continue; + } + let online = client.online; + + // if the bandwidth for the site is > 0 then set it to online. if it has been less than 0 (no update) for 5 minutes then set it to offline + if (bytesIn > 0 || bytesOut > 0) { + online = true; + } else if (client.lastBandwidthUpdate) { + const lastBandwidthUpdate = new Date( + client.lastBandwidthUpdate + ); + const currentTime = new Date(); + const diff = + currentTime.getTime() - lastBandwidthUpdate.getTime(); + if (diff < 300000) { + online = false; + } + } + + // Update the site's bandwidth usage + await trx + .update(clients) + .set({ + megabytesOut: (client.megabytesIn || 0) + bytesIn, + megabytesIn: (client.megabytesOut || 0) + bytesOut, + lastBandwidthUpdate: new Date().toISOString(), + online + }) + .where(eq(clients.clientId, client.clientId)); + } + }); + + logger.info("Handling register olm message!"); +}; diff --git a/server/routers/newt/index.ts b/server/routers/newt/index.ts index dcc4974..84b9a6e 100644 --- a/server/routers/newt/index.ts +++ b/server/routers/newt/index.ts @@ -1,3 +1,4 @@ export * from "./createNewt"; export * from "./getToken"; -export * from "./handleRegisterMessage"; \ No newline at end of file +export * from "./handleNewtRegisterMessage"; +export* from "./handleReceiveBandwidthMessage"; \ No newline at end of file