Merge pull request #16530 from dnephin/filter_events_by_label

Filter events by labels
This commit is contained in:
Antonio Murdaca 2015-10-04 20:36:42 +02:00
commit e86291edd8
9 changed files with 246 additions and 85 deletions

View file

@ -23,16 +23,29 @@ func BoolValueOrDefault(r *http.Request, k string, d bool) bool {
return BoolValue(r, k)
}
// Int64ValueOrZero parses a form value into a int64 type.
// Int64ValueOrZero parses a form value into an int64 type.
// It returns 0 if the parsing fails.
func Int64ValueOrZero(r *http.Request, k string) int64 {
val, err := strconv.ParseInt(r.FormValue(k), 10, 64)
val, err := Int64ValueOrDefault(r, k, 0)
if err != nil {
return 0
}
return val
}
// Int64ValueOrDefault parses a form value into an int64 type. If there is an
// error, returns the error. If there is no value returns the default value.
func Int64ValueOrDefault(r *http.Request, field string, def int64) (int64, error) {
if r.Form.Get(field) != "" {
value, err := strconv.ParseInt(r.Form.Get(field), 10, 64)
if err != nil {
return value, err
}
return value, nil
}
return def, nil
}
// ArchiveOptions stores archive information for different operations.
type ArchiveOptions struct {
Name string

View file

@ -68,3 +68,38 @@ func TestInt64ValueOrZero(t *testing.T) {
}
}
}
func TestInt64ValueOrDefault(t *testing.T) {
cases := map[string]int64{
"": -1,
"-1": -1,
"42": 42,
}
for c, e := range cases {
v := url.Values{}
v.Set("test", c)
r, _ := http.NewRequest("POST", "", nil)
r.Form = v
a, err := Int64ValueOrDefault(r, "test", -1)
if a != e {
t.Fatalf("Value: %s, expected: %v, actual: %v", c, e, a)
}
if err != nil {
t.Fatalf("Error should be nil, but received: %s", err)
}
}
}
func TestInt64ValueOrDefaultWithError(t *testing.T) {
v := url.Values{}
v.Set("test", "invalid")
r, _ := http.NewRequest("POST", "", nil)
r.Form = v
_, err := Int64ValueOrDefault(r, "test", -1)
if err == nil {
t.Fatalf("Expected an error.")
}
}

View file

