Drop deprecated ksql api (#2796)

Co-authored-by: iliax <ikuramshin@provectus.com>
This commit is contained in:
Ilya Kuramshin 2022-10-23 19:49:11 +04:00 committed by GitHub
parent 4558466ff6
commit d60808a2f2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 1 additions and 1199 deletions

View file

@ -1,53 +0,0 @@
package com.provectus.kafka.ui.client;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.exception.UnprocessableEntityException;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
@Service
@RequiredArgsConstructor
@Slf4j
public class KsqlClient {
private final WebClient webClient;
private final ObjectMapper mapper;
public Mono<KsqlCommandResponseDTO> execute(BaseStrategy ksqlStatement, KafkaCluster cluster) {
return webClient.post()
.uri(ksqlStatement.getUri())
.headers(httpHeaders -> KsqlApiClient.setBasicAuthIfEnabled(httpHeaders, cluster))
.accept(new MediaType("application", "vnd.ksql.v1+json"))
.body(BodyInserters.fromValue(ksqlStatement.getKsqlCommand()))
.retrieve()
.onStatus(HttpStatus::isError, this::getErrorMessage)
.bodyToMono(byte[].class)
.map(this::toJson)
.map(ksqlStatement::serializeResponse);
}
private Mono<Throwable> getErrorMessage(ClientResponse response) {
return response
.bodyToMono(byte[].class)
.map(this::toJson)
.map(jsonNode -> jsonNode.get("message").asText())
.flatMap(error -> Mono.error(new UnprocessableEntityException(error)));
}
@SneakyThrows
private JsonNode toJson(byte[] content) {
return this.mapper.readTree(content);
}
}

View file

@ -1,15 +1,12 @@
package com.provectus.kafka.ui.controller; package com.provectus.kafka.ui.controller;
import com.provectus.kafka.ui.api.KsqlApi; import com.provectus.kafka.ui.api.KsqlApi;
import com.provectus.kafka.ui.model.KsqlCommandDTO;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
import com.provectus.kafka.ui.model.KsqlCommandV2DTO; import com.provectus.kafka.ui.model.KsqlCommandV2DTO;
import com.provectus.kafka.ui.model.KsqlCommandV2ResponseDTO; import com.provectus.kafka.ui.model.KsqlCommandV2ResponseDTO;
import com.provectus.kafka.ui.model.KsqlResponseDTO; import com.provectus.kafka.ui.model.KsqlResponseDTO;
import com.provectus.kafka.ui.model.KsqlStreamDescriptionDTO; import com.provectus.kafka.ui.model.KsqlStreamDescriptionDTO;
import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO; import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO;
import com.provectus.kafka.ui.model.KsqlTableResponseDTO; import com.provectus.kafka.ui.model.KsqlTableResponseDTO;
import com.provectus.kafka.ui.service.KsqlService;
import com.provectus.kafka.ui.service.ksql.KsqlServiceV2; import com.provectus.kafka.ui.service.ksql.KsqlServiceV2;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -27,17 +24,8 @@ import reactor.core.publisher.Mono;
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j @Slf4j
public class KsqlController extends AbstractController implements KsqlApi { public class KsqlController extends AbstractController implements KsqlApi {
private final KsqlService ksqlService;
private final KsqlServiceV2 ksqlServiceV2;
@Override private final KsqlServiceV2 ksqlServiceV2;
public Mono<ResponseEntity<KsqlCommandResponseDTO>> executeKsqlCommand(String clusterName,
Mono<KsqlCommandDTO>
ksqlCommand,
ServerWebExchange exchange) {
return ksqlService.executeKsqlCommand(getCluster(clusterName), ksqlCommand)
.map(ResponseEntity::ok);
}
@Override @Override
public Mono<ResponseEntity<KsqlCommandV2ResponseDTO>> executeKsql(String clusterName, public Mono<ResponseEntity<KsqlCommandV2ResponseDTO>> executeKsql(String clusterName,

View file

@ -1,47 +0,0 @@
package com.provectus.kafka.ui.service;
import com.provectus.kafka.ui.client.KsqlClient;
import com.provectus.kafka.ui.exception.ClusterNotFoundException;
import com.provectus.kafka.ui.exception.KsqlDbNotFoundException;
import com.provectus.kafka.ui.exception.UnprocessableEntityException;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.KsqlCommandDTO;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy;
import java.util.List;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
@Service
@RequiredArgsConstructor
public class KsqlService {
private final KsqlClient ksqlClient;
private final List<BaseStrategy> ksqlStatementStrategies;
public Mono<KsqlCommandResponseDTO> executeKsqlCommand(KafkaCluster cluster,
Mono<KsqlCommandDTO> ksqlCommand) {
return Mono.justOrEmpty(cluster)
.map(KafkaCluster::getKsqldbServer)
.onErrorResume(e -> {
Throwable throwable =
e instanceof ClusterNotFoundException ? e : new KsqlDbNotFoundException();
return Mono.error(throwable);
})
.flatMap(ksqlServer -> getStatementStrategyForKsqlCommand(ksqlCommand)
.map(statement -> statement.host(ksqlServer.getUrl()))
)
.flatMap(baseStrategy -> ksqlClient.execute(baseStrategy, cluster));
}
private Mono<BaseStrategy> getStatementStrategyForKsqlCommand(
Mono<KsqlCommandDTO> ksqlCommand) {
return ksqlCommand
.map(command -> ksqlStatementStrategies.stream()
.filter(s -> s.test(command.getKsql()))
.map(s -> s.ksqlCommand(command))
.findFirst())
.flatMap(Mono::justOrEmpty)
.switchIfEmpty(Mono.error(new UnprocessableEntityException("Invalid sql")));
}
}

View file

@ -1,166 +0,0 @@
package com.provectus.kafka.ui.strategy.ksql.statement;
import com.fasterxml.jackson.databind.JsonNode;
import com.provectus.kafka.ui.exception.UnprocessableEntityException;
import com.provectus.kafka.ui.model.KsqlCommandDTO;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
import com.provectus.kafka.ui.model.TableDTO;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public abstract class BaseStrategy {
protected static final String KSQL_REQUEST_PATH = "/ksql";
protected static final String QUERY_REQUEST_PATH = "/query";
private static final String MAPPING_EXCEPTION_ERROR = "KSQL DB response mapping error";
protected String host = null;
protected KsqlCommandDTO ksqlCommand = null;
public String getUri() {
if (this.host != null) {
return this.host + this.getRequestPath();
}
throw new UnprocessableEntityException("Strategy doesn't have host");
}
public boolean test(String sql) {
return sql.trim().toLowerCase().matches(getTestRegExp());
}
public BaseStrategy host(String host) {
this.host = host;
return this;
}
public KsqlCommandDTO getKsqlCommand() {
return ksqlCommand;
}
public BaseStrategy ksqlCommand(KsqlCommandDTO ksqlCommand) {
this.ksqlCommand = ksqlCommand;
return this;
}
protected String getRequestPath() {
return BaseStrategy.KSQL_REQUEST_PATH;
}
protected KsqlCommandResponseDTO serializeTableResponse(JsonNode response, String key) {
JsonNode item = getResponseFirstItemValue(response, key);
TableDTO table = item.isArray() ? getTableFromArray(item) : getTableFromObject(item);
return (new KsqlCommandResponseDTO()).data(table);
}
protected KsqlCommandResponseDTO serializeMessageResponse(JsonNode response, String key) {
JsonNode item = getResponseFirstItemValue(response, key);
return (new KsqlCommandResponseDTO()).message(getMessageFromObject(item));
}
protected KsqlCommandResponseDTO serializeQueryResponse(JsonNode response) {
if (response.isArray() && response.size() > 0) {
TableDTO table = (new TableDTO())
.headers(getQueryResponseHeader(response))
.rows(getQueryResponseRows(response));
return (new KsqlCommandResponseDTO()).data(table);
}
throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR);
}
private JsonNode getResponseFirstItemValue(JsonNode response, String key) {
if (response.isArray() && response.size() > 0) {
JsonNode first = response.get(0);
if (first.has(key)) {
return first.path(key);
}
}
throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR);
}
private List<String> getQueryResponseHeader(JsonNode response) {
JsonNode headerRow = response.get(0);
if (headerRow.isObject() && headerRow.has("header")) {
String schema = headerRow.get("header").get("schema").asText();
return Arrays.stream(schema.split(",")).map(String::trim).collect(Collectors.toList());
}
return new ArrayList<>();
}
private List<List<String>> getQueryResponseRows(JsonNode node) {
return getStreamForJsonArray(node)
.filter(row -> row.has("row") && row.get("row").has("columns"))
.map(row -> row.get("row").get("columns"))
.map(cellNode -> getStreamForJsonArray(cellNode)
.map(JsonNode::asText)
.collect(Collectors.toList())
)
.collect(Collectors.toList());
}
private TableDTO getTableFromArray(JsonNode node) {
TableDTO table = new TableDTO();
table.headers(new ArrayList<>()).rows(new ArrayList<>());
if (node.size() > 0) {
List<String> keys = getJsonObjectKeys(node.get(0));
List<List<String>> rows = getTableRows(node, keys);
table.headers(keys).rows(rows);
}
return table;
}
private TableDTO getTableFromObject(JsonNode node) {
List<String> keys = getJsonObjectKeys(node);
List<String> values = getJsonObjectValues(node);
List<List<String>> rows = IntStream
.range(0, keys.size())
.mapToObj(i -> List.of(keys.get(i), values.get(i)))
.collect(Collectors.toList());
return (new TableDTO()).headers(List.of("key", "value")).rows(rows);
}
private String getMessageFromObject(JsonNode node) {
if (node.isObject() && node.has("message")) {
return node.get("message").asText();
}
throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR);
}
private List<List<String>> getTableRows(JsonNode node, List<String> keys) {
return getStreamForJsonArray(node)
.map(row -> keys.stream()
.map(header -> row.get(header).asText())
.collect(Collectors.toList())
)
.collect(Collectors.toList());
}
private Stream<JsonNode> getStreamForJsonArray(JsonNode node) {
if (node.isArray() && node.size() > 0) {
return StreamSupport.stream(node.spliterator(), false);
}
throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR);
}
private List<String> getJsonObjectKeys(JsonNode node) {
if (node.isObject()) {
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(node.fieldNames(), Spliterator.ORDERED), false
).collect(Collectors.toList());
}
throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR);
}
private List<String> getJsonObjectValues(JsonNode node) {
return getJsonObjectKeys(node).stream().map(key -> node.get(key).asText())
.collect(Collectors.toList());
}
public abstract KsqlCommandResponseDTO serializeResponse(JsonNode response);
protected abstract String getTestRegExp();
}

