Ksql functionality reimplemented (#1161)

* Ksql functionality reimplemented

Co-authored-by: iliax <ikuramshin@provectus.com>
This commit is contained in:
Ilya Kuramshin 2021-12-13 13:16:47 +03:00 committed by GitHub
parent 242f85aa2e
commit ec42c37f24
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 1172 additions and 0 deletions

View file

@ -51,6 +51,8 @@ services:
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
JMX_PORT: 9997
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997
@ -74,6 +76,8 @@ services:
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
JMX_PORT: 9998
KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka1 -Dcom.sun.management.jmxremote.rmi.port=9998

View file

@ -186,6 +186,11 @@
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
<version>${antlr4-maven-plugin.version}</version>
</dependency>
</dependencies>
<build>
@ -271,6 +276,22 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.antlr</groupId>
<artifactId>antlr4-maven-plugin</artifactId>
<version>${antlr4-maven-plugin.version}</version>
<configuration>
<visitor>false</visitor>
</configuration>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>antlr4</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

View file

@ -0,0 +1,621 @@
grammar KsqlGrammar;
tokens {
DELIMITER
}
@lexer::members {
public static final int COMMENTS = 2;
public static final int WHITESPACE = 3;
public static final int DIRECTIVES = 4;
}
statements
: (singleStatement)* EOF
;
testStatement
: (singleStatement | assertStatement ';' | runScript ';') EOF?
;
singleStatement
: statement ';'
;
singleExpression
: expression EOF
;
statement
: query #queryStatement
| (LIST | SHOW) PROPERTIES #listProperties
| (LIST | SHOW) ALL? TOPICS EXTENDED? #listTopics
| (LIST | SHOW) STREAMS EXTENDED? #listStreams
| (LIST | SHOW) TABLES EXTENDED? #listTables
| (LIST | SHOW) FUNCTIONS #listFunctions
| (LIST | SHOW) (SOURCE | SINK)? CONNECTORS #listConnectors
| (LIST | SHOW) CONNECTOR PLUGINS #listConnectorPlugins
| (LIST | SHOW) TYPES #listTypes
| (LIST | SHOW) VARIABLES #listVariables
| DESCRIBE sourceName EXTENDED? #showColumns
| DESCRIBE STREAMS EXTENDED? #describeStreams
| DESCRIBE FUNCTION identifier #describeFunction
| DESCRIBE CONNECTOR identifier #describeConnector
| PRINT (identifier| STRING) printClause #printTopic
| (LIST | SHOW) QUERIES EXTENDED? #listQueries
| TERMINATE identifier #terminateQuery
| TERMINATE ALL #terminateQuery
| SET STRING EQ STRING #setProperty
| UNSET STRING #unsetProperty
| DEFINE variableName EQ variableValue #defineVariable
| UNDEFINE variableName #undefineVariable
| CREATE (OR REPLACE)? (SOURCE)? STREAM (IF NOT EXISTS)? sourceName
(tableElements)?
(WITH tableProperties)? #createStream
| CREATE (OR REPLACE)? STREAM (IF NOT EXISTS)? sourceName
(WITH tableProperties)? AS query #createStreamAs
| CREATE (OR REPLACE)? (SOURCE)? TABLE (IF NOT EXISTS)? sourceName
(tableElements)?
(WITH tableProperties)? #createTable
| CREATE (OR REPLACE)? TABLE (IF NOT EXISTS)? sourceName
(WITH tableProperties)? AS query #createTableAs
| CREATE (SINK | SOURCE) CONNECTOR (IF NOT EXISTS)? identifier
WITH tableProperties #createConnector
| INSERT INTO sourceName (WITH tableProperties)? query #insertInto
| INSERT INTO sourceName (columns)? VALUES values #insertValues
| DROP STREAM (IF EXISTS)? sourceName (DELETE TOPIC)? #dropStream
| DROP TABLE (IF EXISTS)? sourceName (DELETE TOPIC)? #dropTable
| DROP CONNECTOR (IF EXISTS)? identifier #dropConnector
| EXPLAIN (statement | identifier) #explain
| CREATE TYPE (IF NOT EXISTS)? identifier AS type #registerType
| DROP TYPE (IF EXISTS)? identifier #dropType
| ALTER (STREAM | TABLE) sourceName alterOption (',' alterOption)* #alterSource
;
assertStatement
: ASSERT VALUES sourceName (columns)? VALUES values #assertValues
| ASSERT NULL VALUES sourceName (columns)? KEY values #assertTombstone
| ASSERT STREAM sourceName (tableElements)? (WITH tableProperties)? #assertStream
| ASSERT TABLE sourceName (tableElements)? (WITH tableProperties)? #assertTable
;
runScript
: RUN SCRIPT STRING
;
query
: SELECT selectItem (',' selectItem)*
FROM from=relation
(WINDOW windowExpression)?
(WHERE where=booleanExpression)?
(GROUP BY groupBy)?
(PARTITION BY partitionBy)?
(HAVING having=booleanExpression)?
(EMIT resultMaterialization)?
limitClause?
;
resultMaterialization
: CHANGES
| FINAL
;
alterOption
: ADD (COLUMN)? identifier type
;
tableElements
: '(' tableElement (',' tableElement)* ')'
;
tableElement
: identifier type columnConstraints?
;
columnConstraints
: ((PRIMARY)? KEY)
| HEADERS
| HEADER '(' STRING ')'
;
tableProperties
: '(' tableProperty (',' tableProperty)* ')'
;
tableProperty
: (identifier | STRING) EQ literal
;
printClause
: (FROM BEGINNING)? intervalClause? limitClause?
;
intervalClause
: (INTERVAL | SAMPLE) number
;
limitClause
: LIMIT number
;
retentionClause
: RETENTION number windowUnit
;
gracePeriodClause
: GRACE PERIOD number windowUnit
;
windowExpression
: (IDENTIFIER)?
( tumblingWindowExpression | hoppingWindowExpression | sessionWindowExpression )
;
tumblingWindowExpression
: TUMBLING '(' SIZE number windowUnit (',' retentionClause)? (',' gracePeriodClause)?')'
;
hoppingWindowExpression
: HOPPING '(' SIZE number windowUnit ',' ADVANCE BY number windowUnit (',' retentionClause)? (',' gracePeriodClause)?')'
;
sessionWindowExpression
: SESSION '(' number windowUnit (',' retentionClause)? (',' gracePeriodClause)?')'
;
windowUnit
: DAY
| HOUR
| MINUTE
| SECOND
| MILLISECOND
| DAYS
| HOURS
| MINUTES
| SECONDS
| MILLISECONDS
;
groupBy
: valueExpression (',' valueExpression)*
| '(' (valueExpression (',' valueExpression)*)? ')'
;
partitionBy
: valueExpression (',' valueExpression)*
| '(' (valueExpression (',' valueExpression)*)? ')'
;
values
: '(' (valueExpression (',' valueExpression)*)? ')'
;
selectItem
: expression (AS? identifier)? #selectSingle
| identifier '.' ASTERISK #selectAll
| ASTERISK #selectAll
;
relation
: left=aliasedRelation joinedSource+ #joinRelation
| aliasedRelation #relationDefault
;
joinedSource
: joinType JOIN aliasedRelation joinWindow? joinCriteria
;
joinType
: INNER? #innerJoin
| FULL OUTER? #outerJoin
| LEFT OUTER? #leftJoin
;
joinWindow
: WITHIN withinExpression
;
withinExpression
: '(' joinWindowSize ',' joinWindowSize ')' (gracePeriodClause)? # joinWindowWithBeforeAndAfter
| joinWindowSize (gracePeriodClause)? # singleJoinWindow
;
joinWindowSize
: number windowUnit
;
joinCriteria
: ON booleanExpression
;
aliasedRelation
: relationPrimary (AS? sourceName)?
;
columns
: '(' identifier (',' identifier)* ')'
;
relationPrimary
: sourceName #tableName
;
expression
: booleanExpression
;
booleanExpression
: predicated #booleanDefault
| NOT booleanExpression #logicalNot
| left=booleanExpression operator=AND right=booleanExpression #logicalBinary
| left=booleanExpression operator=OR right=booleanExpression #logicalBinary
;
predicated
: valueExpression predicate[$valueExpression.ctx]?
;
predicate[ParserRuleContext value]
: comparisonOperator right=valueExpression #comparison
| NOT? BETWEEN lower=valueExpression AND upper=valueExpression #between
| NOT? IN '(' expression (',' expression)* ')' #inList
| NOT? LIKE pattern=valueExpression (ESCAPE escape=STRING)? #like
| IS NOT? NULL #nullPredicate
| IS NOT? DISTINCT FROM right=valueExpression #distinctFrom
;
valueExpression
: primaryExpression #valueExpressionDefault
| valueExpression AT timeZoneSpecifier #atTimeZone
| operator=(MINUS | PLUS) valueExpression #arithmeticUnary
| left=valueExpression operator=(ASTERISK | SLASH | PERCENT) right=valueExpression #arithmeticBinary
| left=valueExpression operator=(PLUS | MINUS) right=valueExpression #arithmeticBinary
| left=valueExpression CONCAT right=valueExpression #concatenation
;
primaryExpression
: literal #literalExpression
| identifier STRING #typeConstructor
| CASE valueExpression whenClause+ (ELSE elseExpression=expression)? END #simpleCase
| CASE whenClause+ (ELSE elseExpression=expression)? END #searchedCase
| CAST '(' expression AS type ')' #cast
| ARRAY '[' (expression (',' expression)*)? ']' #arrayConstructor
| MAP '(' (expression ASSIGN expression (',' expression ASSIGN expression)*)? ')' #mapConstructor
| STRUCT '(' (identifier ASSIGN expression (',' identifier ASSIGN expression)*)? ')' #structConstructor
| identifier '(' ASTERISK ')' #functionCall
| identifier '(' (functionArgument (',' functionArgument)* (',' lambdaFunction)*)? ')' #functionCall
| value=primaryExpression '[' index=valueExpression ']' #subscript
| identifier #columnReference
| identifier '.' identifier #qualifiedColumnReference
| base=primaryExpression STRUCT_FIELD_REF fieldName=identifier #dereference
| '(' expression ')' #parenthesizedExpression
;
functionArgument
: expression
| windowUnit
;
timeZoneSpecifier
: TIME ZONE STRING #timeZoneString
;
comparisonOperator
: EQ | NEQ | LT | LTE | GT | GTE
;
booleanValue
: TRUE | FALSE
;
type
: type ARRAY
| ARRAY '<' type '>'
| MAP '<' type ',' type '>'
| STRUCT '<' (identifier type (',' identifier type)*)? '>'
| DECIMAL '(' number ',' number ')'
| baseType ('(' typeParameter (',' typeParameter)* ')')?
;
typeParameter
: INTEGER_VALUE | 'STRING'
;
baseType
: identifier
;
whenClause
: WHEN condition=expression THEN result=expression
;
identifier
: VARIABLE #variableIdentifier
| IDENTIFIER #unquotedIdentifier
| QUOTED_IDENTIFIER #quotedIdentifierAlternative
| nonReserved #unquotedIdentifier
| BACKQUOTED_IDENTIFIER #backQuotedIdentifier
| DIGIT_IDENTIFIER #digitIdentifier
;
lambdaFunction
: identifier '=>' expression #lambda
| '(' identifier (',' identifier)* ')' '=>' expression #lambda
;
variableName
: IDENTIFIER
;
variableValue
: STRING
;
sourceName
: identifier
;
number
: MINUS? DECIMAL_VALUE #decimalLiteral
| MINUS? FLOATING_POINT_VALUE #floatLiteral
| MINUS? INTEGER_VALUE #integerLiteral
;
literal
: NULL #nullLiteral
| number #numericLiteral
| booleanValue #booleanLiteral
| STRING #stringLiteral
| VARIABLE #variableLiteral
;
nonReserved
: SHOW | TABLES | COLUMNS | COLUMN | PARTITIONS | FUNCTIONS | FUNCTION | SESSION
| STRUCT | MAP | ARRAY | PARTITION
| INTEGER | DATE | TIME | TIMESTAMP | INTERVAL | ZONE | 'STRING'
| YEAR | MONTH | DAY | HOUR | MINUTE | SECOND
| EXPLAIN | ANALYZE | TYPE | TYPES
| SET | RESET
| IF
| SOURCE | SINK
| PRIMARY | KEY
| EMIT
| CHANGES
| FINAL
| ESCAPE
| REPLACE
| ASSERT
| ALTER
| ADD
;
EMIT: 'EMIT';
CHANGES: 'CHANGES';
FINAL: 'FINAL';
SELECT: 'SELECT';
FROM: 'FROM';
AS: 'AS';
ALL: 'ALL';
DISTINCT: 'DISTINCT';
WHERE: 'WHERE';
WITHIN: 'WITHIN';
WINDOW: 'WINDOW';
GROUP: 'GROUP';
BY: 'BY';
HAVING: 'HAVING';
LIMIT: 'LIMIT';
AT: 'AT';
OR: 'OR';
AND: 'AND';
IN: 'IN';
NOT: 'NOT';
EXISTS: 'EXISTS';
BETWEEN: 'BETWEEN';
LIKE: 'LIKE';
ESCAPE: 'ESCAPE';
IS: 'IS';
NULL: 'NULL';
TRUE: 'TRUE';
FALSE: 'FALSE';
INTEGER: 'INTEGER';
DATE: 'DATE';
TIME: 'TIME';
TIMESTAMP: 'TIMESTAMP';
INTERVAL: 'INTERVAL';
YEAR: 'YEAR';
MONTH: 'MONTH';
DAY: 'DAY';
HOUR: 'HOUR';
MINUTE: 'MINUTE';
SECOND: 'SECOND';
MILLISECOND: 'MILLISECOND';
YEARS: 'YEARS';
MONTHS: 'MONTHS';
DAYS: 'DAYS';
HOURS: 'HOURS';
MINUTES: 'MINUTES';
SECONDS: 'SECONDS';
MILLISECONDS: 'MILLISECONDS';
ZONE: 'ZONE';
TUMBLING: 'TUMBLING';
HOPPING: 'HOPPING';
SIZE: 'SIZE';
ADVANCE: 'ADVANCE';
RETENTION: 'RETENTION';
GRACE: 'GRACE';
PERIOD: 'PERIOD';
CASE: 'CASE';
WHEN: 'WHEN';
THEN: 'THEN';
ELSE: 'ELSE';
END: 'END';
JOIN: 'JOIN';
FULL: 'FULL';
OUTER: 'OUTER';
INNER: 'INNER';
LEFT: 'LEFT';
RIGHT: 'RIGHT';
ON: 'ON';
PARTITION: 'PARTITION';
STRUCT: 'STRUCT';
WITH: 'WITH';
VALUES: 'VALUES';
CREATE: 'CREATE';
TABLE: 'TABLE';
TOPIC: 'TOPIC';
STREAM: 'STREAM';
STREAMS: 'STREAMS';
INSERT: 'INSERT';
DELETE: 'DELETE';
INTO: 'INTO';
DESCRIBE: 'DESCRIBE';
EXTENDED: 'EXTENDED';
PRINT: 'PRINT';
EXPLAIN: 'EXPLAIN';
ANALYZE: 'ANALYZE';
TYPE: 'TYPE';
TYPES: 'TYPES';
CAST: 'CAST';
SHOW: 'SHOW';
LIST: 'LIST';
TABLES: 'TABLES';
TOPICS: 'TOPICS';
QUERY: 'QUERY';
QUERIES: 'QUERIES';
TERMINATE: 'TERMINATE';
LOAD: 'LOAD';
COLUMNS: 'COLUMNS';
COLUMN: 'COLUMN';
PARTITIONS: 'PARTITIONS';
FUNCTIONS: 'FUNCTIONS';
FUNCTION: 'FUNCTION';
DROP: 'DROP';
TO: 'TO';
RENAME: 'RENAME';
ARRAY: 'ARRAY';
MAP: 'MAP';
SET: 'SET';
DEFINE: 'DEFINE';
UNDEFINE: 'UNDEFINE';
RESET: 'RESET';
SESSION: 'SESSION';
SAMPLE: 'SAMPLE';
EXPORT: 'EXPORT';
CATALOG: 'CATALOG';
PROPERTIES: 'PROPERTIES';
BEGINNING: 'BEGINNING';
UNSET: 'UNSET';
RUN: 'RUN';
SCRIPT: 'SCRIPT';
DECIMAL: 'DECIMAL';
KEY: 'KEY';
CONNECTOR: 'CONNECTOR';
CONNECTORS: 'CONNECTORS';
SINK: 'SINK';
SOURCE: 'SOURCE';
NAMESPACE: 'NAMESPACE';
MATERIALIZED: 'MATERIALIZED';
VIEW: 'VIEW';
PRIMARY: 'PRIMARY';
REPLACE: 'REPLACE';
ASSERT: 'ASSERT';
ADD: 'ADD';
ALTER: 'ALTER';
VARIABLES: 'VARIABLES';
PLUGINS: 'PLUGINS';
HEADERS: 'HEADERS';
HEADER: 'HEADER';
IF: 'IF';
EQ : '=';
NEQ : '<>' | '!=';
LT : '<';
LTE : '<=';
GT : '>';
GTE : '>=';
PLUS: '+';
MINUS: '-';
ASTERISK: '*';
SLASH: '/';
PERCENT: '%';
CONCAT: '||';
ASSIGN: ':=';
STRUCT_FIELD_REF: '->';
LAMBDA_EXPRESSION: '=>';
STRING
: '\'' ( ~'\'' | '\'\'' )* '\''
;
INTEGER_VALUE
: DIGIT+
;
DECIMAL_VALUE
: DIGIT+ '.' DIGIT*
| '.' DIGIT+
;
FLOATING_POINT_VALUE
: DIGIT+ ('.' DIGIT*)? EXPONENT
| '.' DIGIT+ EXPONENT
;
IDENTIFIER
: (LETTER | '_') (LETTER | DIGIT | '_' | '@' )*
;
DIGIT_IDENTIFIER
: DIGIT (LETTER | DIGIT | '_' | '@' )+
;
QUOTED_IDENTIFIER
: '"' ( ~'"' | '""' )* '"'
;
BACKQUOTED_IDENTIFIER
: '`' ( ~'`' | '``' )* '`'
;
VARIABLE
: '${' IDENTIFIER '}'
;
fragment EXPONENT
: 'E' [+-]? DIGIT+
;
fragment DIGIT
: [0-9]
;
fragment LETTER
: [A-Z]
;
SIMPLE_COMMENT
: '--' ~'@' ~[\r\n]* '\r'? '\n'? -> channel(2) // channel(COMMENTS)
;
DIRECTIVE_COMMENT
: '--@' ~[\r\n]* '\r'? '\n'? -> channel(4) // channel(DIRECTIVES)
;
BRACKETED_COMMENT
: '/*' .*? '*/' -> channel(2) // channel(COMMENTS)
;
WS
: [ \r\n\t]+ -> channel(3) // channel(WHITESPACE)
;
// Catch-all for anything we can't recognize.
// We use this to be able to ignore and recover all the text
// when splitting statements with DelimiterLexer
UNRECOGNIZED
: .
;

View file

@ -3,14 +3,22 @@ package com.provectus.kafka.ui.controller;
import com.provectus.kafka.ui.api.KsqlApi;
import com.provectus.kafka.ui.model.KsqlCommandDTO;
import com.provectus.kafka.ui.model.KsqlCommandResponseDTO;
import com.provectus.kafka.ui.model.KsqlResponseDTO;
import com.provectus.kafka.ui.model.KsqlTableResponseDTO;
import com.provectus.kafka.ui.service.KsqlService;
import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@RestController
@RequiredArgsConstructor
@Log4j2
@ -25,4 +33,26 @@ public class KsqlController extends AbstractController implements KsqlApi {
return ksqlService.executeKsqlCommand(getCluster(clusterName), ksqlCommand)
.map(ResponseEntity::ok);
}
@SuppressWarnings("unchecked")
@Override
public Mono<ResponseEntity<Flux<KsqlResponseDTO>>> executeKsql(String clusterName,
Mono<KsqlCommandDTO> ksqlCommand,
ServerWebExchange exchange) {
return Mono.just(
ResponseEntity.ok(
ksqlCommand
.flux()
.flatMap(command ->
new KsqlApiClient(getCluster(clusterName))
.execute(
command.getKsql(),
Optional.ofNullable(command.getStreamsProperties()).orElse(Map.of())))
.map(table -> new KsqlResponseDTO()
.table(
new KsqlTableResponseDTO()
.header(table.getHeader())
.columnNames(table.getColumnNames())
.values((List<List<Object>>) ((List<?>) (table.getValues())))))));
}
}

