Browse Source

discovery & test: Fix goroutine leaks by adding 1 buffer to channel

Signed-off-by: Ziheng Liu <lzhfromustc@gmail.com>
lzhfromustc 4 years ago
parent
commit
5ffcd162b5

+ 3 - 3
daemon/cluster/controllers/plugin/controller_test.go

@@ -108,7 +108,7 @@ func TestWaitCancel(t *testing.T) {
 	}
 	}
 
 
 	ctxCancel, cancel := context.WithCancel(ctx)
 	ctxCancel, cancel := context.WithCancel(ctx)
-	chErr := make(chan error)
+	chErr := make(chan error, 1)
 	go func() {
 	go func() {
 		chErr <- c.Wait(ctxCancel)
 		chErr <- c.Wait(ctxCancel)
 	}()
 	}()
@@ -134,7 +134,7 @@ func TestWaitDisabled(t *testing.T) {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 
 
-	chErr := make(chan error)
+	chErr := make(chan error, 1)
 	go func() {
 	go func() {
 		chErr <- c.Wait(ctx)
 		chErr <- c.Wait(ctx)
 	}()
 	}()
@@ -215,7 +215,7 @@ func TestWaitEnabled(t *testing.T) {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 
 
-	chErr := make(chan error)
+	chErr := make(chan error, 1)
 	go func() {
 	go func() {
 		chErr <- c.Wait(ctx)
 		chErr <- c.Wait(ctx)
 	}()
 	}()

+ 1 - 1
daemon/graphdriver/devmapper/devmapper_test.go

@@ -136,7 +136,7 @@ func TestDevmapperLockReleasedDeviceDeletion(t *testing.T) {
 	// DeviceSet Lock. If lock has not been released, this will hang.
 	// DeviceSet Lock. If lock has not been released, this will hang.
 	driver.DeviceSet.cleanupDeletedDevices()
 	driver.DeviceSet.cleanupDeletedDevices()
 
 
-	doneChan := make(chan bool)
+	doneChan := make(chan bool, 1)
 
 
 	go func() {
 	go func() {
 		driver.DeviceSet.Lock()
 		driver.DeviceSet.Lock()

+ 2 - 2
pkg/discovery/file/file.go

@@ -60,8 +60,8 @@ func (s *Discovery) fetch() (discovery.Entries, error) {
 
 
 // Watch is exported
 // Watch is exported
 func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
 func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
-	ch := make(chan discovery.Entries)
-	errCh := make(chan error)
+	ch := make(chan discovery.Entries, 1)
+	errCh := make(chan error, 1)
 	ticker := time.NewTicker(s.heartbeat)
 	ticker := time.NewTicker(s.heartbeat)
 
 
 	go func() {
 	go func() {

+ 2 - 2
pkg/discovery/memory/memory.go

@@ -33,8 +33,8 @@ func (s *Discovery) Initialize(_ string, heartbeat time.Duration, _ time.Duratio
 
 
 // Watch sends periodic discovery updates to a channel.
 // Watch sends periodic discovery updates to a channel.
 func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
 func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
-	ch := make(chan discovery.Entries)
-	errCh := make(chan error)
+	ch := make(chan discovery.Entries, 1)
+	errCh := make(chan error, 1)
 	ticker := time.NewTicker(s.heartbeat)
 	ticker := time.NewTicker(s.heartbeat)
 
 
 	go func() {
 	go func() {

+ 1 - 1
pkg/discovery/nodes/nodes.go

@@ -39,7 +39,7 @@ func (s *Discovery) Initialize(uris string, _ time.Duration, _ time.Duration, _
 
 
 // Watch is exported
 // Watch is exported
 func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
 func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) {
-	ch := make(chan discovery.Entries)
+	ch := make(chan discovery.Entries, 1)
 	go func() {
 	go func() {
 		defer close(ch)
 		defer close(ch)
 		ch <- s.entries
 		ch <- s.entries