gelf.go 6.5 KB


  1. // Package gelf provides the log driver for forwarding server logs to
  2. // endpoints that support the Graylog Extended Log Format.
  3. package gelf
  4. import (
  5. "compress/flate"
  6. "encoding/json"
  7. "fmt"
  8. "net"
  9. "net/url"
  10. "strconv"
  11. "time"
  12. "github.com/Graylog2/go-gelf/gelf"
  13. "github.com/docker/docker/daemon/logger"
  14. "github.com/docker/docker/daemon/logger/loggerutils"
  15. "github.com/docker/docker/pkg/urlutil"
  16. "github.com/sirupsen/logrus"
  17. )
  18. const name = "gelf"
  19. type gelfLogger struct {
  20. writer gelf.Writer
  21. info logger.Info
  22. hostname string
  23. rawExtra json.RawMessage
  24. }
  25. func init() {
  26. if err := logger.RegisterLogDriver(name, New); err != nil {
  27. logrus.Fatal(err)
  28. }
  29. if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
  30. logrus.Fatal(err)
  31. }
  32. }
  33. // New creates a gelf logger using the configuration passed in on the
  34. // context. The supported context configuration variable is gelf-address.
  35. func New(info logger.Info) (logger.Logger, error) {
  36. // parse gelf address
  37. address, err := parseAddress(info.Config["gelf-address"])
  38. if err != nil {
  39. return nil, err
  40. }
  41. // collect extra data for GELF message
  42. hostname, err := info.Hostname()
  43. if err != nil {
  44. return nil, fmt.Errorf("gelf: cannot access hostname to set source field")
  45. }
  46. // parse log tag
  47. tag, err := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
  48. if err != nil {
  49. return nil, err
  50. }
  51. extra := map[string]interface{}{
  52. "_container_id": info.ContainerID,
  53. "_container_name": info.Name(),
  54. "_image_id": info.ContainerImageID,
  55. "_image_name": info.ContainerImageName,
  56. "_command": info.Command(),
  57. "_tag": tag,
  58. "_created": info.ContainerCreated,
  59. }
  60. extraAttrs, err := info.ExtraAttributes(func(key string) string {
  61. if key[0] == '_' {
  62. return key
  63. }
  64. return "_" + key
  65. })
  66. if err != nil {
  67. return nil, err
  68. }
  69. for k, v := range extraAttrs {
  70. extra[k] = v
  71. }
  72. rawExtra, err := json.Marshal(extra)
  73. if err != nil {
  74. return nil, err
  75. }
  76. var gelfWriter gelf.Writer
  77. if address.Scheme == "udp" {
  78. gelfWriter, err = newGELFUDPWriter(address.Host, info)
  79. if err != nil {
  80. return nil, err
  81. }
  82. } else if address.Scheme == "tcp" {
  83. gelfWriter, err = newGELFTCPWriter(address.Host, info)
  84. if err != nil {
  85. return nil, err
  86. }
  87. }
  88. return &gelfLogger{
  89. writer: gelfWriter,
  90. info: info,
  91. hostname: hostname,
  92. rawExtra: rawExtra,
  93. }, nil
  94. }
  95. // create new TCP gelfWriter
  96. func newGELFTCPWriter(address string, info logger.Info) (gelf.Writer, error) {
  97. gelfWriter, err := gelf.NewTCPWriter(address)
  98. if err != nil {
  99. return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address, err)
  100. }
  101. if v, ok := info.Config["gelf-tcp-max-reconnect"]; ok {
  102. i, err := strconv.Atoi(v)
  103. if err != nil || i < 0 {
  104. return nil, fmt.Errorf("gelf-tcp-max-reconnect must be a positive integer")
  105. }
  106. gelfWriter.MaxReconnect = i
  107. }
  108. if v, ok := info.Config["gelf-tcp-reconnect-delay"]; ok {
  109. i, err := strconv.Atoi(v)
  110. if err != nil || i < 0 {
  111. return nil, fmt.Errorf("gelf-tcp-reconnect-delay must be a positive integer")
  112. }
  113. gelfWriter.ReconnectDelay = time.Duration(i)
  114. }
  115. return gelfWriter, nil
  116. }
  117. // create new UDP gelfWriter
  118. func newGELFUDPWriter(address string, info logger.Info) (gelf.Writer, error) {
  119. gelfWriter, err := gelf.NewUDPWriter(address)
  120. if err != nil {
  121. return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address, err)
  122. }
  123. if v, ok := info.Config["gelf-compression-type"]; ok {
  124. switch v {
  125. case "gzip":
  126. gelfWriter.CompressionType = gelf.CompressGzip
  127. case "zlib":
  128. gelfWriter.CompressionType = gelf.CompressZlib
  129. case "none":
  130. gelfWriter.CompressionType = gelf.CompressNone
  131. default:
  132. return nil, fmt.Errorf("gelf: invalid compression type %q", v)
  133. }
  134. }
  135. if v, ok := info.Config["gelf-compression-level"]; ok {
  136. val, err := strconv.Atoi(v)
  137. if err != nil {
  138. return nil, fmt.Errorf("gelf: invalid compression level %s, err %v", v, err)
  139. }
  140. gelfWriter.CompressionLevel = val
  141. }
  142. return gelfWriter, nil
  143. }
  144. func (s *gelfLogger) Log(msg *logger.Message) error {
  145. level := gelf.LOG_INFO
  146. if msg.Source == "stderr" {
  147. level = gelf.LOG_ERR
  148. }
  149. m := gelf.Message{
  150. Version: "1.1",
  151. Host: s.hostname,
  152. Short: string(msg.Line),
  153. TimeUnix: float64(msg.Timestamp.UnixNano()/int64(time.Millisecond)) / 1000.0,
  154. Level: int32(level),
  155. RawExtra: s.rawExtra,
  156. }
  157. logger.PutMessage(msg)
  158. if err := s.writer.WriteMessage(&m); err != nil {
  159. return fmt.Errorf("gelf: cannot send GELF message: %v", err)
  160. }
  161. return nil
  162. }
  163. func (s *gelfLogger) Close() error {
  164. return s.writer.Close()
  165. }
  166. func (s *gelfLogger) Name() string {
  167. return name
  168. }
  169. // ValidateLogOpt looks for gelf specific log option gelf-address.
  170. func ValidateLogOpt(cfg map[string]string) error {
  171. address, err := parseAddress(cfg["gelf-address"])
  172. if err != nil {
  173. return err
  174. }
  175. for key, val := range cfg {
  176. switch key {
  177. case "gelf-address":
  178. case "tag":
  179. case "labels":
  180. case "env":
  181. case "env-regex":
  182. case "gelf-compression-level":
  183. if address.Scheme != "udp" {
  184. return fmt.Errorf("compression is only supported on UDP")
  185. }
  186. i, err := strconv.Atoi(val)
  187. if err != nil || i < flate.DefaultCompression || i > flate.BestCompression {
  188. return fmt.Errorf("unknown value %q for log opt %q for gelf log driver", val, key)
  189. }
  190. case "gelf-compression-type":
  191. if address.Scheme != "udp" {
  192. return fmt.Errorf("compression is only supported on UDP")
  193. }
  194. switch val {
  195. case "gzip", "zlib", "none":
  196. default:
  197. return fmt.Errorf("unknown value %q for log opt %q for gelf log driver", val, key)
  198. }
  199. case "gelf-tcp-max-reconnect", "gelf-tcp-reconnect-delay":
  200. if address.Scheme != "tcp" {
  201. return fmt.Errorf("%q is only valid for TCP", key)
  202. }
  203. i, err := strconv.Atoi(val)
  204. if err != nil || i < 0 {
  205. return fmt.Errorf("%q must be a positive integer", key)
  206. }
  207. default:
  208. return fmt.Errorf("unknown log opt %q for gelf log driver", key)
  209. }
  210. }
  211. return nil
  212. }
  213. func parseAddress(address string) (*url.URL, error) {
  214. if address == "" {
  215. return nil, fmt.Errorf("gelf-address is a required parameter")
  216. }
  217. if !urlutil.IsTransportURL(address) {
  218. return nil, fmt.Errorf("gelf-address should be in form proto://address, got %v", address)
  219. }
  220. url, err := url.Parse(address)
  221. if err != nil {
  222. return nil, err
  223. }
  224. // we support only udp
  225. if url.Scheme != "udp" && url.Scheme != "tcp" {
  226. return nil, fmt.Errorf("gelf: endpoint needs to be TCP or UDP")
  227. }
  228. // get host and port
  229. if _, _, err = net.SplitHostPort(url.Host); err != nil {
  230. return nil, fmt.Errorf("gelf: please provide gelf-address as proto://host:port")
  231. }
  232. return url, nil
  233. }