From e1f31d27c6659be419c5547c94bfbc828409a446 Mon Sep 17 00:00:00 2001 From: Ilya Kuramshin Date: Mon, 28 Mar 2022 09:10:06 +0300 Subject: [PATCH] ISSUE-1613: Fix Topic not found error after new topic creation * load topic after creation retry added * TopicService.recreateTopic,cloneTopic formatting improved * ALLOW_AUTO_CREATE_TOPICS_CONFIG set to false for all created consumers to avoid accidental topic creation Co-authored-by: iliax --- .../ui/service/ConsumerGroupService.java | 1 + .../kafka/ui/service/TopicsService.java | 96 ++++++++++++++----- 2 files changed, 71 insertions(+), 26 deletions(-) diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java index cba51a921a..beb9f84979 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ConsumerGroupService.java @@ -210,6 +210,7 @@ public class ConsumerGroupService { props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"); props.putAll(properties); return new KafkaConsumer<>(props); diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java index b4629cf39c..265103a7cb 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java @@ -71,6 +71,10 @@ public class TopicsService { private int recreateMaxRetries; @Value("${topic.recreate.delay.seconds:1}") private int recreateDelayInSeconds; + @Value("${topic.load.after.create.maxRetries:10}") + private int loadTopicAfterCreateRetries; + @Value("${topic.load.after.create.delay.ms:500}") + private int loadTopicAfterCreateDelayInMs; public Mono getTopics(KafkaCluster cluster, Optional pageNum, @@ -115,7 +119,32 @@ public class TopicsService { private Mono loadTopic(KafkaCluster c, String topicName) { return loadTopics(c, List.of(topicName)) - .map(lst -> lst.stream().findFirst().orElseThrow(TopicNotFoundException::new)); + .flatMap(lst -> lst.stream().findFirst() + .map(Mono::just) + .orElse(Mono.error(TopicNotFoundException::new))); + } + + /** + * After creation topic can be invisible via API for some time. + * To workaround this, we retyring topic loading until it becomes visible. + */ + private Mono loadTopicAfterCreation(KafkaCluster c, String topicName) { + return loadTopic(c, topicName) + .retryWhen( + Retry + .fixedDelay( + loadTopicAfterCreateRetries, + Duration.ofMillis(loadTopicAfterCreateDelayInMs) + ) + .filter(TopicNotFoundException.class::isInstance) + .onRetryExhaustedThrow((spec, sig) -> + new TopicMetadataException( + String.format( + "Error while loading created topic '%s' - topic is not visible via API " + + "after waiting for %d ms.", + topicName, + loadTopicAfterCreateDelayInMs * loadTopicAfterCreateRetries))) + ); } private List createList(List orderedNames, @@ -182,7 +211,7 @@ public class TopicsService { ).thenReturn(topicData) ) .onErrorResume(t -> Mono.error(new TopicMetadataException(t.getMessage()))) - .flatMap(topicData -> loadTopic(c, topicData.getName())); + .flatMap(topicData -> loadTopicAfterCreation(c, topicData.getName())); } public Mono createTopic(KafkaCluster cluster, Mono topicCreation) { @@ -194,23 +223,30 @@ public class TopicsService { public Mono recreateTopic(KafkaCluster cluster, String topicName) { return loadTopic(cluster, topicName) .flatMap(t -> deleteTopic(cluster, topicName) - .thenReturn(t).delayElement(Duration.ofSeconds(recreateDelayInSeconds)) - .flatMap(topic -> adminClientService.get(cluster).flatMap(ac -> ac.createTopic(topic.getName(), - topic.getPartitionCount(), - (short) topic.getReplicationFactor(), - topic.getTopicConfigs() - .stream() - .collect(Collectors - .toMap(InternalTopicConfig::getName, - InternalTopicConfig::getValue))) - .thenReturn(topicName)) - .retryWhen(Retry.fixedDelay(recreateMaxRetries, - Duration.ofSeconds(recreateDelayInSeconds)) - .filter(TopicExistsException.class::isInstance) - .onRetryExhaustedThrow((a, b) -> - new TopicRecreationException(topicName, - recreateMaxRetries * recreateDelayInSeconds))) - .flatMap(a -> loadTopic(cluster, topicName)).map(clusterMapper::toTopic) + .thenReturn(t) + .delayElement(Duration.ofSeconds(recreateDelayInSeconds)) + .flatMap(topic -> + adminClientService.get(cluster) + .flatMap(ac -> + ac.createTopic( + topic.getName(), + topic.getPartitionCount(), + (short) topic.getReplicationFactor(), + topic.getTopicConfigs() + .stream() + .collect(Collectors.toMap(InternalTopicConfig::getName, + InternalTopicConfig::getValue)) + ) + .thenReturn(topicName) + ) + .retryWhen( + Retry.fixedDelay(recreateMaxRetries, Duration.ofSeconds(recreateDelayInSeconds)) + .filter(TopicExistsException.class::isInstance) + .onRetryExhaustedThrow((a, b) -> + new TopicRecreationException(topicName, + recreateMaxRetries * recreateDelayInSeconds)) + ) + .flatMap(a -> loadTopicAfterCreation(cluster, topicName)).map(clusterMapper::toTopic) ) ); } @@ -431,13 +467,21 @@ public class TopicsService { public Mono cloneTopic( KafkaCluster cluster, String topicName, String newTopicName) { return loadTopic(cluster, topicName).flatMap(topic -> - adminClientService.get(cluster).flatMap(ac -> ac.createTopic(newTopicName, - topic.getPartitionCount(), - (short) topic.getReplicationFactor(), - topic.getTopicConfigs() - .stream() - .collect(Collectors.toMap(InternalTopicConfig::getName, InternalTopicConfig::getValue))) - ).thenReturn(newTopicName).flatMap(a -> loadTopic(cluster, newTopicName)).map(clusterMapper::toTopic)); + adminClientService.get(cluster) + .flatMap(ac -> + ac.createTopic( + newTopicName, + topic.getPartitionCount(), + (short) topic.getReplicationFactor(), + topic.getTopicConfigs() + .stream() + .collect(Collectors + .toMap(InternalTopicConfig::getName, InternalTopicConfig::getValue)) + ) + ).thenReturn(newTopicName) + .flatMap(a -> loadTopicAfterCreation(cluster, newTopicName)) + .map(clusterMapper::toTopic) + ); } @VisibleForTesting