|
@@ -16,9 +16,9 @@ import java.util.stream.Stream;
|
|
|
import java.util.stream.StreamSupport;
|
|
|
|
|
|
public abstract class BaseStrategy {
|
|
|
- protected static final String ksqlRequestPath = "/ksql";
|
|
|
- protected static final String queryRequestPath = "/query";
|
|
|
- private static final String mappingExceptionMessage = "KSQL DB response mapping error";
|
|
|
+ protected static final String KSQL_REQUEST_PATH = "/ksql";
|
|
|
+ protected static final String QUERY_REQUEST_PATH = "/query";
|
|
|
+ private static final String MAPPING_EXCEPTION_ERROR = "KSQL DB response mapping error";
|
|
|
protected String host = null;
|
|
|
protected KsqlCommand ksqlCommand = null;
|
|
|
|
|
@@ -47,36 +47,39 @@ public abstract class BaseStrategy {
|
|
|
return this;
|
|
|
}
|
|
|
|
|
|
- protected KsqlCommandResponse serializeTableResponse(JsonNode response, String path) {
|
|
|
- if (response.isArray() && response.size() > 0) {
|
|
|
- KsqlCommandResponse commandResponse = new KsqlCommandResponse();
|
|
|
- JsonNode first = response.get(0);
|
|
|
- JsonNode items = first.path(path);
|
|
|
- Table table = items.isArray() ? getTableFromArray(items) : getTableFromObject(items);
|
|
|
- return commandResponse.data(table);
|
|
|
- }
|
|
|
- throw new UnprocessableEntityException(mappingExceptionMessage);
|
|
|
+ protected String getRequestPath() {
|
|
|
+ return BaseStrategy.KSQL_REQUEST_PATH;
|
|
|
}
|
|
|
|
|
|
- protected KsqlCommandResponse serializeMessageResponse(JsonNode response, String path) {
|
|
|
- if (response.isArray() && response.size() > 0) {
|
|
|
- KsqlCommandResponse commandResponse = new KsqlCommandResponse();
|
|
|
- JsonNode first = response.get(0);
|
|
|
- JsonNode item = first.path(path);
|
|
|
- return commandResponse.message(getMessageFromObject(item));
|
|
|
- }
|
|
|
- throw new UnprocessableEntityException(mappingExceptionMessage);
|
|
|
+ protected KsqlCommandResponse serializeTableResponse(JsonNode response, String key) {
|
|
|
+ JsonNode item = getResponseFirstItemValue(response, key);
|
|
|
+ Table table = item.isArray() ? getTableFromArray(item) : getTableFromObject(item);
|
|
|
+ return (new KsqlCommandResponse()).data(table);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected KsqlCommandResponse serializeMessageResponse(JsonNode response, String key) {
|
|
|
+ JsonNode item = getResponseFirstItemValue(response, key);
|
|
|
+ return (new KsqlCommandResponse()).message(getMessageFromObject(item));
|
|
|
}
|
|
|
|
|
|
protected KsqlCommandResponse serializeQueryResponse(JsonNode response) {
|
|
|
if (response.isArray() && response.size() > 0) {
|
|
|
- KsqlCommandResponse commandResponse = new KsqlCommandResponse();
|
|
|
Table table = (new Table())
|
|
|
.headers(getQueryResponseHeader(response))
|
|
|
.rows(getQueryResponseRows(response));
|
|
|
- return commandResponse.data(table);
|
|
|
+ return (new KsqlCommandResponse()).data(table);
|
|
|
}
|
|
|
- throw new UnprocessableEntityException(mappingExceptionMessage);
|
|
|
+ throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR);
|
|
|
+ }
|
|
|
+
|
|
|
+ private JsonNode getResponseFirstItemValue(JsonNode response, String key) {
|
|
|
+ if (response.isArray() && response.size() > 0) {
|
|
|
+ JsonNode first = response.get(0);
|
|
|
+ if (first.has(key)) {
|
|
|
+ return first.path(key);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR);
|
|
|
}
|
|
|
|
|
|
private List<String> getQueryResponseHeader(JsonNode response) {
|
|
@@ -124,7 +127,7 @@ public abstract class BaseStrategy {
|
|
|
if (node.isObject() && node.has("message")) {
|
|
|
return node.get("message").asText();
|
|
|
}
|
|
|
- throw new UnprocessableEntityException(mappingExceptionMessage);
|
|
|
+ throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR);
|
|
|
}
|
|
|
|
|
|
private List<List<String>> getTableRows(JsonNode node, List<String> keys) {
|
|
@@ -140,7 +143,7 @@ public abstract class BaseStrategy {
|
|
|
if (node.isArray() && node.size() > 0) {
|
|
|
return StreamSupport.stream(node.spliterator(), false);
|
|
|
}
|
|
|
- throw new UnprocessableEntityException(mappingExceptionMessage);
|
|
|
+ throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR);
|
|
|
}
|
|
|
|
|
|
private List<String> getJsonObjectKeys(JsonNode node) {
|
|
@@ -149,7 +152,7 @@ public abstract class BaseStrategy {
|
|
|
Spliterators.spliteratorUnknownSize(node.fieldNames(), Spliterator.ORDERED), false
|
|
|
).collect(Collectors.toList());
|
|
|
}
|
|
|
- throw new UnprocessableEntityException(mappingExceptionMessage);
|
|
|
+ throw new UnprocessableEntityException(MAPPING_EXCEPTION_ERROR);
|
|
|
}
|
|
|
|
|
|
private List<String> getJsonObjectValues(JsonNode node) {
|
|
@@ -159,7 +162,5 @@ public abstract class BaseStrategy {
|
|
|
|
|
|
public abstract KsqlCommandResponse serializeResponse(JsonNode response);
|
|
|
|
|
|
- protected abstract String getRequestPath();
|
|
|
-
|
|
|
protected abstract String getTestRegExp();
|
|
|
}
|