ISSUE-1613: Fix Topic not found error after new topic creation

* load topic after creation retry added
* TopicService.recreateTopic,cloneTopic formatting improved
* ALLOW_AUTO_CREATE_TOPICS_CONFIG set to false for all created consumers to avoid accidental topic creation

Co-authored-by: iliax <ikuramshin@provectus.com>
This commit is contained in:
Ilya Kuramshin 2022-03-28 09:10:06 +03:00 committed by GitHub
parent 3a84d10034
commit e1f31d27c6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 71 additions and 26 deletions

View file

@ -210,6 +210,7 @@ public class ConsumerGroupService {
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
props.putAll(properties);
return new KafkaConsumer<>(props);

View file

@ -71,6 +71,10 @@ public class TopicsService {
private int recreateMaxRetries;
@Value("${topic.recreate.delay.seconds:1}")
private int recreateDelayInSeconds;
@Value("${topic.load.after.create.maxRetries:10}")
private int loadTopicAfterCreateRetries;
@Value("${topic.load.after.create.delay.ms:500}")
private int loadTopicAfterCreateDelayInMs;
public Mono<TopicsResponseDTO> getTopics(KafkaCluster cluster,
Optional<Integer> pageNum,
@ -115,7 +119,32 @@ public class TopicsService {
private Mono<InternalTopic> loadTopic(KafkaCluster c, String topicName) {
return loadTopics(c, List.of(topicName))
.map(lst -> lst.stream().findFirst().orElseThrow(TopicNotFoundException::new));
.flatMap(lst -> lst.stream().findFirst()
.map(Mono::just)
.orElse(Mono.error(TopicNotFoundException::new)));
}
/**
* After creation topic can be invisible via API for some time.
* To workaround this, we retyring topic loading until it becomes visible.
*/
private Mono<InternalTopic> loadTopicAfterCreation(KafkaCluster c, String topicName) {
return loadTopic(c, topicName)
.retryWhen(
Retry
.fixedDelay(
loadTopicAfterCreateRetries,
Duration.ofMillis(loadTopicAfterCreateDelayInMs)
)
.filter(TopicNotFoundException.class::isInstance)
.onRetryExhaustedThrow((spec, sig) ->
new TopicMetadataException(
String.format(
"Error while loading created topic '%s' - topic is not visible via API "
+ "after waiting for %d ms.",
topicName,
loadTopicAfterCreateDelayInMs * loadTopicAfterCreateRetries)))
);
}
private List<InternalTopic> createList(List<String> orderedNames,
@ -182,7 +211,7 @@ public class TopicsService {
).thenReturn(topicData)
)
.onErrorResume(t -> Mono.error(new TopicMetadataException(t.getMessage())))
.flatMap(topicData -> loadTopic(c, topicData.getName()));
.flatMap(topicData -> loadTopicAfterCreation(c, topicData.getName()));
}
public Mono<TopicDTO> createTopic(KafkaCluster cluster, Mono<TopicCreationDTO> topicCreation) {
@ -194,23 +223,30 @@ public class TopicsService {
public Mono<TopicDTO> recreateTopic(KafkaCluster cluster, String topicName) {
return loadTopic(cluster, topicName)
.flatMap(t -> deleteTopic(cluster, topicName)
.thenReturn(t).delayElement(Duration.ofSeconds(recreateDelayInSeconds))
.flatMap(topic -> adminClientService.get(cluster).flatMap(ac -> ac.createTopic(topic.getName(),
topic.getPartitionCount(),
(short) topic.getReplicationFactor(),
topic.getTopicConfigs()
.stream()
.collect(Collectors
.toMap(InternalTopicConfig::getName,
InternalTopicConfig::getValue)))
.thenReturn(topicName))
.retryWhen(Retry.fixedDelay(recreateMaxRetries,
Duration.ofSeconds(recreateDelayInSeconds))
.filter(TopicExistsException.class::isInstance)
.onRetryExhaustedThrow((a, b) ->
new TopicRecreationException(topicName,
recreateMaxRetries * recreateDelayInSeconds)))
.flatMap(a -> loadTopic(cluster, topicName)).map(clusterMapper::toTopic)
.thenReturn(t)
.delayElement(Duration.ofSeconds(recreateDelayInSeconds))
.flatMap(topic ->
adminClientService.get(cluster)
.flatMap(ac ->
ac.createTopic(
topic.getName(),
topic.getPartitionCount(),
(short) topic.getReplicationFactor(),
topic.getTopicConfigs()
.stream()
.collect(Collectors.toMap(InternalTopicConfig::getName,
InternalTopicConfig::getValue))
)
.thenReturn(topicName)
)
.retryWhen(
Retry.fixedDelay(recreateMaxRetries, Duration.ofSeconds(recreateDelayInSeconds))
.filter(TopicExistsException.class::isInstance)
.onRetryExhaustedThrow((a, b) ->
new TopicRecreationException(topicName,
recreateMaxRetries * recreateDelayInSeconds))
)
.flatMap(a -> loadTopicAfterCreation(cluster, topicName)).map(clusterMapper::toTopic)
)
);
}
@ -431,13 +467,21 @@ public class TopicsService {
public Mono<TopicDTO> cloneTopic(
KafkaCluster cluster, String topicName, String newTopicName) {
return loadTopic(cluster, topicName).flatMap(topic ->
adminClientService.get(cluster).flatMap(ac -> ac.createTopic(newTopicName,
topic.getPartitionCount(),
(short) topic.getReplicationFactor(),
topic.getTopicConfigs()
.stream()
.collect(Collectors.toMap(InternalTopicConfig::getName, InternalTopicConfig::getValue)))
).thenReturn(newTopicName).flatMap(a -> loadTopic(cluster, newTopicName)).map(clusterMapper::toTopic));
adminClientService.get(cluster)
.flatMap(ac ->
ac.createTopic(
newTopicName,
topic.getPartitionCount(),
(short) topic.getReplicationFactor(),
topic.getTopicConfigs()
.stream()
.collect(Collectors
.toMap(InternalTopicConfig::getName, InternalTopicConfig::getValue))
)
).thenReturn(newTopicName)
.flatMap(a -> loadTopicAfterCreation(cluster, newTopicName))
.map(clusterMapper::toTopic)
);
}
@VisibleForTesting