|
@@ -20,8 +20,8 @@ public class KsqlService {
|
|
|
private final ClustersStorage clustersStorage;
|
|
|
private final List<KsqlStatementStrategy> commandParamsStrategies;
|
|
|
|
|
|
- public Mono<Object> getListStreams(String name, Mono<KsqlCommand> ksqlCommand) {
|
|
|
- return Mono.justOrEmpty(clustersStorage.getClusterByName(name))
|
|
|
+ public Mono<Object> executeKsqlCommand(String clusterName, Mono<KsqlCommand> ksqlCommand) {
|
|
|
+ return Mono.justOrEmpty(clustersStorage.getClusterByName(clusterName))
|
|
|
.switchIfEmpty(Mono.error(ClusterNotFoundException::new))
|
|
|
.map(KafkaCluster::getKsqldbServer)
|
|
|
.switchIfEmpty(Mono.error(KsqlDbNotFoundException::new))
|
|
@@ -38,7 +38,7 @@ public class KsqlService {
|
|
|
.map(s -> s.ksqlCommand(command))
|
|
|
.findFirst())
|
|
|
.flatMap(Mono::justOrEmpty)
|
|
|
-// TODO: how to handle not parsed statements?
|
|
|
+ // TODO: handle not parsed statements?
|
|
|
.switchIfEmpty(Mono.error(new UnprocessableEntityException("Invalid sql")));
|
|
|
}
|
|
|
}
|