|
@@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import com.provectus.kafka.ui.client.KafkaConnectClients;
|
|
|
import com.provectus.kafka.ui.connect.model.ConnectorTopics;
|
|
|
+import com.provectus.kafka.ui.connect.model.TaskStatus;
|
|
|
import com.provectus.kafka.ui.exception.ClusterNotFoundException;
|
|
|
import com.provectus.kafka.ui.exception.ConnectNotFoundException;
|
|
|
import com.provectus.kafka.ui.mapper.ClusterMapper;
|
|
@@ -13,6 +14,7 @@ import com.provectus.kafka.ui.model.Connector;
|
|
|
import com.provectus.kafka.ui.model.ConnectorAction;
|
|
|
import com.provectus.kafka.ui.model.ConnectorPlugin;
|
|
|
import com.provectus.kafka.ui.model.ConnectorPluginConfigValidationResponse;
|
|
|
+import com.provectus.kafka.ui.model.ConnectorState;
|
|
|
import com.provectus.kafka.ui.model.FullConnectorInfo;
|
|
|
import com.provectus.kafka.ui.model.KafkaCluster;
|
|
|
import com.provectus.kafka.ui.model.KafkaConnectCluster;
|
|
@@ -161,23 +163,32 @@ public class KafkaConnectService {
|
|
|
public Mono<Connector> getConnector(String clusterName, String connectName,
|
|
|
String connectorName) {
|
|
|
return getConnectAddress(clusterName, connectName)
|
|
|
- .flatMap(connect ->
|
|
|
- KafkaConnectClients.withBaseUrl(connect).getConnector(connectorName)
|
|
|
- .map(kafkaConnectMapper::fromClient)
|
|
|
- .flatMap(connector ->
|
|
|
- KafkaConnectClients.withBaseUrl(connect).getConnectorStatus(connector.getName())
|
|
|
- .map(connectorStatus -> {
|
|
|
- var status = connectorStatus.getConnector();
|
|
|
- connector.status(kafkaConnectMapper.fromClient(status));
|
|
|
- return (Connector) new Connector()
|
|
|
- .connect(connectName)
|
|
|
- .status(kafkaConnectMapper.fromClient(status))
|
|
|
- .type(connector.getType())
|
|
|
- .tasks(connector.getTasks())
|
|
|
- .name(connector.getName())
|
|
|
- .config(connector.getConfig());
|
|
|
- })
|
|
|
- )
|
|
|
+ .flatMap(connect -> KafkaConnectClients.withBaseUrl(connect).getConnector(connectorName)
|
|
|
+ .map(kafkaConnectMapper::fromClient)
|
|
|
+ .flatMap(connector ->
|
|
|
+ KafkaConnectClients.withBaseUrl(connect).getConnectorStatus(connector.getName())
|
|
|
+ .map(connectorStatus -> {
|
|
|
+ var status = connectorStatus.getConnector();
|
|
|
+ Connector result = (Connector) new Connector()
|
|
|
+ .connect(connectName)
|
|
|
+ .status(kafkaConnectMapper.fromClient(status))
|
|
|
+ .type(connector.getType())
|
|
|
+ .tasks(connector.getTasks())
|
|
|
+ .name(connector.getName())
|
|
|
+ .config(connector.getConfig());
|
|
|
+
|
|
|
+ if (connectorStatus.getTasks() != null) {
|
|
|
+ boolean isAnyTaskFailed = connectorStatus.getTasks().stream()
|
|
|
+ .map(TaskStatus::getState)
|
|
|
+ .anyMatch(TaskStatus.StateEnum.FAILED::equals);
|
|
|
+
|
|
|
+ if (isAnyTaskFailed) {
|
|
|
+ result.getStatus().state(ConnectorState.TASK_FAILED);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ })
|
|
|
+ )
|
|
|
);
|
|
|
}
|
|
|
|