@ -4,8 +4,6 @@ import (
"encoding/json"
"net/http"
"runtime"
"strconv"
"strings"
"time"
"github.com/Sirupsen/logrus"
@ -54,26 +52,27 @@ func (s *router) getInfo(ctx context.Context, w http.ResponseWriter, r *http.Req
return httputils.WriteJSON(w, http.StatusOK, info)
}
func buildOutputEncoder(w http.ResponseWriter) *json.Encoder {
w.Header().Set("Content-Type", "application/json")
outStream := ioutils.NewWriteFlusher(w)
// Write an empty chunk of data.
// This is to ensure that the HTTP status code is sent immediately,
// so that it will not block the receiver.
outStream.Write(nil)
return json.NewEncoder(outStream)
}
func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := httputils.ParseForm(r); err != nil {
return err
}
var since int64 = -1
if r.Form.Get("since") != "" {
s, err := strconv.ParseInt(r.Form.Get("since"), 10, 64)
if err != nil {
return err
}
since = s
since, err := httputils.Int64ValueOrDefault(r, "since", -1)
if err != nil {
return err
}
var until int64 = -1
if r.Form.Get("until") != "" {
u, err := strconv.ParseInt(r.Form.Get("until"), 10, 64)
if err != nil {
return err
}
until = u
until, err := httputils.Int64ValueOrDefault(r, "until", -1)
if err != nil {
return err
}
timer := time.NewTimer(0)
@ -88,70 +87,30 @@ func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.R
return err
}
isFiltered := func(field string, filter []string) bool {
if len(field) == 0 {
return false
}
if len(filter) == 0 {
return false
}
for _, v := range filter {
if v == field {
return false
}
if strings.Contains(field, ":") {
image := strings.Split(field, ":")
if image[0] == v {
return false
}
}
}
return true
}
enc := buildOutputEncoder(w)
d := s.daemon
es := d.EventsService
w.Header().Set("Content-Type", "application/json")
outStream := ioutils.NewWriteFlusher(w)
// Write an empty chunk of data.
// This is to ensure that the HTTP status code is sent immediately,
// so that it will not block the receiver.
outStream.Write(nil)
enc := json.NewEncoder(outStream)
getContainerID := func(cn string) string {
c, err := d.Get(cn)
if err != nil {
return ""
}
return c.ID
}
sendEvent := func(ev *jsonmessage.JSONMessage) error {
//incoming container filter can be name,id or partial id, convert and replace as a full container id
for i, cn := range ef["container"] {
ef["container"][i] = getContainerID(cn)
}
if isFiltered(ev.Status, ef["event"]) || (isFiltered(ev.ID, ef["image"]) &&
isFiltered(ev.From, ef["image"])) || isFiltered(ev.ID, ef["container"]) {
return nil
}
return enc.Encode(ev)
}
current, l := es.Subscribe()
defer es.Evict(l)
eventFilter := d.GetEventFilter(ef)
handleEvent := func(ev *jsonmessage.JSONMessage) error {
if eventFilter.Include(ev) {
if err := enc.Encode(ev); err != nil {
return err
}
}
return nil
}
if since == -1 {
current = nil
}
defer es.Evict(l)
for _, ev := range current {
if ev.Time < since {
continue
}
if err := sendEvent(ev); err != nil {
if err := handleEvent(ev); err != nil {
return err
}
}
@ -168,7 +127,7 @@ func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.R
if !ok {
continue
}
if err := sendEvent(jev); err != nil {
if err := handleEvent(jev); err != nil {
return err
}
case <-timer.C:

View file

@ -39,6 +39,7 @@ import (
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/namesgenerator"
"github.com/docker/docker/pkg/nat"
"github.com/docker/docker/pkg/parsers/filters"
"github.com/docker/docker/pkg/signal"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/pkg/stringutils"
@ -528,6 +529,36 @@ func (daemon *Daemon) GetByName(name string) (*Container, error) {
return e, nil
}
// GetEventFilter returns a filters.Filter for a set of filters
func (daemon *Daemon) GetEventFilter(filter filters.Args) *events.Filter {
// incoming container filter can be name, id or partial id, convert to
// a full container id
for i, cn := range filter["container"] {
c, err := daemon.Get(cn)
if err != nil {
filter["container"][i] = ""
} else {
filter["container"][i] = c.ID
}
}
return events.NewFilter(filter, daemon.GetLabels)
}
// GetLabels for a container or image id
func (daemon *Daemon) GetLabels(id string) map[string]string {
// TODO: TestCase
container := daemon.containers.Get(id)
if container != nil {
return container.Config.Labels
}
img, err := daemon.repositories.LookupImage(id)
if err == nil {
return img.ContainerConfig.Labels
}
return nil
}
// children returns all child containers of the container with the
// given name. The containers are returned as a map from the container
// name to a pointer to Container.

64
daemon/events/filter.go Normal file
View file

@ -0,0 +1,64 @@
package events
import (
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/parsers"
"github.com/docker/docker/pkg/parsers/filters"
)
// Filter can filter out docker events from a stream
type Filter struct {
filter filters.Args
getLabels func(id string) map[string]string
}
// NewFilter creates a new Filter
func NewFilter(filter filters.Args, getLabels func(id string) map[string]string) *Filter {
return &Filter{filter: filter, getLabels: getLabels}
}
// Include returns true when the event ev is included by the filters
func (ef *Filter) Include(ev *jsonmessage.JSONMessage) bool {
return isFieldIncluded(ev.Status, ef.filter["event"]) &&
isFieldIncluded(ev.ID, ef.filter["container"]) &&
ef.isImageIncluded(ev.ID, ev.From) &&
ef.isLabelFieldIncluded(ev.ID)
}
func (ef *Filter) isLabelFieldIncluded(id string) bool {
if _, ok := ef.filter["label"]; !ok {
return true
}
return ef.filter.MatchKVList("label", ef.getLabels(id))
}
// The image filter will be matched against both event.ID (for image events)
// and event.From (for container events), so that any container that was created
// from an image will be included in the image events. Also compare both
// against the stripped repo name without any tags.
func (ef *Filter) isImageIncluded(eventID string, eventFrom string) bool {
stripTag := func(image string) string {
repo, _ := parsers.ParseRepositoryTag(image)
return repo
}
return isFieldIncluded(eventID, ef.filter["image"]) ||
isFieldIncluded(eventFrom, ef.filter["image"]) ||
isFieldIncluded(stripTag(eventID), ef.filter["image"]) ||
isFieldIncluded(stripTag(eventFrom), ef.filter["image"])
}
func isFieldIncluded(field string, filter []string) bool {
if len(field) == 0 {
return true
}
if len(filter) == 0 {
return true
}
for _, v := range filter {
if v == field {
return true
}
}
return false
}

View file

@ -88,6 +88,7 @@ This section lists each version from latest to oldest. Each listing includes a
list of DNS options to be used in the container.
* `POST /build` now optionally takes a serialized map of build-time variables.
* `GET /events` now includes a `timenano` field, in addition to the existing `time` field.
* `GET /events` now supports filtering by image and container labels.
* `GET /info` now lists engine version information.
* `GET /containers/json` will return `ImageID` of the image used by container.
* `POST /exec/(name)/start` will now return an HTTP 409 when the container is either stopped or paused.
@ -181,6 +182,3 @@ to add, and the field `CapDrop`, which specifies a list of capabilities to drop.
* `POST /images/create` th `fromImage` and `repo` parameters supportthe
`repo:tag` format. Consequently, the `tag` parameter is now obsolete. Using the
new format and the `tag` parameter at the same time will return an error.

View file

@ -233,7 +233,7 @@ Json Parameters:
- **CpuShares** - An integer value containing the container's CPU Shares
(ie. the relative weight vs other containers).
- **CpuPeriod** - The length of a CPU period in microseconds.
- **Cpuset** - Deprecated please don't use. Use `CpusetCpus` instead.
- **Cpuset** - Deprecated please don't use. Use `CpusetCpus` instead.
- **CpusetCpus** - String value containing the `cgroups CpusetCpus` to use.
- **CpusetMems** - Memory nodes (MEMs) in which to allow execution (0-3, 0,1). Only effective on NUMA systems.
- **BlkioWeight** - Block IO weight (relative weight) accepts a weight value between 10 and 1000.
@ -1363,7 +1363,7 @@ or being killed.
Query Parameters:
- **dockerfile** - Path within the build context to the Dockerfile. This is
- **dockerfile** - Path within the build context to the Dockerfile. This is
ignored if `remote` is specified and points to an individual filename.
- **t** A repository name (and optionally a tag) to apply to
the resulting image in case of success.
@ -2038,9 +2038,10 @@ Query Parameters:
- **since** Timestamp used for polling
- **until** Timestamp used for polling
- **filters** A json encoded value of the filters (a map[string][]string) to process on the event list. Available filters:
- `container=<string>`; -- container to filter
- `event=<string>`; -- event to filter
- `image=<string>`; -- image to filter
- `container=<string>`; -- container to filter
- `label=<string>`; -- image and container label to filter
Status Codes:

View file

@ -48,9 +48,10 @@ container container 588a23dac085 *AND* the event type is *start*
The currently supported filters are:
* container
* event
* image
* container (`container=<name or id>`)
* event (`event=<event type>`)
* image (`image=<tag or id>`)
* label (`label=<key>` or `label=<key>=<value>`)
## Examples
@ -133,4 +134,3 @@ relative to the current time on the client machine:
2014-05-10T17:42:14.999999999Z07:00 4386fb97867d: (from ubuntu-1:14.04) stop
2014-05-10T17:42:14.999999999Z07:00 7805c1d35632: (from redis:2.8) die
2014-09-03T15:49:29.999999999Z07:00 7805c1d35632: (from redis:2.8) stop

View file

@ -13,6 +13,7 @@ import (
"sync"
"time"
"github.com/docker/docker/pkg/integration/checker"
"github.com/go-check/check"
)
@ -383,6 +384,65 @@ func (s *DockerSuite) TestEventsFilterImageName(c *check.C) {
}
func (s *DockerSuite) TestEventsFilterLabels(c *check.C) {
testRequires(c, DaemonIsLinux)
since := daemonTime(c).Unix()
label := "io.docker.testing=foo"
out, _ := dockerCmd(c, "run", "-d", "-l", label, "busybox:latest", "true")
container1 := strings.TrimSpace(out)
out, _ = dockerCmd(c, "run", "-d", "busybox", "true")
container2 := strings.TrimSpace(out)
out, _ = dockerCmd(
c,
"events",
fmt.Sprintf("--since=%d", since),
fmt.Sprintf("--until=%d", daemonTime(c).Unix()),
"--filter", fmt.Sprintf("label=%s", label))
events := strings.Split(strings.TrimSpace(out), "\n")
c.Assert(len(events), checker.Equals, 3)
for _, e := range events {
c.Assert(e, checker.Contains, container1)
c.Assert(e, check.Not(checker.Contains), container2)
}
}
func (s *DockerSuite) TestEventsFilterImageLabels(c *check.C) {
testRequires(c, DaemonIsLinux)
since := daemonTime(c).Unix()
name := "labelfilterimage"
label := "io.docker.testing=image"
// Build a test image.
_, err := buildImage(name, `
FROM busybox:latest
LABEL io.docker.testing=image`, true)
if err != nil {
c.Fatalf("Couldn't create image: %q", err)
}
dockerCmd(c, "tag", name, "labelfiltertest:tag1")
dockerCmd(c, "tag", name, "labelfiltertest:tag2")
dockerCmd(c, "tag", "busybox:latest", "labelfiltertest:tag3")
out, _ := dockerCmd(
c,
"events",
fmt.Sprintf("--since=%d", since),
fmt.Sprintf("--until=%d", daemonTime(c).Unix()),
"--filter", fmt.Sprintf("label=%s", label))
events := strings.Split(strings.TrimSpace(out), "\n")
c.Assert(len(events), checker.Equals, 2, check.Commentf("Events == %s", events))
for _, e := range events {
c.Assert(e, checker.Contains, "labelfiltertest")
}
}
func (s *DockerSuite) TestEventsFilterContainer(c *check.C) {
testRequires(c, DaemonIsLinux)
since := fmt.Sprintf("%d", daemonTime(c).Unix())
@ -401,7 +461,7 @@ func (s *DockerSuite) TestEventsFilterContainer(c *check.C) {
checkEvents := func(id string, events []string) error {
if len(events) != 4 { // create, attach, start, die
return fmt.Errorf("expected 3 events, got %v", events)
return fmt.Errorf("expected 4 events, got %v", events)
}
for _, event := range events {
e := strings.Fields(event)