KsqlController.java 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package com.provectus.kafka.ui.controller;
  2. import com.provectus.kafka.ui.api.KsqlApi;
  3. import com.provectus.kafka.ui.model.KsqlCommandV2DTO;
  4. import com.provectus.kafka.ui.model.KsqlCommandV2ResponseDTO;
  5. import com.provectus.kafka.ui.model.KsqlResponseDTO;
  6. import com.provectus.kafka.ui.model.KsqlStreamDescriptionDTO;
  7. import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO;
  8. import com.provectus.kafka.ui.model.KsqlTableResponseDTO;
  9. import com.provectus.kafka.ui.model.rbac.AccessContext;
  10. import com.provectus.kafka.ui.model.rbac.permission.KsqlAction;
  11. import com.provectus.kafka.ui.service.audit.AuditService;
  12. import com.provectus.kafka.ui.service.ksql.KsqlServiceV2;
  13. import com.provectus.kafka.ui.service.rbac.AccessControlService;
  14. import java.util.List;
  15. import java.util.Map;
  16. import java.util.Optional;
  17. import lombok.RequiredArgsConstructor;
  18. import lombok.extern.slf4j.Slf4j;
  19. import org.springframework.http.ResponseEntity;
  20. import org.springframework.web.bind.annotation.RestController;
  21. import org.springframework.web.server.ServerWebExchange;
  22. import reactor.core.publisher.Flux;
  23. import reactor.core.publisher.Mono;
  24. @RestController
  25. @RequiredArgsConstructor
  26. @Slf4j
  27. public class KsqlController extends AbstractController implements KsqlApi {
  28. private final KsqlServiceV2 ksqlServiceV2;
  29. private final AccessControlService accessControlService;
  30. private final AuditService auditService;
  31. @Override
  32. public Mono<ResponseEntity<KsqlCommandV2ResponseDTO>> executeKsql(String clusterName,
  33. Mono<KsqlCommandV2DTO> ksqlCmdDo,
  34. ServerWebExchange exchange) {
  35. return ksqlCmdDo.flatMap(
  36. command -> {
  37. var context = AccessContext.builder()
  38. .cluster(clusterName)
  39. .ksqlActions(KsqlAction.EXECUTE)
  40. .operationName("executeKsql")
  41. .operationParams(command)
  42. .build();
  43. return accessControlService.validateAccess(context).thenReturn(
  44. new KsqlCommandV2ResponseDTO().pipeId(
  45. ksqlServiceV2.registerCommand(
  46. getCluster(clusterName),
  47. command.getKsql(),
  48. Optional.ofNullable(command.getStreamsProperties()).orElse(Map.of()))))
  49. .doOnEach(sig -> auditService.audit(context, sig));
  50. }
  51. )
  52. .map(ResponseEntity::ok);
  53. }
  54. @Override
  55. public Mono<ResponseEntity<Flux<KsqlResponseDTO>>> openKsqlResponsePipe(String clusterName,
  56. String pipeId,
  57. ServerWebExchange exchange) {
  58. var context = AccessContext.builder()
  59. .cluster(clusterName)
  60. .ksqlActions(KsqlAction.EXECUTE)
  61. .operationName("openKsqlResponsePipe")
  62. .build();
  63. return accessControlService.validateAccess(context).thenReturn(
  64. ResponseEntity.ok(ksqlServiceV2.execute(pipeId)
  65. .map(table -> new KsqlResponseDTO()
  66. .table(
  67. new KsqlTableResponseDTO()
  68. .header(table.getHeader())
  69. .columnNames(table.getColumnNames())
  70. .values((List<List<Object>>) ((List<?>) (table.getValues()))))))
  71. );
  72. }
  73. @Override
  74. public Mono<ResponseEntity<Flux<KsqlStreamDescriptionDTO>>> listStreams(String clusterName,
  75. ServerWebExchange exchange) {
  76. var context = AccessContext.builder()
  77. .cluster(clusterName)
  78. .ksqlActions(KsqlAction.EXECUTE)
  79. .operationName("listStreams")
  80. .build();
  81. return accessControlService.validateAccess(context)
  82. .thenReturn(ResponseEntity.ok(ksqlServiceV2.listStreams(getCluster(clusterName))))
  83. .doOnEach(sig -> auditService.audit(context, sig));
  84. }
  85. @Override
  86. public Mono<ResponseEntity<Flux<KsqlTableDescriptionDTO>>> listTables(String clusterName,
  87. ServerWebExchange exchange) {
  88. var context = AccessContext.builder()
  89. .cluster(clusterName)
  90. .ksqlActions(KsqlAction.EXECUTE)
  91. .operationName("listTables")
  92. .build();
  93. return accessControlService.validateAccess(context)
  94. .thenReturn(ResponseEntity.ok(ksqlServiceV2.listTables(getCluster(clusterName))))
  95. .doOnEach(sig -> auditService.audit(context, sig));
  96. }
  97. }