udpwriter.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. // Copyright 2012 SocialCode. All rights reserved.
  2. // Use of this source code is governed by the MIT
  3. // license that can be found in the LICENSE file.
  4. package gelf
  5. import (
  6. "bytes"
  7. "compress/flate"
  8. "compress/gzip"
  9. "compress/zlib"
  10. "crypto/rand"
  11. "fmt"
  12. "io"
  13. "net"
  14. "os"
  15. "path"
  16. "sync"
  17. )
  18. type UDPWriter struct {
  19. GelfWriter
  20. CompressionLevel int // one of the consts from compress/flate
  21. CompressionType CompressType
  22. }
  // CompressType identifies the compression scheme a writer applies to
  // messages before sending them to the graylog2 server.
  type CompressType int

  // Available compression schemes. CompressGzip is the zero value of
  // CompressType.
  const (
  	CompressGzip = CompressType(iota) // gzip
  	CompressZlib                      // zlib
  	CompressNone                      // send the message uncompressed
  )
  // Sizes that control GELF chunking. ChunkSize should be less than
  // (MTU - len(UDP header)).
  //
  // TODO: generate dynamically using Path MTU Discovery?
  const (
  	// ChunkSize is the largest datagram, in bytes, that is written in
  	// one piece; a chunk datagram (header included) is at most this big.
  	ChunkSize = 1420
  	// chunkedHeaderLen is the number of header bytes in front of the
  	// data of every chunk.
  	chunkedHeaderLen = 12
  	// chunkedDataLen is the room left for data in a chunk once the
  	// header is accounted for.
  	chunkedDataLen = ChunkSize - chunkedHeaderLen
  )
  // magicChunked is the two-byte marker that starts every GELF chunk.
  var magicChunked = []byte{0x1e, 0x0f}

  // magicZlib is the leading byte associated with zlib data.
  var magicZlib = []byte{0x78}

  // magicGzip holds the two leading bytes associated with gzip data.
  var magicGzip = []byte{0x1f, 0x8b}
  45. // numChunks returns the number of GELF chunks necessary to transmit
  46. // the given compressed buffer.
  47. func numChunks(b []byte) int {
  48. lenB := len(b)
  49. if lenB <= ChunkSize {
  50. return 1
  51. }
  52. return len(b)/chunkedDataLen + 1
  53. }
  54. // New returns a new GELF Writer. This writer can be used to send the
  55. // output of the standard Go log functions to a central GELF server by
  56. // passing it to log.SetOutput()
  57. func NewUDPWriter(addr string) (*UDPWriter, error) {
  58. var err error
  59. w := new(UDPWriter)
  60. w.CompressionLevel = flate.BestSpeed
  61. if w.conn, err = net.Dial("udp", addr); err != nil {
  62. return nil, err
  63. }
  64. if w.hostname, err = os.Hostname(); err != nil {
  65. return nil, err
  66. }
  67. w.Facility = path.Base(os.Args[0])
  68. return w, nil
  69. }
  70. // writes the gzip compressed byte array to the connection as a series
  71. // of GELF chunked messages. The format is documented at
  72. // http://docs.graylog.org/en/2.1/pages/gelf.html as:
  73. //
  74. // 2-byte magic (0x1e 0x0f), 8 byte id, 1 byte sequence id, 1 byte
  75. // total, chunk-data
  76. func (w *GelfWriter) writeChunked(zBytes []byte) (err error) {
  77. b := make([]byte, 0, ChunkSize)
  78. buf := bytes.NewBuffer(b)
  79. nChunksI := numChunks(zBytes)
  80. if nChunksI > 128 {
  81. return fmt.Errorf("msg too large, would need %d chunks", nChunksI)
  82. }
  83. nChunks := uint8(nChunksI)
  84. // use urandom to get a unique message id
  85. msgId := make([]byte, 8)
  86. n, err := io.ReadFull(rand.Reader, msgId)
  87. if err != nil || n != 8 {
  88. return fmt.Errorf("rand.Reader: %d/%s", n, err)
  89. }
  90. bytesLeft := len(zBytes)
  91. for i := uint8(0); i < nChunks; i++ {
  92. buf.Reset()
  93. // manually write header. Don't care about
  94. // host/network byte order, because the spec only
  95. // deals in individual bytes.
  96. buf.Write(magicChunked) //magic
  97. buf.Write(msgId)
  98. buf.WriteByte(i)
  99. buf.WriteByte(nChunks)
  100. // slice out our chunk from zBytes
  101. chunkLen := chunkedDataLen
  102. if chunkLen > bytesLeft {
  103. chunkLen = bytesLeft
  104. }
  105. off := int(i) * chunkedDataLen
  106. chunk := zBytes[off : off+chunkLen]
  107. buf.Write(chunk)
  108. // write this chunk, and make sure the write was good
  109. n, err := w.conn.Write(buf.Bytes())
  110. if err != nil {
  111. return fmt.Errorf("Write (chunk %d/%d): %s", i,
  112. nChunks, err)
  113. }
  114. if n != len(buf.Bytes()) {
  115. return fmt.Errorf("Write len: (chunk %d/%d) (%d/%d)",
  116. i, nChunks, n, len(buf.Bytes()))
  117. }
  118. bytesLeft -= chunkLen
  119. }
  120. if bytesLeft != 0 {
  121. return fmt.Errorf("error: %d bytes left after sending", bytesLeft)
  122. }
  123. return nil
  124. }
  // bufPool hands out reusable byte buffers. A buffer created by the
  // pool starts out empty with 1k bytes of capacity.
  var bufPool = sync.Pool{
  	New: func() interface{} {
  		backing := make([]byte, 0, 1024)
  		return bytes.NewBuffer(backing)
  	},
  }
  131. func newBuffer() *bytes.Buffer {
  132. b := bufPool.Get().(*bytes.Buffer)
  133. if b != nil {
  134. b.Reset()
  135. return b
  136. }
  137. return bytes.NewBuffer(nil)
  138. }
  139. // WriteMessage sends the specified message to the GELF server
  140. // specified in the call to New(). It assumes all the fields are
  141. // filled out appropriately. In general, clients will want to use
  142. // Write, rather than WriteMessage.
  143. func (w *UDPWriter) WriteMessage(m *Message) (err error) {
  144. mBuf := newBuffer()
  145. defer bufPool.Put(mBuf)
  146. if err = m.MarshalJSONBuf(mBuf); err != nil {
  147. return err
  148. }
  149. mBytes := mBuf.Bytes()
  150. var (
  151. zBuf *bytes.Buffer
  152. zBytes []byte
  153. )
  154. var zw io.WriteCloser
  155. switch w.CompressionType {
  156. case CompressGzip:
  157. zBuf = newBuffer()
  158. defer bufPool.Put(zBuf)
  159. zw, err = gzip.NewWriterLevel(zBuf, w.CompressionLevel)
  160. case CompressZlib:
  161. zBuf = newBuffer()
  162. defer bufPool.Put(zBuf)
  163. zw, err = zlib.NewWriterLevel(zBuf, w.CompressionLevel)
  164. case CompressNone:
  165. zBytes = mBytes
  166. default:
  167. panic(fmt.Sprintf("unknown compression type %d",
  168. w.CompressionType))
  169. }
  170. if zw != nil {
  171. if err != nil {
  172. return
  173. }
  174. if _, err = zw.Write(mBytes); err != nil {
  175. zw.Close()
  176. return
  177. }
  178. zw.Close()
  179. zBytes = zBuf.Bytes()
  180. }
  181. if numChunks(zBytes) > 1 {
  182. return w.writeChunked(zBytes)
  183. }
  184. n, err := w.conn.Write(zBytes)
  185. if err != nil {
  186. return
  187. }
  188. if n != len(zBytes) {
  189. return fmt.Errorf("bad write (%d/%d)", n, len(zBytes))
  190. }
  191. return nil
  192. }
  193. // Write encodes the given string in a GELF message and sends it to
  194. // the server specified in New().
  195. func (w *UDPWriter) Write(p []byte) (n int, err error) {
  196. // 1 for the function that called us.
  197. file, line := getCallerIgnoringLogMulti(1)
  198. m := constructMessage(p, w.hostname, w.Facility, file, line)
  199. if err = w.WriteMessage(m); err != nil {
  200. return 0, err
  201. }
  202. return len(p), nil
  203. }