123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- package fluentd
- import (
- "bytes"
- "fmt"
- "math"
- "net"
- "strconv"
- "strings"
- "text/template"
- "github.com/Sirupsen/logrus"
- "github.com/docker/docker/daemon/logger"
- "github.com/fluent/fluent-logger-golang/fluent"
- )
- type Fluentd struct {
- tag string
- containerID string
- containerName string
- writer *fluent.Fluent
- }
- type Receiver struct {
- ID string
- FullID string
- Name string
- }
- const (
- name = "fluentd"
- defaultHostName = "localhost"
- defaultPort = 24224
- defaultTagPrefix = "docker"
- )
- func init() {
- if err := logger.RegisterLogDriver(name, New); err != nil {
- logrus.Fatal(err)
- }
- if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
- logrus.Fatal(err)
- }
- }
- func parseConfig(ctx logger.Context) (string, int, string, error) {
- host := defaultHostName
- port := defaultPort
- tag := "docker." + ctx.ContainerID[:12]
- config := ctx.Config
- if address := config["fluentd-address"]; address != "" {
- if h, p, err := net.SplitHostPort(address); err != nil {
- if !strings.Contains(err.Error(), "missing port in address") {
- return "", 0, "", err
- }
- host = h
- } else {
- portnum, err := strconv.Atoi(p)
- if err != nil {
- return "", 0, "", err
- }
- host = h
- port = portnum
- }
- }
- if config["fluentd-tag"] != "" {
- receiver := &Receiver{
- ID: ctx.ContainerID[:12],
- FullID: ctx.ContainerID,
- Name: ctx.ContainerName,
- }
- tmpl, err := template.New("tag").Parse(config["fluentd-tag"])
- if err != nil {
- return "", 0, "", err
- }
- buf := new(bytes.Buffer)
- if err := tmpl.Execute(buf, receiver); err != nil {
- return "", 0, "", err
- }
- tag = buf.String()
- }
- return host, port, tag, nil
- }
- func New(ctx logger.Context) (logger.Logger, error) {
- host, port, tag, err := parseConfig(ctx)
- if err != nil {
- return nil, err
- }
- logrus.Debugf("logging driver fluentd configured for container:%s, host:%s, port:%d, tag:%s.", ctx.ContainerID, host, port, tag)
- // logger tries to recoonect 2**64 - 1 times
- // failed (and panic) after 204 years [ 1.5 ** (2**32 - 1) - 1 seconds]
- log, err := fluent.New(fluent.Config{FluentPort: port, FluentHost: host, RetryWait: 1000, MaxRetry: math.MaxUint32})
- if err != nil {
- return nil, err
- }
- return &Fluentd{
- tag: tag,
- containerID: ctx.ContainerID,
- containerName: ctx.ContainerName,
- writer: log,
- }, nil
- }
- func (f *Fluentd) Log(msg *logger.Message) error {
- data := map[string]string{
- "container_id": f.containerID,
- "container_name": f.containerName,
- "source": msg.Source,
- "log": string(msg.Line),
- }
- // fluent-logger-golang buffers logs from failures and disconnections,
- // and these are transferred again automatically.
- return f.writer.PostWithTime(f.tag, msg.Timestamp, data)
- }
- func ValidateLogOpt(cfg map[string]string) error {
- for key := range cfg {
- switch key {
- case "fluentd-address":
- case "fluentd-tag":
- default:
- return fmt.Errorf("unknown log opt '%s' for fluentd log driver", key)
- }
- }
- return nil
- }
- func (f *Fluentd) Close() error {
- return f.writer.Close()
- }
- func (f *Fluentd) Name() string {
- return name
- }
|