From ec42c37f24260c6985eda95ec354819d982b1c12 Mon Sep 17 00:00:00 2001 From: Ilya Kuramshin Date: Mon, 13 Dec 2021 13:16:47 +0300 Subject: [PATCH] Ksql functionality reimplemented (#1161) * Ksql functionality reimplemented Co-authored-by: iliax --- docker/kafka-ui-connectors.yaml | 4 + kafka-ui-api/pom.xml | 21 + .../src/main/antlr4/ksql/KsqlGrammar.g4 | 621 ++++++++++++++++++ .../kafka/ui/controller/KsqlController.java | 30 + .../kafka/ui/exception/ErrorCode.java | 1 + .../kafka/ui/exception/KsqlApiException.java | 17 + .../kafka/ui/service/ksql/KsqlApiClient.java | 114 ++++ .../kafka/ui/service/ksql/KsqlGrammar.java | 83 +++ .../service/ksql/response/DynamicParser.java | 69 ++ .../service/ksql/response/ResponseParser.java | 161 +++++ .../main/resources/swagger/kafka-ui-api.yaml | 50 ++ pom.xml | 1 + 12 files changed, 1172 insertions(+) create mode 100644 kafka-ui-api/src/main/antlr4/ksql/KsqlGrammar.g4 create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/KsqlApiException.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlGrammar.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/DynamicParser.java create mode 100644 kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/ResponseParser.java diff --git a/docker/kafka-ui-connectors.yaml b/docker/kafka-ui-connectors.yaml index 5f4a3b597d..8d0d88d755 100644 --- a/docker/kafka-ui-connectors.yaml +++ b/docker/kafka-ui-connectors.yaml @@ -51,6 +51,8 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 JMX_PORT: 9997 KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997 @@ -74,6 +76,8 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 JMX_PORT: 9998 KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka1 -Dcom.sun.management.jmxremote.rmi.port=9998 diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml index f134cdbe3d..6723a24cd3 100644 --- a/kafka-ui-api/pom.xml +++ b/kafka-ui-api/pom.xml @@ -186,6 +186,11 @@ spring-boot-starter-actuator + + org.antlr + antlr4-runtime + ${antlr4-maven-plugin.version} + @@ -271,6 +276,22 @@ + + org.antlr + antlr4-maven-plugin + ${antlr4-maven-plugin.version} + + false + + + + generate-sources + + antlr4 + + + + diff --git a/kafka-ui-api/src/main/antlr4/ksql/KsqlGrammar.g4 b/kafka-ui-api/src/main/antlr4/ksql/KsqlGrammar.g4 new file mode 100644 index 0000000000..2fcd623e3e --- /dev/null +++ b/kafka-ui-api/src/main/antlr4/ksql/KsqlGrammar.g4 @@ -0,0 +1,621 @@ +grammar KsqlGrammar; + +tokens { + DELIMITER +} + +@lexer::members { + public static final int COMMENTS = 2; + public static final int WHITESPACE = 3; + public static final int DIRECTIVES = 4; +} + +statements + : (singleStatement)* EOF + ; + +testStatement + : (singleStatement | assertStatement ';' | runScript ';') EOF? + ; + +singleStatement + : statement ';' + ; + +singleExpression + : expression EOF + ; + +statement + : query #queryStatement + | (LIST | SHOW) PROPERTIES #listProperties + | (LIST | SHOW) ALL? TOPICS EXTENDED? #listTopics + | (LIST | SHOW) STREAMS EXTENDED? #listStreams + | (LIST | SHOW) TABLES EXTENDED? #listTables + | (LIST | SHOW) FUNCTIONS #listFunctions + | (LIST | SHOW) (SOURCE | SINK)? CONNECTORS #listConnectors + | (LIST | SHOW) CONNECTOR PLUGINS #listConnectorPlugins + | (LIST | SHOW) TYPES #listTypes + | (LIST | SHOW) VARIABLES #listVariables + | DESCRIBE sourceName EXTENDED? #showColumns + | DESCRIBE STREAMS EXTENDED? #describeStreams + | DESCRIBE FUNCTION identifier #describeFunction + | DESCRIBE CONNECTOR identifier #describeConnector + | PRINT (identifier| STRING) printClause #printTopic + | (LIST | SHOW) QUERIES EXTENDED? #listQueries + | TERMINATE identifier #terminateQuery + | TERMINATE ALL #terminateQuery + | SET STRING EQ STRING #setProperty + | UNSET STRING #unsetProperty + | DEFINE variableName EQ variableValue #defineVariable + | UNDEFINE variableName #undefineVariable + | CREATE (OR REPLACE)? (SOURCE)? STREAM (IF NOT EXISTS)? sourceName + (tableElements)? + (WITH tableProperties)? #createStream + | CREATE (OR REPLACE)? STREAM (IF NOT EXISTS)? sourceName + (WITH tableProperties)? AS query #createStreamAs + | CREATE (OR REPLACE)? (SOURCE)? TABLE (IF NOT EXISTS)? sourceName + (tableElements)? + (WITH tableProperties)? #createTable + | CREATE (OR REPLACE)? TABLE (IF NOT EXISTS)? sourceName + (WITH tableProperties)? AS query #createTableAs + | CREATE (SINK | SOURCE) CONNECTOR (IF NOT EXISTS)? identifier + WITH tableProperties #createConnector + | INSERT INTO sourceName (WITH tableProperties)? query #insertInto + | INSERT INTO sourceName (columns)? VALUES values #insertValues + | DROP STREAM (IF EXISTS)? sourceName (DELETE TOPIC)? #dropStream + | DROP TABLE (IF EXISTS)? sourceName (DELETE TOPIC)? #dropTable + | DROP CONNECTOR (IF EXISTS)? identifier #dropConnector + | EXPLAIN (statement | identifier) #explain + | CREATE TYPE (IF NOT EXISTS)? identifier AS type #registerType + | DROP TYPE (IF EXISTS)? identifier #dropType + | ALTER (STREAM | TABLE) sourceName alterOption (',' alterOption)* #alterSource + ; + +assertStatement + : ASSERT VALUES sourceName (columns)? VALUES values #assertValues + | ASSERT NULL VALUES sourceName (columns)? KEY values #assertTombstone + | ASSERT STREAM sourceName (tableElements)? (WITH tableProperties)? #assertStream + | ASSERT TABLE sourceName (tableElements)? (WITH tableProperties)? #assertTable + ; + +runScript + : RUN SCRIPT STRING + ; + +query + : SELECT selectItem (',' selectItem)* + FROM from=relation + (WINDOW windowExpression)? + (WHERE where=booleanExpression)? + (GROUP BY groupBy)? + (PARTITION BY partitionBy)? + (HAVING having=booleanExpression)? + (EMIT resultMaterialization)? + limitClause? + ; + +resultMaterialization + : CHANGES + | FINAL + ; + +alterOption + : ADD (COLUMN)? identifier type + ; + +tableElements + : '(' tableElement (',' tableElement)* ')' + ; + +tableElement + : identifier type columnConstraints? + ; + +columnConstraints + : ((PRIMARY)? KEY) + | HEADERS + | HEADER '(' STRING ')' + ; + +tableProperties + : '(' tableProperty (',' tableProperty)* ')' + ; + +tableProperty + : (identifier | STRING) EQ literal + ; + +printClause + : (FROM BEGINNING)? intervalClause? limitClause? + ; + +intervalClause + : (INTERVAL | SAMPLE) number + ; + +limitClause + : LIMIT number + ; + +retentionClause + : RETENTION number windowUnit + ; + +gracePeriodClause + : GRACE PERIOD number windowUnit + ; + +windowExpression + : (IDENTIFIER)? + ( tumblingWindowExpression | hoppingWindowExpression | sessionWindowExpression ) + ; + +tumblingWindowExpression + : TUMBLING '(' SIZE number windowUnit (',' retentionClause)? (',' gracePeriodClause)?')' + ; + +hoppingWindowExpression + : HOPPING '(' SIZE number windowUnit ',' ADVANCE BY number windowUnit (',' retentionClause)? (',' gracePeriodClause)?')' + ; + +sessionWindowExpression + : SESSION '(' number windowUnit (',' retentionClause)? (',' gracePeriodClause)?')' + ; + +windowUnit + : DAY + | HOUR + | MINUTE + | SECOND + | MILLISECOND + | DAYS + | HOURS + | MINUTES + | SECONDS + | MILLISECONDS + ; + +groupBy + : valueExpression (',' valueExpression)* + | '(' (valueExpression (',' valueExpression)*)? ')' + ; + +partitionBy + : valueExpression (',' valueExpression)* + | '(' (valueExpression (',' valueExpression)*)? ')' + ; + +values + : '(' (valueExpression (',' valueExpression)*)? ')' + ; + +selectItem + : expression (AS? identifier)? #selectSingle + | identifier '.' ASTERISK #selectAll + | ASTERISK #selectAll + ; + +relation + : left=aliasedRelation joinedSource+ #joinRelation + | aliasedRelation #relationDefault + ; + +joinedSource + : joinType JOIN aliasedRelation joinWindow? joinCriteria + ; + +joinType + : INNER? #innerJoin + | FULL OUTER? #outerJoin + | LEFT OUTER? #leftJoin + ; + +joinWindow + : WITHIN withinExpression + ; + +withinExpression + : '(' joinWindowSize ',' joinWindowSize ')' (gracePeriodClause)? # joinWindowWithBeforeAndAfter + | joinWindowSize (gracePeriodClause)? # singleJoinWindow + ; + +joinWindowSize + : number windowUnit + ; + +joinCriteria + : ON booleanExpression + ; + +aliasedRelation + : relationPrimary (AS? sourceName)? + ; + +columns + : '(' identifier (',' identifier)* ')' + ; + +relationPrimary + : sourceName #tableName + ; + +expression + : booleanExpression + ; + +booleanExpression + : predicated #booleanDefault + | NOT booleanExpression #logicalNot + | left=booleanExpression operator=AND right=booleanExpression #logicalBinary + | left=booleanExpression operator=OR right=booleanExpression #logicalBinary + ; + +predicated + : valueExpression predicate[$valueExpression.ctx]? + ; + +predicate[ParserRuleContext value] + : comparisonOperator right=valueExpression #comparison + | NOT? BETWEEN lower=valueExpression AND upper=valueExpression #between + | NOT? IN '(' expression (',' expression)* ')' #inList + | NOT? LIKE pattern=valueExpression (ESCAPE escape=STRING)? #like + | IS NOT? NULL #nullPredicate + | IS NOT? DISTINCT FROM right=valueExpression #distinctFrom + ; + +valueExpression + : primaryExpression #valueExpressionDefault + | valueExpression AT timeZoneSpecifier #atTimeZone + | operator=(MINUS | PLUS) valueExpression #arithmeticUnary + | left=valueExpression operator=(ASTERISK | SLASH | PERCENT) right=valueExpression #arithmeticBinary + | left=valueExpression operator=(PLUS | MINUS) right=valueExpression #arithmeticBinary + | left=valueExpression CONCAT right=valueExpression #concatenation + ; + +primaryExpression + : literal #literalExpression + | identifier STRING #typeConstructor + | CASE valueExpression whenClause+ (ELSE elseExpression=expression)? END #simpleCase + | CASE whenClause+ (ELSE elseExpression=expression)? END #searchedCase + | CAST '(' expression AS type ')' #cast + | ARRAY '[' (expression (',' expression)*)? ']' #arrayConstructor + | MAP '(' (expression ASSIGN expression (',' expression ASSIGN expression)*)? ')' #mapConstructor + | STRUCT '(' (identifier ASSIGN expression (',' identifier ASSIGN expression)*)? ')' #structConstructor + | identifier '(' ASTERISK ')' #functionCall + | identifier '(' (functionArgument (',' functionArgument)* (',' lambdaFunction)*)? ')' #functionCall + | value=primaryExpression '[' index=valueExpression ']' #subscript + | identifier #columnReference + | identifier '.' identifier #qualifiedColumnReference + | base=primaryExpression STRUCT_FIELD_REF fieldName=identifier #dereference + | '(' expression ')' #parenthesizedExpression + ; + +functionArgument + : expression + | windowUnit + ; + +timeZoneSpecifier + : TIME ZONE STRING #timeZoneString + ; + +comparisonOperator + : EQ | NEQ | LT | LTE | GT | GTE + ; + +booleanValue + : TRUE | FALSE + ; + +type + : type ARRAY + | ARRAY '<' type '>' + | MAP '<' type ',' type '>' + | STRUCT '<' (identifier type (',' identifier type)*)? '>' + | DECIMAL '(' number ',' number ')' + | baseType ('(' typeParameter (',' typeParameter)* ')')? + ; + +typeParameter + : INTEGER_VALUE | 'STRING' + ; + +baseType + : identifier + ; + +whenClause + : WHEN condition=expression THEN result=expression + ; + +identifier + : VARIABLE #variableIdentifier + | IDENTIFIER #unquotedIdentifier + | QUOTED_IDENTIFIER #quotedIdentifierAlternative + | nonReserved #unquotedIdentifier + | BACKQUOTED_IDENTIFIER #backQuotedIdentifier + | DIGIT_IDENTIFIER #digitIdentifier + ; + +lambdaFunction + : identifier '=>' expression #lambda + | '(' identifier (',' identifier)* ')' '=>' expression #lambda + ; + +variableName + : IDENTIFIER + ; + +variableValue + : STRING + ; + +sourceName + : identifier + ; + +number + : MINUS? DECIMAL_VALUE #decimalLiteral + | MINUS? FLOATING_POINT_VALUE #floatLiteral + | MINUS? INTEGER_VALUE #integerLiteral + ; + +literal + : NULL #nullLiteral + | number #numericLiteral + | booleanValue #booleanLiteral + | STRING #stringLiteral + | VARIABLE #variableLiteral + ; + +nonReserved + : SHOW | TABLES | COLUMNS | COLUMN | PARTITIONS | FUNCTIONS | FUNCTION | SESSION + | STRUCT | MAP | ARRAY | PARTITION + | INTEGER | DATE | TIME | TIMESTAMP | INTERVAL | ZONE | 'STRING' + | YEAR | MONTH | DAY | HOUR | MINUTE | SECOND + | EXPLAIN | ANALYZE | TYPE | TYPES + | SET | RESET + | IF + | SOURCE | SINK + | PRIMARY | KEY + | EMIT + | CHANGES + | FINAL + | ESCAPE + | REPLACE + | ASSERT + | ALTER + | ADD + ; + +EMIT: 'EMIT'; +CHANGES: 'CHANGES'; +FINAL: 'FINAL'; +SELECT: 'SELECT'; +FROM: 'FROM'; +AS: 'AS'; +ALL: 'ALL'; +DISTINCT: 'DISTINCT'; +WHERE: 'WHERE'; +WITHIN: 'WITHIN'; +WINDOW: 'WINDOW'; +GROUP: 'GROUP'; +BY: 'BY'; +HAVING: 'HAVING'; +LIMIT: 'LIMIT'; +AT: 'AT'; +OR: 'OR'; +AND: 'AND'; +IN: 'IN'; +NOT: 'NOT'; +EXISTS: 'EXISTS'; +BETWEEN: 'BETWEEN'; +LIKE: 'LIKE'; +ESCAPE: 'ESCAPE'; +IS: 'IS'; +NULL: 'NULL'; +TRUE: 'TRUE'; +FALSE: 'FALSE'; +INTEGER: 'INTEGER'; +DATE: 'DATE'; +TIME: 'TIME'; +TIMESTAMP: 'TIMESTAMP'; +INTERVAL: 'INTERVAL'; +YEAR: 'YEAR'; +MONTH: 'MONTH'; +DAY: 'DAY'; +HOUR: 'HOUR'; +MINUTE: 'MINUTE'; +SECOND: 'SECOND'; +MILLISECOND: 'MILLISECOND'; +YEARS: 'YEARS'; +MONTHS: 'MONTHS'; +DAYS: 'DAYS'; +HOURS: 'HOURS'; +MINUTES: 'MINUTES'; +SECONDS: 'SECONDS'; +MILLISECONDS: 'MILLISECONDS'; +ZONE: 'ZONE'; +TUMBLING: 'TUMBLING'; +HOPPING: 'HOPPING'; +SIZE: 'SIZE'; +ADVANCE: 'ADVANCE'; +RETENTION: 'RETENTION'; +GRACE: 'GRACE'; +PERIOD: 'PERIOD'; +CASE: 'CASE'; +WHEN: 'WHEN'; +THEN: 'THEN'; +ELSE: 'ELSE'; +END: 'END'; +JOIN: 'JOIN'; +FULL: 'FULL'; +OUTER: 'OUTER'; +INNER: 'INNER'; +LEFT: 'LEFT'; +RIGHT: 'RIGHT'; +ON: 'ON'; +PARTITION: 'PARTITION'; +STRUCT: 'STRUCT'; +WITH: 'WITH'; +VALUES: 'VALUES'; +CREATE: 'CREATE'; +TABLE: 'TABLE'; +TOPIC: 'TOPIC'; +STREAM: 'STREAM'; +STREAMS: 'STREAMS'; +INSERT: 'INSERT'; +DELETE: 'DELETE'; +INTO: 'INTO'; +DESCRIBE: 'DESCRIBE'; +EXTENDED: 'EXTENDED'; +PRINT: 'PRINT'; +EXPLAIN: 'EXPLAIN'; +ANALYZE: 'ANALYZE'; +TYPE: 'TYPE'; +TYPES: 'TYPES'; +CAST: 'CAST'; +SHOW: 'SHOW'; +LIST: 'LIST'; +TABLES: 'TABLES'; +TOPICS: 'TOPICS'; +QUERY: 'QUERY'; +QUERIES: 'QUERIES'; +TERMINATE: 'TERMINATE'; +LOAD: 'LOAD'; +COLUMNS: 'COLUMNS'; +COLUMN: 'COLUMN'; +PARTITIONS: 'PARTITIONS'; +FUNCTIONS: 'FUNCTIONS'; +FUNCTION: 'FUNCTION'; +DROP: 'DROP'; +TO: 'TO'; +RENAME: 'RENAME'; +ARRAY: 'ARRAY'; +MAP: 'MAP'; +SET: 'SET'; +DEFINE: 'DEFINE'; +UNDEFINE: 'UNDEFINE'; +RESET: 'RESET'; +SESSION: 'SESSION'; +SAMPLE: 'SAMPLE'; +EXPORT: 'EXPORT'; +CATALOG: 'CATALOG'; +PROPERTIES: 'PROPERTIES'; +BEGINNING: 'BEGINNING'; +UNSET: 'UNSET'; +RUN: 'RUN'; +SCRIPT: 'SCRIPT'; +DECIMAL: 'DECIMAL'; +KEY: 'KEY'; +CONNECTOR: 'CONNECTOR'; +CONNECTORS: 'CONNECTORS'; +SINK: 'SINK'; +SOURCE: 'SOURCE'; +NAMESPACE: 'NAMESPACE'; +MATERIALIZED: 'MATERIALIZED'; +VIEW: 'VIEW'; +PRIMARY: 'PRIMARY'; +REPLACE: 'REPLACE'; +ASSERT: 'ASSERT'; +ADD: 'ADD'; +ALTER: 'ALTER'; +VARIABLES: 'VARIABLES'; +PLUGINS: 'PLUGINS'; +HEADERS: 'HEADERS'; +HEADER: 'HEADER'; + +IF: 'IF'; + +EQ : '='; +NEQ : '<>' | '!='; +LT : '<'; +LTE : '<='; +GT : '>'; +GTE : '>='; + +PLUS: '+'; +MINUS: '-'; +ASTERISK: '*'; +SLASH: '/'; +PERCENT: '%'; +CONCAT: '||'; + +ASSIGN: ':='; +STRUCT_FIELD_REF: '->'; + +LAMBDA_EXPRESSION: '=>'; + +STRING + : '\'' ( ~'\'' | '\'\'' )* '\'' + ; + +INTEGER_VALUE + : DIGIT+ + ; + +DECIMAL_VALUE + : DIGIT+ '.' DIGIT* + | '.' DIGIT+ + ; + +FLOATING_POINT_VALUE + : DIGIT+ ('.' DIGIT*)? EXPONENT + | '.' DIGIT+ EXPONENT + ; + +IDENTIFIER + : (LETTER | '_') (LETTER | DIGIT | '_' | '@' )* + ; + +DIGIT_IDENTIFIER + : DIGIT (LETTER | DIGIT | '_' | '@' )+ + ; + +QUOTED_IDENTIFIER + : '"' ( ~'"' | '""' )* '"' + ; + +BACKQUOTED_IDENTIFIER + : '`' ( ~'`' | '``' )* '`' + ; + +VARIABLE + : '${' IDENTIFIER '}' + ; + +fragment EXPONENT + : 'E' [+-]? DIGIT+ + ; + +fragment DIGIT + : [0-9] + ; + +fragment LETTER + : [A-Z] + ; + +SIMPLE_COMMENT + : '--' ~'@' ~[\r\n]* '\r'? '\n'? -> channel(2) // channel(COMMENTS) + ; + +DIRECTIVE_COMMENT + : '--@' ~[\r\n]* '\r'? '\n'? -> channel(4) // channel(DIRECTIVES) + ; + +BRACKETED_COMMENT + : '/*' .*? '*/' -> channel(2) // channel(COMMENTS) + ; + +WS + : [ \r\n\t]+ -> channel(3) // channel(WHITESPACE) + ; + +// Catch-all for anything we can't recognize. +// We use this to be able to ignore and recover all the text +// when splitting statements with DelimiterLexer +UNRECOGNIZED + : . + ; diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java index 4c4770f3bd..c1bf6c2b63 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java @@ -3,14 +3,22 @@ package com.provectus.kafka.ui.controller; import com.provectus.kafka.ui.api.KsqlApi; import com.provectus.kafka.ui.model.KsqlCommandDTO; import com.provectus.kafka.ui.model.KsqlCommandResponseDTO; +import com.provectus.kafka.ui.model.KsqlResponseDTO; +import com.provectus.kafka.ui.model.KsqlTableResponseDTO; import com.provectus.kafka.ui.service.KsqlService; +import com.provectus.kafka.ui.service.ksql.KsqlApiClient; +import java.util.List; +import java.util.Map; +import java.util.Optional; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ServerWebExchange; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; + @RestController @RequiredArgsConstructor @Log4j2 @@ -25,4 +33,26 @@ public class KsqlController extends AbstractController implements KsqlApi { return ksqlService.executeKsqlCommand(getCluster(clusterName), ksqlCommand) .map(ResponseEntity::ok); } + + @SuppressWarnings("unchecked") + @Override + public Mono>> executeKsql(String clusterName, + Mono ksqlCommand, + ServerWebExchange exchange) { + return Mono.just( + ResponseEntity.ok( + ksqlCommand + .flux() + .flatMap(command -> + new KsqlApiClient(getCluster(clusterName)) + .execute( + command.getKsql(), + Optional.ofNullable(command.getStreamsProperties()).orElse(Map.of()))) + .map(table -> new KsqlResponseDTO() + .table( + new KsqlTableResponseDTO() + .header(table.getHeader()) + .columnNames(table.getColumnNames()) + .values((List>) ((List) (table.getValues()))))))); + } } diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java index 7315c20a61..e9f33f2e2c 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java @@ -8,6 +8,7 @@ import org.springframework.http.HttpStatus; public enum ErrorCode { UNEXPECTED(5000, HttpStatus.INTERNAL_SERVER_ERROR), + KSQL_API_ERROR(5001, HttpStatus.INTERNAL_SERVER_ERROR), BINDING_FAIL(4001, HttpStatus.BAD_REQUEST), NOT_FOUND(404, HttpStatus.NOT_FOUND), INVALID_ENTITY_STATE(4001, HttpStatus.BAD_REQUEST), diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/KsqlApiException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/KsqlApiException.java new file mode 100644 index 0000000000..1a867dfa01 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/KsqlApiException.java @@ -0,0 +1,17 @@ +package com.provectus.kafka.ui.exception; + +public class KsqlApiException extends CustomBaseException { + + public KsqlApiException(String message) { + super(message); + } + + public KsqlApiException(String message, Throwable cause) { + super(message, cause); + } + + @Override + public ErrorCode getErrorCode() { + return ErrorCode.KSQL_API_ERROR; + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java new file mode 100644 index 0000000000..0604b9a9ff --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java @@ -0,0 +1,114 @@ +package com.provectus.kafka.ui.service.ksql; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.TextNode; +import com.provectus.kafka.ui.exception.ValidationException; +import com.provectus.kafka.ui.model.KafkaCluster; +import com.provectus.kafka.ui.service.ksql.response.ResponseParser; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import lombok.Builder; +import lombok.Value; +import org.springframework.http.MediaType; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Flux; + +public class KsqlApiClient { + + @Builder + @Value + public static class KsqlResponseTable { + String header; + List columnNames; + List> values; + } + + @Value + private static class KsqlRequest { + String ksql; + Map streamsProperties; + } + + //-------------------------------------------------------------------------------------------- + + private final KafkaCluster cluster; + + public KsqlApiClient(KafkaCluster cluster) { + this.cluster = cluster; + } + + private WebClient webClient() { + return WebClient.create(); + } + + private String baseKsqlDbUri() { + return cluster.getKsqldbServer(); + } + + private KsqlRequest ksqlRequest(String ksql, Map streamProperties) { + return new KsqlRequest(ksql, streamProperties); + } + + private Flux executeSelect(String ksql, Map streamProperties) { + return webClient() + .post() + .uri(baseKsqlDbUri() + "/query") + .accept(MediaType.parseMediaType("application/vnd.ksql.v1+json")) + .contentType(MediaType.parseMediaType("application/json")) + .bodyValue(ksqlRequest(ksql, streamProperties)) + .retrieve() + .bodyToFlux(JsonNode.class) + .map(ResponseParser::parseSelectResponse) + .filter(Optional::isPresent) + .map(Optional::get) + .onErrorResume(WebClientResponseException.class, + e -> Flux.just(ResponseParser.parseErrorResponse(e))); + } + + private Flux executeStatement(String ksql, + Map streamProperties) { + return webClient() + .post() + .uri(baseKsqlDbUri() + "/ksql") + .accept(MediaType.parseMediaType("application/vnd.ksql.v1+json")) + .contentType(MediaType.parseMediaType("application/json")) + .bodyValue(ksqlRequest(ksql, streamProperties)) + .exchangeToFlux( + resp -> { + if (resp.statusCode().isError()) { + return resp.createException().flux().map(ResponseParser::parseErrorResponse); + } + return resp.bodyToFlux(JsonNode.class) + .flatMap(body -> + // body can be an array or single object + (body.isArray() ? Flux.fromIterable(body) : Flux.just(body)) + .flatMapIterable(ResponseParser::parseStatementResponse)) + // body can be empty for some statements like INSERT + .switchIfEmpty( + Flux.just(KsqlResponseTable.builder() + .header("Query Result") + .columnNames(List.of("Result")) + .values(List.of(List.of(new TextNode("Success")))) + .build())); + } + ); + } + + public Flux execute(String ksql, Map streamProperties) { + var parsed = KsqlGrammar.parse(ksql); + if (parsed.getStatements().size() > 1) { + throw new ValidationException("Only single statement supported now"); + } + if (parsed.getStatements().size() == 0) { + throw new ValidationException("No valid ksql statement found"); + } + if (KsqlGrammar.isSelect(parsed.getStatements().get(0))) { + return executeSelect(ksql, streamProperties); + } else { + return executeStatement(ksql, streamProperties); + } + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlGrammar.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlGrammar.java new file mode 100644 index 0000000000..efb759258c --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlGrammar.java @@ -0,0 +1,83 @@ +package com.provectus.kafka.ui.service.ksql; + +import com.provectus.kafka.ui.exception.ValidationException; +import java.util.List; +import ksql.KsqlGrammarLexer; +import ksql.KsqlGrammarParser; +import lombok.RequiredArgsConstructor; +import lombok.Value; +import lombok.experimental.Delegate; +import org.antlr.v4.runtime.BaseErrorListener; +import org.antlr.v4.runtime.CharStream; +import org.antlr.v4.runtime.CharStreams; +import org.antlr.v4.runtime.CommonTokenStream; +import org.antlr.v4.runtime.IntStream; +import org.antlr.v4.runtime.RecognitionException; +import org.antlr.v4.runtime.Recognizer; +import org.antlr.v4.runtime.atn.PredictionMode; + +class KsqlGrammar { + + @Value + static class KsqlStatements { + List statements; + } + + static KsqlStatements parse(String ksql) { + var parsed = parseStatements(ksql); + if (parsed.singleStatement().stream() + .anyMatch(s -> s.statement().exception != null)) { + throw new ValidationException("Error parsing ksql statement. Check syntax!"); + } + return new KsqlStatements(parsed.singleStatement()); + } + + static boolean isSelect(KsqlGrammarParser.SingleStatementContext statement) { + return statement.statement() instanceof ksql.KsqlGrammarParser.QueryStatementContext; + } + + private static ksql.KsqlGrammarParser.StatementsContext parseStatements(final String sql) { + var lexer = new KsqlGrammarLexer(CaseInsensitiveStream.from(CharStreams.fromString(sql))); + var tokenStream = new CommonTokenStream(lexer); + var grammarParser = new ksql.KsqlGrammarParser(tokenStream); + + lexer.addErrorListener(new BaseErrorListener() { + @Override + public void syntaxError(Recognizer recognizer, Object offendingSymbol, + int line, int charPositionInLine, + String msg, RecognitionException e) { + throw new ValidationException("Invalid syntax: " + msg); + } + }); + grammarParser.getInterpreter().setPredictionMode(PredictionMode.LL); + try { + return grammarParser.statements(); + } catch (Exception e) { + throw new ValidationException("Error parsing ksql query: " + e.getMessage()); + } + } + + // impl copied from https://github.com/confluentinc/ksql/blob/master/ksqldb-parser/src/main/java/io/confluent/ksql/parser/CaseInsensitiveStream.java + @RequiredArgsConstructor + private static class CaseInsensitiveStream implements CharStream { + @Delegate + final CharStream stream; + + public static CaseInsensitiveStream from(CharStream stream) { + // we only need to override LA method + return new CaseInsensitiveStream(stream) { + @Override + public int LA(final int i) { + final int result = stream.LA(i); + switch (result) { + case 0: + case IntStream.EOF: + return result; + default: + return Character.toUpperCase(result); + } + } + }; + } + } +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/DynamicParser.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/DynamicParser.java new file mode 100644 index 0000000000..c86a3293ff --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/DynamicParser.java @@ -0,0 +1,69 @@ +package com.provectus.kafka.ui.service.ksql.response; + +import static com.provectus.kafka.ui.service.ksql.KsqlApiClient.KsqlResponseTable; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + + +class DynamicParser { + + static KsqlResponseTable parseArray(String tableName, JsonNode array) { + return parseArray(tableName, getFieldNamesFromArray(array), array); + } + + static KsqlResponseTable parseArray(String tableName, + List columnNames, + JsonNode array) { + return KsqlResponseTable.builder() + .header(tableName) + .columnNames(columnNames) + .values( + StreamSupport.stream(array.spliterator(), false) + .map(node -> + columnNames.stream() + .map(node::get) + .collect(Collectors.toList())) + .collect(Collectors.toList()) + ).build(); + } + + private static List getFieldNamesFromArray(JsonNode array) { + List fields = new ArrayList<>(); + array.forEach(node -> node.fieldNames().forEachRemaining(f -> { + if (!fields.contains(f)) { + fields.add(f); + } + })); + return fields; + } + + static KsqlResponseTable parseObject(String tableName, JsonNode node) { + if (!node.isObject()) { + return KsqlResponseTable.builder() + .header(tableName) + .columnNames(List.of("value")) + .values(List.of(List.of(node))) + .build(); + } + return parseObject(tableName, Lists.newArrayList(node.fieldNames()), node); + } + + static KsqlResponseTable parseObject(String tableName, List columnNames, JsonNode node) { + return KsqlResponseTable.builder() + .header(tableName) + .columnNames(columnNames) + .values( + List.of( + columnNames.stream() + .map(node::get) + .collect(Collectors.toList())) + ) + .build(); + } + +} diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/ResponseParser.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/ResponseParser.java new file mode 100644 index 0000000000..7e0d8cb483 --- /dev/null +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/ResponseParser.java @@ -0,0 +1,161 @@ +package com.provectus.kafka.ui.service.ksql.response; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.google.common.collect.Lists; +import com.provectus.kafka.ui.exception.KsqlApiException; +import com.provectus.kafka.ui.service.ksql.KsqlApiClient; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import org.springframework.web.reactive.function.client.WebClientResponseException; + +public class ResponseParser { + + public static Optional parseSelectResponse(JsonNode jsonNode) { + // in response we getting either header record or row data + if (arrayFieldNonEmpty(jsonNode, "header")) { + return Optional.of( + KsqlApiClient.KsqlResponseTable.builder() + .header("Schema") + .columnNames( + Arrays.stream(jsonNode.get("header").get("schema").asText().split(",")) + .map(String::trim) + .collect(Collectors.toList()) + ) + .build()); + } + if (arrayFieldNonEmpty(jsonNode, "row")) { + return Optional.of( + KsqlApiClient.KsqlResponseTable.builder() + .header("Row") + .values( + List.of(Lists.newArrayList(jsonNode.get("row").get("columns")))) + .build()); + } + if (jsonNode.hasNonNull("errorMessage")) { + throw new KsqlApiException("Error: " + jsonNode.get("errorMessage")); + } + // remaining events can be skipped + return Optional.empty(); + } + + public static KsqlApiClient.KsqlResponseTable parseErrorResponse(WebClientResponseException e) { + try { + var errBody = new JsonMapper().readTree(e.getResponseBodyAsString()); + return DynamicParser.parseObject("Execution error", errBody); + } catch (Exception ex) { + throw new KsqlApiException( + String.format( + "Unparsable error response from ksqdb, status:'%s', body: '%s'", + e.getStatusCode(), e.getResponseBodyAsString()), e); + } + } + + public static List parseStatementResponse(JsonNode jsonNode) { + var type = Optional.ofNullable(jsonNode.get("@type")) + .map(JsonNode::asText) + .orElse("unknown"); + + // messages structure can be inferred from https://github.com/confluentinc/ksql/blob/master/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java + switch (type) { + case "currentStatus": + return parseObject( + "Status", + List.of("status", "message"), + jsonNode.get("commandStatus") + ); + case "properties": + return parseProperties(jsonNode); + case "queries": + return parseArray("Queries", "queries", jsonNode); + case "sourceDescription": + return parseObjectDynamically("Source Description", jsonNode.get("sourceDescription")); + case "queryDescription": + return parseArray("Queries Description", "queryDescription", jsonNode); + case "topicDescription": + return parseObject( + "Topic Description", + List.of("name", "kafkaTopic", "format", "schemaString"), + jsonNode + ); + case "streams": + return parseArray("Streams", "streams", jsonNode); + case "tables": + return parseArray("Tables", "tables", jsonNode); + case "kafka_topics": + return parseArray("Topics", "topics", jsonNode); + case "kafka_topics_extended": + return parseArray("Topics extended", "topics", jsonNode); + case "executionPlan": + return parseObject("Execution plan", List.of("executionPlanText"), jsonNode); + case "source_descriptions": + return parseArray("Source descriptions", "sourceDescriptions", jsonNode); + case "query_descriptions": + return parseArray("Queries", "queryDescriptions", jsonNode); + case "describe_function": + return parseObject("Function description", + List.of("name", "author", "version", "description", "functions", "path", "type"), + jsonNode + ); + case "function_names": + return parseArray("Function Names", "functions", jsonNode); + case "connector_info": + return parseObjectDynamically("Connector Info", jsonNode.get("info")); + case "drop_connector": + return parseObject("Dropped connector", List.of("connectorName"), jsonNode); + case "connector_list": + return parseArray("Connectors", "connectors", jsonNode); + case "connector_plugins_list": + return parseArray("Connector Plugins", "connectorPlugins", jsonNode); + case "connector_description": + return parseObject("Connector Description", + List.of("connectorClass", "status", "sources", "topics"), + jsonNode + ); + default: + return parseUnknownResponse(jsonNode); + } + } + + private static List parseObjectDynamically( + String tableName, JsonNode jsonNode) { + return List.of(DynamicParser.parseObject(tableName, jsonNode)); + } + + private static List parseObject( + String tableName, List fields, JsonNode jsonNode) { + return List.of(DynamicParser.parseObject(tableName, fields, jsonNode)); + } + + private static List parseArray( + String tableName, String arrayField, JsonNode jsonNode) { + return List.of(DynamicParser.parseArray(tableName, jsonNode.get(arrayField))); + } + + private static List parseProperties(JsonNode jsonNode) { + var tables = new ArrayList(); + if (arrayFieldNonEmpty(jsonNode, "properties")) { + tables.add(DynamicParser.parseArray("properties", jsonNode.get("properties"))); + } + if (arrayFieldNonEmpty(jsonNode, "overwrittenProperties")) { + tables.add(DynamicParser.parseArray("overwrittenProperties", + jsonNode.get("overwrittenProperties"))); + } + if (arrayFieldNonEmpty(jsonNode, "defaultProperties")) { + tables.add(DynamicParser.parseArray("defaultProperties", jsonNode.get("defaultProperties"))); + } + return tables; + } + + private static List parseUnknownResponse(JsonNode jsonNode) { + return List.of(DynamicParser.parseObject("Ksql Response", jsonNode)); + } + + private static boolean arrayFieldNonEmpty(JsonNode json, String field) { + return json.hasNonNull(field) && !json.get(field).isEmpty(); + } + +} diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml index 93e159a5d4..00873584c5 100644 --- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml +++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml @@ -1338,6 +1338,7 @@ paths: description: OK /api/clusters/{clusterName}/ksql: + description: Deprecated - use ksql/v2 instead! post: tags: - Ksql @@ -1362,6 +1363,33 @@ paths: schema: $ref: '#/components/schemas/KsqlCommandResponse' + /api/clusters/{clusterName}/ksql/v2: + post: + tags: + - Ksql + summary: executeKsql + operationId: executeKsql + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/KsqlCommand' + responses: + 200: + description: OK + content: + text/event-stream: + schema: + type: array + items: + $ref: '#/components/schemas/KsqlResponse' + /api/clusters/{clusterName}/connects/{connectName}/plugins: get: tags: @@ -2465,6 +2493,28 @@ components: - headers - rows + KsqlResponse: + type: object + properties: + table: + $ref: '#/components/schemas/KsqlTableResponse' + + KsqlTableResponse: + type: object + properties: + header: + type: string + columnNames: + type: array + items: + type: string + values: + type: array + items: + type: array + items: + type: object + FullConnectorInfo: type: object properties: diff --git a/pom.xml b/pom.xml index 40e2d5fd23..7864ac7438 100644 --- a/pom.xml +++ b/pom.xml @@ -39,6 +39,7 @@ 5.7.2 2.21.0 3.19.0 + 4.7.1 ..//kafka-ui-react-app/src/generated-sources