From cbd4e4a52adf8ca7b15c84a5c18331d6359eb51e Mon Sep 17 00:00:00 2001 From: Artem Kazlanzhy <87637857+akazlanzhy@users.noreply.github.com> Date: Thu, 7 Jul 2022 18:50:26 +0300 Subject: [PATCH] KSQLDB: Support Basic Auth (#2247) * Support basic authentication for KSQL server * Resolve mr issues Co-authored-by: Roman Zabaluev --- README.md | 2 ++ .../provectus/kafka/ui/client/KsqlClient.java | 5 ++++- .../kafka/ui/config/ClustersProperties.java | 9 +++++++++ .../kafka/ui/mapper/ClusterMapper.java | 20 +++++++++++++++++++ .../kafka/ui/model/InternalKsqlServer.java | 14 +++++++++++++ .../kafka/ui/model/KafkaCluster.java | 2 +- .../kafka/ui/service/KsqlService.java | 6 +++--- .../kafka/ui/service/ksql/KsqlApiClient.java | 17 +++++++++++++++- .../kafka/ui/service/KsqlServiceTest.java | 14 +++++++------ .../ui/service/ksql/KsqlApiClientTest.java | 4 +++- .../ui/service/ksql/KsqlServiceV2Test.java | 8 +++++--- 11 files changed, 85 insertions(+), 16 deletions(-) create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalKsqlServer.java diff --git a/README.md b/README.md index 5a0f51cf07..494909e673 100644 --- a/README.md +++ b/README.md @@ -163,6 +163,8 @@ For example, if you want to use an environment variable to set the `name` parame |`KAFKA_CLUSTERS_0_NAME` | Cluster name |`KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS` |Address where to connect |`KAFKA_CLUSTERS_0_KSQLDBSERVER` | KSQL DB server address +|`KAFKA_CLUSTERS_0_KSQLDBSERVERAUTH_USERNAME` | KSQL DB server's basic authentication username +|`KAFKA_CLUSTERS_0_KSQLDBSERVERAUTH_PASSWORD` | KSQL DB server's basic authentication password |`KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL` |Security protocol to connect to the brokers. For SSL connection use "SSL", for plaintext connection don't set this environment variable |`KAFKA_CLUSTERS_0_SCHEMAREGISTRY` |SchemaRegistry's address |`KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_USERNAME` |SchemaRegistry's basic authentication username diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java index 2e8026648d..8d051234ab 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/client/KsqlClient.java @@ -3,7 +3,9 @@ package com.provectus.kafka.ui.client; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.KsqlCommandResponseDTO; +import com.provectus.kafka.ui.service.ksql.KsqlApiClient; import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; @@ -23,9 +25,10 @@ public class KsqlClient { private final WebClient webClient; private final ObjectMapper mapper; - public Mono execute(BaseStrategy ksqlStatement) { + public Mono execute(BaseStrategy ksqlStatement, KafkaCluster cluster) { return webClient.post() .uri(ksqlStatement.getUri()) + .headers(httpHeaders -> KsqlApiClient.setBasicAuthIfEnabled(httpHeaders, cluster)) .accept(new MediaType("application", "vnd.ksql.v1+json")) .body(BodyInserters.fromValue(ksqlStatement.getKsqlCommand())) .retrieve() diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java index 0d83e143c5..de4c8bc347 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/config/ClustersProperties.java @@ -8,6 +8,7 @@ import java.util.Properties; import java.util.Set; import javax.annotation.PostConstruct; import lombok.Data; +import lombok.ToString; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.util.StringUtils; @@ -26,6 +27,7 @@ public class ClustersProperties { String schemaRegistry; SchemaRegistryAuth schemaRegistryAuth; String ksqldbServer; + KsqldbServerAuth ksqldbServerAuth; String schemaNameTemplate = "%s-value"; String keySchemaNameTemplate = "%s-key"; String protobufFile; @@ -57,6 +59,13 @@ public class ClustersProperties { String password; } + @Data + @ToString(exclude = "password") + public static class KsqldbServerAuth { + String username; + String password; + } + @PostConstruct public void validateAndSetDefaults() { validateClusterNames(); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java index 1eb6199f96..5709709e9b 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java @@ -17,6 +17,7 @@ import com.provectus.kafka.ui.model.Feature; import com.provectus.kafka.ui.model.InternalBrokerConfig; import com.provectus.kafka.ui.model.InternalBrokerDiskUsage; import com.provectus.kafka.ui.model.InternalClusterState; +import com.provectus.kafka.ui.model.InternalKsqlServer; import com.provectus.kafka.ui.model.InternalPartition; import com.provectus.kafka.ui.model.InternalReplica; import com.provectus.kafka.ui.model.InternalSchemaRegistry; @@ -53,6 +54,7 @@ public interface ClusterMapper { @Mapping(target = "protobufFile", source = "protobufFile", qualifiedByName = "resolvePath") @Mapping(target = "properties", source = "properties", qualifiedByName = "setProperties") @Mapping(target = "schemaRegistry", source = ".", qualifiedByName = "setSchemaRegistry") + @Mapping(target = "ksqldbServer", source = ".", qualifiedByName = "setKsqldbServer") KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties); ClusterStatsDTO toClusterStats(InternalClusterState clusterState); @@ -110,6 +112,24 @@ public interface ClusterMapper { return internalSchemaRegistry.build(); } + @Named("setKsqldbServer") + default InternalKsqlServer setKsqldbServer(ClustersProperties.Cluster clusterProperties) { + if (clusterProperties == null + || clusterProperties.getKsqldbServer() == null) { + return null; + } + + InternalKsqlServer.InternalKsqlServerBuilder internalKsqlServerBuilder = + InternalKsqlServer.builder().url(clusterProperties.getKsqldbServer()); + + if (clusterProperties.getKsqldbServerAuth() != null) { + internalKsqlServerBuilder.username(clusterProperties.getKsqldbServerAuth().getUsername()); + internalKsqlServerBuilder.password(clusterProperties.getKsqldbServerAuth().getPassword()); + } + + return internalKsqlServerBuilder.build(); + } + TopicDetailsDTO toTopicDetails(InternalTopic topic); @Mapping(target = "isReadOnly", source = "readOnly") diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalKsqlServer.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalKsqlServer.java new file mode 100644 index 0000000000..a1c715bb58 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/InternalKsqlServer.java @@ -0,0 +1,14 @@ +package com.provectus.kafka.ui.model; + +import lombok.Builder; +import lombok.Data; +import lombok.ToString; + +@Data +@ToString(exclude = "password") +@Builder(toBuilder = true) +public class InternalKsqlServer { + private final String url; + private final String username; + private final String password; +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java index b9f1ea9676..eab02e789f 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/model/KafkaCluster.java @@ -21,7 +21,7 @@ public class KafkaCluster { private final String jmxPassword; private final String bootstrapServers; private final InternalSchemaRegistry schemaRegistry; - private final String ksqldbServer; + private final InternalKsqlServer ksqldbServer; private final List kafkaConnect; private final String schemaNameTemplate; private final String keySchemaNameTemplate; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java index 4e970dc0c8..6f74ede75e 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/KsqlService.java @@ -28,10 +28,10 @@ public class KsqlService { e instanceof ClusterNotFoundException ? e : new KsqlDbNotFoundException(); return Mono.error(throwable); }) - .flatMap(host -> getStatementStrategyForKsqlCommand(ksqlCommand) - .map(statement -> statement.host(host)) + .flatMap(ksqlServer -> getStatementStrategyForKsqlCommand(ksqlCommand) + .map(statement -> statement.host(ksqlServer.getUrl())) ) - .flatMap(ksqlClient::execute); + .flatMap(baseStrategy -> ksqlClient.execute(baseStrategy, cluster)); } private Mono getStatementStrategyForKsqlCommand( diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java index 161c284c97..b83da666aa 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java @@ -8,6 +8,7 @@ import static ksql.KsqlGrammarParser.UndefineVariableContext; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.TextNode; +import com.provectus.kafka.ui.exception.ValidationException; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.service.ksql.response.ResponseParser; import java.util.List; @@ -18,6 +19,7 @@ import lombok.Builder; import lombok.Value; import lombok.extern.slf4j.Slf4j; import org.springframework.core.codec.DecodingException; +import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.codec.json.Jackson2JsonDecoder; import org.springframework.util.MimeTypeUtils; @@ -79,12 +81,25 @@ public class KsqlApiClient { .build(); return WebClient.builder() .codecs(c -> c.defaultCodecs().maxInMemorySize((int) maxBuffSize.toBytes())) + .defaultHeaders(httpHeaders -> setBasicAuthIfEnabled(httpHeaders, cluster)) .exchangeStrategies(exchangeStrategies) .build(); } + public static void setBasicAuthIfEnabled(HttpHeaders headers, KafkaCluster cluster) { + String username = cluster.getKsqldbServer().getUsername(); + String password = cluster.getKsqldbServer().getPassword(); + if (username != null && password != null) { + headers.setBasicAuth(username, password); + } else if (username != null) { + throw new ValidationException("You specified username but did not specify password"); + } else if (password != null) { + throw new ValidationException("You specified password but did not specify username"); + } + } + private String baseKsqlDbUri() { - return cluster.getKsqldbServer(); + return cluster.getKsqldbServer().getUrl(); } private KsqlRequest ksqlRequest(String ksql, Map streamProperties) { diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/KsqlServiceTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/KsqlServiceTest.java index f41b595e79..ed434efba4 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/KsqlServiceTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/KsqlServiceTest.java @@ -2,6 +2,7 @@ package com.provectus.kafka.ui.service; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -9,6 +10,7 @@ import static org.mockito.Mockito.when; import com.provectus.kafka.ui.client.KsqlClient; import com.provectus.kafka.ui.exception.KsqlDbNotFoundException; import com.provectus.kafka.ui.exception.UnprocessableEntityException; +import com.provectus.kafka.ui.model.InternalKsqlServer; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.KsqlCommandDTO; import com.provectus.kafka.ui.model.KsqlCommandResponseDTO; @@ -62,7 +64,7 @@ class KsqlServiceTest { KsqlCommandDTO command = (new KsqlCommandDTO()).ksql("CREATE STREAM users WITH (KAFKA_TOPIC='users');"); KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class); - when(kafkaCluster.getKsqldbServer()).thenReturn("localhost:8088"); + when(kafkaCluster.getKsqldbServer()).thenReturn(InternalKsqlServer.builder().url("localhost:8088").build()); StepVerifier.create(ksqlService.executeKsqlCommand(kafkaCluster, Mono.just(command))) .verifyError(UnprocessableEntityException.class); @@ -77,8 +79,8 @@ class KsqlServiceTest { KsqlCommandDTO command = (new KsqlCommandDTO()).ksql("describe streams;"); KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class); - when(kafkaCluster.getKsqldbServer()).thenReturn(host); - when(ksqlClient.execute(any())).thenReturn(Mono.just(new KsqlCommandResponseDTO())); + when(kafkaCluster.getKsqldbServer()).thenReturn(InternalKsqlServer.builder().url(host).build()); + when(ksqlClient.execute(any(), any())).thenReturn(Mono.just(new KsqlCommandResponseDTO())); ksqlService.executeKsqlCommand(kafkaCluster, Mono.just(command)).block(); assertThat(alternativeStrategy.getUri()).isEqualTo(host + "/ksql"); @@ -90,12 +92,12 @@ class KsqlServiceTest { KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class); KsqlCommandResponseDTO response = new KsqlCommandResponseDTO().message("success"); - when(kafkaCluster.getKsqldbServer()).thenReturn("host"); - when(ksqlClient.execute(any())).thenReturn(Mono.just(response)); + when(kafkaCluster.getKsqldbServer()).thenReturn(InternalKsqlServer.builder().url("host").build()); + when(ksqlClient.execute(any(), any())).thenReturn(Mono.just(response)); KsqlCommandResponseDTO receivedResponse = ksqlService.executeKsqlCommand(kafkaCluster, Mono.just(command)).block(); - verify(ksqlClient, times(1)).execute(alternativeStrategy); + verify(ksqlClient, times(1)).execute(eq(alternativeStrategy), any()); assertThat(receivedResponse).isEqualTo(response); } diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlApiClientTest.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlApiClientTest.java index 5956d73082..a797175091 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlApiClientTest.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlApiClientTest.java @@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.TextNode; import com.provectus.kafka.ui.AbstractIntegrationTest; import com.provectus.kafka.ui.container.KsqlDbContainer; +import com.provectus.kafka.ui.model.InternalKsqlServer; import com.provectus.kafka.ui.model.KafkaCluster; import java.time.Duration; import java.util.List; @@ -42,7 +43,8 @@ class KsqlApiClientTest extends AbstractIntegrationTest { // Tutorial is here: https://ksqldb.io/quickstart.html @Test void ksqTutorialQueriesWork() { - var client = new KsqlApiClient(KafkaCluster.builder().ksqldbServer(KSQL_DB.url()).build(), maxBuffSize); + var client = new KsqlApiClient(KafkaCluster.builder().ksqldbServer( + InternalKsqlServer.builder().url(KSQL_DB.url()).build()).build(), maxBuffSize); execCommandSync(client, "CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE) " + "WITH (kafka_topic='locations', value_format='json', partitions=1);", diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlServiceV2Test.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlServiceV2Test.java index d670680ea2..89823d643e 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlServiceV2Test.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ksql/KsqlServiceV2Test.java @@ -4,6 +4,7 @@ import static org.assertj.core.api.Assertions.assertThat; import com.provectus.kafka.ui.AbstractIntegrationTest; import com.provectus.kafka.ui.container.KsqlDbContainer; +import com.provectus.kafka.ui.model.InternalKsqlServer; import com.provectus.kafka.ui.model.KafkaCluster; import com.provectus.kafka.ui.model.KsqlStreamDescriptionDTO; import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO; @@ -34,7 +35,8 @@ class KsqlServiceV2Test extends AbstractIntegrationTest { @AfterAll static void cleanup() { - var client = new KsqlApiClient(KafkaCluster.builder().ksqldbServer(KSQL_DB.url()).build(), maxBuffSize); + var client = new KsqlApiClient(KafkaCluster.builder().ksqldbServer( + InternalKsqlServer.builder().url(KSQL_DB.url()).build()).build(), maxBuffSize); TABLES_TO_DELETE.forEach(t -> client.execute(String.format("DROP TABLE IF EXISTS %s DELETE TOPIC;", t), Map.of()) @@ -51,7 +53,7 @@ class KsqlServiceV2Test extends AbstractIntegrationTest { @Test void listStreamsReturnsAllKsqlStreams() { - var cluster = KafkaCluster.builder().ksqldbServer(KSQL_DB.url()).build(); + var cluster = KafkaCluster.builder().ksqldbServer(InternalKsqlServer.builder().url(KSQL_DB.url()).build()).build(); var streamName = "stream_" + System.currentTimeMillis(); STREAMS_TO_DELETE.add(streamName); @@ -80,7 +82,7 @@ class KsqlServiceV2Test extends AbstractIntegrationTest { @Test void listTablesReturnsAllKsqlTables() { - var cluster = KafkaCluster.builder().ksqldbServer(KSQL_DB.url()).build(); + var cluster = KafkaCluster.builder().ksqldbServer(InternalKsqlServer.builder().url(KSQL_DB.url()).build()).build(); var tableName = "table_" + System.currentTimeMillis(); TABLES_TO_DELETE.add(tableName);