View file

@ -1,20 +0,0 @@
package com.provectus.kafka.ui.strategy.ksql.statement;
import com.fasterxml.jackson.databind.JsonNode;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
import org.springframework.stereotype.Component;
@Component
public class CreateStrategy extends BaseStrategy {
private static final String RESPONSE_VALUE_KEY = "commandStatus";
@Override
public KsqlCommandResponseDTO serializeResponse(JsonNode response) {
return serializeMessageResponse(response, RESPONSE_VALUE_KEY);
}
@Override
protected String getTestRegExp() {
return "create (table|stream)(.*)(with|as select(.*)from)(.*);";
}
}

View file

@ -1,20 +0,0 @@
package com.provectus.kafka.ui.strategy.ksql.statement;
import com.fasterxml.jackson.databind.JsonNode;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
import org.springframework.stereotype.Component;
@Component
public class DescribeStrategy extends BaseStrategy {
private static final String RESPONSE_VALUE_KEY = "sourceDescription";
@Override
public KsqlCommandResponseDTO serializeResponse(JsonNode response) {
return serializeTableResponse(response, RESPONSE_VALUE_KEY);
}
@Override
protected String getTestRegExp() {
return "describe (.*);";
}
}

View file

@ -1,20 +0,0 @@
package com.provectus.kafka.ui.strategy.ksql.statement;
import com.fasterxml.jackson.databind.JsonNode;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
import org.springframework.stereotype.Component;
@Component
public class DropStrategy extends BaseStrategy {
private static final String RESPONSE_VALUE_KEY = "commandStatus";
@Override
public KsqlCommandResponseDTO serializeResponse(JsonNode response) {
return serializeMessageResponse(response, RESPONSE_VALUE_KEY);
}
@Override
protected String getTestRegExp() {
return "drop (table|stream) (.*);";
}
}

