|
@@ -9,6 +9,7 @@ import lombok.extern.slf4j.Slf4j;
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
|
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
|
|
+import org.apache.kafka.common.errors.InterruptException;
|
|
import org.apache.kafka.common.utils.Bytes;
|
|
import org.apache.kafka.common.utils.Bytes;
|
|
import reactor.core.publisher.FluxSink;
|
|
import reactor.core.publisher.FluxSink;
|
|
|
|
|
|
@@ -59,6 +60,9 @@ public class ForwardRecordEmitter
|
|
}
|
|
}
|
|
sendFinishStatsAndCompleteSink(sink);
|
|
sendFinishStatsAndCompleteSink(sink);
|
|
log.debug("Polling finished");
|
|
log.debug("Polling finished");
|
|
|
|
+ } catch (InterruptException kafkaInterruptException) {
|
|
|
|
+ log.debug("Polling finished due to thread interruption");
|
|
|
|
+ sink.complete();
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
log.error("Error occurred while consuming records", e);
|
|
log.error("Error occurred while consuming records", e);
|
|
sink.error(e);
|
|
sink.error(e);
|