|
@@ -11,6 +11,7 @@ import org.springframework.http.MediaType;
|
|
import org.springframework.http.ResponseEntity;
|
|
import org.springframework.http.ResponseEntity;
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.web.reactive.function.BodyInserters;
|
|
import org.springframework.web.reactive.function.BodyInserters;
|
|
|
|
+import org.springframework.web.reactive.function.client.ClientResponse;
|
|
import org.springframework.web.reactive.function.client.WebClient;
|
|
import org.springframework.web.reactive.function.client.WebClient;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.core.publisher.Mono;
|
|
@@ -78,12 +79,16 @@ public class SchemaRegistryService {
|
|
|
|
|
|
public Mono<ResponseEntity<SchemaSubject>> createNewSubject(String clusterName, String subjectSchema, Mono<NewSchemaSubject> newSchemaSubject) {
|
|
public Mono<ResponseEntity<SchemaSubject>> createNewSubject(String clusterName, String subjectSchema, Mono<NewSchemaSubject> newSchemaSubject) {
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
return clustersStorage.getClusterByName(clusterName)
|
|
- .map(cluster -> webClient.post()
|
|
|
|
- .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, subjectSchema)
|
|
|
|
- .contentType(MediaType.APPLICATION_JSON)
|
|
|
|
- .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
|
|
|
|
- .retrieve()
|
|
|
|
- .toEntity(SchemaSubject.class))
|
|
|
|
|
|
+ .map(cluster -> {
|
|
|
|
+ log.info("Submitting a new subject: {} to Schema Registry: {}", subjectSchema, cluster.getSchemaRegistry());
|
|
|
|
+ return webClient.post()
|
|
|
|
+ .uri(cluster.getSchemaRegistry() + URL_SUBJECT_VERSIONS, subjectSchema)
|
|
|
|
+ .contentType(MediaType.APPLICATION_JSON)
|
|
|
|
+ .body(BodyInserters.fromPublisher(newSchemaSubject, NewSchemaSubject.class))
|
|
|
|
+ .retrieve()
|
|
|
|
+ .onStatus(HttpStatus.INTERNAL_SERVER_ERROR::equals, ClientResponse::createException)
|
|
|
|
+ .toEntity(SchemaSubject.class);
|
|
|
|
+ })
|
|
.orElse(Mono.error(new NotFoundException("No such cluster")));
|
|
.orElse(Mono.error(new NotFoundException("No such cluster")));
|
|
}
|
|
}
|
|
}
|
|
}
|