More work on proxies

This commit is contained in:
Christopher Schnick 2022-11-30 19:04:35 +01:00
parent 803ff2ccf2
commit 74691c5a03
30 changed files with 442 additions and 213 deletions

View file

@ -15,6 +15,7 @@ import io.xpipe.core.store.ShellStore;
import io.xpipe.core.util.JacksonMapper;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Value;
import lombok.extern.jackson.Jacksonized;
@ -31,7 +32,7 @@ import static io.xpipe.beacon.BeaconConfig.BODY_SEPARATOR;
public class BeaconClient implements AutoCloseable {
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
public static abstract class ClientInformation {
public abstract static class ClientInformation {
public final CliClientInformation cli() {
return (CliClientInformation) this;
@ -55,6 +56,19 @@ public class BeaconClient implements AutoCloseable {
}
}
@JsonTypeName("reachableCheck")
@Value
@Builder
@Jacksonized
@EqualsAndHashCode(callSuper = false)
public static class ReachableCheckInformation extends ClientInformation {
@Override
public String toDisplayString() {
return "Reachable check";
}
}
@JsonTypeName("gateway")
@Value
@Builder
@ -86,12 +100,14 @@ public class BeaconClient implements AutoCloseable {
}
}
private final Closeable closeable;
@Getter
private final Closeable base;
private final InputStream in;
private final OutputStream out;
private BeaconClient(Closeable closeable, InputStream in, OutputStream out) {
this.closeable = closeable;
private BeaconClient(Closeable base, InputStream in, OutputStream out) {
this.base = base;
this.in = in;
this.out = out;
}
@ -105,11 +121,33 @@ public class BeaconClient implements AutoCloseable {
public static BeaconClient connectProxy(ShellStore proxy) throws Exception {
var control = proxy.create().start();
var command = control.command("xpipe beacon").start();
var command = control.command("xpipe beacon --raw").start();
command.discardErr();
return new BeaconClient(() -> {
command.close();
}, command.getStdout(), command.getStdin());
return new BeaconClient(command, command.getStdout(), command.getStdin()) {
@Override
public <T extends ResponseMessage> T receiveResponse()
throws ConnectorException, ClientException, ServerException {
try {
sendEOF();
getRawOutputStream().close();
} catch (IOException ex) {
throw new ConnectorException(ex);
}
return super.receiveResponse();
}
@Override
public void close() throws ConnectorException {
try {
getRawInputStream().readAllBytes();
} catch (IOException ex) {
throw new ConnectorException(ex);
}
super.close();
}
};
}
public static Optional<BeaconClient> tryConnect(ClientInformation information) {
@ -122,7 +160,7 @@ public class BeaconClient implements AutoCloseable {
public void close() throws ConnectorException {
try {
closeable.close();
base.close();
} catch (IOException ex) {
throw new ConnectorException("Couldn't close client", ex);
}
@ -170,6 +208,13 @@ public class BeaconClient implements AutoCloseable {
sendObject(msg);
}
public void sendEOF() throws ConnectorException {
try (OutputStream ignored = BeaconFormat.writeBlocks(out)) {
} catch (IOException ex) {
throw new ConnectorException("Couldn't write to socket", ex);
}
}
public void sendObject(JsonNode node) throws ConnectorException {
var writer = new StringWriter();
var mapper = JacksonMapper.newMapper();

View file

@ -10,6 +10,6 @@ public class BeaconJacksonModule extends SimpleModule {
context.registerSubtypes(
new NamedType(BeaconClient.ApiClientInformation.class),
new NamedType(BeaconClient.CliClientInformation.class),
new NamedType(BeaconClient.GatewayClientInformation.class));
new NamedType(BeaconClient.ReachableCheckInformation.class));
}
}

View file

@ -0,0 +1,139 @@
package io.xpipe.beacon;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import io.xpipe.beacon.exchange.ProxyFunctionExchange;
import io.xpipe.beacon.exchange.ProxyReadConnectionExchange;
import io.xpipe.beacon.exchange.ProxyWriteConnectionExchange;
import io.xpipe.core.impl.InputStreamStore;
import io.xpipe.core.impl.OutputStreamStore;
import io.xpipe.core.source.DataSource;
import io.xpipe.core.source.DataSourceConnection;
import io.xpipe.core.source.DataSourceReadConnection;
import io.xpipe.core.source.WriteMode;
import io.xpipe.core.store.ShellStore;
import io.xpipe.core.util.JacksonMapper;
import io.xpipe.core.util.ProxyFunction;
import io.xpipe.core.util.ProxyProvider;
import io.xpipe.core.util.Proxyable;
import lombok.SneakyThrows;
import java.io.FilterInputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.util.Optional;
import java.util.function.Function;
public class BeaconProxyImpl extends ProxyProvider {
@SneakyThrows
private static DataSource<?> downstreamTransform(DataSource<?> input, ShellStore proxy) {
var proxyNode = JacksonMapper.newMapper().valueToTree(proxy);
var inputNode = JacksonMapper.newMapper().valueToTree(input);
var localNode = JacksonMapper.newMapper().valueToTree(ShellStore.local());
replace(inputNode, node -> node.equals(proxyNode) ? Optional.of(localNode) : Optional.empty());
return JacksonMapper.newMapper().treeToValue(inputNode, DataSource.class);
}
private static JsonNode replace(JsonNode node, Function<JsonNode, Optional<JsonNode>> function) {
var value = function.apply(node);
if (value.isPresent()) {
return value.get();
}
if (!node.isObject()) {
return node;
}
var replacement = JsonNodeFactory.instance.objectNode();
var iterator = node.fields();
while (iterator.hasNext()) {
var stringJsonNodeEntry = iterator.next();
var resolved = function.apply(stringJsonNodeEntry.getValue()).orElse(stringJsonNodeEntry.getValue());
replacement.set(stringJsonNodeEntry.getKey(), resolved);
}
return replacement;
}
@Override
public ShellStore getProxy(Object base) {
var proxy = base instanceof Proxyable p ? p.getProxy() : null;
return ShellStore.isLocal(proxy) ? (BeaconConfig.localProxy() ? proxy : null) : proxy;
}
@Override
public boolean isRemote(Object base) {
if (base == null) {
throw new IllegalArgumentException("Proxy base is null");
}
return getProxy(base) != null;
}
@Override
public <T extends DataSourceReadConnection> T createRemoteReadConnection(DataSource<?> source, ShellStore proxy) throws Exception {
var downstream = downstreamTransform(source, proxy);
BeaconClient client = null;
try {
client = BeaconClient.connectProxy(proxy);
client.sendRequest(ProxyReadConnectionExchange.Request.builder()
.source(downstream)
.build());
client.receiveResponse();
BeaconClient finalClient = client;
var inputStream = new FilterInputStream(finalClient.receiveBody()) {
@Override
@SneakyThrows
public void close() throws IOException {
super.close();
finalClient.close();
}
};
var inputSource = DataSource.createInternalDataSource(source.getType(), new InputStreamStore(inputStream));
return (T) inputSource.openReadConnection();
} catch (Exception ex) {
if (client != null) client.close();
throw ex;
}
}
@Override
public <T extends DataSourceConnection> T createRemoteWriteConnection(DataSource<?> source, WriteMode mode, ShellStore proxy) throws Exception {
var downstream = downstreamTransform(source, proxy);
BeaconClient client = null;
try {
client = BeaconClient.connectProxy(proxy);
client.sendRequest(ProxyWriteConnectionExchange.Request.builder()
.source(downstream)
.build());
BeaconClient finalClient = client;
var outputStream = new FilterOutputStream(client.sendBody()) {
@Override
@SneakyThrows
public void close() throws IOException {
super.close();
finalClient.receiveResponse();
finalClient.close();
}
};
var outputSource = DataSource.createInternalDataSource(source.getType(), new OutputStreamStore(outputStream));
return (T) outputSource.openWriteConnection(mode);
} catch (Exception ex) {
if (client != null) client.close();
throw ex;
}
}
@Override
@SneakyThrows
public ProxyFunction call(ProxyFunction func, ShellStore proxy) {
try (var client = BeaconClient.connectProxy(proxy)) {
client.sendRequest(
ProxyFunctionExchange.Request.builder().function(func).build());
ProxyFunctionExchange.Response response = client.receiveResponse();
return response.getFunction();
}
}
}

View file

@ -18,7 +18,8 @@ import java.nio.file.Path;
public class BeaconServer {
public static boolean isRunning() {
try (var ignored = BeaconClient.connect(null)) {
try (var ignored = BeaconClient.connect(
BeaconClient.ReachableCheckInformation.builder().build())) {
return true;
} catch (Exception e) {
return false;

View file

@ -1,21 +0,0 @@
package io.xpipe.beacon;
import io.xpipe.core.store.ShellStore;
public interface Proxyable {
public static ShellStore getProxy(Object base) {
var proxy = base instanceof Proxyable p ? p.getProxy() : null;
return ShellStore.isLocal(proxy) ? (BeaconConfig.localProxy() ? proxy : null) : proxy;
}
public static boolean isRemote(Object base) {
if (base == null) {
throw new IllegalArgumentException("Proxy base is null");
}
return base instanceof Proxyable p && !ShellStore.isLocal(p.getProxy());
}
ShellStore getProxy();
}

View file

@ -2,18 +2,18 @@ package io.xpipe.beacon.exchange;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import io.xpipe.beacon.NamedFunction;
import io.xpipe.core.util.ProxyFunction;
import io.xpipe.beacon.RequestMessage;
import io.xpipe.beacon.ResponseMessage;
import lombok.Builder;
import lombok.Value;
import lombok.extern.jackson.Jacksonized;
public class NamedFunctionExchange implements MessageExchange {
public class ProxyFunctionExchange implements MessageExchange {
@Override
public String getId() {
return "namedFunction";
return "proxyFunction";
}
@Jacksonized
@ -21,9 +21,9 @@ public class NamedFunctionExchange implements MessageExchange {
@Value
public static class Request implements RequestMessage {
@JsonSerialize(using = NamedFunction.Serializer.class, as = NamedFunction.class)
@JsonDeserialize(using = NamedFunction.Deserializer.class, as = NamedFunction.class)
NamedFunction<?> function;
@JsonSerialize(using = ProxyFunction.Serializer.class, as = ProxyFunction.class)
@JsonDeserialize(using = ProxyFunction.Deserializer.class, as = ProxyFunction.class)
ProxyFunction function;
}
@Jacksonized
@ -31,6 +31,8 @@ public class NamedFunctionExchange implements MessageExchange {
@Value
public static class Response implements ResponseMessage {
Object returnValue;
@JsonSerialize(using = ProxyFunction.Serializer.class, as = ProxyFunction.class)
@JsonDeserialize(using = ProxyFunction.Deserializer.class, as = ProxyFunction.class)
ProxyFunction function;
}
}

View file

@ -0,0 +1,32 @@
package io.xpipe.beacon.exchange;
import io.xpipe.beacon.RequestMessage;
import io.xpipe.beacon.ResponseMessage;
import io.xpipe.core.source.DataSource;
import io.xpipe.core.source.WriteMode;
import lombok.Builder;
import lombok.NonNull;
import lombok.Value;
import lombok.extern.jackson.Jacksonized;
public class ProxyWriteConnectionExchange implements MessageExchange {
@Override
public String getId() {
return "proxyWriteConnection";
}
@Jacksonized
@Builder
@Value
public static class Request implements RequestMessage {
@NonNull DataSource<?> source;
@NonNull WriteMode mode;
}
@Jacksonized
@Builder
@Value
public static class Response implements ResponseMessage {
}
}

View file

@ -1,10 +1,13 @@
import com.fasterxml.jackson.databind.Module;
import io.xpipe.beacon.BeaconJacksonModule;
import io.xpipe.beacon.BeaconProxyImpl;
import io.xpipe.core.util.ProxyFunction;
import io.xpipe.beacon.exchange.*;
import io.xpipe.beacon.exchange.api.QueryRawDataExchange;
import io.xpipe.beacon.exchange.api.QueryTableDataExchange;
import io.xpipe.beacon.exchange.api.QueryTextDataExchange;
import io.xpipe.beacon.exchange.cli.*;
import io.xpipe.core.util.ProxyProvider;
module io.xpipe.beacon {
exports io.xpipe.beacon;
@ -25,8 +28,9 @@ module io.xpipe.beacon {
requires static lombok;
uses MessageExchange;
uses io.xpipe.beacon.NamedFunction;
uses ProxyFunction;
provides ProxyProvider with BeaconProxyImpl;
provides Module with BeaconJacksonModule;
provides io.xpipe.beacon.exchange.MessageExchange with
ForwardExchange,
@ -37,7 +41,8 @@ module io.xpipe.beacon {
ListCollectionsExchange,
ListEntriesExchange,
ModeExchange,
NamedFunctionExchange,
ProxyWriteConnectionExchange,
ProxyFunctionExchange,
StatusExchange,
StopExchange,
RenameStoreExchange,

View file

@ -17,6 +17,7 @@ dependencies {
api group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "2.13.0"
implementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-parameter-names', version: "2.13.0"
implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: "2.13.0"
implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jdk8', version: "2.13.0"
}
version = file('../misc/version').text

View file

@ -1,12 +1,12 @@
package io.xpipe.core.impl;
import io.xpipe.core.store.DataFlow;
import io.xpipe.core.store.StreamDataStore;
import java.io.InputStream;
/**
* A data store that is only represented by an InputStream.
* This can be useful for development.
*/
public class InputStreamStore implements StreamDataStore {
@ -21,6 +21,11 @@ public class InputStreamStore implements StreamDataStore {
return in;
}
@Override
public DataFlow getFlow() {
return DataFlow.INPUT;
}
@Override
public boolean canOpen() {
return true;

View file

@ -12,10 +12,10 @@ public abstract class LocalProcessControlProvider {
INSTANCE = layer != null
? ServiceLoader.load(layer, LocalProcessControlProvider.class)
.findFirst()
.orElseThrow()
.orElse(null)
: ServiceLoader.load(LocalProcessControlProvider.class)
.findFirst()
.orElseThrow();
.orElse(null);
}
public static ShellProcessControl create() {

View file

@ -1,5 +1,6 @@
package io.xpipe.core.impl;
import io.xpipe.core.store.DataFlow;
import io.xpipe.core.store.StreamDataStore;
import java.io.InputStream;
@ -13,6 +14,11 @@ public class OutputStreamStore implements StreamDataStore {
this.out = out;
}
@Override
public DataFlow getFlow() {
return DataFlow.OUTPUT;
}
@Override
public InputStream openInput() throws Exception {
throw new UnsupportedOperationException("No input available");

View file

@ -29,15 +29,17 @@ public class PreservingWriteConnection implements DataSourceConnection {
if (source.getStore().canOpen()) {
try (var in = source.openReadConnection();
var out = nativeSource.openWriteConnection(WriteMode.REPLACE)) {
in.init();
out.init();
in.forward(out);
}
;
}
connection.init();
if (source.getStore().canOpen()) {
try (var in = nativeSource.openReadConnection()) {
in.init();
in.forward(connection);
}
}

View file

@ -1,5 +1,6 @@
package io.xpipe.core.process;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -7,7 +8,7 @@ import java.nio.charset.Charset;
import java.util.List;
import java.util.stream.Collectors;
public interface ProcessControl extends AutoCloseable {
public interface ProcessControl extends Closeable, AutoCloseable {
static String join(List<String> command) {
return command.stream().map(s -> s.contains(" ") ? "\"" + s + "\"" : s).collect(Collectors.joining(" "));

View file

@ -12,6 +12,11 @@ public abstract class CollectionDataSource<DS extends DataStore> extends DataSou
@Singular
private final Map<String, String> preferredProviders;
@Override
public DataSourceType getType() {
return DataSourceType.COLLECTION;
}
public CollectionDataSource<DS> annotate(String file, String provider) {
preferredProviders.put(file, provider);
return this;
@ -22,18 +27,12 @@ public abstract class CollectionDataSource<DS extends DataStore> extends DataSou
return this;
}
@Override
public final DataSourceInfo determineInfo() throws Exception {
try (var con = openReadConnection()) {
var c = (int) con.listEntries().count();
return new DataSourceInfo.Collection(c);
}
}
public final CollectionReadConnection openReadConnection() throws Exception {
var con = newReadConnection();
con.init();
return con;
if (!isComplete()) {
throw new UnsupportedOperationException();
}
return newReadConnection();
}
public final CollectionWriteConnection openWriteConnection(WriteMode mode) throws Exception {
@ -42,7 +41,6 @@ public abstract class CollectionDataSource<DS extends DataStore> extends DataSou
throw new UnsupportedOperationException(mode.getId());
}
con.init();
return con;
}

View file

@ -123,13 +123,6 @@ public abstract class DataSource<DS extends DataStore> extends JacksonizedValue
return Optional.empty();
}
/**
* Determines the data source info.
* This is usually called only once on data source
* creation as this process might be expensive.
*/
public abstract DataSourceInfo determineInfo() throws Exception;
public DataSourceReadConnection openReadConnection() throws Exception {
throw new UnsupportedOperationException();
}
@ -141,4 +134,6 @@ public abstract class DataSource<DS extends DataStore> extends JacksonizedValue
public DS getStore() {
return store;
}
public abstract DataSourceType getType();
}

View file

@ -9,19 +9,17 @@ public abstract class RawDataSource<DS extends DataStore> extends DataSource<DS>
private static final int MAX_BYTES_READ = 100000;
@Override
public final DataSourceInfo determineInfo() throws Exception {
try (var con = openReadConnection()) {
var b = con.readBytes(MAX_BYTES_READ);
int usedCount = b.length == MAX_BYTES_READ ? -1 : b.length;
return new DataSourceInfo.Raw(usedCount);
}
public DataSourceType getType() {
return DataSourceType.RAW;
}
@Override
public final RawReadConnection openReadConnection() throws Exception {
var con = newReadConnection();
con.init();
return con;
if (!isComplete()) {
throw new UnsupportedOperationException();
}
return newReadConnection();
}
@Override
@ -31,7 +29,6 @@ public abstract class RawDataSource<DS extends DataStore> extends DataSource<DS>
throw new UnsupportedOperationException(mode.getId());
}
con.init();
return con;
}

View file

@ -7,6 +7,11 @@ import lombok.experimental.SuperBuilder;
@SuperBuilder
public abstract class StructureDataSource<DS extends DataStore> extends DataSource<DS> {
@Override
public DataSourceType getType() {
return DataSourceType.STRUCTURE;
}
private int countEntries(DataStructureNode n) {
if (n.isValue()) {
return 1;
@ -19,19 +24,12 @@ public abstract class StructureDataSource<DS extends DataStore> extends DataSour
return c;
}
@Override
public final DataSourceInfo determineInfo() throws Exception {
try (var con = openReadConnection()) {
var n = con.read();
var c = countEntries(n);
return new DataSourceInfo.Structure(c);
}
}
public final StructureReadConnection openReadConnection() throws Exception {
var con = newReadConnection();
con.init();
return con;
if (!isComplete()) {
throw new UnsupportedOperationException();
}
return newReadConnection();
}
public final StructureWriteConnection openWriteConnection(WriteMode mode) throws Exception {
@ -40,7 +38,6 @@ public abstract class StructureDataSource<DS extends DataStore> extends DataSour
throw new UnsupportedOperationException(mode.getId());
}
con.init();
return con;
}

View file

@ -3,6 +3,10 @@ package io.xpipe.core.source;
import io.xpipe.core.data.type.TupleType;
import io.xpipe.core.impl.PreservingTableWriteConnection;
import io.xpipe.core.store.DataStore;
import io.xpipe.core.util.ProxyProvider;
import io.xpipe.core.util.SimpleProxyFunction;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import lombok.experimental.SuperBuilder;
import java.util.Optional;
@ -24,34 +28,47 @@ public abstract class TableDataSource<DS extends DataStore> extends DataSource<D
}
@Override
public final DataSourceInfo determineInfo() throws Exception {
if (!getFlow().hasInput() || !getStore().canOpen()) {
return new DataSourceInfo.Table(null, -1);
}
try {
checkComplete();
} catch (Exception e) {
return new DataSourceInfo.Table(null, -1);
}
try (var con = openReadConnection()) {
var dataType = con.getDataType();
var rowCount = con.getRowCount();
return new DataSourceInfo.Table(dataType, rowCount.orElse(-1));
}
public DataSourceType getType() {
return DataSourceType.TABLE;
}
public final TableReadConnection openReadConnection() throws Exception {
try {
checkComplete();
} catch (Exception e) {
if (!isComplete()) {
return TableReadConnection.empty();
}
var con = newReadConnection();
con.init();
return con;
var proxy = ProxyProvider.get().getProxy(this);
if (proxy != null) {
return ProxyProvider.get().createRemoteReadConnection(this, proxy);
}
return newReadConnection();
}
@NoArgsConstructor
private static class CreateMappingFunction extends SimpleProxyFunction<TableMapping> {
private TableDataSource<?> source;
private TupleType type;
public CreateMappingFunction(TableDataSource<?> source, TupleType type) {
this.source = source;
this.type = type;
}
private TableMapping mapping;
@SneakyThrows
public void callLocal() {
try (TableWriteConnection w = source.openWriteConnection(WriteMode.REPLACE)) {
w.init();
mapping = w.createMapping(type).orElse(null);
}
}
}
public final Optional<TableMapping> createMapping(TupleType inputType) throws Exception {
return Optional.ofNullable(new CreateMappingFunction(this, inputType).callAndGet());
}
public final TableWriteConnection openWriteConnection(WriteMode mode) throws Exception {
@ -60,7 +77,11 @@ public abstract class TableDataSource<DS extends DataStore> extends DataSource<D
throw new UnsupportedOperationException(mode.getId());
}
con.init();
var proxy = ProxyProvider.get().getProxy(this);
if (proxy != null) {
return ProxyProvider.get().createRemoteWriteConnection(this, mode, proxy);
}
return con;
}

View file

@ -1,5 +1,6 @@
package io.xpipe.core.source;
import com.fasterxml.jackson.annotation.JsonCreator;
import io.xpipe.core.data.type.TupleType;
import lombok.Getter;
@ -63,6 +64,7 @@ public class TableMapping {
protected final Integer[] columMap;
@JsonCreator
public TableMapping(TupleType inputType, TupleType outputType, Integer[] columMap) {
this.inputType = inputType;
this.outputType = outputType;

View file

@ -4,46 +4,36 @@ import io.xpipe.core.impl.PreservingTextWriteConnection;
import io.xpipe.core.store.DataStore;
import lombok.experimental.SuperBuilder;
import java.util.concurrent.atomic.AtomicInteger;
@SuperBuilder
public abstract class TextDataSource<DS extends DataStore> extends DataSource<DS> {
private static final int MAX_LINE_READ = 1000;
@Override
public final DataSourceInfo determineInfo() throws Exception {
if (!getStore().canOpen()) {
return new DataSourceInfo.Text(-1, -1);
}
try (var con = openReadConnection()) {
AtomicInteger lineCount = new AtomicInteger();
AtomicInteger charCount = new AtomicInteger();
con.lines().limit(MAX_LINE_READ).forEach(s -> {
lineCount.getAndIncrement();
charCount.addAndGet(s.length());
});
boolean limitHit = lineCount.get() == MAX_LINE_READ;
return new DataSourceInfo.Text(limitHit ? -1 : charCount.get(), limitHit ? -1 : lineCount.get());
}
public DataSourceType getType() {
return DataSourceType.TEXT;
}
@Override
public final TextReadConnection openReadConnection() throws Exception {
var con = newReadConnection();
con.init();
return con;
if (!isComplete()) {
throw new UnsupportedOperationException();
}
return newReadConnection();
}
@Override
public final TextWriteConnection openWriteConnection(WriteMode mode) throws Exception {
var con = newWriteConnection(mode);
con.init();
if (con == null) {
throw new UnsupportedOperationException(mode.getId());
}
return con;
}
protected TextWriteConnection newWriteConnection(WriteMode mode) {
protected TextWriteConnection newWriteConnection(WriteMode mode) {
if (mode.equals(WriteMode.PREPEND)) {
return new PreservingTextWriteConnection(this, newWriteConnection(WriteMode.REPLACE), false);
}

View file

@ -1,4 +1,4 @@
package io.xpipe.beacon;
package io.xpipe.core.util;
import com.fasterxml.jackson.core.JacksonException;
import com.fasterxml.jackson.core.JsonGenerator;
@ -9,8 +9,6 @@ import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import io.xpipe.beacon.exchange.NamedFunctionExchange;
import io.xpipe.core.util.JacksonMapper;
import lombok.Getter;
import lombok.SneakyThrows;
@ -18,7 +16,7 @@ import java.io.IOException;
import java.util.Arrays;
@Getter
public abstract class NamedFunction<T> {
public abstract class ProxyFunction {
private static ModuleLayer layer;
@ -26,14 +24,14 @@ public abstract class NamedFunction<T> {
layer = l;
}
public static class Serializer extends StdSerializer<NamedFunction> {
public static class Serializer extends StdSerializer<ProxyFunction> {
protected Serializer() {
super(NamedFunction.class);
super(ProxyFunction.class);
}
@Override
public void serialize(NamedFunction value, JsonGenerator gen, SerializerProvider provider) throws IOException {
public void serialize(ProxyFunction value, JsonGenerator gen, SerializerProvider provider) throws IOException {
var node = (ObjectNode) JacksonMapper.getDefault().valueToTree(value);
node.set("module", new TextNode(value.getClass().getModule().getName()));
node.set("class", new TextNode(value.getClass().getName()));
@ -41,41 +39,36 @@ public abstract class NamedFunction<T> {
}
}
public static class Deserializer extends StdDeserializer<NamedFunction<?>> {
public static class Deserializer extends StdDeserializer<ProxyFunction> {
protected Deserializer() {
super(NamedFunction.class);
super(ProxyFunction.class);
}
@Override
@SneakyThrows
public NamedFunction<?> deserialize(JsonParser p, DeserializationContext ctxt)
public ProxyFunction deserialize(JsonParser p, DeserializationContext ctxt)
throws IOException, JacksonException {
var tree = (ObjectNode) JacksonMapper.getDefault().readTree(p);
var moduleReference = tree.remove("module").asText();
var classReference = tree.remove("class").asText();
var module = layer.findModule(moduleReference).orElseThrow();
var targetClass = Class.forName(module, classReference);
if (targetClass == null) {
throw new IllegalArgumentException("Named function class not found: " + classReference);
}
return (NamedFunction<?>) JacksonMapper.getDefault().treeToValue(tree, targetClass);
return (ProxyFunction) JacksonMapper.getDefault().treeToValue(tree, targetClass);
}
}
@SneakyThrows
public T call() {
var proxyStore = Proxyable.getProxy(getProxyBase());
public ProxyFunction callAndCopy() {
var proxyStore = ProxyProvider.get().getProxy(getProxyBase());
if (proxyStore != null) {
var client = BeaconClient.connectProxy(proxyStore);
client.sendRequest(
NamedFunctionExchange.Request.builder().function(this).build());
NamedFunctionExchange.Response response = client.receiveResponse();
return (T) response.getReturnValue();
return ProxyProvider.get().call(this, proxyStore);
} else {
return callLocal();
callLocal();
return this;
}
}
@ -86,5 +79,5 @@ public abstract class NamedFunction<T> {
return first.get(this);
}
public abstract T callLocal();
public abstract void callLocal();
}

View file

@ -0,0 +1,36 @@
package io.xpipe.core.util;
import io.xpipe.core.source.DataSource;
import io.xpipe.core.source.DataSourceConnection;
import io.xpipe.core.source.DataSourceReadConnection;
import io.xpipe.core.source.WriteMode;
import io.xpipe.core.store.ShellStore;
import java.util.ServiceLoader;
public abstract class ProxyProvider {
private static ProxyProvider INSTANCE;
public static ProxyProvider get() {
if (INSTANCE == null) {
INSTANCE = ServiceLoader.load(ModuleLayer.boot(), ProxyProvider.class)
.findFirst()
.orElseThrow();
}
return INSTANCE;
}
public abstract ShellStore getProxy(Object base);
public abstract boolean isRemote(Object base);
public abstract <T extends DataSourceReadConnection> T createRemoteReadConnection(
DataSource<?> source, ShellStore proxy) throws Exception;
public abstract <T extends DataSourceConnection> T createRemoteWriteConnection(
DataSource<?> source, WriteMode mode, ShellStore proxy) throws Exception;
public abstract ProxyFunction call(ProxyFunction func, ShellStore proxy);
}

View file

@ -0,0 +1,8 @@
package io.xpipe.core.util;
import io.xpipe.core.store.ShellStore;
public interface Proxyable {
ShellStore getProxy();
}

View file

@ -0,0 +1,21 @@
package io.xpipe.core.util;
import lombok.SneakyThrows;
public abstract class SimpleProxyFunction<T> extends ProxyFunction {
@SneakyThrows
public T getResult() {
var fields = getClass().getDeclaredFields();
var last = fields[fields.length - 1];
last.setAccessible(true);
return (T) last.get(this);
}
@SneakyThrows
public T callAndGet() {
var result = callAndCopy();
return ((SimpleProxyFunction<T>) result).getResult();
}
}

View file

@ -25,6 +25,7 @@ open module io.xpipe.core {
uses com.fasterxml.jackson.databind.Module;
uses io.xpipe.core.source.WriteMode;
uses LocalProcessControlProvider;
uses io.xpipe.core.util.ProxyProvider;
provides WriteMode with WriteMode.Replace, WriteMode.Append, WriteMode.Prepend;
provides com.fasterxml.jackson.databind.Module with

View file

@ -1,75 +1,23 @@
package io.xpipe.extension;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import io.xpipe.api.connector.XPipeApiConnection;
import io.xpipe.beacon.exchange.ProxyReadConnectionExchange;
import io.xpipe.core.impl.FileNames;
import io.xpipe.core.impl.InputStreamStore;
import io.xpipe.core.process.ShellProcessControl;
import io.xpipe.core.source.DataSource;
import io.xpipe.core.source.DataSourceReadConnection;
import io.xpipe.core.store.ShellStore;
import io.xpipe.core.util.JacksonMapper;
import io.xpipe.core.util.XPipeInstallation;
import io.xpipe.extension.util.XPipeDaemon;
import lombok.SneakyThrows;
import java.io.IOException;
import java.util.Optional;
import java.util.function.Function;
public class XPipeProxy {
@SneakyThrows
private static DataSource<?> downstreamTransform(DataSource<?> input, ShellStore proxy) {
var proxyNode = JacksonMapper.newMapper().valueToTree(proxy);
var inputNode = JacksonMapper.newMapper().valueToTree(input);
var localNode = JacksonMapper.newMapper().valueToTree(ShellStore.local());
replace(inputNode, node -> node.equals(proxyNode) ? Optional.of(localNode) : Optional.empty());
return JacksonMapper.newMapper().treeToValue(inputNode, DataSource.class);
}
private static JsonNode replace(JsonNode node, Function<JsonNode, Optional<JsonNode>> function) {
var value = function.apply(node);
if (value.isPresent()) {
return value.get();
}
if (!node.isObject()) {
return node;
}
var replacement = JsonNodeFactory.instance.objectNode();
var iterator = node.fields();
while (iterator.hasNext()) {
var stringJsonNodeEntry = iterator.next();
var resolved = function.apply(stringJsonNodeEntry.getValue()).orElse(stringJsonNodeEntry.getValue());
replacement.set(stringJsonNodeEntry.getKey(), resolved);
}
return replacement;
}
public static <T extends DataSourceReadConnection> T remoteReadConnection(DataSource<?> source, ShellStore proxy) {
var downstream = downstreamTransform(source, proxy);
return (T) XPipeApiConnection.execute(con -> {
con.sendRequest(ProxyReadConnectionExchange.Request.builder()
.source(downstream)
.build());
con.receiveResponse();
var inputSource = DataSource.createInternalDataSource(
source.determineInfo().getType(), new InputStreamStore(con.receiveBody()));
return inputSource.openReadConnection();
});
}
public static void checkSupport(ShellStore store) throws Exception {
var version = XPipeDaemon.getInstance().getVersion();
try (ShellProcessControl s = store.create().start()) {
var defaultInstallationExecutable = FileNames.join(
XPipeInstallation.getDefaultInstallationBasePath(s),
XPipeInstallation.getDaemonExecutablePath(s.getOsType()));
if (!s.executeBooleanSimpleCommand(s.getShellType().createFileExistsCommand(defaultInstallationExecutable))) {
if (!s.executeBooleanSimpleCommand(
s.getShellType().createFileExistsCommand(defaultInstallationExecutable))) {
throw new IOException(I18n.get("noInstallationFound"));
}

View file

@ -1,7 +1,7 @@
package io.xpipe.extension;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import io.xpipe.beacon.NamedFunction;
import io.xpipe.core.util.ProxyFunction;
import io.xpipe.core.impl.LocalProcessControlProvider;
import io.xpipe.core.util.JacksonMapper;
import io.xpipe.extension.event.TrackEvent;
@ -36,7 +36,7 @@ public class XPipeServiceProviders {
SupportedApplicationProviders.loadAll(layer);
PrefsProviders.init(layer);
NamedFunction.init(layer);
ProxyFunction.init(layer);
TrackEvent.info("Finished loading extension providers");
}
}

View file

@ -37,6 +37,10 @@ public class TrackEvent {
return builder().type("info").message(message);
}
public static TrackEventBuilder withInfo(String category, String message) {
return builder().category(category).type("info").message(message);
}
public static TrackEventBuilder withWarn(String category, String message) {
return builder().category(category).type("warn").message(message);
}

View file

@ -1,4 +1,4 @@
import io.xpipe.beacon.NamedFunction;
import io.xpipe.core.util.ProxyFunction;
import io.xpipe.extension.DataSourceProvider;
import io.xpipe.extension.DataStoreActionProvider;
import io.xpipe.extension.SupportedApplicationProvider;
@ -41,5 +41,5 @@ open module io.xpipe.extension {
uses XPipeDaemon;
uses io.xpipe.extension.Cache;
uses io.xpipe.extension.DataSourceActionProvider;
uses NamedFunction;
uses ProxyFunction;
}