[#207]: fix(api): clean up

This commit is contained in:
Ilnur Farukhshin 2021-07-23 16:32:45 +03:00
parent 82ce538488
commit 232a17b80d
22 changed files with 210 additions and 363 deletions

View file

@ -156,6 +156,7 @@ For example, if you want to use an environment variable to set the `name` parame
|`KAFKA_CLUSTERS_0_NAME` | Cluster name |`KAFKA_CLUSTERS_0_NAME` | Cluster name
|`KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS` |Address where to connect |`KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS` |Address where to connect
|`KAFKA_CLUSTERS_0_ZOOKEEPER` | Zookeper service address |`KAFKA_CLUSTERS_0_ZOOKEEPER` | Zookeper service address
|`KAFKA_CLUSTERS_0_KSQLDBSERVER` | KSQL DB server address
|`KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL` |Security protocol to connect to the brokers. For SSL connection use "SSL", for plaintext connection don't set this environment variable |`KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL` |Security protocol to connect to the brokers. For SSL connection use "SSL", for plaintext connection don't set this environment variable
|`KAFKA_CLUSTERS_0_SCHEMAREGISTRY` |SchemaRegistry's address |`KAFKA_CLUSTERS_0_SCHEMAREGISTRY` |SchemaRegistry's address
|`KAFKA_CLUSTERS_0_SCHEMANAMETEMPLATE` |How keys are saved to schemaRegistry |`KAFKA_CLUSTERS_0_SCHEMANAMETEMPLATE` |How keys are saved to schemaRegistry

View file

@ -4,7 +4,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.exception.UnprocessableEntityException; import com.provectus.kafka.ui.exception.UnprocessableEntityException;
import com.provectus.kafka.ui.model.KsqlCommandResponse; import com.provectus.kafka.ui.model.KsqlCommandResponse;
import com.provectus.kafka.ui.strategy.ksql.statement.KsqlStatementStrategy; import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
@ -23,7 +23,7 @@ public class KsqlClient {
private final WebClient webClient; private final WebClient webClient;
private final ObjectMapper mapper; private final ObjectMapper mapper;
public Mono<KsqlCommandResponse> execute(KsqlStatementStrategy ksqlStatement) { public Mono<KsqlCommandResponse> execute(BaseStrategy ksqlStatement) {
return webClient.post() return webClient.post()
.uri(ksqlStatement.getUri()) .uri(ksqlStatement.getUri())
.accept(new MediaType("application", "vnd.ksql.v1+json")) .accept(new MediaType("application", "vnd.ksql.v1+json"))

View file

@ -7,7 +7,7 @@ import com.provectus.kafka.ui.exception.UnprocessableEntityException;
import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.KsqlCommand; import com.provectus.kafka.ui.model.KsqlCommand;
import com.provectus.kafka.ui.model.KsqlCommandResponse; import com.provectus.kafka.ui.model.KsqlCommandResponse;
import com.provectus.kafka.ui.strategy.ksql.statement.KsqlStatementStrategy; import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy;
import java.util.List; import java.util.List;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -18,7 +18,7 @@ import reactor.core.publisher.Mono;
public class KsqlService { public class KsqlService {
private final KsqlClient ksqlClient; private final KsqlClient ksqlClient;
private final ClustersStorage clustersStorage; private final ClustersStorage clustersStorage;
private final List<KsqlStatementStrategy> ksqlStatementStrategies; private final List<BaseStrategy> ksqlStatementStrategies;
public Mono<KsqlCommandResponse> executeKsqlCommand(String clusterName, public Mono<KsqlCommandResponse> executeKsqlCommand(String clusterName,
Mono<KsqlCommand> ksqlCommand) { Mono<KsqlCommand> ksqlCommand) {
@ -36,7 +36,7 @@ public class KsqlService {
.flatMap(ksqlClient::execute); .flatMap(ksqlClient::execute);
} }
private Mono<KsqlStatementStrategy> getStatementStrategyForKsqlCommand( private Mono<BaseStrategy> getStatementStrategyForKsqlCommand(
Mono<KsqlCommand> ksqlCommand) { Mono<KsqlCommand> ksqlCommand) {
return ksqlCommand return ksqlCommand
.map(command -> ksqlStatementStrategies.stream() .map(command -> ksqlStatementStrategies.stream()

View file

@ -15,7 +15,10 @@ import java.util.stream.IntStream;
import java.util.stream.Stream; import java.util.stream.Stream;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
public abstract class KsqlStatementStrategy { public abstract class BaseStrategy {
protected static final String ksqlRequestPath = "/ksql";
protected static final String queryRequestPath = "/query";
private static final String mappingExceptionMessage = "KSQL DB response mapping error";
protected String host = null; protected String host = null;
protected KsqlCommand ksqlCommand = null; protected KsqlCommand ksqlCommand = null;
@ -23,14 +26,14 @@ public abstract class KsqlStatementStrategy {
if (this.host != null) { if (this.host != null) {
return this.host + this.getRequestPath(); return this.host + this.getRequestPath();
} }
return null; throw new UnprocessableEntityException("Strategy doesn't have host");
} }
public boolean test(String sql) { public boolean test(String sql) {
return sql.trim().toLowerCase().matches(getTestRegExp()); return sql.trim().toLowerCase().matches(getTestRegExp());
} }
public KsqlStatementStrategy host(String host) { public BaseStrategy host(String host) {
this.host = host; this.host = host;
return this; return this;
} }
@ -39,7 +42,7 @@ public abstract class KsqlStatementStrategy {
return ksqlCommand; return ksqlCommand;
} }
public KsqlStatementStrategy ksqlCommand(KsqlCommand ksqlCommand) { public BaseStrategy ksqlCommand(KsqlCommand ksqlCommand) {
this.ksqlCommand = ksqlCommand; this.ksqlCommand = ksqlCommand;
return this; return this;
} }
@ -52,7 +55,7 @@ public abstract class KsqlStatementStrategy {
Table table = items.isArray() ? getTableFromArray(items) : getTableFromObject(items); Table table = items.isArray() ? getTableFromArray(items) : getTableFromObject(items);
return commandResponse.data(table); return commandResponse.data(table);
} }
throw new UnprocessableEntityException("KSQL DB response mapping error"); throw new UnprocessableEntityException(mappingExceptionMessage);
} }
protected KsqlCommandResponse serializeMessageResponse(JsonNode response, String path) { protected KsqlCommandResponse serializeMessageResponse(JsonNode response, String path) {
@ -62,7 +65,7 @@ public abstract class KsqlStatementStrategy {
JsonNode item = first.path(path); JsonNode item = first.path(path);
return commandResponse.message(getMessageFromObject(item)); return commandResponse.message(getMessageFromObject(item));
} }
throw new UnprocessableEntityException("KSQL DB response mapping error"); throw new UnprocessableEntityException(mappingExceptionMessage);
} }
protected KsqlCommandResponse serializeQueryResponse(JsonNode response) { protected KsqlCommandResponse serializeQueryResponse(JsonNode response) {
@ -73,7 +76,7 @@ public abstract class KsqlStatementStrategy {
.rows(getQueryResponseRows(response)); .rows(getQueryResponseRows(response));
return commandResponse.data(table); return commandResponse.data(table);
} }
throw new UnprocessableEntityException("KSQL DB response mapping error"); throw new UnprocessableEntityException(mappingExceptionMessage);
} }
private List<String> getQueryResponseHeader(JsonNode response) { private List<String> getQueryResponseHeader(JsonNode response) {
@ -121,7 +124,7 @@ public abstract class KsqlStatementStrategy {
if (node.isObject() && node.has("message")) { if (node.isObject() && node.has("message")) {
return node.get("message").asText(); return node.get("message").asText();
} }
throw new UnprocessableEntityException("KSQL DB response mapping error"); throw new UnprocessableEntityException(mappingExceptionMessage);
} }
private List<List<String>> getTableRows(JsonNode node, List<String> keys) { private List<List<String>> getTableRows(JsonNode node, List<String> keys) {
@ -137,7 +140,7 @@ public abstract class KsqlStatementStrategy {
if (node.isArray() && node.size() > 0) { if (node.isArray() && node.size() > 0) {
return StreamSupport.stream(node.spliterator(), false); return StreamSupport.stream(node.spliterator(), false);
} }
throw new UnprocessableEntityException("KSQL DB response mapping error"); throw new UnprocessableEntityException(mappingExceptionMessage);
} }
private List<String> getJsonObjectKeys(JsonNode node) { private List<String> getJsonObjectKeys(JsonNode node) {
@ -146,7 +149,7 @@ public abstract class KsqlStatementStrategy {
Spliterators.spliteratorUnknownSize(node.fieldNames(), Spliterator.ORDERED), false Spliterators.spliteratorUnknownSize(node.fieldNames(), Spliterator.ORDERED), false
).collect(Collectors.toList()); ).collect(Collectors.toList());
} }
throw new UnprocessableEntityException("KSQL DB response mapping error"); throw new UnprocessableEntityException(mappingExceptionMessage);
} }
private List<String> getJsonObjectValues(JsonNode node) { private List<String> getJsonObjectValues(JsonNode node) {

View file

@ -4,11 +4,9 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.provectus.kafka.ui.model.KsqlCommandResponse; import com.provectus.kafka.ui.model.KsqlCommandResponse;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
public class CreateStrategy extends KsqlStatementStrategy { public class CreateStrategy extends BaseStrategy {
private final String requestPath = "/ksql"; private static final String responseValueKey = "commandStatus";
private final String responseValueKey = "commandStatus";
@Override @Override
public KsqlCommandResponse serializeResponse(JsonNode response) { public KsqlCommandResponse serializeResponse(JsonNode response) {
@ -17,7 +15,7 @@ public class CreateStrategy extends KsqlStatementStrategy {
@Override @Override
protected String getRequestPath() { protected String getRequestPath() {
return requestPath; return BaseStrategy.ksqlRequestPath;
} }
@Override @Override

View file

@ -4,11 +4,9 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.provectus.kafka.ui.model.KsqlCommandResponse; import com.provectus.kafka.ui.model.KsqlCommandResponse;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
public class DescribeStrategy extends KsqlStatementStrategy { public class DescribeStrategy extends BaseStrategy {
private final String requestPath = "/ksql"; private static final String responseValueKey = "sourceDescription";
private final String responseValueKey = "sourceDescription";
@Override @Override
public KsqlCommandResponse serializeResponse(JsonNode response) { public KsqlCommandResponse serializeResponse(JsonNode response) {
@ -17,7 +15,7 @@ public class DescribeStrategy extends KsqlStatementStrategy {
@Override @Override
protected String getRequestPath() { protected String getRequestPath() {
return requestPath; return BaseStrategy.ksqlRequestPath;
} }
@Override @Override

View file

@ -4,11 +4,9 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.provectus.kafka.ui.model.KsqlCommandResponse; import com.provectus.kafka.ui.model.KsqlCommandResponse;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
public class DropStrategy extends KsqlStatementStrategy { public class DropStrategy extends BaseStrategy {
private final String requestPath = "/ksql"; private static final String responseValueKey = "commandStatus";
private final String responseValueKey = "commandStatus";
@Override @Override
public KsqlCommandResponse serializeResponse(JsonNode response) { public KsqlCommandResponse serializeResponse(JsonNode response) {
@ -17,7 +15,7 @@ public class DropStrategy extends KsqlStatementStrategy {
@Override @Override
protected String getRequestPath() { protected String getRequestPath() {
return requestPath; return BaseStrategy.ksqlRequestPath;
} }
@Override @Override

View file

@ -5,9 +5,8 @@ import com.provectus.kafka.ui.model.KsqlCommandResponse;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
public class ExplainStrategy extends KsqlStatementStrategy { public class ExplainStrategy extends BaseStrategy {
private final String requestPath = "/ksql"; private static final String responseValueKey = "queryDescription";
private final String responseValueKey = "queryDescription";
@Override @Override
public KsqlCommandResponse serializeResponse(JsonNode response) { public KsqlCommandResponse serializeResponse(JsonNode response) {
@ -16,7 +15,7 @@ public class ExplainStrategy extends KsqlStatementStrategy {
@Override @Override
protected String getRequestPath() { protected String getRequestPath() {
return requestPath; return BaseStrategy.ksqlRequestPath;
} }
@Override @Override

View file

@ -1,49 +0,0 @@
package com.provectus.kafka.ui.strategy.ksql.statement;
import com.fasterxml.jackson.databind.JsonNode;
import com.provectus.kafka.ui.model.KsqlCommandResponse;
import java.util.List;
import java.util.Optional;
import org.springframework.stereotype.Component;
@Component
public class ListStrategy extends KsqlStatementStrategy {
private final String requestPath = "/ksql";
private final List<String> statements = List.of("functions", "topics", "streams", "tables");
private String responseValueKey = "";
@Override
public KsqlCommandResponse serializeResponse(JsonNode response) {
return serializeTableResponse(response, responseValueKey);
}
@Override
protected String getRequestPath() {
return requestPath;
}
@Override
public boolean test(String sql) {
Optional<String> statement = statements.stream()
.filter(s -> sql.trim().toLowerCase().matches(getTestRegExp(s)))
.findFirst();
if (statement.isPresent()) {
setResponseValueKey(statement.get());
return true;
}
return false;
}
@Override
protected String getTestRegExp() {
return "";
}
private String getTestRegExp(String key) {
return "list " + key + ";";
}
private void setResponseValueKey(String path) {
responseValueKey = path;
}
}

View file

@ -5,8 +5,7 @@ import com.provectus.kafka.ui.model.KsqlCommandResponse;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
public class SelectStrategy extends KsqlStatementStrategy { public class SelectStrategy extends BaseStrategy {
private final String requestPath = "/query";
@Override @Override
public KsqlCommandResponse serializeResponse(JsonNode response) { public KsqlCommandResponse serializeResponse(JsonNode response) {
@ -15,7 +14,7 @@ public class SelectStrategy extends KsqlStatementStrategy {
@Override @Override
protected String getRequestPath() { protected String getRequestPath() {
return requestPath; return BaseStrategy.queryRequestPath;
} }
@Override @Override

View file

@ -7,10 +7,11 @@ import java.util.Optional;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
public class ShowStrategy extends KsqlStatementStrategy { public class ShowStrategy extends BaseStrategy {
private final String requestPath = "/ksql"; private final List<String> showStatements =
private final List<String> statements =
List.of("functions", "topics", "streams", "tables", "queries", "properties"); List.of("functions", "topics", "streams", "tables", "queries", "properties");
private final List<String> listStatements =
List.of("functions", "topics", "streams", "tables");
private String responseValueKey = ""; private String responseValueKey = "";
@Override @Override
@ -18,15 +19,10 @@ public class ShowStrategy extends KsqlStatementStrategy {
return serializeTableResponse(response, responseValueKey); return serializeTableResponse(response, responseValueKey);
} }
@Override
protected String getRequestPath() {
return requestPath;
}
@Override @Override
public boolean test(String sql) { public boolean test(String sql) {
Optional<String> statement = statements.stream() Optional<String> statement = showStatements.stream()
.filter(s -> sql.trim().toLowerCase().matches(getTestRegExp(s))) .filter(s -> testSql(sql, getShowRegExp(s)) || testSql(sql, getListRegExp(s)))
.findFirst(); .findFirst();
if (statement.isPresent()) { if (statement.isPresent()) {
setResponseValueKey(statement.get()); setResponseValueKey(statement.get());
@ -35,17 +31,32 @@ public class ShowStrategy extends KsqlStatementStrategy {
return false; return false;
} }
@Override
protected String getRequestPath() {
return BaseStrategy.ksqlRequestPath;
}
@Override @Override
protected String getTestRegExp() { protected String getTestRegExp() {
return ""; return "";
} }
private String getTestRegExp(String key) { protected String getShowRegExp(String key) {
return "show " + key + ";"; return "show " + key + ";";
} }
protected String getListRegExp(String key) {
if (listStatements.contains(key)) {
return "list " + key + ";";
}
return "";
}
private void setResponseValueKey(String path) { private void setResponseValueKey(String path) {
responseValueKey = path; responseValueKey = path;
} }
private boolean testSql(String sql, String pattern) {
return sql.trim().toLowerCase().matches(pattern);
}
} }

View file

@ -5,9 +5,8 @@ import com.provectus.kafka.ui.model.KsqlCommandResponse;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
public class TerminateStrategy extends KsqlStatementStrategy { public class TerminateStrategy extends BaseStrategy {
private final String requestPath = "/ksql"; private static final String responseValueKey = "commandStatus";
private final String responseValueKey = "commandStatus";
@Override @Override
public KsqlCommandResponse serializeResponse(JsonNode response) { public KsqlCommandResponse serializeResponse(JsonNode response) {
@ -16,7 +15,7 @@ public class TerminateStrategy extends KsqlStatementStrategy {
@Override @Override
protected String getRequestPath() { protected String getRequestPath() {
return requestPath; return BaseStrategy.ksqlRequestPath;
} }
@Override @Override

View file

@ -5,6 +5,7 @@ kafka:
bootstrapServers: localhost:9092 bootstrapServers: localhost:9092
zookeeper: localhost:2181 zookeeper: localhost:2181
schemaRegistry: http://localhost:8081 schemaRegistry: http://localhost:8081
ksqldbServer: http://localhost:8088
kafkaConnect: kafkaConnect:
- name: first - name: first
address: http://localhost:8083 address: http://localhost:8083

View file

@ -13,7 +13,7 @@ import com.provectus.kafka.ui.exception.UnprocessableEntityException;
import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.KsqlCommand; import com.provectus.kafka.ui.model.KsqlCommand;
import com.provectus.kafka.ui.model.KsqlCommandResponse; import com.provectus.kafka.ui.model.KsqlCommandResponse;
import com.provectus.kafka.ui.strategy.ksql.statement.KsqlStatementStrategy; import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy;
import com.provectus.kafka.ui.strategy.ksql.statement.ShowStrategy; import com.provectus.kafka.ui.strategy.ksql.statement.ShowStrategy;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
@ -29,7 +29,7 @@ import reactor.test.StepVerifier;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
class KsqlServiceTest { class KsqlServiceTest {
private KsqlService ksqlService; private KsqlService ksqlService;
private KsqlStatementStrategy ksqlStatementStrategy; private BaseStrategy baseStrategy;
@Mock @Mock
private ClustersStorage clustersStorage; private ClustersStorage clustersStorage;
@ -39,16 +39,16 @@ class KsqlServiceTest {
@BeforeEach @BeforeEach
public void setUp() { public void setUp() {
this.ksqlStatementStrategy = new ShowStrategy(); this.baseStrategy = new ShowStrategy();
this.ksqlService = new KsqlService( this.ksqlService = new KsqlService(
this.ksqlClient, this.ksqlClient,
this.clustersStorage, this.clustersStorage,
List.of(ksqlStatementStrategy) List.of(baseStrategy)
); );
} }
@Test @Test
public void shouldThrowClusterNotFoundExceptionOnExecuteKsqlCommand() { void shouldThrowClusterNotFoundExceptionOnExecuteKsqlCommand() {
String clusterName = "test"; String clusterName = "test";
KsqlCommand command = (new KsqlCommand()).ksql("show streams;"); KsqlCommand command = (new KsqlCommand()).ksql("show streams;");
when(clustersStorage.getClusterByName(clusterName)).thenReturn(Optional.ofNullable(null)); when(clustersStorage.getClusterByName(clusterName)).thenReturn(Optional.ofNullable(null));
@ -58,7 +58,7 @@ class KsqlServiceTest {
} }
@Test @Test
public void shouldThrowKsqlDbNotFoundExceptionOnExecuteKsqlCommand() { void shouldThrowKsqlDbNotFoundExceptionOnExecuteKsqlCommand() {
String clusterName = "test"; String clusterName = "test";
KsqlCommand command = (new KsqlCommand()).ksql("show streams;"); KsqlCommand command = (new KsqlCommand()).ksql("show streams;");
KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class); KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
@ -71,7 +71,7 @@ class KsqlServiceTest {
} }
@Test @Test
public void shouldThrowUnprocessableEntityExceptionOnExecuteKsqlCommand() { void shouldThrowUnprocessableEntityExceptionOnExecuteKsqlCommand() {
String clusterName = "test"; String clusterName = "test";
KsqlCommand command = KsqlCommand command =
(new KsqlCommand()).ksql("CREATE STREAM users WITH (KAFKA_TOPIC='users');"); (new KsqlCommand()).ksql("CREATE STREAM users WITH (KAFKA_TOPIC='users');");
@ -88,7 +88,7 @@ class KsqlServiceTest {
} }
@Test @Test
public void shouldSetHostToStrategy() { void shouldSetHostToStrategy() {
String clusterName = "test"; String clusterName = "test";
String host = "localhost:8088"; String host = "localhost:8088";
KsqlCommand command = (new KsqlCommand()).ksql("show streams;"); KsqlCommand command = (new KsqlCommand()).ksql("show streams;");
@ -100,11 +100,11 @@ class KsqlServiceTest {
when(ksqlClient.execute(any())).thenReturn(Mono.just(new KsqlCommandResponse())); when(ksqlClient.execute(any())).thenReturn(Mono.just(new KsqlCommandResponse()));
ksqlService.executeKsqlCommand(clusterName, Mono.just(command)).block(); ksqlService.executeKsqlCommand(clusterName, Mono.just(command)).block();
assertThat(ksqlStatementStrategy.getUri()).isEqualTo(host + "/ksql"); assertThat(baseStrategy.getUri()).isEqualTo(host + "/ksql");
} }
@Test @Test
public void shouldCallClientAndReturnResponse() { void shouldCallClientAndReturnResponse() {
String clusterName = "test"; String clusterName = "test";
KsqlCommand command = (new KsqlCommand()).ksql("show streams;"); KsqlCommand command = (new KsqlCommand()).ksql("show streams;");
KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class); KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
@ -117,7 +117,7 @@ class KsqlServiceTest {
KsqlCommandResponse receivedResponse = KsqlCommandResponse receivedResponse =
ksqlService.executeKsqlCommand(clusterName, Mono.just(command)).block(); ksqlService.executeKsqlCommand(clusterName, Mono.just(command)).block();
verify(ksqlClient, times(1)).execute(ksqlStatementStrategy); verify(ksqlClient, times(1)).execute(baseStrategy);
assertThat(receivedResponse).isEqualTo(response); assertThat(receivedResponse).isEqualTo(response);
} }

View file

@ -18,29 +18,29 @@ import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
class CreateStrategyTest { class CreateStrategyTest {
private final ObjectMapper mapper = new ObjectMapper(); private final ObjectMapper mapper = new ObjectMapper();
private KsqlStatementStrategy ksqlStatementStrategy; private CreateStrategy strategy;
@BeforeEach @BeforeEach
public void setUp() { void setUp() {
ksqlStatementStrategy = new CreateStrategy(); strategy = new CreateStrategy();
} }
@Test @Test
public void shouldReturnUri() { void shouldReturnUri() {
ksqlStatementStrategy.host("ksqldb-server:8088"); strategy.host("ksqldb-server:8088");
assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
} }
@Test @Test
public void shouldReturnTrueInTest() { void shouldReturnTrueInTest() {
assertTrue(ksqlStatementStrategy.test("CREATE STREAM stream WITH (KAFKA_TOPIC='topic');")); assertTrue(strategy.test("CREATE STREAM stream WITH (KAFKA_TOPIC='topic');"));
assertTrue(ksqlStatementStrategy.test("CREATE STREAM stream" assertTrue(strategy.test("CREATE STREAM stream"
+ " AS SELECT users.id AS userid FROM users EMIT CHANGES;" + " AS SELECT users.id AS userid FROM users EMIT CHANGES;"
)); ));
assertTrue(ksqlStatementStrategy.test( assertTrue(strategy.test(
"CREATE TABLE table (id VARCHAR) WITH (KAFKA_TOPIC='table');" "CREATE TABLE table (id VARCHAR) WITH (KAFKA_TOPIC='table');"
)); ));
assertTrue(ksqlStatementStrategy.test( assertTrue(strategy.test(
"CREATE TABLE pageviews_regions WITH (KEY_FORMAT='JSON')" "CREATE TABLE pageviews_regions WITH (KEY_FORMAT='JSON')"
+ " AS SELECT gender, COUNT(*) AS numbers" + " AS SELECT gender, COUNT(*) AS numbers"
+ " FROM pageviews EMIT CHANGES;" + " FROM pageviews EMIT CHANGES;"
@ -48,29 +48,29 @@ class CreateStrategyTest {
} }
@Test @Test
public void shouldReturnFalseInTest() { void shouldReturnFalseInTest() {
assertFalse(ksqlStatementStrategy.test("show streams;")); assertFalse(strategy.test("show streams;"));
assertFalse(ksqlStatementStrategy.test("show tables;")); assertFalse(strategy.test("show tables;"));
assertFalse(ksqlStatementStrategy.test("CREATE TABLE test;")); assertFalse(strategy.test("CREATE TABLE test;"));
assertFalse(ksqlStatementStrategy.test("CREATE STREAM test;")); assertFalse(strategy.test("CREATE STREAM test;"));
} }
@Test @Test
public void shouldSerializeResponse() { void shouldSerializeResponse() {
String message = "updated successful"; String message = "updated successful";
JsonNode node = getResponseWithMessage(message); JsonNode node = getResponseWithMessage(message);
KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
assertThat(serializedResponse.getMessage()).isEqualTo(message); assertThat(serializedResponse.getMessage()).isEqualTo(message);
} }
@Test @Test
public void shouldSerializeWithException() { void shouldSerializeWithException() {
JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage"); JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage");
JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode)); JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode));
Exception exception = assertThrows( Exception exception = assertThrows(
UnprocessableEntityException.class, UnprocessableEntityException.class,
() -> ksqlStatementStrategy.serializeResponse(node) () -> strategy.serializeResponse(node)
); );
assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");

View file

@ -20,48 +20,48 @@ import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
class DescribeStrategyTest { class DescribeStrategyTest {
private final ObjectMapper mapper = new ObjectMapper(); private final ObjectMapper mapper = new ObjectMapper();
private KsqlStatementStrategy ksqlStatementStrategy; private DescribeStrategy strategy;
@BeforeEach @BeforeEach
public void setUp() { void setUp() {
ksqlStatementStrategy = new DescribeStrategy(); strategy = new DescribeStrategy();
} }
@Test @Test
public void shouldReturnUri() { void shouldReturnUri() {
ksqlStatementStrategy.host("ksqldb-server:8088"); strategy.host("ksqldb-server:8088");
assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
} }
@Test @Test
public void shouldReturnTrueInTest() { void shouldReturnTrueInTest() {
assertTrue(ksqlStatementStrategy.test("DESCRIBE users;")); assertTrue(strategy.test("DESCRIBE users;"));
assertTrue(ksqlStatementStrategy.test("DESCRIBE EXTENDED users;")); assertTrue(strategy.test("DESCRIBE EXTENDED users;"));
} }
@Test @Test
public void shouldReturnFalseInTest() { void shouldReturnFalseInTest() {
assertFalse(ksqlStatementStrategy.test("list streams;")); assertFalse(strategy.test("list streams;"));
assertFalse(ksqlStatementStrategy.test("show tables;")); assertFalse(strategy.test("show tables;"));
} }
@Test @Test
public void shouldSerializeResponse() { void shouldSerializeResponse() {
JsonNode node = getResponseWithObjectNode(); JsonNode node = getResponseWithObjectNode();
KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
Table table = serializedResponse.getData(); Table table = serializedResponse.getData();
assertThat(table.getHeaders()).isEqualTo(List.of("key", "value")); assertThat(table.getHeaders()).isEqualTo(List.of("key", "value"));
assertThat(table.getRows()).isEqualTo(List.of(List.of("name", "kafka"))); assertThat(table.getRows()).isEqualTo(List.of(List.of("name", "kafka")));
} }
@Test @Test
public void shouldSerializeWithException() { void shouldSerializeWithException() {
JsonNode sourceDescriptionNode = JsonNode sourceDescriptionNode =
mapper.createObjectNode().put("sourceDescription", "nodeWithMessage"); mapper.createObjectNode().put("sourceDescription", "nodeWithMessage");
JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(sourceDescriptionNode)); JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(sourceDescriptionNode));
Exception exception = assertThrows( Exception exception = assertThrows(
UnprocessableEntityException.class, UnprocessableEntityException.class,
() -> ksqlStatementStrategy.serializeResponse(node) () -> strategy.serializeResponse(node)
); );
assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");

View file

@ -18,49 +18,49 @@ import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
class DropStrategyTest { class DropStrategyTest {
private final ObjectMapper mapper = new ObjectMapper(); private final ObjectMapper mapper = new ObjectMapper();
private KsqlStatementStrategy ksqlStatementStrategy; private DropStrategy strategy;
@BeforeEach @BeforeEach
public void setUp() { void setUp() {
ksqlStatementStrategy = new DropStrategy(); strategy = new DropStrategy();
} }
@Test @Test
public void shouldReturnUri() { void shouldReturnUri() {
ksqlStatementStrategy.host("ksqldb-server:8088"); strategy.host("ksqldb-server:8088");
assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
} }
@Test @Test
public void shouldReturnTrueInTest() { void shouldReturnTrueInTest() {
assertTrue(ksqlStatementStrategy.test("drop table table1;")); assertTrue(strategy.test("drop table table1;"));
assertTrue(ksqlStatementStrategy.test("drop stream stream2;")); assertTrue(strategy.test("drop stream stream2;"));
} }
@Test @Test
public void shouldReturnFalseInTest() { void shouldReturnFalseInTest() {
assertFalse(ksqlStatementStrategy.test("show streams;")); assertFalse(strategy.test("show streams;"));
assertFalse(ksqlStatementStrategy.test("show tables;")); assertFalse(strategy.test("show tables;"));
assertFalse(ksqlStatementStrategy.test("create table test;")); assertFalse(strategy.test("create table test;"));
assertFalse(ksqlStatementStrategy.test("create stream test;")); assertFalse(strategy.test("create stream test;"));
} }
@Test @Test
public void shouldSerializeResponse() { void shouldSerializeResponse() {
String message = "updated successful"; String message = "updated successful";
JsonNode node = getResponseWithMessage(message); JsonNode node = getResponseWithMessage(message);
KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
assertThat(serializedResponse.getMessage()).isEqualTo(message); assertThat(serializedResponse.getMessage()).isEqualTo(message);
} }
@Test @Test
public void shouldSerializeWithException() { void shouldSerializeWithException() {
JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage"); JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage");
JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode)); JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode));
Exception exception = assertThrows( Exception exception = assertThrows(
UnprocessableEntityException.class, UnprocessableEntityException.class,
() -> ksqlStatementStrategy.serializeResponse(node) () -> strategy.serializeResponse(node)
); );
assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");

View file

@ -20,46 +20,46 @@ import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
class ExplainStrategyTest { class ExplainStrategyTest {
private final ObjectMapper mapper = new ObjectMapper(); private final ObjectMapper mapper = new ObjectMapper();
private KsqlStatementStrategy ksqlStatementStrategy; private ExplainStrategy strategy;
@BeforeEach @BeforeEach
public void setUp() { void setUp() {
ksqlStatementStrategy = new ExplainStrategy(); strategy = new ExplainStrategy();
} }
@Test @Test
public void shouldReturnUri() { void shouldReturnUri() {
ksqlStatementStrategy.host("ksqldb-server:8088"); strategy.host("ksqldb-server:8088");
assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
} }
@Test @Test
public void shouldReturnTrueInTest() { void shouldReturnTrueInTest() {
assertTrue(ksqlStatementStrategy.test("explain users_query_id;")); assertTrue(strategy.test("explain users_query_id;"));
} }
@Test @Test
public void shouldReturnFalseInTest() { void shouldReturnFalseInTest() {
assertFalse(ksqlStatementStrategy.test("show queries;")); assertFalse(strategy.test("show queries;"));
} }
@Test @Test
public void shouldSerializeResponse() { void shouldSerializeResponse() {
JsonNode node = getResponseWithObjectNode(); JsonNode node = getResponseWithObjectNode();
KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
Table table = serializedResponse.getData(); Table table = serializedResponse.getData();
assertThat(table.getHeaders()).isEqualTo(List.of("key", "value")); assertThat(table.getHeaders()).isEqualTo(List.of("key", "value"));
assertThat(table.getRows()).isEqualTo(List.of(List.of("name", "kafka"))); assertThat(table.getRows()).isEqualTo(List.of(List.of("name", "kafka")));
} }
@Test @Test
public void shouldSerializeWithException() { void shouldSerializeWithException() {
JsonNode sourceDescriptionNode = JsonNode sourceDescriptionNode =
mapper.createObjectNode().put("sourceDescription", "nodeWithMessage"); mapper.createObjectNode().put("sourceDescription", "nodeWithMessage");
JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(sourceDescriptionNode)); JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(sourceDescriptionNode));
Exception exception = assertThrows( Exception exception = assertThrows(
UnprocessableEntityException.class, UnprocessableEntityException.class,
() -> ksqlStatementStrategy.serializeResponse(node) () -> strategy.serializeResponse(node)
); );
assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");

View file

@ -1,111 +0,0 @@
package com.provectus.kafka.ui.strategy.ksql.statement;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.exception.UnprocessableEntityException;
import com.provectus.kafka.ui.model.KsqlCommandResponse;
import com.provectus.kafka.ui.model.Table;
import java.util.List;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class ListStrategyTest {
private final ObjectMapper mapper = new ObjectMapper();
private KsqlStatementStrategy ksqlStatementStrategy;
@BeforeEach
public void setUp() {
ksqlStatementStrategy = new ListStrategy();
}
@Test
public void shouldReturnUri() {
ksqlStatementStrategy.host("ksqldb-server:8088");
assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
}
@Test
public void shouldReturnTrueInTest() {
assertTrue(ksqlStatementStrategy.test("LIST STREAMS;"));
assertTrue(ksqlStatementStrategy.test("LIST TABLES;"));
assertTrue(ksqlStatementStrategy.test("LIST TOPICS;"));
assertTrue(ksqlStatementStrategy.test("LIST FUNCTIONS;"));
}
@Test
public void shouldReturnFalseInTest() {
assertFalse(ksqlStatementStrategy.test("SHOW STREAMS;"));
assertFalse(ksqlStatementStrategy.test("SHOW TABLES;"));
assertFalse(ksqlStatementStrategy.test("SHOW TOPICS;"));
assertFalse(ksqlStatementStrategy.test("SHOW FUNCTIONS;"));
}
@Test
public void shouldSerializeStreamsResponse() {
JsonNode node = getResponseWithData("streams");
ksqlStatementStrategy.test("list streams;");
KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node);
Table table = serializedResponse.getData();
assertThat(table.getHeaders()).isEqualTo(List.of("header"));
assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
}
@Test
public void shouldSerializeTablesResponse() {
JsonNode node = getResponseWithData("tables");
ksqlStatementStrategy.test("list tables;");
KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node);
Table table = serializedResponse.getData();
assertThat(table.getHeaders()).isEqualTo(List.of("header"));
assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
}
@Test
public void shouldSerializeTopicsResponse() {
JsonNode node = getResponseWithData("topics");
ksqlStatementStrategy.test("list topics;");
KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node);
Table table = serializedResponse.getData();
assertThat(table.getHeaders()).isEqualTo(List.of("header"));
assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
}
@Test
public void shouldSerializeFunctionsResponse() {
JsonNode node = getResponseWithData("functions");
ksqlStatementStrategy.test("list functions;");
KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node);
Table table = serializedResponse.getData();
assertThat(table.getHeaders()).isEqualTo(List.of("header"));
assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
}
@Test
public void shouldSerializeWithException() {
JsonNode node = getResponseWithData("streams");
ksqlStatementStrategy.test("list tables;");
Exception exception = assertThrows(
UnprocessableEntityException.class,
() -> ksqlStatementStrategy.serializeResponse(node)
);
assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
}
@SneakyThrows
private JsonNode getResponseWithData(String key) {
JsonNode nodeWithDataItem = mapper.createObjectNode().put("header", "value");
JsonNode nodeWithData = mapper.createArrayNode().add(nodeWithDataItem);
JsonNode nodeWithResponse = mapper.createObjectNode().set(key, nodeWithData);
return mapper.createArrayNode().add(mapper.valueToTree(nodeWithResponse));
}
}

View file

@ -19,46 +19,46 @@ import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
class SelectStrategyTest { class SelectStrategyTest {
private KsqlStatementStrategy ksqlStatementStrategy;
private final ObjectMapper mapper = new ObjectMapper(); private final ObjectMapper mapper = new ObjectMapper();
private SelectStrategy strategy;
@BeforeEach @BeforeEach
public void setUp() { void setUp() {
ksqlStatementStrategy = new SelectStrategy(); strategy = new SelectStrategy();
} }
@Test @Test
public void shouldReturnUri() { void shouldReturnUri() {
ksqlStatementStrategy.host("ksqldb-server:8088"); strategy.host("ksqldb-server:8088");
assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/query"); assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/query");
} }
@Test @Test
public void shouldReturnTrueInTest() { void shouldReturnTrueInTest() {
assertTrue(ksqlStatementStrategy.test("select * from users;")); assertTrue(strategy.test("select * from users;"));
} }
@Test @Test
public void shouldReturnFalseInTest() { void shouldReturnFalseInTest() {
assertFalse(ksqlStatementStrategy.test("show streams;")); assertFalse(strategy.test("show streams;"));
assertFalse(ksqlStatementStrategy.test("select *;")); assertFalse(strategy.test("select *;"));
} }
@Test @Test
public void shouldSerializeResponse() { void shouldSerializeResponse() {
JsonNode node = getResponseWithData(); JsonNode node = getResponseWithData();
KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
Table table = serializedResponse.getData(); Table table = serializedResponse.getData();
assertThat(table.getHeaders()).isEqualTo(List.of("header1", "header2")); assertThat(table.getHeaders()).isEqualTo(List.of("header1", "header2"));
assertThat(table.getRows()).isEqualTo(List.of(List.of("value1", "value2"))); assertThat(table.getRows()).isEqualTo(List.of(List.of("value1", "value2")));
} }
@Test @Test
public void shouldSerializeWithException() { void shouldSerializeWithException() {
JsonNode node = mapper.createObjectNode(); JsonNode node = mapper.createObjectNode();
Exception exception = assertThrows( Exception exception = assertThrows(
UnprocessableEntityException.class, UnprocessableEntityException.class,
() -> ksqlStatementStrategy.serializeResponse(node) () -> strategy.serializeResponse(node)
); );
assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");

View file

@ -19,107 +19,107 @@ import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
class ShowStrategyTest { class ShowStrategyTest {
private KsqlStatementStrategy ksqlStatementStrategy;
private final ObjectMapper mapper = new ObjectMapper(); private final ObjectMapper mapper = new ObjectMapper();
private ShowStrategy strategy;
@BeforeEach @BeforeEach
public void setUp() { void setUp() {
ksqlStatementStrategy = new ShowStrategy(); strategy = new ShowStrategy();
} }
@Test @Test
public void shouldReturnUri() { void shouldReturnUri() {
ksqlStatementStrategy.host("ksqldb-server:8088"); strategy.host("ksqldb-server:8088");
assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
} }
@Test @Test
public void shouldReturnTrueInTest() { void shouldReturnTrueInTest() {
assertTrue(ksqlStatementStrategy.test("SHOW STREAMS;")); assertTrue(strategy.test("SHOW STREAMS;"));
assertTrue(ksqlStatementStrategy.test("SHOW TABLES;")); assertTrue(strategy.test("SHOW TABLES;"));
assertTrue(ksqlStatementStrategy.test("SHOW TOPICS;")); assertTrue(strategy.test("SHOW TOPICS;"));
assertTrue(ksqlStatementStrategy.test("SHOW QUERIES;")); assertTrue(strategy.test("SHOW QUERIES;"));
assertTrue(ksqlStatementStrategy.test("SHOW PROPERTIES;")); assertTrue(strategy.test("SHOW PROPERTIES;"));
assertTrue(ksqlStatementStrategy.test("SHOW FUNCTIONS;")); assertTrue(strategy.test("SHOW FUNCTIONS;"));
assertTrue(strategy.test("LIST STREAMS;"));
assertTrue(strategy.test("LIST TABLES;"));
assertTrue(strategy.test("LIST TOPICS;"));
assertTrue(strategy.test("LIST FUNCTIONS;"));
} }
@Test @Test
public void shouldReturnFalseInTest() { void shouldReturnFalseInTest() {
assertFalse(ksqlStatementStrategy.test("LIST STREAMS;")); assertFalse(strategy.test("LIST QUERIES;"));
assertFalse(ksqlStatementStrategy.test("LIST TABLES;")); assertFalse(strategy.test("LIST PROPERTIES;"));
assertFalse(ksqlStatementStrategy.test("LIST TOPICS;"));
assertFalse(ksqlStatementStrategy.test("LIST QUERIES;"));
assertFalse(ksqlStatementStrategy.test("LIST PROPERTIES;"));
assertFalse(ksqlStatementStrategy.test("LIST FUNCTIONS;"));
} }
@Test @Test
public void shouldSerializeStreamsResponse() { void shouldSerializeStreamsResponse() {
JsonNode node = getResponseWithData("streams"); JsonNode node = getResponseWithData("streams");
ksqlStatementStrategy.test("show streams;"); strategy.test("show streams;");
KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
Table table = serializedResponse.getData(); Table table = serializedResponse.getData();
assertThat(table.getHeaders()).isEqualTo(List.of("header")); assertThat(table.getHeaders()).isEqualTo(List.of("header"));
assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
} }
@Test @Test
public void shouldSerializeTablesResponse() { void shouldSerializeTablesResponse() {
JsonNode node = getResponseWithData("tables"); JsonNode node = getResponseWithData("tables");
ksqlStatementStrategy.test("show tables;"); strategy.test("show tables;");
KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
Table table = serializedResponse.getData(); Table table = serializedResponse.getData();
assertThat(table.getHeaders()).isEqualTo(List.of("header")); assertThat(table.getHeaders()).isEqualTo(List.of("header"));
assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
} }
@Test @Test
public void shouldSerializeTopicsResponse() { void shouldSerializeTopicsResponse() {
JsonNode node = getResponseWithData("topics"); JsonNode node = getResponseWithData("topics");
ksqlStatementStrategy.test("show topics;"); strategy.test("show topics;");
KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
Table table = serializedResponse.getData(); Table table = serializedResponse.getData();
assertThat(table.getHeaders()).isEqualTo(List.of("header")); assertThat(table.getHeaders()).isEqualTo(List.of("header"));
assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
} }
@Test @Test
public void shouldSerializePropertiesResponse() { void shouldSerializePropertiesResponse() {
JsonNode node = getResponseWithData("properties"); JsonNode node = getResponseWithData("properties");
ksqlStatementStrategy.test("show properties;"); strategy.test("show properties;");
KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
Table table = serializedResponse.getData(); Table table = serializedResponse.getData();
assertThat(table.getHeaders()).isEqualTo(List.of("header")); assertThat(table.getHeaders()).isEqualTo(List.of("header"));
assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
} }
@Test @Test
public void shouldSerializeFunctionsResponse() { void shouldSerializeFunctionsResponse() {
JsonNode node = getResponseWithData("functions"); JsonNode node = getResponseWithData("functions");
ksqlStatementStrategy.test("show functions;"); strategy.test("show functions;");
KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
Table table = serializedResponse.getData(); Table table = serializedResponse.getData();
assertThat(table.getHeaders()).isEqualTo(List.of("header")); assertThat(table.getHeaders()).isEqualTo(List.of("header"));
assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
} }
@Test @Test
public void shouldSerializeQueriesResponse() { void shouldSerializeQueriesResponse() {
JsonNode node = getResponseWithData("queries"); JsonNode node = getResponseWithData("queries");
ksqlStatementStrategy.test("show queries;"); strategy.test("show queries;");
KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
Table table = serializedResponse.getData(); Table table = serializedResponse.getData();
assertThat(table.getHeaders()).isEqualTo(List.of("header")); assertThat(table.getHeaders()).isEqualTo(List.of("header"));
assertThat(table.getRows()).isEqualTo(List.of(List.of("value"))); assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
} }
@Test @Test
public void shouldSerializeWithException() { void shouldSerializeWithException() {
JsonNode node = getResponseWithData("streams"); JsonNode node = getResponseWithData("streams");
ksqlStatementStrategy.test("show tables;"); strategy.test("show tables;");
Exception exception = assertThrows( Exception exception = assertThrows(
UnprocessableEntityException.class, UnprocessableEntityException.class,
() -> ksqlStatementStrategy.serializeResponse(node) () -> strategy.serializeResponse(node)
); );
assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");

View file

@ -18,46 +18,46 @@ import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
class TerminateStrategyTest { class TerminateStrategyTest {
private final ObjectMapper mapper = new ObjectMapper(); private final ObjectMapper mapper = new ObjectMapper();
private KsqlStatementStrategy ksqlStatementStrategy; private TerminateStrategy strategy;
@BeforeEach @BeforeEach
public void setUp() { void setUp() {
ksqlStatementStrategy = new TerminateStrategy(); strategy = new TerminateStrategy();
} }
@Test @Test
public void shouldReturnUri() { void shouldReturnUri() {
ksqlStatementStrategy.host("ksqldb-server:8088"); strategy.host("ksqldb-server:8088");
assertThat(ksqlStatementStrategy.getUri()).isEqualTo("ksqldb-server:8088/ksql"); assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
} }
@Test @Test
public void shouldReturnTrueInTest() { void shouldReturnTrueInTest() {
assertTrue(ksqlStatementStrategy.test("terminate query_id;")); assertTrue(strategy.test("terminate query_id;"));
} }
@Test @Test
public void shouldReturnFalseInTest() { void shouldReturnFalseInTest() {
assertFalse(ksqlStatementStrategy.test("show streams;")); assertFalse(strategy.test("show streams;"));
assertFalse(ksqlStatementStrategy.test("create table test;")); assertFalse(strategy.test("create table test;"));
} }
@Test @Test
public void shouldSerializeResponse() { void shouldSerializeResponse() {
String message = "query terminated."; String message = "query terminated.";
JsonNode node = getResponseWithMessage(message); JsonNode node = getResponseWithMessage(message);
KsqlCommandResponse serializedResponse = ksqlStatementStrategy.serializeResponse(node); KsqlCommandResponse serializedResponse = strategy.serializeResponse(node);
assertThat(serializedResponse.getMessage()).isEqualTo(message); assertThat(serializedResponse.getMessage()).isEqualTo(message);
} }
@Test @Test
public void shouldSerializeWithException() { void shouldSerializeWithException() {
JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage"); JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage");
JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode)); JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode));
Exception exception = assertThrows( Exception exception = assertThrows(
UnprocessableEntityException.class, UnprocessableEntityException.class,
() -> ksqlStatementStrategy.serializeResponse(node) () -> strategy.serializeResponse(node)
); );
assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error"); assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");