PollingThrottler.java 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. package com.provectus.kafka.ui.emitter;
  2. import com.google.common.annotations.VisibleForTesting;
  3. import com.google.common.util.concurrent.RateLimiter;
  4. import com.provectus.kafka.ui.config.ClustersProperties;
  5. import java.util.function.Supplier;
  6. import lombok.extern.slf4j.Slf4j;
  7. @Slf4j
  8. public class PollingThrottler {
  9. public static Supplier<PollingThrottler> throttlerSupplier(ClustersProperties.Cluster cluster) {
  10. Long rate = cluster.getPollingThrottleRate();
  11. if (rate == null || rate <= 0) {
  12. return PollingThrottler::noop;
  13. }
  14. // RateLimiter instance should be shared across all created throttlers
  15. var rateLimiter = RateLimiter.create(rate);
  16. return () -> new PollingThrottler(cluster.getName(), rateLimiter);
  17. }
  18. private final String clusterName;
  19. private final RateLimiter rateLimiter;
  20. private boolean throttled;
  21. @VisibleForTesting
  22. public PollingThrottler(String clusterName, RateLimiter rateLimiter) {
  23. this.clusterName = clusterName;
  24. this.rateLimiter = rateLimiter;
  25. }
  26. public static PollingThrottler noop() {
  27. return new PollingThrottler("noop", RateLimiter.create(Long.MAX_VALUE));
  28. }
  29. //returns true if polling was throttled
  30. public boolean throttleAfterPoll(int polledBytes) {
  31. if (polledBytes > 0) {
  32. double sleptSeconds = rateLimiter.acquire(polledBytes);
  33. if (!throttled && sleptSeconds > 0.0) {
  34. throttled = true;
  35. log.debug("Polling throttling enabled for cluster {} at rate {} bytes/sec", clusterName, rateLimiter.getRate());
  36. return true;
  37. }
  38. }
  39. return false;
  40. }
  41. }