فهرست منبع

Enable MessageV2 API to fix sorting and paging issues

gokhanimral 1 سال پیش
والد
کامیت
757c8f468f

+ 79 - 79
kafka-ui-api/src/main/resources/application-local.yml

@@ -10,22 +10,22 @@ logging:
 #server:
 #  port: 8080 #- Port in which kafka-ui will run.
 
-spring:
-  jmx:
-    enabled: true
-  ldap:
-    urls: ldap://localhost:10389
-    base: "cn={0},ou=people,dc=planetexpress,dc=com"
-    admin-user: "cn=admin,dc=planetexpress,dc=com"
-    admin-password: "GoodNewsEveryone"
-    user-filter-search-base: "dc=planetexpress,dc=com"
-    user-filter-search-filter: "(&(uid={0})(objectClass=inetOrgPerson))"
-    group-filter-search-base: "ou=people,dc=planetexpress,dc=com"
+#spring:
+#  jmx:
+#    enabled: true
+#  ldap:
+#    urls: ldap://localhost:10389
+#    base: "cn={0},ou=people,dc=planetexpress,dc=com"
+#    admin-user: "cn=admin,dc=planetexpress,dc=com"
+#    admin-password: "GoodNewsEveryone"
+#    user-filter-search-base: "dc=planetexpress,dc=com"
+#    user-filter-search-filter: "(&(uid={0})(objectClass=inetOrgPerson))"
+#    group-filter-search-base: "ou=people,dc=planetexpress,dc=com"
 
 kafka:
   clusters:
     - name: local
-      bootstrapServers: localhost:9092
+      bootstrapServers: localhost:9096
       schemaRegistry: http://localhost:8085
       ksqldbServer: http://localhost:8088
       kafkaConnect:
@@ -80,70 +80,70 @@ auth:
         custom-params:
           type: github
 
-rbac:
-  roles:
-    - name: "memelords"
-      clusters:
-        - local
-      subjects:
-        - provider: oauth_google
-          type: domain
-          value: "provectus.com"
-        - provider: oauth_google
-          type: user
-          value: "name@provectus.com"
-
-        - provider: oauth_github
-          type: organization
-          value: "provectus"
-        - provider: oauth_github
-          type: user
-          value: "memelord"
-
-        - provider: oauth_cognito
-          type: user
-          value: "username"
-        - provider: oauth_cognito
-          type: group
-          value: "memelords"
-
-        - provider: ldap
-          type: group
-          value: "admin_staff"
-
-        # NOT IMPLEMENTED YET
-      #        - provider: ldap_ad
-      #          type: group
-      #          value: "admin_staff"
-
-      permissions:
-        - resource: applicationconfig
-          actions: all
-
-        - resource: clusterconfig
-          actions: all
-
-        - resource: topic
-          value: ".*"
-          actions: all
-
-        - resource: consumer
-          value: ".*"
-          actions: all
-
-        - resource: schema
-          value: ".*"
-          actions: all
-
-        - resource: connect
-          value: "*"
-          actions: all
-
-        - resource: ksql
-          actions: all
-
-        - resource: acl
-          actions: all
-
-        - resource: audit
-          actions: all
+#rbac:
+#  roles:
+#    - name: "memelords"
+#      clusters:
+#        - local
+#      subjects:
+#        - provider: oauth_google
+#          type: domain
+#          value: "provectus.com"
+#        - provider: oauth_google
+#          type: user
+#          value: "name@provectus.com"
+#
+#        - provider: oauth_github
+#          type: organization
+#          value: "provectus"
+#        - provider: oauth_github
+#          type: user
+#          value: "memelord"
+#
+#        - provider: oauth_cognito
+#          type: user
+#          value: "username"
+#        - provider: oauth_cognito
+#          type: group
+#          value: "memelords"
+#
+#        - provider: ldap
+#          type: group
+#          value: "admin_staff"
+#
+#        # NOT IMPLEMENTED YET
+#      #        - provider: ldap_ad
+#      #          type: group
+#      #          value: "admin_staff"
+#
+#      permissions:
+#        - resource: applicationconfig
+#          actions: all
+#
+#        - resource: clusterconfig
+#          actions: all
+#
+#        - resource: topic
+#          value: ".*"
+#          actions: all
+#
+#        - resource: consumer
+#          value: ".*"
+#          actions: all
+#
+#        - resource: schema
+#          value: ".*"
+#          actions: all
+#
+#        - resource: connect
+#          value: "*"
+#          actions: all
+#
+#        - resource: ksql
+#          actions: all
+#
+#        - resource: acl
+#          actions: all
+#
+#        - resource: audit
+#          actions: all

+ 103 - 15
kafka-ui-api/src/main/resources/application.yml

@@ -1,21 +1,109 @@
-auth:
-  type: DISABLED
-
-management:
-  endpoint:
-    info:
-      enabled: true
-    health:
-      enabled: true
-  endpoints:
-    web:
-      exposure:
-        include: "info,health,prometheus"
-
 logging:
   level:
     root: INFO
     com.provectus: DEBUG
+    #org.springframework.http.codec.json.Jackson2JsonEncoder: DEBUG
+    #org.springframework.http.codec.json.Jackson2JsonDecoder: DEBUG
     reactor.netty.http.server.AccessLog: INFO
-    org.hibernate.validator: WARN
+    org.springframework.security: DEBUG
+
+#server:
+#  port: 8080 #- Port in which kafka-ui will run.
+
+#spring:
+#  jmx:
+#    enabled: true
+#  ldap:
+#    urls: ldap://localhost:10389
+#    base: "cn={0},ou=people,dc=planetexpress,dc=com"
+#    admin-user: "cn=admin,dc=planetexpress,dc=com"
+#    admin-password: "GoodNewsEveryone"
+#    user-filter-search-base: "dc=planetexpress,dc=com"
+#    user-filter-search-filter: "(&(uid={0})(objectClass=inetOrgPerson))"
+#    group-filter-search-base: "ou=people,dc=planetexpress,dc=com"
+
+kafka:
+  clusters:
+    - name: local
+      bootstrapServers: localhost:9096
+#      schemaRegistry: http://localhost:8085
+#      ksqldbServer: http://localhost:8088
+#      kafkaConnect:
+#        - name: first
+#          address: http://localhost:8083
+#      metrics:
+#        port: 9997
+#        type: JMX
+
+auth:
+  type: DISABLED
+
+dynamic.config.enabled: true
 
