diff --git a/api/client/events.go b/api/client/events.go index 86ed0269b5..fa84eac518 100644 --- a/api/client/events.go +++ b/api/client/events.go @@ -26,7 +26,7 @@ func (cli *DockerCli) CmdEvents(args ...string) error { var ( v = url.Values{} - eventFilterArgs = filters.Args{} + eventFilterArgs = filters.NewArgs() ) // Consolidate all filter flags, and sanity check them early. @@ -53,7 +53,7 @@ func (cli *DockerCli) CmdEvents(args ...string) error { } v.Set("until", ts) } - if len(eventFilterArgs) > 0 { + if eventFilterArgs.Len() > 0 { filterJSON, err := filters.ToParam(eventFilterArgs) if err != nil { return err diff --git a/api/client/images.go b/api/client/images.go index ba26512a2d..26e7c9706e 100644 --- a/api/client/images.go +++ b/api/client/images.go @@ -36,7 +36,7 @@ func (cli *DockerCli) CmdImages(args ...string) error { // Consolidate all filter flags, and sanity check them early. // They'll get process in the daemon/server. - imageFilterArgs := filters.Args{} + imageFilterArgs := filters.NewArgs() for _, f := range flFilter.GetAll() { var err error imageFilterArgs, err = filters.ParseFlag(f, imageFilterArgs) @@ -47,7 +47,7 @@ func (cli *DockerCli) CmdImages(args ...string) error { matchName := cmd.Arg(0) v := url.Values{} - if len(imageFilterArgs) > 0 { + if imageFilterArgs.Len() > 0 { filterJSON, err := filters.ToParam(imageFilterArgs) if err != nil { return err diff --git a/api/client/ps.go b/api/client/ps.go index 76a00214a9..5bcf4bb2ee 100644 --- a/api/client/ps.go +++ b/api/client/ps.go @@ -20,7 +20,7 @@ func (cli *DockerCli) CmdPs(args ...string) error { var ( err error - psFilterArgs = filters.Args{} + psFilterArgs = filters.NewArgs() v = url.Values{} cmd = Cli.Subcmd("ps", nil, Cli.DockerCommands["ps"].Description, true) @@ -72,7 +72,7 @@ func (cli *DockerCli) CmdPs(args ...string) error { } } - if len(psFilterArgs) > 0 { + if psFilterArgs.Len() > 0 { filterJSON, err := filters.ToParam(psFilterArgs) if err != nil { return err diff --git a/api/client/volume.go b/api/client/volume.go index 1dc0ea2d04..08d83f5a70 100644 --- a/api/client/volume.go +++ b/api/client/volume.go @@ -54,7 +54,7 @@ func (cli *DockerCli) CmdVolumeLs(args ...string) error { cmd.Require(flag.Exact, 0) cmd.ParseFlags(args, true) - volFilterArgs := filters.Args{} + volFilterArgs := filters.NewArgs() for _, f := range flFilter.GetAll() { var err error volFilterArgs, err = filters.ParseFlag(f, volFilterArgs) @@ -64,7 +64,7 @@ func (cli *DockerCli) CmdVolumeLs(args ...string) error { } v := url.Values{} - if len(volFilterArgs) > 0 { + if volFilterArgs.Len() > 0 { filterJSON, err := filters.ToParam(volFilterArgs) if err != nil { return err diff --git a/api/server/router/local/info.go b/api/server/router/local/info.go index 1ebcae91b7..00314212bd 100644 --- a/api/server/router/local/info.go +++ b/api/server/router/local/info.go @@ -92,27 +92,11 @@ func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.R enc := json.NewEncoder(output) - current, l, cancel := s.daemon.SubscribeToEvents() - defer cancel() + buffered, l := s.daemon.SubscribeToEvents(since, sinceNano, ef) + defer s.daemon.UnsubscribeFromEvents(l) - eventFilter := s.daemon.GetEventFilter(ef) - handleEvent := func(ev *jsonmessage.JSONMessage) error { - if eventFilter.Include(ev) { - if err := enc.Encode(ev); err != nil { - return err - } - } - return nil - } - - if since == -1 { - current = nil - } - for _, ev := range current { - if ev.Time < since || ((ev.Time == since) && (ev.TimeNano < sinceNano)) { - continue - } - if err := handleEvent(ev); err != nil { + for _, ev := range buffered { + if err := enc.Encode(ev); err != nil { return err } } @@ -129,7 +113,7 @@ func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.R if !ok { continue } - if err := handleEvent(jev); err != nil { + if err := enc.Encode(jev); err != nil { return err } case <-timer.C: diff --git a/api/server/router/network/network_routes.go b/api/server/router/network/network_routes.go index 5b70c4f31d..f639481514 100644 --- a/api/server/router/network/network_routes.go +++ b/api/server/router/network/network_routes.go @@ -29,27 +29,23 @@ func (n *networkRouter) getNetworksList(ctx context.Context, w http.ResponseWrit } list := []*types.NetworkResource{} - var nameFilter, idFilter bool - var names, ids []string - if names, nameFilter = netFilters["name"]; nameFilter { - for _, name := range names { - if nw, err := n.backend.GetNetwork(name, daemon.NetworkByName); err == nil { - list = append(list, buildNetworkResource(nw)) - } else { - logrus.Errorf("failed to get network for filter=%s : %v", name, err) - } + netFilters.WalkValues("name", func(name string) error { + if nw, err := n.backend.GetNetwork(name, daemon.NetworkByName); err == nil { + list = append(list, buildNetworkResource(nw)) + } else { + logrus.Errorf("failed to get network for filter=%s : %v", name, err) } - } + return nil + }) - if ids, idFilter = netFilters["id"]; idFilter { - for _, id := range ids { - for _, nw := range n.backend.GetNetworksByID(id) { - list = append(list, buildNetworkResource(nw)) - } + netFilters.WalkValues("id", func(id string) error { + for _, nw := range n.backend.GetNetworksByID(id) { + list = append(list, buildNetworkResource(nw)) } - } + return nil + }) - if !nameFilter && !idFilter { + if !netFilters.Include("name") && !netFilters.Include("id") { nwList := n.backend.GetNetworksByID("") for _, nw := range nwList { list = append(list, buildNetworkResource(nw)) diff --git a/daemon/daemon.go b/daemon/daemon.go index 2081ef5247..ccc19bd211 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -532,24 +532,30 @@ func (daemon *Daemon) GetByName(name string) (*Container, error) { return e, nil } -// GetEventFilter returns a filters.Filter for a set of filters -func (daemon *Daemon) GetEventFilter(filter filters.Args) *events.Filter { +// getEventFilter returns a filters.Filter for a set of filters +func (daemon *Daemon) getEventFilter(filter filters.Args) *events.Filter { // incoming container filter can be name, id or partial id, convert to // a full container id - for i, cn := range filter["container"] { + for _, cn := range filter.Get("container") { c, err := daemon.Get(cn) - if err != nil { - filter["container"][i] = "" - } else { - filter["container"][i] = c.ID + filter.Del("container", cn) + if err == nil { + filter.Add("container", c.ID) } } return events.NewFilter(filter, daemon.GetLabels) } // SubscribeToEvents returns the currently record of events, a channel to stream new events from, and a function to cancel the stream of events. -func (daemon *Daemon) SubscribeToEvents() ([]*jsonmessage.JSONMessage, chan interface{}, func()) { - return daemon.EventsService.Subscribe() +func (daemon *Daemon) SubscribeToEvents(since, sinceNano int64, filter filters.Args) ([]*jsonmessage.JSONMessage, chan interface{}) { + ef := daemon.getEventFilter(filter) + return daemon.EventsService.SubscribeTopic(since, sinceNano, ef) +} + +// UnsubscribeFromEvents stops the event subscription for a client by closing the +// channel where the daemon sends events to. +func (daemon *Daemon) UnsubscribeFromEvents(listener chan interface{}) { + daemon.EventsService.Evict(listener) } // GetLabels for a container or image id diff --git a/daemon/events/events.go b/daemon/events/events.go index 996c856f42..3674170fe3 100644 --- a/daemon/events/events.go +++ b/daemon/events/events.go @@ -8,7 +8,10 @@ import ( "github.com/docker/docker/pkg/pubsub" ) -const eventsLimit = 64 +const ( + eventsLimit = 64 + bufferSize = 1024 +) // Events is pubsub channel for *jsonmessage.JSONMessage type Events struct { @@ -21,7 +24,7 @@ type Events struct { func New() *Events { return &Events{ events: make([]*jsonmessage.JSONMessage, 0, eventsLimit), - pub: pubsub.NewPublisher(100*time.Millisecond, 1024), + pub: pubsub.NewPublisher(100*time.Millisecond, bufferSize), } } @@ -42,6 +45,41 @@ func (e *Events) Subscribe() ([]*jsonmessage.JSONMessage, chan interface{}, func return current, l, cancel } +// SubscribeTopic adds new listener to events, returns slice of 64 stored +// last events, a channel in which you can expect new events (in form +// of interface{}, so you need type assertion). +func (e *Events) SubscribeTopic(since, sinceNano int64, ef *Filter) ([]*jsonmessage.JSONMessage, chan interface{}) { + e.mu.Lock() + defer e.mu.Unlock() + + var buffered []*jsonmessage.JSONMessage + topic := func(m interface{}) bool { + return ef.Include(m.(*jsonmessage.JSONMessage)) + } + + if since != -1 { + for i := len(e.events) - 1; i >= 0; i-- { + ev := e.events[i] + if ev.Time < since || ((ev.Time == since) && (ev.TimeNano < sinceNano)) { + break + } + if ef.filter.Len() == 0 || topic(ev) { + buffered = append([]*jsonmessage.JSONMessage{ev}, buffered...) + } + } + } + + var ch chan interface{} + if ef.filter.Len() > 0 { + ch = e.pub.SubscribeTopic(topic) + } else { + // Subscribe to all events if there are no filters + ch = e.pub.Subscribe() + } + + return buffered, ch +} + // Evict evicts listener from pubsub func (e *Events) Evict(l chan interface{}) { e.pub.Evict(l) diff --git a/daemon/events/filter.go b/daemon/events/filter.go index ae7fba93d1..e15ca436d4 100644 --- a/daemon/events/filter.go +++ b/daemon/events/filter.go @@ -19,14 +19,14 @@ func NewFilter(filter filters.Args, getLabels func(id string) map[string]string) // Include returns true when the event ev is included by the filters func (ef *Filter) Include(ev *jsonmessage.JSONMessage) bool { - return isFieldIncluded(ev.Status, ef.filter["event"]) && - isFieldIncluded(ev.ID, ef.filter["container"]) && + return ef.filter.ExactMatch("event", ev.Status) && + ef.filter.ExactMatch("container", ev.ID) && ef.isImageIncluded(ev.ID, ev.From) && ef.isLabelFieldIncluded(ev.ID) } func (ef *Filter) isLabelFieldIncluded(id string) bool { - if _, ok := ef.filter["label"]; !ok { + if !ef.filter.Include("label") { return true } return ef.filter.MatchKVList("label", ef.getLabels(id)) @@ -37,31 +37,16 @@ func (ef *Filter) isLabelFieldIncluded(id string) bool { // from an image will be included in the image events. Also compare both // against the stripped repo name without any tags. func (ef *Filter) isImageIncluded(eventID string, eventFrom string) bool { - stripTag := func(image string) string { - ref, err := reference.ParseNamed(image) - if err != nil { - return image - } - return ref.Name() - } - - return isFieldIncluded(eventID, ef.filter["image"]) || - isFieldIncluded(eventFrom, ef.filter["image"]) || - isFieldIncluded(stripTag(eventID), ef.filter["image"]) || - isFieldIncluded(stripTag(eventFrom), ef.filter["image"]) + return ef.filter.ExactMatch("image", eventID) || + ef.filter.ExactMatch("image", eventFrom) || + ef.filter.ExactMatch("image", stripTag(eventID)) || + ef.filter.ExactMatch("image", stripTag(eventFrom)) } -func isFieldIncluded(field string, filter []string) bool { - if len(field) == 0 { - return true +func stripTag(image string) string { + ref, err := reference.ParseNamed(image) + if err != nil { + return image } - if len(filter) == 0 { - return true - } - for _, v := range filter { - if v == field { - return true - } - } - return false + return ref.Name() } diff --git a/daemon/images.go b/daemon/images.go index b4c506ee05..a2d3d7bac9 100644 --- a/daemon/images.go +++ b/daemon/images.go @@ -4,7 +4,6 @@ import ( "fmt" "path" "sort" - "strings" "github.com/docker/distribution/reference" "github.com/docker/docker/api/types" @@ -13,9 +12,9 @@ import ( "github.com/docker/docker/pkg/parsers/filters" ) -var acceptedImageFilterTags = map[string]struct{}{ - "dangling": {}, - "label": {}, +var acceptedImageFilterTags = map[string]bool{ + "dangling": true, + "label": true, } // byCreated is a temporary type used to sort a list of images by creation @@ -47,19 +46,15 @@ func (daemon *Daemon) Images(filterArgs, filter string, all bool) ([]*types.Imag if err != nil { return nil, err } - for name := range imageFilters { - if _, ok := acceptedImageFilterTags[name]; !ok { - return nil, fmt.Errorf("Invalid filter '%s'", name) - } + if err := imageFilters.Validate(acceptedImageFilterTags); err != nil { + return nil, err } - if i, ok := imageFilters["dangling"]; ok { - for _, value := range i { - if v := strings.ToLower(value); v == "true" { - danglingOnly = true - } else if v != "false" { - return nil, fmt.Errorf("Invalid filter 'dangling=%s'", v) - } + if imageFilters.Include("dangling") { + if imageFilters.ExactMatch("dangling", "true") { + danglingOnly = true + } else if !imageFilters.ExactMatch("dangling", "false") { + return nil, fmt.Errorf("Invalid filter 'dangling=%s'", imageFilters.Get("dangling")) } } @@ -82,9 +77,9 @@ func (daemon *Daemon) Images(filterArgs, filter string, all bool) ([]*types.Imag } for id, img := range allImages { - if _, ok := imageFilters["label"]; ok { + if imageFilters.Include("label") { + // Very old image that do not have image.Config (or even labels) if img.Config == nil { - // Very old image that do not have image.Config (or even labels) continue } // We are now sure image.Config is not nil diff --git a/daemon/list.go b/daemon/list.go index b64103a0b7..f289f2a66a 100644 --- a/daemon/list.go +++ b/daemon/list.go @@ -8,7 +8,6 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/docker/api/types" - derr "github.com/docker/docker/errors" "github.com/docker/docker/image" "github.com/docker/docker/pkg/graphdb" "github.com/docker/docker/pkg/nat" @@ -136,64 +135,65 @@ func (daemon *Daemon) foldFilter(config *ContainersConfig) (*listContext, error) } var filtExited []int - if i, ok := psFilters["exited"]; ok { - for _, value := range i { - code, err := strconv.Atoi(value) - if err != nil { - return nil, err - } - filtExited = append(filtExited, code) + err = psFilters.WalkValues("exited", func(value string) error { + code, err := strconv.Atoi(value) + if err != nil { + return err } + filtExited = append(filtExited, code) + return nil + }) + if err != nil { + return nil, err } - if i, ok := psFilters["status"]; ok { - for _, value := range i { - if !isValidStateString(value) { - return nil, errors.New("Unrecognised filter value for status") - } - - config.All = true + err = psFilters.WalkValues("status", func(value string) error { + if !isValidStateString(value) { + return fmt.Errorf("Unrecognised filter value for status: %s", value) } + + config.All = true + return nil + }) + if err != nil { + return nil, err } var beforeContFilter, sinceContFilter *Container - if i, ok := psFilters["before"]; ok { - for _, value := range i { - beforeContFilter, err = daemon.Get(value) - if err != nil { - return nil, err - } - } + err = psFilters.WalkValues("before", func(value string) error { + beforeContFilter, err = daemon.Get(value) + return err + }) + if err != nil { + return nil, err } - if i, ok := psFilters["since"]; ok { - for _, value := range i { - sinceContFilter, err = daemon.Get(value) - if err != nil { - return nil, err - } - } + err = psFilters.WalkValues("since", func(value string) error { + sinceContFilter, err = daemon.Get(value) + return err + }) + if err != nil { + return nil, err } imagesFilter := map[image.ID]bool{} var ancestorFilter bool - if ancestors, ok := psFilters["ancestor"]; ok { + if psFilters.Include("ancestor") { ancestorFilter = true - // The idea is to walk the graph down the most "efficient" way. - for _, ancestor := range ancestors { - // First, get the imageId of the ancestor filter (yay) + psFilters.WalkValues("ancestor", func(ancestor string) error { id, err := daemon.GetImageID(ancestor) if err != nil { logrus.Warnf("Error while looking up for image %v", ancestor) - continue + return nil } if imagesFilter[id] { // Already seen this ancestor, skip it - continue + return nil } // Then walk down the graph and put the imageIds in imagesFilter populateImageFilterByParents(imagesFilter, id, daemon.imageStore.Children) - } + return nil + }) } names := make(map[string][]string) @@ -202,14 +202,14 @@ func (daemon *Daemon) foldFilter(config *ContainersConfig) (*listContext, error) return nil }, 1) - if config.Before != "" { + if config.Before != "" && beforeContFilter == nil { beforeContFilter, err = daemon.Get(config.Before) if err != nil { return nil, err } } - if config.Since != "" { + if config.Since != "" && sinceContFilter == nil { sinceContFilter, err = daemon.Get(config.Since) if err != nil { return nil, err @@ -397,17 +397,8 @@ func (daemon *Daemon) Volumes(filter string) ([]*types.Volume, error) { return nil, err } - filterUsed := false - if i, ok := volFilters["dangling"]; ok { - if len(i) > 1 { - return nil, derr.ErrorCodeDanglingOne - } - - filterValue := i[0] - if strings.ToLower(filterValue) == "true" || filterValue == "1" { - filterUsed = true - } - } + filterUsed := volFilters.Include("dangling") && + (volFilters.ExactMatch("dangling", "true") || volFilters.ExactMatch("dangling", "1")) volumes := daemon.volumes.List() for _, v := range volumes { diff --git a/integration-cli/docker_api_network_test.go b/integration-cli/docker_api_network_test.go index 2180cd10e4..bd0b5e490b 100644 --- a/integration-cli/docker_api_network_test.go +++ b/integration-cli/docker_api_network_test.go @@ -230,9 +230,9 @@ func isNetworkAvailable(c *check.C, name string) bool { func getNetworkIDByName(c *check.C, name string) string { var ( v = url.Values{} - filterArgs = filters.Args{} + filterArgs = filters.NewArgs() ) - filterArgs["name"] = []string{name} + filterArgs.Add("name", name) filterJSON, err := filters.ToParam(filterArgs) c.Assert(err, checker.IsNil) v.Set("filters", filterJSON) diff --git a/integration-cli/docker_cli_build_test.go b/integration-cli/docker_cli_build_test.go index d4b31bd38f..1707680dba 100644 --- a/integration-cli/docker_cli_build_test.go +++ b/integration-cli/docker_cli_build_test.go @@ -1891,9 +1891,7 @@ func (s *DockerSuite) TestBuildCancellationKillsSleep(c *check.C) { startEpoch := daemonTime(c).Unix() // Watch for events since epoch. - eventsCmd := exec.Command( - dockerBinary, "events", - "--since", strconv.FormatInt(startEpoch, 10)) + eventsCmd := exec.Command(dockerBinary, "events", "--since", strconv.FormatInt(startEpoch, 10)) stdout, err := eventsCmd.StdoutPipe() if err != nil { c.Fatal(err) @@ -1932,12 +1930,12 @@ func (s *DockerSuite) TestBuildCancellationKillsSleep(c *check.C) { c.Fatalf("failed to run build: %s", err) } - matchCID := regexp.MustCompile("Running in ") + matchCID := regexp.MustCompile("Running in (.+)") scanner := bufio.NewScanner(stdoutBuild) for scanner.Scan() { line := scanner.Text() - if ok := matchCID.MatchString(line); ok { - containerID <- line[len(line)-12:] + if matches := matchCID.FindStringSubmatch(line); len(matches) > 0 { + containerID <- matches[1] break } } diff --git a/integration-cli/docker_cli_images_test.go b/integration-cli/docker_cli_images_test.go index 027d8fe53c..9ed7762b0f 100644 --- a/integration-cli/docker_cli_images_test.go +++ b/integration-cli/docker_cli_images_test.go @@ -74,7 +74,7 @@ func (s *DockerSuite) TestImagesErrorWithInvalidFilterNameTest(c *check.C) { c.Assert(out, checker.Contains, "Invalid filter") } -func (s *DockerSuite) TestImagesFilterLabel(c *check.C) { +func (s *DockerSuite) TestImagesFilterLabelMatch(c *check.C) { testRequires(c, DaemonIsLinux) imageName1 := "images_filter_test1" imageName2 := "images_filter_test2" diff --git a/pkg/parsers/filters/parse.go b/pkg/parsers/filters/parse.go index 6c394f1607..7444201fae 100644 --- a/pkg/parsers/filters/parse.go +++ b/pkg/parsers/filters/parse.go @@ -5,6 +5,7 @@ package filters import ( "encoding/json" "errors" + "fmt" "regexp" "strings" ) @@ -15,7 +16,14 @@ import ( // in an slice. // e.g given -f 'label=label1=1' -f 'label=label2=2' -f 'image.name=ubuntu' // the args will be {'label': {'label1=1','label2=2'}, 'image.name', {'ubuntu'}} -type Args map[string][]string +type Args struct { + fields map[string]map[string]bool +} + +// NewArgs initializes a new Args struct. +func NewArgs() Args { + return Args{fields: map[string]map[string]bool{}} +} // ParseFlag parses the argument to the filter flag. Like // @@ -25,9 +33,6 @@ type Args map[string][]string // map is created. func ParseFlag(arg string, prev Args) (Args, error) { filters := prev - if prev == nil { - filters = Args{} - } if len(arg) == 0 { return filters, nil } @@ -37,9 +42,11 @@ func ParseFlag(arg string, prev Args) (Args, error) { } f := strings.SplitN(arg, "=", 2) + name := strings.ToLower(strings.TrimSpace(f[0])) value := strings.TrimSpace(f[1]) - filters[name] = append(filters[name], value) + + filters.Add(name, value) return filters, nil } @@ -50,11 +57,11 @@ var ErrBadFormat = errors.New("bad format of filter (expected name=value)") // ToParam packs the Args into an string for easy transport from client to server. func ToParam(a Args) (string, error) { // this way we don't URL encode {}, just empty space - if len(a) == 0 { + if a.Len() == 0 { return "", nil } - buf, err := json.Marshal(a) + buf, err := json.Marshal(a.fields) if err != nil { return "", err } @@ -63,23 +70,71 @@ func ToParam(a Args) (string, error) { // FromParam unpacks the filter Args. func FromParam(p string) (Args, error) { - args := Args{} if len(p) == 0 { - return args, nil + return NewArgs(), nil } - if err := json.NewDecoder(strings.NewReader(p)).Decode(&args); err != nil { - return nil, err + + r := strings.NewReader(p) + d := json.NewDecoder(r) + + m := map[string]map[string]bool{} + if err := d.Decode(&m); err != nil { + r.Seek(0, 0) + + // Allow parsing old arguments in slice format. + // Because other libraries might be sending them in this format. + deprecated := map[string][]string{} + if deprecatedErr := d.Decode(&deprecated); deprecatedErr == nil { + m = deprecatedArgs(deprecated) + } else { + return NewArgs(), err + } } - return args, nil + return Args{m}, nil +} + +// Get returns the list of values associates with a field. +// It returns a slice of strings to keep backwards compatibility with old code. +func (filters Args) Get(field string) []string { + values := filters.fields[field] + if values == nil { + return make([]string, 0) + } + slice := make([]string, 0, len(values)) + for key := range values { + slice = append(slice, key) + } + return slice +} + +// Add adds a new value to a filter field. +func (filters Args) Add(name, value string) { + if _, ok := filters.fields[name]; ok { + filters.fields[name][value] = true + } else { + filters.fields[name] = map[string]bool{value: true} + } +} + +// Del removes a value from a filter field. +func (filters Args) Del(name, value string) { + if _, ok := filters.fields[name]; ok { + delete(filters.fields[name], value) + } +} + +// Len returns the number of fields in the arguments. +func (filters Args) Len() int { + return len(filters.fields) } // MatchKVList returns true if the values for the specified field maches the ones // from the sources. // e.g. given Args are {'label': {'label1=1','label2=1'}, 'image.name', {'ubuntu'}}, -// field is 'label' and sources are {'label':{'label1=1','label2=2','label3=3'}} +// field is 'label' and sources are {'label1': '1', 'label2': '2'} // it returns true. func (filters Args) MatchKVList(field string, sources map[string]string) bool { - fieldValues := filters[field] + fieldValues := filters.fields[field] //do not filter if there is no filter set or cannot determine filter if len(fieldValues) == 0 { @@ -90,21 +145,16 @@ func (filters Args) MatchKVList(field string, sources map[string]string) bool { return false } -outer: - for _, name2match := range fieldValues { + for name2match := range fieldValues { testKV := strings.SplitN(name2match, "=", 2) - for k, v := range sources { - if len(testKV) == 1 { - if k == testKV[0] { - continue outer - } - } else if k == testKV[0] && v == testKV[1] { - continue outer - } + v, ok := sources[testKV[0]] + if !ok { + return false + } + if len(testKV) == 2 && testKV[1] != v { + return false } - - return false } return true @@ -115,13 +165,12 @@ outer: // field is 'image.name' and source is 'ubuntu' // it returns true. func (filters Args) Match(field, source string) bool { - fieldValues := filters[field] - - //do not filter if there is no filter set or cannot determine filter - if len(fieldValues) == 0 { + if filters.ExactMatch(field, source) { return true } - for _, name2match := range fieldValues { + + fieldValues := filters.fields[field] + for name2match := range fieldValues { match, err := regexp.MatchString(name2match, source) if err != nil { continue @@ -132,3 +181,61 @@ func (filters Args) Match(field, source string) bool { } return false } + +// ExactMatch returns true if the source matches exactly one of the filters. +func (filters Args) ExactMatch(field, source string) bool { + fieldValues, ok := filters.fields[field] + //do not filter if there is no filter set or cannot determine filter + if !ok || len(fieldValues) == 0 { + return true + } + + // try to march full name value to avoid O(N) regular expression matching + if fieldValues[source] { + return true + } + return false +} + +// Include returns true if the name of the field to filter is in the filters. +func (filters Args) Include(field string) bool { + _, ok := filters.fields[field] + return ok +} + +// Validate ensures that all the fields in the filter are valid. +// It returns an error as soon as it finds an invalid field. +func (filters Args) Validate(accepted map[string]bool) error { + for name := range filters.fields { + if !accepted[name] { + return fmt.Errorf("Invalid filter '%s'", name) + } + } + return nil +} + +// WalkValues iterates over the list of filtered values for a field. +// It stops the iteration if it finds an error and it returns that error. +func (filters Args) WalkValues(field string, op func(value string) error) error { + if _, ok := filters.fields[field]; !ok { + return nil + } + for v := range filters.fields[field] { + if err := op(v); err != nil { + return err + } + } + return nil +} + +func deprecatedArgs(d map[string][]string) map[string]map[string]bool { + m := map[string]map[string]bool{} + for k, v := range d { + values := map[string]bool{} + for _, vv := range v { + values[vv] = true + } + m[k] = values + } + return m +} diff --git a/pkg/parsers/filters/parse_test.go b/pkg/parsers/filters/parse_test.go index eb9fcef90f..308d1bcdb8 100644 --- a/pkg/parsers/filters/parse_test.go +++ b/pkg/parsers/filters/parse_test.go @@ -1,7 +1,7 @@ package filters import ( - "sort" + "fmt" "testing" ) @@ -13,7 +13,7 @@ func TestParseArgs(t *testing.T) { "image.name=*untu", } var ( - args = Args{} + args = NewArgs() err error ) for i := range flagArgs { @@ -22,10 +22,10 @@ func TestParseArgs(t *testing.T) { t.Errorf("failed to parse %s: %s", flagArgs[i], err) } } - if len(args["created"]) != 1 { + if len(args.Get("created")) != 1 { t.Errorf("failed to set this arg") } - if len(args["image.name"]) != 2 { + if len(args.Get("image.name")) != 2 { t.Errorf("the args should have collapsed") } } @@ -36,7 +36,7 @@ func TestParseArgsEdgeCase(t *testing.T) { if err != nil { t.Fatal(err) } - if args == nil || len(args) != 0 { + if args.Len() != 0 { t.Fatalf("Expected an empty Args (map), got %v", args) } if args, err = ParseFlag("anything", args); err == nil || err != ErrBadFormat { @@ -45,10 +45,11 @@ func TestParseArgsEdgeCase(t *testing.T) { } func TestToParam(t *testing.T) { - a := Args{ - "created": []string{"today"}, - "image.name": []string{"ubuntu*", "*untu"}, + fields := map[string]map[string]bool{ + "created": {"today": true}, + "image.name": {"ubuntu*": true, "*untu": true}, } + a := Args{fields: fields} _, err := ToParam(a) if err != nil { @@ -63,42 +64,48 @@ func TestFromParam(t *testing.T) { "{'key': 'value'}", `{"key": "value"}`, } - valids := map[string]Args{ - `{"key": ["value"]}`: { - "key": {"value"}, + valid := map[*Args][]string{ + &Args{fields: map[string]map[string]bool{"key": {"value": true}}}: { + `{"key": ["value"]}`, + `{"key": {"value": true}}`, }, - `{"key": ["value1", "value2"]}`: { - "key": {"value1", "value2"}, + &Args{fields: map[string]map[string]bool{"key": {"value1": true, "value2": true}}}: { + `{"key": ["value1", "value2"]}`, + `{"key": {"value1": true, "value2": true}}`, }, - `{"key1": ["value1"], "key2": ["value2"]}`: { - "key1": {"value1"}, - "key2": {"value2"}, + &Args{fields: map[string]map[string]bool{"key1": {"value1": true}, "key2": {"value2": true}}}: { + `{"key1": ["value1"], "key2": ["value2"]}`, + `{"key1": {"value1": true}, "key2": {"value2": true}}`, }, } + for _, invalid := range invalids { if _, err := FromParam(invalid); err == nil { t.Fatalf("Expected an error with %v, got nothing", invalid) } } - for json, expectedArgs := range valids { - args, err := FromParam(json) - if err != nil { - t.Fatal(err) - } - if len(args) != len(expectedArgs) { - t.Fatalf("Expected %v, go %v", expectedArgs, args) - } - for key, expectedValues := range expectedArgs { - values := args[key] - sort.Strings(values) - sort.Strings(expectedValues) - if len(values) != len(expectedValues) { + + for expectedArgs, matchers := range valid { + for _, json := range matchers { + args, err := FromParam(json) + if err != nil { + t.Fatal(err) + } + if args.Len() != expectedArgs.Len() { t.Fatalf("Expected %v, go %v", expectedArgs, args) } - for index, expectedValue := range expectedValues { - if values[index] != expectedValue { + for key, expectedValues := range expectedArgs.fields { + values := args.Get(key) + + if len(values) != len(expectedValues) { t.Fatalf("Expected %v, go %v", expectedArgs, args) } + + for _, v := range values { + if !expectedValues[v] { + t.Fatalf("Expected %v, go %v", expectedArgs, args) + } + } } } } @@ -114,54 +121,63 @@ func TestEmpty(t *testing.T) { if err != nil { t.Errorf("%s", err) } - if len(a) != len(v1) { + if a.Len() != v1.Len() { t.Errorf("these should both be empty sets") } } -func TestArgsMatchKVList(t *testing.T) { - // empty sources - args := Args{ - "created": []string{"today"}, +func TestArgsMatchKVListEmptySources(t *testing.T) { + args := NewArgs() + if !args.MatchKVList("created", map[string]string{}) { + t.Fatalf("Expected true for (%v,created), got true", args) } + + args = Args{map[string]map[string]bool{"created": {"today": true}}} if args.MatchKVList("created", map[string]string{}) { t.Fatalf("Expected false for (%v,created), got true", args) } +} + +func TestArgsMatchKVList(t *testing.T) { // Not empty sources sources := map[string]string{ "key1": "value1", "key2": "value2", "key3": "value3", } + matches := map[*Args]string{ &Args{}: "field", - &Args{ - "created": []string{"today"}, - "labels": []string{"key1"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}, + "labels": map[string]bool{"key1": true}}, }: "labels", - &Args{ - "created": []string{"today"}, - "labels": []string{"key1=value1"}, - }: "labels", - } - differs := map[*Args]string{ - &Args{ - "created": []string{"today"}, - }: "created", - &Args{ - "created": []string{"today"}, - "labels": []string{"key4"}, - }: "labels", - &Args{ - "created": []string{"today"}, - "labels": []string{"key1=value3"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}, + "labels": map[string]bool{"key1=value1": true}}, }: "labels", } + for args, field := range matches { if args.MatchKVList(field, sources) != true { t.Fatalf("Expected true for %v on %v, got false", sources, args) } } + + differs := map[*Args]string{ + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}}, + }: "created", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}, + "labels": map[string]bool{"key4": true}}, + }: "labels", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}, + "labels": map[string]bool{"key1=value3": true}}, + }: "labels", + } + for args, field := range differs { if args.MatchKVList(field, sources) != false { t.Fatalf("Expected false for %v on %v, got true", sources, args) @@ -171,48 +187,165 @@ func TestArgsMatchKVList(t *testing.T) { func TestArgsMatch(t *testing.T) { source := "today" + matches := map[*Args]string{ &Args{}: "field", - &Args{ - "created": []string{"today"}, - "labels": []string{"key1"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}}, }: "today", - &Args{ - "created": []string{"to*"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"to*": true}}, }: "created", - &Args{ - "created": []string{"to(.*)"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"to(.*)": true}}, }: "created", - &Args{ - "created": []string{"tod"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"tod": true}}, }: "created", - &Args{ - "created": []string{"anything", "to*"}, - }: "created", - } - differs := map[*Args]string{ - &Args{ - "created": []string{"tomorrow"}, - }: "created", - &Args{ - "created": []string{"to(day"}, - }: "created", - &Args{ - "created": []string{"tom(.*)"}, - }: "created", - &Args{ - "created": []string{"today1"}, - "labels": []string{"today"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"anyting": true, "to*": true}}, }: "created", } + for args, field := range matches { if args.Match(field, source) != true { t.Fatalf("Expected true for %v on %v, got false", source, args) } } + + differs := map[*Args]string{ + &Args{map[string]map[string]bool{ + "created": map[string]bool{"tomorrow": true}}, + }: "created", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"to(day": true}}, + }: "created", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"tom(.*)": true}}, + }: "created", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"tom": true}}, + }: "created", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today1": true}, + "labels": map[string]bool{"today": true}}, + }: "created", + } + for args, field := range differs { if args.Match(field, source) != false { t.Fatalf("Expected false for %v on %v, got true", source, args) } } } + +func TestAdd(t *testing.T) { + f := NewArgs() + f.Add("status", "running") + v := f.fields["status"] + if len(v) != 1 || !v["running"] { + t.Fatalf("Expected to include a running status, got %v", v) + } + + f.Add("status", "paused") + if len(v) != 2 || !v["paused"] { + t.Fatalf("Expected to include a paused status, got %v", v) + } +} + +func TestDel(t *testing.T) { + f := NewArgs() + f.Add("status", "running") + f.Del("status", "running") + v := f.fields["status"] + if v["running"] { + t.Fatalf("Expected to not include a running status filter, got true") + } +} + +func TestLen(t *testing.T) { + f := NewArgs() + if f.Len() != 0 { + t.Fatalf("Expected to not include any field") + } + f.Add("status", "running") + if f.Len() != 1 { + t.Fatalf("Expected to include one field") + } +} + +func TestExactMatch(t *testing.T) { + f := NewArgs() + + if !f.ExactMatch("status", "running") { + t.Fatalf("Expected to match `running` when there are no filters, got false") + } + + f.Add("status", "running") + f.Add("status", "pause*") + + if !f.ExactMatch("status", "running") { + t.Fatalf("Expected to match `running` with one of the filters, got false") + } + + if f.ExactMatch("status", "paused") { + t.Fatalf("Expected to not match `paused` with one of the filters, got true") + } +} + +func TestInclude(t *testing.T) { + f := NewArgs() + if f.Include("status") { + t.Fatalf("Expected to not include a status key, got true") + } + f.Add("status", "running") + if !f.Include("status") { + t.Fatalf("Expected to include a status key, got false") + } +} + +func TestValidate(t *testing.T) { + f := NewArgs() + f.Add("status", "running") + + valid := map[string]bool{ + "status": true, + "dangling": true, + } + + if err := f.Validate(valid); err != nil { + t.Fatal(err) + } + + f.Add("bogus", "running") + if err := f.Validate(valid); err == nil { + t.Fatalf("Expected to return an error, got nil") + } +} + +func TestWalkValues(t *testing.T) { + f := NewArgs() + f.Add("status", "running") + f.Add("status", "paused") + + f.WalkValues("status", func(value string) error { + if value != "running" && value != "paused" { + t.Fatalf("Unexpected value %s", value) + } + return nil + }) + + err := f.WalkValues("status", func(value string) error { + return fmt.Errorf("return") + }) + if err == nil { + t.Fatalf("Expected to get an error, got nil") + } + + err = f.WalkValues("foo", func(value string) error { + return fmt.Errorf("return") + }) + if err != nil { + t.Fatalf("Expected to not iterate when the field doesn't exist, got %v", err) + } +} diff --git a/pkg/pubsub/publisher.go b/pkg/pubsub/publisher.go index ab457cfba9..8529ffa322 100644 --- a/pkg/pubsub/publisher.go +++ b/pkg/pubsub/publisher.go @@ -13,11 +13,12 @@ func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher { return &Publisher{ buffer: buffer, timeout: publishTimeout, - subscribers: make(map[subscriber]struct{}), + subscribers: make(map[subscriber]topicFunc), } } type subscriber chan interface{} +type topicFunc func(v interface{}) bool // Publisher is basic pub/sub structure. Allows to send events and subscribe // to them. Can be safely used from multiple goroutines. @@ -25,7 +26,7 @@ type Publisher struct { m sync.RWMutex buffer int timeout time.Duration - subscribers map[subscriber]struct{} + subscribers map[subscriber]topicFunc } // Len returns the number of subscribers for the publisher @@ -38,9 +39,14 @@ func (p *Publisher) Len() int { // Subscribe adds a new subscriber to the publisher returning the channel. func (p *Publisher) Subscribe() chan interface{} { + return p.SubscribeTopic(nil) +} + +// SubscribeTopic adds a new subscriber that filters messages sent by a topic. +func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} { ch := make(chan interface{}, p.buffer) p.m.Lock() - p.subscribers[ch] = struct{}{} + p.subscribers[ch] = topic p.m.Unlock() return ch } @@ -56,20 +62,13 @@ func (p *Publisher) Evict(sub chan interface{}) { // Publish sends the data in v to all subscribers currently registered with the publisher. func (p *Publisher) Publish(v interface{}) { p.m.RLock() - for sub := range p.subscribers { - // send under a select as to not block if the receiver is unavailable - if p.timeout > 0 { - select { - case sub <- v: - case <-time.After(p.timeout): - } - continue - } - select { - case sub <- v: - default: - } + wg := new(sync.WaitGroup) + for sub, topic := range p.subscribers { + wg.Add(1) + + go p.sendTopic(sub, topic, v, wg) } + wg.Wait() p.m.RUnlock() } @@ -82,3 +81,24 @@ func (p *Publisher) Close() { } p.m.Unlock() } + +func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) { + defer wg.Done() + if topic != nil && !topic(v) { + return + } + + // send under a select as to not block if the receiver is unavailable + if p.timeout > 0 { + select { + case sub <- v: + case <-time.After(p.timeout): + } + return + } + + select { + case sub <- v: + default: + } +}