RESTART_ALL_TASKS & RESTART_FAILED_TASKS actions implemented, small refactoring (#1601)

RESTART_ALL_TASKS & RESTART_FAILED_TASKS kafka connect actions implemented, small refactoring (#1601)
This commit is contained in:
Ilya Kuramshin 2022-02-17 13:25:46 +03:00 committed by GitHub
parent b3ef8da446
commit b8c43f069e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 72 additions and 76 deletions

View file

@ -3,6 +3,7 @@ package com.provectus.kafka.ui.service;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.provectus.kafka.ui.client.KafkaConnectClients;
import com.provectus.kafka.ui.connect.api.KafkaConnectClientApi;
import com.provectus.kafka.ui.connect.model.ConnectorStatus;
import com.provectus.kafka.ui.connect.model.ConnectorStatusConnector;
import com.provectus.kafka.ui.connect.model.ConnectorTopics;
@ -17,6 +18,7 @@ import com.provectus.kafka.ui.model.ConnectorDTO;
import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponseDTO;
import com.provectus.kafka.ui.model.ConnectorPluginDTO;
import com.provectus.kafka.ui.model.ConnectorStateDTO;
import com.provectus.kafka.ui.model.ConnectorTaskStatusDTO;
import com.provectus.kafka.ui.model.FullConnectorInfoDTO;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.KafkaConnectCluster;
@ -119,12 +121,11 @@ public class KafkaConnectService {
private Mono<ConnectorTopics> getConnectorTopics(KafkaCluster cluster, String connectClusterName,
String connectorName) {
return getConnectAddress(cluster, connectClusterName)
.flatMap(connectUrl -> KafkaConnectClients
.withBaseUrl(connectUrl)
.getConnectorTopics(connectorName)
.map(result -> result.get(connectorName))
);
return withConnectClient(cluster, connectClusterName)
.flatMap(c -> c.getConnectorTopics(connectorName).map(result -> result.get(connectorName)))
// old connectors don't have this api, setting empty list for
// backward-compatibility
.onErrorResume(Exception.class, e -> Mono.just(new ConnectorTopics().topics(List.of())));
}
private Flux<Tuple2<String, String>> getConnectorNames(KafkaCluster cluster, String connectName) {
@ -143,17 +144,17 @@ public class KafkaConnectService {
}
public Flux<String> getConnectors(KafkaCluster cluster, String connectName) {
return getConnectAddress(cluster, connectName)
.flatMapMany(connect ->
KafkaConnectClients.withBaseUrl(connect).getConnectors(null)
return withConnectClient(cluster, connectName)
.flatMapMany(client ->
client.getConnectors(null)
.doOnError(e -> log.error("Unexpected error upon getting connectors", e))
);
}
public Mono<ConnectorDTO> createConnector(KafkaCluster cluster, String connectName,
Mono<NewConnectorDTO> connector) {
return getConnectAddress(cluster, connectName)
.flatMap(connectUrl ->
return withConnectClient(cluster, connectName)
.flatMap(client ->
connector
.flatMap(c -> connectorExists(cluster, connectName, c.getName())
.map(exists -> {
@ -164,7 +165,7 @@ public class KafkaConnectService {
return c;
}))
.map(kafkaConnectMapper::toClient)
.flatMap(c -> KafkaConnectClients.withBaseUrl(connectUrl).createConnector(c))
.flatMap(client::createConnector)
.flatMap(c -> getConnector(cluster, connectName, c.getName()))
);
}
@ -179,11 +180,11 @@ public class KafkaConnectService {
public Mono<ConnectorDTO> getConnector(KafkaCluster cluster, String connectName,
String connectorName) {
return getConnectAddress(cluster, connectName)
.flatMap(connect -> KafkaConnectClients.withBaseUrl(connect).getConnector(connectorName)
return withConnectClient(cluster, connectName)
.flatMap(client -> client.getConnector(connectorName)
.map(kafkaConnectMapper::fromClient)
.flatMap(connector ->
KafkaConnectClients.withBaseUrl(connect).getConnectorStatus(connector.getName())
client.getConnectorStatus(connector.getName())
// status request can return 404 if tasks not assigned yet
.onErrorResume(WebClientResponseException.NotFound.class,
e -> emptyStatus(connectorName))
@ -228,10 +229,8 @@ public class KafkaConnectService {
public Mono<Map<String, Object>> getConnectorConfig(KafkaCluster cluster, String connectName,
String connectorName) {
return getConnectAddress(cluster, connectName)
.flatMap(connect ->
KafkaConnectClients.withBaseUrl(connect).getConnectorConfig(connectorName)
)
return withConnectClient(cluster, connectName)
.flatMap(c -> c.getConnectorConfig(connectorName))
.map(connectorConfig -> {
final Map<String, Object> obfuscatedMap = new HashMap<>();
connectorConfig.forEach((key, value) ->
@ -242,99 +241,94 @@ public class KafkaConnectService {
public Mono<ConnectorDTO> setConnectorConfig(KafkaCluster cluster, String connectName,
String connectorName, Mono<Object> requestBody) {
return getConnectAddress(cluster, connectName)
.flatMap(connect ->
requestBody.flatMap(body ->
KafkaConnectClients.withBaseUrl(connect)
.setConnectorConfig(connectorName, (Map<String, Object>) body)
)
.map(kafkaConnectMapper::fromClient)
);
return withConnectClient(cluster, connectName)
.flatMap(c ->
requestBody
.flatMap(body -> c.setConnectorConfig(connectorName, (Map<String, Object>) body))
.map(kafkaConnectMapper::fromClient));
}
public Mono<Void> deleteConnector(
KafkaCluster cluster, String connectName, String connectorName) {
return getConnectAddress(cluster, connectName)
.flatMap(connect ->
KafkaConnectClients.withBaseUrl(connect).deleteConnector(connectorName)
);
return withConnectClient(cluster, connectName)
.flatMap(c -> c.deleteConnector(connectorName));
}
public Mono<Void> updateConnectorState(KafkaCluster cluster, String connectName,
String connectorName, ConnectorActionDTO action) {
Function<String, Mono<Void>> kafkaClientCall;
switch (action) {
case RESTART:
kafkaClientCall =
connect -> KafkaConnectClients.withBaseUrl(connect)
.restartConnector(connectorName, true, false);
break;
case PAUSE:
kafkaClientCall =
connect -> KafkaConnectClients.withBaseUrl(connect).pauseConnector(connectorName);
break;
case RESUME:
kafkaClientCall =
connect -> KafkaConnectClients.withBaseUrl(connect).resumeConnector(connectorName);
break;
default:
throw new IllegalStateException("Unexpected value: " + action);
}
return getConnectAddress(cluster, connectName)
.flatMap(kafkaClientCall);
return withConnectClient(cluster, connectName)
.flatMap(client -> {
switch (action) {
case RESTART:
return client.restartConnector(connectorName, false, false);
case RESTART_ALL_TASKS:
return restartTasks(cluster, connectName, connectorName, task -> true);
case RESTART_FAILED_TASKS:
return restartTasks(cluster, connectName, connectorName,
t -> t.getStatus().getState() == ConnectorTaskStatusDTO.FAILED);
case PAUSE:
return client.pauseConnector(connectorName);
case RESUME:
return client.resumeConnector(connectorName);
default:
throw new IllegalStateException("Unexpected value: " + action);
}
});
}
public Flux<TaskDTO> getConnectorTasks(KafkaCluster cluster, String connectName,
String connectorName) {
return getConnectAddress(cluster, connectName)
.flatMapMany(connect ->
KafkaConnectClients.withBaseUrl(connect).getConnectorTasks(connectorName)
private Mono<Void> restartTasks(KafkaCluster cluster, String connectName,
String connectorName, Predicate<TaskDTO> taskFilter) {
return getConnectorTasks(cluster, connectName, connectorName)
.filter(taskFilter)
.flatMap(t ->
restartConnectorTask(cluster, connectName, connectorName, t.getId().getTask()))
.then();
}
public Flux<TaskDTO> getConnectorTasks(KafkaCluster cluster, String connectName, String connectorName) {
return withConnectClient(cluster, connectName)
.flatMapMany(client ->
client.getConnectorTasks(connectorName)
.onErrorResume(WebClientResponseException.NotFound.class, e -> Flux.empty())
.map(kafkaConnectMapper::fromClient)
.flatMap(task ->
KafkaConnectClients.withBaseUrl(connect)
client
.getConnectorTaskStatus(connectorName, task.getId().getTask())
.onErrorResume(WebClientResponseException.NotFound.class, e -> Mono.empty())
.map(kafkaConnectMapper::fromClient)
.map(task::status)
)
);
));
}
public Mono<Void> restartConnectorTask(KafkaCluster cluster, String connectName,
String connectorName, Integer taskId) {
return getConnectAddress(cluster, connectName)
.flatMap(connect ->
KafkaConnectClients.withBaseUrl(connect).restartConnectorTask(connectorName, taskId)
);
return withConnectClient(cluster, connectName)
.flatMap(client -> client.restartConnectorTask(connectorName, taskId));
}
public Mono<Flux<ConnectorPluginDTO>> getConnectorPlugins(KafkaCluster cluster,
String connectName) {
return Mono.just(getConnectAddress(cluster, connectName)
.flatMapMany(connect ->
KafkaConnectClients.withBaseUrl(connect).getConnectorPlugins()
.map(kafkaConnectMapper::fromClient)
));
return withConnectClient(cluster, connectName)
.map(client -> client.getConnectorPlugins().map(kafkaConnectMapper::fromClient));
}
public Mono<ConnectorPluginConfigValidationResponseDTO> validateConnectorPluginConfig(
KafkaCluster cluster, String connectName, String pluginName, Mono<Object> requestBody) {
return getConnectAddress(cluster, connectName)
.flatMap(connect ->
requestBody.flatMap(body ->
KafkaConnectClients.withBaseUrl(connect)
.validateConnectorPluginConfig(pluginName, (Map<String, Object>) body)
)
return withConnectClient(cluster, connectName)
.flatMap(client ->
requestBody
.flatMap(body ->
client.validateConnectorPluginConfig(pluginName, (Map<String, Object>) body))
.map(kafkaConnectMapper::fromClient)
);
}
private Mono<String> getConnectAddress(KafkaCluster cluster, String connectName) {
private Mono<KafkaConnectClientApi> withConnectClient(KafkaCluster cluster, String connectName) {
return Mono.justOrEmpty(cluster.getKafkaConnect().stream()
.filter(connect -> connect.getName().equals(connectName))
.findFirst()
.map(KafkaConnectCluster::getAddress))
.switchIfEmpty(Mono.error(ConnectNotFoundException::new));
.switchIfEmpty(Mono.error(ConnectNotFoundException::new))
.map(KafkaConnectClients::withBaseUrl);
}
}

View file

@ -2492,6 +2492,8 @@ components:
type: string
enum:
- RESTART
- RESTART_ALL_TASKS
- RESTART_FAILED_TASKS
- PAUSE
- RESUME