+#rbac:
+#  roles:
+#    - name: "memelords"
+#      clusters:
+#        - local
+#      subjects:
+#        - provider: oauth_google
+#          type: domain
+#          value: "provectus.com"
+#        - provider: oauth_google
+#          type: user
+#          value: "name@provectus.com"
+#
+#        - provider: oauth_github
+#          type: organization
+#          value: "provectus"
+#        - provider: oauth_github
+#          type: user
+#          value: "memelord"
+#
+#        - provider: oauth_cognito
+#          type: user
+#          value: "username"
+#        - provider: oauth_cognito
+#          type: group
+#          value: "memelords"
+#
+#        - provider: ldap
+#          type: group
+#          value: "admin_staff"
+#
+#        # NOT IMPLEMENTED YET
+#      #        - provider: ldap_ad
+#      #          type: group
+#      #          value: "admin_staff"
+#
+#      permissions:
+#        - resource: applicationconfig
+#          actions: all
+#
+#        - resource: clusterconfig
+#          actions: all
+#
+#        - resource: topic
+#          value: ".*"
+#          actions: all
+#
+#        - resource: consumer
+#          value: ".*"
+#          actions: all
+#
+#        - resource: schema
+#          value: ".*"
+#          actions: all
+#
+#        - resource: connect
+#          value: "*"
+#          actions: all
+#
+#        - resource: ksql
+#          actions: all
+#
+#        - resource: acl
+#          actions: all
+#
+#        - resource: audit
+#          actions: all

+ 1 - 1
kafka-ui-api/src/test/java/com/provectus/kafka/ui/emitter/MessageFiltersTest.java

@@ -51,7 +51,7 @@ class MessageFiltersTest {
           filter.test(msg().key(null).content(null))
       );
 
-      assertFalse(
+      assertTrue(
           filter.test(msg().key("aBc").content("AbC"))
       );
     }

+ 52 - 0
kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml

@@ -857,6 +857,33 @@ paths:
                 items:
                   $ref: '#/components/schemas/TopicMessageEvent'
 
+  /api/clusters/{clusterName}/topics/{topicName}/activeproducers:
+    get:
+      tags:
+        - Topics
+      summary: get producer states for topic
+      operationId: getActiveProducerStates
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: topicName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  $ref: '#/components/schemas/TopicProducerState'
+
   /api/clusters/{clusterName}/topics/{topicName}/consumer-groups:
     get:
       tags:
@@ -2499,6 +2526,31 @@ components:
         - PROTOBUF
         - UNKNOWN
 
+    TopicProducerState:
+      type: object
+      properties:
+        partition:
+          type: integer
+          format: int32
+        producerId:
+          type: integer
+          format: int64
+        producerEpoch:
+          type: integer
+          format: int32
+        lastSequence:
+          type: integer
+          format: int32
+        lastTimestampMs:
+          type: integer
+          format: int64
+        coordinatorEpoch:
+          type: integer
+          format: int32
+        currentTransactionStartOffset:
+          type: integer
+          format: int64
+
     ConsumerGroup:
       type: object
       properties:

+ 2 - 2
kafka-ui-react-app/package.json

@@ -21,7 +21,7 @@
     "fetch-mock": "^9.11.0",
     "jest": "^29.4.3",
     "jest-watch-typeahead": "^2.2.2",
-    "json-schema-faker": "^0.5.0-rcv.44",
+    "json-schema-faker": "^0.5.6",
     "jsonpath-plus": "^7.2.0",
     "lodash": "^4.17.21",
     "lossless-json": "^2.0.8",
@@ -109,4 +109,4 @@
     "node": "v18.17.1",
     "pnpm": "^8.6.12"
   }
-}
+}

+ 19 - 15
kafka-ui-react-app/pnpm-lock.yaml

@@ -57,8 +57,8 @@ dependencies:
     specifier: ^2.2.2
     version: 2.2.2(jest@29.6.4)
   json-schema-faker:
-    specifier: ^0.5.0-rcv.44
-    version: 0.5.3
+    specifier: ^0.5.6
+    version: 0.5.6
   jsonpath-plus:
     specifier: ^7.2.0
     version: 7.2.0
@@ -91,7 +91,7 @@ dependencies:
     version: 7.43.1(react@18.2.0)
   react-hot-toast:
     specifier: ^2.4.0
-    version: 2.4.1(csstype@3.1.2)(react-dom@18.1.0)(react@18.2.0)
+    version: 2.4.1(csstype@3.1.3)(react-dom@18.1.0)(react@18.2.0)
   react-is:
     specifier: ^18.2.0
     version: 18.2.0
@@ -2606,7 +2606,7 @@ packages:
       normalize-path: 3.0.0
       readdirp: 3.6.0
     optionalDependencies:
-      fsevents: 2.3.2
+      fsevents: 2.3.3
 
   /ci-info@3.3.1:
     resolution: {integrity: sha512-SXgeMX9VwDe7iFFaEWkA5AstuER9YKqy4EhHqr4DVqkwmD9rpVimkMKWHdjn30Ja45txyjhSn63lVX69eVCckg==}
@@ -2808,6 +2808,10 @@ packages:
   /csstype@3.1.2:
     resolution: {integrity: sha512-I7K1Uu0MBPzaFKg4nI5Q7Vs2t+3gWWW648spaF+Rg7pI9ds18Ugn+lvg4SHczUdKlHI5LWBXyqfS8+DufyBsgQ==}
 
