// KsqlController.java
  1. package com.provectus.kafka.ui.controller;
  2. import com.provectus.kafka.ui.api.KsqlApi;
  3. import com.provectus.kafka.ui.model.KsqlCommandV2DTO;
  4. import com.provectus.kafka.ui.model.KsqlCommandV2ResponseDTO;
  5. import com.provectus.kafka.ui.model.KsqlResponseDTO;
  6. import com.provectus.kafka.ui.model.KsqlStreamDescriptionDTO;
  7. import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO;
  8. import com.provectus.kafka.ui.model.KsqlTableResponseDTO;
  9. import com.provectus.kafka.ui.model.rbac.AccessContext;
  10. import com.provectus.kafka.ui.model.rbac.permission.KsqlAction;
  11. import com.provectus.kafka.ui.service.ksql.KsqlServiceV2;
  12. import java.util.List;
  13. import java.util.Map;
  14. import java.util.Optional;
  15. import lombok.RequiredArgsConstructor;
  16. import lombok.extern.slf4j.Slf4j;
  17. import org.springframework.http.ResponseEntity;
  18. import org.springframework.web.bind.annotation.RestController;
  19. import org.springframework.web.server.ServerWebExchange;
  20. import reactor.core.publisher.Flux;
  21. import reactor.core.publisher.Mono;
  22. @RestController
  23. @RequiredArgsConstructor
  24. @Slf4j
  25. public class KsqlController extends AbstractController implements KsqlApi {
  26. private final KsqlServiceV2 ksqlServiceV2;
  27. @Override
  28. public Mono<ResponseEntity<KsqlCommandV2ResponseDTO>> executeKsql(String clusterName,
  29. Mono<KsqlCommandV2DTO> ksqlCmdDo,
  30. ServerWebExchange exchange) {
  31. return ksqlCmdDo.flatMap(
  32. command -> {
  33. var context = AccessContext.builder()
  34. .cluster(clusterName)
  35. .ksqlActions(KsqlAction.EXECUTE)
  36. .operationName("executeKsql")
  37. .operationParams(command)
  38. .build();
  39. return validateAccess(context).thenReturn(
  40. new KsqlCommandV2ResponseDTO().pipeId(
  41. ksqlServiceV2.registerCommand(
  42. getCluster(clusterName),
  43. command.getKsql(),
  44. Optional.ofNullable(command.getStreamsProperties()).orElse(Map.of()))))
  45. .doOnEach(sig -> audit(context, sig));
  46. }
  47. )
  48. .map(ResponseEntity::ok);
  49. }
  50. @Override
  51. public Mono<ResponseEntity<Flux<KsqlResponseDTO>>> openKsqlResponsePipe(String clusterName,
  52. String pipeId,
  53. ServerWebExchange exchange) {
  54. var context = AccessContext.builder()
  55. .cluster(clusterName)
  56. .ksqlActions(KsqlAction.EXECUTE)
  57. .operationName("openKsqlResponsePipe")
  58. .build();
  59. return validateAccess(context).thenReturn(
  60. ResponseEntity.ok(ksqlServiceV2.execute(pipeId)
  61. .map(table -> new KsqlResponseDTO()
  62. .table(
  63. new KsqlTableResponseDTO()
  64. .header(table.getHeader())
  65. .columnNames(table.getColumnNames())
  66. .values((List<List<Object>>) ((List<?>) (table.getValues()))))))
  67. );
  68. }
  69. @Override
  70. public Mono<ResponseEntity<Flux<KsqlStreamDescriptionDTO>>> listStreams(String clusterName,
  71. ServerWebExchange exchange) {
  72. var context = AccessContext.builder()
  73. .cluster(clusterName)
  74. .ksqlActions(KsqlAction.EXECUTE)
  75. .operationName("listStreams")
  76. .build();
  77. return validateAccess(context)
  78. .thenReturn(ResponseEntity.ok(ksqlServiceV2.listStreams(getCluster(clusterName))))
  79. .doOnEach(sig -> audit(context, sig));
  80. }
  81. @Override
  82. public Mono<ResponseEntity<Flux<KsqlTableDescriptionDTO>>> listTables(String clusterName,
  83. ServerWebExchange exchange) {
  84. var context = AccessContext.builder()
  85. .cluster(clusterName)
  86. .ksqlActions(KsqlAction.EXECUTE)
  87. .operationName("listTables")
  88. .build();
  89. return validateAccess(context)
  90. .thenReturn(ResponseEntity.ok(ksqlServiceV2.listTables(getCluster(clusterName))))
  91. .doOnEach(sig -> audit(context, sig));
  92. }
  93. }