|
@@ -5,6 +5,7 @@ import (
|
|
"encoding/json"
|
|
"encoding/json"
|
|
"fmt"
|
|
"fmt"
|
|
"net"
|
|
"net"
|
|
|
|
+ "time"
|
|
)
|
|
)
|
|
|
|
|
|
type TCPReader struct {
|
|
type TCPReader struct {
|
|
@@ -13,16 +14,21 @@ type TCPReader struct {
|
|
messages chan []byte
|
|
messages chan []byte
|
|
}
|
|
}
|
|
|
|
|
|
-func newTCPReader(addr string) (*TCPReader, chan string, error) {
|
|
|
|
|
|
+type connChannels struct {
|
|
|
|
+ drop chan string
|
|
|
|
+ confirm chan string
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func newTCPReader(addr string) (*TCPReader, chan string, chan string, error) {
|
|
var err error
|
|
var err error
|
|
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
|
|
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
|
|
if err != nil {
|
|
if err != nil {
|
|
- return nil, nil, fmt.Errorf("ResolveTCPAddr('%s'): %s", addr, err)
|
|
|
|
|
|
+ return nil, nil, nil, fmt.Errorf("ResolveTCPAddr('%s'): %s", addr, err)
|
|
}
|
|
}
|
|
|
|
|
|
listener, err := net.ListenTCP("tcp", tcpAddr)
|
|
listener, err := net.ListenTCP("tcp", tcpAddr)
|
|
if err != nil {
|
|
if err != nil {
|
|
- return nil, nil, fmt.Errorf("ListenTCP: %s", err)
|
|
|
|
|
|
+ return nil, nil, nil, fmt.Errorf("ListenTCP: %s", err)
|
|
}
|
|
}
|
|
|
|
|
|
r := &TCPReader{
|
|
r := &TCPReader{
|
|
@@ -30,26 +36,61 @@ func newTCPReader(addr string) (*TCPReader, chan string, error) {
|
|
messages: make(chan []byte, 100), // Make a buffered channel with at most 100 messages
|
|
messages: make(chan []byte, 100), // Make a buffered channel with at most 100 messages
|
|
}
|
|
}
|
|
|
|
|
|
- signal := make(chan string, 1)
|
|
|
|
|
|
+ closeSignal := make(chan string, 1)
|
|
|
|
+ doneSignal := make(chan string, 1)
|
|
|
|
|
|
- go r.listenUntilCloseSignal(signal)
|
|
|
|
|
|
+ go r.listenUntilCloseSignal(closeSignal, doneSignal)
|
|
|
|
|
|
- return r, signal, nil
|
|
|
|
|
|
+ return r, closeSignal, doneSignal, nil
|
|
}
|
|
}
|
|
|
|
|
|
-func (r *TCPReader) listenUntilCloseSignal(signal chan string) {
|
|
|
|
- defer func() { signal <- "done" }()
|
|
|
|
- defer r.listener.Close()
|
|
|
|
|
|
+func (r *TCPReader) accepter(connections chan net.Conn) {
|
|
for {
|
|
for {
|
|
conn, err := r.listener.Accept()
|
|
conn, err := r.listener.Accept()
|
|
if err != nil {
|
|
if err != nil {
|
|
break
|
|
break
|
|
}
|
|
}
|
|
- go handleConnection(conn, r.messages)
|
|
|
|
|
|
+ connections <- conn
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (r *TCPReader) listenUntilCloseSignal(closeSignal chan string, doneSignal chan string) {
|
|
|
|
+ defer func() { doneSignal <- "done" }()
|
|
|
|
+ defer r.listener.Close()
|
|
|
|
+ var conns []connChannels
|
|
|
|
+ connectionsChannel := make(chan net.Conn, 1)
|
|
|
|
+ go r.accepter(connectionsChannel)
|
|
|
|
+ for {
|
|
select {
|
|
select {
|
|
- case sig := <-signal:
|
|
|
|
- if sig == "stop" {
|
|
|
|
- break
|
|
|
|
|
|
+ case conn := <-connectionsChannel:
|
|
|
|
+ dropSignal := make(chan string, 1)
|
|
|
|
+ dropConfirm := make(chan string, 1)
|
|
|
|
+ channels := connChannels{drop: dropSignal, confirm: dropConfirm}
|
|
|
|
+ go handleConnection(conn, r.messages, dropSignal, dropConfirm)
|
|
|
|
+ conns = append(conns, channels)
|
|
|
|
+ default:
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ select {
|
|
|
|
+ case sig := <-closeSignal:
|
|
|
|
+ if sig == "stop" || sig == "drop" {
|
|
|
|
+ if len(conns) >= 1 {
|
|
|
|
+ for _, s := range conns {
|
|
|
|
+ if s.drop != nil {
|
|
|
|
+ s.drop <- "drop"
|
|
|
|
+ <-s.confirm
|
|
|
|
+ conns = append(conns[:0], conns[1:]...)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if sig == "stop" {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ } else if sig == "stop" {
|
|
|
|
+ closeSignal <- "stop"
|
|
|
|
+ }
|
|
|
|
+ if sig == "drop" {
|
|
|
|
+ doneSignal <- "done"
|
|
|
|
+ }
|
|
}
|
|
}
|
|
default:
|
|
default:
|
|
}
|
|
}
|
|
@@ -60,19 +101,41 @@ func (r *TCPReader) addr() string {
|
|
return r.listener.Addr().String()
|
|
return r.listener.Addr().String()
|
|
}
|
|
}
|
|
|
|
|
|
-func handleConnection(conn net.Conn, messages chan<- []byte) {
|
|
|
|
|
|
+func handleConnection(conn net.Conn, messages chan<- []byte, dropSignal chan string, dropConfirm chan string) {
|
|
|
|
+ defer func() { dropConfirm <- "done" }()
|
|
defer conn.Close()
|
|
defer conn.Close()
|
|
reader := bufio.NewReader(conn)
|
|
reader := bufio.NewReader(conn)
|
|
|
|
|
|
var b []byte
|
|
var b []byte
|
|
var err error
|
|
var err error
|
|
|
|
+ drop := false
|
|
|
|
+ canDrop := false
|
|
|
|
|
|
for {
|
|
for {
|
|
|
|
+ conn.SetDeadline(time.Now().Add(2 * time.Second))
|
|
if b, err = reader.ReadBytes(0); err != nil {
|
|
if b, err = reader.ReadBytes(0); err != nil {
|
|
- continue
|
|
|
|
- }
|
|
|
|
- if len(b) > 0 {
|
|
|
|
|
|
+ if drop {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ } else if len(b) > 0 {
|
|
messages <- b
|
|
messages <- b
|
|
|
|
+ canDrop = true
|
|
|
|
+ if drop {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ } else if drop {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ select {
|
|
|
|
+ case sig := <-dropSignal:
|
|
|
|
+ if sig == "drop" {
|
|
|
|
+ drop = true
|
|
|
|
+ time.Sleep(1 * time.Second)
|
|
|
|
+ if canDrop {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ default:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|