Implement support for custom write modes

This commit is contained in:
Christopher Schnick 2022-10-30 04:51:01 +01:00
parent d5566cc662
commit a0c008ab3b
20 changed files with 176 additions and 143 deletions

View file

@ -60,6 +60,14 @@ public class TupleType extends DataType {
return names.stream().allMatch(Objects::nonNull);
}
public TupleType sub(List<String> subNames) {
if (!hasAllNames()) {
throw new UnsupportedOperationException();
}
return new TupleType(subNames, subNames.stream().map(s -> types.get(getNames().indexOf(s))).toList());
}
@Override
public String getName() {
return "tuple";

View file

@ -4,6 +4,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import io.xpipe.core.source.RawDataSource;
import io.xpipe.core.source.RawReadConnection;
import io.xpipe.core.source.RawWriteConnection;
import io.xpipe.core.source.WriteMode;
import io.xpipe.core.store.StreamDataStore;
import lombok.experimental.SuperBuilder;
@ -15,7 +16,7 @@ import java.io.OutputStream;
public class BinarySource extends RawDataSource<StreamDataStore> {
@Override
protected RawWriteConnection newWriteConnection() {
protected RawWriteConnection newWriteConnection(WriteMode mode) {
return new RawWriteConnection() {
private OutputStream out;

View file

@ -3,6 +3,7 @@ package io.xpipe.core.impl;
import io.xpipe.core.source.DataSource;
import io.xpipe.core.source.DataSourceConnection;
import io.xpipe.core.source.DataSourceType;
import io.xpipe.core.source.WriteMode;
import io.xpipe.core.store.FileStore;
import java.nio.file.Files;
@ -28,7 +29,7 @@ public class PreservingWriteConnection implements DataSourceConnection {
var nativeSource = DataSource.createInternalDataSource(type, nativeStore);
if (source.getStore().canOpen()) {
try (var in = source.openReadConnection();
var out = nativeSource.openWriteConnection()) {
var out = nativeSource.openWriteConnection(WriteMode.REPLACE)) {
in.forward(out);
}
;

View file

@ -5,6 +5,7 @@ import io.xpipe.core.charsetter.Charsettable;
import io.xpipe.core.charsetter.NewLine;
import io.xpipe.core.charsetter.StreamCharset;
import io.xpipe.core.source.TextDataSource;
import io.xpipe.core.source.WriteMode;
import io.xpipe.core.store.StreamDataStore;
import lombok.Getter;
import lombok.experimental.SuperBuilder;
@ -20,20 +21,10 @@ public final class TextSource extends TextDataSource<StreamDataStore> implements
private final NewLine newLine;
@Override
protected io.xpipe.core.source.TextWriteConnection newWriteConnection() {
protected io.xpipe.core.source.TextWriteConnection newWriteConnection(WriteMode mode) {
return new TextWriteConnection(this);
}
@Override
protected io.xpipe.core.source.TextWriteConnection newPrependingWriteConnection() {
return new PreservingTextWriteConnection(this, newWriteConnection(), false);
}
@Override
protected io.xpipe.core.source.TextWriteConnection newAppendingWriteConnection() {
return new PreservingTextWriteConnection(this, newWriteConnection(), true);
}
@Override
protected io.xpipe.core.source.TextReadConnection newReadConnection() {
return new TextReadConnection(this);

View file

@ -4,6 +4,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import io.xpipe.core.source.StructureDataSource;
import io.xpipe.core.source.StructureReadConnection;
import io.xpipe.core.source.StructureWriteConnection;
import io.xpipe.core.source.WriteMode;
import io.xpipe.core.store.StreamDataStore;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;
@ -14,7 +15,7 @@ import lombok.extern.jackson.Jacksonized;
public class XpbsSource extends StructureDataSource<StreamDataStore> {
@Override
protected StructureWriteConnection newWriteConnection() {
protected StructureWriteConnection newWriteConnection(WriteMode mode) {
return new XpbsWriteConnection(this);
}

View file

@ -4,6 +4,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import io.xpipe.core.source.TableDataSource;
import io.xpipe.core.source.TableReadConnection;
import io.xpipe.core.source.TableWriteConnection;
import io.xpipe.core.source.WriteMode;
import io.xpipe.core.store.StreamDataStore;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;
@ -14,7 +15,7 @@ import lombok.extern.jackson.Jacksonized;
public class XpbtSource extends TableDataSource<StreamDataStore> {
@Override
protected TableWriteConnection newWriteConnection() {
protected TableWriteConnection newWriteConnection(WriteMode mode) {
return new XpbtWriteConnection(this);
}

View file

@ -36,13 +36,17 @@ public abstract class CollectionDataSource<DS extends DataStore> extends DataSou
return con;
}
public final CollectionWriteConnection openWriteConnection() throws Exception {
var con = newWriteConnection();
public final CollectionWriteConnection openWriteConnection(WriteMode mode) throws Exception {
var con = newWriteConnection(mode);
if (con == null) {
throw new UnsupportedOperationException(mode.getId());
}
con.init();
return con;
}
protected abstract CollectionWriteConnection newWriteConnection();
protected abstract CollectionWriteConnection newWriteConnection(WriteMode mode);
protected abstract CollectionReadConnection newReadConnection();
}

View file

@ -13,6 +13,7 @@ import io.xpipe.core.util.JacksonizedValue;
import lombok.SneakyThrows;
import lombok.experimental.SuperBuilder;
import java.util.List;
import java.util.Optional;
/**
@ -57,12 +58,12 @@ public abstract class DataSource<DS extends DataStore> extends JacksonizedValue
store.checkComplete();
}
public WriteMode[] getAvailableWriteModes() {
public List<WriteMode> getAvailableWriteModes() {
if (getFlow() != null && !getFlow().hasOutput()) {
return new WriteMode[0];
return List.of();
}
return WriteMode.values();
return List.of(WriteMode.REPLACE, WriteMode.APPEND, WriteMode.PREPEND);
}
public DataFlow getFlow() {
@ -115,18 +116,10 @@ public abstract class DataSource<DS extends DataStore> extends JacksonizedValue
throw new UnsupportedOperationException();
}
public DataSourceConnection openWriteConnection() throws Exception {
public DataSourceConnection openWriteConnection(WriteMode mode) throws Exception {
throw new UnsupportedOperationException();
}
public DataSourceConnection openAppendingWriteConnection() throws Exception {
throw new UnsupportedOperationException("Appending write is not supported");
}
public DataSourceConnection openPrependingWriteConnection() throws Exception {
throw new UnsupportedOperationException("Prepending write is not supported");
}
public DS getStore() {
return store;
}

View file

@ -25,13 +25,17 @@ public abstract class RawDataSource<DS extends DataStore> extends DataSource<DS>
}
@Override
public final RawWriteConnection openWriteConnection() throws Exception {
var con = newWriteConnection();
public final RawWriteConnection openWriteConnection(WriteMode mode) throws Exception {
var con = newWriteConnection(mode);
if (con == null) {
throw new UnsupportedOperationException(mode.getId());
}
con.init();
return con;
}
protected abstract RawWriteConnection newWriteConnection();
protected abstract RawWriteConnection newWriteConnection(WriteMode mode);
protected abstract RawReadConnection newReadConnection();
}

View file

@ -34,13 +34,17 @@ public abstract class StructureDataSource<DS extends DataStore> extends DataSour
return con;
}
public final StructureWriteConnection openWriteConnection() throws Exception {
var con = newWriteConnection();
public final StructureWriteConnection openWriteConnection(WriteMode mode) throws Exception {
var con = newWriteConnection(mode);
if (con == null) {
throw new UnsupportedOperationException(mode.getId());
}
con.init();
return con;
}
protected abstract StructureWriteConnection newWriteConnection();
protected abstract StructureWriteConnection newWriteConnection(WriteMode mode);
protected abstract StructureReadConnection newReadConnection();
}

View file

@ -47,34 +47,26 @@ public abstract class TableDataSource<DS extends DataStore> extends DataSource<D
return con;
}
public final TableWriteConnection openWriteConnection() throws Exception {
var con = newWriteConnection();
public final TableWriteConnection openWriteConnection(WriteMode mode) throws Exception {
var con = newWriteConnection(mode);
if (con == null) {
throw new UnsupportedOperationException(mode.getId());
}
con.init();
return con;
}
public final TableWriteConnection openAppendingWriteConnection() throws Exception {
var con = newAppendingWriteConnection();
con.init();
return con;
protected TableWriteConnection newWriteConnection(WriteMode mode) {
if (mode.equals(WriteMode.PREPEND)) {
return new PreservingTableWriteConnection(this, newWriteConnection(WriteMode.REPLACE), false);
}
public final TableWriteConnection openPrependingWriteConnection() throws Exception {
var con = newPrependingWriteConnection();
con.init();
return con;
if (mode.equals(WriteMode.APPEND)) {
return new PreservingTableWriteConnection(this, newWriteConnection(WriteMode.REPLACE), true);
}
protected TableWriteConnection newWriteConnection() {
throw new UnsupportedOperationException();
}
protected TableWriteConnection newAppendingWriteConnection() {
return new PreservingTableWriteConnection(this, newWriteConnection(), true);
}
protected TableWriteConnection newPrependingWriteConnection() {
return new PreservingTableWriteConnection(this, newWriteConnection(), false);
return null;
}
protected TableReadConnection newReadConnection() {

View file

@ -70,7 +70,8 @@ public class TableMapping {
}
public boolean isIdentity() {
return inputType.equals(outputType) && Arrays.equals(columMap, range(getInputType().getSize()));
return inputType.equals(outputType)
&& Arrays.equals(columMap, range(getInputType().getSize()));
}
public boolean isComplete() {
@ -78,14 +79,19 @@ public class TableMapping {
.allMatch(value -> inverseMap(value).isPresent());
}
public boolean isComplete(List<String> outputNames) {
return IntStream.range(0, outputType.getSize())
.filter(i -> outputNames.contains(outputType.getNames().get(i)))
.allMatch(value -> inverseMap(value).isPresent());
}
public TableMapping sub(List<String> outputNames) {
var array = new Integer[inputType.getSize()];
for (int i = 0; i < outputNames.size(); i++) {
var index = inverseMap(outputType.getNames().indexOf(outputNames.get(i)));
if (index.isPresent()) {
array[index.getAsInt()] = i;
} else {
throw new IllegalStateException();
var array = Arrays.copyOf(columMap, columMap.length);
for (int i = 0; i < inputType.getSize(); i++) {
var mapped = map(i);
if (mapped.isPresent()
&& !outputNames.contains(outputType.getNames().get(mapped.getAsInt()))) {
array[i] = null;
}
}
return new TableMapping(inputType, outputType, array);

View file

@ -37,34 +37,22 @@ public abstract class TextDataSource<DS extends DataStore> extends DataSource<DS
}
@Override
public final TextWriteConnection openWriteConnection() throws Exception {
var con = newWriteConnection();
public final TextWriteConnection openWriteConnection(WriteMode mode) throws Exception {
var con = newWriteConnection(mode);
con.init();
return con;
}
@Override
public final TextWriteConnection openAppendingWriteConnection() throws Exception {
var con = newAppendingWriteConnection();
con.init();
return con;
protected TextWriteConnection newWriteConnection(WriteMode mode) {
if (mode.equals(WriteMode.PREPEND)) {
return new PreservingTextWriteConnection(this, newWriteConnection(WriteMode.REPLACE), false);
}
@Override
public final TextWriteConnection openPrependingWriteConnection() throws Exception {
var con = newPrependingWriteConnection();
con.init();
return con;
if (mode.equals(WriteMode.APPEND)) {
return new PreservingTextWriteConnection(this, newWriteConnection(WriteMode.REPLACE), true);
}
protected abstract TextWriteConnection newWriteConnection();
protected TextWriteConnection newAppendingWriteConnection() {
return new PreservingTextWriteConnection(this, newWriteConnection(), true);
}
protected TextWriteConnection newPrependingWriteConnection() {
return new PreservingTextWriteConnection(this, newWriteConnection(), false);
return null;
}
protected abstract TextReadConnection newReadConnection();

View file

@ -1,26 +1,41 @@
package io.xpipe.core.source;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.xpipe.core.util.JacksonizedValue;
public enum WriteMode {
@JsonProperty("replace")
REPLACE(DataSource::openWriteConnection),
@JsonProperty("append")
APPEND(DataSource::openAppendingWriteConnection),
@JsonProperty("prepend")
PREPEND(DataSource::openPrependingWriteConnection);
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;
private final FailableFunction<DataSource<?>, DataSourceConnection, Exception> connectionOpener;
public class WriteMode extends JacksonizedValue {
WriteMode(FailableFunction<DataSource<?>, DataSourceConnection, Exception> connectionOpener) {
this.connectionOpener = connectionOpener;
}
private static final List<WriteMode> ALL = new ArrayList<>();
public DataSourceConnection open(DataSource<?> source) throws Exception {
return connectionOpener.apply(source);
}
public static interface FailableFunction<T, R, E extends Throwable> {
R apply(T input) throws E;
public static void init(ModuleLayer layer) {
if (ALL.size() == 0) {
ALL.addAll(ServiceLoader.load(layer, WriteMode.class).stream()
.map(p -> p.get())
.toList());
}
}
@JsonTypeName("replace")
public static final class Replace extends WriteMode {
}
@JsonTypeName("append")
public static final class Append extends WriteMode {
}
@JsonTypeName("prepend")
public static final class Prepend extends WriteMode {
}
public static final Replace REPLACE = new Replace();
public static final Append APPEND = new Append();
public static final Prepend PREPEND = new Prepend();
public final String getId() {
return getClass().getAnnotation(JsonTypeName.class).value();
}
}

View file

@ -1,9 +1,11 @@
package io.xpipe.core.util;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.SneakyThrows;
import lombok.experimental.SuperBuilder;
@SuperBuilder
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
public class JacksonizedValue {
public JacksonizedValue() {}

View file

@ -1,3 +1,4 @@
import io.xpipe.core.source.WriteMode;
import io.xpipe.core.util.CoreJacksonModule;
open module io.xpipe.core {
@ -20,7 +21,9 @@ open module io.xpipe.core {
requires static lombok;
uses com.fasterxml.jackson.databind.Module;
uses io.xpipe.core.source.WriteMode;
provides WriteMode with WriteMode.Replace, WriteMode.Append, WriteMode.Prepend;
provides com.fasterxml.jackson.databind.Module with
CoreJacksonModule;
}

View file

@ -0,0 +1,36 @@
package io.xpipe.extension;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import io.xpipe.core.util.JacksonMapper;
import io.xpipe.extension.event.TrackEvent;
import io.xpipe.extension.prefs.PrefsProviders;
public class XPipeServiceProviders {
public static void load(ModuleLayer layer) {
TrackEvent.info("Loading extension providers ...");
DataSourceProviders.init(layer);
for (DataSourceProvider<?> p : DataSourceProviders.getAll()) {
TrackEvent.trace("Loaded data source provider " + p.getId());
JacksonMapper.configure(objectMapper -> {
objectMapper.registerSubtypes(new NamedType(p.getSourceClass()));
});
}
DataStoreProviders.init(layer);
for (DataStoreProvider p : DataStoreProviders.getAll()) {
TrackEvent.trace("Loaded data store provider " + p.getId());
JacksonMapper.configure(objectMapper -> {
for (Class<?> storeClass : p.getStoreClasses()) {
objectMapper.registerSubtypes(new NamedType(storeClass));
}
});
}
DataStoreActionProvider.init(layer);
SupportedApplicationProviders.loadAll(layer);
PrefsProviders.init(layer);
TrackEvent.info("Finished loading extension providers");
}
}

View file

@ -42,11 +42,14 @@ public class ToggleGroupComp<T> extends Comp<CompStructure<HBox>> {
b.setSelected(true);
}
}
if (box.getChildren().size() > 0) {
box.getChildren().get(0).getStyleClass().add("first");
for (int i = 1; i < box.getChildren().size() - 1; i++) {
box.getChildren().get(i).getStyleClass().add("center");
}
box.getChildren().get(box.getChildren().size() - 1).getStyleClass().add("last");
}
group.selectedToggleProperty().addListener((obsVal, oldVal, newVal) -> {
if (newVal == null) oldVal.setSelected(true);

View file

@ -14,63 +14,41 @@ import lombok.EqualsAndHashCode;
import lombok.Value;
import net.synedra.validatorfx.Check;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
@Value
@EqualsAndHashCode(callSuper = true)
public class WriteModeChoiceComp extends SimpleComp implements Validatable {
Property<WriteMode> selected;
WriteMode[] available;
List<WriteMode> available;
Validator validator = new SimpleValidator();
Check check;
public WriteModeChoiceComp(Property<WriteMode> selected, WriteMode[] available) {
public WriteModeChoiceComp(Property<WriteMode> selected, List<WriteMode> available) {
this.selected = selected;
this.available = available;
if (available.length == 1) {
selected.setValue(available[0]);
if (available.size() == 1) {
selected.setValue(available.get(0));
}
check = Validators.nonNull(validator, I18n.observable("mode"), selected);
}
@Override
protected Region createSimple() {
var a = Arrays.asList(available);
var a = available;
var map = new LinkedHashMap<WriteMode, ObservableValue<String>>();
var replaceIndex = -1;
if (a.contains(WriteMode.REPLACE)) {
map.put(WriteMode.REPLACE, I18n.observable("replace"));
replaceIndex = 0;
for (WriteMode writeMode : a) {
map.put(writeMode,I18n.observable(writeMode.getId()));
}
var appendIndex = -1;
if (a.contains(WriteMode.APPEND)) {
map.put(WriteMode.APPEND, I18n.observable("append"));
appendIndex = replaceIndex + 1;
}
var prependIndex = -1;
if (a.contains(WriteMode.PREPEND)) {
map.put(WriteMode.PREPEND, I18n.observable("prepend"));
prependIndex = Math.max(replaceIndex, appendIndex) + 1;
}
int finalReplaceIndex = replaceIndex;
int finalAppendIndex = appendIndex;
int finalPrependIndex = prependIndex;
return new ToggleGroupComp<>(selected, map)
.apply(struc -> {
if (finalReplaceIndex != -1)
new FancyTooltipAugment<>("extension.replaceDescription")
.augment(struc.get().getChildren().get(0));
if (finalAppendIndex != -1)
new FancyTooltipAugment<>("extension.appendDescription")
.augment(struc.get().getChildren().get(finalAppendIndex));
if (finalPrependIndex != -1)
new FancyTooltipAugment<>("extension.prependDescription")
.augment(struc.get().getChildren().get(finalPrependIndex));
for (int i = 0; i < a.size(); i++) {
new FancyTooltipAugment<>(a.get(i).getId() + "Description")
.augment(struc.get().getChildren().get(i));
}
})
.apply(struc -> check.decorates(struc.get()))
.createRegion();

View file

@ -4,7 +4,8 @@ import io.xpipe.api.DataSource;
import io.xpipe.api.util.XPipeDaemonController;
import io.xpipe.core.store.DataStore;
import io.xpipe.core.store.FileStore;
import io.xpipe.extension.DataSourceProviders;
import io.xpipe.core.util.JacksonMapper;
import io.xpipe.extension.XPipeServiceProviders;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@ -35,7 +36,8 @@ public class ExtensionTest {
@BeforeAll
public static void setup() throws Exception {
DataSourceProviders.init(ModuleLayer.boot());
JacksonMapper.initModularized(ModuleLayer.boot());
XPipeServiceProviders.load(ModuleLayer.boot());
XPipeDaemonController.start();
}