|
@@ -23,6 +23,7 @@ import com.provectus.kafka.ui.model.KafkaConnectCluster;
|
|
import com.provectus.kafka.ui.model.NewConnectorDTO;
|
|
import com.provectus.kafka.ui.model.NewConnectorDTO;
|
|
import com.provectus.kafka.ui.model.TaskDTO;
|
|
import com.provectus.kafka.ui.model.TaskDTO;
|
|
import com.provectus.kafka.ui.model.connect.InternalConnectInfo;
|
|
import com.provectus.kafka.ui.model.connect.InternalConnectInfo;
|
|
|
|
+import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.function.Function;
|
|
import java.util.function.Function;
|
|
@@ -47,6 +48,7 @@ public class KafkaConnectService {
|
|
private final ClusterMapper clusterMapper;
|
|
private final ClusterMapper clusterMapper;
|
|
private final KafkaConnectMapper kafkaConnectMapper;
|
|
private final KafkaConnectMapper kafkaConnectMapper;
|
|
private final ObjectMapper objectMapper;
|
|
private final ObjectMapper objectMapper;
|
|
|
|
+ private final KafkaConfigSanitizer kafkaConfigSanitizer;
|
|
|
|
|
|
public Mono<Flux<ConnectDTO>> getConnects(KafkaCluster cluster) {
|
|
public Mono<Flux<ConnectDTO>> getConnects(KafkaCluster cluster) {
|
|
return Mono.just(
|
|
return Mono.just(
|
|
@@ -187,13 +189,19 @@ public class KafkaConnectService {
|
|
e -> emptyStatus(connectorName))
|
|
e -> emptyStatus(connectorName))
|
|
.map(connectorStatus -> {
|
|
.map(connectorStatus -> {
|
|
var status = connectorStatus.getConnector();
|
|
var status = connectorStatus.getConnector();
|
|
|
|
+ final Map<String, Object> obfuscatedConfig = connector.getConfig().entrySet()
|
|
|
|
+ .stream()
|
|
|
|
+ .collect(Collectors.toMap(
|
|
|
|
+ Map.Entry::getKey,
|
|
|
|
+ e -> kafkaConfigSanitizer.sanitize(e.getKey(), e.getValue())
|
|
|
|
+ ));
|
|
ConnectorDTO result = (ConnectorDTO) new ConnectorDTO()
|
|
ConnectorDTO result = (ConnectorDTO) new ConnectorDTO()
|
|
.connect(connectName)
|
|
.connect(connectName)
|
|
.status(kafkaConnectMapper.fromClient(status))
|
|
.status(kafkaConnectMapper.fromClient(status))
|
|
.type(connector.getType())
|
|
.type(connector.getType())
|
|
.tasks(connector.getTasks())
|
|
.tasks(connector.getTasks())
|
|
.name(connector.getName())
|
|
.name(connector.getName())
|
|
- .config(connector.getConfig());
|
|
|
|
|
|
+ .config(obfuscatedConfig);
|
|
|
|
|
|
if (connectorStatus.getTasks() != null) {
|
|
if (connectorStatus.getTasks() != null) {
|
|
boolean isAnyTaskFailed = connectorStatus.getTasks().stream()
|
|
boolean isAnyTaskFailed = connectorStatus.getTasks().stream()
|
|
@@ -223,7 +231,13 @@ public class KafkaConnectService {
|
|
return getConnectAddress(cluster, connectName)
|
|
return getConnectAddress(cluster, connectName)
|
|
.flatMap(connect ->
|
|
.flatMap(connect ->
|
|
KafkaConnectClients.withBaseUrl(connect).getConnectorConfig(connectorName)
|
|
KafkaConnectClients.withBaseUrl(connect).getConnectorConfig(connectorName)
|
|
- );
|
|
|
|
|
|
+ )
|
|
|
|
+ .map(connectorConfig -> {
|
|
|
|
+ final Map<String, Object> obfuscatedMap = new HashMap<>();
|
|
|
|
+ connectorConfig.forEach((key, value) ->
|
|
|
|
+ obfuscatedMap.put(key, kafkaConfigSanitizer.sanitize(key, value)));
|
|
|
|
+ return obfuscatedMap;
|
|
|
|
+ });
|
|
}
|
|
}
|
|
|
|
|
|
public Mono<ConnectorDTO> setConnectorConfig(KafkaCluster cluster, String connectName,
|
|
public Mono<ConnectorDTO> setConnectorConfig(KafkaCluster cluster, String connectName,
|