Blob rework

This commit is contained in:
crschnick 2024-06-20 21:25:28 +00:00
parent 0587dea4ac
commit 02b979de6b
13 changed files with 208 additions and 72 deletions

View file

@ -4,16 +4,13 @@ import io.xpipe.beacon.BeaconClientException;
import lombok.Value;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@Value
public class AppBeaconCache {
Set<BeaconShellSession> shellSessions = new HashSet<>();
Map<UUID, byte[]> savedBlobs = new ConcurrentHashMap<>();
public BeaconShellSession getShellSession(UUID uuid) throws BeaconClientException {
var found = shellSessions.stream().filter(beaconShellSession -> beaconShellSession.getEntry().getUuid().equals(uuid)).findFirst();
@ -22,12 +19,4 @@ public class AppBeaconCache {
}
return found.get();
}
public byte[] getBlob(UUID uuid) throws BeaconClientException {
var found = savedBlobs.get(uuid);
if (found == null) {
throw new BeaconClientException("No saved data known for id " + uuid);
}
return found;
}
}

View file

@ -62,6 +62,9 @@ public class BeaconRequestHandler<T> implements HttpHandler {
T object;
Object response;
try {
if (beaconInterface.readRawRequestBody()) {
object = createDefaultRequest(beaconInterface);
} else {
try (InputStream is = exchange.getRequestBody()) {
var read = is.readAllBytes();
var rawDataRequestClass = beaconInterface.getRequestClass().getDeclaredFields().length == 1 &&
@ -77,6 +80,7 @@ public class BeaconRequestHandler<T> implements HttpHandler {
TrackEvent.trace("Parsed request object:\n" + object);
}
}
}
response = beaconInterface.handle(exchange, object);
} catch (BeaconClientException clientException) {
ErrorEvent.fromThrowable(clientException).omit().expected().handle();

View file

@ -0,0 +1,79 @@
package io.xpipe.app.beacon;
import io.xpipe.app.issue.ErrorEvent;
import io.xpipe.app.util.ShellTemp;
import io.xpipe.beacon.BeaconClientException;
import org.apache.commons.io.FileUtils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class BlobManager {
private static final Path TEMP = ShellTemp.getLocalTempDataDirectory("blob");
private static BlobManager INSTANCE;
private final Map<UUID, byte[]> memoryBlobs = new ConcurrentHashMap<>();
private final Map<UUID, Path> fileBlobs = new ConcurrentHashMap<>();
public static BlobManager get() {
return INSTANCE;
}
public static void init() {
INSTANCE = new BlobManager();
try {
FileUtils.forceMkdir(TEMP.toFile());
try {
// Remove old files in dir
FileUtils.cleanDirectory(TEMP.toFile());
} catch (IOException ignored) {
}
} catch (IOException e) {
ErrorEvent.fromThrowable(e).handle();
}
}
public static void reset() {
try {
FileUtils.cleanDirectory(TEMP.toFile());
} catch (IOException ignored) {
}
INSTANCE = null;
}
public Path newBlobFile() {
return TEMP.resolve(UUID.randomUUID().toString());
}
public void store(UUID uuid, byte[] blob) {
memoryBlobs.put(uuid, blob);
}
public void store(UUID uuid, InputStream blob) throws IOException {
var file = TEMP.resolve(uuid.toString());
try (var fileOut = Files.newOutputStream(file)) {
blob.transferTo(fileOut);
}
fileBlobs.put(uuid,file);
}
public InputStream getBlob(UUID uuid) throws Exception {
var memory = memoryBlobs.get(uuid);
if (memory != null) {
return new ByteArrayInputStream(memory);
}
var found = fileBlobs.get(uuid);
if (found == null) {
throw new BeaconClientException("No saved data known for id " + uuid);
}
return Files.newInputStream(found);
}
}

View file

@ -1,7 +1,7 @@
package io.xpipe.app.beacon.impl;
import com.sun.net.httpserver.HttpExchange;
import io.xpipe.app.beacon.AppBeaconServer;
import io.xpipe.app.beacon.BlobManager;
import io.xpipe.beacon.api.FsBlobExchange;
import lombok.SneakyThrows;
@ -13,7 +13,13 @@ public class FsBlobExchangeImpl extends FsBlobExchange {
@SneakyThrows
public Object handle(HttpExchange exchange, Request msg) {
var id = UUID.randomUUID();
AppBeaconServer.get().getCache().getSavedBlobs().put(id, msg.getPayload());
var size = exchange.getRequestBody().available();
if (size > 100_000_000) {
BlobManager.get().store(id,exchange.getRequestBody());
} else {
BlobManager.get().store(id,exchange.getRequestBody().readAllBytes());
}
return Response.builder().blob(id).build();
}
}

View file

@ -2,10 +2,16 @@ package io.xpipe.app.beacon.impl;
import com.sun.net.httpserver.HttpExchange;
import io.xpipe.app.beacon.AppBeaconServer;
import io.xpipe.app.beacon.BlobManager;
import io.xpipe.app.util.FixedSizeInputStream;
import io.xpipe.beacon.api.FsReadExchange;
import io.xpipe.core.store.ConnectionFileSystem;
import lombok.SneakyThrows;
import java.io.BufferedInputStream;
import java.io.OutputStream;
import java.nio.file.Files;
public class FsReadExchangeImpl extends FsReadExchange {
@Override
@ -13,9 +19,30 @@ public class FsReadExchangeImpl extends FsReadExchange {
public Object handle(HttpExchange exchange, Request msg) {
var shell = AppBeaconServer.get().getCache().getShellSession(msg.getConnection());
var fs = new ConnectionFileSystem(shell.getControl());
var size = fs.getFileSize(msg.getPath().toString());
if (size > 100_000_000) {
var file = BlobManager.get().newBlobFile();
try (var in = fs.openInput(msg.getPath().toString())) {
try (var fileOut = Files.newOutputStream(file.resolve(msg.getPath().getFileName()));
var fixedIn = new FixedSizeInputStream(new BufferedInputStream(in), size)) {
fixedIn.transferTo(fileOut);
}
in.transferTo(OutputStream.nullOutputStream());
}
exchange.sendResponseHeaders(200, size);
try (var fileIn = Files.newInputStream(file); var out = exchange.getResponseBody()) {
fileIn.transferTo(out);
}
return Response.builder().build();
} else {
byte[] bytes;
try (var in = fs.openInput(msg.getPath().toString())) {
bytes = in.readAllBytes();
try (var fixedIn = new FixedSizeInputStream(new BufferedInputStream(in), size)) {
bytes = fixedIn.readAllBytes();
}
in.transferTo(OutputStream.nullOutputStream());
}
exchange.sendResponseHeaders(200, bytes.length);
try (var out = exchange.getResponseBody()) {
@ -24,3 +51,4 @@ public class FsReadExchangeImpl extends FsReadExchange {
return Response.builder().build();
}
}
}

View file

@ -2,6 +2,7 @@ package io.xpipe.app.beacon.impl;
import com.sun.net.httpserver.HttpExchange;
import io.xpipe.app.beacon.AppBeaconServer;
import io.xpipe.app.beacon.BlobManager;
import io.xpipe.app.util.ScriptHelper;
import io.xpipe.beacon.api.FsScriptExchange;
import lombok.SneakyThrows;
@ -14,7 +15,10 @@ public class FsScriptExchangeImpl extends FsScriptExchange {
@SneakyThrows
public Object handle(HttpExchange exchange, Request msg) {
var shell = AppBeaconServer.get().getCache().getShellSession(msg.getConnection());
var data = new String(AppBeaconServer.get().getCache().getBlob(msg.getBlob()), StandardCharsets.UTF_8);
String data;
try (var in = BlobManager.get().getBlob(msg.getBlob())) {
data = new String(in.readAllBytes(), StandardCharsets.UTF_8);
}
var file = ScriptHelper.getExecScriptFile(shell.getControl());
shell.getControl().getShellDialect().createScriptTextFileWriteCommand(shell.getControl(), data, file.toString()).execute();
return Response.builder().path(file).build();

View file

@ -2,6 +2,7 @@ package io.xpipe.app.beacon.impl;
import com.sun.net.httpserver.HttpExchange;
import io.xpipe.app.beacon.AppBeaconServer;
import io.xpipe.app.beacon.BlobManager;
import io.xpipe.beacon.api.FsWriteExchange;
import io.xpipe.core.store.ConnectionFileSystem;
import lombok.SneakyThrows;
@ -12,10 +13,10 @@ public class FsWriteExchangeImpl extends FsWriteExchange {
@SneakyThrows
public Object handle(HttpExchange exchange, Request msg) {
var shell = AppBeaconServer.get().getCache().getShellSession(msg.getConnection());
var data = AppBeaconServer.get().getCache().getBlob(msg.getBlob());
var fs = new ConnectionFileSystem(shell.getControl());
try (var os = fs.openOutput(msg.getPath().toString(), data.length)) {
os.write(data);
try (var in = BlobManager.get().getBlob(msg.getBlob());
var os = fs.openOutput(msg.getPath().toString(), in.available())) {
in.transferTo(os);
}
return Response.builder().build();
}

View file

@ -1,6 +1,7 @@
package io.xpipe.app.core.mode;
import io.xpipe.app.beacon.AppBeaconServer;
import io.xpipe.app.beacon.BlobManager;
import io.xpipe.app.browser.session.BrowserSessionModel;
import io.xpipe.app.comp.store.StoreViewState;
import io.xpipe.app.core.*;
@ -62,6 +63,7 @@ public class BaseMode extends OperationMode {
DataStoreProviders.init();
AppFileWatcher.init();
FileBridge.init();
BlobManager.init();
ActionProvider.initProviders();
TrackEvent.info("Finished base components initialization");
initialized = true;
@ -81,6 +83,8 @@ public class BaseMode extends OperationMode {
AppResources.reset();
AppExtensionManager.reset();
AppDataLock.unlock();
BlobManager.reset();
FileBridge.reset();
// Shut down server last to keep a non-daemon thread running
AppBeaconServer.reset();
TrackEvent.info("Background mode shutdown finished");

View file

@ -27,37 +27,6 @@ import java.util.function.Consumer;
public class FileBridge {
private static class FixedSizeInputStream extends SimpleFilterInputStream {
private long count;
private final long size;
protected FixedSizeInputStream(InputStream in, long size) {
super(in);
this.size = size;
}
@Override
public int read() throws IOException {
if (count >= size) {
return -1;
}
var read = in.read();
count++;
if (read == -1) {
return 0;
} else {
return read;
}
}
@Override
public int available() {
return (int) (size - count);
}
}
private static final Path TEMP = ShellTemp.getLocalTempDataDirectory("bridge");
private static FileBridge INSTANCE;
private final Set<Entry> openEntries = new HashSet<>();
@ -97,6 +66,14 @@ public class FileBridge {
}
}
public static void reset() {
try {
FileUtils.cleanDirectory(TEMP.toFile());
} catch (IOException ignored) {
}
INSTANCE = null;
}
private synchronized void handleWatchEvent(Path changed, WatchEvent.Kind<Path> kind) {
if (kind == StandardWatchEventKinds.ENTRY_DELETE) {
event("Editor entry file " + changed.toString() + " has been removed");
@ -135,6 +112,7 @@ public class FileBridge {
try (var fixedIn = new FixedSizeInputStream(new BufferedInputStream(in), actualSize)) {
e.writer.accept(fixedIn, actualSize);
}
in.transferTo(OutputStream.nullOutputStream());
var taken = Duration.between(started, Instant.now());
event("Wrote " + HumanReadableFormat.byteCount(actualSize) + " in " + taken.toMillis() + "ms");
}

View file

@ -0,0 +1,35 @@
package io.xpipe.app.util;
import java.io.IOException;
import java.io.InputStream;
public class FixedSizeInputStream extends SimpleFilterInputStream {
private long count;
private final long size;
public FixedSizeInputStream(InputStream in, long size) {
super(in);
this.size = size;
}
@Override
public int read() throws IOException {
if (count >= size) {
return -1;
}
var read = in.read();
count++;
if (read == -1) {
return 0;
} else {
return read;
}
}
@Override
public int available() {
return (int) (size - count);
}
}

View file

@ -96,6 +96,7 @@ open module io.xpipe.app {
requires jdk.jdwp.agent;
requires org.kordamp.ikonli.core;
requires jdk.httpserver;
requires java.sql;
uses TerminalLauncher;
uses io.xpipe.app.ext.ActionProvider;

View file

@ -75,4 +75,8 @@ public abstract class BeaconInterface<T> {
throws BeaconClientException, BeaconServerException {
throw new UnsupportedOperationException();
}
public boolean readRawRequestBody() {
return false;
}
}

View file

@ -15,12 +15,15 @@ public class FsBlobExchange extends BeaconInterface<FsBlobExchange.Request> {
return "/fs/blob";
}
@Override
public boolean readRawRequestBody() {
return true;
}
@Jacksonized
@Builder
@Value
public static class Request {
byte @NonNull [] payload;
}
public static class Request {}
@Jacksonized
@Builder