diff --git a/docker/kafka-ui-connectors.yaml b/docker/kafka-ui-connectors.yaml
index 5f4a3b597d..8d0d88d755 100644
--- a/docker/kafka-ui-connectors.yaml
+++ b/docker/kafka-ui-connectors.yaml
@@ -51,6 +51,8 @@ services:
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+ KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
JMX_PORT: 9997
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997
@@ -74,6 +76,8 @@ services:
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+ KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+ KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
JMX_PORT: 9998
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka1 -Dcom.sun.management.jmxremote.rmi.port=9998
diff --git a/kafka-ui-api/pom.xml b/kafka-ui-api/pom.xml
index f134cdbe3d..6723a24cd3 100644
--- a/kafka-ui-api/pom.xml
+++ b/kafka-ui-api/pom.xml
@@ -186,6 +186,11 @@
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
+ <dependency>
+   <groupId>org.antlr</groupId>
+   <artifactId>antlr4-runtime</artifactId>
+   <version>${antlr4-maven-plugin.version}</version>
+ </dependency>
@@ -271,6 +276,22 @@
+ <plugin>
+   <groupId>org.antlr</groupId>
+   <artifactId>antlr4-maven-plugin</artifactId>
+   <version>${antlr4-maven-plugin.version}</version>
+   <configuration>
+     <listener>false</listener>
+   </configuration>
+   <executions>
+     <execution>
+       <phase>generate-sources</phase>
+       <goals>
+         <goal>antlr4</goal>
+       </goals>
+     </execution>
+   </executions>
+ </plugin>
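
With the antlr4-maven-plugin defaults, grammars under src/main/antlr4 are compiled during the generate-sources phase and the Java package is taken from the subdirectory, so the KsqlGrammar.g4 added below under src/main/antlr4/ksql should come out as generated classes in the ksql package. A minimal sketch of what the Java code later in this change relies on (these imports appear verbatim in KsqlGrammar.java):

    // generated by antlr4-maven-plugin from src/main/antlr4/ksql/KsqlGrammar.g4
    import ksql.KsqlGrammarLexer;
    import ksql.KsqlGrammarParser;
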
diff --git a/kafka-ui-api/src/main/antlr4/ksql/KsqlGrammar.g4 b/kafka-ui-api/src/main/antlr4/ksql/KsqlGrammar.g4
new file mode 100644
index 0000000000..2fcd623e3e
--- /dev/null
+++ b/kafka-ui-api/src/main/antlr4/ksql/KsqlGrammar.g4
@@ -0,0 +1,621 @@
+grammar KsqlGrammar;
+
+tokens {
+ DELIMITER
+}
+
+@lexer::members {
+ public static final int COMMENTS = 2;
+ public static final int WHITESPACE = 3;
+ public static final int DIRECTIVES = 4;
+}
+
+statements
+ : (singleStatement)* EOF
+ ;
+
+testStatement
+ : (singleStatement | assertStatement ';' | runScript ';') EOF?
+ ;
+
+singleStatement
+ : statement ';'
+ ;
+
+singleExpression
+ : expression EOF
+ ;
+
+statement
+ : query #queryStatement
+ | (LIST | SHOW) PROPERTIES #listProperties
+ | (LIST | SHOW) ALL? TOPICS EXTENDED? #listTopics
+ | (LIST | SHOW) STREAMS EXTENDED? #listStreams
+ | (LIST | SHOW) TABLES EXTENDED? #listTables
+ | (LIST | SHOW) FUNCTIONS #listFunctions
+ | (LIST | SHOW) (SOURCE | SINK)? CONNECTORS #listConnectors
+ | (LIST | SHOW) CONNECTOR PLUGINS #listConnectorPlugins
+ | (LIST | SHOW) TYPES #listTypes
+ | (LIST | SHOW) VARIABLES #listVariables
+ | DESCRIBE sourceName EXTENDED? #showColumns
+ | DESCRIBE STREAMS EXTENDED? #describeStreams
+ | DESCRIBE FUNCTION identifier #describeFunction
+ | DESCRIBE CONNECTOR identifier #describeConnector
+ | PRINT (identifier| STRING) printClause #printTopic
+ | (LIST | SHOW) QUERIES EXTENDED? #listQueries
+ | TERMINATE identifier #terminateQuery
+ | TERMINATE ALL #terminateQuery
+ | SET STRING EQ STRING #setProperty
+ | UNSET STRING #unsetProperty
+ | DEFINE variableName EQ variableValue #defineVariable
+ | UNDEFINE variableName #undefineVariable
+ | CREATE (OR REPLACE)? (SOURCE)? STREAM (IF NOT EXISTS)? sourceName
+ (tableElements)?
+ (WITH tableProperties)? #createStream
+ | CREATE (OR REPLACE)? STREAM (IF NOT EXISTS)? sourceName
+ (WITH tableProperties)? AS query #createStreamAs
+ | CREATE (OR REPLACE)? (SOURCE)? TABLE (IF NOT EXISTS)? sourceName
+ (tableElements)?
+ (WITH tableProperties)? #createTable
+ | CREATE (OR REPLACE)? TABLE (IF NOT EXISTS)? sourceName
+ (WITH tableProperties)? AS query #createTableAs
+ | CREATE (SINK | SOURCE) CONNECTOR (IF NOT EXISTS)? identifier
+ WITH tableProperties #createConnector
+ | INSERT INTO sourceName (WITH tableProperties)? query #insertInto
+ | INSERT INTO sourceName (columns)? VALUES values #insertValues
+ | DROP STREAM (IF EXISTS)? sourceName (DELETE TOPIC)? #dropStream
+ | DROP TABLE (IF EXISTS)? sourceName (DELETE TOPIC)? #dropTable
+ | DROP CONNECTOR (IF EXISTS)? identifier #dropConnector
+ | EXPLAIN (statement | identifier) #explain
+ | CREATE TYPE (IF NOT EXISTS)? identifier AS type #registerType
+ | DROP TYPE (IF EXISTS)? identifier #dropType
+ | ALTER (STREAM | TABLE) sourceName alterOption (',' alterOption)* #alterSource
+ ;
+
+assertStatement
+ : ASSERT VALUES sourceName (columns)? VALUES values #assertValues
+ | ASSERT NULL VALUES sourceName (columns)? KEY values #assertTombstone
+ | ASSERT STREAM sourceName (tableElements)? (WITH tableProperties)? #assertStream
+ | ASSERT TABLE sourceName (tableElements)? (WITH tableProperties)? #assertTable
+ ;
+
+runScript
+ : RUN SCRIPT STRING
+ ;
+
+query
+ : SELECT selectItem (',' selectItem)*
+ FROM from=relation
+ (WINDOW windowExpression)?
+ (WHERE where=booleanExpression)?
+ (GROUP BY groupBy)?
+ (PARTITION BY partitionBy)?
+ (HAVING having=booleanExpression)?
+ (EMIT resultMaterialization)?
+ limitClause?
+ ;
+
+resultMaterialization
+ : CHANGES
+ | FINAL
+ ;
+
+alterOption
+ : ADD (COLUMN)? identifier type
+ ;
+
+tableElements
+ : '(' tableElement (',' tableElement)* ')'
+ ;
+
+tableElement
+ : identifier type columnConstraints?
+ ;
+
+columnConstraints
+ : ((PRIMARY)? KEY)
+ | HEADERS
+ | HEADER '(' STRING ')'
+ ;
+
+tableProperties
+ : '(' tableProperty (',' tableProperty)* ')'
+ ;
+
+tableProperty
+ : (identifier | STRING) EQ literal
+ ;
+
+printClause
+ : (FROM BEGINNING)? intervalClause? limitClause?
+ ;
+
+intervalClause
+ : (INTERVAL | SAMPLE) number
+ ;
+
+limitClause
+ : LIMIT number
+ ;
+
+retentionClause
+ : RETENTION number windowUnit
+ ;
+
+gracePeriodClause
+ : GRACE PERIOD number windowUnit
+ ;
+
+windowExpression
+ : (IDENTIFIER)?
+ ( tumblingWindowExpression | hoppingWindowExpression | sessionWindowExpression )
+ ;
+
+tumblingWindowExpression
+ : TUMBLING '(' SIZE number windowUnit (',' retentionClause)? (',' gracePeriodClause)?')'
+ ;
+
+hoppingWindowExpression
+ : HOPPING '(' SIZE number windowUnit ',' ADVANCE BY number windowUnit (',' retentionClause)? (',' gracePeriodClause)?')'
+ ;
+
+sessionWindowExpression
+ : SESSION '(' number windowUnit (',' retentionClause)? (',' gracePeriodClause)?')'
+ ;
+
+windowUnit
+ : DAY
+ | HOUR
+ | MINUTE
+ | SECOND
+ | MILLISECOND
+ | DAYS
+ | HOURS
+ | MINUTES
+ | SECONDS
+ | MILLISECONDS
+ ;
+
+groupBy
+ : valueExpression (',' valueExpression)*
+ | '(' (valueExpression (',' valueExpression)*)? ')'
+ ;
+
+partitionBy
+ : valueExpression (',' valueExpression)*
+ | '(' (valueExpression (',' valueExpression)*)? ')'
+ ;
+
+values
+ : '(' (valueExpression (',' valueExpression)*)? ')'
+ ;
+
+selectItem
+ : expression (AS? identifier)? #selectSingle
+ | identifier '.' ASTERISK #selectAll
+ | ASTERISK #selectAll
+ ;
+
+relation
+ : left=aliasedRelation joinedSource+ #joinRelation
+ | aliasedRelation #relationDefault
+ ;
+
+joinedSource
+ : joinType JOIN aliasedRelation joinWindow? joinCriteria
+ ;
+
+joinType
+ : INNER? #innerJoin
+ | FULL OUTER? #outerJoin
+ | LEFT OUTER? #leftJoin
+ ;
+
+joinWindow
+ : WITHIN withinExpression
+ ;
+
+withinExpression
+ : '(' joinWindowSize ',' joinWindowSize ')' (gracePeriodClause)? # joinWindowWithBeforeAndAfter
+ | joinWindowSize (gracePeriodClause)? # singleJoinWindow
+ ;
+
+joinWindowSize
+ : number windowUnit
+ ;
+
+joinCriteria
+ : ON booleanExpression
+ ;
+
+aliasedRelation
+ : relationPrimary (AS? sourceName)?
+ ;
+
+columns
+ : '(' identifier (',' identifier)* ')'
+ ;
+
+relationPrimary
+ : sourceName #tableName
+ ;
+
+expression
+ : booleanExpression
+ ;
+
+booleanExpression
+ : predicated #booleanDefault
+ | NOT booleanExpression #logicalNot
+ | left=booleanExpression operator=AND right=booleanExpression #logicalBinary
+ | left=booleanExpression operator=OR right=booleanExpression #logicalBinary
+ ;
+
+predicated
+ : valueExpression predicate[$valueExpression.ctx]?
+ ;
+
+predicate[ParserRuleContext value]
+ : comparisonOperator right=valueExpression #comparison
+ | NOT? BETWEEN lower=valueExpression AND upper=valueExpression #between
+ | NOT? IN '(' expression (',' expression)* ')' #inList
+ | NOT? LIKE pattern=valueExpression (ESCAPE escape=STRING)? #like
+ | IS NOT? NULL #nullPredicate
+ | IS NOT? DISTINCT FROM right=valueExpression #distinctFrom
+ ;
+
+valueExpression
+ : primaryExpression #valueExpressionDefault
+ | valueExpression AT timeZoneSpecifier #atTimeZone
+ | operator=(MINUS | PLUS) valueExpression #arithmeticUnary
+ | left=valueExpression operator=(ASTERISK | SLASH | PERCENT) right=valueExpression #arithmeticBinary
+ | left=valueExpression operator=(PLUS | MINUS) right=valueExpression #arithmeticBinary
+ | left=valueExpression CONCAT right=valueExpression #concatenation
+ ;
+
+primaryExpression
+ : literal #literalExpression
+ | identifier STRING #typeConstructor
+ | CASE valueExpression whenClause+ (ELSE elseExpression=expression)? END #simpleCase
+ | CASE whenClause+ (ELSE elseExpression=expression)? END #searchedCase
+ | CAST '(' expression AS type ')' #cast
+ | ARRAY '[' (expression (',' expression)*)? ']' #arrayConstructor
+ | MAP '(' (expression ASSIGN expression (',' expression ASSIGN expression)*)? ')' #mapConstructor
+ | STRUCT '(' (identifier ASSIGN expression (',' identifier ASSIGN expression)*)? ')' #structConstructor
+ | identifier '(' ASTERISK ')' #functionCall
+ | identifier '(' (functionArgument (',' functionArgument)* (',' lambdaFunction)*)? ')' #functionCall
+ | value=primaryExpression '[' index=valueExpression ']' #subscript
+ | identifier #columnReference
+ | identifier '.' identifier #qualifiedColumnReference
+ | base=primaryExpression STRUCT_FIELD_REF fieldName=identifier #dereference
+ | '(' expression ')' #parenthesizedExpression
+ ;
+
+functionArgument
+ : expression
+ | windowUnit
+ ;
+
+timeZoneSpecifier
+ : TIME ZONE STRING #timeZoneString
+ ;
+
+comparisonOperator
+ : EQ | NEQ | LT | LTE | GT | GTE
+ ;
+
+booleanValue
+ : TRUE | FALSE
+ ;
+
+type
+ : type ARRAY
+ | ARRAY '<' type '>'
+ | MAP '<' type ',' type '>'
+ | STRUCT '<' (identifier type (',' identifier type)*)? '>'
+ | DECIMAL '(' number ',' number ')'
+ | baseType ('(' typeParameter (',' typeParameter)* ')')?
+ ;
+
+typeParameter
+ : INTEGER_VALUE | 'STRING'
+ ;
+
+baseType
+ : identifier
+ ;
+
+whenClause
+ : WHEN condition=expression THEN result=expression
+ ;
+
+identifier
+ : VARIABLE #variableIdentifier
+ | IDENTIFIER #unquotedIdentifier
+ | QUOTED_IDENTIFIER #quotedIdentifierAlternative
+ | nonReserved #unquotedIdentifier
+ | BACKQUOTED_IDENTIFIER #backQuotedIdentifier
+ | DIGIT_IDENTIFIER #digitIdentifier
+ ;
+
+lambdaFunction
+ : identifier '=>' expression #lambda
+ | '(' identifier (',' identifier)* ')' '=>' expression #lambda
+ ;
+
+variableName
+ : IDENTIFIER
+ ;
+
+variableValue
+ : STRING
+ ;
+
+sourceName
+ : identifier
+ ;
+
+number
+ : MINUS? DECIMAL_VALUE #decimalLiteral
+ | MINUS? FLOATING_POINT_VALUE #floatLiteral
+ | MINUS? INTEGER_VALUE #integerLiteral
+ ;
+
+literal
+ : NULL #nullLiteral
+ | number #numericLiteral
+ | booleanValue #booleanLiteral
+ | STRING #stringLiteral
+ | VARIABLE #variableLiteral
+ ;
+
+nonReserved
+ : SHOW | TABLES | COLUMNS | COLUMN | PARTITIONS | FUNCTIONS | FUNCTION | SESSION
+ | STRUCT | MAP | ARRAY | PARTITION
+ | INTEGER | DATE | TIME | TIMESTAMP | INTERVAL | ZONE | 'STRING'
+ | YEAR | MONTH | DAY | HOUR | MINUTE | SECOND
+ | EXPLAIN | ANALYZE | TYPE | TYPES
+ | SET | RESET
+ | IF
+ | SOURCE | SINK
+ | PRIMARY | KEY
+ | EMIT
+ | CHANGES
+ | FINAL
+ | ESCAPE
+ | REPLACE
+ | ASSERT
+ | ALTER
+ | ADD
+ ;
+
+EMIT: 'EMIT';
+CHANGES: 'CHANGES';
+FINAL: 'FINAL';
+SELECT: 'SELECT';
+FROM: 'FROM';
+AS: 'AS';
+ALL: 'ALL';
+DISTINCT: 'DISTINCT';
+WHERE: 'WHERE';
+WITHIN: 'WITHIN';
+WINDOW: 'WINDOW';
+GROUP: 'GROUP';
+BY: 'BY';
+HAVING: 'HAVING';
+LIMIT: 'LIMIT';
+AT: 'AT';
+OR: 'OR';
+AND: 'AND';
+IN: 'IN';
+NOT: 'NOT';
+EXISTS: 'EXISTS';
+BETWEEN: 'BETWEEN';
+LIKE: 'LIKE';
+ESCAPE: 'ESCAPE';
+IS: 'IS';
+NULL: 'NULL';
+TRUE: 'TRUE';
+FALSE: 'FALSE';
+INTEGER: 'INTEGER';
+DATE: 'DATE';
+TIME: 'TIME';
+TIMESTAMP: 'TIMESTAMP';
+INTERVAL: 'INTERVAL';
+YEAR: 'YEAR';
+MONTH: 'MONTH';
+DAY: 'DAY';
+HOUR: 'HOUR';
+MINUTE: 'MINUTE';
+SECOND: 'SECOND';
+MILLISECOND: 'MILLISECOND';
+YEARS: 'YEARS';
+MONTHS: 'MONTHS';
+DAYS: 'DAYS';
+HOURS: 'HOURS';
+MINUTES: 'MINUTES';
+SECONDS: 'SECONDS';
+MILLISECONDS: 'MILLISECONDS';
+ZONE: 'ZONE';
+TUMBLING: 'TUMBLING';
+HOPPING: 'HOPPING';
+SIZE: 'SIZE';
+ADVANCE: 'ADVANCE';
+RETENTION: 'RETENTION';
+GRACE: 'GRACE';
+PERIOD: 'PERIOD';
+CASE: 'CASE';
+WHEN: 'WHEN';
+THEN: 'THEN';
+ELSE: 'ELSE';
+END: 'END';
+JOIN: 'JOIN';
+FULL: 'FULL';
+OUTER: 'OUTER';
+INNER: 'INNER';
+LEFT: 'LEFT';
+RIGHT: 'RIGHT';
+ON: 'ON';
+PARTITION: 'PARTITION';
+STRUCT: 'STRUCT';
+WITH: 'WITH';
+VALUES: 'VALUES';
+CREATE: 'CREATE';
+TABLE: 'TABLE';
+TOPIC: 'TOPIC';
+STREAM: 'STREAM';
+STREAMS: 'STREAMS';
+INSERT: 'INSERT';
+DELETE: 'DELETE';
+INTO: 'INTO';
+DESCRIBE: 'DESCRIBE';
+EXTENDED: 'EXTENDED';
+PRINT: 'PRINT';
+EXPLAIN: 'EXPLAIN';
+ANALYZE: 'ANALYZE';
+TYPE: 'TYPE';
+TYPES: 'TYPES';
+CAST: 'CAST';
+SHOW: 'SHOW';
+LIST: 'LIST';
+TABLES: 'TABLES';
+TOPICS: 'TOPICS';
+QUERY: 'QUERY';
+QUERIES: 'QUERIES';
+TERMINATE: 'TERMINATE';
+LOAD: 'LOAD';
+COLUMNS: 'COLUMNS';
+COLUMN: 'COLUMN';
+PARTITIONS: 'PARTITIONS';
+FUNCTIONS: 'FUNCTIONS';
+FUNCTION: 'FUNCTION';
+DROP: 'DROP';
+TO: 'TO';
+RENAME: 'RENAME';
+ARRAY: 'ARRAY';
+MAP: 'MAP';
+SET: 'SET';
+DEFINE: 'DEFINE';
+UNDEFINE: 'UNDEFINE';
+RESET: 'RESET';
+SESSION: 'SESSION';
+SAMPLE: 'SAMPLE';
+EXPORT: 'EXPORT';
+CATALOG: 'CATALOG';
+PROPERTIES: 'PROPERTIES';
+BEGINNING: 'BEGINNING';
+UNSET: 'UNSET';
+RUN: 'RUN';
+SCRIPT: 'SCRIPT';
+DECIMAL: 'DECIMAL';
+KEY: 'KEY';
+CONNECTOR: 'CONNECTOR';
+CONNECTORS: 'CONNECTORS';
+SINK: 'SINK';
+SOURCE: 'SOURCE';
+NAMESPACE: 'NAMESPACE';
+MATERIALIZED: 'MATERIALIZED';
+VIEW: 'VIEW';
+PRIMARY: 'PRIMARY';
+REPLACE: 'REPLACE';
+ASSERT: 'ASSERT';
+ADD: 'ADD';
+ALTER: 'ALTER';
+VARIABLES: 'VARIABLES';
+PLUGINS: 'PLUGINS';
+HEADERS: 'HEADERS';
+HEADER: 'HEADER';
+
+IF: 'IF';
+
+EQ : '=';
+NEQ : '<>' | '!=';
+LT : '<';
+LTE : '<=';
+GT : '>';
+GTE : '>=';
+
+PLUS: '+';
+MINUS: '-';
+ASTERISK: '*';
+SLASH: '/';
+PERCENT: '%';
+CONCAT: '||';
+
+ASSIGN: ':=';
+STRUCT_FIELD_REF: '->';
+
+LAMBDA_EXPRESSION: '=>';
+
+STRING
+ : '\'' ( ~'\'' | '\'\'' )* '\''
+ ;
+
+INTEGER_VALUE
+ : DIGIT+
+ ;
+
+DECIMAL_VALUE
+ : DIGIT+ '.' DIGIT*
+ | '.' DIGIT+
+ ;
+
+FLOATING_POINT_VALUE
+ : DIGIT+ ('.' DIGIT*)? EXPONENT
+ | '.' DIGIT+ EXPONENT
+ ;
+
+IDENTIFIER
+ : (LETTER | '_') (LETTER | DIGIT | '_' | '@' )*
+ ;
+
+DIGIT_IDENTIFIER
+ : DIGIT (LETTER | DIGIT | '_' | '@' )+
+ ;
+
+QUOTED_IDENTIFIER
+ : '"' ( ~'"' | '""' )* '"'
+ ;
+
+BACKQUOTED_IDENTIFIER
+ : '`' ( ~'`' | '``' )* '`'
+ ;
+
+VARIABLE
+ : '${' IDENTIFIER '}'
+ ;
+
+fragment EXPONENT
+ : 'E' [+-]? DIGIT+
+ ;
+
+fragment DIGIT
+ : [0-9]
+ ;
+
+fragment LETTER
+ : [A-Z]
+ ;
+
+SIMPLE_COMMENT
+ : '--' ~'@' ~[\r\n]* '\r'? '\n'? -> channel(2) // channel(COMMENTS)
+ ;
+
+DIRECTIVE_COMMENT
+ : '--@' ~[\r\n]* '\r'? '\n'? -> channel(4) // channel(DIRECTIVES)
+ ;
+
+BRACKETED_COMMENT
+ : '/*' .*? '*/' -> channel(2) // channel(COMMENTS)
+ ;
+
+WS
+ : [ \r\n\t]+ -> channel(3) // channel(WHITESPACE)
+ ;
+
+// Catch-all for anything we can't recognize.
+// We use this to be able to ignore and recover all the text
+// when splitting statements with DelimiterLexer
+UNRECOGNIZED
+ : .
+ ;
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java
index 4c4770f3bd..c1bf6c2b63 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/KsqlController.java
@@ -3,14 +3,22 @@ package com.provectus.kafka.ui.controller;
import com.provectus.kafka.ui.api.KsqlApi;
import com.provectus.kafka.ui.model.KsqlCommandDTO;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
+import com.provectus.kafka.ui.model.KsqlResponseDTO;
+import com.provectus.kafka.ui.model.KsqlTableResponseDTO;
import com.provectus.kafka.ui.service.KsqlService;
+import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+
@RestController
@RequiredArgsConstructor
@Log4j2
@@ -25,4 +33,26 @@ public class KsqlController extends AbstractController implements KsqlApi {
return ksqlService.executeKsqlCommand(getCluster(clusterName), ksqlCommand)
.map(ResponseEntity::ok);
}
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Mono<ResponseEntity<Flux<KsqlResponseDTO>>> executeKsql(String clusterName,
+ Mono<KsqlCommandDTO> ksqlCommand,
+ ServerWebExchange exchange) {
+ return Mono.just(
+ ResponseEntity.ok(
+ ksqlCommand
+ .flux()
+ .flatMap(command ->
+ new KsqlApiClient(getCluster(clusterName))
+ .execute(
+ command.getKsql(),
+ Optional.ofNullable(command.getStreamsProperties()).orElse(Map.of())))
+ .map(table -> new KsqlResponseDTO()
+ .table(
+ new KsqlTableResponseDTO()
+ .header(table.getHeader())
+ .columnNames(table.getColumnNames())
+ .values((List<List<Object>>) ((List<?>) (table.getValues())))))));
+ }
}
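
Since the controller wraps a Flux in the ResponseEntity and the contract below declares text/event-stream, each KsqlResponse is streamed to the caller as soon as the corresponding table or row is parsed. A minimal consumer sketch, assuming the application runs on localhost:8080, a cluster named "local" is configured, and the generated KsqlCommandDTO exposes the usual fluent setters (all assumptions, not part of this change):

    import com.provectus.kafka.ui.model.KsqlCommandDTO;
    import com.provectus.kafka.ui.model.KsqlResponseDTO;
    import org.springframework.http.MediaType;
    import org.springframework.web.reactive.function.client.WebClient;

    class KsqlV2ClientSketch {
      public static void main(String[] args) {
        WebClient.create("http://localhost:8080")               // assumed base URL
            .post()
            .uri("/api/clusters/local/ksql/v2")                 // assumed cluster name
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(new KsqlCommandDTO().ksql("SHOW TABLES;"))
            .retrieve()
            .bodyToFlux(KsqlResponseDTO.class)                  // one element per emitted table/row
            .doOnNext(r -> System.out.println(r.getTable().getHeader()))
            .blockLast();
      }
    }
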
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java
index 7315c20a61..e9f33f2e2c 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/ErrorCode.java
@@ -8,6 +8,7 @@ import org.springframework.http.HttpStatus;
public enum ErrorCode {
UNEXPECTED(5000, HttpStatus.INTERNAL_SERVER_ERROR),
+ KSQL_API_ERROR(5001, HttpStatus.INTERNAL_SERVER_ERROR),
BINDING_FAIL(4001, HttpStatus.BAD_REQUEST),
NOT_FOUND(404, HttpStatus.NOT_FOUND),
INVALID_ENTITY_STATE(4001, HttpStatus.BAD_REQUEST),
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/KsqlApiException.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/KsqlApiException.java
new file mode 100644
index 0000000000..1a867dfa01
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/exception/KsqlApiException.java
@@ -0,0 +1,17 @@
+package com.provectus.kafka.ui.exception;
+
+public class KsqlApiException extends CustomBaseException {
+
+ public KsqlApiException(String message) {
+ super(message);
+ }
+
+ public KsqlApiException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ @Override
+ public ErrorCode getErrorCode() {
+ return ErrorCode.KSQL_API_ERROR;
+ }
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java
new file mode 100644
index 0000000000..0604b9a9ff
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlApiClient.java
@@ -0,0 +1,114 @@
+package com.provectus.kafka.ui.service.ksql;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.provectus.kafka.ui.exception.ValidationException;
+import com.provectus.kafka.ui.model.KafkaCluster;
+import com.provectus.kafka.ui.service.ksql.response.ResponseParser;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import lombok.Builder;
+import lombok.Value;
+import org.springframework.http.MediaType;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Flux;
+
+public class KsqlApiClient {
+
+ @Builder
+ @Value
+ public static class KsqlResponseTable {
+ String header;
+ List columnNames;
+ List> values;
+ }
+
+ @Value
+ private static class KsqlRequest {
+ String ksql;
+ Map<String, String> streamsProperties;
+ }
+
+ //--------------------------------------------------------------------------------------------
+
+ private final KafkaCluster cluster;
+
+ public KsqlApiClient(KafkaCluster cluster) {
+ this.cluster = cluster;
+ }
+
+ private WebClient webClient() {
+ return WebClient.create();
+ }
+
+ private String baseKsqlDbUri() {
+ return cluster.getKsqldbServer();
+ }
+
+ private KsqlRequest ksqlRequest(String ksql, Map<String, String> streamProperties) {
+ return new KsqlRequest(ksql, streamProperties);
+ }
+
+ private Flux<KsqlResponseTable> executeSelect(String ksql, Map<String, String> streamProperties) {
+ return webClient()
+ .post()
+ .uri(baseKsqlDbUri() + "/query")
+ .accept(MediaType.parseMediaType("application/vnd.ksql.v1+json"))
+ .contentType(MediaType.parseMediaType("application/json"))
+ .bodyValue(ksqlRequest(ksql, streamProperties))
+ .retrieve()
+ .bodyToFlux(JsonNode.class)
+ .map(ResponseParser::parseSelectResponse)
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .onErrorResume(WebClientResponseException.class,
+ e -> Flux.just(ResponseParser.parseErrorResponse(e)));
+ }
+
+ private Flux<KsqlResponseTable> executeStatement(String ksql,
+ Map<String, String> streamProperties) {
+ return webClient()
+ .post()
+ .uri(baseKsqlDbUri() + "/ksql")
+ .accept(MediaType.parseMediaType("application/vnd.ksql.v1+json"))
+ .contentType(MediaType.parseMediaType("application/json"))
+ .bodyValue(ksqlRequest(ksql, streamProperties))
+ .exchangeToFlux(
+ resp -> {
+ if (resp.statusCode().isError()) {
+ return resp.createException().flux().map(ResponseParser::parseErrorResponse);
+ }
+ return resp.bodyToFlux(JsonNode.class)
+ .flatMap(body ->
+ // body can be an array or single object
+ (body.isArray() ? Flux.fromIterable(body) : Flux.just(body))
+ .flatMapIterable(ResponseParser::parseStatementResponse))
+ // body can be empty for some statements like INSERT
+ .switchIfEmpty(
+ Flux.just(KsqlResponseTable.builder()
+ .header("Query Result")
+ .columnNames(List.of("Result"))
+ .values(List.of(List.of(new TextNode("Success"))))
+ .build()));
+ }
+ );
+ }
+
+ public Flux<KsqlResponseTable> execute(String ksql, Map<String, String> streamProperties) {
+ var parsed = KsqlGrammar.parse(ksql);
+ if (parsed.getStatements().size() > 1) {
+ throw new ValidationException("Only single statement supported now");
+ }
+ if (parsed.getStatements().size() == 0) {
+ throw new ValidationException("No valid ksql statement found");
+ }
+ if (KsqlGrammar.isSelect(parsed.getStatements().get(0))) {
+ return executeSelect(ksql, streamProperties);
+ } else {
+ return executeStatement(ksql, streamProperties);
+ }
+ }
+
+}
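
execute() parses the statement first and then routes it: SELECTs go to ksqlDB's /query endpoint (a chunked response of header plus rows), everything else goes to /ksql. A short usage sketch, assuming a KafkaCluster whose ksqldbServer address is set and an illustrative stream name; the blocking call is for illustration only:

    import com.provectus.kafka.ui.model.KafkaCluster;
    import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
    import java.util.Map;

    class KsqlApiClientSketch {
      static void run(KafkaCluster cluster) {
        var client = new KsqlApiClient(cluster);

        // Non-SELECT statements are sent to /ksql and come back as one or more tables.
        client.execute("SHOW STREAMS;", Map.of())
            .doOnNext(t -> System.out.println(t.getHeader() + " " + t.getColumnNames()))
            .blockLast();

        // SELECTs are sent to /query and emit a "Schema" table followed by "Row" tables.
        client.execute("SELECT * FROM my_stream EMIT CHANGES LIMIT 3;", Map.of())
            .subscribe(t -> System.out.println(t.getValues()));
      }
    }
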
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlGrammar.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlGrammar.java
new file mode 100644
index 0000000000..efb759258c
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/KsqlGrammar.java
@@ -0,0 +1,83 @@
+package com.provectus.kafka.ui.service.ksql;
+
+import com.provectus.kafka.ui.exception.ValidationException;
+import java.util.List;
+import ksql.KsqlGrammarLexer;
+import ksql.KsqlGrammarParser;
+import lombok.RequiredArgsConstructor;
+import lombok.Value;
+import lombok.experimental.Delegate;
+import org.antlr.v4.runtime.BaseErrorListener;
+import org.antlr.v4.runtime.CharStream;
+import org.antlr.v4.runtime.CharStreams;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.IntStream;
+import org.antlr.v4.runtime.RecognitionException;
+import org.antlr.v4.runtime.Recognizer;
+import org.antlr.v4.runtime.atn.PredictionMode;
+
+class KsqlGrammar {
+
+ @Value
+ static class KsqlStatements {
+ List<KsqlGrammarParser.SingleStatementContext> statements;
+ }
+
+ static KsqlStatements parse(String ksql) {
+ var parsed = parseStatements(ksql);
+ if (parsed.singleStatement().stream()
+ .anyMatch(s -> s.statement().exception != null)) {
+ throw new ValidationException("Error parsing ksql statement. Check syntax!");
+ }
+ return new KsqlStatements(parsed.singleStatement());
+ }
+
+ static boolean isSelect(KsqlGrammarParser.SingleStatementContext statement) {
+ return statement.statement() instanceof ksql.KsqlGrammarParser.QueryStatementContext;
+ }
+
+ private static ksql.KsqlGrammarParser.StatementsContext parseStatements(final String sql) {
+ var lexer = new KsqlGrammarLexer(CaseInsensitiveStream.from(CharStreams.fromString(sql)));
+ var tokenStream = new CommonTokenStream(lexer);
+ var grammarParser = new ksql.KsqlGrammarParser(tokenStream);
+
+ lexer.addErrorListener(new BaseErrorListener() {
+ @Override
+ public void syntaxError(Recognizer<?, ?> recognizer, Object offendingSymbol,
+ int line, int charPositionInLine,
+ String msg, RecognitionException e) {
+ throw new ValidationException("Invalid syntax: " + msg);
+ }
+ });
+ grammarParser.getInterpreter().setPredictionMode(PredictionMode.LL);
+ try {
+ return grammarParser.statements();
+ } catch (Exception e) {
+ throw new ValidationException("Error parsing ksql query: " + e.getMessage());
+ }
+ }
+
+ // impl copied from https://github.com/confluentinc/ksql/blob/master/ksqldb-parser/src/main/java/io/confluent/ksql/parser/CaseInsensitiveStream.java
+ @RequiredArgsConstructor
+ private static class CaseInsensitiveStream implements CharStream {
+ @Delegate
+ final CharStream stream;
+
+ public static CaseInsensitiveStream from(CharStream stream) {
+ // we only need to override the LA() method
+ return new CaseInsensitiveStream(stream) {
+ @Override
+ public int LA(final int i) {
+ final int result = stream.LA(i);
+ switch (result) {
+ case 0:
+ case IntStream.EOF:
+ return result;
+ default:
+ return Character.toUpperCase(result);
+ }
+ }
+ };
+ }
+ }
+}
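
KsqlGrammar.parse() only splits and validates statements; isSelect() checks whether the single statement matched the grammar's #queryStatement alternative, which is what decides between the /query and /ksql endpoints in KsqlApiClient. A small sketch of that decision (KsqlGrammar is package-private, so this only compiles inside com.provectus.kafka.ui.service.ksql; the statements themselves are illustrative):

    package com.provectus.kafka.ui.service.ksql;

    class KsqlGrammarSketch {
      static void run() {
        var select = KsqlGrammar.parse("SELECT * FROM users EMIT CHANGES LIMIT 10;");
        boolean routedToQuery = KsqlGrammar.isSelect(select.getStatements().get(0));  // true  -> /query

        var ddl = KsqlGrammar.parse(
            "CREATE STREAM s (id INT KEY, name STRING) WITH (kafka_topic='s', value_format='JSON');");
        boolean alsoQuery = KsqlGrammar.isSelect(ddl.getStatements().get(0));         // false -> /ksql
      }
    }
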
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/DynamicParser.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/DynamicParser.java
new file mode 100644
index 0000000000..c86a3293ff
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/DynamicParser.java
@@ -0,0 +1,69 @@
+package com.provectus.kafka.ui.service.ksql.response;
+
+import static com.provectus.kafka.ui.service.ksql.KsqlApiClient.KsqlResponseTable;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+
+class DynamicParser {
+
+ static KsqlResponseTable parseArray(String tableName, JsonNode array) {
+ return parseArray(tableName, getFieldNamesFromArray(array), array);
+ }
+
+ static KsqlResponseTable parseArray(String tableName,
+ List<String> columnNames,
+ JsonNode array) {
+ return KsqlResponseTable.builder()
+ .header(tableName)
+ .columnNames(columnNames)
+ .values(
+ StreamSupport.stream(array.spliterator(), false)
+ .map(node ->
+ columnNames.stream()
+ .map(node::get)
+ .collect(Collectors.toList()))
+ .collect(Collectors.toList())
+ ).build();
+ }
+
+ private static List<String> getFieldNamesFromArray(JsonNode array) {
+ List<String> fields = new ArrayList<>();
+ array.forEach(node -> node.fieldNames().forEachRemaining(f -> {
+ if (!fields.contains(f)) {
+ fields.add(f);
+ }
+ }));
+ return fields;
+ }
+
+ static KsqlResponseTable parseObject(String tableName, JsonNode node) {
+ if (!node.isObject()) {
+ return KsqlResponseTable.builder()
+ .header(tableName)
+ .columnNames(List.of("value"))
+ .values(List.of(List.of(node)))
+ .build();
+ }
+ return parseObject(tableName, Lists.newArrayList(node.fieldNames()), node);
+ }
+
+ static KsqlResponseTable parseObject(String tableName, List<String> columnNames, JsonNode node) {
+ return KsqlResponseTable.builder()
+ .header(tableName)
+ .columnNames(columnNames)
+ .values(
+ List.of(
+ columnNames.stream()
+ .map(node::get)
+ .collect(Collectors.toList()))
+ )
+ .build();
+ }
+
+}
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/ResponseParser.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/ResponseParser.java
new file mode 100644
index 0000000000..7e0d8cb483
--- /dev/null
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ksql/response/ResponseParser.java
@@ -0,0 +1,161 @@
+package com.provectus.kafka.ui.service.ksql.response;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.google.common.collect.Lists;
+import com.provectus.kafka.ui.exception.KsqlApiException;
+import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+
+public class ResponseParser {
+
+ public static Optional<KsqlApiClient.KsqlResponseTable> parseSelectResponse(JsonNode jsonNode) {
+ // in the response we get either a header record or row data
+ if (arrayFieldNonEmpty(jsonNode, "header")) {
+ return Optional.of(
+ KsqlApiClient.KsqlResponseTable.builder()
+ .header("Schema")
+ .columnNames(
+ Arrays.stream(jsonNode.get("header").get("schema").asText().split(","))
+ .map(String::trim)
+ .collect(Collectors.toList())
+ )
+ .build());
+ }
+ if (arrayFieldNonEmpty(jsonNode, "row")) {
+ return Optional.of(
+ KsqlApiClient.KsqlResponseTable.builder()
+ .header("Row")
+ .values(
+ List.of(Lists.newArrayList(jsonNode.get("row").get("columns"))))
+ .build());
+ }
+ if (jsonNode.hasNonNull("errorMessage")) {
+ throw new KsqlApiException("Error: " + jsonNode.get("errorMessage"));
+ }
+ // remaining events can be skipped
+ return Optional.empty();
+ }
+
+ public static KsqlApiClient.KsqlResponseTable parseErrorResponse(WebClientResponseException e) {
+ try {
+ var errBody = new JsonMapper().readTree(e.getResponseBodyAsString());
+ return DynamicParser.parseObject("Execution error", errBody);
+ } catch (Exception ex) {
+ throw new KsqlApiException(
+ String.format(
+ "Unparsable error response from ksqdb, status:'%s', body: '%s'",
+ e.getStatusCode(), e.getResponseBodyAsString()), e);
+ }
+ }
+
+ public static List<KsqlApiClient.KsqlResponseTable> parseStatementResponse(JsonNode jsonNode) {
+ var type = Optional.ofNullable(jsonNode.get("@type"))
+ .map(JsonNode::asText)
+ .orElse("unknown");
+
+ // messages structure can be inferred from https://github.com/confluentinc/ksql/blob/master/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java
+ switch (type) {
+ case "currentStatus":
+ return parseObject(
+ "Status",
+ List.of("status", "message"),
+ jsonNode.get("commandStatus")
+ );
+ case "properties":
+ return parseProperties(jsonNode);
+ case "queries":
+ return parseArray("Queries", "queries", jsonNode);
+ case "sourceDescription":
+ return parseObjectDynamically("Source Description", jsonNode.get("sourceDescription"));
+ case "queryDescription":
+ return parseArray("Queries Description", "queryDescription", jsonNode);
+ case "topicDescription":
+ return parseObject(
+ "Topic Description",
+ List.of("name", "kafkaTopic", "format", "schemaString"),
+ jsonNode
+ );
+ case "streams":
+ return parseArray("Streams", "streams", jsonNode);
+ case "tables":
+ return parseArray("Tables", "tables", jsonNode);
+ case "kafka_topics":
+ return parseArray("Topics", "topics", jsonNode);
+ case "kafka_topics_extended":
+ return parseArray("Topics extended", "topics", jsonNode);
+ case "executionPlan":
+ return parseObject("Execution plan", List.of("executionPlanText"), jsonNode);
+ case "source_descriptions":
+ return parseArray("Source descriptions", "sourceDescriptions", jsonNode);
+ case "query_descriptions":
+ return parseArray("Queries", "queryDescriptions", jsonNode);
+ case "describe_function":
+ return parseObject("Function description",
+ List.of("name", "author", "version", "description", "functions", "path", "type"),
+ jsonNode
+ );
+ case "function_names":
+ return parseArray("Function Names", "functions", jsonNode);
+ case "connector_info":
+ return parseObjectDynamically("Connector Info", jsonNode.get("info"));
+ case "drop_connector":
+ return parseObject("Dropped connector", List.of("connectorName"), jsonNode);
+ case "connector_list":
+ return parseArray("Connectors", "connectors", jsonNode);
+ case "connector_plugins_list":
+ return parseArray("Connector Plugins", "connectorPlugins", jsonNode);
+ case "connector_description":
+ return parseObject("Connector Description",
+ List.of("connectorClass", "status", "sources", "topics"),
+ jsonNode
+ );
+ default:
+ return parseUnknownResponse(jsonNode);
+ }
+ }
+
+ private static List<KsqlApiClient.KsqlResponseTable> parseObjectDynamically(
+ String tableName, JsonNode jsonNode) {
+ return List.of(DynamicParser.parseObject(tableName, jsonNode));
+ }
+
+ private static List<KsqlApiClient.KsqlResponseTable> parseObject(
+ String tableName, List<String> fields, JsonNode jsonNode) {
+ return List.of(DynamicParser.parseObject(tableName, fields, jsonNode));
+ }
+
+ private static List<KsqlApiClient.KsqlResponseTable> parseArray(
+ String tableName, String arrayField, JsonNode jsonNode) {
+ return List.of(DynamicParser.parseArray(tableName, jsonNode.get(arrayField)));
+ }
+
+ private static List<KsqlApiClient.KsqlResponseTable> parseProperties(JsonNode jsonNode) {
+ var tables = new ArrayList<KsqlApiClient.KsqlResponseTable>();
+ if (arrayFieldNonEmpty(jsonNode, "properties")) {
+ tables.add(DynamicParser.parseArray("properties", jsonNode.get("properties")));
+ }
+ if (arrayFieldNonEmpty(jsonNode, "overwrittenProperties")) {
+ tables.add(DynamicParser.parseArray("overwrittenProperties",
+ jsonNode.get("overwrittenProperties")));
+ }
+ if (arrayFieldNonEmpty(jsonNode, "defaultProperties")) {
+ tables.add(DynamicParser.parseArray("defaultProperties", jsonNode.get("defaultProperties")));
+ }
+ return tables;
+ }
+
+ private static List<KsqlApiClient.KsqlResponseTable> parseUnknownResponse(JsonNode jsonNode) {
+ return List.of(DynamicParser.parseObject("Ksql Response", jsonNode));
+ }
+
+ private static boolean arrayFieldNonEmpty(JsonNode json, String field) {
+ return json.hasNonNull(field) && !json.get(field).isEmpty();
+ }
+
+}
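
For /query responses, ksqlDB streams JSON chunks where the first element carries a header with the projection schema and subsequent elements carry row columns; parseSelectResponse() maps the former to a "Schema" table and each of the latter to a "Row" table, skipping everything else. A hedged sketch on hand-written chunks (the JSON literals are simplified approximations of ksqlDB's format, not fixtures from this change):

    import com.fasterxml.jackson.databind.json.JsonMapper;
    import com.provectus.kafka.ui.service.ksql.response.ResponseParser;

    class ResponseParserSketch {
      public static void main(String[] args) throws Exception {
        var mapper = new JsonMapper();

        var headerChunk = mapper.readTree(
            "{\"header\":{\"queryId\":\"q1\",\"schema\":\"`ID` INTEGER, `NAME` STRING\"}}");
        ResponseParser.parseSelectResponse(headerChunk)
            .ifPresent(t -> System.out.println(t.getHeader() + " -> " + t.getColumnNames()));
        // prints: Schema -> [`ID` INTEGER, `NAME` STRING]

        var rowChunk = mapper.readTree("{\"row\":{\"columns\":[1,\"alice\"]}}");
        ResponseParser.parseSelectResponse(rowChunk)
            .ifPresent(t -> System.out.println(t.getHeader() + " -> " + t.getValues()));
        // prints: Row -> [[1, "alice"]]
      }
    }
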
diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
index 93e159a5d4..00873584c5 100644
--- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
+++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
@@ -1338,6 +1338,7 @@ paths:
description: OK
/api/clusters/{clusterName}/ksql:
+ description: Deprecated - use ksql/v2 instead!
post:
tags:
- Ksql
@@ -1362,6 +1363,33 @@ paths:
schema:
$ref: '#/components/schemas/KsqlCommandResponse'
+ /api/clusters/{clusterName}/ksql/v2:
+ post:
+ tags:
+ - Ksql
+ summary: executeKsql
+ operationId: executeKsql
+ parameters:
+ - name: clusterName
+ in: path
+ required: true
+ schema:
+ type: string
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/KsqlCommand'
+ responses:
+ 200:
+ description: OK
+ content:
+ text/event-stream:
+ schema:
+ type: array
+ items:
+ $ref: '#/components/schemas/KsqlResponse'
+
/api/clusters/{clusterName}/connects/{connectName}/plugins:
get:
tags:
@@ -2465,6 +2493,28 @@ components:
- headers
- rows
+ KsqlResponse:
+ type: object
+ properties:
+ table:
+ $ref: '#/components/schemas/KsqlTableResponse'
+
+ KsqlTableResponse:
+ type: object
+ properties:
+ header:
+ type: string
+ columnNames:
+ type: array
+ items:
+ type: string
+ values:
+ type: array
+ items:
+ type: array
+ items:
+ type: object
+
FullConnectorInfo:
type: object
properties:
diff --git a/pom.xml b/pom.xml
index 40e2d5fd23..7864ac7438 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,6 +39,7 @@
5.7.2
2.21.0
3.19.0
+ <antlr4-maven-plugin.version>4.7.1</antlr4-maven-plugin.version>
..//kafka-ui-react-app/src/generated-sources