Pārlūkot izejas kodu

Merge pull request #44221 from thaJeztah/migrate_pubsub

migrate pkg/pubsub to github.com/moby/pubsub
Sebastiaan van Stijn 2 gadi atpakaļ
vecāks
revīzija
62bc40c6e7

+ 1 - 1
daemon/cluster/controllers/plugin/controller_test.go

@@ -13,9 +13,9 @@ import (
 	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/api/types/registry"
 	"github.com/docker/docker/api/types/swarm/runtime"
-	"github.com/docker/docker/pkg/pubsub"
 	"github.com/docker/docker/plugin"
 	v2 "github.com/docker/docker/plugin/v2"
+	"github.com/moby/pubsub"
 	"github.com/sirupsen/logrus"
 )
 

+ 1 - 1
daemon/events/events.go

@@ -5,7 +5,7 @@ import (
 	"time"
 
 	eventtypes "github.com/docker/docker/api/types/events"
-	"github.com/docker/docker/pkg/pubsub"
+	"github.com/moby/pubsub"
 )
 
 const (

+ 1 - 1
daemon/stats/collector.go

@@ -8,7 +8,7 @@ import (
 	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/container"
 	"github.com/docker/docker/errdefs"
-	"github.com/docker/docker/pkg/pubsub"
+	"github.com/moby/pubsub"
 	"github.com/sirupsen/logrus"
 )
 

+ 7 - 117
pkg/pubsub/publisher.go

@@ -1,127 +1,17 @@
 package pubsub // import "github.com/docker/docker/pkg/pubsub"
 
-import (
-	"sync"
-	"time"
-)
-
-var wgPool = sync.Pool{New: func() interface{} { return new(sync.WaitGroup) }}
+import "github.com/moby/pubsub"
 
 // NewPublisher creates a new pub/sub publisher to broadcast messages.
 // The duration is used as the send timeout as to not block the publisher publishing
 // messages to other clients if one client is slow or unresponsive.
 // The buffer is used when creating new channels for subscribers.
-func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
-	return &Publisher{
-		buffer:      buffer,
-		timeout:     publishTimeout,
-		subscribers: make(map[subscriber]topicFunc),
-	}
-}
-
-type subscriber chan interface{}
-type topicFunc func(v interface{}) bool
+//
+// Deprecated: use github.com/moby/pubsub.NewPublisher
+var NewPublisher = pubsub.NewPublisher
 
 // Publisher is basic pub/sub structure. Allows to send events and subscribe
 // to them. Can be safely used from multiple goroutines.
-type Publisher struct {
-	m           sync.RWMutex
-	buffer      int
-	timeout     time.Duration
-	subscribers map[subscriber]topicFunc
-}
-
-// Len returns the number of subscribers for the publisher
-func (p *Publisher) Len() int {
-	p.m.RLock()
-	i := len(p.subscribers)
-	p.m.RUnlock()
-	return i
-}
-
-// Subscribe adds a new subscriber to the publisher returning the channel.
-func (p *Publisher) Subscribe() chan interface{} {
-	return p.SubscribeTopic(nil)
-}
-
-// SubscribeTopic adds a new subscriber that filters messages sent by a topic.
-func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
-	ch := make(chan interface{}, p.buffer)
-	p.m.Lock()
-	p.subscribers[ch] = topic
-	p.m.Unlock()
-	return ch
-}
-
-// SubscribeTopicWithBuffer adds a new subscriber that filters messages sent by a topic.
-// The returned channel has a buffer of the specified size.
-func (p *Publisher) SubscribeTopicWithBuffer(topic topicFunc, buffer int) chan interface{} {
-	ch := make(chan interface{}, buffer)
-	p.m.Lock()
-	p.subscribers[ch] = topic
-	p.m.Unlock()
-	return ch
-}
-
-// Evict removes the specified subscriber from receiving any more messages.
-func (p *Publisher) Evict(sub chan interface{}) {
-	p.m.Lock()
-	_, exists := p.subscribers[sub]
-	if exists {
-		delete(p.subscribers, sub)
-		close(sub)
-	}
-	p.m.Unlock()
-}
-
-// Publish sends the data in v to all subscribers currently registered with the publisher.
-func (p *Publisher) Publish(v interface{}) {
-	p.m.RLock()
-	if len(p.subscribers) == 0 {
-		p.m.RUnlock()
-		return
-	}
-
-	wg := wgPool.Get().(*sync.WaitGroup)
-	for sub, topic := range p.subscribers {
-		wg.Add(1)
-		go p.sendTopic(sub, topic, v, wg)
-	}
-	wg.Wait()
-	wgPool.Put(wg)
-	p.m.RUnlock()
-}
-
-// Close closes the channels to all subscribers registered with the publisher.
-func (p *Publisher) Close() {
-	p.m.Lock()
-	for sub := range p.subscribers {
-		delete(p.subscribers, sub)
-		close(sub)
-	}
-	p.m.Unlock()
-}
-
-func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
-	defer wg.Done()
-	if topic != nil && !topic(v) {
-		return
-	}
-
-	// send under a select as to not block if the receiver is unavailable
-	if p.timeout > 0 {
-		timeout := time.NewTimer(p.timeout)
-		defer timeout.Stop()
-
-		select {
-		case sub <- v:
-		case <-timeout.C:
-		}
-		return
-	}
-
-	select {
-	case sub <- v:
-	default:
-	}
-}
+//
+// Deprecated: use github.com/moby/pubsub.Publisher
+type Publisher = pubsub.Publisher

