Selaa lähdekoodia

ODD: Skipping topic exporting if we failed to load/parse topic's schema from SR (#3980)

Co-authored-by: iliax <ikuramshin@provectus.com>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
Ilya Kuramshin 2 vuotta sitten
vanhempi
commit
50b9c56112

+ 3 - 6
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/OddExporter.java

@@ -5,13 +5,10 @@ import com.google.common.base.Preconditions;
 import com.provectus.kafka.ui.model.KafkaCluster;
 import com.provectus.kafka.ui.service.KafkaConnectService;
 import com.provectus.kafka.ui.service.StatisticsCache;
-import java.util.List;
 import java.util.function.Predicate;
 import java.util.regex.Pattern;
-import lombok.SneakyThrows;
 import org.opendatadiscovery.client.ApiClient;
 import org.opendatadiscovery.client.api.OpenDataDiscoveryIngestionApi;
-import org.opendatadiscovery.client.model.DataEntity;
 import org.opendatadiscovery.client.model.DataEntityList;
 import org.opendatadiscovery.client.model.DataSource;
 import org.opendatadiscovery.client.model.DataSourceList;
@@ -68,14 +65,14 @@ class OddExporter {
   private Mono<Void> exportTopics(KafkaCluster c) {
     return createKafkaDataSource(c)
         .thenMany(topicsExporter.export(c))
-        .concatMap(this::sentDataEntities)
+        .concatMap(this::sendDataEntities)
         .then();
   }
 
   private Mono<Void> exportKafkaConnects(KafkaCluster cluster) {
     return createConnectDataSources(cluster)
         .thenMany(connectorsExporter.export(cluster))
-        .concatMap(this::sentDataEntities)
+        .concatMap(this::sendDataEntities)
         .then();
   }
 
@@ -99,7 +96,7 @@ class OddExporter {
     );
   }
 
-  private Mono<Void> sentDataEntities(DataEntityList dataEntityList) {
+  private Mono<Void> sendDataEntities(DataEntityList dataEntityList) {
     return oddApi.postDataEntityList(dataEntityList);
   }
 

+ 5 - 5
kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporter.java

@@ -38,6 +38,8 @@ class TopicsExporter {
     return Flux.fromIterable(stats.getTopicDescriptions().keySet())
         .filter(topicFilter)
         .flatMap(topic -> createTopicDataEntity(cluster, topic, stats))
+        .onErrorContinue(
+            (th, topic) -> log.warn("Error exporting data for topic {}, cluster {}", topic, cluster.getName(), th))
         .buffer(100)
         .map(topicsEntities ->
             new DataEntityList()
@@ -89,10 +91,10 @@ class TopicsExporter {
         .build();
   }
 
+  //returns empty list if schemaRegistry is not configured or assumed subject not found
   private Mono<List<DataSetField>> getTopicSchema(KafkaCluster cluster,
                                                   String topic,
                                                   KafkaPath topicOddrn,
-                                                  //currently we only retrieve value schema
                                                   boolean isKey) {
     if (cluster.getSchemaRegistryClient() == null) {
       return Mono.just(List.of());
@@ -102,10 +104,8 @@ class TopicsExporter {
         .mono(client -> client.getSubjectVersion(subject, "latest"))
         .map(subj -> DataSetFieldsExtractors.extract(subj, topicOddrn, isKey))
         .onErrorResume(WebClientResponseException.NotFound.class, th -> Mono.just(List.of()))
-        .onErrorResume(th -> true, th -> {
-          log.warn("Error retrieving subject {} for cluster {}", subject, cluster.getName(), th);
-          return Mono.just(List.of());
-        });
+        .onErrorMap(WebClientResponseException.class, err ->
+            new IllegalStateException("Error retrieving subject %s".formatted(subject), err));
   }
 
 }

+ 3 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/integration/odd/TopicsExporterTest.java

@@ -22,6 +22,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.opendatadiscovery.client.model.DataEntity;
 import org.opendatadiscovery.client.model.DataEntityType;
+import org.springframework.http.HttpHeaders;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
@@ -53,7 +55,7 @@ class TopicsExporterTest {
   @Test
   void doesNotExportTopicsWhichDontFitFiltrationRule() {
     when(schemaRegistryClientMock.getSubjectVersion(anyString(), anyString()))
-        .thenReturn(Mono.error(new RuntimeException("Not found")));
+        .thenReturn(Mono.error(WebClientResponseException.create(404, "NF", new HttpHeaders(), null, null, null)));
 
     stats = Statistics.empty()
         .toBuilder()