123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- package gelf
- import (
- "bufio"
- "encoding/json"
- "fmt"
- "net"
- "time"
- )
// TCPReader listens for TCP connections and queues the raw frames read from
// them so they can be decoded one at a time.
type TCPReader struct {
	// listener is the TCP socket new connections are accepted from.
	listener *net.TCPListener

	// conn is not assigned or read by the code in this part of the file.
	conn net.Conn

	// messages carries raw frames from the connection handlers to the
	// consumer; each frame still ends with its delimiter byte.
	messages chan []byte
}
// connChannels is the pair of control channels used to talk to the goroutine
// serving one connection: a request is sent on drop and the goroutine answers
// on confirm when it has finished.
type connChannels struct {
	drop, confirm chan string
}
- func newTCPReader(addr string) (*TCPReader, chan string, chan string, error) {
- var err error
- tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
- if err != nil {
- return nil, nil, nil, fmt.Errorf("ResolveTCPAddr('%s'): %s", addr, err)
- }
- listener, err := net.ListenTCP("tcp", tcpAddr)
- if err != nil {
- return nil, nil, nil, fmt.Errorf("ListenTCP: %s", err)
- }
- r := &TCPReader{
- listener: listener,
- messages: make(chan []byte, 100), // Make a buffered channel with at most 100 messages
- }
- closeSignal := make(chan string, 1)
- doneSignal := make(chan string, 1)
- go r.listenUntilCloseSignal(closeSignal, doneSignal)
- return r, closeSignal, doneSignal, nil
- }
- func (r *TCPReader) accepter(connections chan net.Conn) {
- for {
- conn, err := r.listener.Accept()
- if err != nil {
- break
- }
- connections <- conn
- }
- }
- func (r *TCPReader) listenUntilCloseSignal(closeSignal chan string, doneSignal chan string) {
- defer func() { doneSignal <- "done" }()
- defer r.listener.Close()
- var conns []connChannels
- connectionsChannel := make(chan net.Conn, 1)
- go r.accepter(connectionsChannel)
- for {
- select {
- case conn := <-connectionsChannel:
- dropSignal := make(chan string, 1)
- dropConfirm := make(chan string, 1)
- channels := connChannels{drop: dropSignal, confirm: dropConfirm}
- go handleConnection(conn, r.messages, dropSignal, dropConfirm)
- conns = append(conns, channels)
- default:
- }
- select {
- case sig := <-closeSignal:
- if sig == "stop" || sig == "drop" {
- if len(conns) >= 1 {
- for _, s := range conns {
- if s.drop != nil {
- s.drop <- "drop"
- <-s.confirm
- conns = append(conns[:0], conns[1:]...)
- }
- }
- if sig == "stop" {
- return
- }
- } else if sig == "stop" {
- closeSignal <- "stop"
- }
- if sig == "drop" {
- doneSignal <- "done"
- }
- }
- default:
- }
- }
- }
- func (r *TCPReader) addr() string {
- return r.listener.Addr().String()
- }
// handleConnection reads NUL-terminated frames from conn and forwards each
// one, delimiter included, on messages.
//
// Reads use a 2 second deadline so the drop signal is checked regularly. A
// frame that is only partly received when a deadline expires is kept and
// completed by the following reads rather than being thrown away.
//
// The function returns when:
//   - the connection fails with anything other than a timeout (for example
//     the peer closed it), or
//   - "drop" has been received on dropSignal and either at least one frame
//     has been forwarded, or the next read finishes (with a frame or with
//     any error).
//
// On return conn is closed and "done" is sent on dropConfirm.
func handleConnection(conn net.Conn, messages chan<- []byte, dropSignal chan string, dropConfirm chan string) {
	defer func() { dropConfirm <- "done" }()
	defer conn.Close()

	reader := bufio.NewReader(conn)
	var partial []byte // start of a frame whose delimiter has not arrived yet
	drop := false      // a drop request has been received
	canDrop := false   // at least one frame has been forwarded

	for {
		// Best effort: if the deadline cannot be set, the read below
		// reports the connection's state.
		_ = conn.SetDeadline(time.Now().Add(2 * time.Second))

		b, err := reader.ReadBytes(0)
		if err != nil {
			ne, isNetErr := err.(net.Error)
			if drop || !isNetErr || !ne.Timeout() {
				// Either we were asked to stop, or the connection is
				// gone for good; retrying would only spin.
				return
			}
			// Deadline expired: ReadBytes hands back what it has read so
			// far, keep it for the next round.
			partial = append(partial, b...)
		} else {
			if len(partial) > 0 {
				b = append(partial, b...)
				partial = nil
			}
			messages <- b
			canDrop = true
			if drop {
				return
			}
		}

		select {
		case sig := <-dropSignal:
			if sig == "drop" {
				drop = true
				time.Sleep(1 * time.Second)
				if canDrop {
					return
				}
			}
		default:
		}
	}
}
- func (r *TCPReader) readMessage() (*Message, error) {
- b := <-r.messages
- var msg Message
- if err := json.Unmarshal(b[:len(b)-1], &msg); err != nil {
- return nil, fmt.Errorf("json.Unmarshal: %s", err)
- }
- return &msg, nil
- }
- func (r *TCPReader) Close() {
- r.listener.Close()
- }
|