View file

@ -1,20 +0,0 @@
package com.provectus.kafka.ui.strategy.ksql.statement;
import com.fasterxml.jackson.databind.JsonNode;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
import org.springframework.stereotype.Component;
@Component
public class ExplainStrategy extends BaseStrategy {
private static final String RESPONSE_VALUE_KEY = "queryDescription";
@Override
public KsqlCommandResponseDTO serializeResponse(JsonNode response) {
return serializeTableResponse(response, RESPONSE_VALUE_KEY);
}
@Override
protected String getTestRegExp() {
return "explain (.*);";
}
}

View file

@ -1,24 +0,0 @@
package com.provectus.kafka.ui.strategy.ksql.statement;
import com.fasterxml.jackson.databind.JsonNode;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
import org.springframework.stereotype.Component;
@Component
public class SelectStrategy extends BaseStrategy {
@Override
public KsqlCommandResponseDTO serializeResponse(JsonNode response) {
return serializeQueryResponse(response);
}
@Override
protected String getRequestPath() {
return BaseStrategy.QUERY_REQUEST_PATH;
}
@Override
protected String getTestRegExp() {
return "select (.*) from (.*);";
}
}

View file

@ -1,67 +0,0 @@
package com.provectus.kafka.ui.strategy.ksql.statement;
import com.fasterxml.jackson.databind.JsonNode;
import com.provectus.kafka.ui.model.KsqlCommandDTO;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
import java.util.List;
import java.util.Optional;
import org.springframework.stereotype.Component;
@Component
public class ShowStrategy extends BaseStrategy {
private static final List<String> SHOW_STATEMENTS =
List.of("functions", "topics", "streams", "tables", "queries", "properties");
private static final List<String> LIST_STATEMENTS =
List.of("functions", "topics", "streams", "tables");
private String responseValueKey = "";
@Override
public KsqlCommandResponseDTO serializeResponse(JsonNode response) {
return serializeTableResponse(response, responseValueKey);
}
@Override
public boolean test(String sql) {
Optional<String> statement = SHOW_STATEMENTS.stream()
.filter(s -> testSql(sql, getShowRegExp(s)) || testSql(sql, getListRegExp(s)))
.findFirst();
if (statement.isPresent()) {
setResponseValueKey(statement.get());
return true;
}
return false;
}
@Override
protected String getTestRegExp() {
return "";
}
@Override
public BaseStrategy ksqlCommand(KsqlCommandDTO ksqlCommand) {
// return new instance to avoid conflicts for parallel requests
ShowStrategy clone = new ShowStrategy();
clone.setResponseValueKey(responseValueKey);
clone.ksqlCommand = ksqlCommand;
return clone;
}
protected String getShowRegExp(String key) {
return "show " + key + ";";
}
protected String getListRegExp(String key) {
if (LIST_STATEMENTS.contains(key)) {
return "list " + key + ";";
}
return "";
}
private void setResponseValueKey(String path) {
responseValueKey = path;
}
private boolean testSql(String sql, String pattern) {
return sql.trim().toLowerCase().matches(pattern);
}
}

