123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- // Copyright 2012 SocialCode. All rights reserved.
- // Use of this source code is governed by the MIT
- // license that can be found in the LICENSE file.
- package gelf
- import (
- "bytes"
- "compress/gzip"
- "compress/zlib"
- "encoding/json"
- "fmt"
- "io"
- "net"
- "strings"
- "sync"
- )
- type Reader struct {
- mu sync.Mutex
- conn net.Conn
- }
- func NewReader(addr string) (*Reader, error) {
- var err error
- udpAddr, err := net.ResolveUDPAddr("udp", addr)
- if err != nil {
- return nil, fmt.Errorf("ResolveUDPAddr('%s'): %s", addr, err)
- }
- conn, err := net.ListenUDP("udp", udpAddr)
- if err != nil {
- return nil, fmt.Errorf("ListenUDP: %s", err)
- }
- r := new(Reader)
- r.conn = conn
- return r, nil
- }
- func (r *Reader) Addr() string {
- return r.conn.LocalAddr().String()
- }
- // FIXME: this will discard data if p isn't big enough to hold the
- // full message.
- func (r *Reader) Read(p []byte) (int, error) {
- msg, err := r.ReadMessage()
- if err != nil {
- return -1, err
- }
- var data string
- if msg.Full == "" {
- data = msg.Short
- } else {
- data = msg.Full
- }
- return strings.NewReader(data).Read(p)
- }
- func (r *Reader) ReadMessage() (*Message, error) {
- cBuf := make([]byte, ChunkSize)
- var (
- err error
- n, length int
- cid, ocid []byte
- seq, total uint8
- cHead []byte
- cReader io.Reader
- chunks [][]byte
- )
- for got := 0; got < 128 && (total == 0 || got < int(total)); got++ {
- if n, err = r.conn.Read(cBuf); err != nil {
- return nil, fmt.Errorf("Read: %s", err)
- }
- cHead, cBuf = cBuf[:2], cBuf[:n]
- if bytes.Equal(cHead, magicChunked) {
- //fmt.Printf("chunked %v\n", cBuf[:14])
- cid, seq, total = cBuf[2:2+8], cBuf[2+8], cBuf[2+8+1]
- if ocid != nil && !bytes.Equal(cid, ocid) {
- return nil, fmt.Errorf("out-of-band message %v (awaited %v)", cid, ocid)
- } else if ocid == nil {
- ocid = cid
- chunks = make([][]byte, total)
- }
- n = len(cBuf) - chunkedHeaderLen
- //fmt.Printf("setting chunks[%d]: %d\n", seq, n)
- chunks[seq] = append(make([]byte, 0, n), cBuf[chunkedHeaderLen:]...)
- length += n
- } else { //not chunked
- if total > 0 {
- return nil, fmt.Errorf("out-of-band message (not chunked)")
- }
- break
- }
- }
- //fmt.Printf("\nchunks: %v\n", chunks)
- if length > 0 {
- if cap(cBuf) < length {
- cBuf = append(cBuf, make([]byte, 0, length-cap(cBuf))...)
- }
- cBuf = cBuf[:0]
- for i := range chunks {
- //fmt.Printf("appending %d %v\n", i, chunks[i])
- cBuf = append(cBuf, chunks[i]...)
- }
- cHead = cBuf[:2]
- }
- // the data we get from the wire is compressed
- if bytes.Equal(cHead, magicGzip) {
- cReader, err = gzip.NewReader(bytes.NewReader(cBuf))
- } else if cHead[0] == magicZlib[0] &&
- (int(cHead[0])*256+int(cHead[1]))%31 == 0 {
- // zlib is slightly more complicated, but correct
- cReader, err = zlib.NewReader(bytes.NewReader(cBuf))
- } else {
- // compliance with https://github.com/Graylog2/graylog2-server
- // treating all messages as uncompressed if they are not gzip, zlib or
- // chunked
- cReader = bytes.NewReader(cBuf)
- }
- if err != nil {
- return nil, fmt.Errorf("NewReader: %s", err)
- }
- msg := new(Message)
- if err := json.NewDecoder(cReader).Decode(&msg); err != nil {
- return nil, fmt.Errorf("json.Unmarshal: %s", err)
- }
- return msg, nil
- }
|