|
@@ -1,5 +1,9 @@
|
|
|
package com.provectus.kafka.ui.controller;
|
|
|
|
|
|
+import static com.provectus.kafka.ui.model.ConnectorActionDTO.RESTART;
|
|
|
+import static com.provectus.kafka.ui.model.ConnectorActionDTO.RESTART_ALL_TASKS;
|
|
|
+import static com.provectus.kafka.ui.model.ConnectorActionDTO.RESTART_FAILED_TASKS;
|
|
|
+
|
|
|
import com.provectus.kafka.ui.api.KafkaConnectApi;
|
|
|
import com.provectus.kafka.ui.model.ConnectDTO;
|
|
|
import com.provectus.kafka.ui.model.ConnectorActionDTO;
|
|
@@ -14,9 +18,11 @@ import com.provectus.kafka.ui.model.TaskDTO;
|
|
|
import com.provectus.kafka.ui.model.rbac.AccessContext;
|
|
|
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
|
|
|
import com.provectus.kafka.ui.service.KafkaConnectService;
|
|
|
+import com.provectus.kafka.ui.service.audit.AuditService;
|
|
|
import com.provectus.kafka.ui.service.rbac.AccessControlService;
|
|
|
import java.util.Comparator;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
import javax.validation.Valid;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
@@ -30,8 +36,12 @@ import reactor.core.publisher.Mono;
|
|
|
@RequiredArgsConstructor
|
|
|
@Slf4j
|
|
|
public class KafkaConnectController extends AbstractController implements KafkaConnectApi {
|
|
|
+ private static final Set<ConnectorActionDTO> RESTART_ACTIONS
|
|
|
+ = Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS);
|
|
|
+
|
|
|
private final KafkaConnectService kafkaConnectService;
|
|
|
private final AccessControlService accessControlService;
|
|
|
+ private final AuditService auditService;
|
|
|
|
|
|
@Override
|
|
|
public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
|
|
@@ -47,15 +57,16 @@ public class KafkaConnectController extends AbstractController implements KafkaC
|
|
|
public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, String connectName,
|
|
|
ServerWebExchange exchange) {
|
|
|
|
|
|
- Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
|
|
|
+ var context = AccessContext.builder()
|
|
|
.cluster(clusterName)
|
|
|
.connect(connectName)
|
|
|
.connectActions(ConnectAction.VIEW)
|
|
|
- .build());
|
|
|
+ .operationName("getConnectors")
|
|
|
+ .build();
|
|
|
|
|
|
- return validateAccess.thenReturn(
|
|
|
- ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName))
|
|
|
- );
|
|
|
+ return accessControlService.validateAccess(context)
|
|
|
+ .thenReturn(ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName)))
|
|
|
+ .doOnEach(sig -> auditService.audit(context, sig));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -63,16 +74,17 @@ public class KafkaConnectController extends AbstractController implements KafkaC
|
|
|
@Valid Mono<NewConnectorDTO> connector,
|
|
|
ServerWebExchange exchange) {
|
|
|
|
|
|
- Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
|
|
|
+ var context = AccessContext.builder()
|
|
|
.cluster(clusterName)
|
|
|
.connect(connectName)
|
|
|
.connectActions(ConnectAction.VIEW, ConnectAction.CREATE)
|
|
|
- .build());
|
|
|
+ .operationName("createConnector")
|
|
|
+ .build();
|
|
|
|
|
|
- return validateAccess.then(
|
|
|
+ return accessControlService.validateAccess(context).then(
|
|
|
kafkaConnectService.createConnector(getCluster(clusterName), connectName, connector)
|
|
|
.map(ResponseEntity::ok)
|
|
|
- );
|
|
|
+ ).doOnEach(sig -> auditService.audit(context, sig));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -80,17 +92,18 @@ public class KafkaConnectController extends AbstractController implements KafkaC
|
|
|
String connectorName,
|
|
|
ServerWebExchange exchange) {
|
|
|
|
|
|
- Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
|
|
|
+ var context = AccessContext.builder()
|
|
|
.cluster(clusterName)
|
|
|
.connect(connectName)
|
|
|
.connectActions(ConnectAction.VIEW)
|
|
|
.connector(connectorName)
|
|
|
- .build());
|
|
|
+ .operationName("getConnector")
|
|
|
+ .build();
|
|
|
|
|
|
- return validateAccess.then(
|
|
|
+ return accessControlService.validateAccess(context).then(
|
|
|
kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName)
|
|
|
.map(ResponseEntity::ok)
|
|
|
- );
|
|
|
+ ).doOnEach(sig -> auditService.audit(context, sig));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -98,16 +111,18 @@ public class KafkaConnectController extends AbstractController implements KafkaC
|
|
|
String connectorName,
|
|
|
ServerWebExchange exchange) {
|
|
|
|
|
|
- Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
|
|
|
+ var context = AccessContext.builder()
|
|
|
.cluster(clusterName)
|
|
|
.connect(connectName)
|
|
|
.connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
|
|
|
- .build());
|
|
|
+ .operationName("deleteConnector")
|
|
|
+ .operationParams(Map.of("connectorName", connectName))
|
|
|
+ .build();
|
|
|
|
|
|
- return validateAccess.then(
|
|
|
+ return accessControlService.validateAccess(context).then(
|
|
|
kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName)
|
|
|
.map(ResponseEntity::ok)
|
|
|
- );
|
|
|
+ ).doOnEach(sig -> auditService.audit(context, sig));
|
|
|
}
|
|
|
|
|
|
|
|
@@ -119,14 +134,23 @@ public class KafkaConnectController extends AbstractController implements KafkaC
|
|
|
SortOrderDTO sortOrder,
|
|
|
ServerWebExchange exchange
|
|
|
) {
|
|
|
+ var context = AccessContext.builder()
|
|
|
+ .cluster(clusterName)
|
|
|
+ .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
|
|
|
+ .operationName("getAllConnectors")
|
|
|
+ .build();
|
|
|
+
|
|
|
var comparator = sortOrder == null || sortOrder.equals(SortOrderDTO.ASC)
|
|
|
? getConnectorsComparator(orderBy)
|
|
|
: getConnectorsComparator(orderBy).reversed();
|
|
|
+
|
|
|
Flux<FullConnectorInfoDTO> job = kafkaConnectService.getAllConnectors(getCluster(clusterName), search)
|
|
|
.filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName))
|
|
|
- .filterWhen(dto -> accessControlService.isConnectorAccessible(dto.getConnect(), dto.getName(), clusterName));
|
|
|
+ .filterWhen(dto -> accessControlService.isConnectorAccessible(dto.getConnect(), dto.getName(), clusterName))
|
|
|
+ .sort(comparator);
|
|
|
|
|
|
- return Mono.just(ResponseEntity.ok(job.sort(comparator)));
|
|
|
+ return Mono.just(ResponseEntity.ok(job))
|
|
|
+ .doOnEach(sig -> auditService.audit(context, sig));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -135,17 +159,18 @@ public class KafkaConnectController extends AbstractController implements KafkaC
|
|
|
String connectorName,
|
|
|
ServerWebExchange exchange) {
|
|
|
|
|
|
- Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
|
|
|
+ var context = AccessContext.builder()
|
|
|
.cluster(clusterName)
|
|
|
.connect(connectName)
|
|
|
.connectActions(ConnectAction.VIEW)
|
|
|
- .build());
|
|
|
+ .operationName("getConnectorConfig")
|
|
|
+ .build();
|
|
|
|
|
|
- return validateAccess.then(
|
|
|
+ return accessControlService.validateAccess(context).then(
|
|
|
kafkaConnectService
|
|
|
.getConnectorConfig(getCluster(clusterName), connectName, connectorName)
|
|
|
.map(ResponseEntity::ok)
|
|
|
- );
|
|
|
+ ).doOnEach(sig -> auditService.audit(context, sig));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -154,16 +179,19 @@ public class KafkaConnectController extends AbstractController implements KafkaC
|
|
|
Mono<Map<String, Object>> requestBody,
|
|
|
ServerWebExchange exchange) {
|
|
|
|
|
|
- Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
|
|
|
+ var context = AccessContext.builder()
|
|
|
.cluster(clusterName)
|
|
|
.connect(connectName)
|
|
|
.connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
|
|
|
- .build());
|
|
|
-
|
|
|
- return validateAccess.then(
|
|
|
- kafkaConnectService
|
|
|
- .setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
|
|
|
- .map(ResponseEntity::ok));
|
|
|
+ .operationName("setConnectorConfig")
|
|
|
+ .operationParams(Map.of("connectorName", connectorName))
|
|
|
+ .build();
|
|
|
+
|
|
|
+ return accessControlService.validateAccess(context).then(
|
|
|
+ kafkaConnectService
|
|
|
+ .setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
|
|
|
+ .map(ResponseEntity::ok))
|
|
|
+ .doOnEach(sig -> auditService.audit(context, sig));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -171,18 +199,26 @@ public class KafkaConnectController extends AbstractController implements KafkaC
|
|
|
String connectorName,
|
|
|
ConnectorActionDTO action,
|
|
|
ServerWebExchange exchange) {
|
|
|
+ ConnectAction[] connectActions;
|
|
|
+ if (RESTART_ACTIONS.contains(action)) {
|
|
|
+ connectActions = new ConnectAction[] {ConnectAction.VIEW, ConnectAction.RESTART};
|
|
|
+ } else {
|
|
|
+ connectActions = new ConnectAction[] {ConnectAction.VIEW, ConnectAction.EDIT};
|
|
|
+ }
|
|
|
|
|
|
- Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
|
|
|
+ var context = AccessContext.builder()
|
|
|
.cluster(clusterName)
|
|
|
.connect(connectName)
|
|
|
- .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
|
|
|
- .build());
|
|
|
+ .connectActions(connectActions)
|
|
|
+ .operationName("updateConnectorState")
|
|
|
+ .operationParams(Map.of("connectorName", connectorName))
|
|
|
+ .build();
|
|
|
|
|
|
- return validateAccess.then(
|
|
|
+ return accessControlService.validateAccess(context).then(
|
|
|
kafkaConnectService
|
|
|
.updateConnectorState(getCluster(clusterName), connectName, connectorName, action)
|
|
|
.map(ResponseEntity::ok)
|
|
|
- );
|
|
|
+ ).doOnEach(sig -> auditService.audit(context, sig));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -190,17 +226,19 @@ public class KafkaConnectController extends AbstractController implements KafkaC
|
|
|
String connectName,
|
|
|
String connectorName,
|
|
|
ServerWebExchange exchange) {
|
|
|
- Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
|
|
|
+ var context = AccessContext.builder()
|
|
|
.cluster(clusterName)
|
|
|
.connect(connectName)
|
|
|
.connectActions(ConnectAction.VIEW)
|
|
|
- .build());
|
|
|
+ .operationName("getConnectorTasks")
|
|
|
+ .operationParams(Map.of("connectorName", connectorName))
|
|
|
+ .build();
|
|
|
|
|
|
- return validateAccess.thenReturn(
|
|
|
+ return accessControlService.validateAccess(context).thenReturn(
|
|
|
ResponseEntity
|
|
|
.ok(kafkaConnectService
|
|
|
.getConnectorTasks(getCluster(clusterName), connectName, connectorName))
|
|
|
- );
|
|
|
+ ).doOnEach(sig -> auditService.audit(context, sig));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -208,34 +246,37 @@ public class KafkaConnectController extends AbstractController implements KafkaC
|
|
|
String connectorName, Integer taskId,
|
|
|
ServerWebExchange exchange) {
|
|
|
|
|
|
- Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
|
|
|
+ var context = AccessContext.builder()
|
|
|
.cluster(clusterName)
|
|
|
.connect(connectName)
|
|
|
- .connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
|
|
|
- .build());
|
|
|
+ .connectActions(ConnectAction.VIEW, ConnectAction.RESTART)
|
|
|
+ .operationName("restartConnectorTask")
|
|
|
+ .operationParams(Map.of("connectorName", connectorName))
|
|
|
+ .build();
|
|
|
|
|
|
- return validateAccess.then(
|
|
|
+ return accessControlService.validateAccess(context).then(
|
|
|
kafkaConnectService
|
|
|
.restartConnectorTask(getCluster(clusterName), connectName, connectorName, taskId)
|
|
|
.map(ResponseEntity::ok)
|
|
|
- );
|
|
|
+ ).doOnEach(sig -> auditService.audit(context, sig));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public Mono<ResponseEntity<Flux<ConnectorPluginDTO>>> getConnectorPlugins(
|
|
|
String clusterName, String connectName, ServerWebExchange exchange) {
|
|
|
|
|
|
- Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
|
|
|
+ var context = AccessContext.builder()
|
|
|
.cluster(clusterName)
|
|
|
.connect(connectName)
|
|
|
.connectActions(ConnectAction.VIEW)
|
|
|
- .build());
|
|
|
+ .operationName("getConnectorPlugins")
|
|
|
+ .build();
|
|
|
|
|
|
- return validateAccess.then(
|
|
|
+ return accessControlService.validateAccess(context).then(
|
|
|
Mono.just(
|
|
|
ResponseEntity.ok(
|
|
|
kafkaConnectService.getConnectorPlugins(getCluster(clusterName), connectName)))
|
|
|
- );
|
|
|
+ ).doOnEach(sig -> auditService.audit(context, sig));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -253,16 +294,11 @@ public class KafkaConnectController extends AbstractController implements KafkaC
|
|
|
if (orderBy == null) {
|
|
|
return defaultComparator;
|
|
|
}
|
|
|
- switch (orderBy) {
|
|
|
- case CONNECT:
|
|
|
- return Comparator.comparing(FullConnectorInfoDTO::getConnect);
|
|
|
- case TYPE:
|
|
|
- return Comparator.comparing(FullConnectorInfoDTO::getType);
|
|
|
- case STATUS:
|
|
|
- return Comparator.comparing(fullConnectorInfoDTO -> fullConnectorInfoDTO.getStatus().getState());
|
|
|
- case NAME:
|
|
|
- default:
|
|
|
- return defaultComparator;
|
|
|
- }
|
|
|
+ return switch (orderBy) {
|
|
|
+ case CONNECT -> Comparator.comparing(FullConnectorInfoDTO::getConnect);
|
|
|
+ case TYPE -> Comparator.comparing(FullConnectorInfoDTO::getType);
|
|
|
+ case STATUS -> Comparator.comparing(fullConnectorInfoDTO -> fullConnectorInfoDTO.getStatus().getState());
|
|
|
+ default -> defaultComparator;
|
|
|
+ };
|
|
|
}
|
|
|
}
|