View file

@ -1,20 +0,0 @@
package com.provectus.kafka.ui.strategy.ksql.statement;
import com.fasterxml.jackson.databind.JsonNode;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
import org.springframework.stereotype.Component;
@Component
public class TerminateStrategy extends BaseStrategy {
private static final String RESPONSE_VALUE_KEY = "commandStatus";
@Override
public KsqlCommandResponseDTO serializeResponse(JsonNode response) {
return serializeMessageResponse(response, RESPONSE_VALUE_KEY);
}
@Override
protected String getTestRegExp() {
return "terminate (.*);";
}
}

View file

@ -1,104 +0,0 @@
package com.provectus.kafka.ui.service;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.provectus.kafka.ui.client.KsqlClient;
import com.provectus.kafka.ui.exception.KsqlDbNotFoundException;
import com.provectus.kafka.ui.exception.UnprocessableEntityException;
import com.provectus.kafka.ui.model.InternalKsqlServer;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.KsqlCommandDTO;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy;
import com.provectus.kafka.ui.strategy.ksql.statement.DescribeStrategy;
import com.provectus.kafka.ui.strategy.ksql.statement.ShowStrategy;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@ExtendWith(MockitoExtension.class)
class KsqlServiceTest {
private KsqlService ksqlService;
private BaseStrategy baseStrategy;
private BaseStrategy alternativeStrategy;
@Mock
private ClustersStorage clustersStorage;
@Mock
private KsqlClient ksqlClient;
@BeforeEach
public void setUp() {
this.baseStrategy = new ShowStrategy();
this.alternativeStrategy = new DescribeStrategy();
this.ksqlService = new KsqlService(
this.ksqlClient,
List.of(baseStrategy, alternativeStrategy)
);
}
@Test
void shouldThrowKsqlDbNotFoundExceptionOnExecuteKsqlCommand() {
KsqlCommandDTO command = (new KsqlCommandDTO()).ksql("show streams;");
KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
when(kafkaCluster.getKsqldbServer()).thenReturn(null);
StepVerifier.create(ksqlService.executeKsqlCommand(kafkaCluster, Mono.just(command)))
.verifyError(KsqlDbNotFoundException.class);
}
@Test
void shouldThrowUnprocessableEntityExceptionOnExecuteKsqlCommand() {
KsqlCommandDTO command =
(new KsqlCommandDTO()).ksql("CREATE STREAM users WITH (KAFKA_TOPIC='users');");
KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
when(kafkaCluster.getKsqldbServer()).thenReturn(InternalKsqlServer.builder().url("localhost:8088").build());
StepVerifier.create(ksqlService.executeKsqlCommand(kafkaCluster, Mono.just(command)))
.verifyError(UnprocessableEntityException.class);
StepVerifier.create(ksqlService.executeKsqlCommand(kafkaCluster, Mono.just(command)))
.verifyErrorMessage("Invalid sql");
}
@Test
void shouldSetHostToStrategy() {
String host = "localhost:8088";
KsqlCommandDTO command = (new KsqlCommandDTO()).ksql("describe streams;");
KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
when(kafkaCluster.getKsqldbServer()).thenReturn(InternalKsqlServer.builder().url(host).build());
when(ksqlClient.execute(any(), any())).thenReturn(Mono.just(new KsqlCommandResponseDTO()));
ksqlService.executeKsqlCommand(kafkaCluster, Mono.just(command)).block();
assertThat(alternativeStrategy.getUri()).isEqualTo(host + "/ksql");
}
@Test
void shouldCallClientAndReturnResponse() {
KsqlCommandDTO command = (new KsqlCommandDTO()).ksql("describe streams;");
KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
KsqlCommandResponseDTO response = new KsqlCommandResponseDTO().message("success");
when(kafkaCluster.getKsqldbServer()).thenReturn(InternalKsqlServer.builder().url("host").build());
when(ksqlClient.execute(any(), any())).thenReturn(Mono.just(response));
KsqlCommandResponseDTO receivedResponse =
ksqlService.executeKsqlCommand(kafkaCluster, Mono.just(command)).block();
verify(ksqlClient, times(1)).execute(eq(alternativeStrategy), any());
assertThat(receivedResponse).isEqualTo(response);
}
}

