Selaa lähdekoodia

check if the acquis tomb is dying while processing logs in replay mode for file/s3/docker (#2152)

blotus 2 vuotta sitten
vanhempi
commit
0279e549bd

+ 19 - 14
pkg/acquisition/modules/docker/docker.go

@@ -311,21 +311,26 @@ func (d *DockerSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) er
 				scanner = bufio.NewScanner(reader)
 			}
 			for scanner.Scan() {
-				line := scanner.Text()
-				if line == "" {
-					continue
+				select {
+				case <-t.Dying():
+					d.logger.Infof("Shutting down reader for container %s", containerConfig.Name)
+				default:
+					line := scanner.Text()
+					if line == "" {
+						continue
+					}
+					l := types.Line{}
+					l.Raw = line
+					l.Labels = d.Config.Labels
+					l.Time = time.Now().UTC()
+					l.Src = containerConfig.Name
+					l.Process = true
+					l.Module = d.GetName()
+					linesRead.With(prometheus.Labels{"source": containerConfig.Name}).Inc()
+					evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
+					out <- evt
+					d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)
 				}
-				l := types.Line{}
-				l.Raw = line
-				l.Labels = d.Config.Labels
-				l.Time = time.Now().UTC()
-				l.Src = containerConfig.Name
-				l.Process = true
-				l.Module = d.GetName()
-				linesRead.With(prometheus.Labels{"source": containerConfig.Name}).Inc()
-				evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
-				out <- evt
-				d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)
 			}
 			err = scanner.Err()
 			if err != nil {

+ 21 - 15
pkg/acquisition/modules/file/file.go

@@ -514,22 +514,28 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom
 		scanner.Buffer(buf, f.config.MaxBufferSize)
 	}
 	for scanner.Scan() {
-		if scanner.Text() == "" {
-			continue
-		}
-		l := types.Line{
-			Raw:     scanner.Text(),
-			Time:    time.Now().UTC(),
-			Src:     filename,
-			Labels:  f.config.Labels,
-			Process: true,
-			Module:  f.GetName(),
-		}
-		logger.Debugf("line %s", l.Raw)
-		linesRead.With(prometheus.Labels{"source": filename}).Inc()
+		select {
+		case <-t.Dying():
+			logger.Infof("File datasource %s stopping", filename)
+			return nil
+		default:
+			if scanner.Text() == "" {
+				continue
+			}
+			l := types.Line{
+				Raw:     scanner.Text(),
+				Time:    time.Now().UTC(),
+				Src:     filename,
+				Labels:  f.config.Labels,
+				Process: true,
+				Module:  f.GetName(),
+			}
+			logger.Debugf("line %s", l.Raw)
+			linesRead.With(prometheus.Labels{"source": filename}).Inc()
 
-		//we're reading logs at once, it must be time-machine buckets
-		out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
+			//we're reading logs at once, it must be time-machine buckets
+			out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
+		}
 	}
 	if err := scanner.Err(); err != nil {
 		logger.Errorf("Error while reading file: %s", err)

+ 23 - 16
pkg/acquisition/modules/s3/s3.go

@@ -418,23 +418,29 @@ func (s *S3Source) readFile(bucket string, key string) error {
 		scanner.Buffer(buf, s.Config.MaxBufferSize)
 	}
 	for scanner.Scan() {
-		text := scanner.Text()
-		logger.Tracef("Read line %s", text)
-		linesRead.WithLabelValues(bucket).Inc()
-		l := types.Line{}
-		l.Raw = text
-		l.Labels = s.Config.Labels
-		l.Time = time.Now().UTC()
-		l.Process = true
-		l.Module = s.GetName()
-		l.Src = bucket
-		var evt types.Event
-		if !s.Config.UseTimeMachine {
-			evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
-		} else {
-			evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
+		select {
+		case <-s.t.Dying():
+			s.logger.Infof("Shutting down reader for %s/%s", bucket, key)
+			return nil
+		default:
+			text := scanner.Text()
+			logger.Tracef("Read line %s", text)
+			linesRead.WithLabelValues(bucket).Inc()
+			l := types.Line{}
+			l.Raw = text
+			l.Labels = s.Config.Labels
+			l.Time = time.Now().UTC()
+			l.Process = true
+			l.Module = s.GetName()
+			l.Src = bucket
+			var evt types.Event
+			if !s.Config.UseTimeMachine {
+				evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
+			} else {
+				evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
+			}
+			s.out <- evt
 		}
-		s.out <- evt
 	}
 	if err := scanner.Err(); err != nil {
 		return fmt.Errorf("failed to read object %s/%s: %s", bucket, key, err)
@@ -629,6 +635,7 @@ func (s *S3Source) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error
 	s.out = out
 	s.ctx, s.cancel = context.WithCancel(context.Background())
 	s.Config.UseTimeMachine = true
+	s.t = t
 	if s.Config.Key != "" {
 		err := s.readFile(s.Config.BucketName, s.Config.Key)
 		if err != nil {