writer.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. // Copyright 2012 SocialCode. All rights reserved.
  2. // Use of this source code is governed by the MIT
  3. // license that can be found in the LICENSE file.
  4. package gelf
  5. import (
  6. "bytes"
  7. "compress/flate"
  8. "compress/gzip"
  9. "compress/zlib"
  10. "crypto/rand"
  11. "encoding/json"
  12. "fmt"
  13. "io"
  14. "net"
  15. "os"
  16. "path"
  17. "runtime"
  18. "strings"
  19. "sync"
  20. "time"
  21. )
  22. // Writer implements io.Writer and is used to send both discrete
  23. // messages to a graylog2 server, or data from a stream-oriented
  24. // interface (like the functions in log).
  25. type Writer struct {
  26. mu sync.Mutex
  27. conn net.Conn
  28. hostname string
  29. Facility string // defaults to current process name
  30. CompressionLevel int // one of the consts from compress/flate
  31. CompressionType CompressType
  32. }
  // CompressType selects the compression a Writer applies to a message
  // before sending it to the graylog2 server.
  type CompressType int

  // Supported compression types. CompressGzip is the zero value of
  // CompressType.
  const (
  	CompressGzip CompressType = 0 // gzip
  	CompressZlib CompressType = 1 // zlib
  	CompressNone CompressType = 2 // send the payload uncompressed
  )
  // Message holds the contents of a single GELF message. The tagged
  // fields map onto the standard GELF keys. Extra and RawExtra are
  // excluded from the default JSON encoding (tag "-"); MarshalJSONBuf
  // merges them into the encoded object by hand.
  type Message struct {
  	Version  string  `json:"version"`
  	Host     string  `json:"host"`
  	Short    string  `json:"short_message"`
  	Full     string  `json:"full_message,omitempty"`
  	TimeUnix float64 `json:"timestamp"`
  	Level    int32   `json:"level,omitempty"`
  	Facility string  `json:"facility,omitempty"`

  	// Extra carries additional key/value pairs for the message.
  	Extra map[string]interface{} `json:"-"`
  	// RawExtra carries additional fields as an already encoded JSON object.
  	RawExtra json.RawMessage `json:"-"`
  }
  // Sizes that control GELF chunking. ChunkSize should be less than
  // (MTU - len(UDP header)).
  //
  // TODO: generate dynamically using Path MTU Discovery?
  const (
  	// ChunkSize is the size of a full chunk datagram, header included.
  	ChunkSize = 1420
  	// chunkedHeaderLen is the chunk header size: 2-byte magic, 8-byte
  	// message id, 1-byte sequence number and 1-byte chunk count.
  	chunkedHeaderLen = 2 + 8 + 1 + 1
  	// chunkedDataLen is the payload space left in a chunk after its header.
  	chunkedDataLen = ChunkSize - chunkedHeaderLen
  )
  // magicChunked is the two-byte marker written at the start of every
  // GELF chunk.
  var magicChunked = []byte{0x1e, 0x0f}

  // magicZlib is the leading byte of a zlib stream.
  var magicZlib = []byte{0x78}

  // magicGzip is the two-byte marker at the start of a gzip stream.
  var magicGzip = []byte{0x1f, 0x8b}
  // Syslog severity levels, from most severe (0) to least severe (7).
  const (
  	LOG_EMERG   int32 = iota // 0
  	LOG_ALERT                // 1
  	LOG_CRIT                 // 2
  	LOG_ERR                  // 3
  	LOG_WARNING              // 4
  	LOG_NOTICE               // 5
  	LOG_INFO                 // 6
  	LOG_DEBUG                // 7
  )
  79. // numChunks returns the number of GELF chunks necessary to transmit
  80. // the given compressed buffer.
  81. func numChunks(b []byte) int {
  82. lenB := len(b)
  83. if lenB <= ChunkSize {
  84. return 1
  85. }
  86. return len(b)/chunkedDataLen + 1
  87. }
  88. // New returns a new GELF Writer. This writer can be used to send the
  89. // output of the standard Go log functions to a central GELF server by
  90. // passing it to log.SetOutput()
  91. func NewWriter(addr string) (*Writer, error) {
  92. var err error
  93. w := new(Writer)
  94. w.CompressionLevel = flate.BestSpeed
  95. if w.conn, err = net.Dial("udp", addr); err != nil {
  96. return nil, err
  97. }
  98. if w.hostname, err = os.Hostname(); err != nil {
  99. return nil, err
  100. }
  101. w.Facility = path.Base(os.Args[0])
  102. return w, nil
  103. }
  104. // writes the gzip compressed byte array to the connection as a series
  105. // of GELF chunked messages. The format is documented at
  106. // http://docs.graylog.org/en/2.1/pages/gelf.html as:
  107. //
  108. // 2-byte magic (0x1e 0x0f), 8 byte id, 1 byte sequence id, 1 byte
  109. // total, chunk-data
  110. func (w *Writer) writeChunked(zBytes []byte) (err error) {
  111. b := make([]byte, 0, ChunkSize)
  112. buf := bytes.NewBuffer(b)
  113. nChunksI := numChunks(zBytes)
  114. if nChunksI > 128 {
  115. return fmt.Errorf("msg too large, would need %d chunks", nChunksI)
  116. }
  117. nChunks := uint8(nChunksI)
  118. // use urandom to get a unique message id
  119. msgId := make([]byte, 8)
  120. n, err := io.ReadFull(rand.Reader, msgId)
  121. if err != nil || n != 8 {
  122. return fmt.Errorf("rand.Reader: %d/%s", n, err)
  123. }
  124. bytesLeft := len(zBytes)
  125. for i := uint8(0); i < nChunks; i++ {
  126. buf.Reset()
  127. // manually write header. Don't care about
  128. // host/network byte order, because the spec only
  129. // deals in individual bytes.
  130. buf.Write(magicChunked) //magic
  131. buf.Write(msgId)
  132. buf.WriteByte(i)
  133. buf.WriteByte(nChunks)
  134. // slice out our chunk from zBytes
  135. chunkLen := chunkedDataLen
  136. if chunkLen > bytesLeft {
  137. chunkLen = bytesLeft
  138. }
  139. off := int(i) * chunkedDataLen
  140. chunk := zBytes[off : off+chunkLen]
  141. buf.Write(chunk)
  142. // write this chunk, and make sure the write was good
  143. n, err := w.conn.Write(buf.Bytes())
  144. if err != nil {
  145. return fmt.Errorf("Write (chunk %d/%d): %s", i,
  146. nChunks, err)
  147. }
  148. if n != len(buf.Bytes()) {
  149. return fmt.Errorf("Write len: (chunk %d/%d) (%d/%d)",
  150. i, nChunks, n, len(buf.Bytes()))
  151. }
  152. bytesLeft -= chunkLen
  153. }
  154. if bytesLeft != 0 {
  155. return fmt.Errorf("error: %d bytes left after sending", bytesLeft)
  156. }
  157. return nil
  158. }
  // bufPool recycles the scratch buffers used while encoding and
  // compressing messages. Fresh buffers start out with 1k bytes of
  // capacity.
  var bufPool = sync.Pool{
  	New: func() interface{} {
  		backing := make([]byte, 0, 1024)
  		return bytes.NewBuffer(backing)
  	},
  }

  // newBuffer takes a buffer from bufPool and returns it emptied. If
  // the pool hands back a nil buffer, a new empty buffer is returned
  // instead.
  func newBuffer() *bytes.Buffer {
  	if pooled := bufPool.Get().(*bytes.Buffer); pooled != nil {
  		pooled.Reset()
  		return pooled
  	}
  	return bytes.NewBuffer(nil)
  }
  173. // WriteMessage sends the specified message to the GELF server
  174. // specified in the call to New(). It assumes all the fields are
  175. // filled out appropriately. In general, clients will want to use
  176. // Write, rather than WriteMessage.
  177. func (w *Writer) WriteMessage(m *Message) (err error) {
  178. mBuf := newBuffer()
  179. defer bufPool.Put(mBuf)
  180. if err = m.MarshalJSONBuf(mBuf); err != nil {
  181. return err
  182. }
  183. mBytes := mBuf.Bytes()
  184. var (
  185. zBuf *bytes.Buffer
  186. zBytes []byte
  187. )
  188. var zw io.WriteCloser
  189. switch w.CompressionType {
  190. case CompressGzip:
  191. zBuf = newBuffer()
  192. defer bufPool.Put(zBuf)
  193. zw, err = gzip.NewWriterLevel(zBuf, w.CompressionLevel)
  194. case CompressZlib:
  195. zBuf = newBuffer()
  196. defer bufPool.Put(zBuf)
  197. zw, err = zlib.NewWriterLevel(zBuf, w.CompressionLevel)
  198. case CompressNone:
  199. zBytes = mBytes
  200. default:
  201. panic(fmt.Sprintf("unknown compression type %d",
  202. w.CompressionType))
  203. }
  204. if zw != nil {
  205. if err != nil {
  206. return
  207. }
  208. if _, err = zw.Write(mBytes); err != nil {
  209. zw.Close()
  210. return
  211. }
  212. zw.Close()
  213. zBytes = zBuf.Bytes()
  214. }
  215. if numChunks(zBytes) > 1 {
  216. return w.writeChunked(zBytes)
  217. }
  218. n, err := w.conn.Write(zBytes)
  219. if err != nil {
  220. return
  221. }
  222. if n != len(zBytes) {
  223. return fmt.Errorf("bad write (%d/%d)", n, len(zBytes))
  224. }
  225. return nil
  226. }
  227. // Close connection and interrupt blocked Read or Write operations
  228. func (w *Writer) Close() error {
  229. return w.conn.Close()
  230. }
  231. /*
  232. func (w *Writer) Alert(m string) (err error)
  233. func (w *Writer) Close() error
  234. func (w *Writer) Crit(m string) (err error)
  235. func (w *Writer) Debug(m string) (err error)
  236. func (w *Writer) Emerg(m string) (err error)
  237. func (w *Writer) Err(m string) (err error)
  238. func (w *Writer) Info(m string) (err error)
  239. func (w *Writer) Notice(m string) (err error)
  240. func (w *Writer) Warning(m string) (err error)
  241. */
  // getCaller returns the filename and the line info of a function
  // further down in the call stack. Passing 0 in as callDepth would
  // return info on the function calling getCaller, 1 the parent
  // function, and so on. Any suffixes passed to getCaller are path
  // fragments like "/pkg/log/log.go", and functions in the call stack
  // from a file ending in one of them are skipped. If the stack runs
  // out before a suitable frame is found, "???" and 0 are returned.
  func getCaller(callDepth int, suffixesToIgnore ...string) (file string, line int) {
  	// start one frame up so that getCaller itself is never reported
  	for depth := callDepth + 1; ; depth++ {
  		_, name, lineNo, ok := runtime.Caller(depth)
  		if !ok {
  			return "???", 0
  		}

  		ignored := false
  		for _, suffix := range suffixesToIgnore {
  			if strings.HasSuffix(name, suffix) {
  				ignored = true
  				break
  			}
  		}
  		if !ignored {
  			return name, lineNo
  		}
  	}
  }
  270. func getCallerIgnoringLogMulti(callDepth int) (string, int) {
  271. // the +1 is to ignore this (getCallerIgnoringLogMulti) frame
  272. return getCaller(callDepth+1, "/pkg/log/log.go", "/pkg/io/multi.go")
  273. }
  274. // Write encodes the given string in a GELF message and sends it to
  275. // the server specified in New().
  276. func (w *Writer) Write(p []byte) (n int, err error) {
  277. // 1 for the function that called us.
  278. file, line := getCallerIgnoringLogMulti(1)
  279. // remove trailing and leading whitespace
  280. p = bytes.TrimSpace(p)
  281. // If there are newlines in the message, use the first line
  282. // for the short message and set the full message to the
  283. // original input. If the input has no newlines, stick the
  284. // whole thing in Short.
  285. short := p
  286. full := []byte("")
  287. if i := bytes.IndexRune(p, '\n'); i > 0 {
  288. short = p[:i]
  289. full = p
  290. }
  291. m := Message{
  292. Version: "1.1",
  293. Host: w.hostname,
  294. Short: string(short),
  295. Full: string(full),
  296. TimeUnix: float64(time.Now().Unix()),
  297. Level: 6, // info
  298. Facility: w.Facility,
  299. Extra: map[string]interface{}{
  300. "_file": file,
  301. "_line": line,
  302. },
  303. }
  304. if err = w.WriteMessage(&m); err != nil {
  305. return 0, err
  306. }
  307. return len(p), nil
  308. }
  309. func (m *Message) MarshalJSONBuf(buf *bytes.Buffer) error {
  310. b, err := json.Marshal(m)
  311. if err != nil {
  312. return err
  313. }
  314. // write up until the final }
  315. if _, err = buf.Write(b[:len(b)-1]); err != nil {
  316. return err
  317. }
  318. if len(m.Extra) > 0 {
  319. eb, err := json.Marshal(m.Extra)
  320. if err != nil {
  321. return err
  322. }
  323. // merge serialized message + serialized extra map
  324. if err = buf.WriteByte(','); err != nil {
  325. return err
  326. }
  327. // write serialized extra bytes, without enclosing quotes
  328. if _, err = buf.Write(eb[1 : len(eb)-1]); err != nil {
  329. return err
  330. }
  331. }
  332. if len(m.RawExtra) > 0 {
  333. if err := buf.WriteByte(','); err != nil {
  334. return err
  335. }
  336. // write serialized extra bytes, without enclosing quotes
  337. if _, err = buf.Write(m.RawExtra[1 : len(m.RawExtra)-1]); err != nil {
  338. return err
  339. }
  340. }
  341. // write final closing quotes
  342. return buf.WriteByte('}')
  343. }
  344. func (m *Message) UnmarshalJSON(data []byte) error {
  345. i := make(map[string]interface{}, 16)
  346. if err := json.Unmarshal(data, &i); err != nil {
  347. return err
  348. }
  349. for k, v := range i {
  350. if k[0] == '_' {
  351. if m.Extra == nil {
  352. m.Extra = make(map[string]interface{}, 1)
  353. }
  354. m.Extra[k] = v
  355. continue
  356. }
  357. switch k {
  358. case "version":
  359. m.Version = v.(string)
  360. case "host":
  361. m.Host = v.(string)
  362. case "short_message":
  363. m.Short = v.(string)
  364. case "full_message":
  365. m.Full = v.(string)
  366. case "timestamp":
  367. m.TimeUnix = v.(float64)
  368. case "level":
  369. m.Level = int32(v.(float64))
  370. case "facility":
  371. m.Facility = v.(string)
  372. }
  373. }
  374. return nil
  375. }