View file

@ -1,85 +0,0 @@
package com.provectus.kafka.ui.strategy.ksql.statement;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.exception.UnprocessableEntityException;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class CreateStrategyTest {
private final ObjectMapper mapper = new ObjectMapper();
private CreateStrategy strategy;
@BeforeEach
void setUp() {
strategy = new CreateStrategy();
}
@Test
void shouldReturnUri() {
strategy.host("ksqldb-server:8088");
assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
}
@Test
void shouldReturnTrueInTest() {
assertTrue(strategy.test("CREATE STREAM stream WITH (KAFKA_TOPIC='topic');"));
assertTrue(strategy.test("CREATE STREAM stream"
+ " AS SELECT users.id AS userid FROM users EMIT CHANGES;"
));
assertTrue(strategy.test(
"CREATE TABLE table (id VARCHAR) WITH (KAFKA_TOPIC='table');"
));
assertTrue(strategy.test(
"CREATE TABLE pageviews_regions WITH (KEY_FORMAT='JSON')"
+ " AS SELECT gender, COUNT(*) AS numbers"
+ " FROM pageviews EMIT CHANGES;"
));
}
@Test
void shouldReturnFalseInTest() {
assertFalse(strategy.test("show streams;"));
assertFalse(strategy.test("show tables;"));
assertFalse(strategy.test("CREATE TABLE test;"));
assertFalse(strategy.test("CREATE STREAM test;"));
}
@Test
void shouldSerializeResponse() {
String message = "updated successful";
JsonNode node = getResponseWithMessage(message);
KsqlCommandResponseDTO serializedResponse = strategy.serializeResponse(node);
assertThat(serializedResponse.getMessage()).isEqualTo(message);
}
@Test
void shouldSerializeWithException() {
JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage");
JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode));
Exception exception = assertThrows(
UnprocessableEntityException.class,
() -> strategy.serializeResponse(node)
);
assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
}
@SneakyThrows
private JsonNode getResponseWithMessage(String message) {
JsonNode nodeWithMessage = mapper.createObjectNode().put("message", message);
JsonNode commandStatusNode = mapper.createObjectNode().set("commandStatus", nodeWithMessage);
return mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode));
}
}

View file

