|
@@ -68,12 +68,13 @@ public class ConsumingService {
|
|
assignPartitions(consumer);
|
|
assignPartitions(consumer);
|
|
seekOffsets(consumer);
|
|
seekOffsets(consumer);
|
|
int pollsCount = 0;
|
|
int pollsCount = 0;
|
|
- while (!sink.isCancelled() || ++pollsCount > MAX_POLLS_COUNT) {
|
|
|
|
|
|
+ while (!sink.isCancelled() && ++pollsCount < MAX_POLLS_COUNT) {
|
|
ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
|
|
ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
|
|
log.info("{} records polled", records.count());
|
|
log.info("{} records polled", records.count());
|
|
records.iterator()
|
|
records.iterator()
|
|
.forEachRemaining(sink::next);
|
|
.forEachRemaining(sink::next);
|
|
}
|
|
}
|
|
|
|
+ sink.complete();
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
log.error("Error occurred while consuming records", e);
|
|
log.error("Error occurred while consuming records", e);
|
|
throw new RuntimeException(e);
|
|
throw new RuntimeException(e);
|