mirror of
https://github.com/xpipe-io/xpipe.git
synced 2024-11-21 15:10:23 +00:00
Rework beacon exchanges
This commit is contained in:
parent
b2af324993
commit
40f9de1dc3
13 changed files with 229 additions and 34 deletions
|
@ -23,7 +23,7 @@ public abstract class DataSourceImpl implements DataSource {
|
|||
new XPipeApiConnector() {
|
||||
@Override
|
||||
protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException {
|
||||
var req = InfoExchange.Request.builder().id(ds).build();
|
||||
var req = InfoExchange.Request.builder().ref(ds).build();
|
||||
InfoExchange.Response res = performSimpleExchange(sc, req);
|
||||
|
||||
}
|
||||
|
|
|
@ -44,11 +44,13 @@ public class BeaconClient implements AutoCloseable {
|
|||
void accept(T var1) throws E;
|
||||
}
|
||||
|
||||
public static Optional<BeaconClient> tryConnect() {
|
||||
if (BeaconConfig.debugEnabled()) {
|
||||
System.out.println("Attempting connection to server at port " + BeaconConfig.getUsedPort());
|
||||
}
|
||||
@FunctionalInterface
|
||||
public interface FailableRunnable<E extends Throwable> {
|
||||
|
||||
void run() throws E;
|
||||
}
|
||||
|
||||
public static Optional<BeaconClient> tryConnect() {
|
||||
try {
|
||||
return Optional.of(new BeaconClient());
|
||||
} catch (IOException ex) {
|
||||
|
|
|
@ -1,22 +1,16 @@
|
|||
package io.xpipe.beacon;
|
||||
|
||||
import io.xpipe.beacon.message.ResponseMessage;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
|
||||
public interface BeaconHandler {
|
||||
|
||||
void postResponse(BeaconClient.FailableRunnable<Exception> r);
|
||||
|
||||
void prepareBody() throws IOException;
|
||||
|
||||
InputStream startBodyRead() throws IOException;
|
||||
|
||||
public <T extends ResponseMessage> void sendResponse(T obj) throws Exception;
|
||||
|
||||
public void sendClientErrorResponse(String message) throws Exception;
|
||||
|
||||
public void sendServerErrorResponse(Throwable ex) throws Exception;
|
||||
|
||||
OutputStream getOutputStream() throws Exception;
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ public class BeaconServer {
|
|||
public static boolean tryStart() throws Exception {
|
||||
var custom = BeaconConfig.getCustomExecCommand();
|
||||
if (custom != null) {
|
||||
new ProcessBuilder("cmd", "/c", "CALL", custom).inheritIO().start();
|
||||
Runtime.getRuntime().exec(custom);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -3,10 +3,11 @@ package io.xpipe.beacon.exchange;
|
|||
import io.xpipe.beacon.message.RequestMessage;
|
||||
import io.xpipe.beacon.message.ResponseMessage;
|
||||
import io.xpipe.core.source.DataSourceConfigInstance;
|
||||
import io.xpipe.core.source.DataSourceId;
|
||||
import io.xpipe.core.source.DataSourceInfo;
|
||||
import io.xpipe.core.source.DataSourceReference;
|
||||
import io.xpipe.core.store.DataStore;
|
||||
import lombok.Builder;
|
||||
import lombok.NonNull;
|
||||
import lombok.Value;
|
||||
import lombok.extern.jackson.Jacksonized;
|
||||
|
||||
|
@ -31,15 +32,19 @@ public class InfoExchange implements MessageExchange<InfoExchange.Request, InfoE
|
|||
@Builder
|
||||
@Value
|
||||
public static class Request implements RequestMessage {
|
||||
DataSourceId id;
|
||||
@NonNull
|
||||
DataSourceReference ref;
|
||||
}
|
||||
|
||||
@Jacksonized
|
||||
@Builder
|
||||
@Value
|
||||
public static class Response implements ResponseMessage {
|
||||
@NonNull
|
||||
DataSourceInfo info;
|
||||
@NonNull
|
||||
DataStore store;
|
||||
@NonNull
|
||||
DataSourceConfigInstance config;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ package io.xpipe.beacon.exchange;
|
|||
import io.xpipe.beacon.message.RequestMessage;
|
||||
import io.xpipe.beacon.message.ResponseMessage;
|
||||
import io.xpipe.core.source.DataSourceConfigInstance;
|
||||
import io.xpipe.core.source.DataSourceId;
|
||||
import io.xpipe.core.source.DataSourceReference;
|
||||
import io.xpipe.core.store.DataStore;
|
||||
import lombok.Builder;
|
||||
import lombok.NonNull;
|
||||
|
@ -36,7 +36,7 @@ public class ReadExecuteExchange implements MessageExchange<ReadExecuteExchange.
|
|||
@NonNull
|
||||
DataSourceConfigInstance config;
|
||||
@NonNull
|
||||
DataSourceId targetId;
|
||||
DataSourceReference target;
|
||||
}
|
||||
|
||||
@Jacksonized
|
||||
|
|
|
@ -2,8 +2,9 @@ package io.xpipe.beacon.exchange;
|
|||
|
||||
import io.xpipe.beacon.message.RequestMessage;
|
||||
import io.xpipe.beacon.message.ResponseMessage;
|
||||
import io.xpipe.core.source.DataSourceId;
|
||||
import io.xpipe.core.source.DataSourceReference;
|
||||
import lombok.Builder;
|
||||
import lombok.NonNull;
|
||||
import lombok.Value;
|
||||
import lombok.extern.jackson.Jacksonized;
|
||||
|
||||
|
@ -28,7 +29,8 @@ public class SelectExchange implements MessageExchange<SelectExchange.Request, S
|
|||
@Builder
|
||||
@Value
|
||||
public static class Request implements RequestMessage {
|
||||
DataSourceId id;
|
||||
@NonNull
|
||||
DataSourceReference ref;
|
||||
}
|
||||
|
||||
@Jacksonized
|
||||
|
|
|
@ -3,7 +3,7 @@ package io.xpipe.beacon.exchange;
|
|||
import io.xpipe.beacon.message.RequestMessage;
|
||||
import io.xpipe.beacon.message.ResponseMessage;
|
||||
import io.xpipe.core.source.DataSourceConfigInstance;
|
||||
import io.xpipe.core.source.DataSourceId;
|
||||
import io.xpipe.core.source.DataSourceReference;
|
||||
import io.xpipe.core.store.DataStore;
|
||||
import lombok.Builder;
|
||||
import lombok.NonNull;
|
||||
|
@ -32,7 +32,7 @@ public class WriteExecuteExchange implements MessageExchange<WriteExecuteExchang
|
|||
@Value
|
||||
public static class Request implements RequestMessage {
|
||||
@NonNull
|
||||
DataSourceId sourceId;
|
||||
DataSourceReference ref;
|
||||
|
||||
DataStore dataStore;
|
||||
@NonNull
|
||||
|
|
|
@ -3,7 +3,7 @@ package io.xpipe.beacon.exchange;
|
|||
import io.xpipe.beacon.message.RequestMessage;
|
||||
import io.xpipe.beacon.message.ResponseMessage;
|
||||
import io.xpipe.core.source.DataSourceConfigInstance;
|
||||
import io.xpipe.core.source.DataSourceId;
|
||||
import io.xpipe.core.source.DataSourceReference;
|
||||
import io.xpipe.core.store.DataStore;
|
||||
import lombok.Builder;
|
||||
import lombok.NonNull;
|
||||
|
@ -34,7 +34,7 @@ public class WritePreparationExchange implements MessageExchange<WritePreparatio
|
|||
String providerType;
|
||||
String output;
|
||||
@NonNull
|
||||
DataSourceId sourceId;
|
||||
DataSourceReference ref;
|
||||
}
|
||||
|
||||
@Jacksonized
|
||||
|
|
|
@ -1,5 +1,10 @@
|
|||
package io.xpipe.beacon.message;
|
||||
|
||||
import io.xpipe.beacon.BeaconHandler;
|
||||
|
||||
public interface ResponseMessage {
|
||||
|
||||
default void postSend(BeaconHandler handler) throws Exception {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@ import lombok.Getter;
|
|||
public class DataSourceId {
|
||||
|
||||
public static final char SEPARATOR = ':';
|
||||
public static final DataSourceId ANONYMOUS = new DataSourceId(null, null);
|
||||
|
||||
private final String collectionName;
|
||||
private final String entryName;
|
||||
|
@ -41,10 +40,6 @@ public class DataSourceId {
|
|||
* @throws IllegalArgumentException if any name is not valid
|
||||
*/
|
||||
public static DataSourceId create(String collectionName, String entryName) {
|
||||
if (collectionName == null && entryName == null) {
|
||||
return ANONYMOUS;
|
||||
}
|
||||
|
||||
if (collectionName != null && collectionName.trim().length() == 0) {
|
||||
throw new IllegalArgumentException("Trimmed collection name is empty");
|
||||
}
|
||||
|
@ -77,10 +72,6 @@ public class DataSourceId {
|
|||
throw new IllegalArgumentException("String is null");
|
||||
}
|
||||
|
||||
if (s.equals(String.valueOf(SEPARATOR))) {
|
||||
return ANONYMOUS;
|
||||
}
|
||||
|
||||
var split = s.split(String.valueOf(SEPARATOR));
|
||||
if (split.length != 2) {
|
||||
throw new IllegalArgumentException("Data source id must contain exactly one " + SEPARATOR);
|
||||
|
@ -98,6 +89,6 @@ public class DataSourceId {
|
|||
|
||||
@Override
|
||||
public String toString() {
|
||||
return (collectionName != null ? collectionName.toLowerCase() : "") + SEPARATOR + (entryName != null ? entryName.toLowerCase() : "");
|
||||
return collectionName.toLowerCase() + SEPARATOR + (entryName != null ? entryName.toLowerCase() : "");
|
||||
}
|
||||
}
|
||||
|
|
167
core/src/main/java/io/xpipe/core/source/DataSourceReference.java
Normal file
167
core/src/main/java/io/xpipe/core/source/DataSourceReference.java
Normal file
|
@ -0,0 +1,167 @@
|
|||
package io.xpipe.core.source;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.NonNull;
|
||||
import lombok.Value;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public interface DataSourceReference {
|
||||
|
||||
static DataSourceReference empty() {
|
||||
return new Empty();
|
||||
}
|
||||
|
||||
public static DataSourceReference parse(String s) {
|
||||
if (s == null || s.trim().length() == 0) {
|
||||
return new Empty();
|
||||
}
|
||||
|
||||
if (s.contains(":")) {
|
||||
return new Id(DataSourceId.fromString(s));
|
||||
}
|
||||
|
||||
return new Name(s);
|
||||
}
|
||||
|
||||
enum Type {
|
||||
ID,
|
||||
NAME,
|
||||
EMPTY
|
||||
}
|
||||
|
||||
Type getType();
|
||||
DataSourceId getId();
|
||||
String getName();
|
||||
String toRefString();
|
||||
String toString();
|
||||
|
||||
@Value
|
||||
@AllArgsConstructor
|
||||
static class Id implements DataSourceReference {
|
||||
@NonNull
|
||||
DataSourceId value;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return toRefString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toRefString() {
|
||||
return value.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
Id id = (Id) o;
|
||||
return value.equals(id.value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Type getType() {
|
||||
return Type.ID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataSourceId getId() {
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
||||
@Value
|
||||
@AllArgsConstructor
|
||||
static class Name implements DataSourceReference {
|
||||
@NonNull
|
||||
String value;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return toRefString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toRefString() {
|
||||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
Name n = (Name) o;
|
||||
return value.equals(n.value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Type getType() {
|
||||
return Type.NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataSourceId getId() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
||||
static class Empty implements DataSourceReference {
|
||||
|
||||
@Override
|
||||
public String toRefString() {
|
||||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "none";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
return o != null && getClass() == o.getClass();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Type getType() {
|
||||
return Type.EMPTY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataSourceId getId() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -13,6 +13,7 @@ import io.xpipe.core.data.type.ArrayType;
|
|||
import io.xpipe.core.data.type.TupleType;
|
||||
import io.xpipe.core.data.type.ValueType;
|
||||
import io.xpipe.core.source.DataSourceInfo;
|
||||
import io.xpipe.core.source.DataSourceReference;
|
||||
import io.xpipe.core.store.LocalFileDataStore;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -37,7 +38,31 @@ public class CoreJacksonModule extends SimpleModule {
|
|||
addSerializer(Path.class, new LocalPathSerializer());
|
||||
addDeserializer(Path.class, new LocalPathDeserializer());
|
||||
|
||||
addSerializer(DataSourceReference.class, new DataSourceReferenceSerializer());
|
||||
addDeserializer(DataSourceReference.class, new DataSourceReferenceDeserializer());
|
||||
|
||||
context.setMixInAnnotations(Throwable.class, ThrowableTypeMixIn.class);
|
||||
context.setMixInAnnotations(DataSourceReference.class, DataSourceReferenceTypeMixIn.class);
|
||||
|
||||
context.addSerializers(_serializers);
|
||||
context.addDeserializers(_deserializers);
|
||||
}
|
||||
|
||||
public static class DataSourceReferenceSerializer extends JsonSerializer<DataSourceReference> {
|
||||
|
||||
@Override
|
||||
public void serialize(DataSourceReference value, JsonGenerator jgen, SerializerProvider provider)
|
||||
throws IOException {
|
||||
jgen.writeString(value.toRefString());
|
||||
}
|
||||
}
|
||||
|
||||
public static class DataSourceReferenceDeserializer extends JsonDeserializer<DataSourceReference> {
|
||||
|
||||
@Override
|
||||
public DataSourceReference deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
|
||||
return DataSourceReference.parse(p.getValueAsString());
|
||||
}
|
||||
}
|
||||
|
||||
public static class CharsetSerializer extends JsonSerializer<Charset> {
|
||||
|
@ -77,4 +102,8 @@ public class CoreJacksonModule extends SimpleModule {
|
|||
@JsonSerialize(as = Throwable.class)
|
||||
public abstract static class ThrowableTypeMixIn {
|
||||
}
|
||||
|
||||
@JsonSerialize(as = DataSourceReference.class)
|
||||
public abstract static class DataSourceReferenceTypeMixIn {
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue