diff --git a/beacon/src/main/java/io/xpipe/beacon/BeaconClient.java b/beacon/src/main/java/io/xpipe/beacon/BeaconClient.java index 111708eea..7ab55224e 100644 --- a/beacon/src/main/java/io/xpipe/beacon/BeaconClient.java +++ b/beacon/src/main/java/io/xpipe/beacon/BeaconClient.java @@ -15,6 +15,7 @@ import io.xpipe.core.store.ShellStore; import io.xpipe.core.util.JacksonMapper; import lombok.Builder; import lombok.EqualsAndHashCode; +import lombok.Getter; import lombok.Value; import lombok.extern.jackson.Jacksonized; @@ -31,7 +32,7 @@ import static io.xpipe.beacon.BeaconConfig.BODY_SEPARATOR; public class BeaconClient implements AutoCloseable { @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") - public static abstract class ClientInformation { + public abstract static class ClientInformation { public final CliClientInformation cli() { return (CliClientInformation) this; @@ -55,6 +56,19 @@ public class BeaconClient implements AutoCloseable { } } + @JsonTypeName("reachableCheck") + @Value + @Builder + @Jacksonized + @EqualsAndHashCode(callSuper = false) + public static class ReachableCheckInformation extends ClientInformation { + + @Override + public String toDisplayString() { + return "Reachable check"; + } + } + @JsonTypeName("gateway") @Value @Builder @@ -86,12 +100,14 @@ public class BeaconClient implements AutoCloseable { } } - private final Closeable closeable; + @Getter + private final Closeable base; + private final InputStream in; private final OutputStream out; - private BeaconClient(Closeable closeable, InputStream in, OutputStream out) { - this.closeable = closeable; + private BeaconClient(Closeable base, InputStream in, OutputStream out) { + this.base = base; this.in = in; this.out = out; } @@ -105,11 +121,33 @@ public class BeaconClient implements AutoCloseable { public static BeaconClient connectProxy(ShellStore proxy) throws Exception { var control = proxy.create().start(); - var command = control.command("xpipe beacon").start(); + var command = control.command("xpipe beacon --raw").start(); command.discardErr(); - return new BeaconClient(() -> { - command.close(); - }, command.getStdout(), command.getStdin()); + return new BeaconClient(command, command.getStdout(), command.getStdin()) { + @Override + public T receiveResponse() + throws ConnectorException, ClientException, ServerException { + try { + sendEOF(); + getRawOutputStream().close(); + } catch (IOException ex) { + throw new ConnectorException(ex); + } + + return super.receiveResponse(); + } + + @Override + public void close() throws ConnectorException { + try { + getRawInputStream().readAllBytes(); + } catch (IOException ex) { + throw new ConnectorException(ex); + } + + super.close(); + } + }; } public static Optional tryConnect(ClientInformation information) { @@ -122,7 +160,7 @@ public class BeaconClient implements AutoCloseable { public void close() throws ConnectorException { try { - closeable.close(); + base.close(); } catch (IOException ex) { throw new ConnectorException("Couldn't close client", ex); } @@ -170,6 +208,13 @@ public class BeaconClient implements AutoCloseable { sendObject(msg); } + public void sendEOF() throws ConnectorException { + try (OutputStream ignored = BeaconFormat.writeBlocks(out)) { + } catch (IOException ex) { + throw new ConnectorException("Couldn't write to socket", ex); + } + } + public void sendObject(JsonNode node) throws ConnectorException { var writer = new StringWriter(); var mapper = JacksonMapper.newMapper(); diff --git a/beacon/src/main/java/io/xpipe/beacon/BeaconJacksonModule.java b/beacon/src/main/java/io/xpipe/beacon/BeaconJacksonModule.java index cc3a6dfa0..ff2150c00 100644 --- a/beacon/src/main/java/io/xpipe/beacon/BeaconJacksonModule.java +++ b/beacon/src/main/java/io/xpipe/beacon/BeaconJacksonModule.java @@ -10,6 +10,6 @@ public class BeaconJacksonModule extends SimpleModule { context.registerSubtypes( new NamedType(BeaconClient.ApiClientInformation.class), new NamedType(BeaconClient.CliClientInformation.class), - new NamedType(BeaconClient.GatewayClientInformation.class)); + new NamedType(BeaconClient.ReachableCheckInformation.class)); } } diff --git a/beacon/src/main/java/io/xpipe/beacon/BeaconProxyImpl.java b/beacon/src/main/java/io/xpipe/beacon/BeaconProxyImpl.java new file mode 100644 index 000000000..36df8b425 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/BeaconProxyImpl.java @@ -0,0 +1,139 @@ +package io.xpipe.beacon; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import io.xpipe.beacon.exchange.ProxyFunctionExchange; +import io.xpipe.beacon.exchange.ProxyReadConnectionExchange; +import io.xpipe.beacon.exchange.ProxyWriteConnectionExchange; +import io.xpipe.core.impl.InputStreamStore; +import io.xpipe.core.impl.OutputStreamStore; +import io.xpipe.core.source.DataSource; +import io.xpipe.core.source.DataSourceConnection; +import io.xpipe.core.source.DataSourceReadConnection; +import io.xpipe.core.source.WriteMode; +import io.xpipe.core.store.ShellStore; +import io.xpipe.core.util.JacksonMapper; +import io.xpipe.core.util.ProxyFunction; +import io.xpipe.core.util.ProxyProvider; +import io.xpipe.core.util.Proxyable; +import lombok.SneakyThrows; + +import java.io.FilterInputStream; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.util.Optional; +import java.util.function.Function; + +public class BeaconProxyImpl extends ProxyProvider { + + @SneakyThrows + private static DataSource downstreamTransform(DataSource input, ShellStore proxy) { + var proxyNode = JacksonMapper.newMapper().valueToTree(proxy); + var inputNode = JacksonMapper.newMapper().valueToTree(input); + var localNode = JacksonMapper.newMapper().valueToTree(ShellStore.local()); + replace(inputNode, node -> node.equals(proxyNode) ? Optional.of(localNode) : Optional.empty()); + return JacksonMapper.newMapper().treeToValue(inputNode, DataSource.class); + } + + private static JsonNode replace(JsonNode node, Function> function) { + var value = function.apply(node); + if (value.isPresent()) { + return value.get(); + } + + if (!node.isObject()) { + return node; + } + + var replacement = JsonNodeFactory.instance.objectNode(); + var iterator = node.fields(); + while (iterator.hasNext()) { + var stringJsonNodeEntry = iterator.next(); + var resolved = function.apply(stringJsonNodeEntry.getValue()).orElse(stringJsonNodeEntry.getValue()); + replacement.set(stringJsonNodeEntry.getKey(), resolved); + } + return replacement; + } + + @Override + public ShellStore getProxy(Object base) { + var proxy = base instanceof Proxyable p ? p.getProxy() : null; + return ShellStore.isLocal(proxy) ? (BeaconConfig.localProxy() ? proxy : null) : proxy; + } + + @Override + public boolean isRemote(Object base) { + if (base == null) { + throw new IllegalArgumentException("Proxy base is null"); + } + + return getProxy(base) != null; + } + + @Override + public T createRemoteReadConnection(DataSource source, ShellStore proxy) throws Exception { + var downstream = downstreamTransform(source, proxy); + + BeaconClient client = null; + try { + client = BeaconClient.connectProxy(proxy); + client.sendRequest(ProxyReadConnectionExchange.Request.builder() + .source(downstream) + .build()); + client.receiveResponse(); + BeaconClient finalClient = client; + var inputStream = new FilterInputStream(finalClient.receiveBody()) { + @Override + @SneakyThrows + public void close() throws IOException { + super.close(); + finalClient.close(); + } + }; + var inputSource = DataSource.createInternalDataSource(source.getType(), new InputStreamStore(inputStream)); + return (T) inputSource.openReadConnection(); + } catch (Exception ex) { + if (client != null) client.close(); + throw ex; + } + } + + @Override + public T createRemoteWriteConnection(DataSource source, WriteMode mode, ShellStore proxy) throws Exception { + var downstream = downstreamTransform(source, proxy); + + BeaconClient client = null; + try { + client = BeaconClient.connectProxy(proxy); + client.sendRequest(ProxyWriteConnectionExchange.Request.builder() + .source(downstream) + .build()); + BeaconClient finalClient = client; + var outputStream = new FilterOutputStream(client.sendBody()) { + @Override + @SneakyThrows + public void close() throws IOException { + super.close(); + finalClient.receiveResponse(); + finalClient.close(); + } + }; + var outputSource = DataSource.createInternalDataSource(source.getType(), new OutputStreamStore(outputStream)); + return (T) outputSource.openWriteConnection(mode); + } catch (Exception ex) { + if (client != null) client.close(); + throw ex; + } + } + + @Override + @SneakyThrows + public ProxyFunction call(ProxyFunction func, ShellStore proxy) { + try (var client = BeaconClient.connectProxy(proxy)) { + client.sendRequest( + ProxyFunctionExchange.Request.builder().function(func).build()); + ProxyFunctionExchange.Response response = client.receiveResponse(); + return response.getFunction(); + } + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/BeaconServer.java b/beacon/src/main/java/io/xpipe/beacon/BeaconServer.java index 8394edcf3..1a0d98f26 100644 --- a/beacon/src/main/java/io/xpipe/beacon/BeaconServer.java +++ b/beacon/src/main/java/io/xpipe/beacon/BeaconServer.java @@ -18,7 +18,8 @@ import java.nio.file.Path; public class BeaconServer { public static boolean isRunning() { - try (var ignored = BeaconClient.connect(null)) { + try (var ignored = BeaconClient.connect( + BeaconClient.ReachableCheckInformation.builder().build())) { return true; } catch (Exception e) { return false; diff --git a/beacon/src/main/java/io/xpipe/beacon/Proxyable.java b/beacon/src/main/java/io/xpipe/beacon/Proxyable.java deleted file mode 100644 index 2dad3b726..000000000 --- a/beacon/src/main/java/io/xpipe/beacon/Proxyable.java +++ /dev/null @@ -1,21 +0,0 @@ -package io.xpipe.beacon; - -import io.xpipe.core.store.ShellStore; - -public interface Proxyable { - - public static ShellStore getProxy(Object base) { - var proxy = base instanceof Proxyable p ? p.getProxy() : null; - return ShellStore.isLocal(proxy) ? (BeaconConfig.localProxy() ? proxy : null) : proxy; - } - - public static boolean isRemote(Object base) { - if (base == null) { - throw new IllegalArgumentException("Proxy base is null"); - } - - return base instanceof Proxyable p && !ShellStore.isLocal(p.getProxy()); - } - - ShellStore getProxy(); -} diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/NamedFunctionExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/ProxyFunctionExchange.java similarity index 51% rename from beacon/src/main/java/io/xpipe/beacon/exchange/NamedFunctionExchange.java rename to beacon/src/main/java/io/xpipe/beacon/exchange/ProxyFunctionExchange.java index f952d365c..3d708e56b 100644 --- a/beacon/src/main/java/io/xpipe/beacon/exchange/NamedFunctionExchange.java +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/ProxyFunctionExchange.java @@ -2,18 +2,18 @@ package io.xpipe.beacon.exchange; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import io.xpipe.beacon.NamedFunction; +import io.xpipe.core.util.ProxyFunction; import io.xpipe.beacon.RequestMessage; import io.xpipe.beacon.ResponseMessage; import lombok.Builder; import lombok.Value; import lombok.extern.jackson.Jacksonized; -public class NamedFunctionExchange implements MessageExchange { +public class ProxyFunctionExchange implements MessageExchange { @Override public String getId() { - return "namedFunction"; + return "proxyFunction"; } @Jacksonized @@ -21,9 +21,9 @@ public class NamedFunctionExchange implements MessageExchange { @Value public static class Request implements RequestMessage { - @JsonSerialize(using = NamedFunction.Serializer.class, as = NamedFunction.class) - @JsonDeserialize(using = NamedFunction.Deserializer.class, as = NamedFunction.class) - NamedFunction function; + @JsonSerialize(using = ProxyFunction.Serializer.class, as = ProxyFunction.class) + @JsonDeserialize(using = ProxyFunction.Deserializer.class, as = ProxyFunction.class) + ProxyFunction function; } @Jacksonized @@ -31,6 +31,8 @@ public class NamedFunctionExchange implements MessageExchange { @Value public static class Response implements ResponseMessage { - Object returnValue; + @JsonSerialize(using = ProxyFunction.Serializer.class, as = ProxyFunction.class) + @JsonDeserialize(using = ProxyFunction.Deserializer.class, as = ProxyFunction.class) + ProxyFunction function; } } diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/ProxyWriteConnectionExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/ProxyWriteConnectionExchange.java new file mode 100644 index 000000000..ab0aed91a --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/ProxyWriteConnectionExchange.java @@ -0,0 +1,32 @@ +package io.xpipe.beacon.exchange; + +import io.xpipe.beacon.RequestMessage; +import io.xpipe.beacon.ResponseMessage; +import io.xpipe.core.source.DataSource; +import io.xpipe.core.source.WriteMode; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; + +public class ProxyWriteConnectionExchange implements MessageExchange { + + @Override + public String getId() { + return "proxyWriteConnection"; + } + + @Jacksonized + @Builder + @Value + public static class Request implements RequestMessage { + @NonNull DataSource source; + @NonNull WriteMode mode; + } + + @Jacksonized + @Builder + @Value + public static class Response implements ResponseMessage { + } +} diff --git a/beacon/src/main/java/module-info.java b/beacon/src/main/java/module-info.java index ab0131ea5..cd2826a70 100644 --- a/beacon/src/main/java/module-info.java +++ b/beacon/src/main/java/module-info.java @@ -1,10 +1,13 @@ import com.fasterxml.jackson.databind.Module; import io.xpipe.beacon.BeaconJacksonModule; +import io.xpipe.beacon.BeaconProxyImpl; +import io.xpipe.core.util.ProxyFunction; import io.xpipe.beacon.exchange.*; import io.xpipe.beacon.exchange.api.QueryRawDataExchange; import io.xpipe.beacon.exchange.api.QueryTableDataExchange; import io.xpipe.beacon.exchange.api.QueryTextDataExchange; import io.xpipe.beacon.exchange.cli.*; +import io.xpipe.core.util.ProxyProvider; module io.xpipe.beacon { exports io.xpipe.beacon; @@ -25,8 +28,9 @@ module io.xpipe.beacon { requires static lombok; uses MessageExchange; - uses io.xpipe.beacon.NamedFunction; + uses ProxyFunction; + provides ProxyProvider with BeaconProxyImpl; provides Module with BeaconJacksonModule; provides io.xpipe.beacon.exchange.MessageExchange with ForwardExchange, @@ -37,7 +41,8 @@ module io.xpipe.beacon { ListCollectionsExchange, ListEntriesExchange, ModeExchange, - NamedFunctionExchange, + ProxyWriteConnectionExchange, + ProxyFunctionExchange, StatusExchange, StopExchange, RenameStoreExchange, diff --git a/core/build.gradle b/core/build.gradle index a0c1bb0f6..b6d35499c 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -17,6 +17,7 @@ dependencies { api group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "2.13.0" implementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-parameter-names', version: "2.13.0" implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: "2.13.0" + implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jdk8', version: "2.13.0" } version = file('../misc/version').text diff --git a/core/src/main/java/io/xpipe/core/impl/InputStreamStore.java b/core/src/main/java/io/xpipe/core/impl/InputStreamStore.java index 5f8e44fdb..223befa2a 100644 --- a/core/src/main/java/io/xpipe/core/impl/InputStreamStore.java +++ b/core/src/main/java/io/xpipe/core/impl/InputStreamStore.java @@ -1,12 +1,12 @@ package io.xpipe.core.impl; +import io.xpipe.core.store.DataFlow; import io.xpipe.core.store.StreamDataStore; import java.io.InputStream; /** * A data store that is only represented by an InputStream. - * This can be useful for development. */ public class InputStreamStore implements StreamDataStore { @@ -21,6 +21,11 @@ public class InputStreamStore implements StreamDataStore { return in; } + @Override + public DataFlow getFlow() { + return DataFlow.INPUT; + } + @Override public boolean canOpen() { return true; diff --git a/core/src/main/java/io/xpipe/core/impl/LocalProcessControlProvider.java b/core/src/main/java/io/xpipe/core/impl/LocalProcessControlProvider.java index edf8d5a7c..62cfaafa2 100644 --- a/core/src/main/java/io/xpipe/core/impl/LocalProcessControlProvider.java +++ b/core/src/main/java/io/xpipe/core/impl/LocalProcessControlProvider.java @@ -12,10 +12,10 @@ public abstract class LocalProcessControlProvider { INSTANCE = layer != null ? ServiceLoader.load(layer, LocalProcessControlProvider.class) .findFirst() - .orElseThrow() + .orElse(null) : ServiceLoader.load(LocalProcessControlProvider.class) .findFirst() - .orElseThrow(); + .orElse(null); } public static ShellProcessControl create() { diff --git a/core/src/main/java/io/xpipe/core/impl/OutputStreamStore.java b/core/src/main/java/io/xpipe/core/impl/OutputStreamStore.java index df6092a58..1c93ecfe9 100644 --- a/core/src/main/java/io/xpipe/core/impl/OutputStreamStore.java +++ b/core/src/main/java/io/xpipe/core/impl/OutputStreamStore.java @@ -1,5 +1,6 @@ package io.xpipe.core.impl; +import io.xpipe.core.store.DataFlow; import io.xpipe.core.store.StreamDataStore; import java.io.InputStream; @@ -13,6 +14,11 @@ public class OutputStreamStore implements StreamDataStore { this.out = out; } + @Override + public DataFlow getFlow() { + return DataFlow.OUTPUT; + } + @Override public InputStream openInput() throws Exception { throw new UnsupportedOperationException("No input available"); diff --git a/core/src/main/java/io/xpipe/core/impl/PreservingWriteConnection.java b/core/src/main/java/io/xpipe/core/impl/PreservingWriteConnection.java index a36f020b3..34769d2f2 100644 --- a/core/src/main/java/io/xpipe/core/impl/PreservingWriteConnection.java +++ b/core/src/main/java/io/xpipe/core/impl/PreservingWriteConnection.java @@ -29,15 +29,17 @@ public class PreservingWriteConnection implements DataSourceConnection { if (source.getStore().canOpen()) { try (var in = source.openReadConnection(); var out = nativeSource.openWriteConnection(WriteMode.REPLACE)) { + in.init(); + out.init(); in.forward(out); } - ; } connection.init(); if (source.getStore().canOpen()) { try (var in = nativeSource.openReadConnection()) { + in.init(); in.forward(connection); } } diff --git a/core/src/main/java/io/xpipe/core/process/ProcessControl.java b/core/src/main/java/io/xpipe/core/process/ProcessControl.java index 69be16665..72a521c1d 100644 --- a/core/src/main/java/io/xpipe/core/process/ProcessControl.java +++ b/core/src/main/java/io/xpipe/core/process/ProcessControl.java @@ -1,5 +1,6 @@ package io.xpipe.core.process; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -7,7 +8,7 @@ import java.nio.charset.Charset; import java.util.List; import java.util.stream.Collectors; -public interface ProcessControl extends AutoCloseable { +public interface ProcessControl extends Closeable, AutoCloseable { static String join(List command) { return command.stream().map(s -> s.contains(" ") ? "\"" + s + "\"" : s).collect(Collectors.joining(" ")); diff --git a/core/src/main/java/io/xpipe/core/source/CollectionDataSource.java b/core/src/main/java/io/xpipe/core/source/CollectionDataSource.java index a8d939ba0..43e9d85b3 100644 --- a/core/src/main/java/io/xpipe/core/source/CollectionDataSource.java +++ b/core/src/main/java/io/xpipe/core/source/CollectionDataSource.java @@ -12,6 +12,11 @@ public abstract class CollectionDataSource extends DataSou @Singular private final Map preferredProviders; + @Override + public DataSourceType getType() { + return DataSourceType.COLLECTION; + } + public CollectionDataSource annotate(String file, String provider) { preferredProviders.put(file, provider); return this; @@ -22,18 +27,12 @@ public abstract class CollectionDataSource extends DataSou return this; } - @Override - public final DataSourceInfo determineInfo() throws Exception { - try (var con = openReadConnection()) { - var c = (int) con.listEntries().count(); - return new DataSourceInfo.Collection(c); - } - } - public final CollectionReadConnection openReadConnection() throws Exception { - var con = newReadConnection(); - con.init(); - return con; + if (!isComplete()) { + throw new UnsupportedOperationException(); + } + + return newReadConnection(); } public final CollectionWriteConnection openWriteConnection(WriteMode mode) throws Exception { @@ -42,7 +41,6 @@ public abstract class CollectionDataSource extends DataSou throw new UnsupportedOperationException(mode.getId()); } - con.init(); return con; } diff --git a/core/src/main/java/io/xpipe/core/source/DataSource.java b/core/src/main/java/io/xpipe/core/source/DataSource.java index 60abd9089..9f42444e5 100644 --- a/core/src/main/java/io/xpipe/core/source/DataSource.java +++ b/core/src/main/java/io/xpipe/core/source/DataSource.java @@ -123,13 +123,6 @@ public abstract class DataSource extends JacksonizedValue return Optional.empty(); } - /** - * Determines the data source info. - * This is usually called only once on data source - * creation as this process might be expensive. - */ - public abstract DataSourceInfo determineInfo() throws Exception; - public DataSourceReadConnection openReadConnection() throws Exception { throw new UnsupportedOperationException(); } @@ -141,4 +134,6 @@ public abstract class DataSource extends JacksonizedValue public DS getStore() { return store; } + + public abstract DataSourceType getType(); } diff --git a/core/src/main/java/io/xpipe/core/source/RawDataSource.java b/core/src/main/java/io/xpipe/core/source/RawDataSource.java index 5dec34a1a..6965e56f6 100644 --- a/core/src/main/java/io/xpipe/core/source/RawDataSource.java +++ b/core/src/main/java/io/xpipe/core/source/RawDataSource.java @@ -9,19 +9,17 @@ public abstract class RawDataSource extends DataSource private static final int MAX_BYTES_READ = 100000; @Override - public final DataSourceInfo determineInfo() throws Exception { - try (var con = openReadConnection()) { - var b = con.readBytes(MAX_BYTES_READ); - int usedCount = b.length == MAX_BYTES_READ ? -1 : b.length; - return new DataSourceInfo.Raw(usedCount); - } + public DataSourceType getType() { + return DataSourceType.RAW; } @Override public final RawReadConnection openReadConnection() throws Exception { - var con = newReadConnection(); - con.init(); - return con; + if (!isComplete()) { + throw new UnsupportedOperationException(); + } + + return newReadConnection(); } @Override @@ -31,7 +29,6 @@ public abstract class RawDataSource extends DataSource throw new UnsupportedOperationException(mode.getId()); } - con.init(); return con; } diff --git a/core/src/main/java/io/xpipe/core/source/StructureDataSource.java b/core/src/main/java/io/xpipe/core/source/StructureDataSource.java index fbb0e27d4..222937645 100644 --- a/core/src/main/java/io/xpipe/core/source/StructureDataSource.java +++ b/core/src/main/java/io/xpipe/core/source/StructureDataSource.java @@ -7,6 +7,11 @@ import lombok.experimental.SuperBuilder; @SuperBuilder public abstract class StructureDataSource extends DataSource { + @Override + public DataSourceType getType() { + return DataSourceType.STRUCTURE; + } + private int countEntries(DataStructureNode n) { if (n.isValue()) { return 1; @@ -19,19 +24,12 @@ public abstract class StructureDataSource extends DataSour return c; } - @Override - public final DataSourceInfo determineInfo() throws Exception { - try (var con = openReadConnection()) { - var n = con.read(); - var c = countEntries(n); - return new DataSourceInfo.Structure(c); - } - } - public final StructureReadConnection openReadConnection() throws Exception { - var con = newReadConnection(); - con.init(); - return con; + if (!isComplete()) { + throw new UnsupportedOperationException(); + } + + return newReadConnection(); } public final StructureWriteConnection openWriteConnection(WriteMode mode) throws Exception { @@ -40,7 +38,6 @@ public abstract class StructureDataSource extends DataSour throw new UnsupportedOperationException(mode.getId()); } - con.init(); return con; } diff --git a/core/src/main/java/io/xpipe/core/source/TableDataSource.java b/core/src/main/java/io/xpipe/core/source/TableDataSource.java index d60e8c873..619efc4ad 100644 --- a/core/src/main/java/io/xpipe/core/source/TableDataSource.java +++ b/core/src/main/java/io/xpipe/core/source/TableDataSource.java @@ -3,6 +3,10 @@ package io.xpipe.core.source; import io.xpipe.core.data.type.TupleType; import io.xpipe.core.impl.PreservingTableWriteConnection; import io.xpipe.core.store.DataStore; +import io.xpipe.core.util.ProxyProvider; +import io.xpipe.core.util.SimpleProxyFunction; +import lombok.NoArgsConstructor; +import lombok.SneakyThrows; import lombok.experimental.SuperBuilder; import java.util.Optional; @@ -24,34 +28,47 @@ public abstract class TableDataSource extends DataSource { + + private TableDataSource source; + private TupleType type; + + public CreateMappingFunction(TableDataSource source, TupleType type) { + this.source = source; + this.type = type; + } + + private TableMapping mapping; + + @SneakyThrows + public void callLocal() { + try (TableWriteConnection w = source.openWriteConnection(WriteMode.REPLACE)) { + w.init(); + mapping = w.createMapping(type).orElse(null); + } + } + } + + public final Optional createMapping(TupleType inputType) throws Exception { + return Optional.ofNullable(new CreateMappingFunction(this, inputType).callAndGet()); } public final TableWriteConnection openWriteConnection(WriteMode mode) throws Exception { @@ -60,7 +77,11 @@ public abstract class TableDataSource extends DataSource extends DataSource { private static final int MAX_LINE_READ = 1000; @Override - public final DataSourceInfo determineInfo() throws Exception { - if (!getStore().canOpen()) { - return new DataSourceInfo.Text(-1, -1); - } - - try (var con = openReadConnection()) { - AtomicInteger lineCount = new AtomicInteger(); - AtomicInteger charCount = new AtomicInteger(); - con.lines().limit(MAX_LINE_READ).forEach(s -> { - lineCount.getAndIncrement(); - charCount.addAndGet(s.length()); - }); - boolean limitHit = lineCount.get() == MAX_LINE_READ; - return new DataSourceInfo.Text(limitHit ? -1 : charCount.get(), limitHit ? -1 : lineCount.get()); - } + public DataSourceType getType() { + return DataSourceType.TEXT; } @Override public final TextReadConnection openReadConnection() throws Exception { - var con = newReadConnection(); - con.init(); - return con; + if (!isComplete()) { + throw new UnsupportedOperationException(); + } + + return newReadConnection(); } @Override public final TextWriteConnection openWriteConnection(WriteMode mode) throws Exception { var con = newWriteConnection(mode); - con.init(); + if (con == null) { + throw new UnsupportedOperationException(mode.getId()); + } + return con; } - protected TextWriteConnection newWriteConnection(WriteMode mode) { + protected TextWriteConnection newWriteConnection(WriteMode mode) { if (mode.equals(WriteMode.PREPEND)) { return new PreservingTextWriteConnection(this, newWriteConnection(WriteMode.REPLACE), false); } diff --git a/beacon/src/main/java/io/xpipe/beacon/NamedFunction.java b/core/src/main/java/io/xpipe/core/util/ProxyFunction.java similarity index 65% rename from beacon/src/main/java/io/xpipe/beacon/NamedFunction.java rename to core/src/main/java/io/xpipe/core/util/ProxyFunction.java index 2565856ab..9289d8ee5 100644 --- a/beacon/src/main/java/io/xpipe/beacon/NamedFunction.java +++ b/core/src/main/java/io/xpipe/core/util/ProxyFunction.java @@ -1,4 +1,4 @@ -package io.xpipe.beacon; +package io.xpipe.core.util; import com.fasterxml.jackson.core.JacksonException; import com.fasterxml.jackson.core.JsonGenerator; @@ -9,8 +9,6 @@ import com.fasterxml.jackson.databind.deser.std.StdDeserializer; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.TextNode; import com.fasterxml.jackson.databind.ser.std.StdSerializer; -import io.xpipe.beacon.exchange.NamedFunctionExchange; -import io.xpipe.core.util.JacksonMapper; import lombok.Getter; import lombok.SneakyThrows; @@ -18,7 +16,7 @@ import java.io.IOException; import java.util.Arrays; @Getter -public abstract class NamedFunction { +public abstract class ProxyFunction { private static ModuleLayer layer; @@ -26,14 +24,14 @@ public abstract class NamedFunction { layer = l; } - public static class Serializer extends StdSerializer { + public static class Serializer extends StdSerializer { protected Serializer() { - super(NamedFunction.class); + super(ProxyFunction.class); } @Override - public void serialize(NamedFunction value, JsonGenerator gen, SerializerProvider provider) throws IOException { + public void serialize(ProxyFunction value, JsonGenerator gen, SerializerProvider provider) throws IOException { var node = (ObjectNode) JacksonMapper.getDefault().valueToTree(value); node.set("module", new TextNode(value.getClass().getModule().getName())); node.set("class", new TextNode(value.getClass().getName())); @@ -41,41 +39,36 @@ public abstract class NamedFunction { } } - public static class Deserializer extends StdDeserializer> { + public static class Deserializer extends StdDeserializer { protected Deserializer() { - super(NamedFunction.class); + super(ProxyFunction.class); } @Override @SneakyThrows - public NamedFunction deserialize(JsonParser p, DeserializationContext ctxt) + public ProxyFunction deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, JacksonException { var tree = (ObjectNode) JacksonMapper.getDefault().readTree(p); var moduleReference = tree.remove("module").asText(); var classReference = tree.remove("class").asText(); var module = layer.findModule(moduleReference).orElseThrow(); var targetClass = Class.forName(module, classReference); - if (targetClass == null) { throw new IllegalArgumentException("Named function class not found: " + classReference); } - - return (NamedFunction) JacksonMapper.getDefault().treeToValue(tree, targetClass); + return (ProxyFunction) JacksonMapper.getDefault().treeToValue(tree, targetClass); } } @SneakyThrows - public T call() { - var proxyStore = Proxyable.getProxy(getProxyBase()); + public ProxyFunction callAndCopy() { + var proxyStore = ProxyProvider.get().getProxy(getProxyBase()); if (proxyStore != null) { - var client = BeaconClient.connectProxy(proxyStore); - client.sendRequest( - NamedFunctionExchange.Request.builder().function(this).build()); - NamedFunctionExchange.Response response = client.receiveResponse(); - return (T) response.getReturnValue(); + return ProxyProvider.get().call(this, proxyStore); } else { - return callLocal(); + callLocal(); + return this; } } @@ -86,5 +79,5 @@ public abstract class NamedFunction { return first.get(this); } - public abstract T callLocal(); + public abstract void callLocal(); } diff --git a/core/src/main/java/io/xpipe/core/util/ProxyProvider.java b/core/src/main/java/io/xpipe/core/util/ProxyProvider.java new file mode 100644 index 000000000..ec47e95b6 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/util/ProxyProvider.java @@ -0,0 +1,36 @@ +package io.xpipe.core.util; + +import io.xpipe.core.source.DataSource; +import io.xpipe.core.source.DataSourceConnection; +import io.xpipe.core.source.DataSourceReadConnection; +import io.xpipe.core.source.WriteMode; +import io.xpipe.core.store.ShellStore; + +import java.util.ServiceLoader; + +public abstract class ProxyProvider { + + private static ProxyProvider INSTANCE; + + public static ProxyProvider get() { + if (INSTANCE == null) { + INSTANCE = ServiceLoader.load(ModuleLayer.boot(), ProxyProvider.class) + .findFirst() + .orElseThrow(); + } + + return INSTANCE; + } + + public abstract ShellStore getProxy(Object base); + + public abstract boolean isRemote(Object base); + + public abstract T createRemoteReadConnection( + DataSource source, ShellStore proxy) throws Exception; + + public abstract T createRemoteWriteConnection( + DataSource source, WriteMode mode, ShellStore proxy) throws Exception; + + public abstract ProxyFunction call(ProxyFunction func, ShellStore proxy); +} diff --git a/core/src/main/java/io/xpipe/core/util/Proxyable.java b/core/src/main/java/io/xpipe/core/util/Proxyable.java new file mode 100644 index 000000000..3d912e367 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/util/Proxyable.java @@ -0,0 +1,8 @@ +package io.xpipe.core.util; + +import io.xpipe.core.store.ShellStore; + +public interface Proxyable { + + ShellStore getProxy(); +} diff --git a/core/src/main/java/io/xpipe/core/util/SimpleProxyFunction.java b/core/src/main/java/io/xpipe/core/util/SimpleProxyFunction.java new file mode 100644 index 000000000..cca847c59 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/util/SimpleProxyFunction.java @@ -0,0 +1,21 @@ +package io.xpipe.core.util; + +import lombok.SneakyThrows; + +public abstract class SimpleProxyFunction extends ProxyFunction { + + @SneakyThrows + public T getResult() { + var fields = getClass().getDeclaredFields(); + var last = fields[fields.length - 1]; + last.setAccessible(true); + return (T) last.get(this); + } + + @SneakyThrows + public T callAndGet() { + var result = callAndCopy(); + return ((SimpleProxyFunction) result).getResult(); + } + +} diff --git a/core/src/main/java/module-info.java b/core/src/main/java/module-info.java index 0a2ba3a8a..f488c7259 100644 --- a/core/src/main/java/module-info.java +++ b/core/src/main/java/module-info.java @@ -25,6 +25,7 @@ open module io.xpipe.core { uses com.fasterxml.jackson.databind.Module; uses io.xpipe.core.source.WriteMode; uses LocalProcessControlProvider; + uses io.xpipe.core.util.ProxyProvider; provides WriteMode with WriteMode.Replace, WriteMode.Append, WriteMode.Prepend; provides com.fasterxml.jackson.databind.Module with diff --git a/extension/src/main/java/io/xpipe/extension/XPipeProxy.java b/extension/src/main/java/io/xpipe/extension/XPipeProxy.java index 66b3b54d0..a0112571c 100644 --- a/extension/src/main/java/io/xpipe/extension/XPipeProxy.java +++ b/extension/src/main/java/io/xpipe/extension/XPipeProxy.java @@ -1,75 +1,23 @@ package io.xpipe.extension; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import io.xpipe.api.connector.XPipeApiConnection; -import io.xpipe.beacon.exchange.ProxyReadConnectionExchange; import io.xpipe.core.impl.FileNames; -import io.xpipe.core.impl.InputStreamStore; import io.xpipe.core.process.ShellProcessControl; -import io.xpipe.core.source.DataSource; -import io.xpipe.core.source.DataSourceReadConnection; import io.xpipe.core.store.ShellStore; -import io.xpipe.core.util.JacksonMapper; import io.xpipe.core.util.XPipeInstallation; import io.xpipe.extension.util.XPipeDaemon; -import lombok.SneakyThrows; import java.io.IOException; -import java.util.Optional; -import java.util.function.Function; public class XPipeProxy { - @SneakyThrows - private static DataSource downstreamTransform(DataSource input, ShellStore proxy) { - var proxyNode = JacksonMapper.newMapper().valueToTree(proxy); - var inputNode = JacksonMapper.newMapper().valueToTree(input); - var localNode = JacksonMapper.newMapper().valueToTree(ShellStore.local()); - replace(inputNode, node -> node.equals(proxyNode) ? Optional.of(localNode) : Optional.empty()); - return JacksonMapper.newMapper().treeToValue(inputNode, DataSource.class); - } - - private static JsonNode replace(JsonNode node, Function> function) { - var value = function.apply(node); - if (value.isPresent()) { - return value.get(); - } - - if (!node.isObject()) { - return node; - } - - var replacement = JsonNodeFactory.instance.objectNode(); - var iterator = node.fields(); - while (iterator.hasNext()) { - var stringJsonNodeEntry = iterator.next(); - var resolved = function.apply(stringJsonNodeEntry.getValue()).orElse(stringJsonNodeEntry.getValue()); - replacement.set(stringJsonNodeEntry.getKey(), resolved); - } - return replacement; - } - - public static T remoteReadConnection(DataSource source, ShellStore proxy) { - var downstream = downstreamTransform(source, proxy); - return (T) XPipeApiConnection.execute(con -> { - con.sendRequest(ProxyReadConnectionExchange.Request.builder() - .source(downstream) - .build()); - con.receiveResponse(); - var inputSource = DataSource.createInternalDataSource( - source.determineInfo().getType(), new InputStreamStore(con.receiveBody())); - return inputSource.openReadConnection(); - }); - } - public static void checkSupport(ShellStore store) throws Exception { var version = XPipeDaemon.getInstance().getVersion(); try (ShellProcessControl s = store.create().start()) { var defaultInstallationExecutable = FileNames.join( XPipeInstallation.getDefaultInstallationBasePath(s), XPipeInstallation.getDaemonExecutablePath(s.getOsType())); - if (!s.executeBooleanSimpleCommand(s.getShellType().createFileExistsCommand(defaultInstallationExecutable))) { + if (!s.executeBooleanSimpleCommand( + s.getShellType().createFileExistsCommand(defaultInstallationExecutable))) { throw new IOException(I18n.get("noInstallationFound")); } diff --git a/extension/src/main/java/io/xpipe/extension/XPipeServiceProviders.java b/extension/src/main/java/io/xpipe/extension/XPipeServiceProviders.java index 8c223943c..42ec3dd3f 100644 --- a/extension/src/main/java/io/xpipe/extension/XPipeServiceProviders.java +++ b/extension/src/main/java/io/xpipe/extension/XPipeServiceProviders.java @@ -1,7 +1,7 @@ package io.xpipe.extension; import com.fasterxml.jackson.databind.jsontype.NamedType; -import io.xpipe.beacon.NamedFunction; +import io.xpipe.core.util.ProxyFunction; import io.xpipe.core.impl.LocalProcessControlProvider; import io.xpipe.core.util.JacksonMapper; import io.xpipe.extension.event.TrackEvent; @@ -36,7 +36,7 @@ public class XPipeServiceProviders { SupportedApplicationProviders.loadAll(layer); PrefsProviders.init(layer); - NamedFunction.init(layer); + ProxyFunction.init(layer); TrackEvent.info("Finished loading extension providers"); } } diff --git a/extension/src/main/java/io/xpipe/extension/event/TrackEvent.java b/extension/src/main/java/io/xpipe/extension/event/TrackEvent.java index 0d6c99ed1..b365534d2 100644 --- a/extension/src/main/java/io/xpipe/extension/event/TrackEvent.java +++ b/extension/src/main/java/io/xpipe/extension/event/TrackEvent.java @@ -37,6 +37,10 @@ public class TrackEvent { return builder().type("info").message(message); } + public static TrackEventBuilder withInfo(String category, String message) { + return builder().category(category).type("info").message(message); + } + public static TrackEventBuilder withWarn(String category, String message) { return builder().category(category).type("warn").message(message); } diff --git a/extension/src/main/java/module-info.java b/extension/src/main/java/module-info.java index cafb2428a..e13cb0803 100644 --- a/extension/src/main/java/module-info.java +++ b/extension/src/main/java/module-info.java @@ -1,4 +1,4 @@ -import io.xpipe.beacon.NamedFunction; +import io.xpipe.core.util.ProxyFunction; import io.xpipe.extension.DataSourceProvider; import io.xpipe.extension.DataStoreActionProvider; import io.xpipe.extension.SupportedApplicationProvider; @@ -41,5 +41,5 @@ open module io.xpipe.extension { uses XPipeDaemon; uses io.xpipe.extension.Cache; uses io.xpipe.extension.DataSourceActionProvider; - uses NamedFunction; + uses ProxyFunction; }