Compare commits

..

137 commits

Author SHA1 Message Date
Vladysl
83b5a60cc0
added commons-collections4 library instead of commons-collections (#4427)
Co-authored-by: Narekmat <47845266+Narekmat@users.noreply.github.com>
2024-04-08 18:46:54 +00:00
Vladysl
3dc4446321
added env variable filtering.groovy.enabled which allows to enable/disable groovy script executions (#4426) 2024-04-08 22:30:38 +04:00
dependabot[bot]
53a6553765
Bump vite-tsconfig-paths from 4.2.0 to 4.2.1 in /kafka-ui-react-app (#4215)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-13 14:17:40 +04:00
dependabot[bot]
fc97dfa874
Bump react-router-dom from 6.3.0 to 6.15.0 in /kafka-ui-react-app (#4217)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-13 14:17:26 +04:00
Edward Muller
68f08a0c9b
Bump Spring Boot (#4227) 2023-09-13 14:08:13 +04:00
dependabot[bot]
cc12814a95
Bump @typescript-eslint/parser from 5.29.0 to 5.62.0 in /kafka-ui-react-app (#4214)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-12 18:23:14 +04:00
dependabot[bot]
5d5358010b
Bump @testing-library/dom from 9.0.0 to 9.3.1 in /kafka-ui-react-app (#4216)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-12 18:23:05 +04:00
dependabot[bot]
de2f06ccf8
Bump react-hot-toast from 2.4.0 to 2.4.1 in /kafka-ui-react-app (#4218)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-12 18:22:49 +04:00
dependabot[bot]
ff106a2061
Bump @openapitools/openapi-generator-cli from 2.5.2 to 2.7.0 in /kafka-ui-react-app (#4219)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-12 18:22:37 +04:00
dependabot[bot]
c00cb320cd
Bump @types/lossless-json from 1.0.1 to 1.0.2 in /kafka-ui-react-app (#4220)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-12 18:22:23 +04:00
Victoria Miltcheva
8a1e9ad8e8
Docs: Fix persistent installation link in README (#4207) 2023-09-07 22:26:16 +04:00
dependabot[bot]
39bb860f8e
Bump allure.version from 2.22.2 to 2.23.0 (#4039)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-06 14:45:09 +04:00
dependabot[bot]
f66d234d83
Bump git-commit-id-plugin from 4.0.0 to 4.9.10 (#4028)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-06 14:44:51 +04:00
dependabot[bot]
68a7268f8b
Bump maven-surefire-plugin from 2.22.2 to 3.1.2 (#4026)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-06 14:44:34 +04:00
dependabot[bot]
aca3d25dc8
Bump ingestion-contract-client from 0.1.23 to 0.1.26 (#4047)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-06 14:43:45 +04:00
dependabot[bot]
0616883fee
Bump eslint from 8.16.0 to 8.48.0 in /kafka-ui-react-app (#4184)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-06 14:33:49 +04:00
Ilya Kuramshin
59584ed369
BE: Add a common prefix for all app metrics (#4177)
Co-authored-by: iliax <ikuramshin@provectus.com>
2023-09-05 18:03:09 +04:00
Mihai Alexandru Agache
bbb739af92
FE: Fix dateTimeHelpers.spec.ts linter error (#4090)
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
2023-09-05 18:01:45 +04:00
dependabot[bot]
145bf07b5d
Bump jest-environment-jsdom from 29.5.0 to 29.6.4 in /kafka-ui-react-app (#4183)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-05 17:42:55 +04:00
dependabot[bot]
ceb821acdf
Bump sass from 1.52.3 to 1.66.1 in /kafka-ui-react-app (#4185)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-05 17:42:46 +04:00
dependabot[bot]
d2b0cc51e3
Bump json-schema-faker from 0.5.0-rcv.44 to 0.5.3 in /kafka-ui-react-app (#4189)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-05 17:42:35 +04:00
dependabot[bot]
9e7bc02c8a
Bump eslint-plugin-jsx-a11y from 6.5.1 to 6.7.1 in /kafka-ui-react-app (#4159)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-05 13:33:07 +04:00
dependabot[bot]
2836b2f5d2
Bump eslint-config-prettier from 8.5.0 to 9.0.0 in /kafka-ui-react-app (#4182)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-05 13:32:37 +04:00
dependabot[bot]
a47848f809
Bump vite-tsconfig-paths from 4.0.2 to 4.2.0 in /kafka-ui-react-app (#4186)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-05 13:31:50 +04:00
dependabot[bot]
5c9fb994a4
Bump react and @types/react in /kafka-ui-react-app (#4187)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-05 13:31:27 +04:00
dependabot[bot]
14efe9da1e
Bump @jest/types from 29.5.0 to 29.6.3 in /kafka-ui-react-app (#4188)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-05 13:31:09 +04:00
dependabot[bot]
6676747606
Bump aquasecurity/trivy-action from 0.11.2 to 0.12.0 (#4190)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-09-05 13:30:50 +04:00
Ilya Kuramshin
b0583a3ca7
BE: Refactor SchemaRegistry serialization logic (#4116)
Co-authored-by: iliax <ikuramshin@provectus.com>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
2023-08-30 12:39:58 +04:00
Ilya Kuramshin
4ec7975b2e
BE: Implement topic active producers API (#4121)
Co-authored-by: iliax <ikuramshin@provectus.com>
2023-08-30 12:38:54 +04:00
dependabot[bot]
c05abc1e0a
Bump eslint-import-resolver-node from 0.3.6 to 0.3.9 in /kafka-ui-react-app (#4166)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-08-30 12:32:47 +04:00
Evgeny Petrushevsky
729ca79581
BE: Fix cognito roles extractor regression (#4171)
Co-authored-by: Evgeny Petrushevsky <evgenuypetrushevskiv8@gmail.com>
2023-08-30 12:29:36 +04:00
Shubhadeep Das
80024c8758
BE: Fix sonar code smells (#3971)
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
2023-08-29 17:54:21 +04:00
Prady
0d6f293ab9
FE: Topic analysis: Updated style for percentage of completion (#4123) 2023-08-29 17:53:13 +04:00
dependabot[bot]
8f2a29d15d
Bump aws-actions/configure-aws-credentials from 2 to 3 (#4169)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-08-29 17:46:10 +04:00
dependabot[bot]
552691fc5d
Bump use-debounce from 9.0.3 to 9.0.4 in /kafka-ui-react-app (#4160)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-08-29 17:45:24 +04:00
dependabot[bot]
342b534ac9
Bump jest from 29.5.0 to 29.6.4 in /kafka-ui-react-app (#4161)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-08-29 17:45:02 +04:00
dependabot[bot]
2051f6f653
Bump eslint-plugin-prettier from 4.0.0 to 4.2.1 in /kafka-ui-react-app (#4162)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-08-29 17:44:42 +04:00
dependabot[bot]
b2b02a5d60
Bump @hookform/error-message from 2.0.0 to 2.0.1 in /kafka-ui-react-app (#4163)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-08-29 17:44:23 +04:00
dependabot[bot]
d7eb3ba99e
Bump eslint-plugin-react-hooks from 4.5.0 to 4.6.0 in /kafka-ui-react-app (#4165)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-08-29 17:43:59 +04:00
dependabot[bot]
7de883d3ab
Bump @types/eventsource from 1.1.8 to 1.1.11 in /kafka-ui-react-app (#4167)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-08-29 17:43:36 +04:00
dependabot[bot]
4519d9a48c
Bump lossless-json from 2.0.8 to 2.0.11 in /kafka-ui-react-app (#4168)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-08-29 17:43:20 +04:00
Mitsuaki Ito
cca2c96997
FE: Fix active controller badge on invalid node (#4085)
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
2023-08-23 11:41:46 +00:00
dependabot[bot]
844eb17d7a
Bump tough-cookie from 4.0.0 to 4.1.3 in /kafka-ui-react-app (#4150)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-08-23 14:28:47 +04:00
dependabot[bot]
37a6e62684
Bump vite from 4.0.0 to 4.0.5 in /kafka-ui-react-app (#4147)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-08-23 13:48:54 +04:00
dependabot[bot]
4f211b39ba
Bump word-wrap from 1.2.3 to 1.2.5 in /kafka-ui-react-app (#4151)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-08-23 13:48:31 +04:00
dependabot[bot]
8d35761b8d
Bump json5 from 1.0.1 to 1.0.2 in /kafka-ui-react-app (#4148)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-08-23 13:46:35 +04:00
dependabot[bot]
b12a0634a0
Bump semver from 6.3.0 to 6.3.1 in /kafka-ui-react-app (#4149)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-08-23 12:50:24 +04:00
Roman Zabaluev
8d402798c5
FE: Bump node & pnpm versions (#4146) 2023-08-23 11:15:32 +04:00
Malav Mevada
ed9f91fd8a
FE: Broker: Config: Implement search by the Value (#3804)
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
2023-08-23 10:51:37 +04:00
pvmsikrsna
d2a5acc82d
FE: Topics: Minor fixes for Create Topic form (#3969)
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
2023-08-23 10:46:03 +04:00
p-eye
7a82079471
FE: Topics: Do not send properties which are not defined explicitly (#4097) 2023-08-23 10:42:09 +04:00
Ilya Kuramshin
9acbf2b681
BE: Minor restart logic refactoring (#4140)
Co-authored-by: iliax <ikuramshin@provectus.com>
2023-08-21 09:36:17 +00:00
dependabot[bot]
5f89e3b97e
Bump actions/setup-node from 3.7.0 to 3.8.1 (#4141)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-08-21 13:17:38 +04:00
Roman Zabaluev
1df8625fc8
BE: Audit: consider RBAC might be disabled (#4138) 2023-08-18 08:50:05 +00:00
Roman Zabaluev
c8ad262d77
BE: Consider context path in basic auth html (#4136) 2023-08-18 12:01:05 +04:00
Roman Zabaluev
bdbbdcccbe
BE: Use HTTP GET for basic auth (#4135) 2023-08-18 12:00:09 +04:00
Roman Zabaluev
3114509ebf
FE: RBAC: Fix cluster config menu item is always enabled (#4130) 2023-08-18 11:24:10 +04:00
Roman Zabaluev
6224b12ed3
FE: Refactor RBAC tests, fix "subset" permissions issues (#4128) 2023-08-18 11:23:32 +04:00
dependabot[bot]
78e53d7d93
Bump svenstaro/upload-release-action from 2.6.1 to 2.7.0 (#4071)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-08-16 13:08:39 +04:00
dependabot[bot]
f9e89661d7
Bump pnpm/action-setup from 2.2.4 to 2.4.0 (#4072)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-08-16 13:08:12 +04:00
Ilya Kuramshin
2a61b97fab
Messaging polling refactoring (#4088)
1. RecordEmitter logic refactored - all polling logic moved RangePollingEmitter. Backward/Forward implementations just chouse how range is calculated for next step.
2. Using consumer.position() method to define that range is fully polled (not empty polls counting deleted)
3. AnalysisTask: calculating progress based on processed offsets, not polled messages (more accurate for compacted topics)
4. Sorting polled messages by ts before sending to frontend
2023-08-14 14:37:55 +04:00
Ilya Kuramshin
b32ab01436
BE: Implement audit log level (#4103) 2023-08-11 13:13:14 +00:00
Ilya Kuramshin
fa9547b95a
BE: Controllers structure minor refactr (#4110)
Audit & access controll services moved to AbstractController
2023-08-11 16:41:07 +04:00
Kostas Dizas
d915de4fd8
Add protobuf raw message deserializer (#4041)
Implemented a Protobuf Raw deserialiser that works like protoc --decode_raw. This is a no config alternative to the existing ProtobufFileSerde.

Co-authored-by: Ilya Kuramshin <iliax@proton.me>
2023-08-11 08:47:28 +00:00
Roman Zabaluev
150fc21fb8
RBAC: Implement roles by github teams (#4093)
Co-authored-by: Ilya Kuramshin <iliax@proton.me>
2023-08-07 19:01:39 +04:00
Narekmat
ba18f3b042
Infra: Fix frontend sonar jdk warning (#4098) 2023-08-07 18:18:24 +04:00
Roman Zabaluev
ac09efcd34
FE: Update topic deletion disabled message (#4092) 2023-08-07 18:04:10 +04:00
Roman Zabaluev
333eae2475
Make the container timezone configurable (#4084)
Co-authored-by: Ilya Kuramshin <iliax@proton.me>
2023-08-04 15:43:40 +04:00
Sungyun Hur
69ebd3d52b
FE: display consumerGroupID in Reset Offsets page (#3866) 2023-08-03 11:05:09 +00:00
Roman Zabaluev
6a40146fb1
BE: Throw obvious exception in case no oauth2 providers have been defined (#4002) 2023-08-02 13:06:31 +00:00
Roman Zabaluev
4515ecaf41
Update README
Add stats banner
2023-08-02 15:30:37 +04:00
Ilya Kuramshin
92157bdd39
BE: Hex serde initialization fix (#4081)
Co-authored-by: iliax <ikuramshin@provectus.com>
2023-08-02 09:58:58 +00:00
Ilya Kuramshin
8126607b91
BE: Audit: Minor refactoring (#4064) 2023-08-01 15:25:09 +00:00
Roman Zabaluev
77f1ec9490
BE: RBAC: Skip rbac checks in case of app config (#4078) 2023-08-01 19:06:17 +04:00
Roman Zabaluev
3cde6c21ec
FE: Logout button link is bound to a wrong div (#4045) 2023-08-01 18:27:57 +04:00
Roman Zabaluev
15f4543402
BE: RBAC: Fix unknown resource exception (#4033) 2023-08-01 18:27:31 +04:00
Ilya Kuramshin
c96a0c6be5
ISSSUE4052: Polling metrics (#4069)
Co-authored-by: iliax <ikuramshin@provectus.com>
2023-08-01 14:19:03 +00:00
Ilya Kuramshin
b2c3fcc321
BE: ACL enablement check fixes (#4034)
Co-authored-by: iliax <ikuramshin@provectus.com>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
2023-08-01 12:42:00 +00:00
Ilya Kuramshin
1cd303a90b
BE: Impl ACL endpoints for consumer, producer, stream apps (#3783)
Co-authored-by: iliax <ikuramshin@provectus.com>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
2023-08-01 16:23:19 +04:00
Ilya Kuramshin
895d27a306
Unavailable connects suppressing (#4061)
1. suppressing connect errors when returning list of all connectors
2. minor refactoring of KafkaConnectService.getAllConnectors
2023-08-01 12:07:46 +00:00
Ilya Kuramshin
476cbfb691
BE: Serde tiny improvements and fixes (#4063)
Co-authored-by: iliax <ikuramshin@provectus.com>
2023-08-01 15:47:03 +04:00
Ilya Kuramshin
2db89593a7
BE: Implement Hex serde (#4074)
Co-authored-by: iliax <ikuramshin@provectus.com>
2023-08-01 12:30:05 +04:00
Roman Zabaluev
0b99f745b0
BE: Migrate deprecated spring components (#4056)
Co-authored-by: Ilya Kuramshin <iliax@proton.me>
2023-07-31 13:01:36 +04:00
Ilya Kuramshin
7eaae31345
Error message from SR propagation (#4058)
Co-authored-by: iliax <ikuramshin@provectus.com>
2023-07-26 08:17:55 +00:00
Igor Khrol
ca2d53f936
BE: Respect system proxy settings in WebClient (#4042) 2023-07-24 07:43:18 +00:00
Narekmat
a32272d07e
Infra: tighten github_token security (#4043)
Co-authored-by: Narek Matevosyan <nmatevosyan@provectus.com>
2023-07-24 11:25:13 +04:00
dependabot[bot]
32cd55928a
Bump qase.io.version from 3.0.4 to 3.0.5 (#4000)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-07-13 19:02:14 +04:00
Ilya Kuramshin
d4001b5a39
SchemaRegistry references support (#3747)
1. references fields added to io dtos
2. json schema references parsing fixed (using confluent JsonSchema)
3. schema refs resolving added to ODD schema extractors
2023-07-12 10:47:38 +00:00
dependabot[bot]
f124fa632d
Bump actions/setup-node from 3.6.0 to 3.7.0 (#4023)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-07-10 20:10:29 +04:00
Roman Zabaluev
8ae8ae40a4
CVE fixes, July 23 (#4003) 2023-07-10 20:02:27 +04:00
Roman Zabaluev
5f231c7681
FE: Fix Messages: Produce: "Keep contents" not re-generated (#4008) 2023-07-10 12:18:05 +00:00
Roman Zabaluev
17cde82dff
Get rid of pull_request_target modes 2023-07-06 19:46:02 +08:00
Roman Zabaluev
9ab4580c47
FE: Consumers: Replace zero consumer lag with "N/A" (#4009) 2023-07-04 14:12:21 +04:00
Narekmat
d572e43b4f
Infra: Fix image build for non OCI-compliant runtimes (#3986)
Co-authored-by: Narek Matevosyan <nmatevosyan@provectus.com>
2023-07-03 16:31:53 +04:00
Roman Zabaluev
ab58618d83
FE: Add robots.txt (#3995) 2023-06-30 16:12:35 +04:00
seono
216c87670d
FE: Consumers: Add a tooltip for a state (#3871)
Co-authored-by: seonho.jeong <seonho.jeong@navercorp.com>
2023-06-30 15:26:55 +04:00
dependabot[bot]
e57b0bac43
Bump protobuf-java from 3.21.9 to 3.23.3 (#3966)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-06-30 11:20:53 +00:00
dependabot[bot]
0c732db436
Bump allure.version from 2.21.0 to 2.22.2 (#3967)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-06-30 14:44:38 +04:00
dependabot[bot]
d26490e82e
Bump slf4j-simple from 2.0.5 to 2.0.7 (#3987)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-06-30 14:44:15 +04:00
dependabot[bot]
be2f9f0605
Bump org.mapstruct.version from 1.4.2.Final to 1.5.5.Final (#3707)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-06-28 16:13:41 +04:00
Ilya Kuramshin
50b9c56112
ODD: Skipping topic exporting if we failed to load/parse topic's schema from SR (#3980)
Co-authored-by: iliax <ikuramshin@provectus.com>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
2023-06-28 11:12:46 +04:00
Roman Zabaluev
401c9f12c1
Refactor workflow names 2023-06-27 21:08:06 +08:00
Roman Zabaluev
b700ac3991
BE: RBAC: Implement an authorities extractor to support subject-level role matching (#3979)
Co-authored-by: Ilya Kuramshin <iliax@proton.me>
2023-06-27 16:30:40 +04:00
Roman Zabaluev
b9bbb1a823
FE: Ignore expected 404 on topic statistics page (#3964) 2023-06-26 09:32:21 +00:00
dependabot[bot]
81805703c8
Bump peter-evans/create-or-update-comment from 2 to 3 (#3973)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-06-26 08:36:26 +00:00
Vlad Senyuta
6b67313d1a
[e2e] Checking Broker's config search is case insensitive (#3972) 2023-06-26 12:10:29 +04:00
Roman Zabaluev
9549f68d7e
BE: Fix CORS once again (#3957) 2023-06-23 08:45:12 +00:00
Ilya Kuramshin
8337c9c183
BE: Serde: Implement a serde for consumer_offsets topic (#3771) 2023-06-22 08:46:11 +00:00
Ilya Kuramshin
b1ac3482db
Skipping full qualified union-type names for avro <-> json convertion (#3931)
Co-authored-by: iliax <ikuramshin@provectus.com>
2023-06-21 12:33:30 +00:00
Roman Zabaluev
cdb4f84e23
FE: Add precision for disk usage (#3962) 2023-06-21 15:12:38 +04:00
dependabot[bot]
4134d68316
Bump maven-checkstyle-plugin from 3.1.2 to 3.3.0 (#3956)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-06-21 14:13:53 +04:00
Andrey Nenashev
742e6eed3e
BE: Chore: drop deprecated protobuf single file property (#3825) 2023-06-21 14:13:17 +04:00
Ilya Kuramshin
328d91de8b
Smart filters test execution endpoint added (#3656)
Co-authored-by: iliax <ikuramshin@provectus.com>
2023-06-21 10:04:58 +00:00
Ilya Kuramshin
c743067ffa
Audit backend (#3831)
AuditService added to log all API operations
2023-06-21 12:02:27 +04:00
Narekmat
7f7242eb8b
Infra: Revert comment action for published images (#3953)
Co-authored-by: Narek Matevosyan <nmatevosyan@provectus.com>
2023-06-20 11:18:20 +04:00
dependabot[bot]
593ef7ec9c
Bump kentaro-m/task-completed-checker-action from 0.1.1 to 0.1.2 (#3851)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-06-20 10:09:21 +04:00
dependabot[bot]
55ed7f4821
Bump mheap/github-action-required-labels from 4 to 5 (#3926)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-06-20 10:08:59 +04:00
dependabot[bot]
e60fe062b6
Bump maven-jar-plugin from 3.0.2 to 3.3.0 (#3704)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-06-20 10:08:30 +04:00
dependabot[bot]
0a35038826
Bump svenstaro/upload-release-action from 2.5.0 to 2.6.1 (#3904)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-06-20 10:07:38 +04:00
dependabot[bot]
fa65ec2753
Bump aquasecurity/trivy-action from 0.10.0 to 0.11.2 (#3927)
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-06-20 10:06:48 +04:00
Roman Zabaluev
f84bbb9ebb
Update issue templates
Drop helm & add a redirect
2023-06-20 09:20:57 +04:00
Roman Zabaluev
d14b935765
Update codeowners 2023-06-20 08:56:21 +04:00
Roman Zabaluev
f2ef0c2793
Nuke helm charts
due to them being moved to a separate repo
https://github.com/provectus/kafka-ui-charts
2023-06-20 08:52:26 +04:00
Nicolas BOURON
c998e17e83
Helm: Add loadBalancerIP attribute for LB svc (#3795)
Signed-off-by: Nicolas BOURON <nicolas.bouron@gmail.com>
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
2023-06-20 08:17:27 +04:00
Vikas Rajput
d0088490a4
FE: Messages: Reset timestamp value w/ Clear all filters (#3923)
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
2023-06-20 08:14:51 +04:00
Roman Zabaluev
6fe6165427
BE: RBAC: Fix viewable topics filter (#3946) 2023-06-20 07:51:50 +04:00
Roman Zabaluev
2ac8646769
FE: Fix react query not initiating requests for local host (#3915) 2023-06-16 16:25:06 +04:00
Roman Zabaluev
af2cff20b6
FE: Disable resetting offsets for unassigned CGs (#3935) 2023-06-16 16:24:27 +04:00
Narekmat
2fb05ca947
Infra: Release workflow: Call charts action in another repo (#3917)
Co-authored-by: Narek Matevosyan <nmatevosyan@provectus.com>
2023-06-16 10:38:48 +04:00
Narekmat
fdd4947142
Infra: post links as checks, not comments (#3889)
Co-authored-by: Narek Matevosyan <nmatevosyan@provectus.com>
2023-06-16 10:34:34 +04:00
Hyojin Hwang
5c59239456
FE: fix dropdown overlapping on topics (#3898)
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
2023-06-16 05:17:50 +00:00
Roman Zabaluev
9a2f6bfc8e
FE: Wizard: Fix keytab param and boolean params quoting (#3934) 2023-06-16 09:00:08 +04:00
Azat Belgibayev
5d23f2a4ed
FE: Fix KSQL syntax color theme in dark mode (#3941)
+ Refactor theme selection mechanism
2023-06-16 08:42:35 +04:00
David Bejanyan
20bb274f0e
FE: Redirect the user to the wizard page (#3655)
Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com>
2023-06-16 08:38:37 +04:00
Ilya Kuramshin
03b7d1bd60
BE: Fix sanitizing on KC null values config entries (#3937)
Co-authored-by: iliax <ikuramshin@provectus.com>
2023-06-14 05:29:22 +04:00
Vlad Senyuta
100bb1dac6
[e2e] Checking Broker's config search - non-first page results (#3928) 2023-06-12 12:35:38 +04:00
Ilya Kuramshin
c355955641
BE: Impl support for Avro logical types (#3808)
Co-authored-by: iliax <ikuramshin@provectus.com>
2023-06-08 13:14:39 +00:00
268 changed files with 9428 additions and 4816 deletions

4
.github/CODEOWNERS vendored
View file

@ -14,5 +14,5 @@
# TESTS
/kafka-ui-e2e-checks/ @provectus/kafka-qa
# HELM CHARTS
/charts/ @provectus/kafka-devops
# INFRA
/.github/workflows/ @provectus/kafka-devops

View file

@ -1,5 +1,8 @@
blank_issues_enabled: false
contact_links:
- name: Report helm issue
url: https://github.com/provectus/kafka-ui-charts
about: Our helm charts are located in another repo. Please raise issues/PRs regarding charts in that repo.
- name: Official documentation
url: https://docs.kafka-ui.provectus.io/
about: Before reaching out for support, please refer to our documentation. Read "FAQ" and "Common problems", also try using search there.

View file

@ -1,92 +0,0 @@
name: "⎈ K8s/Helm problem report"
description: "Report a problem with k8s/helm charts/etc"
labels: ["status/triage", "scope/k8s"]
assignees: []
body:
- type: markdown
attributes:
value: |
Hi, thanks for raising the issue(-s), all contributions really matter!
Please, note that we'll close the issue without further explanation if you don't follow
this template and don't provide the information requested within this template.
- type: checkboxes
id: terms
attributes:
label: Issue submitter TODO list
description: By you checking these checkboxes we can be sure you've done the essential things.
options:
- label: I've looked up my issue in [FAQ](https://docs.kafka-ui.provectus.io/faq/common-problems)
required: true
- label: I've searched for an already existing issues [here](https://github.com/provectus/kafka-ui/issues)
required: true
- label: I've tried running `master`-labeled docker image and the issue still persists there
required: true
- label: I'm running a supported version of the application which is listed [here](https://github.com/provectus/kafka-ui/blob/master/SECURITY.md)
required: true
- type: textarea
attributes:
label: Describe the bug (actual behavior)
description: A clear and concise description of what the bug is. Use a list, if there is more than one problem
validations:
required: true
- type: textarea
attributes:
label: Expected behavior
description: A clear and concise description of what you expected to happen
validations:
required: false
- type: textarea
attributes:
label: Your installation details
description: |
How do you run the app? Please provide as much info as possible:
1. App version (commit hash in the top left corner of the UI)
2. Helm chart version
3. Your application config. Please remove the sensitive info like passwords or API keys.
4. Any IAAC configs
validations:
required: true
- type: textarea
attributes:
label: Steps to reproduce
description: |
Please write down the order of the actions required to reproduce the issue.
For the advanced setups/complicated issue, we might need you to provide
a minimal [reproducible example](https://stackoverflow.com/help/minimal-reproducible-example).
validations:
required: true
- type: textarea
attributes:
label: Screenshots
description: |
If applicable, add screenshots to help explain your problem
validations:
required: false
- type: textarea
attributes:
label: Logs
description: |
If applicable, *upload* screenshots to help explain your problem
validations:
required: false
- type: textarea
attributes:
label: Additional context
description: |
Add any other context about the problem here. E.G.:
1. Are there any alternative scenarios (different data/methods/configuration/setup) you have tried?
Were they successful or the same issue occurred? Please provide steps as well.
2. Related issues (if there are any).
3. Logs (if available)
4. Is there any serious impact or behaviour on the end-user because of this issue, that can be overlooked?
validations:
required: false

View file

@ -1,4 +1,4 @@
name: AWS Marketplace Publisher
name: "Infra: Release: AWS Marketplace Publisher"
on:
workflow_dispatch:
inputs:
@ -31,7 +31,7 @@ jobs:
echo "Packer will be triggered in this dir $WORK_DIR"
- name: Configure AWS credentials for Kafka-UI account
uses: aws-actions/configure-aws-credentials@v2
uses: aws-actions/configure-aws-credentials@v3
with:
aws-access-key-id: ${{ secrets.AWS_AMI_PUBLISH_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_AMI_PUBLISH_KEY_SECRET }}

View file

@ -1,4 +1,4 @@
name: Backend build and test
name: "Backend: PR/master build & test"
on:
push:
branches:
@ -8,6 +8,9 @@ on:
paths:
- "kafka-ui-api/**"
- "pom.xml"
permissions:
checks: write
pull-requests: write
jobs:
build-and-test:
runs-on: ubuntu-latest
@ -29,7 +32,7 @@ jobs:
key: ${{ runner.os }}-sonar
restore-keys: ${{ runner.os }}-sonar
- name: Build and analyze pull request target
if: ${{ github.event_name == 'pull_request_target' }}
if: ${{ github.event_name == 'pull_request' }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets.SONAR_TOKEN_BACKEND }}

View file

@ -1,4 +1,4 @@
name: Pull Request Labels
name: "Infra: PR block merge"
on:
pull_request:
types: [opened, labeled, unlabeled, synchronize]
@ -6,7 +6,7 @@ jobs:
block_merge:
runs-on: ubuntu-latest
steps:
- uses: mheap/github-action-required-labels@v4
- uses: mheap/github-action-required-labels@v5
with:
mode: exactly
count: 0

View file

@ -1,4 +1,4 @@
name: Feature testing init
name: "Infra: Feature Testing: Init env"
on:
workflow_dispatch:
@ -45,7 +45,7 @@ jobs:
restore-keys: |
${{ runner.os }}-buildx-
- name: Configure AWS credentials for Kafka-UI account
uses: aws-actions/configure-aws-credentials@v2
uses: aws-actions/configure-aws-credentials@v3
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
@ -84,18 +84,22 @@ jobs:
git add ../kafka-ui-from-branch/
git commit -m "added env:${{ needs.build.outputs.deploy }}" && git push || true
- name: make comment with private deployment link
- name: update status check for private deployment
if: ${{ github.event.label.name == 'status/feature_testing' }}
uses: peter-evans/create-or-update-comment@v3
uses: Sibz/github-status-action@v1.1.6
with:
issue-number: ${{ github.event.pull_request.number }}
body: |
Custom deployment will be available at http://${{ needs.build.outputs.tag }}.internal.kafka-ui.provectus.io
authToken: ${{secrets.GITHUB_TOKEN}}
context: "Click Details button to open custom deployment page"
state: "success"
sha: ${{ github.event.pull_request.head.sha || github.sha }}
target_url: "http://${{ needs.build.outputs.tag }}.internal.kafka-ui.provectus.io"
- name: make comment with public deployment link
- name: update status check for public deployment
if: ${{ github.event.label.name == 'status/feature_testing_public' }}
uses: peter-evans/create-or-update-comment@v3
uses: Sibz/github-status-action@v1.1.6
with:
issue-number: ${{ github.event.pull_request.number }}
body: |
Custom deployment will be available at http://${{ needs.build.outputs.tag }}.kafka-ui.provectus.io in 5 minutes
authToken: ${{secrets.GITHUB_TOKEN}}
context: "Click Details button to open custom deployment page"
state: "success"
sha: ${{ github.event.pull_request.head.sha || github.sha }}
target_url: "http://${{ needs.build.outputs.tag }}.internal.kafka-ui.provectus.io"

View file

@ -1,4 +1,4 @@
name: Feature testing destroy
name: "Infra: Feature Testing: Destroy env"
on:
workflow_dispatch:
pull_request:
@ -20,9 +20,3 @@ jobs:
git config --global user.name "infra-tech"
git add ../kafka-ui-from-branch/
git commit -m "removed env:${{ needs.build.outputs.deploy }}" && git push || true
- name: make comment with deployment link
uses: peter-evans/create-or-update-comment@v3
with:
issue-number: ${{ github.event.pull_request.number }}
body: |
Custom deployment removed

View file

@ -1,4 +1,4 @@
name: Build Docker image and push
name: "Infra: Image Testing: Deploy"
on:
workflow_dispatch:
pull_request:
@ -42,7 +42,7 @@ jobs:
restore-keys: |
${{ runner.os }}-buildx-
- name: Configure AWS credentials for Kafka-UI account
uses: aws-actions/configure-aws-credentials@v2
uses: aws-actions/configure-aws-credentials@v3
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
@ -70,6 +70,5 @@ jobs:
issue-number: ${{ github.event.pull_request.number }}
body: |
Image published at public.ecr.aws/provectus/kafka-ui-custom-build:${{ steps.extract_branch.outputs.tag }}
outputs:
tag: ${{ steps.extract_branch.outputs.tag }}

View file

@ -1,28 +0,0 @@
name: Prepare helm release
on:
repository_dispatch:
types: [prepare-helm-release]
jobs:
change-app-version:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- run: |
git config user.name github-actions
git config user.email github-actions@github.com
- name: Change versions
run: |
git checkout -b release-${{ github.event.client_payload.appversion}}
version=$(cat charts/kafka-ui/Chart.yaml | grep version | awk '{print $2}')
version=${version%.*}.$((${version##*.}+1))
sed -i "s/version:.*/version: ${version}/" charts/kafka-ui/Chart.yaml
sed -i "s/appVersion:.*/appVersion: ${{ github.event.client_payload.appversion}}/" charts/kafka-ui/Chart.yaml
git add charts/kafka-ui/Chart.yaml
git commit -m "release ${version}"
git push --set-upstream origin release-${{ github.event.client_payload.appversion}}
- name: Slack Notification
uses: rtCamp/action-slack-notify@v2
env:
SLACK_TITLE: "release-${{ github.event.client_payload.appversion}}"
SLACK_MESSAGE: "A new release of the helm chart has been prepared. Branch name: release-${{ github.event.client_payload.appversion}}"
SLACK_WEBHOOK: ${{ secrets.SLACK_WEBHOOK }}

View file

@ -55,7 +55,7 @@ jobs:
cache-to: type=local,dest=/tmp/.buildx-cache
- name: Run CVE checks
uses: aquasecurity/trivy-action@0.10.0
uses: aquasecurity/trivy-action@0.12.0
with:
image-ref: "provectuslabs/kafka-ui:${{ steps.build.outputs.version }}"
format: "table"

View file

@ -1,4 +1,4 @@
name: Delete Public ECR Image
name: "Infra: Image Testing: Delete"
on:
workflow_dispatch:
pull_request:
@ -15,7 +15,7 @@ jobs:
tag='${{ github.event.pull_request.number }}'
echo "tag=${tag}" >> $GITHUB_OUTPUT
- name: Configure AWS credentials for Kafka-UI account
uses: aws-actions/configure-aws-credentials@v2
uses: aws-actions/configure-aws-credentials@v3
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
@ -32,9 +32,3 @@ jobs:
--repository-name kafka-ui-custom-build \
--image-ids imageTag=${{ steps.extract_branch.outputs.tag }} \
--region us-east-1
- name: make comment with private deployment link
uses: peter-evans/create-or-update-comment@v3
with:
issue-number: ${{ github.event.pull_request.number }}
body: |
Image tag public.ecr.aws/provectus/kafka-ui-custom-build:${{ steps.extract_branch.outputs.tag }} has been removed

View file

@ -1,4 +1,4 @@
name: Documentation URLs linter
name: "Infra: Docs: URL linter"
on:
pull_request:
types:

View file

@ -1,4 +1,4 @@
name: E2E Automation suite
name: "E2E: Automation suite"
on:
workflow_dispatch:
inputs:
@ -24,7 +24,7 @@ jobs:
with:
ref: ${{ github.sha }}
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v2
uses: aws-actions/configure-aws-credentials@v3
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

View file

@ -1,4 +1,4 @@
name: E2E PR health check
name: "E2E: PR healthcheck"
on:
pull_request_target:
types: [ "opened", "edited", "reopened", "synchronize" ]
@ -8,6 +8,8 @@ on:
- "kafka-ui-react-app/**"
- "kafka-ui-e2e-checks/**"
- "pom.xml"
permissions:
statuses: write
jobs:
build-and-test:
runs-on: ubuntu-latest
@ -16,10 +18,10 @@ jobs:
with:
ref: ${{ github.event.pull_request.head.sha }}
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v2
uses: aws-actions/configure-aws-credentials@v3
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-access-key-id: ${{ secrets.S3_AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.S3_AWS_SECRET_ACCESS_KEY }}
aws-region: eu-central-1
- name: Set up environment
id: set_env_values

View file

@ -1,4 +1,4 @@
name: E2E Manual suite
name: "E2E: Manual suite"
on:
workflow_dispatch:
inputs:

View file

@ -1,4 +1,4 @@
name: E2E Weekly suite
name: "E2E: Weekly suite"
on:
schedule:
- cron: '0 1 * * 1'
@ -11,7 +11,7 @@ jobs:
with:
ref: ${{ github.sha }}
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v2
uses: aws-actions/configure-aws-credentials@v3
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

View file

@ -1,4 +1,4 @@
name: Frontend build and test
name: "Frontend: PR/master build & test"
on:
push:
branches:
@ -8,6 +8,9 @@ on:
paths:
- "kafka-ui-contract/**"
- "kafka-ui-react-app/**"
permissions:
checks: write
pull-requests: write
jobs:
build-and-test:
env:
@ -20,13 +23,13 @@ jobs:
# Disabling shallow clone is recommended for improving relevancy of reporting
fetch-depth: 0
ref: ${{ github.event.pull_request.head.sha }}
- uses: pnpm/action-setup@v2.2.4
- uses: pnpm/action-setup@v2.4.0
with:
version: 7.4.0
version: 8.6.12
- name: Install node
uses: actions/setup-node@v3.6.0
uses: actions/setup-node@v3.8.1
with:
node-version: "16.15.0"
node-version: "18.17.1"
cache: "pnpm"
cache-dependency-path: "./kafka-ui-react-app/pnpm-lock.yaml"
- name: Install Node dependencies
@ -46,7 +49,7 @@ jobs:
cd kafka-ui-react-app/
pnpm test:CI
- name: SonarCloud Scan
uses: workshur/sonarcloud-github-action@improved_basedir
uses: sonarsource/sonarcloud-github-action@master
with:
projectBaseDir: ./kafka-ui-react-app
args: -Dsonar.pullrequest.key=${{ github.event.pull_request.number }} -Dsonar.pullrequest.branch=${{ github.head_ref }} -Dsonar.pullrequest.base=${{ github.base_ref }}

View file

@ -1,38 +0,0 @@
name: Helm linter
on:
pull_request:
types: ["opened", "edited", "reopened", "synchronize"]
branches:
- 'master'
paths:
- "charts/**"
jobs:
build-and-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Helm tool installer
uses: Azure/setup-helm@v3
- name: Setup Kubeval
uses: lra/setup-kubeval@v1.0.1
#check, was helm version increased in Chart.yaml?
- name: Check version
shell: bash
run: |
helm_version_new=$(cat charts/kafka-ui/Chart.yaml | grep version | awk '{print $2}')
helm_version_old=$(curl -s https://raw.githubusercontent.com/provectus/kafka-ui/master/charts/kafka-ui/Chart.yaml | grep version | awk '{print $2}' )
echo $helm_version_old
echo $helm_version_new
if [[ "$helm_version_new" > "$helm_version_old" ]]; then exit 0 ; else exit 1 ; fi
- name: Run kubeval
shell: bash
run: |
sed -i "s@enabled: false@enabled: true@g" charts/kafka-ui/values.yaml
K8S_VERSIONS=$(git ls-remote --refs --tags https://github.com/kubernetes/kubernetes.git | cut -d/ -f3 | grep -e '^v1\.[0-9]\{2\}\.[0]\{1,2\}$' | grep -v -e '^v1\.1[0-7]\{1\}' | cut -c2-)
echo "NEXT K8S VERSIONS ARE GOING TO BE TESTED: $K8S_VERSIONS"
echo ""
for version in $K8S_VERSIONS
do
echo $version;
helm template --kube-version $version --set ingress.enabled=true charts/kafka-ui -f charts/kafka-ui/values.yaml | kubeval --additional-schema-locations https://raw.githubusercontent.com/yannh/kubernetes-json-schema/master --strict -v $version;
done

View file

@ -1,4 +1,4 @@
name: Master branch build & deploy
name: "Master: Build & deploy"
on:
workflow_dispatch:
push:
@ -58,6 +58,7 @@ jobs:
builder: ${{ steps.buildx.outputs.name }}
context: kafka-ui-api
platforms: linux/amd64,linux/arm64
provenance: false
push: true
tags: |
provectuslabs/kafka-ui:${{ steps.build.outputs.version }}

View file

@ -1,13 +1,14 @@
name: "PR Checklist checked"
name: "PR: Checklist linter"
on:
pull_request_target:
types: [opened, edited, synchronize, reopened]
permissions:
checks: write
jobs:
task-check:
runs-on: ubuntu-latest
steps:
- uses: kentaro-m/task-completed-checker-action@v0.1.1
- uses: kentaro-m/task-completed-checker-action@v0.1.2
with:
repo-token: "${{ secrets.GITHUB_TOKEN }}"
- uses: dekinderfiets/pr-description-enforcer@0.0.1

View file

@ -1,39 +0,0 @@
name: Release helm
on:
push:
branches:
- master
paths:
- "charts/**"
jobs:
release-helm:
runs-on:
ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 1
- run: |
git config user.name github-actions
git config user.email github-actions@github.com
- uses: azure/setup-helm@v3
- name: add chart #realse helm with new version
run: |
VERSION=$(cat charts/kafka-ui/Chart.yaml | grep version | awk '{print $2}')
echo "HELM_VERSION=$(echo ${VERSION})" >> $GITHUB_ENV
MSG=$(helm package charts/kafka-ui)
git fetch origin
git stash
git checkout -b gh-pages origin/gh-pages
git pull
helm repo index .
git add -f ${MSG##*/} index.yaml
git commit -m "release ${VERSION}"
git push
- uses: rickstaa/action-create-tag@v1 #create new tag
with:
tag: "charts/kafka-ui-${{ env.HELM_VERSION }}"

View file

@ -1,4 +1,4 @@
name: Release serde api
name: "Infra: Release: Serde API"
on: workflow_dispatch
jobs:

View file

@ -1,4 +1,4 @@
name: Release
name: "Infra: Release"
on:
release:
types: [published]
@ -34,7 +34,7 @@ jobs:
echo "version=${VERSION}" >> $GITHUB_OUTPUT
- name: Upload files to a GitHub release
uses: svenstaro/upload-release-action@2.5.0
uses: svenstaro/upload-release-action@2.7.0
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
file: kafka-ui-api/target/kafka-ui-api-${{ steps.build.outputs.version }}.jar
@ -89,14 +89,12 @@ jobs:
charts:
runs-on: ubuntu-latest
permissions:
contents: write
needs: release
steps:
- name: Repository Dispatch
uses: peter-evans/repository-dispatch@v2
with:
token: ${{ secrets.GITHUB_TOKEN }}
repository: provectus/kafka-ui
token: ${{ secrets.CHARTS_ACTIONS_TOKEN }}
repository: provectus/kafka-ui-charts
event-type: prepare-helm-release
client-payload: '{"appversion": "${{ needs.release.outputs.version }}"}'

View file

@ -1,4 +1,4 @@
name: Release Drafter
name: "Infra: Release Drafter run"
on:
push:

View file

@ -1,4 +1,4 @@
name: Separate environment create
name: "Infra: Feature Testing Public: Init env"
on:
workflow_dispatch:
inputs:
@ -47,7 +47,7 @@ jobs:
restore-keys: |
${{ runner.os }}-buildx-
- name: Configure AWS credentials for Kafka-UI account
uses: aws-actions/configure-aws-credentials@v2
uses: aws-actions/configure-aws-credentials@v3
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

View file

@ -1,4 +1,4 @@
name: Separate environment remove
name: "Infra: Feature Testing Public: Destroy env"
on:
workflow_dispatch:
inputs:

View file

@ -1,4 +1,4 @@
name: 'Close stale issues'
name: 'Infra: Close stale issues'
on:
schedule:
- cron: '30 1 * * *'

View file

@ -1,4 +1,4 @@
name: Terraform deploy
name: "Infra: Terraform deploy"
on:
workflow_dispatch:
inputs:
@ -26,7 +26,7 @@ jobs:
echo "Terraform will be triggered in this dir $TF_DIR"
- name: Configure AWS credentials for Kafka-UI account
uses: aws-actions/configure-aws-credentials@v2
uses: aws-actions/configure-aws-credentials@v3
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}

View file

@ -1,4 +1,4 @@
name: Add triage label to new issues
name: "Infra: Triage: Apply triage label for issues"
on:
issues:
types:

View file

@ -1,4 +1,4 @@
name: Add triage label to new PRs
name: "Infra: Triage: Apply triage label for PRs"
on:
pull_request:
types:

View file

@ -7,7 +7,9 @@ on:
issues:
types:
- opened
permissions:
issues: write
pull-requests: write
jobs:
welcome:
runs-on: ubuntu-latest

View file

@ -1,4 +1,4 @@
name: "Workflow linter"
name: "Infra: Workflow linter"
on:
pull_request:
types:

3
.gitignore vendored
View file

@ -31,6 +31,9 @@ build/
.vscode/
/kafka-ui-api/app/node
### SDKMAN ###
.sdkmanrc
.DS_Store
*.code-workspace

View file

@ -18,6 +18,10 @@
<a href="https://www.producthunt.com/products/ui-for-apache-kafka/reviews/new">ProductHunt</a>
</p>
<p align="center">
<img src="https://repobeats.axiom.co/api/embed/2e8a7c2d711af9daddd34f9791143e7554c35d0f.svg" />
</p>
#### UI for Apache Kafka is a free, open-source web UI to monitor and manage Apache Kafka clusters.
UI for Apache Kafka is a simple tool that makes your data flows observable, helps find and troubleshoot issues faster and deliver optimal performance. Its lightweight dashboard makes it easy to track key metrics of your Kafka clusters - Brokers, Topics, Partitions, Production, and Consumption.
@ -87,7 +91,7 @@ docker run -it -p 8080:8080 -e DYNAMIC_CONFIG_ENABLED=true provectuslabs/kafka-u
Then access the web UI at [http://localhost:8080](http://localhost:8080)
The command is sufficient to try things out. When you're done trying things out, you can proceed with a [persistent installation](https://docs.kafka-ui.provectus.io/configuration/quick-start#persistent-start)
The command is sufficient to try things out. When you're done trying things out, you can proceed with a [persistent installation](https://docs.kafka-ui.provectus.io/quick-start/persistent-start)
## Persistent installation

View file

@ -1,25 +0,0 @@
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/
example/
README.md

View file

@ -1,7 +0,0 @@
apiVersion: v2
name: kafka-ui
description: A Helm chart for kafka-UI
type: application
version: 0.7.0
appVersion: v0.7.0
icon: https://github.com/provectus/kafka-ui/raw/master/documentation/images/kafka-ui-logo.png

View file

@ -1 +0,0 @@
Please refer to our [documentation](https://docs.kafka-ui.provectus.io/configuration/helm-charts) to get some info on our helm charts.

View file

@ -1,3 +0,0 @@
apiVersion: v1
entries: {}
generated: "2021-11-11T12:26:08.479581+03:00"

View file

@ -1,21 +0,0 @@
1. Get the application URL by running these commands:
{{- if .Values.ingress.enabled }}
{{- range $host := .Values.ingress.hosts }}
{{- range .paths }}
http{{ if $.Values.ingress.tls }}s{{ end }}://{{ $host.host }}{{ . }}
{{- end }}
{{- end }}
{{- else if contains "NodePort" .Values.service.type }}
export NODE_PORT=$(kubectl get --namespace {{ .Release.Namespace }} -o jsonpath="{.spec.ports[0].nodePort}" services {{ include "kafka-ui.fullname" . }})
export NODE_IP=$(kubectl get nodes --namespace {{ .Release.Namespace }} -o jsonpath="{.items[0].status.addresses[0].address}")
echo http://$NODE_IP:$NODE_PORT
{{- else if contains "LoadBalancer" .Values.service.type }}
NOTE: It may take a few minutes for the LoadBalancer IP to be available.
You can watch the status of by running 'kubectl get --namespace {{ .Release.Namespace }} svc -w {{ include "kafka-ui.fullname" . }}'
export SERVICE_IP=$(kubectl get svc --namespace {{ .Release.Namespace }} {{ include "kafka-ui.fullname" . }} --template "{{"{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}"}}")
echo http://$SERVICE_IP:{{ .Values.service.port }}
{{- else if contains "ClusterIP" .Values.service.type }}
export POD_NAME=$(kubectl get pods --namespace {{ .Release.Namespace }} -l "app.kubernetes.io/name={{ include "kafka-ui.name" . }},app.kubernetes.io/instance={{ .Release.Name }}" -o jsonpath="{.items[0].metadata.name}")
echo "Visit http://127.0.0.1:8080 to use your application"
kubectl --namespace {{ .Release.Namespace }} port-forward $POD_NAME 8080:8080
{{- end }}

View file

@ -1,84 +0,0 @@
{{/* vim: set filetype=mustache: */}}
{{/*
Expand the name of the chart.
*/}}
{{- define "kafka-ui.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Create a default fully qualified app name.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
If release name contains chart name it will be used as a full name.
*/}}
{{- define "kafka-ui.fullname" -}}
{{- if .Values.fullnameOverride }}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- $name := default .Chart.Name .Values.nameOverride }}
{{- if contains $name .Release.Name }}
{{- .Release.Name | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
{{- end }}
{{- end }}
{{- end }}
{{/*
Create chart name and version as used by the chart label.
*/}}
{{- define "kafka-ui.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Common labels
*/}}
{{- define "kafka-ui.labels" -}}
helm.sh/chart: {{ include "kafka-ui.chart" . }}
{{ include "kafka-ui.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- end }}
{{/*
Selector labels
*/}}
{{- define "kafka-ui.selectorLabels" -}}
app.kubernetes.io/name: {{ include "kafka-ui.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end }}
{{/*
Create the name of the service account to use
*/}}
{{- define "kafka-ui.serviceAccountName" -}}
{{- if .Values.serviceAccount.create }}
{{- default (include "kafka-ui.fullname" .) .Values.serviceAccount.name }}
{{- else }}
{{- default "default" .Values.serviceAccount.name }}
{{- end }}
{{- end }}
{{/*
This allows us to check if the registry of the image is specified or not.
*/}}
{{- define "kafka-ui.imageName" -}}
{{- $registryName := .Values.image.registry -}}
{{- if .Values.global }}
{{- if .Values.global.imageRegistry }}
{{- $registryName = .Values.global.imageRegistry -}}
{{- end -}}
{{- end -}}
{{- $repository := .Values.image.repository -}}
{{- $tag := .Values.image.tag | default .Chart.AppVersion -}}
{{- if $registryName }}
{{- printf "%s/%s:%s" $registryName $repository $tag -}}
{{- else }}
{{- printf "%s:%s" $repository $tag -}}
{{- end }}
{{- end -}}

View file

@ -1,10 +0,0 @@
{{- if .Values.envs.config -}}
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ include "kafka-ui.fullname" . }}
labels:
{{- include "kafka-ui.labels" . | nindent 4 }}
data:
{{- toYaml .Values.envs.config | nindent 2 }}
{{- end -}}

View file

@ -1,11 +0,0 @@
{{- if .Values.yamlApplicationConfig -}}
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ include "kafka-ui.fullname" . }}-fromvalues
labels:
{{- include "kafka-ui.labels" . | nindent 4 }}
data:
config.yml: |-
{{- toYaml .Values.yamlApplicationConfig | nindent 4}}
{{ end }}

View file

@ -1,150 +0,0 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "kafka-ui.fullname" . }}
labels:
{{- include "kafka-ui.labels" . | nindent 4 }}
{{- with .Values.annotations }}
annotations:
{{- toYaml . | nindent 4 }}
{{- end }}
spec:
{{- if not .Values.autoscaling.enabled }}
replicas: {{ .Values.replicaCount }}
{{- end }}
selector:
matchLabels:
{{- include "kafka-ui.selectorLabels" . | nindent 6 }}
template:
metadata:
annotations:
{{- with .Values.podAnnotations }}
{{- toYaml . | nindent 8 }}
{{- end }}
checksum/config: {{ include (print $.Template.BasePath "/configmap.yaml") . | sha256sum }}
checksum/configFromValues: {{ include (print $.Template.BasePath "/configmap_fromValues.yaml") . | sha256sum }}
checksum/secret: {{ include (print $.Template.BasePath "/secret.yaml") . | sha256sum }}
labels:
{{- include "kafka-ui.selectorLabels" . | nindent 8 }}
{{- if .Values.podLabels }}
{{- toYaml .Values.podLabels | nindent 8 }}
{{- end }}
spec:
{{- with .Values.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.initContainers }}
initContainers:
{{- toYaml . | nindent 8 }}
{{- end }}
serviceAccountName: {{ include "kafka-ui.serviceAccountName" . }}
securityContext:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
containers:
- name: {{ .Chart.Name }}
securityContext:
{{- toYaml .Values.securityContext | nindent 12 }}
image: {{ include "kafka-ui.imageName" . }}
imagePullPolicy: {{ .Values.image.pullPolicy }}
{{- if or .Values.env .Values.yamlApplicationConfig .Values.yamlApplicationConfigConfigMap}}
env:
{{- with .Values.env }}
{{- toYaml . | nindent 12 }}
{{- end }}
{{- if or .Values.yamlApplicationConfig .Values.yamlApplicationConfigConfigMap}}
- name: SPRING_CONFIG_ADDITIONAL-LOCATION
{{- if .Values.yamlApplicationConfig }}
value: /kafka-ui/config.yml
{{- else if .Values.yamlApplicationConfigConfigMap }}
value: /kafka-ui/{{ .Values.yamlApplicationConfigConfigMap.keyName | default "config.yml" }}
{{- end }}
{{- end }}
{{- end }}
envFrom:
{{- if .Values.existingConfigMap }}
- configMapRef:
name: {{ .Values.existingConfigMap }}
{{- end }}
{{- if .Values.envs.config }}
- configMapRef:
name: {{ include "kafka-ui.fullname" . }}
{{- end }}
{{- if .Values.existingSecret }}
- secretRef:
name: {{ .Values.existingSecret }}
{{- end }}
{{- if .Values.envs.secret}}
- secretRef:
name: {{ include "kafka-ui.fullname" . }}
{{- end}}
ports:
- name: http
containerPort: 8080
protocol: TCP
livenessProbe:
httpGet:
{{- $contextPath := .Values.envs.config.SERVER_SERVLET_CONTEXT_PATH | default "" | printf "%s/actuator/health" | urlParse }}
path: {{ get $contextPath "path" }}
port: http
{{- if .Values.probes.useHttpsScheme }}
scheme: HTTPS
{{- end }}
initialDelaySeconds: 60
periodSeconds: 30
timeoutSeconds: 10
readinessProbe:
httpGet:
{{- $contextPath := .Values.envs.config.SERVER_SERVLET_CONTEXT_PATH | default "" | printf "%s/actuator/health" | urlParse }}
path: {{ get $contextPath "path" }}
port: http
{{- if .Values.probes.useHttpsScheme }}
scheme: HTTPS
{{- end }}
initialDelaySeconds: 60
periodSeconds: 30
timeoutSeconds: 10
resources:
{{- toYaml .Values.resources | nindent 12 }}
{{- if or .Values.yamlApplicationConfig .Values.volumeMounts .Values.yamlApplicationConfigConfigMap}}
volumeMounts:
{{- with .Values.volumeMounts }}
{{- toYaml . | nindent 12 }}
{{- end }}
{{- if .Values.yamlApplicationConfig }}
- name: kafka-ui-yaml-conf
mountPath: /kafka-ui/
{{- end }}
{{- if .Values.yamlApplicationConfigConfigMap}}
- name: kafka-ui-yaml-conf-configmap
mountPath: /kafka-ui/
{{- end }}
{{- end }}
{{- if or .Values.yamlApplicationConfig .Values.volumes .Values.yamlApplicationConfigConfigMap}}
volumes:
{{- with .Values.volumes }}
{{- toYaml . | nindent 8 }}
{{- end }}
{{- if .Values.yamlApplicationConfig }}
- name: kafka-ui-yaml-conf
configMap:
name: {{ include "kafka-ui.fullname" . }}-fromvalues
{{- end }}
{{- if .Values.yamlApplicationConfigConfigMap}}
- name: kafka-ui-yaml-conf-configmap
configMap:
name: {{ .Values.yamlApplicationConfigConfigMap.name }}
{{- end }}
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}

View file

@ -1,46 +0,0 @@
{{- if .Values.autoscaling.enabled }}
{{- $kubeCapabilityVersion := semver .Capabilities.KubeVersion.Version -}}
{{- $isHigher1p25 := ge (semver "1.25" | $kubeCapabilityVersion.Compare) 0 -}}
{{- if and ($.Capabilities.APIVersions.Has "autoscaling/v2") $isHigher1p25 -}}
apiVersion: autoscaling/v2
{{- else }}
apiVersion: autoscaling/v2beta1
{{- end }}
kind: HorizontalPodAutoscaler
metadata:
name: {{ include "kafka-ui.fullname" . }}
labels:
{{- include "kafka-ui.labels" . | nindent 4 }}
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: {{ include "kafka-ui.fullname" . }}
minReplicas: {{ .Values.autoscaling.minReplicas }}
maxReplicas: {{ .Values.autoscaling.maxReplicas }}
metrics:
{{- if .Values.autoscaling.targetCPUUtilizationPercentage }}
- type: Resource
resource:
name: cpu
{{- if $isHigher1p25 }}
target:
type: Utilization
averageUtilization: {{ .Values.autoscaling.targetCPUUtilizationPercentage }}
{{- else }}
targetAverageUtilization: {{ .Values.autoscaling.targetCPUUtilizationPercentage }}
{{- end }}
{{- end }}
{{- if .Values.autoscaling.targetMemoryUtilizationPercentage }}
- type: Resource
resource:
name: memory
{{- if $isHigher1p25 }}
target:
type: Utilization
averageUtilization: {{ .Values.autoscaling.targetMemoryUtilizationPercentage }}
{{- else }}
targetAverageUtilization: {{ .Values.autoscaling.targetMemoryUtilizationPercentage }}
{{- end }}
{{- end }}
{{- end }}

View file

@ -1,89 +0,0 @@
{{- if .Values.ingress.enabled -}}
{{- $fullName := include "kafka-ui.fullname" . -}}
{{- $svcPort := .Values.service.port -}}
{{- $kubeCapabilityVersion := semver .Capabilities.KubeVersion.Version -}}
{{- $isHigher1p19 := ge (semver "1.19" | $kubeCapabilityVersion.Compare) 0 -}}
{{- if and ($.Capabilities.APIVersions.Has "networking.k8s.io/v1") $isHigher1p19 -}}
apiVersion: networking.k8s.io/v1
{{- else if $.Capabilities.APIVersions.Has "networking.k8s.io/v1beta1" }}
apiVersion: networking.k8s.io/v1beta1
{{- else }}
apiVersion: extensions/v1beta1
{{- end }}
kind: Ingress
metadata:
name: {{ $fullName }}
labels:
{{- include "kafka-ui.labels" . | nindent 4 }}
{{- with .Values.ingress.annotations }}
annotations:
{{- toYaml . | nindent 4 }}
{{- end }}
spec:
{{- if .Values.ingress.tls.enabled }}
tls:
- hosts:
- {{ tpl .Values.ingress.host . }}
secretName: {{ .Values.ingress.tls.secretName }}
{{- end }}
{{- if .Values.ingress.ingressClassName }}
ingressClassName: {{ .Values.ingress.ingressClassName }}
{{- end }}
rules:
- http:
paths:
{{- if and ($.Capabilities.APIVersions.Has "networking.k8s.io/v1") $isHigher1p19 -}}
{{- range .Values.ingress.precedingPaths }}
- path: {{ .path }}
pathType: {{ .Values.ingress.pathType }}
backend:
service:
name: {{ .serviceName }}
port:
number: {{ .servicePort }}
{{- end }}
- backend:
service:
name: {{ $fullName }}
port:
number: {{ $svcPort }}
pathType: {{ .Values.ingress.pathType }}
{{- if .Values.ingress.path }}
path: {{ .Values.ingress.path }}
{{- end }}
{{- range .Values.ingress.succeedingPaths }}
- path: {{ .path }}
pathType: {{ .Values.ingress.pathType }}
backend:
service:
name: {{ .serviceName }}
port:
number: {{ .servicePort }}
{{- end }}
{{- if tpl .Values.ingress.host . }}
host: {{tpl .Values.ingress.host . }}
{{- end }}
{{- else -}}
{{- range .Values.ingress.precedingPaths }}
- path: {{ .path }}
backend:
serviceName: {{ .serviceName }}
servicePort: {{ .servicePort }}
{{- end }}
- backend:
serviceName: {{ $fullName }}
servicePort: {{ $svcPort }}
{{- if .Values.ingress.path }}
path: {{ .Values.ingress.path }}
{{- end }}
{{- range .Values.ingress.succeedingPaths }}
- path: {{ .path }}
backend:
serviceName: {{ .serviceName }}
servicePort: {{ .servicePort }}
{{- end }}
{{- if tpl .Values.ingress.host . }}
host: {{ tpl .Values.ingress.host . }}
{{- end }}
{{- end }}
{{- end }}

View file

@ -1,18 +0,0 @@
{{- if and .Values.networkPolicy.enabled .Values.networkPolicy.egressRules.customRules }}
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: {{ printf "%s-egress" (include "kafka-ui.fullname" .) }}
labels:
{{- include "kafka-ui.labels" . | nindent 4 }}
spec:
podSelector:
matchLabels:
{{- include "kafka-ui.selectorLabels" . | nindent 6 }}
policyTypes:
- Egress
egress:
{{- if .Values.networkPolicy.egressRules.customRules }}
{{- toYaml .Values.networkPolicy.egressRules.customRules | nindent 4 }}
{{- end }}
{{- end }}

View file

@ -1,18 +0,0 @@
{{- if and .Values.networkPolicy.enabled .Values.networkPolicy.ingressRules.customRules }}
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: {{ printf "%s-ingress" (include "kafka-ui.fullname" .) }}
labels:
{{- include "kafka-ui.labels" . | nindent 4 }}
spec:
podSelector:
matchLabels:
{{- include "kafka-ui.selectorLabels" . | nindent 6 }}
policyTypes:
- Ingress
ingress:
{{- if .Values.networkPolicy.ingressRules.customRules }}
{{- toYaml .Values.networkPolicy.ingressRules.customRules | nindent 4 }}
{{- end }}
{{- end }}

View file

@ -1,13 +0,0 @@
{{- if .Values.envs.secret -}}
apiVersion: v1
kind: Secret
metadata:
name: {{ include "kafka-ui.fullname" . }}
labels:
{{- include "kafka-ui.labels" . | nindent 4 }}
type: Opaque
data:
{{- range $key, $val := .Values.envs.secret }}
{{ $key }}: {{ $val | b64enc | quote }}
{{- end -}}
{{- end}}

View file

@ -1,22 +0,0 @@
apiVersion: v1
kind: Service
metadata:
name: {{ include "kafka-ui.fullname" . }}
labels:
{{- include "kafka-ui.labels" . | nindent 4 }}
{{- if .Values.service.annotations }}
annotations:
{{ toYaml .Values.service.annotations | nindent 4 }}
{{- end }}
spec:
type: {{ .Values.service.type }}
ports:
- port: {{ .Values.service.port }}
targetPort: http
protocol: TCP
name: http
{{- if (and (eq .Values.service.type "NodePort") .Values.service.nodePort) }}
nodePort: {{ .Values.service.nodePort }}
{{- end }}
selector:
{{- include "kafka-ui.selectorLabels" . | nindent 4 }}

View file

@ -1,12 +0,0 @@
{{- if .Values.serviceAccount.create -}}
apiVersion: v1
kind: ServiceAccount
metadata:
name: {{ include "kafka-ui.serviceAccountName" . }}
labels:
{{- include "kafka-ui.labels" . | nindent 4 }}
{{- with .Values.serviceAccount.annotations }}
annotations:
{{- toYaml . | nindent 4 }}
{{- end }}
{{- end }}

View file

@ -1,161 +0,0 @@
replicaCount: 1
image:
registry: docker.io
repository: provectuslabs/kafka-ui
pullPolicy: IfNotPresent
# Overrides the image tag whose default is the chart appVersion.
tag: ""
imagePullSecrets: []
nameOverride: ""
fullnameOverride: ""
serviceAccount:
# Specifies whether a service account should be created
create: true
# Annotations to add to the service account
annotations: {}
# The name of the service account to use.
# If not set and create is true, a name is generated using the fullname template
name: ""
existingConfigMap: ""
yamlApplicationConfig:
{}
# kafka:
# clusters:
# - name: yaml
# bootstrapServers: kafka-service:9092
# spring:
# security:
# oauth2:
# auth:
# type: disabled
# management:
# health:
# ldap:
# enabled: false
yamlApplicationConfigConfigMap:
{}
# keyName: config.yml
# name: configMapName
existingSecret: ""
envs:
secret: {}
config: {}
networkPolicy:
enabled: false
egressRules:
## Additional custom egress rules
## e.g:
## customRules:
## - to:
## - namespaceSelector:
## matchLabels:
## label: example
customRules: []
ingressRules:
## Additional custom ingress rules
## e.g:
## customRules:
## - from:
## - namespaceSelector:
## matchLabels:
## label: example
customRules: []
podAnnotations: {}
podLabels: {}
## Annotations to be added to kafka-ui Deployment
##
annotations: {}
## Set field schema as HTTPS for readines and liveness probe
##
probes:
useHttpsScheme: false
podSecurityContext:
{}
# fsGroup: 2000
securityContext:
{}
# capabilities:
# drop:
# - ALL
# readOnlyRootFilesystem: true
# runAsNonRoot: true
# runAsUser: 1000
service:
type: ClusterIP
port: 80
# if you want to force a specific nodePort. Must be use with service.type=NodePort
# nodePort:
# Ingress configuration
ingress:
# Enable ingress resource
enabled: false
# Annotations for the Ingress
annotations: {}
# ingressClassName for the Ingress
ingressClassName: ""
# The path for the Ingress
path: "/"
# The path type for the Ingress
pathType: "Prefix"
# The hostname for the Ingress
host: ""
# configs for Ingress TLS
tls:
# Enable TLS termination for the Ingress
enabled: false
# the name of a pre-created Secret containing a TLS private key and certificate
secretName: ""
# HTTP paths to add to the Ingress before the default path
precedingPaths: []
# Http paths to add to the Ingress after the default path
succeedingPaths: []
resources:
{}
# limits:
# cpu: 200m
# memory: 512Mi
# requests:
# cpu: 200m
# memory: 256Mi
autoscaling:
enabled: false
minReplicas: 1
maxReplicas: 100
targetCPUUtilizationPercentage: 80
# targetMemoryUtilizationPercentage: 80
nodeSelector: {}
tolerations: []
affinity: {}
env: {}
initContainers: {}
volumeMounts: {}
volumes: {}

View file

@ -1,2 +1,2 @@
rules:
- pattern: ".*"
- pattern: ".*"

View file

@ -20,6 +20,8 @@ services:
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
DYNAMIC_CONFIG_ENABLED: 'true' # not necessary, added for tests
KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED: 'true'
KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true'
kafka0:
image: confluentinc/cp-kafka:7.2.1.arm64

View file

@ -1,7 +1,11 @@
#FROM azul/zulu-openjdk-alpine:17-jre-headless
FROM azul/zulu-openjdk-alpine@sha256:a36679ac0d28cb835e2a8c00e1e0d95509c6c51c5081c7782b85edb1f37a771a
RUN apk add --no-cache gcompat # need to make snappy codec work
RUN apk add --no-cache \
# snappy codec
gcompat \
# configuring timezones
tzdata
RUN addgroup -S kafkaui && adduser -S kafkaui -G kafkaui
# creating folder for dynamic config usage (certificates uploads, etc)

View file

@ -81,6 +81,12 @@
<groupId>io.confluent</groupId>
<artifactId>kafka-json-schema-serializer</artifactId>
<version>${confluent.version}</version>
<exclusions>
<exclusion>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
@ -91,7 +97,7 @@
<dependency>
<groupId>software.amazon.msk</groupId>
<artifactId>aws-msk-iam-auth</artifactId>
<version>1.1.6</version>
<version>1.1.7</version>
</dependency>
<dependency>
@ -114,6 +120,11 @@
<artifactId>json</artifactId>
<version>${org.json.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
@ -130,6 +141,11 @@
<artifactId>commons-pool2</artifactId>
<version>${apache.commons.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
@ -233,8 +249,6 @@
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-ldap</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-jsr223</artifactId>
@ -311,7 +325,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.1.2</version>
<version>3.3.0</version>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
@ -389,7 +403,7 @@
<plugin>
<groupId>pl.project13.maven</groupId>
<artifactId>git-commit-id-plugin</artifactId>
<version>4.0.0</version>
<version>4.9.10</version>
<executions>
<execution>
<id>get-the-git-infos</id>

View file

@ -51,13 +51,12 @@ public class ClustersProperties {
List<Masking> masking;
Long pollingThrottleRate;
TruststoreConfig ssl;
AuditProperties audit;
}
@Data
public static class PollingProperties {
Integer pollTimeoutMs;
Integer partitionPollTimeout;
Integer noDataEmptyPolls;
Integer maxPageSize;
Integer defaultPageSize;
}
@ -143,6 +142,23 @@ public class ClustersProperties {
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class AuditProperties {
String topic;
Integer auditTopicsPartitions;
Boolean topicAuditEnabled;
Boolean consoleAuditEnabled;
LogLevel level;
Map<String, String> auditTopicProperties;
public enum LogLevel {
ALL,
ALTER_ONLY //default
}
}
@PostConstruct
public void validateAndSetDefaults() {
if (clusters != null) {

View file

@ -1,18 +1,39 @@
package com.provectus.kafka.ui.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.config.CorsRegistry;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
@Configuration
public class CorsGlobalConfiguration implements WebFluxConfigurer {
public class CorsGlobalConfiguration {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")
.allowedOrigins("*")
.allowedMethods("*")
.allowedHeaders("*")
.allowCredentials(false);
@Bean
public WebFilter corsFilter() {
return (final ServerWebExchange ctx, final WebFilterChain chain) -> {
final ServerHttpRequest request = ctx.getRequest();
final ServerHttpResponse response = ctx.getResponse();
final HttpHeaders headers = response.getHeaders();
headers.add("Access-Control-Allow-Origin", "*");
headers.add("Access-Control-Allow-Methods", "GET, PUT, POST, DELETE, OPTIONS");
headers.add("Access-Control-Max-Age", "3600");
headers.add("Access-Control-Allow-Headers", "Content-Type");
if (request.getMethod() == HttpMethod.OPTIONS) {
response.setStatusCode(HttpStatus.OK);
return Mono.empty();
}
return chain.filter(ctx);
};
}
}

View file

@ -1,7 +1,6 @@
package com.provectus.kafka.ui.config;
import com.provectus.kafka.ui.exception.ValidationException;
import java.beans.Transient;
import javax.annotation.PostConstruct;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

View file

@ -13,6 +13,7 @@ abstract class AbstractAuthSecurityConfig {
"/resources/**",
"/actuator/health/**",
"/actuator/info",
"/actuator/prometheus",
"/auth",
"/login",
"/logout",

View file

@ -1,7 +1,6 @@
package com.provectus.kafka.ui.config.auth;
import java.util.Collection;
import lombok.Value;
public record AuthenticatedUser(String principal, Collection<String> groups) {

View file

@ -6,13 +6,13 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.SecurityWebFiltersOrder;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.web.server.SecurityWebFilterChain;
import org.springframework.security.web.server.authentication.RedirectServerAuthenticationSuccessHandler;
import org.springframework.security.web.server.authentication.logout.RedirectServerLogoutSuccessHandler;
import org.springframework.security.web.server.ui.LogoutPageGeneratingWebFilter;
import org.springframework.security.web.server.util.matcher.ServerWebExchangeMatchers;
@Configuration
@EnableWebFluxSecurity
@ -33,15 +33,19 @@ public class BasicAuthSecurityConfig extends AbstractAuthSecurityConfig {
final var logoutSuccessHandler = new RedirectServerLogoutSuccessHandler();
logoutSuccessHandler.setLogoutSuccessUrl(URI.create(LOGOUT_URL));
return http
.addFilterAfter(new LogoutPageGeneratingWebFilter(), SecurityWebFiltersOrder.REACTOR_CONTEXT)
.csrf().disable()
.authorizeExchange()
.pathMatchers(AUTH_WHITELIST).permitAll()
.anyExchange().authenticated()
.and().formLogin().loginPage(LOGIN_URL).authenticationSuccessHandler(authHandler)
.and().logout().logoutSuccessHandler(logoutSuccessHandler)
.and().build();
return http.authorizeExchange(spec -> spec
.pathMatchers(AUTH_WHITELIST)
.permitAll()
.anyExchange()
.authenticated()
)
.formLogin(spec -> spec.loginPage(LOGIN_URL).authenticationSuccessHandler(authHandler))
.logout(spec -> spec
.logoutSuccessHandler(logoutSuccessHandler)
.requiresLogout(ServerWebExchangeMatchers.pathMatchers(HttpMethod.GET, "/logout")))
.csrf(ServerHttpSecurity.CsrfSpec::disable)
.build();
}
}

View file

@ -27,10 +27,12 @@ public class DisabledAuthSecurityConfig extends AbstractAuthSecurityConfig {
System.exit(1);
}
log.warn("Authentication is disabled. Access will be unrestricted.");
return http.authorizeExchange()
.anyExchange().permitAll()
.and()
.csrf().disable()
return http.authorizeExchange(spec -> spec
.anyExchange()
.permitAll()
)
.csrf(ServerHttpSecurity.CsrfSpec::disable)
.build();
}

View file

@ -15,6 +15,8 @@ public class LdapProperties {
private String userFilterSearchBase;
private String userFilterSearchFilter;
private String groupFilterSearchBase;
private String groupFilterSearchFilter;
private String groupRoleAttribute;
@Value("${oauth2.ldap.activeDirectory:false}")
private boolean isActiveDirectory;

View file

@ -3,14 +3,16 @@ package com.provectus.kafka.ui.config.auth;
import static com.provectus.kafka.ui.config.auth.AbstractAuthSecurityConfig.AUTH_WHITELIST;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import com.provectus.kafka.ui.service.rbac.extractor.RbacLdapAuthoritiesExtractor;
import java.util.Collection;
import java.util.List;
import javax.annotation.Nullable;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.ldap.LdapAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@ -22,6 +24,7 @@ import org.springframework.security.authentication.AuthenticationManager;
import org.springframework.security.authentication.ProviderManager;
import org.springframework.security.authentication.ReactiveAuthenticationManager;
import org.springframework.security.authentication.ReactiveAuthenticationManagerAdapter;
import org.springframework.security.config.Customizer;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
import org.springframework.security.core.GrantedAuthority;
@ -50,9 +53,9 @@ public class LdapSecurityConfig {
@Bean
public ReactiveAuthenticationManager authenticationManager(BaseLdapPathContextSource contextSource,
LdapAuthoritiesPopulator ldapAuthoritiesPopulator,
@Nullable AccessControlService acs) {
var rbacEnabled = acs != null && acs.isRbacEnabled();
LdapAuthoritiesPopulator authoritiesExtractor,
AccessControlService acs) {
var rbacEnabled = acs.isRbacEnabled();
BindAuthenticator ba = new BindAuthenticator(contextSource);
if (props.getBase() != null) {
ba.setUserDnPatterns(new String[] {props.getBase()});
@ -67,7 +70,7 @@ public class LdapSecurityConfig {
AbstractLdapAuthenticationProvider authenticationProvider;
if (!props.isActiveDirectory()) {
authenticationProvider = rbacEnabled
? new LdapAuthenticationProvider(ba, ldapAuthoritiesPopulator)
? new LdapAuthenticationProvider(ba, authoritiesExtractor)
: new LdapAuthenticationProvider(ba);
} else {
authenticationProvider = new ActiveDirectoryLdapAuthenticationProvider(props.getActiveDirectoryDomain(),
@ -97,11 +100,24 @@ public class LdapSecurityConfig {
@Bean
@Primary
public LdapAuthoritiesPopulator ldapAuthoritiesPopulator(BaseLdapPathContextSource contextSource) {
var authoritiesPopulator = new DefaultLdapAuthoritiesPopulator(contextSource, props.getGroupFilterSearchBase());
authoritiesPopulator.setRolePrefix("");
authoritiesPopulator.setConvertToUpperCase(false);
return authoritiesPopulator;
public DefaultLdapAuthoritiesPopulator ldapAuthoritiesExtractor(ApplicationContext context,
BaseLdapPathContextSource contextSource,
AccessControlService acs) {
var rbacEnabled = acs != null && acs.isRbacEnabled();
DefaultLdapAuthoritiesPopulator extractor;
if (rbacEnabled) {
extractor = new RbacLdapAuthoritiesExtractor(context, contextSource, props.getGroupFilterSearchBase());
} else {
extractor = new DefaultLdapAuthoritiesPopulator(contextSource, props.getGroupFilterSearchBase());
}
Optional.ofNullable(props.getGroupFilterSearchFilter()).ifPresent(extractor::setGroupSearchFilter);
extractor.setRolePrefix("");
extractor.setConvertToUpperCase(false);
extractor.setSearchSubtree(true);
return extractor;
}
@Bean
@ -111,21 +127,15 @@ public class LdapSecurityConfig {
log.info("Active Directory support for LDAP has been enabled.");
}
return http
.authorizeExchange()
.pathMatchers(AUTH_WHITELIST)
.permitAll()
.anyExchange()
.authenticated()
.and()
.formLogin()
.and()
.logout()
.and()
.csrf().disable()
return http.authorizeExchange(spec -> spec
.pathMatchers(AUTH_WHITELIST)
.permitAll()
.anyExchange()
.authenticated()
)
.formLogin(Customizer.withDefaults())
.logout(Customizer.withDefaults())
.csrf(ServerHttpSecurity.CsrfSpec::disable)
.build();
}

View file

@ -12,10 +12,11 @@ import lombok.extern.log4j.Log4j2;
import org.jetbrains.annotations.Nullable;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.security.oauth2.client.OAuth2ClientProperties;
import org.springframework.boot.autoconfigure.security.oauth2.client.OAuth2ClientPropertiesRegistrationAdapter;
import org.springframework.boot.autoconfigure.security.oauth2.client.OAuth2ClientPropertiesMapper;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.Customizer;
import org.springframework.security.config.annotation.method.configuration.EnableReactiveMethodSecurity;
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
import org.springframework.security.config.web.server.ServerHttpSecurity;
@ -49,21 +50,15 @@ public class OAuthSecurityConfig extends AbstractAuthSecurityConfig {
public SecurityWebFilterChain configure(ServerHttpSecurity http, OAuthLogoutSuccessHandler logoutHandler) {
log.info("Configuring OAUTH2 authentication.");
return http.authorizeExchange()
.pathMatchers(AUTH_WHITELIST)
.permitAll()
.anyExchange()
.authenticated()
.and()
.oauth2Login()
.and()
.logout()
.logoutSuccessHandler(logoutHandler)
.and()
.csrf().disable()
return http.authorizeExchange(spec -> spec
.pathMatchers(AUTH_WHITELIST)
.permitAll()
.anyExchange()
.authenticated()
)
.oauth2Login(Customizer.withDefaults())
.logout(spec -> spec.logoutSuccessHandler(logoutHandler))
.csrf(ServerHttpSecurity.CsrfSpec::disable)
.build();
}
@ -103,7 +98,10 @@ public class OAuthSecurityConfig extends AbstractAuthSecurityConfig {
public InMemoryReactiveClientRegistrationRepository clientRegistrationRepository() {
final OAuth2ClientProperties props = OAuthPropertiesConverter.convertProperties(properties);
final List<ClientRegistration> registrations =
new ArrayList<>(OAuth2ClientPropertiesRegistrationAdapter.getClientRegistrations(props).values());
new ArrayList<>(new OAuth2ClientPropertiesMapper(props).asClientRegistrations().values());
if (registrations.isEmpty()) {
throw new IllegalArgumentException("OAuth2 authentication is enabled but no providers specified.");
}
return new InMemoryReactiveClientRegistrationRepository(registrations);
}

View file

@ -2,7 +2,6 @@ package com.provectus.kafka.ui.config.auth;
import java.util.Collection;
import java.util.Map;
import lombok.Value;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.oauth2.core.user.OAuth2User;

View file

@ -2,7 +2,6 @@ package com.provectus.kafka.ui.config.auth;
import java.util.Collection;
import java.util.Map;
import lombok.Value;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.oauth2.core.oidc.OidcIdToken;
import org.springframework.security.oauth2.core.oidc.OidcUserInfo;

View file

@ -1,13 +1,14 @@
package com.provectus.kafka.ui.config.auth.condition;
import com.provectus.kafka.ui.service.rbac.AbstractProviderCondition;
import org.jetbrains.annotations.NotNull;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.type.AnnotatedTypeMetadata;
public class CognitoCondition extends AbstractProviderCondition implements Condition {
@Override
public boolean matches(final ConditionContext context, final AnnotatedTypeMetadata metadata) {
public boolean matches(final ConditionContext context, final @NotNull AnnotatedTypeMetadata metadata) {
return getRegisteredProvidersTypes(context.getEnvironment()).stream().anyMatch(a -> a.equalsIgnoreCase("cognito"));
}
}
}

View file

@ -2,12 +2,19 @@ package com.provectus.kafka.ui.controller;
import com.provectus.kafka.ui.exception.ClusterNotFoundException;
import com.provectus.kafka.ui.model.KafkaCluster;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.ClustersStorage;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import org.springframework.beans.factory.annotation.Autowired;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Signal;
public abstract class AbstractController {
private ClustersStorage clustersStorage;
protected ClustersStorage clustersStorage;
protected AccessControlService accessControlService;
protected AuditService auditService;
protected KafkaCluster getCluster(String name) {
return clustersStorage.getClusterByName(name)
@ -15,8 +22,26 @@ public abstract class AbstractController {
String.format("Cluster with name '%s' not found", name)));
}
protected Mono<Void> validateAccess(AccessContext context) {
return accessControlService.validateAccess(context);
}
protected void audit(AccessContext acxt, Signal<?> sig) {
auditService.audit(acxt, sig);
}
@Autowired
public void setClustersStorage(ClustersStorage clustersStorage) {
this.clustersStorage = clustersStorage;
}
@Autowired
public void setAccessControlService(AccessControlService accessControlService) {
this.accessControlService = accessControlService;
}
@Autowired
public void setAuditService(AuditService auditService) {
this.auditService = auditService;
}
}

View file

@ -13,7 +13,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -38,7 +37,7 @@ public class AccessController implements AuthorizationApi {
.filter(role -> user.groups().contains(role.getName()))
.map(role -> mapPermissions(role.getPermissions(), role.getClusters()))
.flatMap(Collection::stream)
.collect(Collectors.toList())
.toList()
)
.switchIfEmpty(Mono.just(Collections.emptyList()));
@ -70,10 +69,10 @@ public class AccessController implements AuthorizationApi {
.map(String::toUpperCase)
.map(this::mapAction)
.filter(Objects::nonNull)
.collect(Collectors.toList()));
.toList());
return dto;
})
.collect(Collectors.toList());
.toList();
}
@Nullable

View file

@ -2,13 +2,15 @@ package com.provectus.kafka.ui.controller;
import com.provectus.kafka.ui.api.AclsApi;
import com.provectus.kafka.ui.mapper.ClusterMapper;
import com.provectus.kafka.ui.model.CreateConsumerAclDTO;
import com.provectus.kafka.ui.model.CreateProducerAclDTO;
import com.provectus.kafka.ui.model.CreateStreamAppAclDTO;
import com.provectus.kafka.ui.model.KafkaAclDTO;
import com.provectus.kafka.ui.model.KafkaAclNamePatternTypeDTO;
import com.provectus.kafka.ui.model.KafkaAclResourceTypeDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.service.acl.AclsService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.resource.PatternType;
@ -25,7 +27,6 @@ import reactor.core.publisher.Mono;
public class AclsController extends AbstractController implements AclsApi {
private final AclsService aclsService;
private final AccessControlService accessControlService;
@Override
public Mono<ResponseEntity<Void>> createAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,
@ -33,12 +34,14 @@ public class AclsController extends AbstractController implements AclsApi {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.EDIT)
.operationName("createAcl")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(kafkaAclDto)
.map(ClusterMapper::toAclBinding)
.flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
@ -48,12 +51,14 @@ public class AclsController extends AbstractController implements AclsApi {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.EDIT)
.operationName("deleteAcl")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(kafkaAclDto)
.map(ClusterMapper::toAclBinding)
.flatMap(binding -> aclsService.deleteAcl(getCluster(clusterName), binding))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
@ -66,6 +71,7 @@ public class AclsController extends AbstractController implements AclsApi {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.VIEW)
.operationName("listAcls")
.build();
var resourceType = Optional.ofNullable(resourceTypeDto)
@ -78,12 +84,12 @@ public class AclsController extends AbstractController implements AclsApi {
var filter = new ResourcePatternFilter(resourceType, resourceName, namePatternType);
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
Mono.just(
ResponseEntity.ok(
aclsService.listAcls(getCluster(clusterName), filter)
.map(ClusterMapper::toKafkaAclDto)))
);
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -91,12 +97,14 @@ public class AclsController extends AbstractController implements AclsApi {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.VIEW)
.operationName("getAclAsCsv")
.build();
return accessControlService.validateAccess(context).then(
return validateAccess(context).then(
aclsService.getAclAsCsvString(getCluster(clusterName))
.map(ResponseEntity::ok)
.flatMap(Mono::just)
.doOnEach(sig -> audit(context, sig))
);
}
@ -105,11 +113,64 @@ public class AclsController extends AbstractController implements AclsApi {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.EDIT)
.operationName("syncAclsCsv")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(csvMono)
.flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
@Override
public Mono<ResponseEntity<Void>> createConsumerAcl(String clusterName,
Mono<CreateConsumerAclDTO> createConsumerAclDto,
ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.EDIT)
.operationName("createConsumerAcl")
.build();
return validateAccess(context)
.then(createConsumerAclDto)
.flatMap(req -> aclsService.createConsumerAcl(getCluster(clusterName), req))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
@Override
public Mono<ResponseEntity<Void>> createProducerAcl(String clusterName,
Mono<CreateProducerAclDTO> createProducerAclDto,
ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.EDIT)
.operationName("createProducerAcl")
.build();
return validateAccess(context)
.then(createProducerAclDto)
.flatMap(req -> aclsService.createProducerAcl(getCluster(clusterName), req))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
@Override
public Mono<ResponseEntity<Void>> createStreamAppAcl(String clusterName,
Mono<CreateStreamAppAclDTO> createStreamAppAclDto,
ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.EDIT)
.operationName("createStreamAppAcl")
.build();
return validateAccess(context)
.then(createStreamAppAclDto)
.flatMap(req -> aclsService.createStreamAppAcl(getCluster(clusterName), req))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
}

View file

@ -15,7 +15,6 @@ import com.provectus.kafka.ui.model.UploadedFileInfoDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.ApplicationInfoService;
import com.provectus.kafka.ui.service.KafkaClusterFactory;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import com.provectus.kafka.ui.util.ApplicationRestarter;
import com.provectus.kafka.ui.util.DynamicConfigOperations;
import com.provectus.kafka.ui.util.DynamicConfigOperations.PropertiesStructure;
@ -38,7 +37,7 @@ import reactor.util.function.Tuples;
@Slf4j
@RestController
@RequiredArgsConstructor
public class ApplicationConfigController implements ApplicationConfigApi {
public class ApplicationConfigController extends AbstractController implements ApplicationConfigApi {
private static final PropertiesMapper MAPPER = Mappers.getMapper(PropertiesMapper.class);
@ -50,7 +49,6 @@ public class ApplicationConfigController implements ApplicationConfigApi {
ApplicationConfigPropertiesDTO toDto(PropertiesStructure propertiesStructure);
}
private final AccessControlService accessControlService;
private final DynamicConfigOperations dynamicConfigOperations;
private final ApplicationRestarter restarter;
private final KafkaClusterFactory kafkaClusterFactory;
@ -63,62 +61,69 @@ public class ApplicationConfigController implements ApplicationConfigApi {
@Override
public Mono<ResponseEntity<ApplicationConfigDTO>> getCurrentConfig(ServerWebExchange exchange) {
return accessControlService
.validateAccess(
AccessContext.builder()
.applicationConfigActions(VIEW)
.build()
)
var context = AccessContext.builder()
.applicationConfigActions(VIEW)
.operationName("getCurrentConfig")
.build();
return validateAccess(context)
.then(Mono.fromSupplier(() -> ResponseEntity.ok(
new ApplicationConfigDTO()
.properties(MAPPER.toDto(dynamicConfigOperations.getCurrentProperties()))
)));
)))
.doOnEach(sig -> audit(context, sig));
}
@Override
public Mono<ResponseEntity<Void>> restartWithConfig(Mono<RestartRequestDTO> restartRequestDto,
ServerWebExchange exchange) {
return accessControlService
.validateAccess(
AccessContext.builder()
.applicationConfigActions(EDIT)
.build()
)
var context = AccessContext.builder()
.applicationConfigActions(EDIT)
.operationName("restartWithConfig")
.build();
return validateAccess(context)
.then(restartRequestDto)
.map(dto -> {
dynamicConfigOperations.persist(MAPPER.fromDto(dto.getConfig().getProperties()));
restarter.requestRestart();
return ResponseEntity.ok().build();
});
.doOnNext(restartDto -> {
var newConfig = MAPPER.fromDto(restartDto.getConfig().getProperties());
dynamicConfigOperations.persist(newConfig);
})
.doOnEach(sig -> audit(context, sig))
.doOnSuccess(dto -> restarter.requestRestart())
.map(dto -> ResponseEntity.ok().build());
}
@Override
public Mono<ResponseEntity<UploadedFileInfoDTO>> uploadConfigRelatedFile(Flux<Part> fileFlux,
ServerWebExchange exchange) {
return accessControlService
.validateAccess(
AccessContext.builder()
.applicationConfigActions(EDIT)
.build()
)
var context = AccessContext.builder()
.applicationConfigActions(EDIT)
.operationName("uploadConfigRelatedFile")
.build();
return validateAccess(context)
.then(fileFlux.single())
.flatMap(file ->
dynamicConfigOperations.uploadConfigRelatedFile((FilePart) file)
.map(path -> new UploadedFileInfoDTO().location(path.toString()))
.map(ResponseEntity::ok));
.map(ResponseEntity::ok))
.doOnEach(sig -> audit(context, sig));
}
@Override
public Mono<ResponseEntity<ApplicationConfigValidationDTO>> validateConfig(Mono<ApplicationConfigDTO> configDto,
ServerWebExchange exchange) {
return configDto
var context = AccessContext.builder()
.applicationConfigActions(EDIT)
.operationName("validateConfig")
.build();
return validateAccess(context)
.then(configDto)
.flatMap(config -> {
PropertiesStructure propertiesStructure = MAPPER.fromDto(config.getProperties());
ClustersProperties clustersProperties = propertiesStructure.getKafka();
PropertiesStructure newConfig = MAPPER.fromDto(config.getProperties());
ClustersProperties clustersProperties = newConfig.getKafka();
return validateClustersConfig(clustersProperties)
.map(validations -> new ApplicationConfigValidationDTO().clusters(validations));
})
.map(ResponseEntity::ok);
.map(ResponseEntity::ok)
.doOnEach(sig -> audit(context, sig));
}
private Mono<Map<String, ClusterConfigValidationDTO>> validateClustersConfig(

View file

@ -36,10 +36,10 @@ public class AuthController {
+ " <meta name=\"description\" content=\"\">\n"
+ " <meta name=\"author\" content=\"\">\n"
+ " <title>Please sign in</title>\n"
+ " <link href=\"/static/css/bootstrap.min.css\" rel=\"stylesheet\" "
+ " <link href=\"" + contextPath + "/static/css/bootstrap.min.css\" rel=\"stylesheet\" "
+ "integrity=\"sha384-/Y6pD6FV/Vv2HJnA6t+vslU6fwYXjCFtcEpHbNJ0lyAFsXTsjBbfaDjzALeQsN6M\" "
+ "crossorigin=\"anonymous\">\n"
+ " <link href=\"/static/css/signin.css\" "
+ " <link href=\"" + contextPath + "/static/css/signin.css\" "
+ "rel=\"stylesheet\" crossorigin=\"anonymous\"/>\n"
+ " </head>\n"
+ " <body>\n"

View file

@ -11,8 +11,9 @@ import com.provectus.kafka.ui.model.BrokersLogdirsDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.service.BrokerService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
@ -25,63 +26,79 @@ import reactor.core.publisher.Mono;
@RequiredArgsConstructor
@Slf4j
public class BrokersController extends AbstractController implements BrokersApi {
private static final String BROKER_ID = "brokerId";
private final BrokerService brokerService;
private final ClusterMapper clusterMapper;
private final AccessControlService accessControlService;
@Override
public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.build());
.operationName("getBrokers")
.build();
var job = brokerService.getBrokers(getCluster(clusterName)).map(clusterMapper::toBrokerDto);
return validateAccess.thenReturn(ResponseEntity.ok(job));
return validateAccess(context)
.thenReturn(ResponseEntity.ok(job))
.doOnEach(sig -> audit(context, sig));
}
@Override
public Mono<ResponseEntity<BrokerMetricsDTO>> getBrokersMetrics(String clusterName, Integer id,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.build());
.operationName("getBrokersMetrics")
.operationParams(Map.of("id", id))
.build();
return validateAccess.then(
brokerService.getBrokerMetrics(getCluster(clusterName), id)
.map(clusterMapper::toBrokerMetrics)
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build())
);
return validateAccess(context)
.then(
brokerService.getBrokerMetrics(getCluster(clusterName), id)
.map(clusterMapper::toBrokerMetrics)
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build())
)
.doOnEach(sig -> audit(context, sig));
}
@Override
public Mono<ResponseEntity<Flux<BrokersLogdirsDTO>>> getAllBrokersLogdirs(String clusterName,
List<Integer> brokers,
@Nullable List<Integer> brokers,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
.cluster(clusterName)
.build());
return validateAccess.thenReturn(ResponseEntity.ok(
brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokers)));
List<Integer> brokerIds = brokers == null ? List.of() : brokers;
var context = AccessContext.builder()
.cluster(clusterName)
.operationName("getAllBrokersLogdirs")
.operationParams(Map.of("brokerIds", brokerIds))
.build();
return validateAccess(context)
.thenReturn(ResponseEntity.ok(
brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokerIds)))
.doOnEach(sig -> audit(context, sig));
}
@Override
public Mono<ResponseEntity<Flux<BrokerConfigDTO>>> getBrokerConfig(String clusterName,
Integer id,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.clusterConfigActions(ClusterConfigAction.VIEW)
.build());
.operationName("getBrokerConfig")
.operationParams(Map.of(BROKER_ID, id))
.build();
return validateAccess.thenReturn(
return validateAccess(context).thenReturn(
ResponseEntity.ok(
brokerService.getBrokerConfig(getCluster(clusterName), id)
.map(clusterMapper::toBrokerConfig))
);
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -89,16 +106,18 @@ public class BrokersController extends AbstractController implements BrokersApi
Integer id,
Mono<BrokerLogdirUpdateDTO> brokerLogdir,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.clusterConfigActions(ClusterConfigAction.VIEW, ClusterConfigAction.EDIT)
.build());
.operationName("updateBrokerTopicPartitionLogDir")
.operationParams(Map.of(BROKER_ID, id))
.build();
return validateAccess.then(
return validateAccess(context).then(
brokerLogdir
.flatMap(bld -> brokerService.updateBrokerLogDir(getCluster(clusterName), id, bld))
.map(ResponseEntity::ok)
);
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -107,16 +126,18 @@ public class BrokersController extends AbstractController implements BrokersApi
String name,
Mono<BrokerConfigItemDTO> brokerConfig,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.clusterConfigActions(ClusterConfigAction.VIEW, ClusterConfigAction.EDIT)
.build());
.operationName("updateBrokerConfigByName")
.operationParams(Map.of(BROKER_ID, id))
.build();
return validateAccess.then(
return validateAccess(context).then(
brokerConfig
.flatMap(bci -> brokerService.updateBrokerConfigByName(
getCluster(clusterName), id, name, bci.getValue()))
.map(ResponseEntity::ok)
);
).doOnEach(sig -> audit(context, sig));
}
}

View file

@ -6,7 +6,6 @@ import com.provectus.kafka.ui.model.ClusterMetricsDTO;
import com.provectus.kafka.ui.model.ClusterStatsDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.ClusterService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
@ -20,7 +19,6 @@ import reactor.core.publisher.Mono;
@Slf4j
public class ClustersController extends AbstractController implements ClustersApi {
private final ClusterService clusterService;
private final AccessControlService accessControlService;
@Override
public Mono<ResponseEntity<Flux<ClusterDTO>>> getClusters(ServerWebExchange exchange) {
@ -35,14 +33,16 @@ public class ClustersController extends AbstractController implements ClustersAp
ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.operationName("getClusterMetrics")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(
clusterService.getClusterMetrics(getCluster(clusterName))
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build())
);
)
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -50,14 +50,16 @@ public class ClustersController extends AbstractController implements ClustersAp
ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.operationName("getClusterStats")
.build();
return accessControlService.validateAccess(context)
return validateAccess(context)
.then(
clusterService.getClusterStats(getCluster(clusterName))
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build())
);
)
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -66,11 +68,11 @@ public class ClustersController extends AbstractController implements ClustersAp
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.operationName("updateClusterInfo")
.build();
return accessControlService.validateAccess(context)
.then(
clusterService.updateCluster(getCluster(clusterName)).map(ResponseEntity::ok)
);
return validateAccess(context)
.then(clusterService.updateCluster(getCluster(clusterName)).map(ResponseEntity::ok))
.doOnEach(sig -> audit(context, sig));
}
}

View file

@ -19,11 +19,9 @@ import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
import com.provectus.kafka.ui.service.ConsumerGroupService;
import com.provectus.kafka.ui.service.OffsetsResetService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
@ -41,7 +39,6 @@ public class ConsumerGroupsController extends AbstractController implements Cons
private final ConsumerGroupService consumerGroupService;
private final OffsetsResetService offsetsResetService;
private final AccessControlService accessControlService;
@Value("${consumer.groups.page.size:25}")
private int defaultConsumerGroupsPageSize;
@ -50,44 +47,47 @@ public class ConsumerGroupsController extends AbstractController implements Cons
public Mono<ResponseEntity<Void>> deleteConsumerGroup(String clusterName,
String id,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.consumerGroup(id)
.consumerGroupActions(DELETE)
.build());
.operationName("deleteConsumerGroup")
.build();
return validateAccess.then(
consumerGroupService.deleteConsumerGroupById(getCluster(clusterName), id)
.thenReturn(ResponseEntity.ok().build())
);
return validateAccess(context)
.then(consumerGroupService.deleteConsumerGroupById(getCluster(clusterName), id))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
@Override
public Mono<ResponseEntity<ConsumerGroupDetailsDTO>> getConsumerGroup(String clusterName,
String consumerGroupId,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.consumerGroup(consumerGroupId)
.consumerGroupActions(VIEW)
.build());
.operationName("getConsumerGroup")
.build();
return validateAccess.then(
consumerGroupService.getConsumerGroupDetail(getCluster(clusterName), consumerGroupId)
return validateAccess(context)
.then(consumerGroupService.getConsumerGroupDetail(getCluster(clusterName), consumerGroupId)
.map(ConsumerGroupMapper::toDetailsDto)
.map(ResponseEntity::ok)
);
.map(ResponseEntity::ok))
.doOnEach(sig -> audit(context, sig));
}
@Override
public Mono<ResponseEntity<Flux<ConsumerGroupDTO>>> getTopicConsumerGroups(String clusterName,
String topicName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(TopicAction.VIEW)
.build());
.operationName("getTopicConsumerGroups")
.build();
Mono<ResponseEntity<Flux<ConsumerGroupDTO>>> job =
consumerGroupService.getConsumerGroupsForTopic(getCluster(clusterName), topicName)
@ -99,7 +99,9 @@ public class ConsumerGroupsController extends AbstractController implements Cons
.map(ResponseEntity::ok)
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()));
return validateAccess.then(job);
return validateAccess(context)
.then(job)
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -112,12 +114,13 @@ public class ConsumerGroupsController extends AbstractController implements Cons
SortOrderDTO sortOrderDto,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
// consumer group access validation is within the service
.build());
.operationName("getConsumerGroupsPage")
.build();
return validateAccess.then(
return validateAccess(context).then(
consumerGroupService.getConsumerGroupsPage(
getCluster(clusterName),
Optional.ofNullable(page).filter(i -> i > 0).orElse(1),
@ -128,7 +131,7 @@ public class ConsumerGroupsController extends AbstractController implements Cons
)
.map(this::convertPage)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -137,12 +140,13 @@ public class ConsumerGroupsController extends AbstractController implements Cons
Mono<ConsumerGroupOffsetsResetDTO> resetDto,
ServerWebExchange exchange) {
return resetDto.flatMap(reset -> {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(reset.getTopic())
.topicActions(TopicAction.VIEW)
.consumerGroupActions(RESET_OFFSETS)
.build());
.operationName("resetConsumerGroupOffsets")
.build();
Supplier<Mono<Void>> mono = () -> {
var cluster = getCluster(clusterName);
@ -182,7 +186,9 @@ public class ConsumerGroupsController extends AbstractController implements Cons
}
};
return validateAccess.then(mono.get());
return validateAccess(context)
.then(mono.get())
.doOnEach(sig -> audit(context, sig));
}).thenReturn(ResponseEntity.ok().build());
}
@ -193,7 +199,7 @@ public class ConsumerGroupsController extends AbstractController implements Cons
.consumerGroups(consumerGroupConsumerGroupsPage.consumerGroups()
.stream()
.map(ConsumerGroupMapper::toDto)
.collect(Collectors.toList()));
.toList());
}
}

View file

@ -18,7 +18,6 @@ import com.provectus.kafka.ui.model.TaskDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.ConnectAction;
import com.provectus.kafka.ui.service.KafkaConnectService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
@ -37,8 +36,9 @@ import reactor.core.publisher.Mono;
public class KafkaConnectController extends AbstractController implements KafkaConnectApi {
private static final Set<ConnectorActionDTO> RESTART_ACTIONS
= Set.of(RESTART, RESTART_FAILED_TASKS, RESTART_ALL_TASKS);
private static final String CONNECTOR_NAME = "connectorName";
private final KafkaConnectService kafkaConnectService;
private final AccessControlService accessControlService;
@Override
public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
@ -54,15 +54,16 @@ public class KafkaConnectController extends AbstractController implements KafkaC
public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, String connectName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.build());
.operationName("getConnectors")
.build();
return validateAccess.thenReturn(
ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName))
);
return validateAccess(context)
.thenReturn(ResponseEntity.ok(kafkaConnectService.getConnectorNames(getCluster(clusterName), connectName)))
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -70,16 +71,17 @@ public class KafkaConnectController extends AbstractController implements KafkaC
@Valid Mono<NewConnectorDTO> connector,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.CREATE)
.build());
.operationName("createConnector")
.build();
return validateAccess.then(
return validateAccess(context).then(
kafkaConnectService.createConnector(getCluster(clusterName), connectName, connector)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -87,17 +89,18 @@ public class KafkaConnectController extends AbstractController implements KafkaC
String connectorName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.connector(connectorName)
.build());
.operationName("getConnector")
.build();
return validateAccess.then(
return validateAccess(context).then(
kafkaConnectService.getConnector(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -105,16 +108,18 @@ public class KafkaConnectController extends AbstractController implements KafkaC
String connectorName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
.build());
.operationName("deleteConnector")
.operationParams(Map.of(CONNECTOR_NAME, connectName))
.build();
return validateAccess.then(
return validateAccess(context).then(
kafkaConnectService.deleteConnector(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> audit(context, sig));
}
@ -126,14 +131,23 @@ public class KafkaConnectController extends AbstractController implements KafkaC
SortOrderDTO sortOrder,
ServerWebExchange exchange
) {
var context = AccessContext.builder()
.cluster(clusterName)
.connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
.operationName("getAllConnectors")
.build();
var comparator = sortOrder == null || sortOrder.equals(SortOrderDTO.ASC)
? getConnectorsComparator(orderBy)
: getConnectorsComparator(orderBy).reversed();
Flux<FullConnectorInfoDTO> job = kafkaConnectService.getAllConnectors(getCluster(clusterName), search)
.filterWhen(dto -> accessControlService.isConnectAccessible(dto.getConnect(), clusterName))
.filterWhen(dto -> accessControlService.isConnectorAccessible(dto.getConnect(), dto.getName(), clusterName));
.filterWhen(dto -> accessControlService.isConnectorAccessible(dto.getConnect(), dto.getName(), clusterName))
.sort(comparator);
return Mono.just(ResponseEntity.ok(job.sort(comparator)));
return Mono.just(ResponseEntity.ok(job))
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -142,17 +156,18 @@ public class KafkaConnectController extends AbstractController implements KafkaC
String connectorName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.build());
.operationName("getConnectorConfig")
.build();
return validateAccess.then(
return validateAccess(context).then(
kafkaConnectService
.getConnectorConfig(getCluster(clusterName), connectName, connectorName)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -161,16 +176,19 @@ public class KafkaConnectController extends AbstractController implements KafkaC
Mono<Map<String, Object>> requestBody,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.EDIT)
.build());
.operationName("setConnectorConfig")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();
return validateAccess.then(
kafkaConnectService
.setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
.map(ResponseEntity::ok));
return validateAccess(context).then(
kafkaConnectService
.setConnectorConfig(getCluster(clusterName), connectName, connectorName, requestBody)
.map(ResponseEntity::ok))
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -178,7 +196,6 @@ public class KafkaConnectController extends AbstractController implements KafkaC
String connectorName,
ConnectorActionDTO action,
ServerWebExchange exchange) {
ConnectAction[] connectActions;
if (RESTART_ACTIONS.contains(action)) {
connectActions = new ConnectAction[] {ConnectAction.VIEW, ConnectAction.RESTART};
@ -186,17 +203,19 @@ public class KafkaConnectController extends AbstractController implements KafkaC
connectActions = new ConnectAction[] {ConnectAction.VIEW, ConnectAction.EDIT};
}
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(connectActions)
.build());
.operationName("updateConnectorState")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();
return validateAccess.then(
return validateAccess(context).then(
kafkaConnectService
.updateConnectorState(getCluster(clusterName), connectName, connectorName, action)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -204,17 +223,19 @@ public class KafkaConnectController extends AbstractController implements KafkaC
String connectName,
String connectorName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.build());
.operationName("getConnectorTasks")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();
return validateAccess.thenReturn(
return validateAccess(context).thenReturn(
ResponseEntity
.ok(kafkaConnectService
.getConnectorTasks(getCluster(clusterName), connectName, connectorName))
);
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -222,34 +243,37 @@ public class KafkaConnectController extends AbstractController implements KafkaC
String connectorName, Integer taskId,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW, ConnectAction.RESTART)
.build());
.operationName("restartConnectorTask")
.operationParams(Map.of(CONNECTOR_NAME, connectorName))
.build();
return validateAccess.then(
return validateAccess(context).then(
kafkaConnectService
.restartConnectorTask(getCluster(clusterName), connectName, connectorName, taskId)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> audit(context, sig));
}
@Override
public Mono<ResponseEntity<Flux<ConnectorPluginDTO>>> getConnectorPlugins(
String clusterName, String connectName, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.connect(connectName)
.connectActions(ConnectAction.VIEW)
.build());
.operationName("getConnectorPlugins")
.build();
return validateAccess.then(
return validateAccess(context).then(
Mono.just(
ResponseEntity.ok(
kafkaConnectService.getConnectorPlugins(getCluster(clusterName), connectName)))
);
).doOnEach(sig -> audit(context, sig));
}
@Override

View file

@ -10,7 +10,6 @@ import com.provectus.kafka.ui.model.KsqlTableResponseDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.KsqlAction;
import com.provectus.kafka.ui.service.ksql.KsqlServiceV2;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -28,39 +27,42 @@ import reactor.core.publisher.Mono;
public class KsqlController extends AbstractController implements KsqlApi {
private final KsqlServiceV2 ksqlServiceV2;
private final AccessControlService accessControlService;
@Override
public Mono<ResponseEntity<KsqlCommandV2ResponseDTO>> executeKsql(String clusterName,
Mono<KsqlCommandV2DTO>
ksqlCommand2Dto,
Mono<KsqlCommandV2DTO> ksqlCmdDo,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
.cluster(clusterName)
.ksqlActions(KsqlAction.EXECUTE)
.build());
return validateAccess.then(
ksqlCommand2Dto.map(dto -> {
var id = ksqlServiceV2.registerCommand(
getCluster(clusterName),
dto.getKsql(),
Optional.ofNullable(dto.getStreamsProperties()).orElse(Map.of()));
return new KsqlCommandV2ResponseDTO().pipeId(id);
}).map(ResponseEntity::ok)
);
return ksqlCmdDo.flatMap(
command -> {
var context = AccessContext.builder()
.cluster(clusterName)
.ksqlActions(KsqlAction.EXECUTE)
.operationName("executeKsql")
.operationParams(command)
.build();
return validateAccess(context).thenReturn(
new KsqlCommandV2ResponseDTO().pipeId(
ksqlServiceV2.registerCommand(
getCluster(clusterName),
command.getKsql(),
Optional.ofNullable(command.getStreamsProperties()).orElse(Map.of()))))
.doOnEach(sig -> audit(context, sig));
}
)
.map(ResponseEntity::ok);
}
@Override
public Mono<ResponseEntity<Flux<KsqlResponseDTO>>> openKsqlResponsePipe(String clusterName,
String pipeId,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.ksqlActions(KsqlAction.EXECUTE)
.build());
.operationName("openKsqlResponsePipe")
.build();
return validateAccess.thenReturn(
return validateAccess(context).thenReturn(
ResponseEntity.ok(ksqlServiceV2.execute(pipeId)
.map(table -> new KsqlResponseDTO()
.table(
@ -74,22 +76,28 @@ public class KsqlController extends AbstractController implements KsqlApi {
@Override
public Mono<ResponseEntity<Flux<KsqlStreamDescriptionDTO>>> listStreams(String clusterName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.ksqlActions(KsqlAction.EXECUTE)
.build());
.operationName("listStreams")
.build();
return validateAccess.thenReturn(ResponseEntity.ok(ksqlServiceV2.listStreams(getCluster(clusterName))));
return validateAccess(context)
.thenReturn(ResponseEntity.ok(ksqlServiceV2.listStreams(getCluster(clusterName))))
.doOnEach(sig -> audit(context, sig));
}
@Override
public Mono<ResponseEntity<Flux<KsqlTableDescriptionDTO>>> listTables(String clusterName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.ksqlActions(KsqlAction.EXECUTE)
.build());
.operationName("listTables")
.build();
return validateAccess.thenReturn(ResponseEntity.ok(ksqlServiceV2.listTables(getCluster(clusterName))));
return validateAccess(context)
.thenReturn(ResponseEntity.ok(ksqlServiceV2.listTables(getCluster(clusterName))))
.doOnEach(sig -> audit(context, sig));
}
}

View file

@ -15,13 +15,16 @@ import com.provectus.kafka.ui.model.MessageFilterTypeDTO;
import com.provectus.kafka.ui.model.SeekDirectionDTO;
import com.provectus.kafka.ui.model.SeekTypeDTO;
import com.provectus.kafka.ui.model.SerdeUsageDTO;
import com.provectus.kafka.ui.model.SmartFilterTestExecutionDTO;
import com.provectus.kafka.ui.model.SmartFilterTestExecutionResultDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.model.TopicSerdeSuggestionDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.AuditAction;
import com.provectus.kafka.ui.model.rbac.permission.TopicAction;
import com.provectus.kafka.ui.service.DeserializationService;
import com.provectus.kafka.ui.service.MessagesService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import com.provectus.kafka.ui.util.DynamicConfigOperations;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -29,6 +32,7 @@ import javax.annotation.Nullable;
import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.TopicPartition;
import org.springframework.http.ResponseEntity;
@ -45,26 +49,34 @@ public class MessagesController extends AbstractController implements MessagesAp
private final MessagesService messagesService;
private final DeserializationService deserializationService;
private final AccessControlService accessControlService;
private final DynamicConfigOperations dynamicConfigOperations;
@Override
public Mono<ResponseEntity<Void>> deleteTopicMessages(
String clusterName, String topicName, @Valid List<Integer> partitions,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_DELETE)
.build());
.build();
return validateAccess.then(
return validateAccess(context).<ResponseEntity<Void>>then(
messagesService.deleteTopicMessages(
getCluster(clusterName),
topicName,
Optional.ofNullable(partitions).orElse(List.of())
).thenReturn(ResponseEntity.ok().build())
);
).doOnEach(sig -> audit(context, sig));
}
@Override
public Mono<ResponseEntity<SmartFilterTestExecutionResultDTO>> executeSmartFilterTest(
Mono<SmartFilterTestExecutionDTO> smartFilterTestExecutionDto, ServerWebExchange exchange) {
return smartFilterTestExecutionDto
.map(MessagesService::execSmartFilterTest)
.map(ResponseEntity::ok);
}
@Override
@ -79,11 +91,19 @@ public class MessagesController extends AbstractController implements MessagesAp
String keySerde,
String valueSerde,
ServerWebExchange exchange) {
final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var contextBuilder = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_READ)
.build());
.operationName("getTopicMessages");
if (StringUtils.isNoneEmpty(q) && MessageFilterTypeDTO.GROOVY_SCRIPT == filterQueryType) {
dynamicConfigOperations.checkIfFilteringGroovyEnabled();
}
if (auditService.isAuditTopic(getCluster(clusterName), topicName)) {
contextBuilder.auditActions(AuditAction.VIEW);
}
seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
@ -102,7 +122,10 @@ public class MessagesController extends AbstractController implements MessagesAp
)
);
return validateAccess.then(job);
var context = contextBuilder.build();
return validateAccess(context)
.then(job)
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -110,17 +133,18 @@ public class MessagesController extends AbstractController implements MessagesAp
String clusterName, String topicName, @Valid Mono<CreateTopicMessageDTO> createTopicMessage,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_PRODUCE)
.build());
.operationName("sendTopicMessages")
.build();
return validateAccess.then(
return validateAccess(context).then(
createTopicMessage.flatMap(msg ->
messagesService.sendMessage(getCluster(clusterName), topicName, msg).then()
).map(ResponseEntity::ok)
);
).doOnEach(sig -> audit(context, sig));
}
/**
@ -156,12 +180,12 @@ public class MessagesController extends AbstractController implements MessagesAp
String topicName,
SerdeUsageDTO use,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(TopicAction.VIEW)
.build());
.operationName("getSerdes")
.build();
TopicSerdeSuggestionDTO dto = new TopicSerdeSuggestionDTO()
.key(use == SerdeUsageDTO.SERIALIZE
@ -171,10 +195,14 @@ public class MessagesController extends AbstractController implements MessagesAp
? deserializationService.getSerdesForSerialize(getCluster(clusterName), topicName, VALUE)
: deserializationService.getSerdesForDeserialize(getCluster(clusterName), topicName, VALUE));
return validateAccess.then(
return validateAccess(context).then(
Mono.just(dto)
.subscribeOn(Schedulers.boundedElastic())
.map(ResponseEntity::ok)
);
}
}

View file

@ -13,9 +13,8 @@ import com.provectus.kafka.ui.model.SchemaSubjectsResponseDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.SchemaAction;
import com.provectus.kafka.ui.service.SchemaRegistryService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.stream.Collectors;
import java.util.Map;
import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -36,7 +35,6 @@ public class SchemasController extends AbstractController implements SchemasApi
private final KafkaSrMapper kafkaSrMapper = new KafkaSrMapperImpl();
private final SchemaRegistryService schemaRegistryService;
private final AccessControlService accessControlService;
@Override
protected KafkaCluster getCluster(String clusterName) {
@ -51,13 +49,14 @@ public class SchemasController extends AbstractController implements SchemasApi
public Mono<ResponseEntity<CompatibilityCheckResponseDTO>> checkSchemaCompatibility(
String clusterName, String subject, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubjectMono,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.VIEW)
.build());
.operationName("checkSchemaCompatibility")
.build();
return validateAccess.then(
return validateAccess(context).then(
newSchemaSubjectMono.flatMap(subjectDTO ->
schemaRegistryService.checksSchemaCompatibility(
getCluster(clusterName),
@ -66,19 +65,20 @@ public class SchemasController extends AbstractController implements SchemasApi
))
.map(kafkaSrMapper::toDto)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> audit(context, sig));
}
@Override
public Mono<ResponseEntity<SchemaSubjectDTO>> createNewSchema(
String clusterName, @Valid Mono<NewSchemaSubjectDTO> newSchemaSubjectMono,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.schemaActions(SchemaAction.CREATE)
.build());
.operationName("createNewSchema")
.build();
return validateAccess.then(
return validateAccess(context).then(
newSchemaSubjectMono.flatMap(newSubject ->
schemaRegistryService.registerNewSchema(
getCluster(clusterName),
@ -87,20 +87,22 @@ public class SchemasController extends AbstractController implements SchemasApi
)
).map(kafkaSrMapper::toDto)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> audit(context, sig));
}
@Override
public Mono<ResponseEntity<Void>> deleteLatestSchema(
String clusterName, String subject, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.DELETE)
.build());
.operationName("deleteLatestSchema")
.build();
return validateAccess.then(
return validateAccess(context).then(
schemaRegistryService.deleteLatestSchemaSubject(getCluster(clusterName), subject)
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}
@ -108,14 +110,16 @@ public class SchemasController extends AbstractController implements SchemasApi
@Override
public Mono<ResponseEntity<Void>> deleteSchema(
String clusterName, String subject, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.DELETE)
.build());
.operationName("deleteSchema")
.build();
return validateAccess.then(
return validateAccess(context).then(
schemaRegistryService.deleteSchemaSubjectEntirely(getCluster(clusterName), subject)
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}
@ -123,14 +127,16 @@ public class SchemasController extends AbstractController implements SchemasApi
@Override
public Mono<ResponseEntity<Void>> deleteSchemaByVersion(
String clusterName, String subjectName, Integer version, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subjectName)
.schemaActions(SchemaAction.DELETE)
.build());
.operationName("deleteSchemaByVersion")
.build();
return validateAccess.then(
return validateAccess(context).then(
schemaRegistryService.deleteSchemaSubjectByVersion(getCluster(clusterName), subjectName, version)
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}
@ -138,16 +144,20 @@ public class SchemasController extends AbstractController implements SchemasApi
@Override
public Mono<ResponseEntity<Flux<SchemaSubjectDTO>>> getAllVersionsBySubject(
String clusterName, String subjectName, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subjectName)
.schemaActions(SchemaAction.VIEW)
.build());
.operationName("getAllVersionsBySubject")
.build();
Flux<SchemaSubjectDTO> schemas =
schemaRegistryService.getAllVersionsBySubject(getCluster(clusterName), subjectName)
.map(kafkaSrMapper::toDto);
return validateAccess.thenReturn(ResponseEntity.ok(schemas));
return validateAccess(context)
.thenReturn(ResponseEntity.ok(schemas))
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -163,34 +173,37 @@ public class SchemasController extends AbstractController implements SchemasApi
public Mono<ResponseEntity<SchemaSubjectDTO>> getLatestSchema(String clusterName,
String subject,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.VIEW)
.build());
.operationName("getLatestSchema")
.build();
return validateAccess.then(
return validateAccess(context).then(
schemaRegistryService.getLatestSchemaVersionBySubject(getCluster(clusterName), subject)
.map(kafkaSrMapper::toDto)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> audit(context, sig));
}
@Override
public Mono<ResponseEntity<SchemaSubjectDTO>> getSchemaByVersion(
String clusterName, String subject, Integer version, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.schema(subject)
.schemaActions(SchemaAction.VIEW)
.build());
.operationName("getSchemaByVersion")
.operationParams(Map.of("subject", subject, "version", version))
.build();
return validateAccess.then(
return validateAccess(context).then(
schemaRegistryService.getSchemaSubjectByVersion(
getCluster(clusterName), subject, version)
.map(kafkaSrMapper::toDto)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -199,6 +212,11 @@ public class SchemasController extends AbstractController implements SchemasApi
@Valid Integer perPage,
@Valid String search,
ServerWebExchange serverWebExchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.operationName("getSchemas")
.build();
return schemaRegistryService
.getAllSubjectNames(getCluster(clusterName))
.flatMapIterable(l -> l)
@ -216,29 +234,32 @@ public class SchemasController extends AbstractController implements SchemasApi
List<String> subjectsToRender = filteredSubjects.stream()
.skip(subjectToSkip)
.limit(pageSize)
.collect(Collectors.toList());
.toList();
return schemaRegistryService.getAllLatestVersionSchemas(getCluster(clusterName), subjectsToRender)
.map(subjs -> subjs.stream().map(kafkaSrMapper::toDto).toList())
.map(subjs -> new SchemaSubjectsResponseDTO().pageCount(totalPages).schemas(subjs));
}).map(ResponseEntity::ok);
}).map(ResponseEntity::ok)
.doOnEach(sig -> audit(context, sig));
}
@Override
public Mono<ResponseEntity<Void>> updateGlobalSchemaCompatibilityLevel(
String clusterName, @Valid Mono<CompatibilityLevelDTO> compatibilityLevelMono,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.schemaActions(SchemaAction.MODIFY_GLOBAL_COMPATIBILITY)
.build());
.operationName("updateGlobalSchemaCompatibilityLevel")
.build();
return validateAccess.then(
return validateAccess(context).then(
compatibilityLevelMono
.flatMap(compatibilityLevelDTO ->
schemaRegistryService.updateGlobalSchemaCompatibility(
getCluster(clusterName),
kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}
@ -247,12 +268,14 @@ public class SchemasController extends AbstractController implements SchemasApi
public Mono<ResponseEntity<Void>> updateSchemaCompatibilityLevel(
String clusterName, String subject, @Valid Mono<CompatibilityLevelDTO> compatibilityLevelMono,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.schemaActions(SchemaAction.EDIT)
.build());
.operationName("updateSchemaCompatibilityLevel")
.operationParams(Map.of("subject", subject))
.build();
return validateAccess.then(
return validateAccess(context).then(
compatibilityLevelMono
.flatMap(compatibilityLevelDTO ->
schemaRegistryService.updateSchemaCompatibility(
@ -260,6 +283,7 @@ public class SchemasController extends AbstractController implements SchemasApi
subject,
kafkaSrMapper.fromDto(compatibilityLevelDTO.getCompatibility())
))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}

View file

@ -22,14 +22,15 @@ import com.provectus.kafka.ui.model.TopicConfigDTO;
import com.provectus.kafka.ui.model.TopicCreationDTO;
import com.provectus.kafka.ui.model.TopicDTO;
import com.provectus.kafka.ui.model.TopicDetailsDTO;
import com.provectus.kafka.ui.model.TopicProducerStateDTO;
import com.provectus.kafka.ui.model.TopicUpdateDTO;
import com.provectus.kafka.ui.model.TopicsResponseDTO;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.TopicsService;
import com.provectus.kafka.ui.service.analyze.TopicAnalysisService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -51,70 +52,79 @@ public class TopicsController extends AbstractController implements TopicsApi {
private final TopicsService topicsService;
private final TopicAnalysisService topicAnalysisService;
private final ClusterMapper clusterMapper;
private final AccessControlService accessControlService;
@Override
public Mono<ResponseEntity<TopicDTO>> createTopic(
String clusterName, @Valid Mono<TopicCreationDTO> topicCreation, ServerWebExchange exchange) {
String clusterName, @Valid Mono<TopicCreationDTO> topicCreationMono, ServerWebExchange exchange) {
return topicCreationMono.flatMap(topicCreation -> {
var context = AccessContext.builder()
.cluster(clusterName)
.topicActions(CREATE)
.operationName("createTopic")
.operationParams(topicCreation)
.build();
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
.cluster(clusterName)
.topicActions(CREATE)
.build());
return validateAccess.then(
topicsService.createTopic(getCluster(clusterName), topicCreation)
.map(clusterMapper::toTopic)
.map(s -> new ResponseEntity<>(s, HttpStatus.OK))
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()))
);
return validateAccess(context)
.then(topicsService.createTopic(getCluster(clusterName), topicCreation))
.map(clusterMapper::toTopic)
.map(s -> new ResponseEntity<>(s, HttpStatus.OK))
.switchIfEmpty(Mono.just(ResponseEntity.notFound().build()))
.doOnEach(sig -> audit(context, sig));
});
}
@Override
public Mono<ResponseEntity<TopicDTO>> recreateTopic(String clusterName,
String topicName, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW, CREATE, DELETE)
.build());
.operationName("recreateTopic")
.build();
return validateAccess.then(
return validateAccess(context).then(
topicsService.recreateTopic(getCluster(clusterName), topicName)
.map(clusterMapper::toTopic)
.map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
);
).doOnEach(sig -> audit(context, sig));
}
@Override
public Mono<ResponseEntity<TopicDTO>> cloneTopic(
String clusterName, String topicName, String newTopicName, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW, CREATE)
.build());
.operationName("cloneTopic")
.operationParams(Map.of("newTopicName", newTopicName))
.build();
return validateAccess.then(topicsService.cloneTopic(getCluster(clusterName), topicName, newTopicName)
.map(clusterMapper::toTopic)
.map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
);
return validateAccess(context)
.then(topicsService.cloneTopic(getCluster(clusterName), topicName, newTopicName)
.map(clusterMapper::toTopic)
.map(s -> new ResponseEntity<>(s, HttpStatus.CREATED))
).doOnEach(sig -> audit(context, sig));
}
@Override
public Mono<ResponseEntity<Void>> deleteTopic(
String clusterName, String topicName, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(DELETE)
.build());
.operationName("deleteTopic")
.build();
return validateAccess.then(
topicsService.deleteTopic(getCluster(clusterName), topicName).map(ResponseEntity::ok)
);
return validateAccess(context)
.then(
topicsService.deleteTopic(getCluster(clusterName), topicName)
.thenReturn(ResponseEntity.ok().<Void>build())
).doOnEach(sig -> audit(context, sig));
}
@ -122,38 +132,40 @@ public class TopicsController extends AbstractController implements TopicsApi {
public Mono<ResponseEntity<Flux<TopicConfigDTO>>> getTopicConfigs(
String clusterName, String topicName, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW)
.build());
.operationName("getTopicConfigs")
.build();
return validateAccess.then(
return validateAccess(context).then(
topicsService.getTopicConfigs(getCluster(clusterName), topicName)
.map(lst -> lst.stream()
.map(InternalTopicConfig::from)
.map(clusterMapper::toTopicConfig)
.collect(toList()))
.toList())
.map(Flux::fromIterable)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> audit(context, sig));
}
@Override
public Mono<ResponseEntity<TopicDetailsDTO>> getTopicDetails(
String clusterName, String topicName, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW)
.build());
.operationName("getTopicDetails")
.build();
return validateAccess.then(
return validateAccess(context).then(
topicsService.getTopicDetails(getCluster(clusterName), topicName)
.map(clusterMapper::toTopicDetails)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -166,13 +178,19 @@ public class TopicsController extends AbstractController implements TopicsApi {
@Valid SortOrderDTO sortOrder,
ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.operationName("getTopics")
.build();
return topicsService.getTopicsForPagination(getCluster(clusterName))
.flatMap(existingTopics -> {
.flatMap(topics -> accessControlService.filterViewableTopics(topics, clusterName))
.flatMap(topics -> {
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
var topicsToSkip = ((page != null && page > 0 ? page : 1) - 1) * pageSize;
var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
? getComparatorForTopic(orderBy) : getComparatorForTopic(orderBy).reversed();
List<InternalTopic> filtered = existingTopics.stream()
List<InternalTopic> filtered = topics.stream()
.filter(topic -> !topic.isInternal()
|| showInternal != null && showInternal)
.filter(topic -> search == null || StringUtils.containsIgnoreCase(topic.getName(), search))
@ -188,15 +206,13 @@ public class TopicsController extends AbstractController implements TopicsApi {
.collect(toList());
return topicsService.loadTopics(getCluster(clusterName), topicsPage)
.flatMapMany(Flux::fromIterable)
.filterWhen(dto -> accessControlService.isTopicAccessible(dto, clusterName))
.collectList()
.map(topicsToRender ->
new TopicsResponseDTO()
.topics(topicsToRender.stream().map(clusterMapper::toTopic).collect(toList()))
.topics(topicsToRender.stream().map(clusterMapper::toTopic).toList())
.pageCount(totalPages));
})
.map(ResponseEntity::ok);
.map(ResponseEntity::ok)
.doOnEach(sig -> audit(context, sig));
}
@Override
@ -204,18 +220,19 @@ public class TopicsController extends AbstractController implements TopicsApi {
String clusterName, String topicName, @Valid Mono<TopicUpdateDTO> topicUpdate,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW, EDIT)
.build());
.operationName("updateTopic")
.build();
return validateAccess.then(
return validateAccess(context).then(
topicsService
.updateTopic(getCluster(clusterName), topicName, topicUpdate)
.map(clusterMapper::toTopic)
.map(ResponseEntity::ok)
);
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -224,17 +241,17 @@ public class TopicsController extends AbstractController implements TopicsApi {
Mono<PartitionsIncreaseDTO> partitionsIncrease,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW, EDIT)
.build());
.build();
return validateAccess.then(
return validateAccess(context).then(
partitionsIncrease.flatMap(partitions ->
topicsService.increaseTopicPartitions(getCluster(clusterName), topicName, partitions)
).map(ResponseEntity::ok)
);
).doOnEach(sig -> audit(context, sig));
}
@Override
@ -243,31 +260,34 @@ public class TopicsController extends AbstractController implements TopicsApi {
Mono<ReplicationFactorChangeDTO> replicationFactorChange,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW, EDIT)
.build());
.operationName("changeReplicationFactor")
.build();
return validateAccess.then(
return validateAccess(context).then(
replicationFactorChange
.flatMap(rfc ->
topicsService.changeReplicationFactor(getCluster(clusterName), topicName, rfc))
.map(ResponseEntity::ok)
);
).doOnEach(sig -> audit(context, sig));
}
@Override
public Mono<ResponseEntity<Void>> analyzeTopic(String clusterName, String topicName, ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_READ)
.build());
.operationName("analyzeTopic")
.build();
return validateAccess.then(
return validateAccess(context).then(
topicAnalysisService.analyze(getCluster(clusterName), topicName)
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build())
);
}
@ -275,15 +295,17 @@ public class TopicsController extends AbstractController implements TopicsApi {
@Override
public Mono<ResponseEntity<Void>> cancelTopicAnalysis(String clusterName, String topicName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_READ)
.build());
.operationName("cancelTopicAnalysis")
.build();
topicAnalysisService.cancelAnalysis(getCluster(clusterName), topicName);
return validateAccess.thenReturn(ResponseEntity.ok().build());
return validateAccess(context)
.then(Mono.fromRunnable(() -> topicAnalysisService.cancelAnalysis(getCluster(clusterName), topicName)))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
@ -292,15 +314,46 @@ public class TopicsController extends AbstractController implements TopicsApi {
String topicName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(MESSAGES_READ)
.build());
.operationName("getTopicAnalysis")
.build();
return validateAccess.thenReturn(topicAnalysisService.getTopicAnalysis(getCluster(clusterName), topicName)
return validateAccess(context)
.thenReturn(topicAnalysisService.getTopicAnalysis(getCluster(clusterName), topicName)
.map(ResponseEntity::ok)
.orElseGet(() -> ResponseEntity.notFound().build()))
.doOnEach(sig -> audit(context, sig));
}
@Override
public Mono<ResponseEntity<Flux<TopicProducerStateDTO>>> getActiveProducerStates(String clusterName,
String topicName,
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.topic(topicName)
.topicActions(VIEW)
.operationName("getActiveProducerStates")
.build();
Comparator<TopicProducerStateDTO> ordering =
Comparator.comparingInt(TopicProducerStateDTO::getPartition)
.thenComparing(Comparator.comparing(TopicProducerStateDTO::getProducerId).reversed());
Flux<TopicProducerStateDTO> states = topicsService.getActiveProducersState(getCluster(clusterName), topicName)
.flatMapMany(statesMap ->
Flux.fromStream(
statesMap.entrySet().stream()
.flatMap(e -> e.getValue().stream().map(p -> clusterMapper.map(e.getKey().partition(), p)))
.sorted(ordering)));
return validateAccess(context)
.thenReturn(states)
.map(ResponseEntity::ok)
.orElseGet(() -> ResponseEntity.notFound().build()));
.doOnEach(sig -> audit(context, sig));
}
private Comparator<InternalTopic> getComparatorForTopic(

View file

@ -1,38 +1,23 @@
package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
public abstract class AbstractEmitter {
abstract class AbstractEmitter implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
private final MessagesProcessing messagesProcessing;
private final PollingThrottler throttler;
protected final PollingSettings pollingSettings;
private final PollingSettings pollingSettings;
protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
this.messagesProcessing = messagesProcessing;
this.pollingSettings = pollingSettings;
this.throttler = pollingSettings.getPollingThrottler();
}
protected ConsumerRecords<Bytes, Bytes> poll(
FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer) {
return poll(sink, consumer, pollingSettings.getPollTimeout());
}
protected ConsumerRecords<Bytes, Bytes> poll(
FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer, Duration timeout) {
Instant start = Instant.now();
ConsumerRecords<Bytes, Bytes> records = consumer.poll(timeout);
Instant finish = Instant.now();
int polledBytes = sendConsuming(sink, records, Duration.between(start, finish).toMillis());
throttler.throttleAfterPoll(polledBytes);
protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
var records = consumer.pollEnhanced(pollingSettings.getPollTimeout());
sendConsuming(sink, records);
return records;
}
@ -40,19 +25,16 @@ public abstract class AbstractEmitter {
return messagesProcessing.limitReached();
}
protected void sendMessage(FluxSink<TopicMessageEventDTO> sink,
ConsumerRecord<Bytes, Bytes> msg) {
messagesProcessing.sendMsg(sink, msg);
protected void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> records) {
messagesProcessing.send(sink, records);
}
protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
messagesProcessing.sendPhase(sink, name);
}
protected int sendConsuming(FluxSink<TopicMessageEventDTO> sink,
ConsumerRecords<Bytes, Bytes> records,
long elapsed) {
return messagesProcessing.sentConsumingInfo(sink, records, elapsed);
protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords records) {
messagesProcessing.sentConsumingInfo(sink, records);
}
protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {

View file

@ -0,0 +1,60 @@
package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
public class BackwardEmitter extends RangePollingEmitter {
public BackwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition consumerPosition,
int messagesPerPage,
ConsumerRecordDeserializer deserializer,
Predicate<TopicMessageDTO> filter,
PollingSettings pollingSettings) {
super(
consumerSupplier,
consumerPosition,
messagesPerPage,
new MessagesProcessing(
deserializer,
filter,
false,
messagesPerPage
),
pollingSettings
);
}
@Override
protected TreeMap<TopicPartition, FromToOffset> nextPollingRange(TreeMap<TopicPartition, FromToOffset> prevRange,
SeekOperations seekOperations) {
TreeMap<TopicPartition, Long> readToOffsets = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
if (prevRange.isEmpty()) {
readToOffsets.putAll(seekOperations.getOffsetsForSeek());
} else {
readToOffsets.putAll(
prevRange.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().from()))
);
}
int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readToOffsets.size());
TreeMap<TopicPartition, FromToOffset> result = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
readToOffsets.forEach((tp, toOffset) -> {
long tpStartOffset = seekOperations.getBeginOffsets().get(tp);
if (toOffset > tpStartOffset) {
result.put(tp, new FromToOffset(Math.max(tpStartOffset, toOffset - msgsToPollPerPartition), toOffset));
}
});
return result;
}
}

View file

@ -1,128 +0,0 @@
package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@Slf4j
public class BackwardRecordEmitter
extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final ConsumerPosition consumerPosition;
private final int messagesPerPage;
public BackwardRecordEmitter(
Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
ConsumerPosition consumerPosition,
int messagesPerPage,
MessagesProcessing messagesProcessing,
PollingSettings pollingSettings) {
super(messagesProcessing, pollingSettings);
this.consumerPosition = consumerPosition;
this.messagesPerPage = messagesPerPage;
this.consumerSupplier = consumerSupplier;
}
@Override
public void accept(FluxSink<TopicMessageEventDTO> sink) {
log.debug("Starting backward polling for {}", consumerPosition);
try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
sendPhase(sink, "Created consumer");
var seekOperations = SeekOperations.create(consumer, consumerPosition);
var readUntilOffsets = new TreeMap<TopicPartition, Long>(Comparator.comparingInt(TopicPartition::partition));
readUntilOffsets.putAll(seekOperations.getOffsetsForSeek());
int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readUntilOffsets.size());
log.debug("'Until' offsets for polling: {}", readUntilOffsets);
while (!sink.isCancelled() && !readUntilOffsets.isEmpty() && !sendLimitReached()) {
new TreeMap<>(readUntilOffsets).forEach((tp, readToOffset) -> {
if (sink.isCancelled()) {
return; //fast return in case of sink cancellation
}
long beginOffset = seekOperations.getBeginOffsets().get(tp);
long readFromOffset = Math.max(beginOffset, readToOffset - msgsToPollPerPartition);
partitionPollIteration(tp, readFromOffset, readToOffset, consumer, sink)
.forEach(r -> sendMessage(sink, r));
if (beginOffset == readFromOffset) {
// we fully read this partition -> removing it from polling iterations
readUntilOffsets.remove(tp);
} else {
// updating 'to' offset for next polling iteration
readUntilOffsets.put(tp, readFromOffset);
}
});
if (readUntilOffsets.isEmpty()) {
log.debug("begin reached after partitions poll iteration");
} else if (sink.isCancelled()) {
log.debug("sink is cancelled after partitions poll iteration");
}
}
sendFinishStatsAndCompleteSink(sink);
log.debug("Polling finished");
} catch (InterruptException kafkaInterruptException) {
log.debug("Polling finished due to thread interruption");
sink.complete();
} catch (Exception e) {
log.error("Error occurred while consuming records", e);
sink.error(e);
}
}
private List<ConsumerRecord<Bytes, Bytes>> partitionPollIteration(
TopicPartition tp,
long fromOffset,
long toOffset,
Consumer<Bytes, Bytes> consumer,
FluxSink<TopicMessageEventDTO> sink
) {
consumer.assign(Collections.singleton(tp));
consumer.seek(tp, fromOffset);
sendPhase(sink, String.format("Polling partition: %s from offset %s", tp, fromOffset));
int desiredMsgsToPoll = (int) (toOffset - fromOffset);
var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();
EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
while (!sink.isCancelled()
&& !sendLimitReached()
&& recordsToSend.size() < desiredMsgsToPoll
&& !emptyPolls.noDataEmptyPollsReached()) {
var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
emptyPolls.count(polledRecords);
log.debug("{} records polled from {}", polledRecords.count(), tp);
var filteredRecords = polledRecords.records(tp).stream()
.filter(r -> r.offset() < toOffset)
.toList();
if (!polledRecords.isEmpty() && filteredRecords.isEmpty()) {
// we already read all messages in target offsets interval
break;
}
recordsToSend.addAll(filteredRecords);
}
log.debug("{} records to send", recordsToSend.size());
Collections.reverse(recordsToSend);
return recordsToSend;
}
}

View file

@ -2,9 +2,6 @@ package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
class ConsumingStats {
@ -12,41 +9,37 @@ class ConsumingStats {
private long bytes = 0;
private int records = 0;
private long elapsed = 0;
private int filterApplyErrors = 0;
/**
* returns bytes polled.
*/
int sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
ConsumerRecords<Bytes, Bytes> polledRecords,
long elapsed,
int filterApplyErrors) {
int polledBytes = ConsumerRecordsUtil.calculatePolledSize(polledRecords);
bytes += polledBytes;
this.records += polledRecords.count();
this.elapsed += elapsed;
void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
bytes += polledRecords.bytes();
records += polledRecords.count();
elapsed += polledRecords.elapsed().toMillis();
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.CONSUMING)
.consuming(createConsumingStats(sink, filterApplyErrors))
.consuming(createConsumingStats())
);
return polledBytes;
}
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, int filterApplyErrors) {
void incFilterApplyError() {
filterApplyErrors++;
}
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.DONE)
.consuming(createConsumingStats(sink, filterApplyErrors))
.consuming(createConsumingStats())
);
}
private TopicMessageConsumingDTO createConsumingStats(FluxSink<TopicMessageEventDTO> sink,
int filterApplyErrors) {
private TopicMessageConsumingDTO createConsumingStats() {
return new TopicMessageConsumingDTO()
.bytesConsumed(this.bytes)
.elapsedMs(this.elapsed)
.isCancelled(sink.isCancelled())
.bytesConsumed(bytes)
.elapsedMs(elapsed)
.isCancelled(false)
.filterApplyErrors(filterApplyErrors)
.messagesConsumed(this.records);
.messagesConsumed(records);
}
}

View file

@ -1,28 +0,0 @@
package com.provectus.kafka.ui.emitter;
import org.apache.kafka.clients.consumer.ConsumerRecords;
// In some situations it is hard to say whether records range (between two offsets) was fully polled.
// This happens when we have holes in records sequences that is usual case for compact topics or
// topics with transactional writes. In such cases if you want to poll all records between offsets X and Y
// there is no guarantee that you will ever see record with offset Y.
// To workaround this we can assume that after N consecutive empty polls all target messages were read.
public class EmptyPollsCounter {
private final int maxEmptyPolls;
private int emptyPolls = 0;
EmptyPollsCounter(int maxEmptyPolls) {
this.maxEmptyPolls = maxEmptyPolls;
}
public void count(ConsumerRecords<?, ?> polled) {
emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0;
}
public boolean noDataEmptyPollsReached() {
return emptyPolls >= maxEmptyPolls;
}
}

View file

@ -0,0 +1,82 @@
package com.provectus.kafka.ui.emitter;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.provectus.kafka.ui.util.ApplicationMetrics;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Delegate;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;
public class EnhancedConsumer extends KafkaConsumer<Bytes, Bytes> {
private final PollingThrottler throttler;
private final ApplicationMetrics metrics;
private String pollingTopic;
public EnhancedConsumer(Properties properties,
PollingThrottler throttler,
ApplicationMetrics metrics) {
super(properties, new BytesDeserializer(), new BytesDeserializer());
this.throttler = throttler;
this.metrics = metrics;
metrics.activeConsumers().incrementAndGet();
}
public PolledRecords pollEnhanced(Duration dur) {
var stopwatch = Stopwatch.createStarted();
ConsumerRecords<Bytes, Bytes> polled = poll(dur);
PolledRecords polledEnhanced = PolledRecords.create(polled, stopwatch.elapsed());
var throttled = throttler.throttleAfterPoll(polledEnhanced.bytes());
metrics.meterPolledRecords(pollingTopic, polledEnhanced, throttled);
return polledEnhanced;
}
@Override
public void assign(Collection<TopicPartition> partitions) {
super.assign(partitions);
Set<String> assignedTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
Preconditions.checkState(assignedTopics.size() == 1);
this.pollingTopic = assignedTopics.iterator().next();
}
@Override
public void subscribe(Pattern pattern) {
throw new UnsupportedOperationException();
}
@Override
public void subscribe(Collection<String> topics) {
throw new UnsupportedOperationException();
}
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void close(Duration timeout) {
metrics.activeConsumers().decrementAndGet();
super.close(timeout);
}
}

View file

@ -0,0 +1,61 @@
package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
public class ForwardEmitter extends RangePollingEmitter {
public ForwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition consumerPosition,
int messagesPerPage,
ConsumerRecordDeserializer deserializer,
Predicate<TopicMessageDTO> filter,
PollingSettings pollingSettings) {
super(
consumerSupplier,
consumerPosition,
messagesPerPage,
new MessagesProcessing(
deserializer,
filter,
true,
messagesPerPage
),
pollingSettings
);
}
@Override
protected TreeMap<TopicPartition, FromToOffset> nextPollingRange(TreeMap<TopicPartition, FromToOffset> prevRange,
SeekOperations seekOperations) {
TreeMap<TopicPartition, Long> readFromOffsets = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
if (prevRange.isEmpty()) {
readFromOffsets.putAll(seekOperations.getOffsetsForSeek());
} else {
readFromOffsets.putAll(
prevRange.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().to()))
);
}
int msgsToPollPerPartition = (int) Math.ceil((double) messagesPerPage / readFromOffsets.size());
TreeMap<TopicPartition, FromToOffset> result = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition));
readFromOffsets.forEach((tp, fromOffset) -> {
long tpEndOffset = seekOperations.getEndOffsets().get(tp);
if (fromOffset < tpEndOffset) {
result.put(tp, new FromToOffset(fromOffset, Math.min(tpEndOffset, fromOffset + msgsToPollPerPartition)));
}
});
return result;
}
}

View file

@ -1,66 +0,0 @@
package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@Slf4j
public class ForwardRecordEmitter
extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final ConsumerPosition position;
public ForwardRecordEmitter(
Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
ConsumerPosition position,
MessagesProcessing messagesProcessing,
PollingSettings pollingSettings) {
super(messagesProcessing, pollingSettings);
this.position = position;
this.consumerSupplier = consumerSupplier;
}
@Override
public void accept(FluxSink<TopicMessageEventDTO> sink) {
log.debug("Starting forward polling for {}", position);
try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
sendPhase(sink, "Assigning partitions");
var seekOperations = SeekOperations.create(consumer, position);
seekOperations.assignAndSeekNonEmptyPartitions();
EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
while (!sink.isCancelled()
&& !sendLimitReached()
&& !seekOperations.assignedPartitionsFullyPolled()
&& !emptyPolls.noDataEmptyPollsReached()) {
sendPhase(sink, "Polling");
ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
emptyPolls.count(records);
log.debug("{} records polled", records.count());
for (ConsumerRecord<Bytes, Bytes> msg : records) {
sendMessage(sink, msg);
}
}
sendFinishStatsAndCompleteSink(sink);
log.debug("Polling finished");
} catch (InterruptException kafkaInterruptException) {
log.debug("Polling finished due to thread interruption");
sink.complete();
} catch (Exception e) {
log.error("Error occurred while consuming records", e);
sink.error(e);
}
}
}

View file

@ -1,71 +1,75 @@
package com.provectus.kafka.ui.emitter;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.model.TopicMessagePhaseDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@Slf4j
public class MessagesProcessing {
@RequiredArgsConstructor
class MessagesProcessing {
private final ConsumingStats consumingStats = new ConsumingStats();
private long sentMessages = 0;
private int filterApplyErrors = 0;
private final ConsumerRecordDeserializer deserializer;
private final Predicate<TopicMessageDTO> filter;
private final boolean ascendingSortBeforeSend;
private final @Nullable Integer limit;
public MessagesProcessing(ConsumerRecordDeserializer deserializer,
Predicate<TopicMessageDTO> filter,
@Nullable Integer limit) {
this.deserializer = deserializer;
this.filter = filter;
this.limit = limit;
}
boolean limitReached() {
return limit != null && sentMessages >= limit;
}
void sendMsg(FluxSink<TopicMessageEventDTO> sink, ConsumerRecord<Bytes, Bytes> rec) {
if (!sink.isCancelled() && !limitReached()) {
TopicMessageDTO topicMessage = deserializer.deserialize(rec);
try {
if (filter.test(topicMessage)) {
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.MESSAGE)
.message(topicMessage)
);
sentMessages++;
}
} catch (Exception e) {
filterApplyErrors++;
log.trace("Error applying filter for message {}", topicMessage);
}
}
void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> polled) {
sortForSending(polled, ascendingSortBeforeSend)
.forEach(rec -> {
if (!limitReached() && !sink.isCancelled()) {
TopicMessageDTO topicMessage = deserializer.deserialize(rec);
try {
if (filter.test(topicMessage)) {
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.MESSAGE)
.message(topicMessage)
);
sentMessages++;
}
} catch (Exception e) {
consumingStats.incFilterApplyError();
log.trace("Error applying filter for message {}", topicMessage);
}
}
});
}
int sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink,
ConsumerRecords<Bytes, Bytes> polledRecords,
long elapsed) {
void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
if (!sink.isCancelled()) {
return consumingStats.sendConsumingEvt(sink, polledRecords, elapsed, filterApplyErrors);
consumingStats.sendConsumingEvt(sink, polledRecords);
}
return 0;
}
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
if (!sink.isCancelled()) {
consumingStats.sendFinishEvent(sink, filterApplyErrors);
consumingStats.sendFinishEvent(sink);
}
}
@ -79,4 +83,30 @@ public class MessagesProcessing {
}
}
/*
* Sorting by timestamps, BUT requesting that records within same partitions should be ordered by offsets.
*/
@VisibleForTesting
static Iterable<ConsumerRecord<Bytes, Bytes>> sortForSending(Iterable<ConsumerRecord<Bytes, Bytes>> records,
boolean asc) {
Comparator<ConsumerRecord> offsetComparator = asc
? Comparator.comparingLong(ConsumerRecord::offset)
: Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::offset).reversed();
// partition -> sorted by offsets records
Map<Integer, List<ConsumerRecord<Bytes, Bytes>>> perPartition = Streams.stream(records)
.collect(
groupingBy(
ConsumerRecord::partition,
TreeMap::new,
collectingAndThen(toList(), lst -> lst.stream().sorted(offsetComparator).toList())));
Comparator<ConsumerRecord> tsComparator = asc
? Comparator.comparing(ConsumerRecord::timestamp)
: Comparator.<ConsumerRecord>comparingLong(ConsumerRecord::timestamp).reversed();
// merge-sorting records from partitions one by one using timestamp comparator
return Iterables.mergeSorted(perPartition.values(), tsComparator);
}
}

View file

@ -5,15 +5,15 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
@Slf4j
@Getter
public class OffsetsInfo {
class OffsetsInfo {
private final Consumer<?, ?> consumer;
@ -23,16 +23,15 @@ public class OffsetsInfo {
private final Set<TopicPartition> nonEmptyPartitions = new HashSet<>();
private final Set<TopicPartition> emptyPartitions = new HashSet<>();
public OffsetsInfo(Consumer<?, ?> consumer, String topic) {
OffsetsInfo(Consumer<?, ?> consumer, String topic) {
this(consumer,
consumer.partitionsFor(topic).stream()
.map(pi -> new TopicPartition(topic, pi.partition()))
.collect(Collectors.toList())
.toList()
);
}
public OffsetsInfo(Consumer<?, ?> consumer,
Collection<TopicPartition> targetPartitions) {
OffsetsInfo(Consumer<?, ?> consumer, Collection<TopicPartition> targetPartitions) {
this.consumer = consumer;
this.beginOffsets = consumer.beginningOffsets(targetPartitions);
this.endOffsets = consumer.endOffsets(targetPartitions);
@ -46,8 +45,8 @@ public class OffsetsInfo {
});
}
public boolean assignedPartitionsFullyPolled() {
for (var tp: consumer.assignment()) {
boolean assignedPartitionsFullyPolled() {
for (var tp : consumer.assignment()) {
Preconditions.checkArgument(endOffsets.containsKey(tp));
if (endOffsets.get(tp) > consumer.position(tp)) {
return false;
@ -56,4 +55,10 @@ public class OffsetsInfo {
return true;
}
long summaryOffsetsRange() {
MutableLong cnt = new MutableLong();
nonEmptyPartitions.forEach(tp -> cnt.add(endOffsets.get(tp) - beginOffsets.get(tp)));
return cnt.getValue();
}
}

View file

@ -0,0 +1,48 @@
package com.provectus.kafka.ui.emitter;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.Bytes;
public record PolledRecords(int count,
int bytes,
Duration elapsed,
ConsumerRecords<Bytes, Bytes> records) implements Iterable<ConsumerRecord<Bytes, Bytes>> {
static PolledRecords create(ConsumerRecords<Bytes, Bytes> polled, Duration pollDuration) {
return new PolledRecords(
polled.count(),
calculatePolledRecSize(polled),
pollDuration,
polled
);
}
public List<ConsumerRecord<Bytes, Bytes>> records(TopicPartition tp) {
return records.records(tp);
}
@Override
public Iterator<ConsumerRecord<Bytes, Bytes>> iterator() {
return records.iterator();
}
private static int calculatePolledRecSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
int polledBytes = 0;
for (ConsumerRecord<Bytes, Bytes> rec : recs) {
for (Header header : rec.headers()) {
polledBytes +=
(header.key() != null ? header.key().getBytes().length : 0)
+ (header.value() != null ? header.value().length : 0);
}
polledBytes += rec.key() == null ? 0 : rec.serializedKeySize();
polledBytes += rec.value() == null ? 0 : rec.serializedValueSize();
}
return polledBytes;
}
}

View file

@ -8,13 +8,8 @@ import java.util.function.Supplier;
public class PollingSettings {
private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(1_000);
private static final Duration DEFAULT_PARTITION_POLL_TIMEOUT = Duration.ofMillis(200);
private static final int DEFAULT_NO_DATA_EMPTY_POLLS = 3;
private final Duration pollTimeout;
private final Duration partitionPollTimeout;
private final int notDataEmptyPolls; //see EmptyPollsCounter docs
private final Supplier<PollingThrottler> throttlerSupplier;
public static PollingSettings create(ClustersProperties.Cluster cluster,
@ -26,18 +21,8 @@ public class PollingSettings {
? Duration.ofMillis(pollingProps.getPollTimeoutMs())
: DEFAULT_POLL_TIMEOUT;
var partitionPollTimeout = pollingProps.getPartitionPollTimeout() != null
? Duration.ofMillis(pollingProps.getPartitionPollTimeout())
: Duration.ofMillis(pollTimeout.toMillis() / 5);
int noDataEmptyPolls = pollingProps.getNoDataEmptyPolls() != null
? pollingProps.getNoDataEmptyPolls()
: DEFAULT_NO_DATA_EMPTY_POLLS;
return new PollingSettings(
pollTimeout,
partitionPollTimeout,
noDataEmptyPolls,
PollingThrottler.throttlerSupplier(cluster)
);
}
@ -45,34 +30,20 @@ public class PollingSettings {
public static PollingSettings createDefault() {
return new PollingSettings(
DEFAULT_POLL_TIMEOUT,
DEFAULT_PARTITION_POLL_TIMEOUT,
DEFAULT_NO_DATA_EMPTY_POLLS,
PollingThrottler::noop
);
}
private PollingSettings(Duration pollTimeout,
Duration partitionPollTimeout,
int notDataEmptyPolls,
Supplier<PollingThrottler> throttlerSupplier) {
this.pollTimeout = pollTimeout;
this.partitionPollTimeout = partitionPollTimeout;
this.notDataEmptyPolls = notDataEmptyPolls;
this.throttlerSupplier = throttlerSupplier;
}
public EmptyPollsCounter createEmptyPollsCounter() {
return new EmptyPollsCounter(notDataEmptyPolls);
}
public Duration getPollTimeout() {
return pollTimeout;
}
public Duration getPartitionPollTimeout() {
return partitionPollTimeout;
}
public PollingThrottler getPollingThrottler() {
return throttlerSupplier.get();
}

View file

@ -3,11 +3,8 @@ package com.provectus.kafka.ui.emitter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import com.provectus.kafka.ui.config.ClustersProperties;
import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
@Slf4j
public class PollingThrottler {
@ -36,18 +33,17 @@ public class PollingThrottler {
return new PollingThrottler("noop", RateLimiter.create(Long.MAX_VALUE));
}
public void throttleAfterPoll(int polledBytes) {
//returns true if polling was throttled
public boolean throttleAfterPoll(int polledBytes) {
if (polledBytes > 0) {
double sleptSeconds = rateLimiter.acquire(polledBytes);
if (!throttled && sleptSeconds > 0.0) {
throttled = true;
log.debug("Polling throttling enabled for cluster {} at rate {} bytes/sec", clusterName, rateLimiter.getRate());
return true;
}
}
}
public void throttleAfterPoll(ConsumerRecords<Bytes, Bytes> polled) {
throttleAfterPoll(ConsumerRecordsUtil.calculatePolledSize(polled));
return false;
}
}

View file

@ -0,0 +1,98 @@
package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@Slf4j
abstract class RangePollingEmitter extends AbstractEmitter {
private final Supplier<EnhancedConsumer> consumerSupplier;
protected final ConsumerPosition consumerPosition;
protected final int messagesPerPage;
protected RangePollingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition consumerPosition,
int messagesPerPage,
MessagesProcessing messagesProcessing,
PollingSettings pollingSettings) {
super(messagesProcessing, pollingSettings);
this.consumerPosition = consumerPosition;
this.messagesPerPage = messagesPerPage;
this.consumerSupplier = consumerSupplier;
}
protected record FromToOffset(/*inclusive*/ long from, /*exclusive*/ long to) {
}
//should return empty map if polling should be stopped
protected abstract TreeMap<TopicPartition, FromToOffset> nextPollingRange(
TreeMap<TopicPartition, FromToOffset> prevRange, //empty on start
SeekOperations seekOperations
);
@Override
public void accept(FluxSink<TopicMessageEventDTO> sink) {
log.debug("Starting polling for {}", consumerPosition);
try (EnhancedConsumer consumer = consumerSupplier.get()) {
sendPhase(sink, "Consumer created");
var seekOperations = SeekOperations.create(consumer, consumerPosition);
TreeMap<TopicPartition, FromToOffset> pollRange = nextPollingRange(new TreeMap<>(), seekOperations);
log.debug("Starting from offsets {}", pollRange);
while (!sink.isCancelled() && !pollRange.isEmpty() && !sendLimitReached()) {
var polled = poll(consumer, sink, pollRange);
send(sink, polled);
pollRange = nextPollingRange(pollRange, seekOperations);
}
if (sink.isCancelled()) {
log.debug("Polling finished due to sink cancellation");
}
sendFinishStatsAndCompleteSink(sink);
log.debug("Polling finished");
} catch (InterruptException kafkaInterruptException) {
log.debug("Polling finished due to thread interruption");
sink.complete();
} catch (Exception e) {
log.error("Error occurred while consuming records", e);
sink.error(e);
}
}
private List<ConsumerRecord<Bytes, Bytes>> poll(EnhancedConsumer consumer,
FluxSink<TopicMessageEventDTO> sink,
TreeMap<TopicPartition, FromToOffset> range) {
log.trace("Polling range {}", range);
sendPhase(sink,
"Polling partitions: %s".formatted(range.keySet().stream().map(TopicPartition::partition).sorted().toList()));
consumer.assign(range.keySet());
range.forEach((tp, fromTo) -> consumer.seek(tp, fromTo.from));
List<ConsumerRecord<Bytes, Bytes>> result = new ArrayList<>();
while (!sink.isCancelled() && consumer.paused().size() < range.size()) {
var polledRecords = poll(sink, consumer);
range.forEach((tp, fromTo) -> {
polledRecords.records(tp).stream()
.filter(r -> r.offset() < fromTo.to)
.forEach(result::add);
//next position is out of target range -> pausing partition
if (consumer.position(tp) >= fromTo.to) {
consumer.pause(List.of(tp));
}
});
}
consumer.resume(consumer.paused());
return result;
}
}

View file

@ -10,17 +10,18 @@ import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
@RequiredArgsConstructor(access = AccessLevel.PACKAGE)
class SeekOperations {
public class SeekOperations {
private final Consumer<?, ?> consumer;
private final OffsetsInfo offsetsInfo;
private final Map<TopicPartition, Long> offsetsForSeek; //only contains non-empty partitions!
static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
public static SeekOperations create(Consumer<?, ?> consumer, ConsumerPosition consumerPosition) {
OffsetsInfo offsetsInfo;
if (consumerPosition.getSeekTo() == null) {
offsetsInfo = new OffsetsInfo(consumer, consumerPosition.getTopic());
@ -34,25 +35,37 @@ class SeekOperations {
);
}
void assignAndSeekNonEmptyPartitions() {
public void assignAndSeekNonEmptyPartitions() {
consumer.assign(offsetsForSeek.keySet());
offsetsForSeek.forEach(consumer::seek);
}
Map<TopicPartition, Long> getBeginOffsets() {
public Map<TopicPartition, Long> getBeginOffsets() {
return offsetsInfo.getBeginOffsets();
}
Map<TopicPartition, Long> getEndOffsets() {
public Map<TopicPartition, Long> getEndOffsets() {
return offsetsInfo.getEndOffsets();
}
boolean assignedPartitionsFullyPolled() {
public boolean assignedPartitionsFullyPolled() {
return offsetsInfo.assignedPartitionsFullyPolled();
}
// sum of (end - start) offsets for all partitions
public long summaryOffsetsRange() {
return offsetsInfo.summaryOffsetsRange();
}
// sum of differences between initial consumer seek and current consumer position (across all partitions)
public long offsetsProcessedFromSeek() {
MutableLong count = new MutableLong();
offsetsForSeek.forEach((tp, initialOffset) -> count.add(consumer.position(tp) - initialOffset));
return count.getValue();
}
// Get offsets to seek to. NOTE: offsets do not contain empty partitions offsets
Map<TopicPartition, Long> getOffsetsForSeek() {
public Map<TopicPartition, Long> getOffsetsForSeek() {
return offsetsForSeek;
}
@ -61,19 +74,19 @@ class SeekOperations {
*/
@VisibleForTesting
static Map<TopicPartition, Long> getOffsetsForSeek(Consumer<?, ?> consumer,
OffsetsInfo offsetsInfo,
SeekTypeDTO seekType,
@Nullable Map<TopicPartition, Long> seekTo) {
OffsetsInfo offsetsInfo,
SeekTypeDTO seekType,
@Nullable Map<TopicPartition, Long> seekTo) {
switch (seekType) {
case LATEST:
return consumer.endOffsets(offsetsInfo.getNonEmptyPartitions());
case BEGINNING:
return consumer.beginningOffsets(offsetsInfo.getNonEmptyPartitions());
case OFFSET:
Preconditions.checkNotNull(offsetsInfo);
Preconditions.checkNotNull(seekTo);
return fixOffsets(offsetsInfo, seekTo);
case TIMESTAMP:
Preconditions.checkNotNull(offsetsInfo);
Preconditions.checkNotNull(seekTo);
return offsetsForTimestamp(consumer, offsetsInfo, seekTo);
default:
throw new IllegalStateException();
@ -100,7 +113,7 @@ class SeekOperations {
}
private static Map<TopicPartition, Long> offsetsForTimestamp(Consumer<?, ?> consumer, OffsetsInfo offsetsInfo,
Map<TopicPartition, Long> timestamps) {
Map<TopicPartition, Long> timestamps) {
timestamps = new HashMap<>(timestamps);
timestamps.keySet().retainAll(offsetsInfo.getNonEmptyPartitions());

View file

@ -1,27 +1,28 @@
package com.provectus.kafka.ui.emitter;
import com.provectus.kafka.ui.model.ConsumerPosition;
import com.provectus.kafka.ui.model.TopicMessageDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.serdes.ConsumerRecordDeserializer;
import java.util.HashMap;
import java.util.function.Predicate;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
@Slf4j
public class TailingEmitter extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {
public class TailingEmitter extends AbstractEmitter {
private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final Supplier<EnhancedConsumer> consumerSupplier;
private final ConsumerPosition consumerPosition;
public TailingEmitter(Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
public TailingEmitter(Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition consumerPosition,
MessagesProcessing messagesProcessing,
ConsumerRecordDeserializer deserializer,
Predicate<TopicMessageDTO> filter,
PollingSettings pollingSettings) {
super(messagesProcessing, pollingSettings);
super(new MessagesProcessing(deserializer, filter, false, null), pollingSettings);
this.consumerSupplier = consumerSupplier;
this.consumerPosition = consumerPosition;
}
@ -29,12 +30,12 @@ public class TailingEmitter extends AbstractEmitter
@Override
public void accept(FluxSink<TopicMessageEventDTO> sink) {
log.debug("Starting tailing polling for {}", consumerPosition);
try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
try (EnhancedConsumer consumer = consumerSupplier.get()) {
assignAndSeek(consumer);
while (!sink.isCancelled()) {
sendPhase(sink, "Polling");
var polled = poll(sink, consumer);
polled.forEach(r -> sendMessage(sink, r));
send(sink, polled);
}
sink.complete();
log.debug("Tailing finished");
@ -47,7 +48,7 @@ public class TailingEmitter extends AbstractEmitter
}
}
private void assignAndSeek(KafkaConsumer<Bytes, Bytes> consumer) {
private void assignAndSeek(EnhancedConsumer consumer) {
var seekOperations = SeekOperations.create(consumer, consumerPosition);
var seekOffsets = new HashMap<>(seekOperations.getEndOffsets()); // defaulting offsets to topic end
seekOffsets.putAll(seekOperations.getOffsetsForSeek()); // this will only set non-empty partitions

View file

@ -106,7 +106,7 @@ public class GlobalErrorWebExceptionHandler extends AbstractErrorWebExceptionHan
err.setFieldName(e.getKey());
err.setRestrictions(List.copyOf(e.getValue()));
return err;
}).collect(Collectors.toList());
}).toList();
var message = fieldsErrors.isEmpty()
? exception.getMessage()

View file

@ -0,0 +1,7 @@
package com.provectus.kafka.ui.exception;
public class JsonAvroConversionException extends ValidationException {
public JsonAvroConversionException(String message) {
super(message);
}
}

Some files were not shown because too many files have changed in this diff Show more