|
@@ -3,14 +3,13 @@ package com.provectus.kafka.ui.service.ksql.response;
|
|
|
import com.fasterxml.jackson.databind.JsonNode;
|
|
|
import com.fasterxml.jackson.databind.json.JsonMapper;
|
|
|
import com.fasterxml.jackson.databind.node.TextNode;
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.collect.Lists;
|
|
|
import com.provectus.kafka.ui.exception.KsqlApiException;
|
|
|
import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.Arrays;
|
|
|
import java.util.List;
|
|
|
import java.util.Optional;
|
|
|
-import java.util.stream.Collectors;
|
|
|
import org.springframework.web.reactive.function.client.WebClientResponseException;
|
|
|
|
|
|
public class ResponseParser {
|
|
@@ -24,11 +23,7 @@ public class ResponseParser {
|
|
|
return Optional.of(
|
|
|
KsqlApiClient.KsqlResponseTable.builder()
|
|
|
.header("Schema")
|
|
|
- .columnNames(
|
|
|
- Arrays.stream(jsonNode.get("header").get("schema").asText().split(","))
|
|
|
- .map(String::trim)
|
|
|
- .collect(Collectors.toList())
|
|
|
- )
|
|
|
+ .columnNames(parseSelectHeadersString(jsonNode.get("header").get("schema").asText()))
|
|
|
.build());
|
|
|
}
|
|
|
if (arrayFieldNonEmpty(jsonNode, "row")) {
|
|
@@ -46,6 +41,34 @@ public class ResponseParser {
|
|
|
return Optional.empty();
|
|
|
}
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ static List<String> parseSelectHeadersString(String str) {
|
|
|
+ List<String> headers = new ArrayList<>();
|
|
|
+ int structNesting = 0;
|
|
|
+ boolean quotes = false;
|
|
|
+ var headerBuilder = new StringBuilder();
|
|
|
+ for (char ch : str.toCharArray()) {
|
|
|
+ if (ch == '<') {
|
|
|
+ structNesting++;
|
|
|
+ } else if (ch == '>') {
|
|
|
+ structNesting--;
|
|
|
+ } else if (ch == '`') {
|
|
|
+ quotes = !quotes;
|
|
|
+ } else if (ch == ' ' && headerBuilder.isEmpty()) {
|
|
|
+ continue; //skipping leading & training whitespaces
|
|
|
+ } else if (ch == ',' && structNesting == 0 && !quotes) {
|
|
|
+ headers.add(headerBuilder.toString());
|
|
|
+ headerBuilder = new StringBuilder();
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ headerBuilder.append(ch);
|
|
|
+ }
|
|
|
+ if (!headerBuilder.isEmpty()) {
|
|
|
+ headers.add(headerBuilder.toString());
|
|
|
+ }
|
|
|
+ return headers;
|
|
|
+ }
|
|
|
+
|
|
|
public static KsqlApiClient.KsqlResponseTable errorTableWithTextMsg(String errorText) {
|
|
|
return KsqlApiClient.KsqlResponseTable.builder()
|
|
|
.header("Execution error")
|