Pārlūkot izejas kodu

Fallback to file polling for jsonlog reader on err

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
Brian Goff 9 gadi atpakaļ
vecāks
revīzija
c136a33c5b

+ 2 - 201
daemon/logger/jsonfilelog/jsonfilelog.go

@@ -7,29 +7,20 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
-	"io"
 	"os"
 	"strconv"
 	"sync"
-	"time"
-
-	"gopkg.in/fsnotify.v1"
 
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/docker/daemon/logger"
-	"github.com/docker/docker/pkg/ioutils"
 	"github.com/docker/docker/pkg/jsonlog"
 	"github.com/docker/docker/pkg/pubsub"
-	"github.com/docker/docker/pkg/tailfile"
 	"github.com/docker/docker/pkg/timeutils"
 	"github.com/docker/docker/pkg/units"
 )
 
-const (
-	// Name is the name of the file that the jsonlogger logs to.
-	Name               = "json-file"
-	maxJSONDecodeRetry = 20000
-)
+// Name is the name of the file that the jsonlogger logs to.
+const Name = "json-file"
 
 // JSONFileLogger is Logger implementation for default Docker logging.
 type JSONFileLogger struct {
@@ -228,193 +219,3 @@ func (l *JSONFileLogger) Close() error {
 func (l *JSONFileLogger) Name() string {
 	return Name
 }
-
-func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
-	l.Reset()
-	if err := dec.Decode(l); err != nil {
-		return nil, err
-	}
-	msg := &logger.Message{
-		Source:    l.Stream,
-		Timestamp: l.Created,
-		Line:      []byte(l.Log),
-	}
-	return msg, nil
-}
-
-// ReadLogs implements the logger's LogReader interface for the logs
-// created by this driver.
-func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
-	logWatcher := logger.NewLogWatcher()
-
-	go l.readLogs(logWatcher, config)
-	return logWatcher
-}
-
-func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
-	defer close(logWatcher.Msg)
-
-	pth := l.ctx.LogPath
-	var files []io.ReadSeeker
-	for i := l.n; i > 1; i-- {
-		f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
-		if err != nil {
-			if !os.IsNotExist(err) {
-				logWatcher.Err <- err
-				break
-			}
-			continue
-		}
-		defer f.Close()
-		files = append(files, f)
-	}
-
-	latestFile, err := os.Open(pth)
-	if err != nil {
-		logWatcher.Err <- err
-		return
-	}
-	defer latestFile.Close()
-
-	files = append(files, latestFile)
-	tailer := ioutils.MultiReadSeeker(files...)
-
-	if config.Tail != 0 {
-		tailFile(tailer, logWatcher, config.Tail, config.Since)
-	}
-
-	if !config.Follow {
-		return
-	}
-
-	if config.Tail >= 0 {
-		latestFile.Seek(0, os.SEEK_END)
-	}
-
-	l.mu.Lock()
-	l.readers[logWatcher] = struct{}{}
-	l.mu.Unlock()
-
-	notifyRotate := l.notifyRotate.Subscribe()
-	followLogs(latestFile, logWatcher, notifyRotate, config.Since)
-
-	l.mu.Lock()
-	delete(l.readers, logWatcher)
-	l.mu.Unlock()
-
-	l.notifyRotate.Evict(notifyRotate)
-}
-
-func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
-	var rdr io.Reader = f
-	if tail > 0 {
-		ls, err := tailfile.TailFile(f, tail)
-		if err != nil {
-			logWatcher.Err <- err
-			return
-		}
-		rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
-	}
-	dec := json.NewDecoder(rdr)
-	l := &jsonlog.JSONLog{}
-	for {
-		msg, err := decodeLogLine(dec, l)
-		if err != nil {
-			if err != io.EOF {
-				logWatcher.Err <- err
-			}
-			return
-		}
-		if !since.IsZero() && msg.Timestamp.Before(since) {
-			continue
-		}
-		logWatcher.Msg <- msg
-	}
-}
-
-func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
-	dec := json.NewDecoder(f)
-	l := &jsonlog.JSONLog{}
-	fileWatcher, err := fsnotify.NewWatcher()
-	if err != nil {
-		logWatcher.Err <- err
-		return
-	}
-	defer fileWatcher.Close()
-	if err := fileWatcher.Add(f.Name()); err != nil {
-		logWatcher.Err <- err
-		return
-	}
-
-	var retries int
-	for {
-		msg, err := decodeLogLine(dec, l)
-		if err != nil {
-			if err != io.EOF {
-				// try again because this shouldn't happen
-				if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
-					dec = json.NewDecoder(f)
-					retries++
-					continue
-				}
-
-				// io.ErrUnexpectedEOF is returned from json.Decoder when there is
-				// remaining data in the parser's buffer while an io.EOF occurs.
-				// If the json logger writes a partial json log entry to the disk
-				// while at the same time the decoder tries to decode it, the race codition happens.
-				if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
-					reader := io.MultiReader(dec.Buffered(), f)
-					dec = json.NewDecoder(reader)
-					retries++
-					continue
-				}
-				logWatcher.Err <- err
-				return
-			}
-
-			select {
-			case <-fileWatcher.Events:
-				dec = json.NewDecoder(f)
-				continue
-			case <-fileWatcher.Errors:
-				logWatcher.Err <- err
-				return
-			case <-logWatcher.WatchClose():
-				return
-			case <-notifyRotate:
-				fileWatcher.Remove(f.Name())
-
-				f, err = os.Open(f.Name())
-				if err != nil {
-					logWatcher.Err <- err
-					return
-				}
-				if err := fileWatcher.Add(f.Name()); err != nil {
-					logWatcher.Err <- err
-				}
-				dec = json.NewDecoder(f)
-				continue
-			}
-		}
-
-		retries = 0 // reset retries since we've succeeded
-		if !since.IsZero() && msg.Timestamp.Before(since) {
-			continue
-		}
-		select {
-		case logWatcher.Msg <- msg:
-		case <-logWatcher.WatchClose():
-			logWatcher.Msg <- msg
-			for {
-				msg, err := decodeLogLine(dec, l)
-				if err != nil {
-					return
-				}
-				if !since.IsZero() && msg.Timestamp.Before(since) {
-					continue
-				}
-				logWatcher.Msg <- msg
-			}
-		}
-	}
-}