+  /csstype@3.1.3:
+    resolution: {integrity: sha512-M1uQkMl8rQK/szD0LNhtqxIPLpimGm8sOBwU7lLnCpSbTyY3yeU1Vc7l4KT5zT4s/yOxHH5O7tIuuLOCnLADRw==}
+    dev: false
+
   /damerau-levenshtein@1.0.8:
     resolution: {integrity: sha512-sdQSFB7+llfUcQHUQO3+B8ERRj0Oa4w9POWMI/puGtuf7gFywGmkaLCElnudfTiKZV+NvHqL0ifzdrI8Ro7ESA==}
     dev: true
@@ -3741,8 +3745,8 @@ packages:
   /fs.realpath@1.0.0:
     resolution: {integrity: sha512-OO0pH2lK6a0hZnAdau5ItzHPI6pUlvI7jMVnxUQRtw4owF2wk8lOSabtGDCTP4Ggrg2MbGnWO9X8K1t4+fGMDw==}
 
-  /fsevents@2.3.2:
-    resolution: {integrity: sha512-xiqMQR4xAeHTuB9uWm+fFRcIOgKBMiOBP+eXiyT7jsgVCq1bkVygt00oASowB7EdtpOHaaPgKt812P9ab+DDKA==}
+  /fsevents@2.3.3:
+    resolution: {integrity: sha512-5xoDfX+fL7faATnagmWPpbFtwh/R77WmMMqqHGS65C3vvB0YHrgF+B1YmZ3441tMj5n63k0212XNoJwzlhffQw==}
     engines: {node: ^8.16.0 || ^10.6.0 || >=11.0.0}
     os: [darwin]
     requiresBuild: true
@@ -3903,12 +3907,12 @@ packages:
   /globrex@0.1.2:
     resolution: {integrity: sha512-uHJgbwAMwNFf5mLst7IWLNg14x1CkeqglJb/K3doi4dw6q2IvAAmM/Y81kevy83wP+Sst+nutFTYOGg3d1lsxg==}
 
-  /goober@2.1.10(csstype@3.1.2):
+  /goober@2.1.10(csstype@3.1.3):
     resolution: {integrity: sha512-7PpuQMH10jaTWm33sQgBQvz45pHR8N4l3Cu3WMGEWmHShAcTuuP7I+5/DwKo39fwti5A80WAjvqgz6SSlgWmGA==}
     peerDependencies:
       csstype: ^3.0.10
     dependencies:
-      csstype: 3.1.2
+      csstype: 3.1.3
     dev: false
 
   /gopd@1.0.1:
@@ -4544,7 +4548,7 @@ packages:
       micromatch: 4.0.5
       walker: 1.0.8
     optionalDependencies:
-      fsevents: 2.3.2
+      fsevents: 2.3.3
     dev: false
 
   /jest-leak-detector@29.6.3:
@@ -4903,8 +4907,8 @@ packages:
   /json-parse-even-better-errors@2.3.1:
     resolution: {integrity: sha512-xyFwyhro/JEof6Ghe2iz2NcXoj2sloNsWr/XsERDK/oiPCfaNhl5ONfp+jQdAZRQQ0IJWNzH9zIZF7li91kh2w==}
 
-  /json-schema-faker@0.5.3:
-    resolution: {integrity: sha512-BeIrR0+YSrTbAR9dOMnjbFl1MvHyXnq+Wpdw1FpWZDHWKLzK229hZ5huyPcmzFUfVq1ODwf40WdGVoE266UBUg==}
+  /json-schema-faker@0.5.6:
+    resolution: {integrity: sha512-u/cFC26/GDxh2vPiAC8B8xVvpXAW+QYtG2mijEbKrimCk8IHtiwQBjCE8TwvowdhALWq9IcdIWZ+/8ocXvdL3Q==}
     hasBin: true
     dependencies:
       json-schema-ref-parser: 6.1.0
@@ -5711,14 +5715,14 @@ packages:
       react: 18.2.0
     dev: false
 
-  /react-hot-toast@2.4.1(csstype@3.1.2)(react-dom@18.1.0)(react@18.2.0):
+  /react-hot-toast@2.4.1(csstype@3.1.3)(react-dom@18.1.0)(react@18.2.0):
     resolution: {integrity: sha512-j8z+cQbWIM5LY37pR6uZR6D4LfseplqnuAO4co4u8917hBUvXlEqyP1ZzqVLcqoyUesZZv/ImreoCeHVDpE5pQ==}
     engines: {node: '>=10'}
     peerDependencies:
       react: '>=16'
       react-dom: '>=16'
     dependencies:
-      goober: 2.1.10(csstype@3.1.2)
+      goober: 2.1.10(csstype@3.1.3)
       react: 18.2.0
       react-dom: 18.1.0(react@18.2.0)
     transitivePeerDependencies:
@@ -6022,7 +6026,7 @@ packages:
     engines: {node: '>=14.18.0', npm: '>=8.0.0'}
     hasBin: true
     optionalDependencies:
-      fsevents: 2.3.2
+      fsevents: 2.3.3
 
   /run-async@2.4.1:
     resolution: {integrity: sha512-tvVnVv01b8c1RrA6Ep7JkStj85Guv/YrMcwqYQnwjsAS2cTmmPGBBjAjpCW7RrSodNSoE2/qg9O4bceNvUuDgQ==}
@@ -6755,7 +6759,7 @@ packages:
       rollup: 3.7.3
       sass: 1.66.1
     optionalDependencies:
-      fsevents: 2.3.2
+      fsevents: 2.3.3
 
   /w3c-hr-time@1.0.2:
     resolution: {integrity: sha512-z8P5DvDNjKDoFIHK7q8r8lackT6l+jo/Ye3HOle7l9nICP9lf1Ci25fy9vHd0JOWewkIFzXIEig3TdKT7JQ5fQ==}

+ 126 - 89
kafka-ui-react-app/src/components/Topics/Topic/Messages/Filters/Filters.tsx