@ -1,76 +0,0 @@
package com.provectus.kafka.ui.strategy.ksql.statement;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.exception.UnprocessableEntityException;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
import com.provectus.kafka.ui.model.TableDTO;
import java.util.List;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class DescribeStrategyTest {
private final ObjectMapper mapper = new ObjectMapper();
private DescribeStrategy strategy;
@BeforeEach
void setUp() {
strategy = new DescribeStrategy();
}
@Test
void shouldReturnUri() {
strategy.host("ksqldb-server:8088");
assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
}
@Test
void shouldReturnTrueInTest() {
assertTrue(strategy.test("DESCRIBE users;"));
assertTrue(strategy.test("DESCRIBE EXTENDED users;"));
}
@Test
void shouldReturnFalseInTest() {
assertFalse(strategy.test("list streams;"));
assertFalse(strategy.test("show tables;"));
}
@Test
void shouldSerializeResponse() {
JsonNode node = getResponseWithObjectNode();
KsqlCommandResponseDTO serializedResponse = strategy.serializeResponse(node);
TableDTO table = serializedResponse.getData();
assertThat(table.getHeaders()).isEqualTo(List.of("key", "value"));
assertThat(table.getRows()).isEqualTo(List.of(List.of("name", "kafka")));
}
@Test
void shouldSerializeWithException() {
JsonNode sourceDescriptionNode =
mapper.createObjectNode().put("sourceDescription", "nodeWithMessage");
JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(sourceDescriptionNode));
Exception exception = assertThrows(
UnprocessableEntityException.class,
() -> strategy.serializeResponse(node)
);
assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
}
@SneakyThrows
private JsonNode getResponseWithObjectNode() {
JsonNode nodeWithMessage = mapper.createObjectNode().put("name", "kafka");
JsonNode nodeWithResponse = mapper.createObjectNode().set("sourceDescription", nodeWithMessage);
return mapper.createArrayNode().add(mapper.valueToTree(nodeWithResponse));
}
}

View file

@ -1,75 +0,0 @@
package com.provectus.kafka.ui.strategy.ksql.statement;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.exception.UnprocessableEntityException;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class DropStrategyTest {
private final ObjectMapper mapper = new ObjectMapper();
private DropStrategy strategy;
@BeforeEach
void setUp() {
strategy = new DropStrategy();
}
@Test
void shouldReturnUri() {
strategy.host("ksqldb-server:8088");
assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
}
@Test
void shouldReturnTrueInTest() {
assertTrue(strategy.test("drop table table1;"));
assertTrue(strategy.test("drop stream stream2;"));
}
@Test
void shouldReturnFalseInTest() {
assertFalse(strategy.test("show streams;"));
assertFalse(strategy.test("show tables;"));
assertFalse(strategy.test("create table test;"));
assertFalse(strategy.test("create stream test;"));
}
@Test
void shouldSerializeResponse() {
String message = "updated successful";
JsonNode node = getResponseWithMessage(message);
KsqlCommandResponseDTO serializedResponse = strategy.serializeResponse(node);
assertThat(serializedResponse.getMessage()).isEqualTo(message);
}
@Test
void shouldSerializeWithException() {
JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage");
JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode));
Exception exception = assertThrows(
UnprocessableEntityException.class,
() -> strategy.serializeResponse(node)
);
assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
}
@SneakyThrows
private JsonNode getResponseWithMessage(String message) {
JsonNode nodeWithMessage = mapper.createObjectNode().put("message", message);
JsonNode commandStatusNode = mapper.createObjectNode().set("commandStatus", nodeWithMessage);
return mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode));
}
}

View file

@ -1,74 +0,0 @@
package com.provectus.kafka.ui.strategy.ksql.statement;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.exception.UnprocessableEntityException;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
import com.provectus.kafka.ui.model.TableDTO;
import java.util.List;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class ExplainStrategyTest {
private final ObjectMapper mapper = new ObjectMapper();
private ExplainStrategy strategy;
@BeforeEach
void setUp() {
strategy = new ExplainStrategy();
}
@Test
void shouldReturnUri() {
strategy.host("ksqldb-server:8088");
assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
}
@Test
void shouldReturnTrueInTest() {
assertTrue(strategy.test("explain users_query_id;"));
}
@Test
void shouldReturnFalseInTest() {
assertFalse(strategy.test("show queries;"));
}
@Test
void shouldSerializeResponse() {
JsonNode node = getResponseWithObjectNode();
KsqlCommandResponseDTO serializedResponse = strategy.serializeResponse(node);
TableDTO table = serializedResponse.getData();
assertThat(table.getHeaders()).isEqualTo(List.of("key", "value"));
assertThat(table.getRows()).isEqualTo(List.of(List.of("name", "kafka")));
}
@Test
void shouldSerializeWithException() {
JsonNode sourceDescriptionNode =
mapper.createObjectNode().put("sourceDescription", "nodeWithMessage");
JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(sourceDescriptionNode));
Exception exception = assertThrows(
UnprocessableEntityException.class,
() -> strategy.serializeResponse(node)
);
assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
}
@SneakyThrows
private JsonNode getResponseWithObjectNode() {
JsonNode nodeWithMessage = mapper.createObjectNode().put("name", "kafka");
JsonNode nodeWithResponse = mapper.createObjectNode().set("queryDescription", nodeWithMessage);
return mapper.createArrayNode().add(mapper.valueToTree(nodeWithResponse));
}
}

