gelf.go 6.4 KB


  1. // Package gelf provides the log driver for forwarding server logs to
  2. // endpoints that support the Graylog Extended Log Format.
  3. package gelf // import "github.com/docker/docker/daemon/logger/gelf"
  4. import (
  5. "compress/flate"
  6. "encoding/json"
  7. "fmt"
  8. "net"
  9. "net/url"
  10. "strconv"
  11. "time"
  12. "github.com/Graylog2/go-gelf/gelf"
  13. "github.com/docker/docker/daemon/logger"
  14. "github.com/docker/docker/daemon/logger/loggerutils"
  15. )
  16. const name = "gelf"
  17. type gelfLogger struct {
  18. writer gelf.Writer
  19. info logger.Info
  20. hostname string
  21. rawExtra json.RawMessage
  22. }
  23. func init() {
  24. if err := logger.RegisterLogDriver(name, New); err != nil {
  25. panic(err)
  26. }
  27. if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
  28. panic(err)
  29. }
  30. }
  31. // New creates a gelf logger using the configuration passed in on the
  32. // context. The supported context configuration variable is gelf-address.
  33. func New(info logger.Info) (logger.Logger, error) {
  34. // parse gelf address
  35. address, err := parseAddress(info.Config["gelf-address"])
  36. if err != nil {
  37. return nil, err
  38. }
  39. // collect extra data for GELF message
  40. hostname, err := info.Hostname()
  41. if err != nil {
  42. return nil, fmt.Errorf("gelf: cannot access hostname to set source field")
  43. }
  44. // parse log tag
  45. tag, err := loggerutils.ParseLogTag(info, loggerutils.DefaultTemplate)
  46. if err != nil {
  47. return nil, err
  48. }
  49. extra := map[string]interface{}{
  50. "_container_id": info.ContainerID,
  51. "_container_name": info.Name(),
  52. "_image_id": info.ContainerImageID,
  53. "_image_name": info.ContainerImageName,
  54. "_command": info.Command(),
  55. "_tag": tag,
  56. "_created": info.ContainerCreated,
  57. }
  58. extraAttrs, err := info.ExtraAttributes(func(key string) string {
  59. if key[0] == '_' {
  60. return key
  61. }
  62. return "_" + key
  63. })
  64. if err != nil {
  65. return nil, err
  66. }
  67. for k, v := range extraAttrs {
  68. extra[k] = v
  69. }
  70. rawExtra, err := json.Marshal(extra)
  71. if err != nil {
  72. return nil, err
  73. }
  74. var gelfWriter gelf.Writer
  75. if address.Scheme == "udp" {
  76. gelfWriter, err = newGELFUDPWriter(address.Host, info)
  77. if err != nil {
  78. return nil, err
  79. }
  80. } else if address.Scheme == "tcp" {
  81. gelfWriter, err = newGELFTCPWriter(address.Host, info)
  82. if err != nil {
  83. return nil, err
  84. }
  85. }
  86. return &gelfLogger{
  87. writer: gelfWriter,
  88. info: info,
  89. hostname: hostname,
  90. rawExtra: rawExtra,
  91. }, nil
  92. }
  93. // create new TCP gelfWriter
  94. func newGELFTCPWriter(address string, info logger.Info) (gelf.Writer, error) {
  95. gelfWriter, err := gelf.NewTCPWriter(address)
  96. if err != nil {
  97. return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address, err)
  98. }
  99. if v, ok := info.Config["gelf-tcp-max-reconnect"]; ok {
  100. i, err := strconv.Atoi(v)
  101. if err != nil || i < 0 {
  102. return nil, fmt.Errorf("gelf-tcp-max-reconnect must be a positive integer")
  103. }
  104. gelfWriter.MaxReconnect = i
  105. }
  106. if v, ok := info.Config["gelf-tcp-reconnect-delay"]; ok {
  107. i, err := strconv.Atoi(v)
  108. if err != nil || i < 0 {
  109. return nil, fmt.Errorf("gelf-tcp-reconnect-delay must be a positive integer")
  110. }
  111. gelfWriter.ReconnectDelay = time.Duration(i)
  112. }
  113. return gelfWriter, nil
  114. }
  115. // create new UDP gelfWriter
  116. func newGELFUDPWriter(address string, info logger.Info) (gelf.Writer, error) {
  117. gelfWriter, err := gelf.NewUDPWriter(address)
  118. if err != nil {
  119. return nil, fmt.Errorf("gelf: cannot connect to GELF endpoint: %s %v", address, err)
  120. }
  121. if v, ok := info.Config["gelf-compression-type"]; ok {
  122. switch v {
  123. case "gzip":
  124. gelfWriter.CompressionType = gelf.CompressGzip
  125. case "zlib":
  126. gelfWriter.CompressionType = gelf.CompressZlib
  127. case "none":
  128. gelfWriter.CompressionType = gelf.CompressNone
  129. default:
  130. return nil, fmt.Errorf("gelf: invalid compression type %q", v)
  131. }
  132. }
  133. if v, ok := info.Config["gelf-compression-level"]; ok {
  134. val, err := strconv.Atoi(v)
  135. if err != nil {
  136. return nil, fmt.Errorf("gelf: invalid compression level %s, err %v", v, err)
  137. }
  138. gelfWriter.CompressionLevel = val
  139. }
  140. return gelfWriter, nil
  141. }
  142. func (s *gelfLogger) Log(msg *logger.Message) error {
  143. if len(msg.Line) == 0 {
  144. return nil
  145. }
  146. level := gelf.LOG_INFO
  147. if msg.Source == "stderr" {
  148. level = gelf.LOG_ERR
  149. }
  150. m := gelf.Message{
  151. Version: "1.1",
  152. Host: s.hostname,
  153. Short: string(msg.Line),
  154. TimeUnix: float64(msg.Timestamp.UnixNano()/int64(time.Millisecond)) / 1000.0,
  155. Level: int32(level),
  156. RawExtra: s.rawExtra,
  157. }
  158. logger.PutMessage(msg)
  159. if err := s.writer.WriteMessage(&m); err != nil {
  160. return fmt.Errorf("gelf: cannot send GELF message: %v", err)
  161. }
  162. return nil
  163. }
  164. func (s *gelfLogger) Close() error {
  165. return s.writer.Close()
  166. }
  167. func (s *gelfLogger) Name() string {
  168. return name
  169. }
  170. // ValidateLogOpt looks for gelf specific log option gelf-address.
  171. func ValidateLogOpt(cfg map[string]string) error {
  172. address, err := parseAddress(cfg["gelf-address"])
  173. if err != nil {
  174. return err
  175. }
  176. for key, val := range cfg {
  177. switch key {
  178. case "gelf-address":
  179. case "tag":
  180. case "labels":
  181. case "labels-regex":
  182. case "env":
  183. case "env-regex":
  184. case "gelf-compression-level":
  185. if address.Scheme != "udp" {
  186. return fmt.Errorf("compression is only supported on UDP")
  187. }
  188. i, err := strconv.Atoi(val)
  189. if err != nil || i < flate.DefaultCompression || i > flate.BestCompression {
  190. return fmt.Errorf("unknown value %q for log opt %q for gelf log driver", val, key)
  191. }
  192. case "gelf-compression-type":
  193. if address.Scheme != "udp" {
  194. return fmt.Errorf("compression is only supported on UDP")
  195. }
  196. switch val {
  197. case "gzip", "zlib", "none":
  198. default:
  199. return fmt.Errorf("unknown value %q for log opt %q for gelf log driver", val, key)
  200. }
  201. case "gelf-tcp-max-reconnect", "gelf-tcp-reconnect-delay":
  202. if address.Scheme != "tcp" {
  203. return fmt.Errorf("%q is only valid for TCP", key)
  204. }
  205. i, err := strconv.Atoi(val)
  206. if err != nil || i < 0 {
  207. return fmt.Errorf("%q must be a positive integer", key)
  208. }
  209. default:
  210. return fmt.Errorf("unknown log opt %q for gelf log driver", key)
  211. }
  212. }
  213. return nil
  214. }
  215. func parseAddress(address string) (*url.URL, error) {
  216. if address == "" {
  217. return nil, fmt.Errorf("gelf-address is a required parameter")
  218. }
  219. addr, err := url.Parse(address)
  220. if err != nil {
  221. return nil, err
  222. }
  223. if addr.Scheme != "udp" && addr.Scheme != "tcp" {
  224. return nil, fmt.Errorf("gelf: endpoint needs to be TCP or UDP")
  225. }
  226. if _, _, err = net.SplitHostPort(addr.Host); err != nil {
  227. return nil, fmt.Errorf("gelf: please provide gelf-address as proto://host:port")
  228. }
  229. return addr, nil
  230. }