View file

@ -8,6 +8,7 @@ import org.springframework.http.HttpStatus;
public enum ErrorCode {
UNEXPECTED(5000, HttpStatus.INTERNAL_SERVER_ERROR),
KSQL_API_ERROR(5001, HttpStatus.INTERNAL_SERVER_ERROR),
BINDING_FAIL(4001, HttpStatus.BAD_REQUEST),
NOT_FOUND(404, HttpStatus.NOT_FOUND),
INVALID_ENTITY_STATE(4001, HttpStatus.BAD_REQUEST),

View file

@ -0,0 +1,17 @@
package com.provectus.kafka.ui.exception;
public class KsqlApiException extends CustomBaseException {
public KsqlApiException(String message) {
super(message);
}
public KsqlApiException(String message, Throwable cause) {
super(message, cause);
}
@Override
public ErrorCode getErrorCode() {
return ErrorCode.KSQL_API_ERROR;
}
}

View file

@ -0,0 +1,114 @@
package com.provectus.kafka.ui.service.ksql;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.service.ksql.response.ResponseParser;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.Builder;
import lombok.Value;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
public class KsqlApiClient {
@Builder
@Value
public static class KsqlResponseTable {
String header;
List<String> columnNames;
List<List<JsonNode>> values;
}
@Value
private static class KsqlRequest {
String ksql;
Map<String, String> streamsProperties;
}
//--------------------------------------------------------------------------------------------
private final KafkaCluster cluster;
public KsqlApiClient(KafkaCluster cluster) {
this.cluster = cluster;
}
private WebClient webClient() {
return WebClient.create();
}
private String baseKsqlDbUri() {
return cluster.getKsqldbServer();
}
private KsqlRequest ksqlRequest(String ksql, Map<String, String> streamProperties) {
return new KsqlRequest(ksql, streamProperties);
}
private Flux<KsqlResponseTable> executeSelect(String ksql, Map<String, String> streamProperties) {
return webClient()
.post()
.uri(baseKsqlDbUri() + "/query")
.accept(MediaType.parseMediaType("application/vnd.ksql.v1+json"))
.contentType(MediaType.parseMediaType("application/json"))
.bodyValue(ksqlRequest(ksql, streamProperties))
.retrieve()
.bodyToFlux(JsonNode.class)
.map(ResponseParser::parseSelectResponse)
.filter(Optional::isPresent)
.map(Optional::get)
.onErrorResume(WebClientResponseException.class,
e -> Flux.just(ResponseParser.parseErrorResponse(e)));
}
private Flux<KsqlResponseTable> executeStatement(String ksql,
Map<String, String> streamProperties) {
return webClient()
.post()
.uri(baseKsqlDbUri() + "/ksql")
.accept(MediaType.parseMediaType("application/vnd.ksql.v1+json"))
.contentType(MediaType.parseMediaType("application/json"))
.bodyValue(ksqlRequest(ksql, streamProperties))
.exchangeToFlux(
resp -> {
if (resp.statusCode().isError()) {
return resp.createException().flux().map(ResponseParser::parseErrorResponse);
}
return resp.bodyToFlux(JsonNode.class)
.flatMap(body ->
// body can be an array or single object
(body.isArray() ? Flux.fromIterable(body) : Flux.just(body))
.flatMapIterable(ResponseParser::parseStatementResponse))
// body can be empty for some statements like INSERT
.switchIfEmpty(
Flux.just(KsqlResponseTable.builder()
.header("Query Result")
.columnNames(List.of("Result"))
.values(List.of(List.of(new TextNode("Success"))))
.build()));
}
);
}
public Flux<KsqlResponseTable> execute(String ksql, Map<String, String> streamProperties) {
var parsed = KsqlGrammar.parse(ksql);
if (parsed.getStatements().size() > 1) {
throw new ValidationException("Only single statement supported now");
}
if (parsed.getStatements().size() == 0) {
throw new ValidationException("No valid ksql statement found");
}
if (KsqlGrammar.isSelect(parsed.getStatements().get(0))) {
return executeSelect(ksql, streamProperties);
} else {
return executeStatement(ksql, streamProperties);
}
}
}

View file

@ -0,0 +1,83 @@
package com.provectus.kafka.ui.service.ksql;
import com.provectus.kafka.ui.exception.ValidationException;
import java.util.List;
import ksql.KsqlGrammarLexer;
import ksql.KsqlGrammarParser;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.experimental.Delegate;
import org.antlr.v4.runtime.BaseErrorListener;
import org.antlr.v4.runtime.CharStream;
import org.antlr.v4.runtime.CharStreams;
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.IntStream;
import org.antlr.v4.runtime.RecognitionException;
import org.antlr.v4.runtime.Recognizer;
import org.antlr.v4.runtime.atn.PredictionMode;
class KsqlGrammar {
@Value
static class KsqlStatements {
List<KsqlGrammarParser.SingleStatementContext> statements;
}
static KsqlStatements parse(String ksql) {
var parsed = parseStatements(ksql);
if (parsed.singleStatement().stream()
.anyMatch(s -> s.statement().exception != null)) {
throw new ValidationException("Error parsing ksql statement. Check syntax!");
}
return new KsqlStatements(parsed.singleStatement());
}
static boolean isSelect(KsqlGrammarParser.SingleStatementContext statement) {
return statement.statement() instanceof ksql.KsqlGrammarParser.QueryStatementContext;
}
private static ksql.KsqlGrammarParser.StatementsContext parseStatements(final String sql) {
var lexer = new KsqlGrammarLexer(CaseInsensitiveStream.from(CharStreams.fromString(sql)));
var tokenStream = new CommonTokenStream(lexer);
var grammarParser = new ksql.KsqlGrammarParser(tokenStream);
lexer.addErrorListener(new BaseErrorListener() {
@Override
public void syntaxError(Recognizer<?, ?> recognizer, Object offendingSymbol,
int line, int charPositionInLine,
String msg, RecognitionException e) {
throw new ValidationException("Invalid syntax: " + msg);
}
});
grammarParser.getInterpreter().setPredictionMode(PredictionMode.LL);
try {
return grammarParser.statements();
} catch (Exception e) {
throw new ValidationException("Error parsing ksql query: " + e.getMessage());
}
}
// impl copied from https://github.com/confluentinc/ksql/blob/master/ksqldb-parser/src/main/java/io/confluent/ksql/parser/CaseInsensitiveStream.java
@RequiredArgsConstructor
private static class CaseInsensitiveStream implements CharStream {
@Delegate
final CharStream stream;
public static CaseInsensitiveStream from(CharStream stream) {
// we only need to override LA method
return new CaseInsensitiveStream(stream) {
@Override
public int LA(final int i) {
final int result = stream.LA(i);
switch (result) {
case 0:
case IntStream.EOF:
return result;
default:
return Character.toUpperCase(result);
}
}
};
}
}
}

View file

@ -0,0 +1,69 @@
package com.provectus.kafka.ui.service.ksql.response;
import static com.provectus.kafka.ui.service.ksql.KsqlApiClient.KsqlResponseTable;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
class DynamicParser {
static KsqlResponseTable parseArray(String tableName, JsonNode array) {
return parseArray(tableName, getFieldNamesFromArray(array), array);
}
static KsqlResponseTable parseArray(String tableName,
List<String> columnNames,
JsonNode array) {
return KsqlResponseTable.builder()
.header(tableName)
.columnNames(columnNames)
.values(
StreamSupport.stream(array.spliterator(), false)
.map(node ->
columnNames.stream()
.map(node::get)
.collect(Collectors.toList()))
.collect(Collectors.toList())
).build();
}
private static List<String> getFieldNamesFromArray(JsonNode array) {
List<String> fields = new ArrayList<>();
array.forEach(node -> node.fieldNames().forEachRemaining(f -> {
if (!fields.contains(f)) {
fields.add(f);
}
}));
return fields;
}
static KsqlResponseTable parseObject(String tableName, JsonNode node) {
if (!node.isObject()) {
return KsqlResponseTable.builder()
.header(tableName)
.columnNames(List.of("value"))
.values(List.of(List.of(node)))
.build();
}
return parseObject(tableName, Lists.newArrayList(node.fieldNames()), node);
}
static KsqlResponseTable parseObject(String tableName, List<String> columnNames, JsonNode node) {
return KsqlResponseTable.builder()
.header(tableName)
.columnNames(columnNames)
.values(
List.of(
columnNames.stream()
.map(node::get)
.collect(Collectors.toList()))
)
.build();
}
}

View file

@ -0,0 +1,161 @@
package com.provectus.kafka.ui.service.ksql.response;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.google.common.collect.Lists;
import com.provectus.kafka.ui.exception.KsqlApiException;
import com.provectus.kafka.ui.service.ksql.KsqlApiClient;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.springframework.web.reactive.function.client.WebClientResponseException;
public class ResponseParser {
public static Optional<KsqlApiClient.KsqlResponseTable> parseSelectResponse(JsonNode jsonNode) {
// in response we getting either header record or row data
if (arrayFieldNonEmpty(jsonNode, "header")) {
return Optional.of(
KsqlApiClient.KsqlResponseTable.builder()
.header("Schema")
.columnNames(
Arrays.stream(jsonNode.get("header").get("schema").asText().split(","))
.map(String::trim)
.collect(Collectors.toList())
)
.build());
}
if (arrayFieldNonEmpty(jsonNode, "row")) {
return Optional.of(
KsqlApiClient.KsqlResponseTable.builder()
.header("Row")
.values(
List.of(Lists.newArrayList(jsonNode.get("row").get("columns"))))
.build());
}
if (jsonNode.hasNonNull("errorMessage")) {
throw new KsqlApiException("Error: " + jsonNode.get("errorMessage"));
}
// remaining events can be skipped
return Optional.empty();
}
public static KsqlApiClient.KsqlResponseTable parseErrorResponse(WebClientResponseException e) {
try {
var errBody = new JsonMapper().readTree(e.getResponseBodyAsString());
return DynamicParser.parseObject("Execution error", errBody);
} catch (Exception ex) {
throw new KsqlApiException(
String.format(
"Unparsable error response from ksqdb, status:'%s', body: '%s'",
e.getStatusCode(), e.getResponseBodyAsString()), e);
}
}
public static List<KsqlApiClient.KsqlResponseTable> parseStatementResponse(JsonNode jsonNode) {
var type = Optional.ofNullable(jsonNode.get("@type"))
.map(JsonNode::asText)
.orElse("unknown");
// messages structure can be inferred from https://github.com/confluentinc/ksql/blob/master/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java
switch (type) {
case "currentStatus":
return parseObject(
"Status",
List.of("status", "message"),
jsonNode.get("commandStatus")
);
case "properties":
return parseProperties(jsonNode);
case "queries":
return parseArray("Queries", "queries", jsonNode);
case "sourceDescription":
return parseObjectDynamically("Source Description", jsonNode.get("sourceDescription"));
case "queryDescription":
return parseArray("Queries Description", "queryDescription", jsonNode);
case "topicDescription":
return parseObject(
"Topic Description",
List.of("name", "kafkaTopic", "format", "schemaString"),
jsonNode
);
case "streams":
return parseArray("Streams", "streams", jsonNode);
case "tables":
return parseArray("Tables", "tables", jsonNode);
case "kafka_topics":
return parseArray("Topics", "topics", jsonNode);
case "kafka_topics_extended":
return parseArray("Topics extended", "topics", jsonNode);
case "executionPlan":
return parseObject("Execution plan", List.of("executionPlanText"), jsonNode);
case "source_descriptions":
return parseArray("Source descriptions", "sourceDescriptions", jsonNode);
case "query_descriptions":
return parseArray("Queries", "queryDescriptions", jsonNode);
case "describe_function":
return parseObject("Function description",
List.of("name", "author", "version", "description", "functions", "path", "type"),
jsonNode
);
case "function_names":
return parseArray("Function Names", "functions", jsonNode);
case "connector_info":
return parseObjectDynamically("Connector Info", jsonNode.get("info"));
case "drop_connector":
return parseObject("Dropped connector", List.of("connectorName"), jsonNode);
case "connector_list":
return parseArray("Connectors", "connectors", jsonNode);
case "connector_plugins_list":
return parseArray("Connector Plugins", "connectorPlugins", jsonNode);
case "connector_description":
return parseObject("Connector Description",
List.of("connectorClass", "status", "sources", "topics"),
jsonNode
);
default:
return parseUnknownResponse(jsonNode);
}
}
private static List<KsqlApiClient.KsqlResponseTable> parseObjectDynamically(
String tableName, JsonNode jsonNode) {
return List.of(DynamicParser.parseObject(tableName, jsonNode));
}
private static List<KsqlApiClient.KsqlResponseTable> parseObject(
String tableName, List<String> fields, JsonNode jsonNode) {
return List.of(DynamicParser.parseObject(tableName, fields, jsonNode));
}
private static List<KsqlApiClient.KsqlResponseTable> parseArray(
String tableName, String arrayField, JsonNode jsonNode) {
return List.of(DynamicParser.parseArray(tableName, jsonNode.get(arrayField)));
}
private static List<KsqlApiClient.KsqlResponseTable> parseProperties(JsonNode jsonNode) {
var tables = new ArrayList<KsqlApiClient.KsqlResponseTable>();
if (arrayFieldNonEmpty(jsonNode, "properties")) {
tables.add(DynamicParser.parseArray("properties", jsonNode.get("properties")));
}
if (arrayFieldNonEmpty(jsonNode, "overwrittenProperties")) {
tables.add(DynamicParser.parseArray("overwrittenProperties",
jsonNode.get("overwrittenProperties")));
}
if (arrayFieldNonEmpty(jsonNode, "defaultProperties")) {
tables.add(DynamicParser.parseArray("defaultProperties", jsonNode.get("defaultProperties")));
}
return tables;
}
private static List<KsqlApiClient.KsqlResponseTable> parseUnknownResponse(JsonNode jsonNode) {
return List.of(DynamicParser.parseObject("Ksql Response", jsonNode));
}
private static boolean arrayFieldNonEmpty(JsonNode json, String field) {
return json.hasNonNull(field) && !json.get(field).isEmpty();
}
}

View file

@ -1338,6 +1338,7 @@ paths:
description: OK
/api/clusters/{clusterName}/ksql:
description: Deprecated - use ksql/v2 instead!
post:
tags:
- Ksql
@ -1362,6 +1363,33 @@ paths:
schema:
$ref: '#/components/schemas/KsqlCommandResponse'
/api/clusters/{clusterName}/ksql/v2:
post:
tags:
- Ksql
summary: executeKsql
operationId: executeKsql
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/KsqlCommand'
responses:
200:
description: OK
content:
text/event-stream:
schema:
type: array
items:
$ref: '#/components/schemas/KsqlResponse'
/api/clusters/{clusterName}/connects/{connectName}/plugins:
get:
tags:
@ -2465,6 +2493,28 @@ components:
- headers
- rows
KsqlResponse:
type: object
properties:
table:
$ref: '#/components/schemas/KsqlTableResponse'
KsqlTableResponse:
type: object
properties:
header:
type: string
columnNames:
type: array
items:
type: string
values:
type: array
items:
type: array
items:
type: object
FullConnectorInfo:
type: object
properties:

View file

@ -39,6 +39,7 @@
<junit-jupiter-engine.version>5.7.2</junit-jupiter-engine.version>
<mockito.version>2.21.0</mockito.version>
<assertj.version>3.19.0</assertj.version>
<antlr4-maven-plugin.version>4.7.1</antlr4-maven-plugin.version>
<frontend-generated-sources-directory>..//kafka-ui-react-app/src/generated-sources
</frontend-generated-sources-directory>