KsqlController.java 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package com.provectus.kafka.ui.controller;
  2. import com.provectus.kafka.ui.api.KsqlApi;
  3. import com.provectus.kafka.ui.model.KsqlCommandV2DTO;
  4. import com.provectus.kafka.ui.model.KsqlCommandV2ResponseDTO;
  5. import com.provectus.kafka.ui.model.KsqlResponseDTO;
  6. import com.provectus.kafka.ui.model.KsqlStreamDescriptionDTO;
  7. import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO;
  8. import com.provectus.kafka.ui.model.KsqlTableResponseDTO;
  9. import com.provectus.kafka.ui.service.ksql.KsqlServiceV2;
  10. import java.util.List;
  11. import java.util.Map;
  12. import java.util.Optional;
  13. import lombok.RequiredArgsConstructor;
  14. import lombok.extern.slf4j.Slf4j;
  15. import org.springframework.http.ResponseEntity;
  16. import org.springframework.web.bind.annotation.RestController;
  17. import org.springframework.web.server.ServerWebExchange;
  18. import reactor.core.publisher.Flux;
  19. import reactor.core.publisher.Mono;
  20. @RestController
  21. @RequiredArgsConstructor
  22. @Slf4j
  23. public class KsqlController extends AbstractController implements KsqlApi {
  24. private final KsqlServiceV2 ksqlServiceV2;
  25. @Override
  26. public Mono<ResponseEntity<KsqlCommandV2ResponseDTO>> executeKsql(String clusterName,
  27. Mono<KsqlCommandV2DTO>
  28. ksqlCommand2Dto,
  29. ServerWebExchange exchange) {
  30. return ksqlCommand2Dto.map(dto -> {
  31. var id = ksqlServiceV2.registerCommand(
  32. getCluster(clusterName),
  33. dto.getKsql(),
  34. Optional.ofNullable(dto.getStreamsProperties()).orElse(Map.of()));
  35. return new KsqlCommandV2ResponseDTO().pipeId(id);
  36. }).map(ResponseEntity::ok);
  37. }
  38. @Override
  39. public Mono<ResponseEntity<Flux<KsqlResponseDTO>>> openKsqlResponsePipe(String clusterName,
  40. String pipeId,
  41. ServerWebExchange exchange) {
  42. return Mono.just(
  43. ResponseEntity.ok(ksqlServiceV2.execute(pipeId)
  44. .map(table -> new KsqlResponseDTO()
  45. .table(
  46. new KsqlTableResponseDTO()
  47. .header(table.getHeader())
  48. .columnNames(table.getColumnNames())
  49. .values((List<List<Object>>) ((List<?>) (table.getValues())))))));
  50. }
  51. @Override
  52. public Mono<ResponseEntity<Flux<KsqlStreamDescriptionDTO>>> listStreams(String clusterName,
  53. ServerWebExchange exchange) {
  54. return Mono.just(ResponseEntity.ok(ksqlServiceV2.listStreams(getCluster(clusterName))));
  55. }
  56. @Override
  57. public Mono<ResponseEntity<Flux<KsqlTableDescriptionDTO>>> listTables(String clusterName,
  58. ServerWebExchange exchange) {
  59. return Mono.just(ResponseEntity.ok(ksqlServiceV2.listTables(getCluster(clusterName))));
  60. }
  61. }