Преглед на файлове

Merge pull request #787 from provectus/#207/ksql-parallel_requests_fix

KSQL: fixed ShowStatement behaviour for parallel requests
Ilnur Farukhshin преди 3 години
родител
ревизия
4f14942c6d

+ 10 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/strategy/ksql/statement/ShowStrategy.java

@@ -1,6 +1,7 @@
 package com.provectus.kafka.ui.strategy.ksql.statement;
 package com.provectus.kafka.ui.strategy.ksql.statement;
 
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.JsonNode;
+import com.provectus.kafka.ui.model.KsqlCommand;
 import com.provectus.kafka.ui.model.KsqlCommandResponse;
 import com.provectus.kafka.ui.model.KsqlCommandResponse;
 import java.util.List;
 import java.util.List;
 import java.util.Optional;
 import java.util.Optional;
@@ -36,6 +37,15 @@ public class ShowStrategy extends BaseStrategy {
     return "";
     return "";
   }
   }
 
 
+  @Override
+  public BaseStrategy ksqlCommand(KsqlCommand ksqlCommand) {
+    // return new instance to avoid conflicts for parallel requests
+    ShowStrategy clone = new ShowStrategy();
+    clone.setResponseValueKey(responseValueKey);
+    clone.ksqlCommand = ksqlCommand;
+    return clone;
+  }
+
   protected String getShowRegExp(String key) {
   protected String getShowRegExp(String key) {
     return "show " + key + ";";
     return "show " + key + ";";
   }
   }

+ 8 - 5
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/KsqlServiceTest.java

@@ -14,6 +14,7 @@ import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.KsqlCommand;
 import com.provectus.kafka.ui.model.KsqlCommand;
 import com.provectus.kafka.ui.model.KsqlCommandResponse;
 import com.provectus.kafka.ui.model.KsqlCommandResponse;
 import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy;
 import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy;
+import com.provectus.kafka.ui.strategy.ksql.statement.DescribeStrategy;
 import com.provectus.kafka.ui.strategy.ksql.statement.ShowStrategy;
 import com.provectus.kafka.ui.strategy.ksql.statement.ShowStrategy;
 import java.util.List;
 import java.util.List;
 import java.util.Optional;
 import java.util.Optional;
@@ -30,6 +31,7 @@ import reactor.test.StepVerifier;
 class KsqlServiceTest {
 class KsqlServiceTest {
   private KsqlService ksqlService;
   private KsqlService ksqlService;
   private BaseStrategy baseStrategy;
   private BaseStrategy baseStrategy;
+  private BaseStrategy alternativeStrategy;
 
 
   @Mock
   @Mock
   private ClustersStorage clustersStorage;
   private ClustersStorage clustersStorage;
@@ -40,10 +42,11 @@ class KsqlServiceTest {
   @BeforeEach
   @BeforeEach
   public void setUp() {
   public void setUp() {
     this.baseStrategy = new ShowStrategy();
     this.baseStrategy = new ShowStrategy();
+    this.alternativeStrategy = new DescribeStrategy();
     this.ksqlService = new KsqlService(
     this.ksqlService = new KsqlService(
         this.ksqlClient,
         this.ksqlClient,
         this.clustersStorage,
         this.clustersStorage,
-        List.of(baseStrategy)
+        List.of(baseStrategy, alternativeStrategy)
     );
     );
   }
   }
 
 
@@ -91,7 +94,7 @@ class KsqlServiceTest {
   void shouldSetHostToStrategy() {
   void shouldSetHostToStrategy() {
     String clusterName = "test";
     String clusterName = "test";
     String host = "localhost:8088";
     String host = "localhost:8088";
-    KsqlCommand command = (new KsqlCommand()).ksql("show streams;");
+    KsqlCommand command = (new KsqlCommand()).ksql("describe streams;");
     KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
     KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
 
 
     when(clustersStorage.getClusterByName(clusterName))
     when(clustersStorage.getClusterByName(clusterName))
@@ -100,13 +103,13 @@ class KsqlServiceTest {
     when(ksqlClient.execute(any())).thenReturn(Mono.just(new KsqlCommandResponse()));
     when(ksqlClient.execute(any())).thenReturn(Mono.just(new KsqlCommandResponse()));
 
 
     ksqlService.executeKsqlCommand(clusterName, Mono.just(command)).block();
     ksqlService.executeKsqlCommand(clusterName, Mono.just(command)).block();
-    assertThat(baseStrategy.getUri()).isEqualTo(host + "/ksql");
+    assertThat(alternativeStrategy.getUri()).isEqualTo(host + "/ksql");
   }
   }
 
 
   @Test
   @Test
   void shouldCallClientAndReturnResponse() {
   void shouldCallClientAndReturnResponse() {
     String clusterName = "test";
     String clusterName = "test";
-    KsqlCommand command = (new KsqlCommand()).ksql("show streams;");
+    KsqlCommand command = (new KsqlCommand()).ksql("describe streams;");
     KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
     KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
     KsqlCommandResponse response = new KsqlCommandResponse().message("success");
     KsqlCommandResponse response = new KsqlCommandResponse().message("success");
 
 
@@ -117,7 +120,7 @@ class KsqlServiceTest {
 
 
     KsqlCommandResponse receivedResponse =
     KsqlCommandResponse receivedResponse =
         ksqlService.executeKsqlCommand(clusterName, Mono.just(command)).block();
         ksqlService.executeKsqlCommand(clusterName, Mono.just(command)).block();
-    verify(ksqlClient, times(1)).execute(baseStrategy);
+    verify(ksqlClient, times(1)).execute(alternativeStrategy);
     assertThat(receivedResponse).isEqualTo(response);
     assertThat(receivedResponse).isEqualTo(response);
 
 
   }
   }