KsqlService.java 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. package com.provectus.kafka.ui.service;
  2. import com.provectus.kafka.ui.client.KsqlClient;
  3. import com.provectus.kafka.ui.exception.ClusterNotFoundException;
  4. import com.provectus.kafka.ui.exception.KsqlDbNotFoundException;
  5. import com.provectus.kafka.ui.exception.UnprocessableEntityException;
  6. import com.provectus.kafka.ui.model.KafkaCluster;
  7. import com.provectus.kafka.ui.model.KsqlCommandDTO;
  8. import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
  9. import com.provectus.kafka.ui.strategy.ksql.statement.BaseStrategy;
  10. import java.util.List;
  11. import lombok.RequiredArgsConstructor;
  12. import org.springframework.stereotype.Service;
  13. import reactor.core.publisher.Mono;
  14. @Service
  15. @RequiredArgsConstructor
  16. public class KsqlService {
  17. private final KsqlClient ksqlClient;
  18. private final List<BaseStrategy> ksqlStatementStrategies;
  19. public Mono<KsqlCommandResponseDTO> executeKsqlCommand(KafkaCluster cluster,
  20. Mono<KsqlCommandDTO> ksqlCommand) {
  21. return Mono.justOrEmpty(cluster)
  22. .map(KafkaCluster::getKsqldbServer)
  23. .onErrorResume(e -> {
  24. Throwable throwable =
  25. e instanceof ClusterNotFoundException ? e : new KsqlDbNotFoundException();
  26. return Mono.error(throwable);
  27. })
  28. .flatMap(ksqlServer -> getStatementStrategyForKsqlCommand(ksqlCommand)
  29. .map(statement -> statement.host(ksqlServer.getUrl()))
  30. )
  31. .flatMap(baseStrategy -> ksqlClient.execute(baseStrategy, cluster));
  32. }
  33. private Mono<BaseStrategy> getStatementStrategyForKsqlCommand(
  34. Mono<KsqlCommandDTO> ksqlCommand) {
  35. return ksqlCommand
  36. .map(command -> ksqlStatementStrategies.stream()
  37. .filter(s -> s.test(command.getKsql()))
  38. .map(s -> s.ksqlCommand(command))
  39. .findFirst())
  40. .flatMap(Mono::justOrEmpty)
  41. .switchIfEmpty(Mono.error(new UnprocessableEntityException("Invalid sql")));
  42. }
  43. }