소스 검색

Create WebClient bean and refactor its usage

Ildar Almakaev 4 년 전
부모
커밋
08d855f975

+ 8 - 3
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/config/Config.java

@@ -7,6 +7,7 @@ import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.jmx.export.MBeanExporter;
+import org.springframework.web.reactive.function.client.WebClient;
 
 import javax.management.remote.JMXConnector;
 
@@ -15,7 +16,7 @@ public class Config {
 
     @Bean
     public KeyedObjectPool<String, JMXConnector> pool() {
-        GenericKeyedObjectPool<String, JMXConnector> pool =  new GenericKeyedObjectPool<>(new JmxPoolFactory());
+        GenericKeyedObjectPool<String, JMXConnector> pool = new GenericKeyedObjectPool<>(new JmxPoolFactory());
         pool.setConfig(poolConfig());
         return pool;
     }
@@ -28,11 +29,15 @@ public class Config {
     }
 
     @Bean
-    public MBeanExporter exporter()
-    {
+    public MBeanExporter exporter() {
         final MBeanExporter exporter = new MBeanExporter();
         exporter.setAutodetect(true);
         exporter.setExcludedBeans("pool");
         return exporter;
     }
+
+    @Bean
+    public WebClient webClient() {
+        return WebClient.create();
+    }
 }

+ 43 - 45
kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/SchemaRegistryService.java

@@ -2,7 +2,6 @@ package com.provectus.kafka.ui.cluster.service;
 
 import com.provectus.kafka.ui.cluster.exception.NotFoundException;
 import com.provectus.kafka.ui.cluster.model.ClustersStorage;
-import com.provectus.kafka.ui.cluster.model.KafkaCluster;
 import com.provectus.kafka.ui.model.NewSchemaSubject;
 import com.provectus.kafka.ui.model.SubjectSchema;
 import lombok.RequiredArgsConstructor;
@@ -20,72 +19,71 @@ import reactor.core.publisher.Mono;
 @Log4j2
 @RequiredArgsConstructor
 public class SchemaRegistryService {
-    private final ClustersStorage clustersStorage;
     public static final String URL_SUBJECTS = "/subjects";
     public static final String URL_SUBJECT = "/subjects/{subjectName}";
     public static final String URL_SUBJECT_VERSIONS = "/subjects/{subjectName}/versions";
     public static final String URL_SUBJECT_BY_VERSION = "/subjects/{subjectName}/versions/{version}";
 
+    private final ClustersStorage clustersStorage;
+    private final WebClient webClient;
+
     public Flux<String> getAllSchemaSubjects(String clusterName) {
-        KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow();
-        WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry());
-        return webClient.get()
-                .uri(URL_SUBJECTS)
-                .retrieve()
-                .bodyToFlux(String.class);
+        return clustersStorage.getClusterByName(clusterName)
+                .map(cluster -> webClient.get()
+                        .uri(cluster.getSchemaRegistry() + URL_SUBJECTS)
+                        .retrieve()
+                        .bodyToFlux(String.class))
+                .orElse(Flux.empty());
     }
 
     public Flux<Integer> getSchemaSubjectVersions(String clusterName, String subjectName) {
-        KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow();
-        WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry());
-        return webClient.get()
-                .uri(URL_SUBJECT_VERSIONS, subjectName)
-                .retrieve()
-                .onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject")))
-                .bodyToFlux(Integer.class);
+        return clustersStorage.getClusterByName(clusterName)
+                .map(cluster -> webClient.get()
+                        .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, subjectName)
+                        .retrieve()
+                        .onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject")))
+                        .bodyToFlux(Integer.class))
+                .orElse(Flux.empty());
     }
 
     public Flux<SubjectSchema> getSchemaSubjectByVersion(String clusterName, String subjectName, Integer version) {
-        KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow();
-        WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry());
-        return webClient.get()
-                .uri(URL_SUBJECT_BY_VERSION, subjectName, version)
-                .retrieve()
-                .onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version")))
-                .bodyToFlux(SubjectSchema.class);
+        return clustersStorage.getClusterByName(clusterName)
+                .map(cluster -> webClient.get()
+                        .uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, subjectName, version)
+                        .retrieve()
+                        .onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version")))
+                        .bodyToFlux(SubjectSchema.class))
+                .orElse(Flux.empty());
     }
 
     public Mono<Object> deleteSchemaSubjectByVersion(String clusterName, String subjectName, Integer version) {
-        KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow();
-        WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry());
-        return webClient.delete()
-                .uri(URL_SUBJECT_BY_VERSION, subjectName, version)
-                .retrieve()
-                .onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version")))
-                .bodyToMono(Object.class);
+        return clustersStorage.getClusterByName(clusterName)
+                .map(cluster -> webClient.delete()
+                        .uri(cluster.getSchemaRegistry() + URL_SUBJECT_BY_VERSION, subjectName, version)
+                        .retrieve()
+                        .onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version")))
+                        .bodyToMono(Object.class))
+                .orElse(Mono.empty());
     }
 
     public Mono<Object> deleteSchemaSubject(String clusterName, String subjectName) {
-        KafkaCluster kafkaCluster = clustersStorage.getClusterByName(clusterName).orElseThrow();
-        WebClient webClient = WebClient.create(kafkaCluster.getSchemaRegistry());
-        return webClient.delete()
-                .uri(URL_SUBJECT, subjectName)
-                .retrieve()
-                .onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version")))
-                .bodyToMono(Object.class);
+        return clustersStorage.getClusterByName(clusterName)
+                .map(cluster -> webClient.delete()
+                        .uri(cluster.getSchemaRegistry() + URL_SUBJECT, subjectName)
+                        .retrieve()
+                        .onStatus(HttpStatus.NOT_FOUND::equals, resp -> Mono.error(new NotFoundException("No such subject or version")))
+                        .bodyToMono(Object.class))
+                .orElse(Mono.empty());
     }
 
     public Mono<ResponseEntity<SubjectSchema>> createNewSubject(String clusterName, String subjectSchema, Mono<NewSchemaSubject> newSchemaSubject) {
         return clustersStorage.getClusterByName(clusterName)
-                .map(kafkaCluster -> WebClient.create(kafkaCluster.getSchemaRegistry()))
-                .map(webClient -> webClient
-                                .post()
-                                .uri(URL_SUBJECT_VERSIONS, subjectSchema)
-                                .contentType(MediaType.APPLICATION_JSON)
-                                .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
-                                .retrieve()
-                                .toEntity(SubjectSchema.class)
-                )
+                .map(cluster -> webClient.post()
+                        .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, subjectSchema)
+                        .contentType(MediaType.APPLICATION_JSON)
+                        .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
+                        .retrieve()
+                        .toEntity(SubjectSchema.class))
                 .orElse(Mono.empty());
     }
 }