Преглед на файлове

[ISSUE-1773] Ksqldb 0.24 unexpected json stream ending workaround (#1791)

* Ksqldb 0.24 unexpected json stream ending workaround
* ksql db test added
* SchemaRegistryAwareRecordSerDe potential buffer underflow fixed
* SchemaRegistry magic bytes extracted to constants

Co-authored-by: iliax <ikuramshin@provectus.com>
Ilya Kuramshin преди 3 години
родител
ревизия
b4df8a73c8

+ 4 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/serde/schemaregistry/SchemaRegistryAwareRecordSerDe.java

@@ -43,6 +43,9 @@ import org.apache.kafka.common.utils.Bytes;
 @Slf4j
 public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
 
+  private static final byte SR_RECORD_MAGIC_BYTE = (byte) 0;
+  private static final int SR_RECORD_PREFIX_LENGTH = 5;
+
   private static final StringMessageFormatter FALLBACK_FORMATTER = new StringMessageFormatter();
 
   private static final ProtobufSchemaConverter protoSchemaConverter = new ProtobufSchemaConverter();
@@ -260,7 +263,7 @@ public class SchemaRegistryAwareRecordSerDe implements RecordSerDe {
   private Optional<Integer> extractSchemaIdFromMsg(ConsumerRecord<Bytes, Bytes> msg, boolean isKey) {
     Bytes bytes = isKey ? msg.key() : msg.value();
     ByteBuffer buffer = ByteBuffer.wrap(bytes.get());
-    if (buffer.get() == 0 && buffer.remaining() > 4) {
+    if (buffer.remaining() > SR_RECORD_PREFIX_LENGTH && buffer.get() == SR_RECORD_MAGIC_BYTE) {
       int id = buffer.getInt();
       return Optional.of(id);
     }

+ 36 - 4
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java

@@ -6,6 +6,7 @@ import static ksql.KsqlGrammarParser.SingleStatementContext;
 import static ksql.KsqlGrammarParser.UndefineVariableContext;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.TextNode;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.service.ksql.response.ResponseParser;
@@ -16,10 +17,15 @@ import java.util.Set;
 import lombok.Builder;
 import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.core.codec.DecodingException;
 import org.springframework.http.MediaType;
+import org.springframework.http.codec.json.Jackson2JsonDecoder;
+import org.springframework.util.MimeTypeUtils;
+import org.springframework.web.reactive.function.client.ExchangeStrategies;
 import org.springframework.web.reactive.function.client.WebClient;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 @Slf4j
 public class KsqlApiClient {
@@ -53,7 +59,20 @@ public class KsqlApiClient {
   }
 
   private WebClient webClient() {
-    return WebClient.create();
+    var exchangeStrategies = ExchangeStrategies.builder()
+        .codecs(configurer -> {
+          configurer.customCodecs()
+              .register(
+                  new Jackson2JsonDecoder(
+                      new ObjectMapper(),
+                      // some ksqldb versions do not set content-type header in response,
+                      // but we still need to use JsonDecoder for it
+                      MimeTypeUtils.APPLICATION_OCTET_STREAM));
+        })
+        .build();
+    return WebClient.builder()
+        .exchangeStrategies(exchangeStrategies)
+        .build();
   }
 
   private String baseKsqlDbUri() {
@@ -69,10 +88,11 @@ public class KsqlApiClient {
         .post()
         .uri(baseKsqlDbUri() + "/query")
         .accept(MediaType.parseMediaType("application/vnd.ksql.v1+json"))
-        .contentType(MediaType.parseMediaType("application/json"))
+        .contentType(MediaType.parseMediaType("application/vnd.ksql.v1+json"))
         .bodyValue(ksqlRequest(ksql, streamProperties))
         .retrieve()
         .bodyToFlux(JsonNode.class)
+        .onErrorResume(this::isUnexpectedJsonArrayEndCharException, th -> Mono.empty())
         .map(ResponseParser::parseSelectResponse)
         .filter(Optional::isPresent)
         .map(Optional::get)
@@ -80,6 +100,18 @@ public class KsqlApiClient {
             e -> Flux.just(ResponseParser.parseErrorResponse(e)));
   }
 
+  /**
+   * Some version of ksqldb (?..0.24) can cut off json streaming without respect proper array ending like <p/>
+   * <code>[{"header":{"queryId":"...","schema":"..."}}, ]</code>
+   * which will cause json parsing error and will be propagated to UI.
+   * This is a know issue(https://github.com/confluentinc/ksql/issues/8746), but we don't know when it will be fixed.
+   * To workaround this we need to check DecodingException err msg.
+   */
+  private boolean isUnexpectedJsonArrayEndCharException(Throwable th) {
+    return th instanceof DecodingException
+        && th.getMessage().contains("Unexpected character (']'");
+  }
+
   private Flux<KsqlResponseTable> executeStatement(String ksql,
                                                    Map<String, String> streamProperties) {
     return webClient()
@@ -126,7 +158,7 @@ public class KsqlApiClient {
     }
     Flux<KsqlResponseTable> outputFlux;
     if (KsqlGrammar.isSelect(statements.get(0))) {
-      outputFlux =  executeSelect(ksql, streamProperties);
+      outputFlux = executeSelect(ksql, streamProperties);
     } else {
       outputFlux = executeStatement(ksql, streamProperties);
     }
@@ -137,7 +169,7 @@ public class KsqlApiClient {
         });
   }
 
-  private  Flux<KsqlResponseTable> errorTableFlux(String errorText) {
+  private Flux<KsqlResponseTable> errorTableFlux(String errorText) {
     return Flux.just(ResponseParser.errorTableWithTextMsg(errorText));
   }
 

+ 39 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/container/KsqlDbContainer.java

@@ -0,0 +1,39 @@
+package com.provectus.kafka.ui.container;
+
+import java.time.Duration;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerImageName;
+
+public class KsqlDbContainer extends GenericContainer<KsqlDbContainer> {
+
+  private static final int PORT = 8088;
+
+  public KsqlDbContainer(DockerImageName imageName) {
+    super(imageName);
+    addExposedPort(PORT);
+    waitStrategy = Wait
+        .forHttp("/info")
+        .forStatusCode(200)
+        .withStartupTimeout(Duration.ofMinutes(5));
+  }
+
+  public KsqlDbContainer withKafka(KafkaContainer kafka) {
+    dependsOn(kafka);
+    String bootstrapServers = kafka.getNetworkAliases().get(0) + ":9092";
+    return withKafka(kafka.getNetwork(), bootstrapServers);
+  }
+
+  private KsqlDbContainer withKafka(Network network, String bootstrapServers) {
+    withNetwork(network);
+    withEnv("KSQL_LISTENERS", "http://0.0.0.0:" + PORT);
+    withEnv("KSQL_BOOTSTRAP_SERVERS", bootstrapServers);
+    return self();
+  }
+
+  public String url() {
+    return "http://" + getContainerIpAddress() + ":" + getMappedPort(PORT);
+  }
+}

+ 129 - 0
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlApiClientTest.java

@@ -0,0 +1,129 @@
+package com.provectus.kafka.ui.service.ksql;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.DoubleNode;
+import com.fasterxml.jackson.databind.node.IntNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.provectus.kafka.ui.AbstractIntegrationTest;
+import com.provectus.kafka.ui.container.KsqlDbContainer;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+import reactor.test.StepVerifier;
+
+class KsqlApiClientTest extends AbstractIntegrationTest {
+
+  private static final KsqlDbContainer KSQL_DB = new KsqlDbContainer(
+      DockerImageName.parse("confluentinc/ksqldb-server").withTag("0.24.0"))
+      .withKafka(kafka);
+
+  @BeforeAll
+  static void startContainer() {
+    KSQL_DB.start();
+  }
+
+  @AfterAll
+  static void stopContainer() {
+    KSQL_DB.stop();
+  }
+
+  // Tutorial is here: https://ksqldb.io/quickstart.html
+  @Test
+  void ksqTutorialQueriesWork() {
+    var client = new KsqlApiClient(KafkaCluster.builder().ksqldbServer(KSQL_DB.url()).build());
+    execCommandSync(client,
+        "CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE) "
+            + "WITH (kafka_topic='locations', value_format='json', partitions=1);",
+        "CREATE TABLE currentLocation AS "
+            + "  SELECT profileId, "
+            + "         LATEST_BY_OFFSET(latitude) AS la, "
+            + "         LATEST_BY_OFFSET(longitude) AS lo "
+            + "  FROM riderlocations "
+            + "  GROUP BY profileId "
+            + "  EMIT CHANGES;",
+        "CREATE TABLE ridersNearMountainView AS "
+            + "  SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles, "
+            + "         COLLECT_LIST(profileId) AS riders, "
+            + "         COUNT(*) AS count "
+            + "  FROM currentLocation "
+            + "  GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);",
+        "INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205); ",
+        "INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643); ",
+        "INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813); ",
+        "INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813); ",
+        "INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822); ",
+        "INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);"
+    );
+
+    Awaitility.await()
+        .pollDelay(Duration.ofSeconds(1))
+        .atMost(Duration.ofSeconds(20))
+        .untilAsserted(() -> assertLastKsqTutorialQueryResult(client));
+  }
+
+  private void assertLastKsqTutorialQueryResult(KsqlApiClient client) {
+    // expected results:
+    //{"header":"Schema","columnNames":[...],"values":null}
+    //{"header":"Row","columnNames":null,"values":[[0.0,["4ab5cbad","8b6eae59","4a7c7b41"],3]]}
+    //{"header":"Row","columnNames":null,"values":[[10.0,["18f4ea86"],1]]}
+    StepVerifier.create(
+            client.execute(
+                "SELECT * from ridersNearMountainView WHERE distanceInMiles <= 10;",
+                Map.of()
+            )
+        )
+        .assertNext(header -> {
+          assertThat(header.getHeader()).isEqualTo("Schema");
+          assertThat(header.getColumnNames()).hasSize(3);
+          assertThat(header.getValues()).isNull();
+        })
+        .assertNext(row -> {
+          assertThat(row).isEqualTo(
+              KsqlApiClient.KsqlResponseTable.builder()
+                  .header("Row")
+                  .columnNames(null)
+                  .values(List.of(List.of(
+                      new DoubleNode(0.0),
+                      new ArrayNode(JsonNodeFactory.instance)
+                          .add(new TextNode("4ab5cbad"))
+                          .add(new TextNode("8b6eae59"))
+                          .add(new TextNode("4a7c7b41")),
+                      new IntNode(3)
+                  )))
+                  .build()
+          );
+        })
+        .assertNext(row -> {
+          assertThat(row).isEqualTo(
+              KsqlApiClient.KsqlResponseTable.builder()
+                  .header("Row")
+                  .columnNames(null)
+                  .values(List.of(List.of(
+                      new DoubleNode(10.0),
+                      new ArrayNode(JsonNodeFactory.instance)
+                          .add(new TextNode("18f4ea86")),
+                      new IntNode(1)
+                  )))
+                  .build()
+          );
+        })
+        .verifyComplete();
+  }
+
+  private void execCommandSync(KsqlApiClient client, String... ksqls) {
+    for (String ksql : ksqls) {
+      client.execute(ksql, Map.of()).collectList().block();
+    }
+  }
+
+
+}