+ 0 - 142
pkg/pubsub/publisher_test.go

@@ -1,142 +0,0 @@
-package pubsub // import "github.com/docker/docker/pkg/pubsub"
-
-import (
-	"fmt"
-	"testing"
-	"time"
-)
-
-func TestSendToOneSub(t *testing.T) {
-	p := NewPublisher(100*time.Millisecond, 10)
-	c := p.Subscribe()
-
-	p.Publish("hi")
-
-	msg := <-c
-	if msg.(string) != "hi" {
-		t.Fatalf("expected message hi but received %v", msg)
-	}
-}
-
-func TestSendToMultipleSubs(t *testing.T) {
-	p := NewPublisher(100*time.Millisecond, 10)
-	var subs []chan interface{}
-	subs = append(subs, p.Subscribe(), p.Subscribe(), p.Subscribe())
-
-	p.Publish("hi")
-
-	for _, c := range subs {
-		msg := <-c
-		if msg.(string) != "hi" {
-			t.Fatalf("expected message hi but received %v", msg)
-		}
-	}
-}
-
-func TestEvictOneSub(t *testing.T) {
-	p := NewPublisher(100*time.Millisecond, 10)
-	s1 := p.Subscribe()
-	s2 := p.Subscribe()
-
-	p.Evict(s1)
-	p.Publish("hi")
-	if _, ok := <-s1; ok {
-		t.Fatal("expected s1 to not receive the published message")
-	}
-
-	msg := <-s2
-	if msg.(string) != "hi" {
-		t.Fatalf("expected message hi but received %v", msg)
-	}
-}
-
-func TestClosePublisher(t *testing.T) {
-	p := NewPublisher(100*time.Millisecond, 10)
-	var subs []chan interface{}
-	subs = append(subs, p.Subscribe(), p.Subscribe(), p.Subscribe())
-	p.Close()
-
-	for _, c := range subs {
-		if _, ok := <-c; ok {
-			t.Fatal("expected all subscriber channels to be closed")
-		}
-	}
-}
-
-const sampleText = "test"
-
-type testSubscriber struct {
-	dataCh chan interface{}
-	ch     chan error
-}
-
-func (s *testSubscriber) Wait() error {
-	return <-s.ch
-}
-
-func newTestSubscriber(p *Publisher) *testSubscriber {
-	ts := &testSubscriber{
-		dataCh: p.Subscribe(),
-		ch:     make(chan error),
-	}
-	go func() {
-		for data := range ts.dataCh {
-			s, ok := data.(string)
-			if !ok {
-				ts.ch <- fmt.Errorf("Unexpected type %T", data)
-				break
-			}
-			if s != sampleText {
-				ts.ch <- fmt.Errorf("Unexpected text %s", s)
-				break
-			}
-		}
-		close(ts.ch)
-	}()
-	return ts
-}
-
-// for testing with -race
-func TestPubSubRace(t *testing.T) {
-	p := NewPublisher(0, 1024)
-	var subs []*testSubscriber
-	for j := 0; j < 50; j++ {
-		subs = append(subs, newTestSubscriber(p))
-	}
-	for j := 0; j < 1000; j++ {
-		p.Publish(sampleText)
-	}
-	time.AfterFunc(1*time.Second, func() {
-		for _, s := range subs {
-			p.Evict(s.dataCh)
-		}
-	})
-	for _, s := range subs {
-		s.Wait()
-	}
-}
-
-func BenchmarkPubSub(b *testing.B) {
-	for i := 0; i < b.N; i++ {
-		b.StopTimer()
-		p := NewPublisher(0, 1024)
-		var subs []*testSubscriber
-		for j := 0; j < 50; j++ {
-			subs = append(subs, newTestSubscriber(p))
-		}
-		b.StartTimer()
-		for j := 0; j < 1000; j++ {
-			p.Publish(sampleText)
-		}
-		time.AfterFunc(1*time.Second, func() {
-			for _, s := range subs {
-				p.Evict(s.dataCh)
-			}
-		})
-		for _, s := range subs {
-			if err := s.Wait(); err != nil {
-				b.Fatal(err)
-			}
-		}
-	}
-}