@@ -3,6 +3,7 @@ import 'react-datepicker/dist/react-datepicker.css';
 import {
   MessageFilterType,
   Partition,
+  PollingMode,
   SeekDirection,
   SeekType,
   SerdeUsage,
@@ -10,6 +11,7 @@ import {
   TopicMessageConsuming,
   TopicMessageEvent,
   TopicMessageEventTypeEnum,
+  TopicMessageNextPageCursor,
 } from 'generated-sources';
 import React, { useContext } from 'react';
 import omitBy from 'lodash/omitBy';
@@ -35,7 +37,7 @@ import CloseIcon from 'components/common/Icons/CloseIcon';
 import ClockIcon from 'components/common/Icons/ClockIcon';
 import ArrowDownIcon from 'components/common/Icons/ArrowDownIcon';
 import FileIcon from 'components/common/Icons/FileIcon';
-import { useTopicDetails } from 'lib/hooks/api/topics';
+import { useRegisterFilter, useTopicDetails } from 'lib/hooks/api/topics';
 import { InputLabel } from 'components/common/Input/InputLabel.styled';
 import { getSerdeOptions } from 'components/Topics/Topic/SendMessage/utils';
 import { useSerdes } from 'lib/hooks/api/topicMessages';
@@ -47,6 +49,9 @@ import {
   getSelectedPartitionsFromSeekToParam,
   getTimestampFromSeekToParam,
 } from './utils';
+import { getTopicMessgesLastLoadedPage } from 'redux/reducers/topicMessages/selectors';
+import { useAppSelector } from 'lib/hooks/redux';
+import { getDefaultSerdeName } from '../getDefaultSerdeName';
 
 type Query = Record<string, string | string[] | number>;
 
@@ -55,12 +60,18 @@ export interface FiltersProps {
   meta: TopicMessageConsuming;
   isFetching: boolean;
   messageEventType?: string;
+  cursor?: TopicMessageNextPageCursor;
+  currentPage: number;
   addMessage(content: { message: TopicMessage; prepend: boolean }): void;
   resetMessages(): void;
   updatePhase(phase: string): void;
   updateMeta(meta: TopicMessageConsuming): void;
   setIsFetching(status: boolean): void;
   setMessageType(messageType: string): void;
+  updateCursor(cursor?: TopicMessageNextPageCursor): void;
+  setCurrentPage(page: number): void;
+  setLastLoadedPage(page: number): void;
+  resetAllMessages(): void;
 }
 
 export interface MessageFilters {
@@ -85,6 +96,7 @@ const Filters: React.FC<FiltersProps> = ({
   phaseMessage,
   meta: { elapsedMs, bytesConsumed, messagesConsumed, filterApplyErrors },
   isFetching,
+  currentPage,
   addMessage,
   resetMessages,
   updatePhase,
@@ -92,19 +104,25 @@ const Filters: React.FC<FiltersProps> = ({
   setIsFetching,
   setMessageType,
   messageEventType,
+  updateCursor,
+  setCurrentPage,
+  setLastLoadedPage,
+  resetAllMessages,
 }) => {
   const { clusterName, topicName } = useAppParams<RouteParamsClusterTopic>();
   const location = useLocation();
   const navigate = useNavigate();
   const [searchParams] = useSearchParams();
 
-  const page = searchParams.get('page');
-
   const { data: topic } = useTopicDetails({ clusterName, topicName });
 
+  const registerFilter = useRegisterFilter({ clusterName, topicName });
+
+  const lastLoadedPage = useAppSelector(getTopicMessgesLastLoadedPage);
+
   const partitions = topic?.partitions || [];
 
-  const { seekDirection, isLive, changeSeekDirection } =
+  const { seekDirection, isLive, changeSeekDirection, page, setPage } =
     useContext(TopicMessagesContext);
 
   const { value: isOpen, toggle } = useBoolean();
@@ -131,11 +149,18 @@ const Filters: React.FC<FiltersProps> = ({
   const [timestamp, setTimestamp] = React.useState<Date | null>(
     getTimestampFromSeekToParam(searchParams)
   );
+
+  const { data: serdes = {} } = useSerdes({
+    clusterName,
+    topicName,
+    use: SerdeUsage.DESERIALIZE,
+  });
+
   const [keySerde, setKeySerde] = React.useState<string>(
-    searchParams.get('keySerde') || ''
+    searchParams.get('keySerde') || getDefaultSerdeName(serdes.key || [])
   );
   const [valueSerde, setValueSerde] = React.useState<string>(
-    searchParams.get('valueSerde') || ''
+    searchParams.get('valueSerde') || getDefaultSerdeName(serdes.value || [])
   );
 
   const [savedFilters, setSavedFilters] = React.useState<MessageFilters[]>(
@@ -155,7 +180,7 @@ const Filters: React.FC<FiltersProps> = ({
       ? MessageFilterType.GROOVY_SCRIPT
       : MessageFilterType.STRING_CONTAINS
   );
-  const [query, setQuery] = React.useState<string>(searchParams.get('q') || '');
+  const [stringFilter, setStringFilter] = React.useState<string>('');
   const [isTailing, setIsTailing] = React.useState<boolean>(isLive);
 
   const isSeekTypeControlVisible = React.useMemo(
@@ -173,23 +198,12 @@ const Filters: React.FC<FiltersProps> = ({
     return false;
   }, [isSeekTypeControlVisible, currentSeekType, timestamp, isTailing]);
 
-  const partitionMap = React.useMemo(
-    () =>
-      partitions.reduce<Record<string, Partition>>(
-        (acc, partition) => ({
-          ...acc,
-          [partition.partition]: partition,
-        }),
-        {}
-      ),
-    [partitions]
-  );
-
   const handleClearAllFilters = () => {
     setCurrentSeekType(SeekType.OFFSET);
     setOffset('');
     setTimestamp(null);
-    setQuery('');
+    setStringFilter('');
+    setPage(1);
     changeSeekDirection(SeekDirection.FORWARD);
     getSelectedPartitionsFromSeekToParam(searchParams, partitions);
     setSelectedPartitions(
@@ -202,65 +216,60 @@ const Filters: React.FC<FiltersProps> = ({
     );
   };
 
-  const handleFiltersSubmit = (currentOffset: string) => {
-    const nextAttempt = Number(searchParams.get('attempt') || 0) + 1;
+  const getPollingMode = (seekDirection: SeekDirection, seekType: SeekType): PollingMode => {
+    if (seekDirection == SeekDirection.FORWARD) {
+      if (offset && currentSeekType === SeekType.OFFSET)
+        return PollingMode.FROM_OFFSET;
+      if (timestamp && currentSeekType === SeekType.TIMESTAMP)
+        return PollingMode.FROM_TIMESTAMP;
+      return PollingMode.EARLIEST;
+    }
+    if (seekDirection == SeekDirection.BACKWARD) {
+      if (offset && currentSeekType === SeekType.OFFSET)
+        return PollingMode.TO_OFFSET;
+      if (timestamp && currentSeekType === SeekType.TIMESTAMP)
+        return PollingMode.TO_TIMESTAMP;
+      return PollingMode.LATEST;
+    }
+    if (seekDirection == SeekDirection.TAILING)
+      return PollingMode.TAILING;
+    return PollingMode.LATEST;
+  }
+
+  const getSmartFilterId = async (code: string) => {
+    try {
+      const filterId = await registerFilter.mutateAsync({
+        filterCode: code
+      });
+      return filterId;
+    } catch (e) {
+      // do nothing
+    }
+  }
+
+  const handleFiltersSubmit = async (cursor?: TopicMessageNextPageCursor) => {
+
+    if (!keySerde || !valueSerde)
+      return;
     const props: Query = {
-      q:
-        queryType === MessageFilterType.GROOVY_SCRIPT
-          ? activeFilter.code
-          : query,
-      filterQueryType: queryType,
-      attempt: nextAttempt,
+      mode: getPollingMode(seekDirection, currentSeekType),
       limit: PER_PAGE,
-      page: page || 0,
-      seekDirection,
+      stringFilter: stringFilter,
+      offset: offset,
+      timestamp: timestamp?.getTime() || 0,
       keySerde: keySerde || searchParams.get('keySerde') || '',
       valueSerde: valueSerde || searchParams.get('valueSerde') || '',
     };
 
-    if (isSeekTypeControlVisible) {
-      switch (seekDirection) {
-        case SeekDirection.FORWARD:
-          props.seekType = SeekType.BEGINNING;
-          break;
-        case SeekDirection.BACKWARD:
-        case SeekDirection.TAILING:
-          props.seekType = SeekType.LATEST;
-          break;
-        default:
-          props.seekType = currentSeekType;
-      }
+    if (cursor?.id)
+      props.cursor = cursor?.id;
 
-      if (offset && currentSeekType === SeekType.OFFSET) {
-        props.seekType = SeekType.OFFSET;
-      }
-
-      if (timestamp && currentSeekType === SeekType.TIMESTAMP) {
-        props.seekType = SeekType.TIMESTAMP;
-      }
+    if (selectedPartitions.length !== partitions.length) {
+      props.partitions = selectedPartitions.map((p) => p.value);
+    }
 
-      const isSeekTypeWithSeekTo =
-        props.seekType === SeekType.TIMESTAMP ||
-        props.seekType === SeekType.OFFSET;
-
-      if (
-        selectedPartitions.length !== partitions.length ||
-        isSeekTypeWithSeekTo
-      ) {
-        // not everything in the partition is selected
-        props.seekTo = selectedPartitions.map(({ value }) => {
-          const offsetProperty =
-            seekDirection === SeekDirection.FORWARD ? 'offsetMin' : 'offsetMax';
-          const offsetBasedSeekTo =
-            currentOffset || partitionMap[value][offsetProperty];
-          const seekToOffset =
-            currentSeekType === SeekType.OFFSET
-              ? offsetBasedSeekTo
-              : timestamp?.getTime();
-
-          return `${value}::${seekToOffset || '0'}`;
-        });
-      }
+    if (queryType === MessageFilterType.GROOVY_SCRIPT) {
+      props.smartFilterId = (await getSmartFilterId(activeFilter.code))?.id || '';
     }
 
     const newProps = omitBy(props, (v) => v === undefined || v === '');
@@ -272,6 +281,12 @@ const Filters: React.FC<FiltersProps> = ({
     });
   };
 
+  const handleSubmit = async () => {
+    setPage(1);
+    resetAllMessages();
+    handleFiltersSubmit();
+  }
+
   const handleSSECancel = () => {
     if (!source.current) return;
     setIsFetching(false);
@@ -345,9 +360,16 @@ const Filters: React.FC<FiltersProps> = ({
   // eslint-disable-next-line consistent-return
   React.useEffect(() => {
     if (location.search?.length !== 0) {
+      if (page === currentPage)
+        return () => { };
+      if (page <= lastLoadedPage) {
+        setCurrentPage(page);
+        return () => { };
+      }
+
       const url = `${BASE_PARAMS.basePath}/api/clusters/${encodeURIComponent(
         clusterName
-      )}/topics/${topicName}/messages${location.search}`;
+      )}/topics/${topicName}/messages/v2${location.search}`;
       const sse = new EventSource(url);
 
       source.current = sse;
@@ -358,7 +380,7 @@ const Filters: React.FC<FiltersProps> = ({
         setIsFetching(true);
       };
       sse.onmessage = ({ data }) => {
-        const { type, message, phase, consuming }: TopicMessageEvent =
+        const { type, message, phase, consuming, cursor }: TopicMessageEvent =
           JSON.parse(data);
         switch (type) {
           case TopicMessageEventTypeEnum.MESSAGE:
@@ -381,13 +403,17 @@ const Filters: React.FC<FiltersProps> = ({
             if (consuming && type) {
               setMessageType(type);
               updateMeta(consuming);
+              updateCursor(cursor);
+              setCurrentPage(page);
+              setLastLoadedPage(page);
+              handleFiltersSubmit(cursor);
             }
             break;
           default:
         }
       };
 
-      sse.onerror = () => {
+      sse.onerror = (e) => {
         setIsFetching(false);
         sse.close();
       };
@@ -407,10 +433,15 @@ const Filters: React.FC<FiltersProps> = ({
     setIsFetching,
     updateMeta,
     updatePhase,
+    updateCursor,
+    setLastLoadedPage
   ]);
+
   React.useEffect(() => {
     if (location.search?.length === 0) {
-      handleFiltersSubmit(offset);
+      setPage(1);
+      resetAllMessages();
+      handleFiltersSubmit();
     }
   }, [
     seekDirection,
@@ -418,32 +449,38 @@ const Filters: React.FC<FiltersProps> = ({
     activeFilter,
     currentSeekType,
     timestamp,
-    query,
+    stringFilter,
     location,
   ]);
+
   React.useEffect(() => {
-    handleFiltersSubmit(offset);
+    setPage(1);
+    resetAllMessages();
+    handleFiltersSubmit();
   }, [
     seekDirection,
     queryType,
-    activeFilter,
     currentSeekType,
-    timestamp,
-    query,
     seekDirection,
-    page,
+    keySerde,
+    valueSerde
+  ]);
+
+  React.useEffect(() => {
+    setPage(1);
+    resetAllMessages();
+  }, [
+    selectedPartitions,
+    offset,
+    timestamp,
+    stringFilter,
+    activeFilter,
   ]);
 
   React.useEffect(() => {
     setIsTailing(isLive);
   }, [isLive]);
 
-  const { data: serdes = {} } = useSerdes({
-    clusterName,
-    topicName,
-    use: SerdeUsage.DESERIALIZE,
-  });
-
   return (
     <S.FiltersWrapper>
       <div>
@@ -531,7 +568,7 @@ const Filters: React.FC<FiltersProps> = ({
             buttonSize="M"
             disabled={isSubmitDisabled}
             onClick={() =>
-              isFetching ? handleSSECancel() : handleFiltersSubmit(offset)
+              isFetching ? handleSSECancel() : handleSubmit()
             }
             style={{ fontWeight: 500 }}
           >
@@ -548,7 +585,7 @@ const Filters: React.FC<FiltersProps> = ({
         />
       </div>
       <S.ActiveSmartFilterWrapper>
-        <Search placeholder="Search" disabled={isTailing} />
+        <Search placeholder="Search" disabled={isTailing} onChange={setStringFilter} />
 
         <Button buttonType="secondary" buttonSize="M" onClick={toggle}>
           <PlusIcon />

+ 12 - 0
kafka-ui-react-app/src/components/Topics/Topic/Messages/Filters/FiltersContainer.ts

@@ -7,12 +7,18 @@ import {
   updateTopicMessagesPhase,
   setTopicMessagesFetchingStatus,
   setMessageEventType,
+  updateTopicMessagesCursor,
+  setTopicMessagesCurrentPage,
+  setTopicMessagesLastLoadedPage,
+  resetAllTopicMessages,
 } from 'redux/reducers/topicMessages/topicMessagesSlice';
 import {
   getTopicMessgesMeta,
   getTopicMessgesPhase,
   getIsTopicMessagesFetching,
   getIsTopicMessagesType,
+  getTopicMessgesCursor,
+  getTopicMessgesCurrentPage,
 } from 'redux/reducers/topicMessages/selectors';
 
 import Filters from './Filters';
@@ -22,6 +28,8 @@ const mapStateToProps = (state: RootState) => ({
   meta: getTopicMessgesMeta(state),
   isFetching: getIsTopicMessagesFetching(state),
   messageEventType: getIsTopicMessagesType(state),
+  cursor: getTopicMessgesCursor(state),
+  currentPage: getTopicMessgesCurrentPage(state),
 });
 
 const mapDispatchToProps = {
@@ -31,6 +39,10 @@ const mapDispatchToProps = {
   updateMeta: updateTopicMessagesMeta,
   setIsFetching: setTopicMessagesFetchingStatus,
   setMessageType: setMessageEventType,
+  updateCursor: updateTopicMessagesCursor,
+  setCurrentPage: setTopicMessagesCurrentPage,
+  setLastLoadedPage: setTopicMessagesLastLoadedPage,
+  resetAllMessages: resetAllTopicMessages,
 };
 
 export default connect(mapStateToProps, mapDispatchToProps)(Filters);

+ 28 - 24
kafka-ui-react-app/src/components/Topics/Topic/Messages/Messages.tsx

@@ -12,16 +12,16 @@ import MessagesTable from './MessagesTable';
 import FiltersContainer from './Filters/FiltersContainer';
 
 export const SeekDirectionOptionsObj = {
-  [SeekDirection.FORWARD]: {
-    value: SeekDirection.FORWARD,
-    label: 'Oldest First',
-    isLive: false,
-  },
   [SeekDirection.BACKWARD]: {
     value: SeekDirection.BACKWARD,
     label: 'Newest First',
     isLive: false,
   },
+  [SeekDirection.FORWARD]: {
+    value: SeekDirection.FORWARD,
+    label: 'Oldest First',
+    isLive: false,
+  },
   [SeekDirection.TAILING]: {
     value: SeekDirection.TAILING,
     label: 'Live Mode',
@@ -35,36 +35,38 @@ const Messages: React.FC = () => {
   const [searchParams, setSearchParams] = useSearchParams();
   const { clusterName, topicName } = useAppParams<RouteParamsClusterTopic>();
 
-  const { data: serdes = {} } = useSerdes({
-    clusterName,
-    topicName,
-    use: SerdeUsage.DESERIALIZE,
-  });
+  // const { data: serdes = {} } = useSerdes({
+  //   clusterName,
+  //   topicName,
+  //   use: SerdeUsage.DESERIALIZE,
+  // });
 
-  React.useEffect(() => {
-    if (!searchParams.get('keySerde')) {
-      searchParams.set('keySerde', getDefaultSerdeName(serdes.key || []));
-    }
-    if (!searchParams.get('valueSerde')) {
-      searchParams.set('valueSerde', getDefaultSerdeName(serdes.value || []));
-    }
-    if (!searchParams.get('limit')) {
-      searchParams.set('limit', MESSAGES_PER_PAGE);
-    }
-    setSearchParams(searchParams);
-  }, [serdes]);
+  // React.useEffect(() => {
+  //   if (!searchParams.get('keySerde')) {
+  //     searchParams.set('keySerde', getDefaultSerdeName(serdes.key || []));
+  //   }
+  //   if (!searchParams.get('valueSerde')) {
+  //     searchParams.set('valueSerde', getDefaultSerdeName(serdes.value || []));
+  //   }
+  //   if (!searchParams.get('limit')) {
+  //     searchParams.set('limit', MESSAGES_PER_PAGE);
+  //   }
+  //   setSearchParams(searchParams);
+  // }, [serdes]);
 
   const defaultSeekValue = SeekDirectionOptions[0];
 
   const [seekDirection, setSeekDirection] = React.useState<SeekDirection>(
     (searchParams.get('seekDirection') as SeekDirection) ||
-      defaultSeekValue.value
+    defaultSeekValue.value
   );
 
   const [isLive, setIsLive] = useState<boolean>(
     SeekDirectionOptionsObj[seekDirection].isLive
   );
 
+  const [page, setPage] = React.useState<number>(1);
+
   const changeSeekDirection = useCallback((val: string) => {
     switch (val) {
       case SeekDirection.FORWARD:
@@ -87,9 +89,11 @@ const Messages: React.FC = () => {
     () => ({
       seekDirection,
       changeSeekDirection,
+      page,
+      setPage,
       isLive,
     }),
-    [seekDirection, changeSeekDirection]
+    [seekDirection, changeSeekDirection, page, setPage]
   );
 
   return (

+ 13 - 13
kafka-ui-react-app/src/components/Topics/Topic/Messages/MessagesTable.tsx

@@ -24,8 +24,8 @@ const MessagesTable: React.FC = () => {
   const [contentFilters, setContentFilters] = useState<PreviewFilter[]>([]);
 
   const [searchParams, setSearchParams] = useSearchParams();
-  const page = searchParams.get('page');
-  const { isLive } = useContext(TopicMessagesContext);
+  // const page = searchParams.get('page');
+  const { isLive, page, setPage } = useContext(TopicMessagesContext);
 
   const messages = useAppSelector(getTopicMessges);
   const isFetching = useAppSelector(getIsTopicMessagesFetching);
@@ -39,15 +39,17 @@ const MessagesTable: React.FC = () => {
   const isNextPageButtonDisabled =
     isPaginationDisabled || messages.length < Number(MESSAGES_PER_PAGE);
   const isPrevPageButtonDisabled =
-    isPaginationDisabled || !Number(searchParams.get('page'));
+    isPaginationDisabled || page === 1;
 
   const handleNextPage = () => {
-    searchParams.set('page', String(Number(page || 0) + 1));
+    // searchParams.set('page', String(Number(page || 1) + 1));
+    setPage(Number(page || 1) + 1);
     setSearchParams(searchParams);
   };
 
   const handlePrevPage = () => {
-    searchParams.set('page', String(Number(page || 0) - 1));
+    // searchParams.set('page', String(Number(page || 1) - 1));
+    setPage(Number(page || 1) - 1);
     setSearchParams(searchParams);
   };
 
@@ -73,18 +75,16 @@ const MessagesTable: React.FC = () => {
             <TableHeaderCell title="Timestamp" />
             <TableHeaderCell
               title="Key"
-              previewText={`Preview ${
-                keyFilters.length ? `(${keyFilters.length} selected)` : ''
-              }`}
+              previewText={`Preview ${keyFilters.length ? `(${keyFilters.length} selected)` : ''
+                }`}
               onPreview={() => setPreviewFor('key')}
             />
             <TableHeaderCell
               title="Value"
-              previewText={`Preview ${
-                contentFilters.length
-                  ? `(${contentFilters.length} selected)`
-                  : ''
-              }`}
+              previewText={`Preview ${contentFilters.length
+                ? `(${contentFilters.length} selected)`
+                : ''
+                }`}
               onPreview={() => setPreviewFor('content')}
             />
             <TableHeaderCell> </TableHeaderCell>

+ 4 - 1
kafka-ui-react-app/src/components/common/Search/Search.tsx

@@ -45,7 +45,10 @@ const Search: React.FC<SearchProps> = ({
     }
   }, 500);
   const clearSearchValue = () => {
-    if (searchParams.get('q')) {
+    if (onChange) {
+      onChange('');
+    }
+    else if (searchParams.get('q')) {
       searchParams.set('q', '');
       setSearchParams(searchParams);
     }

+ 2 - 0
kafka-ui-react-app/src/components/contexts/TopicMessagesContext.ts

@@ -4,6 +4,8 @@ import { SeekDirection } from 'generated-sources';
 export interface ContextProps {
   seekDirection: SeekDirection;
   changeSeekDirection(val: string): void;
+  page: number,
+  setPage(page: number): void,
   isLive: boolean;
 }
 

+ 25 - 3
kafka-ui-react-app/src/lib/hooks/api/topics.ts

@@ -15,6 +15,8 @@ import {
   CreateTopicMessage,
   GetTopicDetailsRequest,
   GetTopicsRequest,
+  MessageFilterRegistration,
+  RegisterFilterRequest,
   Topic,
   TopicConfig,
   TopicCreation,
@@ -39,6 +41,8 @@ export const topicKeys = {
     [...topicKeys.details(props), 'consumerGroups'] as const,
   statistics: (props: GetTopicDetailsRequest) =>
     [...topicKeys.details(props), 'statistics'] as const,
+  filter: (props: GetTopicDetailsRequest) =>
+    [...topicKeys.details(props), 'messageFilterRegistration'] as const,
 };
 
 export function useTopics(props: GetTopicsRequest) {
@@ -104,9 +108,9 @@ const formatTopicCreation = (form: TopicFormData): TopicCreation => {
 
   return replicationFactor.toString() !== ''
     ? {
-        ...topicsvalue,
-        replicationFactor,
-      }
+      ...topicsvalue,
+      replicationFactor,
+    }
     : topicsvalue;
 };
 
@@ -329,3 +333,21 @@ export function useCancelTopicAnalysis(props: GetTopicDetailsRequest) {
     },
   });
 }
+
+export function useRegisterFilter(props: GetTopicDetailsRequest) {
+  const client = useQueryClient();
+  return useMutation(
+    (filter: MessageFilterRegistration) =>
+      messagesApi.registerFilter({ ...props, messageFilterRegistration: filter }),
+    {
+      onSuccess: () => {
+        showSuccessAlert({
+          message: `Filter successfully registered.`,
+        });
+        client.invalidateQueries(topicKeys.filter(props));
+      },
+      onError: (e) => {
+        showServerError(e as Response);
+      },
+    });
+}

+ 5 - 0
kafka-ui-react-app/src/redux/interfaces/topic.ts

@@ -4,6 +4,7 @@ import {
   TopicCreation,
   TopicMessage,
   TopicMessageConsuming,
+  TopicMessageNextPageCursor,
 } from 'generated-sources';
 
 export type TopicName = Topic['name'];
@@ -52,9 +53,13 @@ export interface TopicFormData {
 }
 
 export interface TopicMessagesState {
+  allMessages: TopicMessage[];
   messages: TopicMessage[];
   phase?: string;
   meta: TopicMessageConsuming;
   messageEventType?: string;
   isFetching: boolean;
+  cursor?: TopicMessageNextPageCursor;
+  currentPage: number;
+  lastLoadedPage: number;
 }

+ 16 - 0
kafka-ui-react-app/src/redux/reducers/topicMessages/selectors.ts

@@ -19,6 +19,22 @@ export const getTopicMessgesMeta = createSelector(
   ({ meta }) => meta
 );
 
+export const getTopicMessgesCursor = createSelector(
+  topicMessagesState,
+  ({ cursor }) => cursor
+);
+
+export const getTopicMessgesCurrentPage = createSelector(
+  topicMessagesState,
+  ({ currentPage }) => currentPage
+);
+
+export const getTopicMessgesLastLoadedPage = createSelector(
+  topicMessagesState,
+  ({ lastLoadedPage }) => lastLoadedPage
+);
+
+
 export const getIsTopicMessagesFetching = createSelector(
   topicMessagesState,
   ({ isFetching }) => isFetching

+ 39 - 2
kafka-ui-react-app/src/redux/reducers/topicMessages/topicMessagesSlice.ts

@@ -2,7 +2,10 @@ import { createSlice } from '@reduxjs/toolkit';
 import { TopicMessagesState } from 'redux/interfaces';
 import { TopicMessage } from 'generated-sources';
 
+const PER_PAGE = 100;
+
 export const initialState: TopicMessagesState = {
+  allMessages: [],
   messages: [],
   meta: {
     bytesConsumed: 0,
@@ -12,6 +15,8 @@ export const initialState: TopicMessagesState = {
   },
   messageEventType: '',
   isFetching: false,
+  currentPage: 0,
+  lastLoadedPage: 0
 };
 
 const topicMessagesSlice = createSlice({
@@ -19,16 +24,28 @@ const topicMessagesSlice = createSlice({
   initialState,
   reducers: {
     addTopicMessage: (state, action) => {
+      const allmessages: TopicMessage[] = action.payload.prepend
+        ? [action.payload.message, ...state.allMessages]
+        : [...state.allMessages, action.payload.message];
+
       const messages: TopicMessage[] = action.payload.prepend
         ? [action.payload.message, ...state.messages]
         : [...state.messages, action.payload.message];
 
       return {
         ...state,
-        messages,
+        allMessages: allmessages,
+        messages: messages,
       };
     },
-    resetTopicMessages: () => initialState,
+    resetTopicMessages: (state) => {
+      return {
+        ...initialState,
+        currentPage: state.currentPage,
+        allMessages: state.allMessages,
+      }
+    },
+    resetAllTopicMessages: () => initialState,
     updateTopicMessagesPhase: (state, action) => {
       state.phase = action.payload;
     },
@@ -42,6 +59,22 @@ const topicMessagesSlice = createSlice({
     setMessageEventType: (state, action) => {
       state.messageEventType = action.payload;
     },
+    updateTopicMessagesCursor: (state, action) => {
+      state.cursor = action.payload;
+    },
+    setTopicMessagesCurrentPage: (state, action) => {
+      if (state.currentPage != action.payload) {
+        const messages: TopicMessage[] = state.allMessages.slice((action.payload - 1) * PER_PAGE, (action.payload - 1) * PER_PAGE + PER_PAGE);
+        return {
+          ...state,
+          currentPage: action.payload,
+          messages
+        }
+      }
+    },
+    setTopicMessagesLastLoadedPage: (state, action) => {
+      state.lastLoadedPage = action.payload;
+    },
   },
 });
 
@@ -52,6 +85,10 @@ export const {
   updateTopicMessagesMeta,
   setTopicMessagesFetchingStatus,
   setMessageEventType,
+  updateTopicMessagesCursor,
+  setTopicMessagesCurrentPage,
+  setTopicMessagesLastLoadedPage,
+  resetAllTopicMessages
 } = topicMessagesSlice.actions;
 
 export default topicMessagesSlice.reducer;