reader.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. // Copyright 2012 SocialCode. All rights reserved.
  2. // Use of this source code is governed by the MIT
  3. // license that can be found in the LICENSE file.
  4. package gelf
  5. import (
  6. "bytes"
  7. "compress/gzip"
  8. "compress/zlib"
  9. "encoding/json"
  10. "fmt"
  11. "io"
  12. "net"
  13. "strings"
  14. "sync"
  15. )
  // Reader receives GELF messages from a network connection. Instances are
  // created with NewReader, which binds the connection to a UDP socket.
  type Reader struct {
  	// mu is not locked by any of the methods defined alongside this type.
  	mu sync.Mutex
  	// conn is the connection the messages are read from.
  	conn net.Conn
  }
  20. func NewReader(addr string) (*Reader, error) {
  21. var err error
  22. udpAddr, err := net.ResolveUDPAddr("udp", addr)
  23. if err != nil {
  24. return nil, fmt.Errorf("ResolveUDPAddr('%s'): %s", addr, err)
  25. }
  26. conn, err := net.ListenUDP("udp", udpAddr)
  27. if err != nil {
  28. return nil, fmt.Errorf("ListenUDP: %s", err)
  29. }
  30. r := new(Reader)
  31. r.conn = conn
  32. return r, nil
  33. }
  34. func (r *Reader) Addr() string {
  35. return r.conn.LocalAddr().String()
  36. }
  37. // FIXME: this will discard data if p isn't big enough to hold the
  38. // full message.
  39. func (r *Reader) Read(p []byte) (int, error) {
  40. msg, err := r.ReadMessage()
  41. if err != nil {
  42. return -1, err
  43. }
  44. var data string
  45. if msg.Full == "" {
  46. data = msg.Short
  47. } else {
  48. data = msg.Full
  49. }
  50. return strings.NewReader(data).Read(p)
  51. }
  52. func (r *Reader) ReadMessage() (*Message, error) {
  53. cBuf := make([]byte, ChunkSize)
  54. var (
  55. err error
  56. n, length int
  57. cid, ocid []byte
  58. seq, total uint8
  59. cHead []byte
  60. cReader io.Reader
  61. chunks [][]byte
  62. )
  63. for got := 0; got < 128 && (total == 0 || got < int(total)); got++ {
  64. if n, err = r.conn.Read(cBuf); err != nil {
  65. return nil, fmt.Errorf("Read: %s", err)
  66. }
  67. cHead, cBuf = cBuf[:2], cBuf[:n]
  68. if bytes.Equal(cHead, magicChunked) {
  69. //fmt.Printf("chunked %v\n", cBuf[:14])
  70. cid, seq, total = cBuf[2:2+8], cBuf[2+8], cBuf[2+8+1]
  71. if ocid != nil && !bytes.Equal(cid, ocid) {
  72. return nil, fmt.Errorf("out-of-band message %v (awaited %v)", cid, ocid)
  73. } else if ocid == nil {
  74. ocid = cid
  75. chunks = make([][]byte, total)
  76. }
  77. n = len(cBuf) - chunkedHeaderLen
  78. //fmt.Printf("setting chunks[%d]: %d\n", seq, n)
  79. chunks[seq] = append(make([]byte, 0, n), cBuf[chunkedHeaderLen:]...)
  80. length += n
  81. } else { //not chunked
  82. if total > 0 {
  83. return nil, fmt.Errorf("out-of-band message (not chunked)")
  84. }
  85. break
  86. }
  87. }
  88. //fmt.Printf("\nchunks: %v\n", chunks)
  89. if length > 0 {
  90. if cap(cBuf) < length {
  91. cBuf = append(cBuf, make([]byte, 0, length-cap(cBuf))...)
  92. }
  93. cBuf = cBuf[:0]
  94. for i := range chunks {
  95. //fmt.Printf("appending %d %v\n", i, chunks[i])
  96. cBuf = append(cBuf, chunks[i]...)
  97. }
  98. cHead = cBuf[:2]
  99. }
  100. // the data we get from the wire is compressed
  101. if bytes.Equal(cHead, magicGzip) {
  102. cReader, err = gzip.NewReader(bytes.NewReader(cBuf))
  103. } else if cHead[0] == magicZlib[0] &&
  104. (int(cHead[0])*256+int(cHead[1]))%31 == 0 {
  105. // zlib is slightly more complicated, but correct
  106. cReader, err = zlib.NewReader(bytes.NewReader(cBuf))
  107. } else {
  108. // compliance with https://github.com/Graylog2/graylog2-server
  109. // treating all messages as uncompressed if they are not gzip, zlib or
  110. // chunked
  111. cReader = bytes.NewReader(cBuf)
  112. }
  113. if err != nil {
  114. return nil, fmt.Errorf("NewReader: %s", err)
  115. }
  116. msg := new(Message)
  117. if err := json.NewDecoder(cReader).Decode(&msg); err != nil {
  118. return nil, fmt.Errorf("json.Unmarshal: %s", err)
  119. }
  120. return msg, nil
  121. }