+ 216 - 0
daemon/logger/jsonfilelog/read.go

@@ -0,0 +1,216 @@
+package jsonfilelog
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"os"
+	"time"
+
+	"github.com/Sirupsen/logrus"
+	"github.com/docker/docker/daemon/logger"
+	"github.com/docker/docker/pkg/filenotify"
+	"github.com/docker/docker/pkg/ioutils"
+	"github.com/docker/docker/pkg/jsonlog"
+	"github.com/docker/docker/pkg/tailfile"
+)
+
+const maxJSONDecodeRetry = 20000
+
+func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
+	l.Reset()
+	if err := dec.Decode(l); err != nil {
+		return nil, err
+	}
+	msg := &logger.Message{
+		Source:    l.Stream,
+		Timestamp: l.Created,
+		Line:      []byte(l.Log),
+	}
+	return msg, nil
+}
+
+// ReadLogs implements the logger's LogReader interface for the logs
+// created by this driver.
+func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
+	logWatcher := logger.NewLogWatcher()
+
+	go l.readLogs(logWatcher, config)
+	return logWatcher
+}
+
+func (l *JSONFileLogger) readLogs(logWatcher *logger.LogWatcher, config logger.ReadConfig) {
+	defer close(logWatcher.Msg)
+
+	pth := l.ctx.LogPath
+	var files []io.ReadSeeker
+	for i := l.n; i > 1; i-- {
+		f, err := os.Open(fmt.Sprintf("%s.%d", pth, i-1))
+		if err != nil {
+			if !os.IsNotExist(err) {
+				logWatcher.Err <- err
+				break
+			}
+			continue
+		}
+		defer f.Close()
+		files = append(files, f)
+	}
+
+	latestFile, err := os.Open(pth)
+	if err != nil {
+		logWatcher.Err <- err
+		return
+	}
+	defer latestFile.Close()
+
+	files = append(files, latestFile)
+	tailer := ioutils.MultiReadSeeker(files...)
+
+	if config.Tail != 0 {
+		tailFile(tailer, logWatcher, config.Tail, config.Since)
+	}
+
+	if !config.Follow {
+		return
+	}
+
+	if config.Tail >= 0 {
+		latestFile.Seek(0, os.SEEK_END)
+	}
+
+	l.mu.Lock()
+	l.readers[logWatcher] = struct{}{}
+	l.mu.Unlock()
+
+	notifyRotate := l.notifyRotate.Subscribe()
+	followLogs(latestFile, logWatcher, notifyRotate, config.Since)
+
+	l.mu.Lock()
+	delete(l.readers, logWatcher)
+	l.mu.Unlock()
+
+	l.notifyRotate.Evict(notifyRotate)
+}
+
+func tailFile(f io.ReadSeeker, logWatcher *logger.LogWatcher, tail int, since time.Time) {
+	var rdr io.Reader = f
+	if tail > 0 {
+		ls, err := tailfile.TailFile(f, tail)
+		if err != nil {
+			logWatcher.Err <- err
+			return
+		}
+		rdr = bytes.NewBuffer(bytes.Join(ls, []byte("\n")))
+	}
+	dec := json.NewDecoder(rdr)
+	l := &jsonlog.JSONLog{}
+	for {
+		msg, err := decodeLogLine(dec, l)
+		if err != nil {
+			if err != io.EOF {
+				logWatcher.Err <- err
+			}
+			return
+		}
+		if !since.IsZero() && msg.Timestamp.Before(since) {
+			continue
+		}
+		logWatcher.Msg <- msg
+	}
+}
+
+func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan interface{}, since time.Time) {
+	dec := json.NewDecoder(f)
+	l := &jsonlog.JSONLog{}
+
+	fileWatcher, err := filenotify.New()
+	if err != nil {
+		logWatcher.Err <- err
+	}
+	defer fileWatcher.Close()
+
+	var retries int
+	for {
+		msg, err := decodeLogLine(dec, l)
+		if err != nil {
+			if err != io.EOF {
+				// try again because this shouldn't happen
+				if _, ok := err.(*json.SyntaxError); ok && retries <= maxJSONDecodeRetry {
+					dec = json.NewDecoder(f)
+					retries++
+					continue
+				}
+
+				// io.ErrUnexpectedEOF is returned from json.Decoder when there is
+				// remaining data in the parser's buffer while an io.EOF occurs.
+				// If the json logger writes a partial json log entry to the disk
+				// while at the same time the decoder tries to decode it, the race codition happens.
+				if err == io.ErrUnexpectedEOF && retries <= maxJSONDecodeRetry {
+					reader := io.MultiReader(dec.Buffered(), f)
+					dec = json.NewDecoder(reader)
+					retries++
+					continue
+				}
+				logWatcher.Err <- err
+				return
+			}
+
+			logrus.WithField("logger", "json-file").Debugf("waiting for events")
+			if err := fileWatcher.Add(f.Name()); err != nil {
+				logrus.WithField("logger", "json-file").Warn("falling back to file poller")
+				fileWatcher.Close()
+				fileWatcher = filenotify.NewPollingWatcher()
+				if err := fileWatcher.Add(f.Name()); err != nil {
+					logrus.Errorf("error watching log file for modifications: %v", err)
+					logWatcher.Err <- err
+				}
+			}
+			select {
+			case <-fileWatcher.Events():
+				dec = json.NewDecoder(f)
+				fileWatcher.Remove(f.Name())
+				continue
+			case <-fileWatcher.Errors():
+				fileWatcher.Remove(f.Name())
+				logWatcher.Err <- err
+				return
+			case <-logWatcher.WatchClose():
+				fileWatcher.Remove(f.Name())
+				return
+			case <-notifyRotate:
+				f, err = os.Open(f.Name())
+				if err != nil {
+					logWatcher.Err <- err
+					return
+				}
+
+				dec = json.NewDecoder(f)
+				fileWatcher.Remove(f.Name())
+				fileWatcher.Add(f.Name())
+				continue
+			}
+		}
+
+		retries = 0 // reset retries since we've succeeded
+		if !since.IsZero() && msg.Timestamp.Before(since) {
+			continue
+		}
+		select {
+		case logWatcher.Msg <- msg:
+		case <-logWatcher.WatchClose():
+			logWatcher.Msg <- msg
+			for {
+				msg, err := decodeLogLine(dec, l)
+				if err != nil {
+					return
+				}
+				if !since.IsZero() && msg.Timestamp.Before(since) {
+					continue
+				}
+				logWatcher.Msg <- msg
+			}
+		}
+	}
+}

+ 40 - 0
pkg/filenotify/filenotify.go

@@ -0,0 +1,40 @@
+// Package filenotify provides a mechanism for watching file(s) for changes.
+// Generally leans on fsnotify, but provides a poll-based notifier which fsnotify does not support.
+// These are wrapped up in a common interface so that either can be used interchangably in your code.
+package filenotify
+
+import "gopkg.in/fsnotify.v1"
+
+// FileWatcher is an interface for implementing file notification watchers
+type FileWatcher interface {
+	Events() <-chan fsnotify.Event
+	Errors() <-chan error
+	Add(name string) error
+	Remove(name string) error
+	Close() error
+}
+
+// New tries to use an fs-event watcher, and falls back to the poller if there is an error
+func New() (FileWatcher, error) {
+	if watcher, err := NewEventWatcher(); err == nil {
+		return watcher, nil
+	}
+	return NewPollingWatcher(), nil
+}
+
+// NewPollingWatcher returns a poll-based file watcher
+func NewPollingWatcher() FileWatcher {
+	return &filePoller{
+		events: make(chan fsnotify.Event),
+		errors: make(chan error),
+	}
+}
+
+// NewEventWatcher returns an fs-event based file watcher
+func NewEventWatcher() (FileWatcher, error) {
+	watcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		return nil, err
+	}
+	return &fsNotifyWatcher{watcher}, nil
+}

+ 18 - 0
pkg/filenotify/fsnotify.go

@@ -0,0 +1,18 @@
+package filenotify
+
+import "gopkg.in/fsnotify.v1"
+
+// fsNotify wraps the fsnotify package to satisfy the FileNotifer interface
+type fsNotifyWatcher struct {
+	*fsnotify.Watcher
+}
+
+// GetEvents returns the fsnotify event channel receiver
+func (w *fsNotifyWatcher) Events() <-chan fsnotify.Event {
+	return w.Watcher.Events
+}
+
+// GetErrors returns the fsnotify error channel receiver
+func (w *fsNotifyWatcher) Errors() <-chan error {
+	return w.Watcher.Errors
+}

+ 205 - 0
pkg/filenotify/poller.go

@@ -0,0 +1,205 @@
+package filenotify
+
+import (
+	"errors"
+	"fmt"
+	"os"
+	"sync"
+	"time"
+
+	"github.com/Sirupsen/logrus"
+
+	"gopkg.in/fsnotify.v1"
+)
+
+var (
+	// errPollerClosed is returned when the poller is closed
+	errPollerClosed = errors.New("poller is closed")
+	// errNoSuchPoller is returned when trying to remove a watch that doesn't exist
+	errNoSuchWatch = errors.New("poller does not exist")
+)
+
+// watchWaitTime is the time to wait between file poll loops
+const watchWaitTime = 200 * time.Millisecond
+
+// filePoller is used to poll files for changes, especially in cases where fsnotify
+// can't be run (e.g. when inotify handles are exhausted)
+// filePoller satifies the FileWatcher interface
+type filePoller struct {
+	// watches is the list of files currently being polled, close the associated channel to stop the watch
+	watches map[string]chan struct{}
+	// events is the channel to listen to for watch events
+	events chan fsnotify.Event
+	// errors is the channel to listen to for watch errors
+	errors chan error
+	// mu locks the poller for modification
+	mu sync.Mutex
+	// closed is used to specify when the poller has already closed
+	closed bool
+}
+
+// Add adds a filename to the list of watches
+// once added the file is polled for changes in a separate goroutine
+func (w *filePoller) Add(name string) error {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+
+	if w.closed == true {
+		return errPollerClosed
+	}
+
+	f, err := os.Open(name)
+	if err != nil {
+		return err
+	}
+	fi, err := os.Stat(name)
+	if err != nil {
+		return err
+	}
+
+	if w.watches == nil {
+		w.watches = make(map[string]chan struct{})
+	}
+	if _, exists := w.watches[name]; exists {
+		return fmt.Errorf("watch exists")
+	}
+	chClose := make(chan struct{})
+	w.watches[name] = chClose
+
+	go w.watch(f, fi, chClose)
+	return nil
+}
+
+// Remove stops and removes watch with the specified name
+func (w *filePoller) Remove(name string) error {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	return w.remove(name)
+}
+
+func (w *filePoller) remove(name string) error {
+	if w.closed == true {
+		return errPollerClosed
+	}
+
+	chClose, exists := w.watches[name]
+	if !exists {
+		return errNoSuchWatch
+	}
+	close(chClose)
+	delete(w.watches, name)
+	return nil
+}
+
+// Events returns the event channel
+// This is used for notifications on events about watched files
+func (w *filePoller) Events() <-chan fsnotify.Event {
+	return w.events
+}
+
+// Errors returns the errors channel
+// This is used for notifications about errors on watched files
+func (w *filePoller) Errors() <-chan error {
+	return w.errors
+}
+
+// Close closes the poller
+// All watches are stopped, removed, and the poller cannot be added to
+func (w *filePoller) Close() error {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+
+	if w.closed {
+		return nil
+	}
+
+	w.closed = true
+	for name := range w.watches {
+		w.remove(name)
+		delete(w.watches, name)
+	}
+	close(w.events)
+	close(w.errors)
+	return nil
+}
+
+// sendEvent publishes the specified event to the events channel
+func (w *filePoller) sendEvent(e fsnotify.Event, chClose <-chan struct{}) error {
+	select {
+	case w.events <- e:
+	case <-chClose:
+		return fmt.Errorf("closed")
+	}
+	return nil
+}
+
+// sendErr publishes the specified error to the errors channel
+func (w *filePoller) sendErr(e error, chClose <-chan struct{}) error {
+	select {
+	case w.errors <- e:
+	case <-chClose:
+		return fmt.Errorf("closed")
+	}
+	return nil
+}
+
+// watch is responsible for polling the specified file for changes
+// upon finding changes to a file or errors, sendEvent/sendErr is called
+func (w *filePoller) watch(f *os.File, lastFi os.FileInfo, chClose chan struct{}) {
+	for {
+		time.Sleep(watchWaitTime)
+		select {
+		case <-chClose:
+			logrus.Debugf("watch for %s closed", f.Name())
+			return
+		default:
+		}
+
+		fi, err := os.Stat(f.Name())
+		if err != nil {
+			// if we got an error here and lastFi is not set, we can presume that nothing has changed
+			// This should be safe since before `watch()` is called, a stat is performed, there is any error `watch` is not called
+			if lastFi == nil {
+				continue
+			}
+			// If it doesn't exist at this point, it must have been removed
+			// no need to send the error here since this is a valid operation
+			if os.IsNotExist(err) {
+				if err := w.sendEvent(fsnotify.Event{Op: fsnotify.Remove, Name: f.Name()}, chClose); err != nil {
+					return
+				}
+				lastFi = nil
+				continue
+			}
+			// at this point, send the error
+			if err := w.sendErr(err, chClose); err != nil {
+				return
+			}
+			continue
+		}
+
+		if lastFi == nil {
+			if err := w.sendEvent(fsnotify.Event{Op: fsnotify.Create, Name: fi.Name()}, chClose); err != nil {
+				return
+			}
+			lastFi = fi
+			continue
+		}
+
+		if fi.Mode() != lastFi.Mode() {
+			if err := w.sendEvent(fsnotify.Event{Op: fsnotify.Chmod, Name: fi.Name()}, chClose); err != nil {
+				return
+			}
+			lastFi = fi
+			continue
+		}
+
+		if fi.ModTime() != lastFi.ModTime() || fi.Size() != lastFi.Size() {
+			if err := w.sendEvent(fsnotify.Event{Op: fsnotify.Write, Name: fi.Name()}, chClose); err != nil {
+				return
+			}
+			lastFi = fi
+			continue
+		}
+	}
+}

+ 133 - 0
pkg/filenotify/poller_test.go

@@ -0,0 +1,133 @@
+package filenotify
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"testing"
+	"time"
+
+	"gopkg.in/fsnotify.v1"
+)
+
+func TestPollerAddRemove(t *testing.T) {
+	w := NewPollingWatcher()
+
+	if err := w.Add("no-such-file"); err == nil {
+		t.Fatal("should have gotten error when adding a non-existant file")
+	}
+	if err := w.Remove("no-such-file"); err == nil {
+		t.Fatal("should have gotten error when removing non-existant watch")
+	}
+
+	f, err := ioutil.TempFile("", "asdf")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(f.Name())
+
+	if err := w.Add(f.Name()); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := w.Remove(f.Name()); err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestPollerEvent(t *testing.T) {
+	w := NewPollingWatcher()
+
+	f, err := ioutil.TempFile("", "test-poller")
+	if err != nil {
+		t.Fatal("error creating temp file")
+	}
+	defer os.RemoveAll(f.Name())
+	f.Close()
+
+	if err := w.Add(f.Name()); err != nil {
+		t.Fatal(err)
+	}
+
+	select {
+	case <-w.Events():
+		t.Fatal("got event before anything happened")
+	case <-w.Errors():
+		t.Fatal("got error before anything happened")
+	default:
+	}
+
+	if err := ioutil.WriteFile(f.Name(), []byte("hello"), 644); err != nil {
+		t.Fatal(err)
+	}
+	if err := assertEvent(w, fsnotify.Write); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := os.Chmod(f.Name(), 600); err != nil {
+		t.Fatal(err)
+	}
+	if err := assertEvent(w, fsnotify.Chmod); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := os.Remove(f.Name()); err != nil {
+		t.Fatal(err)
+	}
+	if err := assertEvent(w, fsnotify.Remove); err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestPollerClose(t *testing.T) {
+	w := NewPollingWatcher()
+	if err := w.Close(); err != nil {
+		t.Fatal(err)
+	}
+	// test double-close
+	if err := w.Close(); err != nil {
+		t.Fatal(err)
+	}
+
+	select {
+	case _, open := <-w.Events():
+		if open {
+			t.Fatal("event chan should be closed")
+		}
+	default:
+		t.Fatal("event chan should be closed")
+	}
+
+	select {
+	case _, open := <-w.Errors():
+		if open {
+			t.Fatal("errors chan should be closed")
+		}
+	default:
+		t.Fatal("errors chan should be closed")
+	}
+
+	f, err := ioutil.TempFile("", "asdf")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(f.Name())
+	if err := w.Add(f.Name()); err == nil {
+		t.Fatal("should have gotten error adding watch for closed watcher")
+	}
+}
+
+func assertEvent(w FileWatcher, eType fsnotify.Op) error {
+	var err error
+	select {
+	case e := <-w.Events():
+		if e.Op != eType {
+			err = fmt.Errorf("got wrong event type, expected %q: %v", eType, e)
+		}
+	case e := <-w.Errors():
+		err = fmt.Errorf("got unexpected error waiting for events %v: %v", eType, e)
+	case <-time.After(watchWaitTime * 3):
+		err = fmt.Errorf("timeout waiting for event %v", eType)
+	}
+	return err
+}