View file

@ -1,79 +0,0 @@
package com.provectus.kafka.ui.strategy.ksql.statement;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.exception.UnprocessableEntityException;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
import com.provectus.kafka.ui.model.TableDTO;
import java.util.List;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class SelectStrategyTest {
private final ObjectMapper mapper = new ObjectMapper();
private SelectStrategy strategy;
@BeforeEach
void setUp() {
strategy = new SelectStrategy();
}
@Test
void shouldReturnUri() {
strategy.host("ksqldb-server:8088");
assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/query");
}
@Test
void shouldReturnTrueInTest() {
assertTrue(strategy.test("select * from users;"));
}
@Test
void shouldReturnFalseInTest() {
assertFalse(strategy.test("show streams;"));
assertFalse(strategy.test("select *;"));
}
@Test
void shouldSerializeResponse() {
JsonNode node = getResponseWithData();
KsqlCommandResponseDTO serializedResponse = strategy.serializeResponse(node);
TableDTO table = serializedResponse.getData();
assertThat(table.getHeaders()).isEqualTo(List.of("header1", "header2"));
assertThat(table.getRows()).isEqualTo(List.of(List.of("value1", "value2")));
}
@Test
void shouldSerializeWithException() {
JsonNode node = mapper.createObjectNode();
Exception exception = assertThrows(
UnprocessableEntityException.class,
() -> strategy.serializeResponse(node)
);
assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
}
@SneakyThrows
private JsonNode getResponseWithData() {
JsonNode headerNode = mapper.createObjectNode().set(
"header", mapper.createObjectNode().put("schema", "header1, header2")
);
JsonNode row = mapper.createObjectNode().set(
"row", mapper.createObjectNode().set(
"columns", mapper.createArrayNode().add("value1").add("value2")
)
);
return mapper.createArrayNode().add(headerNode).add(row);
}
}

View file

@ -1,102 +0,0 @@
package com.provectus.kafka.ui.strategy.ksql.statement;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.exception.UnprocessableEntityException;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
import com.provectus.kafka.ui.model.TableDTO;
import java.util.List;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DynamicTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestFactory;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class ShowStrategyTest {
private final ObjectMapper mapper = new ObjectMapper();
private ShowStrategy strategy;
@BeforeEach
void setUp() {
strategy = new ShowStrategy();
}
@Test
void shouldReturnUri() {
strategy.host("ksqldb-server:8088");
assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
}
@Test
void shouldReturnTrueInTest() {
assertTrue(strategy.test("SHOW STREAMS;"));
assertTrue(strategy.test("SHOW TABLES;"));
assertTrue(strategy.test("SHOW TOPICS;"));
assertTrue(strategy.test("SHOW QUERIES;"));
assertTrue(strategy.test("SHOW PROPERTIES;"));
assertTrue(strategy.test("SHOW FUNCTIONS;"));
assertTrue(strategy.test("LIST STREAMS;"));
assertTrue(strategy.test("LIST TABLES;"));
assertTrue(strategy.test("LIST TOPICS;"));
assertTrue(strategy.test("LIST FUNCTIONS;"));
}
@Test
void shouldReturnFalseInTest() {
assertFalse(strategy.test("LIST QUERIES;"));
assertFalse(strategy.test("LIST PROPERTIES;"));
}
@TestFactory
public Iterable<DynamicTest> shouldSerialize() {
return List.of(
shouldSerializeGenerate("streams", "show streams;"),
shouldSerializeGenerate("tables", "show tables;"),
shouldSerializeGenerate("topics", "show topics;"),
shouldSerializeGenerate("properties", "show properties;"),
shouldSerializeGenerate("functions", "show functions;"),
shouldSerializeGenerate("queries", "show queries;")
);
}
public DynamicTest shouldSerializeGenerate(final String key, final String sql) {
return DynamicTest.dynamicTest("Should serialize " + key,
() -> {
JsonNode node = getResponseWithData(key);
strategy.test(sql);
KsqlCommandResponseDTO serializedResponse = strategy.serializeResponse(node);
TableDTO table = serializedResponse.getData();
assertThat(table.getHeaders()).isEqualTo(List.of("header"));
assertThat(table.getRows()).isEqualTo(List.of(List.of("value")));
}
);
}
@Test
void shouldSerializeWithException() {
JsonNode node = getResponseWithData("streams");
strategy.test("show tables;");
Exception exception = assertThrows(
UnprocessableEntityException.class,
() -> strategy.serializeResponse(node)
);
assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
}
@SneakyThrows
private JsonNode getResponseWithData(String key) {
JsonNode nodeWithDataItem = mapper.createObjectNode().put("header", "value");
JsonNode nodeWithData = mapper.createArrayNode().add(nodeWithDataItem);
JsonNode nodeWithResponse = mapper.createObjectNode().set(key, nodeWithData);
return mapper.createArrayNode().add(mapper.valueToTree(nodeWithResponse));
}
}

