Browse Source

KSQLDB: Support Basic Auth (#2247)

* Support basic authentication for KSQL server

* Resolve mr issues

Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
Artem Kazlanzhy 3 years ago
parent
commit
cbd4e4a52a

+ 2 - 0
README.md

@@ -163,6 +163,8 @@ For example, if you want to use an environment variable to set the `name` parame
 |`KAFKA_CLUSTERS_0_NAME` | Cluster name
 |`KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS` 	|Address where to connect 
 |`KAFKA_CLUSTERS_0_KSQLDBSERVER` 	| KSQL DB server address 
+|`KAFKA_CLUSTERS_0_KSQLDBSERVERAUTH_USERNAME` 	| KSQL DB server's basic authentication username 
+|`KAFKA_CLUSTERS_0_KSQLDBSERVERAUTH_PASSWORD` 	| KSQL DB server's basic authentication password 
 |`KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL` 	|Security protocol to connect to the brokers. For SSL connection use "SSL", for plaintext connection don't set this environment variable
 |`KAFKA_CLUSTERS_0_SCHEMAREGISTRY`   	|SchemaRegistry's address
 |`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_USERNAME`   	|SchemaRegistry's basic authentication username

+ 4 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java

@@ -3,7 +3,9 @@ package com.provectus.kafka.ui.client;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
+import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
 import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy;
 import lombok.RequiredArgsConstructor;
 import lombok.SneakyThrows;
@@ -23,9 +25,10 @@ public class KsqlClient {
   private final WebClient webClient;
   private final ObjectMapper mapper;
 
-  public Mono<KsqlCommandResponseDTO> execute(BaseStrategy ksqlStatement) {
+  public Mono<KsqlCommandResponseDTO> execute(BaseStrategy ksqlStatement, KafkaCluster cluster) {
     return webClient.post()
         .uri(ksqlStatement.getUri())
+        .headers(httpHeaders -> KsqlApiClient.setBasicAuthIfEnabled(httpHeaders, cluster))
         .accept(new MediaType("application", "vnd.ksql.v1+json"))
         .body(BodyInserters.fromValue(ksqlStatement.getKsqlCommand()))
         .retrieve()

+ 9 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java

@@ -8,6 +8,7 @@ import java.util.Properties;
 import java.util.Set;
 import javax.annotation.PostConstruct;
 import lombok.Data;
+import lombok.ToString;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.util.StringUtils;
@@ -26,6 +27,7 @@ public class ClustersProperties {
     String schemaRegistry;
     SchemaRegistryAuth schemaRegistryAuth;
     String ksqldbServer;
+    KsqldbServerAuth ksqldbServerAuth;
     String schemaNameTemplate = "%s-value";
     String keySchemaNameTemplate = "%s-key";
     String protobufFile;
@@ -57,6 +59,13 @@ public class ClustersProperties {
     String password;
   }
 
+  @Data
+  @ToString(exclude = "password")
+  public static class KsqldbServerAuth {
+    String username;
+    String password;
+  }
+
   @PostConstruct
   public void validateAndSetDefaults() {
     validateClusterNames();

+ 20 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java

@@ -17,6 +17,7 @@ import com.provectus.kafka.ui.model.Feature;
 import com.provectus.kafka.ui.model.InternalBrokerConfig;
 import com.provectus.kafka.ui.model.InternalBrokerDiskUsage;
 import com.provectus.kafka.ui.model.InternalClusterState;
+import com.provectus.kafka.ui.model.InternalKsqlServer;
 import com.provectus.kafka.ui.model.InternalPartition;
 import com.provectus.kafka.ui.model.InternalReplica;
 import com.provectus.kafka.ui.model.InternalSchemaRegistry;
@@ -53,6 +54,7 @@ public interface ClusterMapper {
   @Mapping(target = "protobufFile", source = "protobufFile", qualifiedByName = "resolvePath")
   @Mapping(target = "properties", source = "properties", qualifiedByName = "setProperties")
   @Mapping(target = "schemaRegistry", source = ".", qualifiedByName = "setSchemaRegistry")
+  @Mapping(target = "ksqldbServer", source = ".", qualifiedByName = "setKsqldbServer")
   KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
 
   ClusterStatsDTO toClusterStats(InternalClusterState clusterState);
@@ -110,6 +112,24 @@ public interface ClusterMapper {
     return internalSchemaRegistry.build();
   }
 
+  @Named("setKsqldbServer")
+  default InternalKsqlServer setKsqldbServer(ClustersProperties.Cluster clusterProperties) {
+    if (clusterProperties == null
+            || clusterProperties.getKsqldbServer() == null) {
+      return null;
+    }
+
+    InternalKsqlServer.InternalKsqlServerBuilder internalKsqlServerBuilder =
+            InternalKsqlServer.builder().url(clusterProperties.getKsqldbServer());
+
+    if (clusterProperties.getKsqldbServerAuth() != null) {
+      internalKsqlServerBuilder.username(clusterProperties.getKsqldbServerAuth().getUsername());
+      internalKsqlServerBuilder.password(clusterProperties.getKsqldbServerAuth().getPassword());
+    }
+
+    return internalKsqlServerBuilder.build();
+  }
+
   TopicDetailsDTO toTopicDetails(InternalTopic topic);
 
   @Mapping(target = "isReadOnly", source = "readOnly")

+ 14 - 0
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalKsqlServer.java

@@ -0,0 +1,14 @@
+package com.provectus.kafka.ui.model;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.ToString;
+
+@Data
+@ToString(exclude = "password")
+@Builder(toBuilder = true)
+public class InternalKsqlServer {
+  private final String url;
+  private final String username;
+  private final String password;
+}

+ 1 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java

@@ -21,7 +21,7 @@ public class KafkaCluster {
   private final String jmxPassword;
   private final String bootstrapServers;
   private final InternalSchemaRegistry schemaRegistry;
-  private final String ksqldbServer;
+  private final InternalKsqlServer ksqldbServer;
   private final List<KafkaConnectCluster> kafkaConnect;
   private final String schemaNameTemplate;
   private final String keySchemaNameTemplate;

+ 3 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java

@@ -28,10 +28,10 @@ public class KsqlService {
               e instanceof ClusterNotFoundException ? e : new KsqlDbNotFoundException();
           return Mono.error(throwable);
         })
-        .flatMap(host -> getStatementStrategyForKsqlCommand(ksqlCommand)
-            .map(statement -> statement.host(host))
+        .flatMap(ksqlServer -> getStatementStrategyForKsqlCommand(ksqlCommand)
+            .map(statement -> statement.host(ksqlServer.getUrl()))
         )
-        .flatMap(ksqlClient::execute);
+        .flatMap(baseStrategy -> ksqlClient.execute(baseStrategy, cluster));
   }
 
   private Mono<BaseStrategy> getStatementStrategyForKsqlCommand(

+ 16 - 1
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java

@@ -8,6 +8,7 @@ import static ksql.KsqlGrammarParser.UndefineVariableContext;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.TextNode;
+import com.provectus.kafka.ui.exception.ValidationException;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.service.ksql.response.ResponseParser;
 import java.util.List;
@@ -18,6 +19,7 @@ import lombok.Builder;
 import lombok.Value;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.core.codec.DecodingException;
+import org.springframework.http.HttpHeaders;
 import org.springframework.http.MediaType;
 import org.springframework.http.codec.json.Jackson2JsonDecoder;
 import org.springframework.util.MimeTypeUtils;
@@ -79,12 +81,25 @@ public class KsqlApiClient {
         .build();
     return WebClient.builder()
         .codecs(c -> c.defaultCodecs().maxInMemorySize((int) maxBuffSize.toBytes()))
+        .defaultHeaders(httpHeaders -> setBasicAuthIfEnabled(httpHeaders, cluster))
         .exchangeStrategies(exchangeStrategies)
         .build();
   }
 
+  public static void setBasicAuthIfEnabled(HttpHeaders headers, KafkaCluster cluster) {
+    String username = cluster.getKsqldbServer().getUsername();
+    String password = cluster.getKsqldbServer().getPassword();
+    if (username != null && password != null) {
+      headers.setBasicAuth(username, password);
+    } else if (username != null) {
+      throw new ValidationException("You specified username but did not specify password");
+    } else if (password != null) {
+      throw new ValidationException("You specified password but did not specify username");
+    }
+  }
+
   private String baseKsqlDbUri() {
-    return cluster.getKsqldbServer();
+    return cluster.getKsqldbServer().getUrl();
   }
 
   private KsqlRequest ksqlRequest(String ksql, Map<String, String> streamProperties) {

+ 8 - 6
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/KsqlServiceTest.java

@@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -9,6 +10,7 @@ import static org.mockito.Mockito.when;
 import com.provectus.kafka.ui.client.KsqlClient;
 import com.provectus.kafka.ui.exception.KsqlDbNotFoundException;
 import com.provectus.kafka.ui.exception.UnprocessableEntityException;
+import com.provectus.kafka.ui.model.InternalKsqlServer;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.KsqlCommandDTO;
 import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
@@ -62,7 +64,7 @@ class KsqlServiceTest {
     KsqlCommandDTO command =
         (new KsqlCommandDTO()).ksql("CREATE STREAM users WITH (KAFKA_TOPIC='users');");
     KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
-    when(kafkaCluster.getKsqldbServer()).thenReturn("localhost:8088");
+    when(kafkaCluster.getKsqldbServer()).thenReturn(InternalKsqlServer.builder().url("localhost:8088").build());
 
     StepVerifier.create(ksqlService.executeKsqlCommand(kafkaCluster, Mono.just(command)))
         .verifyError(UnprocessableEntityException.class);
@@ -77,8 +79,8 @@ class KsqlServiceTest {
     KsqlCommandDTO command = (new KsqlCommandDTO()).ksql("describe streams;");
     KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
 
-    when(kafkaCluster.getKsqldbServer()).thenReturn(host);
-    when(ksqlClient.execute(any())).thenReturn(Mono.just(new KsqlCommandResponseDTO()));
+    when(kafkaCluster.getKsqldbServer()).thenReturn(InternalKsqlServer.builder().url(host).build());
+    when(ksqlClient.execute(any(), any())).thenReturn(Mono.just(new KsqlCommandResponseDTO()));
 
     ksqlService.executeKsqlCommand(kafkaCluster, Mono.just(command)).block();
     assertThat(alternativeStrategy.getUri()).isEqualTo(host + "/ksql");
@@ -90,12 +92,12 @@ class KsqlServiceTest {
     KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
     KsqlCommandResponseDTO response = new KsqlCommandResponseDTO().message("success");
 
-    when(kafkaCluster.getKsqldbServer()).thenReturn("host");
-    when(ksqlClient.execute(any())).thenReturn(Mono.just(response));
+    when(kafkaCluster.getKsqldbServer()).thenReturn(InternalKsqlServer.builder().url("host").build());
+    when(ksqlClient.execute(any(), any())).thenReturn(Mono.just(response));
 
     KsqlCommandResponseDTO receivedResponse =
         ksqlService.executeKsqlCommand(kafkaCluster, Mono.just(command)).block();
-    verify(ksqlClient, times(1)).execute(alternativeStrategy);
+    verify(ksqlClient, times(1)).execute(eq(alternativeStrategy), any());
     assertThat(receivedResponse).isEqualTo(response);
 
   }

+ 3 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlApiClientTest.java

@@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.TextNode;
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.container.KsqlDbContainer;
+import com.provectus.kafka.ui.model.InternalKsqlServer;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import java.time.Duration;
 import java.util.List;
@@ -42,7 +43,8 @@ class KsqlApiClientTest extends AbstractIntegrationTest {
   // Tutorial is here: https://ksqldb.io/quickstart.html
   @Test
   void ksqTutorialQueriesWork() {
-    var client = new KsqlApiClient(KafkaCluster.builder().ksqldbServer(KSQL_DB.url()).build(), maxBuffSize);
+    var client = new KsqlApiClient(KafkaCluster.builder().ksqldbServer(
+            InternalKsqlServer.builder().url(KSQL_DB.url()).build()).build(), maxBuffSize);
     execCommandSync(client,
         "CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE) "
             + "WITH (kafka_topic='locations', value_format='json', partitions=1);",

+ 5 - 3
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlServiceV2Test.java

@@ -4,6 +4,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import com.provectus.kafka.ui.AbstractIntegrationTest;
 import com.provectus.kafka.ui.container.KsqlDbContainer;
+import com.provectus.kafka.ui.model.InternalKsqlServer;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.model.KsqlStreamDescriptionDTO;
 import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO;
@@ -34,7 +35,8 @@ class KsqlServiceV2Test extends AbstractIntegrationTest {
 
   @AfterAll
   static void cleanup() {
-    var client = new KsqlApiClient(KafkaCluster.builder().ksqldbServer(KSQL_DB.url()).build(), maxBuffSize);
+    var client = new KsqlApiClient(KafkaCluster.builder().ksqldbServer(
+        InternalKsqlServer.builder().url(KSQL_DB.url()).build()).build(), maxBuffSize);
 
     TABLES_TO_DELETE.forEach(t ->
         client.execute(String.format("DROP TABLE IF EXISTS %s DELETE TOPIC;", t), Map.of())
@@ -51,7 +53,7 @@ class KsqlServiceV2Test extends AbstractIntegrationTest {
 
   @Test
   void listStreamsReturnsAllKsqlStreams() {
-    var cluster = KafkaCluster.builder().ksqldbServer(KSQL_DB.url()).build();
+    var cluster = KafkaCluster.builder().ksqldbServer(InternalKsqlServer.builder().url(KSQL_DB.url()).build()).build();
     var streamName = "stream_" + System.currentTimeMillis();
     STREAMS_TO_DELETE.add(streamName);
 
@@ -80,7 +82,7 @@ class KsqlServiceV2Test extends AbstractIntegrationTest {
 
   @Test
   void listTablesReturnsAllKsqlTables() {
-    var cluster = KafkaCluster.builder().ksqldbServer(KSQL_DB.url()).build();
+    var cluster = KafkaCluster.builder().ksqldbServer(InternalKsqlServer.builder().url(KSQL_DB.url()).build()).build();
     var tableName = "table_" + System.currentTimeMillis();
     TABLES_TO_DELETE.add(tableName);