123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- package com.provectus.kafka.ui.service;
- import static org.assertj.core.api.Assertions.assertThat;
- import static org.mockito.ArgumentMatchers.any;
- import static org.mockito.Mockito.times;
- import static org.mockito.Mockito.verify;
- import static org.mockito.Mockito.when;
- import com.provectus.kafka.ui.client.KsqlClient;
- import com.provectus.kafka.ui.exception.ClusterNotFoundException;
- import com.provectus.kafka.ui.exception.KsqlDbNotFoundException;
- import com.provectus.kafka.ui.exception.UnprocessableEntityException;
- import com.provectus.kafka.ui.model.KafkaCluster;
- import com.provectus.kafka.ui.model.KsqlCommand;
- import com.provectus.kafka.ui.model.KsqlCommandResponse;
- import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy;
- import com.provectus.kafka.ui.strategy.ksql.statement.ShowStrategy;
- import java.util.List;
- import java.util.Optional;
- import org.junit.jupiter.api.BeforeEach;
- import org.junit.jupiter.api.Test;
- import org.junit.jupiter.api.extension.ExtendWith;
- import org.mockito.Mock;
- import org.mockito.Mockito;
- import org.mockito.junit.jupiter.MockitoExtension;
- import reactor.core.publisher.Mono;
- import reactor.test.StepVerifier;
- @ExtendWith(MockitoExtension.class)
- class KsqlServiceTest {
- private KsqlService ksqlService;
- private BaseStrategy baseStrategy;
- @Mock
- private ClustersStorage clustersStorage;
- @Mock
- private KsqlClient ksqlClient;
- @BeforeEach
- public void setUp() {
- this.baseStrategy = new ShowStrategy();
- this.ksqlService = new KsqlService(
- this.ksqlClient,
- this.clustersStorage,
- List.of(baseStrategy)
- );
- }
- @Test
- void shouldThrowClusterNotFoundExceptionOnExecuteKsqlCommand() {
- String clusterName = "test";
- KsqlCommand command = (new KsqlCommand()).ksql("show streams;");
- when(clustersStorage.getClusterByName(clusterName)).thenReturn(Optional.ofNullable(null));
- StepVerifier.create(ksqlService.executeKsqlCommand(clusterName, Mono.just(command)))
- .verifyError(ClusterNotFoundException.class);
- }
- @Test
- void shouldThrowKsqlDbNotFoundExceptionOnExecuteKsqlCommand() {
- String clusterName = "test";
- KsqlCommand command = (new KsqlCommand()).ksql("show streams;");
- KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
- when(clustersStorage.getClusterByName(clusterName))
- .thenReturn(Optional.ofNullable(kafkaCluster));
- when(kafkaCluster.getKsqldbServer()).thenReturn(null);
- StepVerifier.create(ksqlService.executeKsqlCommand(clusterName, Mono.just(command)))
- .verifyError(KsqlDbNotFoundException.class);
- }
- @Test
- void shouldThrowUnprocessableEntityExceptionOnExecuteKsqlCommand() {
- String clusterName = "test";
- KsqlCommand command =
- (new KsqlCommand()).ksql("CREATE STREAM users WITH (KAFKA_TOPIC='users');");
- KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
- when(clustersStorage.getClusterByName(clusterName))
- .thenReturn(Optional.ofNullable(kafkaCluster));
- when(kafkaCluster.getKsqldbServer()).thenReturn("localhost:8088");
- StepVerifier.create(ksqlService.executeKsqlCommand(clusterName, Mono.just(command)))
- .verifyError(UnprocessableEntityException.class);
- StepVerifier.create(ksqlService.executeKsqlCommand(clusterName, Mono.just(command)))
- .verifyErrorMessage("Invalid sql");
- }
- @Test
- void shouldSetHostToStrategy() {
- String clusterName = "test";
- String host = "localhost:8088";
- KsqlCommand command = (new KsqlCommand()).ksql("show streams;");
- KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
- when(clustersStorage.getClusterByName(clusterName))
- .thenReturn(Optional.ofNullable(kafkaCluster));
- when(kafkaCluster.getKsqldbServer()).thenReturn(host);
- when(ksqlClient.execute(any())).thenReturn(Mono.just(new KsqlCommandResponse()));
- ksqlService.executeKsqlCommand(clusterName, Mono.just(command)).block();
- assertThat(baseStrategy.getUri()).isEqualTo(host + "/ksql");
- }
- @Test
- void shouldCallClientAndReturnResponse() {
- String clusterName = "test";
- KsqlCommand command = (new KsqlCommand()).ksql("show streams;");
- KafkaCluster kafkaCluster = Mockito.mock(KafkaCluster.class);
- KsqlCommandResponse response = new KsqlCommandResponse().message("success");
- when(clustersStorage.getClusterByName(clusterName))
- .thenReturn(Optional.ofNullable(kafkaCluster));
- when(kafkaCluster.getKsqldbServer()).thenReturn("host");
- when(ksqlClient.execute(any())).thenReturn(Mono.just(response));
- KsqlCommandResponse receivedResponse =
- ksqlService.executeKsqlCommand(clusterName, Mono.just(command)).block();
- verify(ksqlClient, times(1)).execute(baseStrategy);
- assertThat(receivedResponse).isEqualTo(response);
- }
- }
|