Various reworks

This commit is contained in:
Christopher Schnick 2022-09-29 14:51:02 +02:00
parent 5459347482
commit f12ce82933
68 changed files with 854 additions and 425 deletions

View file

@ -4,6 +4,7 @@ import io.xpipe.core.data.node.ArrayNode;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.source.DataSourceInfo;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
public interface DataTable extends Iterable<TupleNode>, DataSource {
@ -15,4 +16,14 @@ public interface DataTable extends Iterable<TupleNode>, DataSource {
ArrayNode readAll();
ArrayNode read(int maxRows);
default int countAndDiscard() {
AtomicInteger count = new AtomicInteger();
try (var stream = stream()) {
stream.forEach(dataStructureNodes -> {
count.getAndIncrement();
});
}
return count.get();
}
}

View file

@ -3,7 +3,6 @@ package io.xpipe.api.connector;
import io.xpipe.beacon.*;
import io.xpipe.beacon.exchange.cli.DialogExchange;
import io.xpipe.core.dialog.DialogReference;
import io.xpipe.core.util.JacksonHelper;
import java.util.Optional;
@ -64,10 +63,6 @@ public final class XPipeConnection extends BeaconConnection {
@Override
protected void constructSocket() {
if (!JacksonHelper.isInit()) {
JacksonHelper.initModularized(ModuleLayer.boot());
}
if (!BeaconServer.isRunning()) {
try {
start();
@ -79,13 +74,13 @@ public final class XPipeConnection extends BeaconConnection {
if (r.isEmpty()) {
throw new BeaconException("Wait for xpipe daemon timed out");
} else {
socket = r.get();
beaconClient = r.get();
return;
}
}
try {
socket = new BeaconClient();
beaconClient = new BeaconClient();
} catch (Exception ex) {
throw new BeaconException("Unable to connect to running xpipe daemon", ex);
}

View file

@ -4,6 +4,7 @@ import io.xpipe.api.DataSourceConfig;
import io.xpipe.api.DataTable;
import io.xpipe.api.connector.XPipeConnection;
import io.xpipe.beacon.BeaconConnection;
import io.xpipe.beacon.BeaconException;
import io.xpipe.beacon.exchange.api.QueryTableDataExchange;
import io.xpipe.core.data.node.ArrayNode;
import io.xpipe.core.data.node.DataStructureNode;
@ -16,10 +17,9 @@ import io.xpipe.core.source.DataSourceInfo;
import io.xpipe.core.source.DataSourceReference;
import io.xpipe.core.source.DataSourceType;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
@ -43,8 +43,9 @@ public class DataTableImpl extends DataSourceImpl implements DataTable {
}
public Stream<TupleNode> stream() {
var iterator = new TableIterator();
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(iterator(), Spliterator.ORDERED), false);
Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false).onClose(iterator::finish);
}
@Override
@ -71,54 +72,54 @@ public class DataTableImpl extends DataSourceImpl implements DataTable {
return ArrayNode.of(nodes);
}
private class TableIterator implements Iterator<TupleNode> {
private final BeaconConnection connection;
private final TypedDataStreamParser parser;
private final TypedAbstractReader nodeReader;
private TupleNode node;
{
nodeReader = TypedDataStructureNodeReader.of(info.getDataType());
parser = new TypedDataStreamParser(info.getDataType());
connection = XPipeConnection.open();
var req = QueryTableDataExchange.Request.builder()
.ref(DataSourceReference.id(getId())).maxRows(Integer.MAX_VALUE).build();
connection.sendRequest(req);
connection.receiveResponse();
connection.receiveBody();
}
private void finish() {
connection.close();
}
@Override
public boolean hasNext() {
connection.checkClosed();
try {
node = (TupleNode) parser.parseStructure(connection.getInputStream(), nodeReader);
} catch (IOException e) {
throw new BeaconException(e);
}
if (node == null) {
// finish();
}
return node != null;
}
@Override
public TupleNode next() {
connection.checkClosed();
return node;
}
};
@Override
public Iterator<TupleNode> iterator() {
return new Iterator<>() {
private final BeaconConnection connection;
private final TypedDataStreamParser parser;
private final TypedAbstractReader nodeReader;
{
nodeReader = TypedDataStructureNodeReader.of(info.getDataType());
parser = new TypedDataStreamParser(info.getDataType());
connection = XPipeConnection.open();
var req = QueryTableDataExchange.Request.builder()
.ref(DataSourceReference.id(getId())).build();
connection.sendRequest(req);
connection.receiveResponse();
connection.receiveBody();
}
private void finish() {
connection.close();
}
@Override
public boolean hasNext() {
connection.checkClosed();
AtomicBoolean hasNext = new AtomicBoolean(false);
connection.withInputStream(in -> {
hasNext.set(parser.hasNext(in));
});
if (!hasNext.get()) {
finish();
}
return hasNext.get();
}
@Override
public TupleNode next() {
connection.checkClosed();
AtomicReference<TupleNode> current = new AtomicReference<>();
connection.withInputStream(in -> {
current.set((TupleNode) parser.parseStructure(connection.getInputStream(), nodeReader));
});
return current.get();
}
};
return new TableIterator();
}
}

View file

@ -1,7 +1,7 @@
package io.xpipe.beacon;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
@ -11,12 +11,11 @@ import io.xpipe.beacon.exchange.data.ClientErrorMessage;
import io.xpipe.beacon.exchange.data.ServerErrorMessage;
import io.xpipe.core.util.JacksonHelper;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.*;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Optional;
@ -50,21 +49,28 @@ public class BeaconClient implements AutoCloseable {
}
}
private final Socket socket;
private final Closeable closeable;
private final InputStream in;
private final OutputStream out;
public BeaconClient() throws IOException {
socket = new Socket(InetAddress.getLoopbackAddress(), BeaconConfig.getUsedPort());
var socket = new Socket(InetAddress.getLoopbackAddress(), BeaconConfig.getUsedPort());
closeable = socket;
in = socket.getInputStream();
out = socket.getOutputStream();
}
public BeaconClient(Closeable closeable, InputStream in, OutputStream out) {
this.closeable = closeable;
this.in = in;
this.out = out;
}
public void close() throws ConnectorException {
try {
socket.close();
closeable.close();
} catch (IOException ex) {
throw new ConnectorException("Couldn't close socket", ex);
throw new ConnectorException("Couldn't close client", ex);
}
}
@ -74,7 +80,7 @@ public class BeaconClient implements AutoCloseable {
if (sep.length != 0 && !Arrays.equals(BODY_SEPARATOR, sep)) {
throw new ConnectorException("Invalid body separator");
}
return BeaconFormat.readBlocks(socket);
return BeaconFormat.readBlocks(in);
} catch (IOException ex) {
throw new ConnectorException(ex);
}
@ -83,13 +89,13 @@ public class BeaconClient implements AutoCloseable {
public OutputStream sendBody() throws ConnectorException {
try {
out.write(BODY_SEPARATOR);
return BeaconFormat.writeBlocks(socket);
return BeaconFormat.writeBlocks(out);
} catch (IOException ex) {
throw new ConnectorException(ex);
}
}
public <T extends RequestMessage> void sendRequest(T req) throws ClientException, ConnectorException {
public <T extends RequestMessage> void sendRequest(T req) throws ClientException, ConnectorException {
ObjectNode json = JacksonHelper.newMapper().valueToTree(req);
var prov = MessageExchanges.byRequest(req);
if (prov.isEmpty()) {
@ -106,25 +112,31 @@ public class BeaconClient implements AutoCloseable {
System.out.println("Sending request to server of type " + req.getClass().getName());
}
if (BeaconConfig.printMessages()) {
System.out.println("Sending raw request:");
System.out.println(msg.toPrettyString());
var writer = new StringWriter();
var mapper = JacksonHelper.newMapper();
try (JsonGenerator g = mapper.createGenerator(writer).setPrettyPrinter(new DefaultPrettyPrinter())) {
g.writeTree(msg);
} catch (IOException ex) {
throw new ConnectorException("Couldn't serialize request", ex);
}
try {
var mapper = JacksonHelper.newMapper().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
var gen = mapper.createGenerator(socket.getOutputStream());
gen.writeTree(msg);
var content = writer.toString();
if (BeaconConfig.printMessages()) {
System.out.println("Sending raw request:");
System.out.println(content);
}
try (OutputStream blockOut = BeaconFormat.writeBlocks(out)) {
blockOut.write(content.getBytes(StandardCharsets.UTF_8));
} catch (IOException ex) {
throw new ConnectorException("Couldn't write to socket", ex);
}
}
public <T extends ResponseMessage> T receiveResponse() throws ConnectorException, ClientException, ServerException {
JsonNode read;
try {
var in = socket.getInputStream();
read = JacksonHelper.newMapper().disable(JsonParser.Feature.AUTO_CLOSE_SOURCE).readTree(in);
JsonNode node;
try (InputStream blockIn = BeaconFormat.readBlocks(in)) {
node = JacksonHelper.newMapper().readTree(blockIn);
} catch (SocketException ex) {
throw new ConnectorException("Connection to xpipe daemon closed unexpectedly", ex);
} catch (IOException ex) {
@ -133,24 +145,24 @@ public class BeaconClient implements AutoCloseable {
if (BeaconConfig.printMessages()) {
System.out.println("Received response:");
System.out.println(read.toPrettyString());
System.out.println(node.toPrettyString());
}
if (read.isMissingNode()) {
if (node.isMissingNode()) {
throw new ConnectorException("Received unexpected EOF");
}
var se = parseServerError(read);
var se = parseServerError(node);
if (se.isPresent()) {
se.get().throwError();
}
var ce = parseClientError(read);
var ce = parseClientError(node);
if (ce.isPresent()) {
throw ce.get().throwException();
}
return parseResponse(read);
return parseResponse(node);
}
private Optional<ClientErrorMessage> parseClientError(JsonNode node) throws ConnectorException {
@ -206,4 +218,12 @@ public class BeaconClient implements AutoCloseable {
throw new ConnectorException("Couldn't parse response", ex);
}
}
public InputStream getRawInputStream() {
return in;
}
public OutputStream getRawOutputStream() {
return out;
}
}

View file

@ -6,22 +6,27 @@ import java.io.OutputStream;
public abstract class BeaconConnection implements AutoCloseable {
protected BeaconClient socket;
protected BeaconClient beaconClient;
private InputStream bodyInput;
private OutputStream bodyOutput;
protected abstract void constructSocket();
public BeaconClient getBeaconClient() {
return beaconClient;
}
@Override
public void close() {
try {
if (socket != null) {
socket.close();
if (beaconClient != null) {
beaconClient.close();
}
socket = null;
beaconClient = null;
} catch (Exception e) {
socket = null;
beaconClient = null;
throw new BeaconException("Could not close beacon connection", e);
}
}
@ -43,7 +48,7 @@ public abstract class BeaconConnection implements AutoCloseable {
}
public void checkClosed() {
if (socket == null) {
if (beaconClient == null) {
throw new BeaconException("Socket is closed");
}
}
@ -70,7 +75,8 @@ public abstract class BeaconConnection implements AutoCloseable {
public <REQ extends RequestMessage, RES extends ResponseMessage> void performInputExchange(
REQ req,
BeaconClient.FailableBiConsumer<RES, InputStream, Exception> responseConsumer) {
BeaconClient.FailableBiConsumer<RES, InputStream, Exception> responseConsumer
) {
checkClosed();
performInputOutputExchange(req, null, responseConsumer);
@ -79,33 +85,35 @@ public abstract class BeaconConnection implements AutoCloseable {
public <REQ extends RequestMessage, RES extends ResponseMessage> void performInputOutputExchange(
REQ req,
BeaconClient.FailableConsumer<OutputStream, IOException> reqWriter,
BeaconClient.FailableBiConsumer<RES, InputStream, Exception> responseConsumer) {
BeaconClient.FailableBiConsumer<RES, InputStream, Exception> responseConsumer
) {
checkClosed();
try {
socket.sendRequest(req);
beaconClient.sendRequest(req);
if (reqWriter != null) {
try (var out = socket.sendBody()) {
try (var out = sendBody()) {
reqWriter.accept(out);
}
}
RES res = socket.receiveResponse();
try (var in = socket.receiveBody()) {
RES res = beaconClient.receiveResponse();
try (var in = receiveBody()) {
responseConsumer.accept(res, in);
}
} catch (Exception e) {
throw new BeaconException("Could not communicate with beacon", e);
throw unwrapException(e);
}
}
public <REQ extends RequestMessage> void sendRequest(
REQ req) {
REQ req
) {
checkClosed();
try {
socket.sendRequest(req);
beaconClient.sendRequest(req);
} catch (Exception e) {
throw new BeaconException("Could not communicate with beacon", e);
throw unwrapException(e);
}
}
@ -113,9 +121,9 @@ public abstract class BeaconConnection implements AutoCloseable {
checkClosed();
try {
return socket.receiveResponse();
return beaconClient.receiveResponse();
} catch (Exception e) {
throw new BeaconException("Could not communicate with beacon", e);
throw unwrapException(e);
}
}
@ -123,10 +131,10 @@ public abstract class BeaconConnection implements AutoCloseable {
checkClosed();
try {
bodyOutput = socket.sendBody();
bodyOutput = beaconClient.sendBody();
return bodyOutput;
} catch (Exception e) {
throw new BeaconException("Could not communicate with beacon", e);
throw unwrapException(e);
}
}
@ -134,38 +142,57 @@ public abstract class BeaconConnection implements AutoCloseable {
checkClosed();
try {
bodyInput = socket.receiveBody();
bodyInput = beaconClient.receiveBody();
return bodyInput;
} catch (Exception e) {
throw new BeaconException("Could not communicate with beacon", e);
throw unwrapException(e);
}
}
public <REQ extends RequestMessage, RES extends ResponseMessage> RES performOutputExchange(
REQ req,
BeaconClient.FailableConsumer<OutputStream, Exception> reqWriter) {
BeaconClient.FailableConsumer<OutputStream, Exception> reqWriter
) {
checkClosed();
try {
socket.sendRequest(req);
try (var out = socket.sendBody()) {
beaconClient.sendRequest(req);
try (var out = sendBody()) {
reqWriter.accept(out);
}
return socket.receiveResponse();
return beaconClient.receiveResponse();
} catch (Exception e) {
throw new BeaconException("Could not communicate with beacon", e);
throw unwrapException(e);
}
}
public <REQ extends RequestMessage, RES extends ResponseMessage> RES performSimpleExchange(
REQ req) {
REQ req
) {
checkClosed();
try {
socket.sendRequest(req);
return socket.receiveResponse();
beaconClient.sendRequest(req);
return beaconClient.receiveResponse();
} catch (Exception e) {
throw new BeaconException("Could not communicate with beacon", e);
throw unwrapException(e);
}
}
private BeaconException unwrapException(Exception exception) {
if (exception instanceof ServerException s) {
return new BeaconException("And internal server error occurred", s.getCause());
}
if (exception instanceof ClientException s) {
return new BeaconException("A client error occurred", s.getCause());
}
if (exception instanceof ConnectorException s) {
return new BeaconException("A beacon connection error occurred", s.getCause());
}
return new BeaconException("An unexpected error occurred", exception);
}
}

View file

@ -5,7 +5,6 @@ import lombok.experimental.UtilityClass;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
@UtilityClass
@ -13,8 +12,7 @@ public class BeaconFormat {
private static final int SEGMENT_SIZE = 65536;
public static OutputStream writeBlocks(Socket socket) throws IOException {
var out = socket.getOutputStream();
public static OutputStream writeBlocks(OutputStream out) throws IOException {
return new OutputStream() {
private final byte[] currentBytes = new byte[SEGMENT_SIZE];
private int index;
@ -23,6 +21,7 @@ public class BeaconFormat {
public void close() throws IOException {
finishBlock();
out.flush();
index = -1;
}
@Override
@ -49,8 +48,7 @@ public class BeaconFormat {
};
}
public static InputStream readBlocks(Socket socket) throws IOException {
var in = socket.getInputStream();
public static InputStream readBlocks(InputStream in) throws IOException {
return new InputStream() {
private byte[] currentBytes;
@ -60,7 +58,9 @@ public class BeaconFormat {
@Override
public int read() throws IOException {
if ((currentBytes == null || index == currentBytes.length) && !lastBlock) {
readBlock();
if (!readBlock()) {
return -1;
}
}
if (currentBytes != null && index == currentBytes.length && lastBlock) {
@ -72,8 +72,12 @@ public class BeaconFormat {
return out;
}
private void readBlock() throws IOException {
private boolean readBlock() throws IOException {
var length = in.readNBytes(4);
if (length.length < 4) {
return false;
}
var lengthInt = ByteBuffer.wrap(length).getInt();
if (BeaconConfig.printMessages()) {
@ -85,6 +89,7 @@ public class BeaconFormat {
if (lengthInt < SEGMENT_SIZE) {
lastBlock = true;
}
return true;
}
};
}

View file

@ -0,0 +1,17 @@
package io.xpipe.beacon;
import io.xpipe.core.store.ShellStore;
import lombok.Value;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@Value
public class XPipeInstance {
UUID uuid;
String name;
Map<ShellStore, XPipeInstance> adjacent;
List<XPipeInstance> reachable;
}

View file

@ -0,0 +1,33 @@
package io.xpipe.beacon.exchange.cli;
import io.xpipe.beacon.RequestMessage;
import io.xpipe.beacon.ResponseMessage;
import io.xpipe.beacon.XPipeInstance;
import io.xpipe.beacon.exchange.MessageExchange;
import lombok.Builder;
import lombok.NonNull;
import lombok.Value;
import lombok.extern.jackson.Jacksonized;
public class InstanceExchange implements MessageExchange {
@Override
public String getId() {
return "instance";
}
@Jacksonized
@Builder
@Value
public static class Request implements RequestMessage {
}
@Jacksonized
@Builder
@Value
public static class Response implements ResponseMessage {
@NonNull
XPipeInstance instance;
}
}

View file

@ -23,6 +23,7 @@ module io.xpipe.beacon {
uses MessageExchange;
provides io.xpipe.beacon.exchange.MessageExchange with
ForwardExchange,
InstanceExchange,
EditStoreExchange,
AddSourceExchange,
StoreProviderListExchange,

View file

@ -9,6 +9,9 @@ apply from: "$rootDir/deps/java.gradle"
apply from: "$rootDir/deps/lombok.gradle"
apply from: "$rootDir/deps/junit.gradle"
compileJava {
options.compilerArgs << '-parameters'
}
dependencies{
api group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "2.13.0"

View file

@ -74,9 +74,9 @@ public class GenericArrayReader implements GenericAbstractReader {
}
@Override
public void onArrayEnd() {
public void onArrayEnd(Map<Integer, String> metaAttributes) {
if (hasReader()) {
currentReader.onArrayEnd();
currentReader.onArrayEnd(metaAttributes);
if (currentReader.isDone()) {
put(currentReader.create());
currentReader = null;
@ -114,9 +114,9 @@ public class GenericArrayReader implements GenericAbstractReader {
}
@Override
public void onTupleEnd() {
public void onTupleEnd(Map<Integer, String> metaAttributes) {
if (hasReader()) {
currentReader.onTupleEnd();
currentReader.onTupleEnd(metaAttributes);
if (currentReader.isDone()) {
put(currentReader.create());
currentReader = null;

View file

@ -10,13 +10,13 @@ public interface GenericDataStreamCallback {
default void onArrayStart(int length) {
}
default void onArrayEnd() {
default void onArrayEnd(Map<Integer, String> metaAttributes) {
}
default void onTupleStart(int length) {
}
default void onTupleEnd() {
default void onTupleEnd(Map<Integer, String> metaAttributes) {
}

View file

@ -67,7 +67,7 @@ public class GenericDataStreamParser {
parse(in, cb);
}
var attributes = DataStructureNodeIO.parseAttributes(in);
cb.onTupleEnd();
cb.onTupleEnd(attributes);
}
private static void parseArray(InputStream in, GenericDataStreamCallback cb) throws IOException {
@ -77,7 +77,7 @@ public class GenericDataStreamParser {
parse(in, cb);
}
var attributes = DataStructureNodeIO.parseAttributes(in);
cb.onArrayEnd();
cb.onArrayEnd(attributes);
}
private static void parseValue(InputStream in, GenericDataStreamCallback cb) throws IOException {

View file

@ -52,8 +52,8 @@ public class GenericDataStreamWriter {
private static void writeValue(OutputStream out, ValueNode n) throws IOException {
out.write(DataStructureNodeIO.GENERIC_VALUE_ID);
DataStructureNodeIO.writeShort(out, n.getRawData().length);
out.write(n.getRawData());
var length = DataStructureNodeIO.writeShort(out, n.getRawData().length);
out.write(n.getRawData(), 0, length);
DataStructureNodeIO.writeAttributes(out, n);
}
}

View file

@ -44,12 +44,12 @@ public class GenericDataStructureNodeReader implements GenericDataStreamCallback
}
@Override
public void onArrayEnd() {
public void onArrayEnd(Map<Integer, String> metaAttributes) {
if (!hasReader()) {
throw new IllegalStateException("No array to close");
}
reader.onArrayEnd();
reader.onArrayEnd(metaAttributes);
if (reader.isDone()) {
node = reader.create();
reader = null;
@ -67,12 +67,12 @@ public class GenericDataStructureNodeReader implements GenericDataStreamCallback
}
@Override
public void onTupleEnd() {
public void onTupleEnd(Map<Integer, String> metaAttributes) {
if (!hasReader()) {
throw new IllegalStateException("No tuple to close");
}
reader.onTupleEnd();
reader.onTupleEnd(metaAttributes);
if (reader.isDone()) {
node = reader.create();
reader = null;

View file

@ -89,9 +89,9 @@ public class GenericTupleReader implements GenericAbstractReader {
}
@Override
public void onArrayEnd() {
public void onArrayEnd(Map<Integer, String> metaAttributes) {
if (hasReader()) {
currentReader.onArrayEnd();
currentReader.onArrayEnd(metaAttributes);
if (currentReader.isDone()) {
putNode(currentReader.create());
currentReader = null;
@ -118,9 +118,9 @@ public class GenericTupleReader implements GenericAbstractReader {
}
@Override
public void onTupleEnd() {
public void onTupleEnd(Map<Integer, String> metaAttributes) {
if (hasReader()) {
currentReader.onTupleEnd();
currentReader.onTupleEnd(metaAttributes);
if (currentReader.isDone()) {
putNode(currentReader.create());
currentReader = null;

View file

@ -12,18 +12,26 @@ public abstract class DataStructureNode implements Iterable<DataStructureNode> {
public static final Integer KEY_TABLE_NAME = 1;
public static final Integer KEY_ROW_NAME = 2;
public static final Integer BOOLEAN_TRUE = 3;
public static final Integer BOOLEAN_VALUE = 4;
public static final Integer IS_BOOLEAN = 4;
public static final Integer BOOLEAN_FALSE = 5;
public static final Integer INTEGER_VALUE = 6;
public static final Integer NULL_VALUE = 7;
public static final Integer IS_NUMBER = 8;
public static final Integer IS_NULL = 7;
public static final Integer IS_INTEGER = 9;
public static final Integer IS_FLOATING_POINT = 10;
public static final Integer FLOATING_POINT_VALUE = 11;
public static final Integer TEXT = 12;
public static final Integer IS_TEXT = 12;
public static final Integer IS_INSTANT = 13;
public static final Integer IS_BINARY = 14;
private Map<Integer, String> metaAttributes;
public void clearMetaAttributes() {
metaAttributes = null;
if (isTuple() || isArray()) {
getNodes().forEach(dataStructureNode -> dataStructureNode.clearMetaAttributes());
}
}
public Map<Integer, String> getMetaAttributes() {
return metaAttributes != null ? Collections.unmodifiableMap(metaAttributes) : null;
}
@ -138,12 +146,6 @@ public abstract class DataStructureNode implements Iterable<DataStructureNode> {
public boolean isValue() {
return false;
}
public boolean isNull() {
return false;
}
public DataStructureNode set(int index, DataStructureNode node) {
throw unsupported("set at index");
}

View file

@ -22,18 +22,25 @@ public class DataStructureNodeIO {
public static final int TYPED_ARRAY_ID = 7;
public static final int TYPED_VALUE_ID = 8;
public static void writeShort(OutputStream out, int value) throws IOException {
public static int writeShort(OutputStream out, int value) throws IOException {
if (value > Short.MAX_VALUE) {
value = Short.MAX_VALUE;
}
var buffer = ByteBuffer.allocate(2);
buffer.order(ByteOrder.LITTLE_ENDIAN);
buffer.putShort((short) value);
out.write(buffer.array());
return value;
}
public static void writeString(OutputStream out, String s) throws IOException {
if (s != null) {
var b = s.getBytes(StandardCharsets.UTF_8);
DataStructureNodeIO.writeShort(out, b.length);
out.write(b);
var length = DataStructureNodeIO.writeShort(out, b.length);
out.write(b, 0, length);
} else {
writeShort(out, -1);
}
}
@ -50,6 +57,10 @@ public class DataStructureNodeIO {
public static String parseString(InputStream in) throws IOException {
var nameLength = parseShort(in);
if (nameLength == -1) {
return null;
}
var name = new String(in.readNBytes(nameLength), StandardCharsets.UTF_8);
return name;
}
@ -74,13 +85,11 @@ public class DataStructureNodeIO {
writeShort(out, s.getMetaAttributes().size());
for (Map.Entry<Integer, String> entry : s.getMetaAttributes().entrySet()) {
Integer integer = entry.getKey();
var value = entry.getValue().getBytes(StandardCharsets.UTF_8);
writeShort(out, integer);
writeShort(out, value.length);
out.write(value);
writeString(out, entry.getValue());
}
} else {
out.write(0);
writeShort(out, 0);
}
}
}

View file

@ -18,7 +18,7 @@ public class SimpleImmutableValueNode extends ValueNode {
@Override
public final String asString() {
if (getRawData().length == 0 && !hasMetaAttribute(TEXT)) {
if (getRawData().length == 0 && !hasMetaAttribute(IS_TEXT)) {
return "null";
}
@ -27,8 +27,8 @@ public class SimpleImmutableValueNode extends ValueNode {
@Override
public String toString(int indent) {
var string = getRawData().length == 0 && !hasMetaAttribute(TEXT) ? "<null>" : new String(getRawData(), StandardCharsets.UTF_8);
return (hasMetaAttribute(TEXT) ? "\"" : "") + string + (hasMetaAttribute(TEXT) ? "\"" : "") + " " + metaToString();
var string = getRawData().length == 0 && !hasMetaAttribute(IS_TEXT) ? "<null>" : new String(getRawData(), StandardCharsets.UTF_8);
return (hasMetaAttribute(IS_TEXT) ? "\"" : "") + string + (hasMetaAttribute(IS_TEXT) ? "\"" : "") + " " + metaToString();
}
@Override

View file

@ -20,6 +20,7 @@ public abstract class TupleNode extends DataStructureNode {
return new SimpleTupleNode(null, nodes);
}
@SuppressWarnings("unchecked")
public static TupleNode of(List<String> names, List<? extends DataStructureNode> nodes) {
if (names == null) {
throw new IllegalArgumentException("Names must be not null");

View file

@ -3,6 +3,8 @@ package io.xpipe.core.data.node;
import io.xpipe.core.data.type.DataType;
import io.xpipe.core.data.type.ValueType;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;
@ -34,13 +36,40 @@ public abstract class ValueNode extends DataStructureNode {
}
public static ValueNode nullValue() {
return new SimpleImmutableValueNode(new byte[0]).tag(NULL_VALUE).asValue();
return new SimpleImmutableValueNode(new byte[0]).tag(IS_NULL).asValue();
}
public static ValueNode of(byte[] data) {
if (data == null) {
return nullValue();
}
return new SimpleImmutableValueNode(data);
}
public static ValueNode ofBytes(byte[] data) {
var created = of(data);
created.tag(IS_BINARY);
return created;
}
public static ValueNode ofInteger(BigInteger integer) {
var created = of(integer);
created.tag(IS_INTEGER);
return created;
}
public static ValueNode ofDecimal(BigDecimal decimal) {
var created = of(decimal);
created.tag(IS_FLOATING_POINT);
return created;
}
public static ValueNode ofBoolean(Boolean bool) {
var created = of(bool);
created.tag(IS_BOOLEAN);
return created;
}
public static ValueNode of(Object o) {
if (o == null) {
return nullValue();

View file

@ -3,9 +3,11 @@ package io.xpipe.core.data.typed;
import io.xpipe.core.data.node.DataStructureNode;
import io.xpipe.core.data.type.TupleType;
import java.util.Map;
public interface TypedDataStreamCallback {
default void onValue(byte[] data) {
default void onValue(byte[] data, Map<Integer, String> metaAttributes) {
}
default void onGenericNode(DataStructureNode node) {
@ -14,13 +16,13 @@ public interface TypedDataStreamCallback {
default void onTupleBegin(TupleType type) {
}
default void onTupleEnd() {
default void onTupleEnd(Map<Integer, String> metaAttributes) {
}
default void onArrayBegin(int size) {
}
default void onArrayEnd() {
default void onArrayEnd(Map<Integer, String> metaAttributes) {
}
default void onNodeBegin() {

View file

@ -21,7 +21,7 @@ public class TypedDataStreamParser {
this.dataType = dataType;
}
public boolean hasNext(InputStream in) throws IOException {
private boolean hasNext(InputStream in) throws IOException {
var b = in.read();
if (b == -1) {
return false;
@ -109,7 +109,8 @@ public class TypedDataStreamParser {
for (int i = 0; i < type.getSize(); i++) {
parse(in, cb, type.getTypes().get(i));
}
cb.onTupleEnd();
var attributes = DataStructureNodeIO.parseAttributes(in);
cb.onTupleEnd(attributes);
}
private GenericDataStructureNodeReader getGenericReader() {
@ -125,12 +126,14 @@ public class TypedDataStreamParser {
for (int i = 0; i < size; i++) {
parse(in, cb, type.getSharedType());
}
cb.onArrayEnd();
var attributes = DataStructureNodeIO.parseAttributes(in);
cb.onArrayEnd(attributes);
}
private void parseValue(InputStream in, TypedDataStreamCallback cb) throws IOException {
var size = DataStructureNodeIO.parseShort(in);
var data = in.readNBytes(size);
cb.onValue(data);
var attributes = DataStructureNodeIO.parseAttributes(in);
cb.onValue(data, attributes);
}
}

View file

@ -32,8 +32,9 @@ public class TypedDataStreamWriter {
private static void writeValue(OutputStream out, ValueNode n) throws IOException {
out.write(DataStructureNodeIO.TYPED_VALUE_ID);
DataStructureNodeIO.writeShort(out, n.getRawData().length);
out.write(n.getRawData());
var length = DataStructureNodeIO.writeShort(out, n.getRawData().length);
out.write(n.getRawData(), 0, length);
DataStructureNodeIO.writeAttributes(out, n);
}
private static void writeTuple(OutputStream out, TupleNode tuple, TupleType type) throws IOException {
@ -45,6 +46,7 @@ public class TypedDataStreamWriter {
for (int i = 0; i < tuple.size(); i++) {
write(out, tuple.at(i), type.getTypes().get(i));
}
DataStructureNodeIO.writeAttributes(out, tuple);
}
private static void writeArray(OutputStream out, ArrayNode array, ArrayType type) throws IOException {
@ -53,5 +55,6 @@ public class TypedDataStreamWriter {
for (int i = 0; i < array.size(); i++) {
write(out, array.at(i), type.getSharedType());
}
DataStructureNodeIO.writeAttributes(out, array);
}
}

View file

@ -7,6 +7,7 @@ import io.xpipe.core.data.type.TupleType;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Stack;
public class TypedDataStructureNodeReader implements TypedAbstractReader {
@ -74,12 +75,12 @@ public class TypedDataStructureNodeReader implements TypedAbstractReader {
}
@Override
public void onValue(byte[] data) {
public void onValue(byte[] data, Map<Integer, String> metaAttributes) {
if (!expectedType.isValue()) {
throw new IllegalStateException("Expected " + expectedType.getName() + " but got value");
}
var val = ValueNode.of(data);
var val = ValueNode.of(data).tag(metaAttributes);
finishNode(val);
moveExpectedType(false);
}
@ -115,14 +116,14 @@ public class TypedDataStructureNodeReader implements TypedAbstractReader {
}
@Override
public void onTupleEnd() {
public void onTupleEnd(Map<Integer, String> metaAttributes) {
children.pop();
var popped = nodes.pop();
if (!popped.isTuple()) {
throw new IllegalStateException("No tuple to end");
}
TupleNode tuple = popped.asTuple();
TupleNode tuple = popped.tag(metaAttributes).asTuple();
if (tuple.getKeyNames().size() != tuple.getNodes().size()) {
throw new IllegalStateException("Tuple node size mismatch");
}
@ -154,7 +155,7 @@ public class TypedDataStructureNodeReader implements TypedAbstractReader {
}
@Override
public void onArrayEnd() {
public void onArrayEnd(Map<Integer, String> metaAttributes) {
if (!isInArray()) {
throw new IllegalStateException("No array to end");
}
@ -163,7 +164,7 @@ public class TypedDataStructureNodeReader implements TypedAbstractReader {
moveExpectedType(true);
children.pop();
var popped = nodes.pop();
var popped = nodes.pop().tag(metaAttributes);
finishNode(popped);
}
}

View file

@ -1,6 +1,6 @@
package io.xpipe.core.dialog;
import io.xpipe.core.util.Secret;
import io.xpipe.core.util.SecretValue;
import java.util.*;
import java.util.function.Consumer;
@ -164,7 +164,7 @@ public abstract class Dialog {
/**
* A special wrapper for secret values of {@link #query(String, boolean, boolean, boolean, Object, QueryConverter)}.
*/
public static Dialog.Query querySecret(String description, boolean newLine, boolean required, Secret value) {
public static Dialog.Query querySecret(String description, boolean newLine, boolean required, SecretValue value) {
var q = new Dialog.Query(description, newLine, required, false, value, QueryConverter.SECRET, true);
q.evaluateTo(q::getConvertedValue);
return q;

View file

@ -2,7 +2,7 @@ package io.xpipe.core.dialog;
import io.xpipe.core.charsetter.NewLine;
import io.xpipe.core.charsetter.StreamCharset;
import io.xpipe.core.util.Secret;
import io.xpipe.core.util.SecretValue;
import java.net.URI;
import java.net.URISyntaxException;
@ -48,14 +48,14 @@ public abstract class QueryConverter<T> {
}
};
public static final QueryConverter<Secret> SECRET = new QueryConverter<Secret>() {
public static final QueryConverter<SecretValue> SECRET = new QueryConverter<SecretValue>() {
@Override
protected Secret fromString(String s) {
return new Secret(s);
protected SecretValue fromString(String s) {
return new SecretValue(s);
}
@Override
protected String toString(Secret value) {
protected String toString(SecretValue value) {
return value.getEncryptedValue();
}
};

View file

@ -0,0 +1,48 @@
package io.xpipe.core.impl;
import io.xpipe.core.data.node.ArrayNode;
import io.xpipe.core.data.node.DataStructureNode;
import io.xpipe.core.data.node.DataStructureNodeAcceptor;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.source.TableWriteConnection;
import java.util.ArrayList;
import java.util.List;
public abstract class BatchTableWriteConnection implements TableWriteConnection {
public static final int BATCH_SIZE = 2000;
private final List<DataStructureNode> batch = new ArrayList<>();
@Override
public final DataStructureNodeAcceptor<TupleNode> writeLinesAcceptor() {
return node -> {
if (batch.size() < BATCH_SIZE) {
batch.add(node);
return true;
}
var array = ArrayNode.of(batch);
var returned = writeBatchLinesAcceptor().accept(array);
batch.clear();
return returned;
};
}
@Override
public final void close() throws Exception {
if (batch.size() > 0) {
var array = ArrayNode.of(batch);
var returned = writeBatchLinesAcceptor().accept(array);
batch.clear();
}
onClose();
}
protected abstract void onClose() throws Exception;
protected abstract DataStructureNodeAcceptor<ArrayNode> writeBatchLinesAcceptor();
}

View file

@ -0,0 +1,67 @@
package io.xpipe.core.impl;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.xpipe.core.source.RawDataSource;
import io.xpipe.core.source.RawReadConnection;
import io.xpipe.core.source.RawWriteConnection;
import io.xpipe.core.store.StreamDataStore;
import lombok.experimental.SuperBuilder;
import java.io.InputStream;
import java.io.OutputStream;
@JsonTypeName("binary")
@SuperBuilder
public class BinarySource extends RawDataSource<StreamDataStore> {
@Override
protected RawWriteConnection newWriteConnection() {
return new RawWriteConnection() {
private OutputStream out;
@Override
public void init() throws Exception {
out = getStore().openOutput();
}
@Override
public void close() throws Exception {
out.close();
}
@Override
public void write(byte[] bytes) throws Exception {
out.write(bytes);
}
};
}
@Override
protected RawReadConnection newReadConnection() {
return new RawReadConnection() {
private InputStream inputStream;
@Override
public void init() throws Exception {
if (inputStream != null) {
throw new IllegalStateException("Already initialized");
}
inputStream = getStore().openInput();
}
@Override
public void close() throws Exception {
inputStream.close();
}
@Override
public byte[] readBytes(int max) throws Exception {
return inputStream.readNBytes(max);
}
};
}
}

View file

@ -0,0 +1,59 @@
package io.xpipe.core.impl;
import io.xpipe.core.data.node.DataStructureNodeAcceptor;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.data.type.TupleType;
import io.xpipe.core.source.TableReadConnection;
import java.util.OptionalInt;
import java.util.concurrent.atomic.AtomicInteger;
public class LimitTableReadConnection implements TableReadConnection {
private final TableReadConnection connection;
private final int maxCount;
private int count = 0;
public LimitTableReadConnection(TableReadConnection connection, int maxCount) {
this.connection = connection;
this.maxCount = maxCount;
}
@Override
public void init() throws Exception {
connection.init();
}
@Override
public void close() throws Exception {
connection.close();
}
@Override
public TupleType getDataType() {
return connection.getDataType();
}
@Override
public OptionalInt getRowCount() throws Exception {
return connection.getRowCount();
}
@Override
public int withRows(DataStructureNodeAcceptor<TupleNode> lineAcceptor) throws Exception {
AtomicInteger localCounter = new AtomicInteger();
connection.withRows(node -> {
if (count == maxCount) {
return false;
}
count++;
var returned = lineAcceptor
.accept(node);
localCounter.getAndIncrement();
return returned;
});
return localCounter.get();
}
}

View file

@ -1,37 +1,34 @@
package io.xpipe.core.impl;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.xpipe.core.charsetter.Charsettable;
import io.xpipe.core.charsetter.NewLine;
import io.xpipe.core.charsetter.StreamCharset;
import io.xpipe.core.source.TextDataSource;
import io.xpipe.core.store.StreamDataStore;
import lombok.EqualsAndHashCode;
import lombok.Value;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;
@Value
@EqualsAndHashCode(callSuper = true)
public class TextSource extends TextDataSource<StreamDataStore> implements Charsettable {
@Getter
@JsonTypeName("text")
@SuperBuilder
@Jacksonized
public final class TextSource extends TextDataSource<StreamDataStore> implements Charsettable {
StreamCharset charset;
NewLine newLine;
public TextSource(StreamDataStore store){
this(store, StreamCharset.UTF8, NewLine.LF);
}
@JsonCreator
public TextSource(StreamDataStore store, StreamCharset charset, NewLine newLine) {
super(store);
this.charset = charset;
this.newLine = newLine;
}
private final StreamCharset charset;
private final NewLine newLine;
@Override
protected io.xpipe.core.source.TextWriteConnection newWriteConnection() {
return new TextWriteConnection(this);
}
@Override
protected io.xpipe.core.source.TextWriteConnection newPrependingWriteConnection() {
return new PreservingTextWriteConnection(this, newWriteConnection(), false);
}
@Override
protected io.xpipe.core.source.TextWriteConnection newAppendingWriteConnection() {
return new PreservingTextWriteConnection(this, newWriteConnection(), true);

View file

@ -0,0 +1,23 @@
package io.xpipe.core.impl;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.xpipe.core.source.StructureDataSource;
import io.xpipe.core.source.StructureReadConnection;
import io.xpipe.core.source.StructureWriteConnection;
import io.xpipe.core.store.StreamDataStore;
import lombok.experimental.SuperBuilder;
@JsonTypeName("xpbs")
@SuperBuilder
public class XpbsSource extends StructureDataSource<StreamDataStore> {
@Override
protected StructureWriteConnection newWriteConnection() {
return null;
}
@Override
protected StructureReadConnection newReadConnection() {
return null;
}
}

View file

@ -1,5 +1,6 @@
package io.xpipe.core.impl;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.type.TypeReference;
import io.xpipe.core.data.node.DataStructureNodeAcceptor;
@ -33,10 +34,11 @@ public class XpbtReadConnection implements TableReadConnection {
}
var headerLength = header.getBytes(StandardCharsets.UTF_8).length;
this.inputStream.skip(headerLength);
this.inputStream.skip(headerLength + 1);
List<String> names = JacksonHelper.newMapper()
.disable(JsonParser.Feature.AUTO_CLOSE_SOURCE)
.readerFor(new TypeReference<List<String>>(){}).readValue(header);
.readerFor(new TypeReference<List<String>>() {
}).readValue(header);
TupleType dataType = TupleType.tableType(names);
this.dataType = dataType;
this.parser = new TypedDataStreamParser(dataType);
@ -53,7 +55,7 @@ public class XpbtReadConnection implements TableReadConnection {
private TypedDataStreamParser parser;
private boolean empty;
protected XpbtReadConnection(StreamDataStore store) {
protected XpbtReadConnection(StreamDataStore store) {
this.store = store;
}
@ -63,14 +65,15 @@ public class XpbtReadConnection implements TableReadConnection {
}
@Override
public void withRows(DataStructureNodeAcceptor<TupleNode> lineAcceptor) throws Exception {
public int withRows(DataStructureNodeAcceptor<TupleNode> lineAcceptor) throws Exception {
if (empty) {
return;
return 0;
}
var reader = TypedDataStructureNodeReader.of(dataType);
AtomicBoolean quit = new AtomicBoolean(false);
AtomicReference<Exception> exception = new AtomicReference<>();
var counter = 0;
while (!quit.get()) {
var node = parser.parseStructure(inputStream, reader);
if (node == null) {
@ -82,6 +85,7 @@ public class XpbtReadConnection implements TableReadConnection {
if (!lineAcceptor.accept(node.asTuple())) {
quit.set(true);
}
counter++;
} catch (Exception ex) {
quit.set(true);
exception.set(ex);
@ -91,5 +95,6 @@ public class XpbtReadConnection implements TableReadConnection {
if (exception.get() != null) {
throw exception.get();
}
return counter;
}
}

View file

@ -1,18 +1,16 @@
package io.xpipe.core.impl;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.xpipe.core.source.TableDataSource;
import io.xpipe.core.source.TableReadConnection;
import io.xpipe.core.source.TableWriteConnection;
import io.xpipe.core.store.StreamDataStore;
import lombok.experimental.SuperBuilder;
@JsonTypeName("xpbt")
@SuperBuilder
public class XpbtSource extends TableDataSource<StreamDataStore> {
@JsonCreator
public XpbtSource(StreamDataStore store) {
super(store);
}
@Override
protected TableWriteConnection newWriteConnection() {
return new XpbtWriteConnection(store);

View file

@ -60,6 +60,8 @@ public class XpbtWriteConnection implements TableWriteConnection {
.setPrettyPrinter(new DefaultPrettyPrinter())) {
JacksonHelper.newMapper().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET)
.writeValue(g, tupleNode.getKeyNames());
writer.append("\n");
}
writer.flush();
}
}

View file

@ -1,19 +1,17 @@
package io.xpipe.core.source;
import io.xpipe.core.store.DataStore;
import lombok.Singular;
import lombok.experimental.SuperBuilder;
import java.util.HashMap;
import java.util.Map;
@SuperBuilder
public abstract class CollectionDataSource<DS extends DataStore> extends DataSource<DS> {
@Singular
private final Map<String, String> preferredProviders;
public CollectionDataSource(DS store) {
super(store);
this.preferredProviders = new HashMap<>();
}
public CollectionDataSource<DS> annotate(String file, String provider) {
preferredProviders.put(file, provider);
return this;

View file

@ -1,15 +1,16 @@
package io.xpipe.core.source;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.util.TokenBuffer;
import io.xpipe.core.charsetter.NewLine;
import io.xpipe.core.charsetter.StreamCharset;
import io.xpipe.core.impl.TextSource;
import io.xpipe.core.impl.XpbtSource;
import io.xpipe.core.store.DataFlow;
import io.xpipe.core.store.DataStore;
import io.xpipe.core.util.JacksonHelper;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import lombok.experimental.SuperBuilder;
import java.util.Optional;
@ -19,18 +20,21 @@ import java.util.Optional;
* <p>
* This instance is only valid in combination with its associated data store instance.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
property = "type"
)
public abstract class DataSource<DS extends DataStore> {
public static DataSource<?> createInternalDataSource(DataSourceType t, DataStore store) {
try {
return switch (t) {
case TABLE -> new XpbtSource(store.asNeeded());
case TABLE -> XpbtSource.builder().store(store.asNeeded()).build();
case STRUCTURE -> null;
case TEXT -> new TextSource(store.asNeeded());
case TEXT -> TextSource.builder().store(store.asNeeded()).newLine(NewLine.LF).charset(
StreamCharset.UTF8).build();
case RAW -> null;
//TODO
case COLLECTION -> null;
@ -55,6 +59,15 @@ public abstract class DataSource<DS extends DataStore> {
store.validate();
}
public WriteMode[] getAvailableWriteModes() {
if (getFlow() != null && !getFlow().hasOutput()) {
return new WriteMode[0];
}
return WriteMode.values();
}
public DataFlow getFlow() {
if (store == null) {
return null;
@ -72,6 +85,32 @@ public abstract class DataSource<DS extends DataStore> {
return (T) mapper.readValue(tb.asParser(), getClass());
}
@SneakyThrows
public final String toString() {
var tree = JacksonHelper.newMapper().valueToTree(this);
return tree.toPrettyString();
}
@Override
public final boolean equals(Object o) {
if (this == o) {
return true;
}
if (getClass() != o.getClass()) {
return false;
}
var tree = JacksonHelper.newMapper().valueToTree(this);
var otherTree = JacksonHelper.newMapper().valueToTree(o);
return tree.equals(otherTree);
}
@Override
public final int hashCode() {
var tree = JacksonHelper.newMapper().valueToTree(this);
return tree.hashCode();
}
public DataSource<DS> withStore(DS store) {
var c = copy();
c.store = store;

View file

@ -1,15 +1,13 @@
package io.xpipe.core.source;
import io.xpipe.core.store.DataStore;
import lombok.experimental.SuperBuilder;
@SuperBuilder
public abstract class RawDataSource<DS extends DataStore> extends DataSource<DS> {
private static final int MAX_BYTES_READ = 100000;
public RawDataSource(DS store) {
super(store);
}
@Override
public final DataSourceInfo determineInfo() throws Exception {
try (var con = openReadConnection()) {

View file

@ -2,13 +2,11 @@ package io.xpipe.core.source;
import io.xpipe.core.data.node.DataStructureNode;
import io.xpipe.core.store.DataStore;
import lombok.experimental.SuperBuilder;
@SuperBuilder
public abstract class StructureDataSource<DS extends DataStore> extends DataSource<DS> {
public StructureDataSource(DS store) {
super(store);
}
private int countEntries(DataStructureNode n) {
if (n.isValue()) {
return 1;

View file

@ -2,19 +2,11 @@ package io.xpipe.core.source;
import io.xpipe.core.impl.PreservingTableWriteConnection;
import io.xpipe.core.store.DataStore;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
@Data
@EqualsAndHashCode(callSuper = true)
@NoArgsConstructor
@SuperBuilder
public abstract class TableDataSource<DS extends DataStore> extends DataSource<DS> {
public TableDataSource(DS store) {
super(store);
}
@Override
public final DataSourceInfo determineInfo() throws Exception {
if (!getFlow().hasInput()) {

View file

@ -7,6 +7,7 @@ import io.xpipe.core.data.node.ArrayNode;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.data.type.TupleType;
import io.xpipe.core.data.typed.TypedDataStreamWriter;
import io.xpipe.core.impl.LimitTableReadConnection;
import java.io.OutputStream;
import java.util.ArrayList;
@ -31,8 +32,8 @@ public interface TableReadConnection extends DataSourceReadConnection {
}
@Override
public void withRows(DataStructureNodeAcceptor<TupleNode> lineAcceptor) throws Exception {
public int withRows(DataStructureNodeAcceptor<TupleNode> lineAcceptor) throws Exception {
return 0;
}
@Override
@ -54,10 +55,16 @@ public interface TableReadConnection extends DataSourceReadConnection {
return OptionalInt.empty();
}
default TableReadConnection limit(int limit) {
return new LimitTableReadConnection(this, limit);
}
/**
* Consumes the table rows until the acceptor returns false.
*
* @return
*/
void withRows(DataStructureNodeAcceptor<TupleNode> lineAcceptor) throws Exception;
int withRows(DataStructureNodeAcceptor<TupleNode> lineAcceptor) throws Exception;
/**
* Reads multiple rows in bulk.
@ -92,7 +99,11 @@ public interface TableReadConnection extends DataSourceReadConnection {
}
default void forward(DataSourceConnection con) throws Exception {
forwardAndCount(con);
}
default int forwardAndCount(DataSourceConnection con) throws Exception {
var tCon = (TableWriteConnection) con;
withRows(tCon.writeLinesAcceptor());
return withRows(tCon.writeLinesAcceptor());
}
}

View file

@ -10,9 +10,20 @@ import io.xpipe.core.data.node.TupleNode;
*/
public interface TableWriteConnection extends DataSourceConnection {
public static TableWriteConnection empty() {
return new TableWriteConnection() {
@Override
public DataStructureNodeAcceptor<TupleNode> writeLinesAcceptor() {
return node -> {
return true;
};
}
};
}
DataStructureNodeAcceptor<TupleNode> writeLinesAcceptor();
default void writeLines(ArrayNode lines) throws Exception{
default void writeLines(ArrayNode lines) throws Exception {
var consumer = writeLinesAcceptor();
for (DataStructureNode dataStructureNode : lines.getNodes()) {
consumer.accept(dataStructureNode.asTuple());

View file

@ -2,19 +2,15 @@ package io.xpipe.core.source;
import io.xpipe.core.impl.PreservingTextWriteConnection;
import io.xpipe.core.store.DataStore;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.concurrent.atomic.AtomicInteger;
@NoArgsConstructor
@SuperBuilder
public abstract class TextDataSource<DS extends DataStore> extends DataSource<DS> {
private static final int MAX_LINE_READ = 1000;
public TextDataSource(DS store) {
super(store);
}
@Override
public final DataSourceInfo determineInfo() throws Exception {
if (!getStore().canOpen()) {

View file

@ -1,8 +1,9 @@
package io.xpipe.core.store;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonTypeName;
import lombok.Builder;
import lombok.Value;
import lombok.extern.jackson.Jacksonized;
import java.io.InputStream;
import java.io.OutputStream;
@ -13,6 +14,8 @@ import java.nio.file.Path;
*/
@Value
@JsonTypeName("file")
@Builder
@Jacksonized
public class FileStore implements StreamDataStore, FilenameStore {
public static FileStore local(Path p) {
@ -29,12 +32,6 @@ public class FileStore implements StreamDataStore, FilenameStore {
MachineFileStore machine;
String file;
@JsonCreator
public FileStore(MachineFileStore machine, String file) {
this.machine = machine;
this.file = file;
}
@Override
public void validate() throws Exception {
if (machine == null) {

View file

@ -6,6 +6,7 @@ import lombok.Value;
import lombok.experimental.NonFinal;
import java.io.*;
import java.nio.charset.StandardCharsets;
/**
* A store whose contents are stored in memory.
@ -47,4 +48,7 @@ public class InMemoryStore implements StreamDataStore {
};
}
public String toString() {
return new String(value, StandardCharsets.UTF_8);
}
}

View file

@ -2,7 +2,7 @@ package io.xpipe.core.store;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.xpipe.core.charsetter.NewLine;
import io.xpipe.core.util.Secret;
import io.xpipe.core.util.SecretValue;
import lombok.EqualsAndHashCode;
import lombok.Value;
@ -27,14 +27,14 @@ public class LocalStore extends StandardShellStore implements MachineFileStore {
class LocalProcessControl extends ProcessControl {
private final List<Secret> input;
private final List<SecretValue> input;
private final Integer timeout;
private final List<String> command;
private Charset charset;
private Process process;
LocalProcessControl(List<Secret> input, List<String> cmd, Integer timeout) {
LocalProcessControl(List<SecretValue> input, List<String> cmd, Integer timeout) {
this.input = input;
this.timeout = timeout;
this.command = cmd;
@ -80,7 +80,12 @@ public class LocalStore extends StandardShellStore implements MachineFileStore {
return process.getInputStream();
}
@Override
@Override
public OutputStream getStdin() {
return process.getOutputStream();
}
@Override
public InputStream getStderr() {
return process.getErrorStream();
}
@ -125,12 +130,12 @@ public class LocalStore extends StandardShellStore implements MachineFileStore {
}
@Override
public ProcessControl prepareCommand(List<Secret> input, List<String> cmd, Integer timeout) {
public ProcessControl prepareCommand(List<SecretValue> input, List<String> cmd, Integer timeout) {
return new LocalProcessControl(input, cmd, getEffectiveTimeOut(timeout));
}
@Override
public ProcessControl preparePrivilegedCommand(List<Secret> input, List<String> cmd, Integer timeOut) throws Exception {
public ProcessControl preparePrivilegedCommand(List<SecretValue> input, List<String> cmd, Integer timeOut) throws Exception {
return new LocalProcessControl(input, cmd, getEffectiveTimeOut(timeOut));
}

View file

@ -68,6 +68,7 @@ public abstract class ProcessControl {
public abstract int waitFor() throws Exception;
public abstract InputStream getStdout();
public abstract OutputStream getStdin();
public abstract InputStream getStderr();

View file

@ -1,6 +1,6 @@
package io.xpipe.core.store;
import io.xpipe.core.util.Secret;
import io.xpipe.core.util.SecretValue;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@ -15,7 +15,7 @@ public abstract class ShellStore implements DataStore {
return null;
}
public List<Secret> getInput() {
public List<SecretValue> getInput() {
return List.of();
}
@ -31,7 +31,7 @@ public abstract class ShellStore implements DataStore {
return string;
}
public String executeAndCheckOut(List<Secret> in, List<String> cmd, Integer timeout) throws ProcessOutputException, Exception {
public String executeAndCheckOut(List<SecretValue> in, List<String> cmd, Integer timeout) throws ProcessOutputException, Exception {
var pc = prepareCommand(in, cmd, getEffectiveTimeOut(timeout));
pc.start();
@ -76,7 +76,7 @@ public abstract class ShellStore implements DataStore {
}
}
public Optional<String> executeAndCheckErr(List<Secret> in, List<String> cmd) throws Exception {
public Optional<String> executeAndCheckErr(List<SecretValue> in, List<String> cmd) throws Exception {
var pc = prepareCommand(in, cmd, getTimeout());
pc.start();
var outT = pc.discardOut();
@ -113,13 +113,13 @@ public abstract class ShellStore implements DataStore {
return prepareCommand(List.of(), cmd, timeout);
}
public abstract ProcessControl prepareCommand(List<Secret> input, List<String> cmd, Integer timeout) throws Exception;
public abstract ProcessControl prepareCommand(List<SecretValue> input, List<String> cmd, Integer timeout) throws Exception;
public ProcessControl preparePrivilegedCommand(List<String> cmd, Integer timeout) throws Exception {
return preparePrivilegedCommand(List.of(), cmd, timeout);
}
public ProcessControl preparePrivilegedCommand(List<Secret> input, List<String> cmd, Integer timeout) throws Exception {
public ProcessControl preparePrivilegedCommand(List<SecretValue> input, List<String> cmd, Integer timeout) throws Exception {
throw new UnsupportedOperationException();
}
}

View file

@ -2,7 +2,7 @@ package io.xpipe.core.store;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.xpipe.core.charsetter.NewLine;
import io.xpipe.core.util.Secret;
import io.xpipe.core.util.SecretValue;
import lombok.Value;
import java.io.ByteArrayInputStream;
@ -83,7 +83,7 @@ public class ShellTypes {
}
@Override
public ProcessControl prepareElevatedCommand(ShellStore st, List<Secret> in, List<String> cmd, Integer timeout, String pw) throws Exception {
public ProcessControl prepareElevatedCommand(ShellStore st, List<SecretValue> in, List<String> cmd, Integer timeout, String pw) throws Exception {
var l = List.of("net", "session", ";", "if", "%errorLevel%", "!=", "0");
return st.prepareCommand(List.of(), l, timeout);
}
@ -186,12 +186,12 @@ public class ShellTypes {
}
@Override
public ProcessControl prepareElevatedCommand(ShellStore st, List<Secret> in, List<String> cmd, Integer timeout, String pw) throws Exception {
public ProcessControl prepareElevatedCommand(ShellStore st, List<SecretValue> in, List<String> cmd, Integer timeout, String pw) throws Exception {
var l = new ArrayList<>(cmd);
l.add(0, "sudo");
l.add(1, "-S");
var pws = new ByteArrayInputStream(pw.getBytes(determineCharset(st)));
return st.prepareCommand(List.of(Secret.createForSecretValue(pw)), l, timeout);
return st.prepareCommand(List.of(SecretValue.createForSecretValue(pw)), l, timeout);
}
@Override

View file

@ -2,7 +2,7 @@ package io.xpipe.core.store;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.xpipe.core.charsetter.NewLine;
import io.xpipe.core.util.Secret;
import io.xpipe.core.util.SecretValue;
import java.io.InputStream;
import java.io.OutputStream;
@ -16,7 +16,7 @@ public abstract class StandardShellStore extends ShellStore implements MachineFi
List<String> switchTo(List<String> cmd);
default ProcessControl prepareElevatedCommand(ShellStore st, List<Secret> in, List<String> cmd, Integer timeout, String pw) throws Exception {
default ProcessControl prepareElevatedCommand(ShellStore st, List<SecretValue> in, List<String> cmd, Integer timeout, String pw) throws Exception {
return st.prepareCommand(in, cmd, timeout);
}

View file

@ -20,6 +20,8 @@ import io.xpipe.core.dialog.BaseQueryElement;
import io.xpipe.core.dialog.BusyElement;
import io.xpipe.core.dialog.ChoiceElement;
import io.xpipe.core.dialog.HeaderElement;
import io.xpipe.core.impl.TextSource;
import io.xpipe.core.impl.XpbtSource;
import io.xpipe.core.source.DataSource;
import io.xpipe.core.source.DataSourceInfo;
import io.xpipe.core.source.DataSourceReference;
@ -34,6 +36,9 @@ public class CoreJacksonModule extends SimpleModule {
@Override
public void setupModule(SetupContext context) {
context.registerSubtypes(
new NamedType(TextSource.class),
new NamedType(XpbtSource.class),
new NamedType(FileStore.class),
new NamedType(StdinDataStore.class),
new NamedType(StdoutDataStore.class),
@ -70,8 +75,8 @@ public class CoreJacksonModule extends SimpleModule {
addSerializer(Path.class, new LocalPathSerializer());
addDeserializer(Path.class, new LocalPathDeserializer());
addSerializer(Secret.class, new SecretSerializer());
addDeserializer(Secret.class, new SecretDeserializer());
addSerializer(SecretValue.class, new SecretSerializer());
addDeserializer(SecretValue.class, new SecretDeserializer());
addSerializer(DataSourceReference.class, new DataSourceReferenceSerializer());
addDeserializer(DataSourceReference.class, new DataSourceReferenceDeserializer());
@ -159,20 +164,20 @@ public class CoreJacksonModule extends SimpleModule {
}
}
public static class SecretSerializer extends JsonSerializer<Secret> {
public static class SecretSerializer extends JsonSerializer<SecretValue> {
@Override
public void serialize(Secret value, JsonGenerator jgen, SerializerProvider provider)
public void serialize(SecretValue value, JsonGenerator jgen, SerializerProvider provider)
throws IOException {
jgen.writeString(value.getEncryptedValue());
}
}
public static class SecretDeserializer extends JsonDeserializer<Secret> {
public static class SecretDeserializer extends JsonDeserializer<SecretValue> {
@Override
public Secret deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
return new Secret(p.getValueAsString());
public SecretValue deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
return new SecretValue(p.getValueAsString());
}
}

View file

@ -56,23 +56,12 @@ public class JacksonHelper {
* Constructs a new ObjectMapper that is able to map all required X-Pipe classes and also possible extensions.
*/
public static ObjectMapper newMapper() {
if (!init) {
throw new IllegalStateException("Not initialized");
if (!JacksonHelper.isInit()) {
JacksonHelper.initModularized(ModuleLayer.boot());
}
return INSTANCE.copy();
}
public static ObjectMapper newMapper(Class<?> excludedModule) {
if (!init) {
throw new IllegalStateException("Not initialized");
}
var mapper = BASE.copy();
mapper.registerModules(MODULES.stream().filter(module -> !module.getClass().equals(excludedModule)).toList());
return mapper;
}
public static boolean isInit() {
return init;
}

View file

@ -8,18 +8,18 @@ import java.util.Base64;
@AllArgsConstructor
@EqualsAndHashCode
public class Secret {
public class SecretValue {
public static Secret createForSecretValue(String s) {
public static SecretValue createForSecretValue(String s) {
if (s == null) {
return null;
}
if (s.length() < 2) {
return new Secret(s);
return new SecretValue(s);
}
return new Secret(Base64.getEncoder().encodeToString(s.getBytes(StandardCharsets.UTF_8)));
return new SecretValue(Base64.getEncoder().encodeToString(s.getBytes(StandardCharsets.UTF_8)));
}
String value;

View file

@ -1,23 +0,0 @@
package io.xpipe.core.util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;
public class StreamHelper {
private static final int DEFAULT_BUFFER_SIZE = 8192;
public static long transferTo(InputStream in, OutputStream out) throws IOException {
Objects.requireNonNull(out, "out");
long transferred = 0;
byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
int read;
while (in.available() > 0 && (read = in.read(buffer, 0, DEFAULT_BUFFER_SIZE)) >= 0) {
out.write(buffer, 0, read);
transferred += read;
}
return transferred;
}
}

View file

@ -7,7 +7,6 @@ plugins {
apply from: "$rootDir/deps/java.gradle"
apply from: "$rootDir/deps/javafx.gradle"
apply from: "$rootDir/deps/richtextfx.gradle"
apply from: "$rootDir/deps/lombok.gradle"
configurations {
@ -26,6 +25,7 @@ dependencies {
api group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: "2.13.0"
compileOnly group: 'org.kordamp.ikonli', name: 'ikonli-javafx', version: "12.2.0"
compileOnly group: 'org.fxmisc.richtext', name: 'richtextfx', version: '0.10.9'
compileOnly 'net.synedra:validatorfx:0.3.1'
compileOnly 'org.junit.jupiter:junit-jupiter-api:5.9.0'
compileOnly 'com.jfoenix:jfoenix:9.0.10'

View file

@ -6,7 +6,6 @@ import io.xpipe.core.source.DataSourceType;
import io.xpipe.core.store.DataStore;
import javafx.beans.property.Property;
import javafx.scene.layout.Region;
import lombok.SneakyThrows;
import java.util.List;
import java.util.Map;
@ -23,11 +22,6 @@ public interface DataSourceProvider<T extends DataSource<?>> {
getSourceClass();
}
@SneakyThrows
default T create(Object... arguments) {
return (T) getSourceClass().getDeclaredConstructors()[0].newInstance(arguments);
}
default Category getCategory() {
if (getFileProvider() != null) {
return Category.FILE;

View file

@ -44,17 +44,17 @@ public class DataStoreProviders {
}
return ALL.stream().map(d -> {
var store = d.storeForString(s);
if (store != null) {
return d.dialogForStore(store);
} else {
return null;
}
}
var store = d.storeForString(s);
if (store != null) {
return d.dialogForStore(store);
} else {
return null;
}
}
).filter(Objects::nonNull).findAny();
}
@SuppressWarnings("unchecked")
public static <T extends DataStoreProvider> T byStore(DataStore store) {
return (T) byStoreClass(store.getClass());
}

View file

@ -12,11 +12,15 @@ import javafx.beans.value.ObservableValue;
import javafx.collections.FXCollections;
import javafx.scene.control.ComboBox;
import javafx.util.StringConverter;
import lombok.Value;
import lombok.AccessLevel;
import lombok.experimental.FieldDefaults;
import java.util.Map;
@Value
@FieldDefaults(
makeFinal = true,
level = AccessLevel.PRIVATE
)
public class ChoiceComp<T> extends Comp<CompStructure<ComboBox<T>>> {
Property<T> value;

View file

@ -12,12 +12,18 @@ import javafx.scene.control.ComboBox;
import javafx.scene.layout.Region;
import javafx.scene.layout.VBox;
import javafx.util.StringConverter;
import lombok.Value;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.experimental.FieldDefaults;
import java.util.List;
import java.util.function.Function;
@Value
@FieldDefaults(
makeFinal = true,
level = AccessLevel.PRIVATE
)
@AllArgsConstructor
public class ChoicePaneComp extends Comp<CompStructure<VBox>> {
List<Entry> entries;

View file

@ -8,9 +8,13 @@ import javafx.beans.property.Property;
import javafx.beans.value.ChangeListener;
import javafx.scene.control.TextField;
import javafx.scene.input.KeyEvent;
import lombok.Value;
import lombok.AccessLevel;
import lombok.experimental.FieldDefaults;
@Value
@FieldDefaults(
makeFinal = true,
level = AccessLevel.PRIVATE
)
public class IntFieldComp extends Comp<CompStructure<TextField>> {
Property<Integer> value;

View file

@ -1,6 +1,6 @@
package io.xpipe.extension.comp;
import io.xpipe.core.util.Secret;
import io.xpipe.core.util.SecretValue;
import io.xpipe.fxcomps.Comp;
import io.xpipe.fxcomps.CompStructure;
import io.xpipe.fxcomps.SimpleCompStructure;
@ -11,9 +11,9 @@ import javafx.scene.control.TextField;
public class SecretFieldComp extends Comp<CompStructure<TextField>> {
private final Property<Secret> value;
private final Property<SecretValue> value;
public SecretFieldComp(Property<Secret> value) {
public SecretFieldComp(Property<SecretValue> value) {
this.value = value;
}
@ -22,7 +22,7 @@ public class SecretFieldComp extends Comp<CompStructure<TextField>> {
var text = new PasswordField();
text.setText(value.getValue() != null ? value.getValue().getSecretValue() : null);
text.textProperty().addListener((c, o, n) -> {
value.setValue(n != null && n.length() > 0 ? Secret.createForSecretValue(n) : null);
value.setValue(n != null && n.length() > 0 ? SecretValue.createForSecretValue(n) : null);
});
value.addListener((c, o, n) -> {
PlatformThread.runLaterIfNeeded(() -> {

View file

@ -29,6 +29,7 @@ public interface PrefsChoiceValue extends Translatable {
}
}
@SuppressWarnings("unchecked")
static <T extends PrefsChoiceValue> List<T> getSupported(Class<T> type) {
try {
return (List<T>) type.getDeclaredField("SUPPORTED").get(null);

View file

@ -6,7 +6,7 @@ import io.xpipe.core.dialog.Dialog;
import io.xpipe.core.dialog.QueryConverter;
import io.xpipe.core.source.DataSource;
import io.xpipe.core.store.*;
import io.xpipe.core.util.Secret;
import io.xpipe.core.util.SecretValue;
import lombok.Value;
import java.util.function.Predicate;
@ -131,7 +131,7 @@ public class DialogHelper {
});
}
public static Dialog passwordQuery(Secret password) {
public static Dialog passwordQuery(SecretValue password) {
return Dialog.querySecret("Password", false, true, password);
}

View file

@ -2,7 +2,7 @@ package io.xpipe.extension.util;
import io.xpipe.core.charsetter.NewLine;
import io.xpipe.core.charsetter.StreamCharset;
import io.xpipe.core.util.Secret;
import io.xpipe.core.util.SecretValue;
import io.xpipe.extension.I18n;
import io.xpipe.extension.comp.*;
import io.xpipe.fxcomps.Comp;
@ -46,7 +46,7 @@ public class DynamicOptionsBuilder<T> {
}
public DynamicOptionsBuilder<T> addTitle(ObservableValue<String> title) {
entries.add(new DynamicOptionsComp.Entry(null, Comp.of(() -> new Label(title.getValue())).styleClass("title")));
entries.add(new DynamicOptionsComp.Entry(null, Comp.of(() -> new Label(title.getValue())).styleClass("title-header")));
return this;
}
@ -169,11 +169,11 @@ public class DynamicOptionsBuilder<T> {
return this;
}
public DynamicOptionsBuilder<T> addSecret(String nameKey, Property<Secret> prop) {
public DynamicOptionsBuilder<T> addSecret(String nameKey, Property<SecretValue> prop) {
return addSecret(I18n.observable(nameKey), prop);
}
public DynamicOptionsBuilder<T> addSecret(ObservableValue<String> name, Property<Secret> prop) {
public DynamicOptionsBuilder<T> addSecret(ObservableValue<String> name, Property<SecretValue> prop) {
var comp = new SecretFieldComp(prop);
entries.add(new DynamicOptionsComp.Entry(name, comp));
props.add(prop);
@ -233,7 +233,7 @@ public class DynamicOptionsBuilder<T> {
public Comp<?> buildComp() {
if (title != null) {
entries.add(0, new DynamicOptionsComp.Entry(null, Comp.of(() -> new Label(title.getValue())).styleClass("title")));
entries.add(0, new DynamicOptionsComp.Entry(null, Comp.of(() -> new Label(title.getValue())).styleClass("title-header")));
}
return new DynamicOptionsComp(entries, wrap);
}

View file

@ -1,54 +0,0 @@
package io.xpipe.extension.util;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.xpipe.core.source.DataSource;
import io.xpipe.core.util.JacksonHelper;
import io.xpipe.extension.DataSourceProviders;
import java.io.IOException;
public class ExtensionJacksonModule extends SimpleModule {
@Override
public void setupModule(SetupContext context) {
addSerializer(DataSource.class, new DataSourceSerializer());
addDeserializer(DataSource.class, new DataSourceDeserializer());
context.addSerializers(_serializers);
context.addDeserializers(_deserializers);
}
public static class DataSourceSerializer extends JsonSerializer<DataSource> {
@Override
public void serialize(DataSource value, JsonGenerator jgen, SerializerProvider provider)
throws IOException {
ObjectMapper mapper = JacksonHelper.newMapper(ExtensionJacksonModule.class);
var prov = DataSourceProviders.byDataSourceClass(value.getClass());
ObjectNode objectNode = mapper.valueToTree(value);
objectNode.put("type", prov.getId());
jgen.writeTree(objectNode);
}
}
public static class DataSourceDeserializer extends JsonDeserializer<DataSource> {
@Override
public DataSource deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
var mapper = JacksonHelper.newMapper(ExtensionJacksonModule.class);
var tree = (ObjectNode) mapper.readTree(p);
var type = tree.get("type").textValue();
var prov = DataSourceProviders.byName(type);
if (prov.isEmpty()) {
return null;
}
return mapper.treeToValue(tree, prov.get().getSourceClass());
}
}
}

View file

@ -22,10 +22,17 @@ public class ExtensionTest {
return FileStore.local(Path.of(file));
}
public static DataSource getSource(String type, DataStore store) {
return DataSource.create(null, type, store);
}
public static DataSource getSource(String type, String file) {
return DataSource.create(null, type, getResource(file));
}
public static DataSource getSource(io.xpipe.core.source.DataSource<?> source) {
return DataSource.create(null, source);
}
@BeforeAll
public static void setup() throws Exception {

View file

@ -4,12 +4,15 @@ import io.xpipe.core.data.node.DataStructureNode;
import io.xpipe.core.data.node.ValueNode;
import org.apache.commons.lang3.math.NumberUtils;
import java.math.BigDecimal;
import java.math.BigInteger;
public class TypeConverter {
public static void tagNullType(ValueNode node, String nullValue) {
var string = node.asString();
if (string.equals(nullValue)) {
node.tag(DataStructureNode.NULL_VALUE);
node.tag(DataStructureNode.IS_NULL);
}
}
@ -17,18 +20,17 @@ public class TypeConverter {
var string = node.asString();
if (string.equals(trueValue)) {
node.tag(DataStructureNode.BOOLEAN_TRUE);
node.tag(DataStructureNode.BOOLEAN_VALUE);
node.tag(DataStructureNode.IS_BOOLEAN);
}
if (string.equals(falseValue)) {
node.tag(DataStructureNode.BOOLEAN_FALSE);
node.tag(DataStructureNode.BOOLEAN_VALUE);
node.tag(DataStructureNode.IS_BOOLEAN);
}
}
public static void tagNumberType(ValueNode node) {
var string = node.asString();
if (NumberUtils.isCreatable(string)) {
node.tag(DataStructureNode.IS_NUMBER);
var number = NumberUtils.createNumber(string);
if (number instanceof Float || number instanceof Double) {
node.tag(DataStructureNode.IS_FLOATING_POINT);
@ -37,4 +39,93 @@ public class TypeConverter {
}
}
}
public static BigDecimal parseDecimal(DataStructureNode node) {
if (node == null || node.hasMetaAttribute(DataStructureNode.IS_NULL)) {
return BigDecimal.ZERO;
}
if (node.hasMetaAttribute(DataStructureNode.FLOATING_POINT_VALUE)) {
return new BigDecimal(node.getMetaAttribute(DataStructureNode.FLOATING_POINT_VALUE));
}
var parsedDecimal = parseDecimal(node.asString());
if (parsedDecimal != null) {
return parsedDecimal;
}
if (node.hasMetaAttribute(DataStructureNode.INTEGER_VALUE)) {
return new BigDecimal(node.getMetaAttribute(DataStructureNode.INTEGER_VALUE));
}
var parsedInteger = parseInteger(node.asString());
if (parsedInteger != null) {
return new BigDecimal(parsedInteger);
}
return null;
}
public static Boolean parseBoolean(DataStructureNode node) {
if (node == null || node.hasMetaAttribute(DataStructureNode.IS_NULL)) {
return false;
}
if (node.hasMetaAttribute(DataStructureNode.BOOLEAN_FALSE)) {
return Boolean.parseBoolean(node.getMetaAttribute(DataStructureNode.BOOLEAN_FALSE));
}
if (node.hasMetaAttribute(DataStructureNode.BOOLEAN_TRUE)) {
return Boolean.parseBoolean(node.getMetaAttribute(DataStructureNode.BOOLEAN_TRUE));
}
var string = node.asString();
if (string.length() == 0 || string.equalsIgnoreCase("false")) {
return false;
}
return true;
}
public static BigInteger parseInteger(DataStructureNode node) {
if (node == null || node.hasMetaAttribute(DataStructureNode.IS_NULL)) {
return BigInteger.ZERO;
}
if (node.hasMetaAttribute(DataStructureNode.INTEGER_VALUE)) {
return new BigInteger(node.getMetaAttribute(DataStructureNode.INTEGER_VALUE));
}
var parsedInteger = parseInteger(node.asString());
if (parsedInteger != null) {
return parsedInteger;
}
if (node.hasMetaAttribute(DataStructureNode.FLOATING_POINT_VALUE)) {
return new BigDecimal(node.getMetaAttribute(DataStructureNode.FLOATING_POINT_VALUE)).toBigInteger();
}
var parsedDecimal = parseDecimal(node.asString());
if (parsedDecimal != null) {
return parsedDecimal.toBigInteger();
}
return null;
}
private static BigInteger parseInteger(String string) {
if (string == null) {
return BigInteger.ZERO;
}
return NumberUtils.createBigInteger(string);
}
private static BigDecimal parseDecimal(String string) {
if (string == null) {
return BigDecimal.ZERO;
}
return NumberUtils.createBigDecimal(string);
}
}

View file

@ -1,7 +1,5 @@
import com.fasterxml.jackson.databind.Module;
import io.xpipe.extension.DataSourceProvider;
import io.xpipe.extension.SupportedApplicationProvider;
import io.xpipe.extension.util.ExtensionJacksonModule;
import io.xpipe.extension.util.XPipeDaemon;
open module io.xpipe.extension {
@ -27,9 +25,6 @@ open module io.xpipe.extension {
requires static org.fxmisc.richtext;
requires static net.synedra.validatorfx;
requires static org.fxmisc.flowless;
requires static org.fxmisc.undofx;
requires static org.fxmisc.wellbehavedfx;
requires static org.reactfx;
requires static org.kordamp.ikonli.javafx;
requires static com.jfoenix;
@ -41,6 +36,4 @@ open module io.xpipe.extension {
uses io.xpipe.extension.DataStoreProvider;
uses XPipeDaemon;
uses io.xpipe.extension.Cache;
provides Module with ExtensionJacksonModule;
}