+ 1 - 1
plugin/manager.go

@@ -19,9 +19,9 @@ import (
 	"github.com/docker/docker/pkg/authorization"
 	"github.com/docker/docker/pkg/containerfs"
 	"github.com/docker/docker/pkg/ioutils"
-	"github.com/docker/docker/pkg/pubsub"
 	v2 "github.com/docker/docker/plugin/v2"
 	"github.com/docker/docker/registry"
+	"github.com/moby/pubsub"
 	"github.com/opencontainers/go-digest"
 	specs "github.com/opencontainers/runtime-spec/specs-go"
 	"github.com/pkg/errors"

+ 1 - 0
vendor.mod

@@ -53,6 +53,7 @@ require (
 	github.com/moby/buildkit v0.10.4
 	github.com/moby/ipvs v1.0.2
 	github.com/moby/locker v1.0.1
+	github.com/moby/pubsub v1.0.0
 	github.com/moby/swarmkit/v2 v2.0.0-20220721174824-48dd89375d0a
 	github.com/moby/sys/mount v0.3.3
 	github.com/moby/sys/mountinfo v0.6.2

+ 2 - 0
vendor.sum

@@ -770,6 +770,8 @@ github.com/moby/ipvs v1.0.2 h1:NSbzuRTvfneftLU3VwPU5QuA6NZ0IUmqq9+VHcQxqHw=
 github.com/moby/ipvs v1.0.2/go.mod h1:2pngiyseZbIKXNv7hsKj3O9UEz30c53MT9005gt2hxQ=
 github.com/moby/locker v1.0.1 h1:fOXqR41zeveg4fFODix+1Ch4mj/gT0NE1XJbp/epuBg=
 github.com/moby/locker v1.0.1/go.mod h1:S7SDdo5zpBK84bzzVlKr2V0hz+7x9hWbYC/kq7oQppc=
+github.com/moby/pubsub v1.0.0 h1:jkp/imWsmJz2f6LyFsk7EkVeN2HxR/HTTOY8kHrsxfA=
+github.com/moby/pubsub v1.0.0/go.mod h1:bXSO+3h5MNXXCaEG+6/NlAIk7MMZbySZlnB+cUQhKKc=
 github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c=
 github.com/moby/swarmkit/v2 v2.0.0-20220721174824-48dd89375d0a h1:gLcTxHH4egYVhMVFWRxvWsb79Ok4kfTt1/irZNyovUY=
 github.com/moby/swarmkit/v2 v2.0.0-20220721174824-48dd89375d0a/go.mod h1:/so6Lct4y1x14UprW/loFsOe6xoXVTlvh25V36ULXNQ=

+ 191 - 0
vendor/github.com/moby/pubsub/LICENSE

@@ -0,0 +1,191 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        https://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   Copyright 2013-2018 Docker, Inc.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       https://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.

+ 19 - 0
vendor/github.com/moby/pubsub/NOTICE

@@ -0,0 +1,19 @@
+Docker
+Copyright 2012-2017 Docker, Inc.
+
+This product includes software developed at Docker, Inc. (https://www.docker.com).
+
+This product contains software (https://github.com/creack/pty) developed
+by Keith Rarick, licensed under the MIT License.
+
+The following is courtesy of our legal counsel:
+
+
+Use and transfer of Docker may be subject to certain restrictions by the
+United States and other governments.
+It is your responsibility to ensure that your use and/or transfer does not
+violate applicable laws.
+
+For more information, please see https://www.bis.doc.gov
+
+See also https://www.apache.org/dev/crypto.html and/or seek legal counsel.

+ 127 - 0
vendor/github.com/moby/pubsub/publisher.go

@@ -0,0 +1,127 @@
+package pubsub
+
+import (
+	"sync"
+	"time"
+)
+
+var wgPool = sync.Pool{New: func() interface{} { return new(sync.WaitGroup) }}
+
+// NewPublisher creates a new pub/sub publisher to broadcast messages.
+// The duration is used as the send timeout as to not block the publisher publishing
+// messages to other clients if one client is slow or unresponsive.
+// The buffer is used when creating new channels for subscribers.
+func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
+	return &Publisher{
+		buffer:      buffer,
+		timeout:     publishTimeout,
+		subscribers: make(map[subscriber]topicFunc),
+	}
+}
+
+type subscriber chan interface{}
+type topicFunc func(v interface{}) bool
+
+// Publisher is basic pub/sub structure. Allows to send events and subscribe
+// to them. Can be safely used from multiple goroutines.
+type Publisher struct {
+	m           sync.RWMutex
+	buffer      int
+	timeout     time.Duration
+	subscribers map[subscriber]topicFunc
+}
+
+// Len returns the number of subscribers for the publisher
+func (p *Publisher) Len() int {
+	p.m.RLock()
+	i := len(p.subscribers)
+	p.m.RUnlock()
+	return i
+}
+
+// Subscribe adds a new subscriber to the publisher returning the channel.
+func (p *Publisher) Subscribe() chan interface{} {
+	return p.SubscribeTopic(nil)
+}
+
+// SubscribeTopic adds a new subscriber that filters messages sent by a topic.
+func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
+	ch := make(chan interface{}, p.buffer)
+	p.m.Lock()
+	p.subscribers[ch] = topic
+	p.m.Unlock()
+	return ch
+}
+
+// SubscribeTopicWithBuffer adds a new subscriber that filters messages sent by a topic.
+// The returned channel has a buffer of the specified size.
+func (p *Publisher) SubscribeTopicWithBuffer(topic topicFunc, buffer int) chan interface{} {
+	ch := make(chan interface{}, buffer)
+	p.m.Lock()
+	p.subscribers[ch] = topic
+	p.m.Unlock()
+	return ch
+}
+
+// Evict removes the specified subscriber from receiving any more messages.
+func (p *Publisher) Evict(sub chan interface{}) {
+	p.m.Lock()
+	_, exists := p.subscribers[sub]
+	if exists {
+		delete(p.subscribers, sub)
+		close(sub)
+	}
+	p.m.Unlock()
+}
+
+// Publish sends the data in v to all subscribers currently registered with the publisher.
+func (p *Publisher) Publish(v interface{}) {
+	p.m.RLock()
+	if len(p.subscribers) == 0 {
+		p.m.RUnlock()
+		return
+	}
+
+	wg := wgPool.Get().(*sync.WaitGroup)
+	for sub, topic := range p.subscribers {
+		wg.Add(1)
+		go p.sendTopic(sub, topic, v, wg)
+	}
+	wg.Wait()
+	wgPool.Put(wg)
+	p.m.RUnlock()
+}
+
+// Close closes the channels to all subscribers registered with the publisher.
+func (p *Publisher) Close() {
+	p.m.Lock()
+	for sub := range p.subscribers {
+		delete(p.subscribers, sub)
+		close(sub)
+	}
+	p.m.Unlock()
+}
+
+func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
+	defer wg.Done()
+	if topic != nil && !topic(v) {
+		return
+	}
+
+	// send under a select as to not block if the receiver is unavailable
+	if p.timeout > 0 {
+		timeout := time.NewTimer(p.timeout)
+		defer timeout.Stop()
+
+		select {
+		case sub <- v:
+		case <-timeout.C:
+		}
+		return
+	}
+
+	select {
+	case sub <- v:
+	default:
+	}
+}

+ 3 - 0
vendor/modules.txt

@@ -593,6 +593,9 @@ github.com/moby/ipvs
 # github.com/moby/locker v1.0.1
 ## explicit; go 1.13
 github.com/moby/locker
+# github.com/moby/pubsub v1.0.0
+## explicit; go 1.19
+github.com/moby/pubsub
 # github.com/moby/swarmkit/v2 v2.0.0-20220721174824-48dd89375d0a
 ## explicit; go 1.17
 github.com/moby/swarmkit/v2/agent