|
@@ -28,7 +28,6 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Optional;
|
|
|
import java.util.function.Predicate;
|
|
|
-import java.util.stream.Collectors;
|
|
|
import java.util.stream.Stream;
|
|
|
import javax.annotation.Nullable;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
@@ -39,7 +38,6 @@ import org.springframework.stereotype.Service;
|
|
|
import org.springframework.web.reactive.function.client.WebClientResponseException;
|
|
|
import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
-import reactor.util.function.Tuples;
|
|
|
|
|
|
@Service
|
|
|
@Slf4j
|
|
@@ -63,22 +61,20 @@ public class KafkaConnectService {
|
|
|
return getConnects(cluster)
|
|
|
.flatMap(connect ->
|
|
|
getConnectorNamesWithErrorsSuppress(cluster, connect.getName())
|
|
|
- .flatMap(connectorName -> getConnector(cluster, connect.getName(), connectorName)))
|
|
|
- .flatMap(connector ->
|
|
|
- getConnectorConfig(cluster, connector.getConnect(), connector.getName())
|
|
|
- .map(config -> InternalConnectInfo.builder().connector(connector).config(config).build()))
|
|
|
- .flatMap(connectInfo -> {
|
|
|
- ConnectorDTO connector = connectInfo.getConnector();
|
|
|
- return getConnectorTasks(cluster, connector.getConnect(), connector.getName())
|
|
|
- .collectList()
|
|
|
- .map(tasks -> connectInfo.toBuilder().tasks(tasks).build());
|
|
|
- })
|
|
|
- .flatMap(connectInfo -> {
|
|
|
- ConnectorDTO connector = connectInfo.getConnector();
|
|
|
- return getConnectorTopics(cluster, connector.getConnect(), connector.getName())
|
|
|
- .map(ct -> connectInfo.toBuilder().topics(ct.getTopics()).build());
|
|
|
- })
|
|
|
- .map(kafkaConnectMapper::fullConnectorInfoFromTuple)
|
|
|
+ .flatMap(connectorName ->
|
|
|
+ Mono.zip(
|
|
|
+ getConnector(cluster, connect.getName(), connectorName),
|
|
|
+ getConnectorConfig(cluster, connect.getName(), connectorName),
|
|
|
+ getConnectorTasks(cluster, connect.getName(), connectorName).collectList(),
|
|
|
+ getConnectorTopics(cluster, connect.getName(), connectorName)
|
|
|
+ ).map(tuple ->
|
|
|
+ InternalConnectInfo.builder()
|
|
|
+ .connector(tuple.getT1())
|
|
|
+ .config(tuple.getT2())
|
|
|
+ .tasks(tuple.getT3())
|
|
|
+ .topics(tuple.getT4().getTopics())
|
|
|
+ .build())))
|
|
|
+ .map(kafkaConnectMapper::fullConnectorInfo)
|
|
|
.filter(matchesSearchTerm(search));
|
|
|
}
|
|
|
|