package com.provectus.kafka.ui.controller; import com.provectus.kafka.ui.api.KsqlApi; import com.provectus.kafka.ui.model.KsqlCommandV2DTO; import com.provectus.kafka.ui.model.KsqlCommandV2ResponseDTO; import com.provectus.kafka.ui.model.KsqlResponseDTO; import com.provectus.kafka.ui.model.KsqlStreamDescriptionDTO; import com.provectus.kafka.ui.model.KsqlTableDescriptionDTO; import com.provectus.kafka.ui.model.KsqlTableResponseDTO; import com.provectus.kafka.ui.model.rbac.AccessContext; import com.provectus.kafka.ui.model.rbac.permission.KsqlAction; import com.provectus.kafka.ui.service.audit.AuditService; import com.provectus.kafka.ui.service.ksql.KsqlServiceV2; import com.provectus.kafka.ui.service.rbac.AccessControlService; import java.util.List; import java.util.Map; import java.util.Optional; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @RestController @RequiredArgsConstructor @Slf4j public class KsqlController extends AbstractController implements KsqlApi { private final KsqlServiceV2 ksqlServiceV2; private final AccessControlService accessControlService; private final AuditService auditService; @Override public Mono> executeKsql(String clusterName, Mono ksqlCmdDo, ServerWebExchange exchange) { return ksqlCmdDo.flatMap( command -> { var context = AccessContext.builder() .cluster(clusterName) .ksqlActions(KsqlAction.EXECUTE) .operationName("executeKsql") .operationParams(command) .build(); return accessControlService.validateAccess(context).thenReturn( new KsqlCommandV2ResponseDTO().pipeId( ksqlServiceV2.registerCommand( getCluster(clusterName), command.getKsql(), Optional.ofNullable(command.getStreamsProperties()).orElse(Map.of())))) .doOnEach(sig -> auditService.audit(context, sig)); } ) .map(ResponseEntity::ok); } @Override public Mono>> openKsqlResponsePipe(String clusterName, String pipeId, ServerWebExchange exchange) { var context = AccessContext.builder() .cluster(clusterName) .ksqlActions(KsqlAction.EXECUTE) .operationName("openKsqlResponsePipe") .build(); return accessControlService.validateAccess(context).thenReturn( ResponseEntity.ok(ksqlServiceV2.execute(pipeId) .map(table -> new KsqlResponseDTO() .table( new KsqlTableResponseDTO() .header(table.getHeader()) .columnNames(table.getColumnNames()) .values((List>) ((List) (table.getValues())))))) ); } @Override public Mono>> listStreams(String clusterName, ServerWebExchange exchange) { var context = AccessContext.builder() .cluster(clusterName) .ksqlActions(KsqlAction.EXECUTE) .operationName("listStreams") .build(); return accessControlService.validateAccess(context) .thenReturn(ResponseEntity.ok(ksqlServiceV2.listStreams(getCluster(clusterName)))) .doOnEach(sig -> auditService.audit(context, sig)); } @Override public Mono>> listTables(String clusterName, ServerWebExchange exchange) { var context = AccessContext.builder() .cluster(clusterName) .ksqlActions(KsqlAction.EXECUTE) .operationName("listTables") .build(); return accessControlService.validateAccess(context) .thenReturn(ResponseEntity.ok(ksqlServiceV2.listTables(getCluster(clusterName)))) .doOnEach(sig -> auditService.audit(context, sig)); } }