View file

@ -1,72 +0,0 @@
package com.provectus.kafka.ui.strategy.ksql.statement;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.exception.UnprocessableEntityException;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class TerminateStrategyTest {
private final ObjectMapper mapper = new ObjectMapper();
private TerminateStrategy strategy;
@BeforeEach
void setUp() {
strategy = new TerminateStrategy();
}
@Test
void shouldReturnUri() {
strategy.host("ksqldb-server:8088");
assertThat(strategy.getUri()).isEqualTo("ksqldb-server:8088/ksql");
}
@Test
void shouldReturnTrueInTest() {
assertTrue(strategy.test("terminate query_id;"));
}
@Test
void shouldReturnFalseInTest() {
assertFalse(strategy.test("show streams;"));
assertFalse(strategy.test("create table test;"));
}
@Test
void shouldSerializeResponse() {
String message = "query terminated.";
JsonNode node = getResponseWithMessage(message);
KsqlCommandResponseDTO serializedResponse = strategy.serializeResponse(node);
assertThat(serializedResponse.getMessage()).isEqualTo(message);
}
@Test
void shouldSerializeWithException() {
JsonNode commandStatusNode = mapper.createObjectNode().put("commandStatus", "nodeWithMessage");
JsonNode node = mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode));
Exception exception = assertThrows(
UnprocessableEntityException.class,
() -> strategy.serializeResponse(node)
);
assertThat(exception.getMessage()).isEqualTo("KSQL DB response mapping error");
}
@SneakyThrows
private JsonNode getResponseWithMessage(String message) {
JsonNode nodeWithMessage = mapper.createObjectNode().put("message", message);
JsonNode commandStatusNode = mapper.createObjectNode().set("commandStatus", nodeWithMessage);
return mapper.createArrayNode().add(mapper.valueToTree(commandStatusNode));
}
}

View file

@ -1561,31 +1561,6 @@ paths:
200: 200:
description: OK description: OK
/api/clusters/{clusterName}/ksql:
description: Deprecated - use ksql/v2 instead!
post:
tags:
- Ksql
summary: executeKsqlCommand
operationId: executeKsqlCommand
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/KsqlCommand'
responses:
200:
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/KsqlCommandResponse'
/api/clusters/{clusterName}/ksql/v2: /api/clusters/{clusterName}/ksql/v2:
post: post:
@ -2986,18 +2961,6 @@ components:
items: items:
$ref: '#/components/schemas/ConnectorPluginConfig' $ref: '#/components/schemas/ConnectorPluginConfig'
KsqlCommand:
type: object
properties:
ksql:
type: string
streamsProperties:
type: object
additionalProperties:
type: string
required:
- ksql
KsqlCommandV2: KsqlCommandV2:
type: object type: object
properties: properties:
@ -3044,31 +3007,6 @@ components:
valueFormat: valueFormat:
type: string type: string
KsqlCommandResponse:
type: object
properties:
data:
$ref: '#/components/schemas/Table'
message:
type: string
Table:
type: object
properties:
headers:
type: array
items:
type: string
rows:
type: array
items:
type: array
items:
type: string
required:
- headers
- rows
KsqlResponse: KsqlResponse:
type: object type: object
properties: properties: