From be173e1d48b4f267001b5e115a0d081e38e34141 Mon Sep 17 00:00:00 2001 From: Marcel Baumgartner Date: Tue, 16 Apr 2024 07:40:48 +0200 Subject: [PATCH] Working on improved console streaming --- .../Core/Helpers/AdvancedWebsocketStream.cs | 169 ++++++++++++++++++ .../Features/Servers/Helpers/ServerConsole.cs | 20 +-- .../Http/Controllers/ServersControllers.cs | 13 +- Moonlight/Moonlight.csproj | 2 +- 4 files changed, 187 insertions(+), 17 deletions(-) create mode 100644 Moonlight/Core/Helpers/AdvancedWebsocketStream.cs diff --git a/Moonlight/Core/Helpers/AdvancedWebsocketStream.cs b/Moonlight/Core/Helpers/AdvancedWebsocketStream.cs new file mode 100644 index 0000000..5d43791 --- /dev/null +++ b/Moonlight/Core/Helpers/AdvancedWebsocketStream.cs @@ -0,0 +1,169 @@ +using System.Net.WebSockets; +using System.Text; +using MoonCore.Helpers; +using Newtonsoft.Json; + +namespace Moonlight.Core.Helpers; + +public class AdvancedWebsocketStream +{ + private readonly WebSocket Socket; + private readonly Dictionary Packets = new(); + + public AdvancedWebsocketStream(WebSocket socket) + { + Socket = socket; + } + + public void RegisterPacket(int id) => RegisterPacket(id, typeof(T)); + + public void RegisterPacket(int id, Type type) + { + Packets.Add(id, type); + } + + public async Task ReceivePacket() + { + if (Socket.State != WebSocketState.Open) + throw new ArgumentException("The websocket connection needs to be open in order to receive packets"); + + // Length + var lengthBuffer = new byte[4]; + await Socket.ReceiveAsync(lengthBuffer, CancellationToken.None); + var length = BitConverter.ToInt32(lengthBuffer); + + Logger.Debug($"Received length: {length}"); + + if (length <= 0) + throw new ArgumentException("The packet length cannot be less or equal than zero"); + + var packetBuffer = new byte[length]; + var received = await Socket.ReceiveAsync(packetBuffer, CancellationToken.None); + + Logger.Debug($"Lenght expected: {length}. Lenght got: {received.Count}"); + + return DecodePacket(packetBuffer); + } + + public async Task ReceivePacket() + { + var packet = await ReceivePacket(); + + if (packet == null) + return default; + + if (packet is not T) + throw new ArgumentException($"Received packet {packet.GetType().Name} matches not the type {typeof(T).Name}"); + + return (T)packet; + } + + public async Task SendPacket(object packet) + { + if (Socket.State != WebSocketState.Open) + throw new ArgumentException("The websocket connection needs to be open in order to send packets"); + + var buffer = EncodePacket(packet); + + // Send length + var length = buffer.Length; + var lengthBuffer = BitConverter.GetBytes(length); + + await Socket.SendAsync(lengthBuffer, WebSocketMessageType.Binary, WebSocketMessageFlags.None, + CancellationToken.None); + + // Send packet + await Socket.SendAsync(buffer, WebSocketMessageType.Binary, WebSocketMessageFlags.None, CancellationToken.None); + } + + public async Task WaitForClose() + { + var source = new TaskCompletionSource(); + + Task.Run(async () => + { + while (Socket.State == WebSocketState.Open) + await Task.Delay(10); + + source.SetResult(); + }); + + await source.Task; + } + + public async Task Close() + { + if(Socket.State == WebSocketState.Open) + await Socket.CloseOutputAsync(WebSocketCloseStatus.Empty, null, CancellationToken.None); + } + + private byte[] EncodePacket(object packet) + { + var type = packet.GetType(); + + var packetId = Packets.Values.Contains(type) ? Packets.First(x => x.Value == type).Key : -1; + + if (packetId == -1) + throw new ArgumentException($"Sending packet type which has not been registered: {packet.GetType().Name}"); + + // Header + var headerBuffer = BitConverter.GetBytes(packetId); + + // Body + var jsonText = JsonConvert.SerializeObject(packet); + var bodyBuffer = Encoding.UTF8.GetBytes(jsonText); + + return headerBuffer.Concat(bodyBuffer).ToArray(); + } + + private object? DecodePacket(byte[] buffer) + { + if (buffer.Length < 5) // 4 (header) + minimum 1 as body + { + Logger.Warn($"Received buffer is too small ({buffer.Length} bytes)"); + return default; + } + + var headerBuffer = new byte[4]; + Array.Copy(buffer, 0, headerBuffer, 0, 4); + var packetId = BitConverter.ToInt32(headerBuffer); + + Logger.Info($"Packet Id: {packetId}"); + + var packetType = Packets.TryGetValue(packetId, out var packet) ? packet : default; + + if (packetType == null) + { + Logger.Warn($"Received packet id which has not been registered: {packetId}"); + + Logger.Info("Packet dumped: " + Encoding.UTF8.GetString(buffer)); + + return default; + } + + var bodyBuffer = new byte[buffer.Length - 4]; + Array.Copy(buffer, 4, bodyBuffer, 0, buffer.Length - 4); + + var jsonText = Encoding.UTF8.GetString(bodyBuffer); + + if (string.IsNullOrEmpty(jsonText)) + { + Logger.Warn("Received empty json text"); + return default; + } + + object? result = default; + + try + { + result = JsonConvert.DeserializeObject(jsonText, packetType); + } + catch (JsonReaderException e) + { + Logger.Warn($"An error occured while deserializating the json text of the packet {packetType.Name}"); + Logger.Warn(e); + } + + return result; + } +} \ No newline at end of file diff --git a/Moonlight/Features/Servers/Helpers/ServerConsole.cs b/Moonlight/Features/Servers/Helpers/ServerConsole.cs index b1b9ec7..9c13b34 100644 --- a/Moonlight/Features/Servers/Helpers/ServerConsole.cs +++ b/Moonlight/Features/Servers/Helpers/ServerConsole.cs @@ -2,8 +2,8 @@ using MoonCore.Helpers; using Moonlight.Features.Servers.Api.Packets; using Moonlight.Features.Servers.Entities; -using Moonlight.Features.Servers.Models.Abstractions; using Moonlight.Features.Servers.Models.Enums; +using AdvancedWebsocketStream = Moonlight.Core.Helpers.AdvancedWebsocketStream; namespace Moonlight.Features.Servers.Helpers; @@ -23,7 +23,7 @@ public class ServerConsole private readonly Server Server; private ClientWebSocket WebSocket; - private WsPacketConnection PacketConnection; + private AdvancedWebsocketStream WebsocketStream; private CancellationTokenSource Cancellation = new(); @@ -50,11 +50,11 @@ public class ServerConsole wsUrl = $"ws://{Server.Node.Fqdn}:{Server.Node.HttpPort}/servers/{Server.Id}/ws"; await WebSocket.ConnectAsync(new Uri(wsUrl), CancellationToken.None); - PacketConnection = new WsPacketConnection(WebSocket); + WebsocketStream = new AdvancedWebsocketStream(WebSocket); - await PacketConnection.RegisterPacket("output"); - await PacketConnection.RegisterPacket("state"); - await PacketConnection.RegisterPacket("stats"); + WebsocketStream.RegisterPacket(1); + WebsocketStream.RegisterPacket(2); + WebsocketStream.RegisterPacket(3); Task.Run(Worker); } @@ -65,7 +65,7 @@ public class ServerConsole { try { - var packet = await PacketConnection.Receive(); + var packet = await WebsocketStream.ReceivePacket(); if (packet == null) continue; @@ -111,7 +111,7 @@ public class ServerConsole } await OnDisconnected.Invoke(); - await PacketConnection.Close(); + await WebsocketStream.Close(); } public async Task Close() @@ -119,8 +119,8 @@ public class ServerConsole if(!Cancellation.IsCancellationRequested) Cancellation.Cancel(); - if(PacketConnection != null) - await PacketConnection.Close(); + if(WebsocketStream != null) + await WebsocketStream.Close(); } private string[] GetMessageCache() diff --git a/Moonlight/Features/Servers/Http/Controllers/ServersControllers.cs b/Moonlight/Features/Servers/Http/Controllers/ServersControllers.cs index 9a7199b..ba3c5a9 100644 --- a/Moonlight/Features/Servers/Http/Controllers/ServersControllers.cs +++ b/Moonlight/Features/Servers/Http/Controllers/ServersControllers.cs @@ -8,6 +8,7 @@ using Moonlight.Features.Servers.Events; using Moonlight.Features.Servers.Extensions; using Moonlight.Features.Servers.Http.Requests; using Moonlight.Features.Servers.Models.Abstractions; +using AdvancedWebsocketStream = Moonlight.Core.Helpers.AdvancedWebsocketStream; namespace Moonlight.Features.Servers.Http.Controllers; @@ -36,9 +37,9 @@ public class ServersControllers : Controller var websocket = await HttpContext.WebSockets.AcceptWebSocketAsync(); // Build connection wrapper - var wsPacketConnection = new WsPacketConnection(websocket); - await wsPacketConnection.RegisterPacket("amount"); - await wsPacketConnection.RegisterPacket("serverConfiguration"); + var websocketStream = new AdvancedWebsocketStream(websocket); + websocketStream.RegisterPacket(1); + websocketStream.RegisterPacket(2); // Read server data for the node var node = (HttpContext.Items["Node"] as ServerNode)!; @@ -62,13 +63,13 @@ public class ServersControllers : Controller .ToArray(); // Send the amount of configs the node will receive - await wsPacketConnection.Send(servers.Length); + await websocketStream.SendPacket(servers.Length); // Send the server configurations foreach (var serverConfiguration in serverConfigurations) - await wsPacketConnection.Send(serverConfiguration); + await websocketStream.SendPacket(serverConfiguration); - await wsPacketConnection.WaitForClose(); + await websocketStream.WaitForClose(); return Ok(); } diff --git a/Moonlight/Moonlight.csproj b/Moonlight/Moonlight.csproj index f140329..7ac8423 100644 --- a/Moonlight/Moonlight.csproj +++ b/Moonlight/Moonlight.csproj @@ -90,7 +90,7 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - +