Browse Source

Merge pull request #16957 from MHBauer/eventsservice-refactor

refactor access to daemon member EventsService
David Calavera 9 years ago
parent
commit
b27fa6c58e
4 changed files with 24 additions and 14 deletions
  1. 4 5
      api/server/router/local/info.go
  2. 6 0
      daemon/daemon.go
  3. 10 5
      daemon/events/events.go
  4. 4 4
      daemon/events/events_test.go

+ 4 - 5
api/server/router/local/info.go

@@ -88,12 +88,11 @@ func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.R
 	}
 
 	enc := buildOutputEncoder(w)
-	d := s.daemon
-	es := d.EventsService
-	current, l := es.Subscribe()
-	defer es.Evict(l)
 
-	eventFilter := d.GetEventFilter(ef)
+	current, l, cancel := s.daemon.SubscribeToEvents()
+	defer cancel()
+
+	eventFilter := s.daemon.GetEventFilter(ef)
 	handleEvent := func(ev *jsonmessage.JSONMessage) error {
 		if eventFilter.Include(ev) {
 			if err := enc.Encode(ev); err != nil {

+ 6 - 0
daemon/daemon.go

@@ -39,6 +39,7 @@ import (
 	"github.com/docker/docker/pkg/graphdb"
 	"github.com/docker/docker/pkg/idtools"
 	"github.com/docker/docker/pkg/ioutils"
+	"github.com/docker/docker/pkg/jsonmessage"
 	"github.com/docker/docker/pkg/namesgenerator"
 	"github.com/docker/docker/pkg/nat"
 	"github.com/docker/docker/pkg/parsers/filters"
@@ -548,6 +549,11 @@ func (daemon *Daemon) GetEventFilter(filter filters.Args) *events.Filter {
 	return events.NewFilter(filter, daemon.GetLabels)
 }
 
+// SubscribeToEvents returns the currently record of events, a channel to stream new events from, and a function to cancel the stream of events.
+func (daemon *Daemon) SubscribeToEvents() ([]*jsonmessage.JSONMessage, chan interface{}, func()) {
+	return daemon.EventsService.Subscribe()
+}
+
 // GetLabels for a container or image id
 func (daemon *Daemon) GetLabels(id string) map[string]string {
 	// TODO: TestCase

+ 10 - 5
daemon/events/events.go

@@ -25,16 +25,21 @@ func New() *Events {
 	}
 }
 
-// Subscribe adds new listener to events, returns slice of 64 stored last events
-// channel in which you can expect new events in form of interface{}, so you
-// need type assertion.
-func (e *Events) Subscribe() ([]*jsonmessage.JSONMessage, chan interface{}) {
+// Subscribe adds new listener to events, returns slice of 64 stored
+// last events, a channel in which you can expect new events (in form
+// of interface{}, so you need type assertion), and a function to call
+// to stop the stream of events.
+func (e *Events) Subscribe() ([]*jsonmessage.JSONMessage, chan interface{}, func()) {
 	e.mu.Lock()
 	current := make([]*jsonmessage.JSONMessage, len(e.events))
 	copy(current, e.events)
 	l := e.pub.Subscribe()
 	e.mu.Unlock()
-	return current, l
+
+	cancel := func() {
+		e.Evict(l)
+	}
+	return current, l, cancel
 }
 
 // Evict evicts listener from pubsub

+ 4 - 4
daemon/events/events_test.go

@@ -10,8 +10,8 @@ import (
 
 func TestEventsLog(t *testing.T) {
 	e := New()
-	_, l1 := e.Subscribe()
-	_, l2 := e.Subscribe()
+	_, l1, _ := e.Subscribe()
+	_, l2, _ := e.Subscribe()
 	defer e.Evict(l1)
 	defer e.Evict(l2)
 	count := e.SubscribersCount()
@@ -65,7 +65,7 @@ func TestEventsLog(t *testing.T) {
 
 func TestEventsLogTimeout(t *testing.T) {
 	e := New()
-	_, l := e.Subscribe()
+	_, l, _ := e.Subscribe()
 	defer e.Evict(l)
 
 	c := make(chan struct{})
@@ -91,7 +91,7 @@ func TestLogEvents(t *testing.T) {
 		e.Log(action, id, from)
 	}
 	time.Sleep(50 * time.Millisecond)
-	current, l := e.Subscribe()
+	current, l, _ := e.Subscribe()
 	for i := 0; i < 10; i++ {
 		num := i + eventsLimit + 16
 		action := fmt.Sprintf("action_%d", num)