|
@@ -2,8 +2,12 @@ package ttrpc
|
|
|
|
|
|
import (
|
|
import (
|
|
"context"
|
|
"context"
|
|
|
|
+ "io"
|
|
"net"
|
|
"net"
|
|
|
|
+ "os"
|
|
|
|
+ "strings"
|
|
"sync"
|
|
"sync"
|
|
|
|
+ "syscall"
|
|
|
|
|
|
"github.com/containerd/containerd/log"
|
|
"github.com/containerd/containerd/log"
|
|
"github.com/gogo/protobuf/proto"
|
|
"github.com/gogo/protobuf/proto"
|
|
@@ -11,6 +15,10 @@ import (
|
|
"google.golang.org/grpc/status"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
)
|
|
|
|
|
|
|
|
+// ErrClosed is returned by client methods when the underlying connection is
|
|
|
|
+// closed.
|
|
|
|
+var ErrClosed = errors.New("ttrpc: closed")
|
|
|
|
+
|
|
type Client struct {
|
|
type Client struct {
|
|
codec codec
|
|
codec codec
|
|
conn net.Conn
|
|
conn net.Conn
|
|
@@ -19,18 +27,20 @@ type Client struct {
|
|
|
|
|
|
closed chan struct{}
|
|
closed chan struct{}
|
|
closeOnce sync.Once
|
|
closeOnce sync.Once
|
|
|
|
+ closeFunc func()
|
|
done chan struct{}
|
|
done chan struct{}
|
|
err error
|
|
err error
|
|
}
|
|
}
|
|
|
|
|
|
func NewClient(conn net.Conn) *Client {
|
|
func NewClient(conn net.Conn) *Client {
|
|
c := &Client{
|
|
c := &Client{
|
|
- codec: codec{},
|
|
|
|
- conn: conn,
|
|
|
|
- channel: newChannel(conn, conn),
|
|
|
|
- calls: make(chan *callRequest),
|
|
|
|
- closed: make(chan struct{}),
|
|
|
|
- done: make(chan struct{}),
|
|
|
|
|
|
+ codec: codec{},
|
|
|
|
+ conn: conn,
|
|
|
|
+ channel: newChannel(conn),
|
|
|
|
+ calls: make(chan *callRequest),
|
|
|
|
+ closed: make(chan struct{}),
|
|
|
|
+ done: make(chan struct{}),
|
|
|
|
+ closeFunc: func() {},
|
|
}
|
|
}
|
|
|
|
|
|
go c.run()
|
|
go c.run()
|
|
@@ -91,7 +101,7 @@ func (c *Client) dispatch(ctx context.Context, req *Request, resp *Response) err
|
|
|
|
|
|
select {
|
|
select {
|
|
case err := <-errs:
|
|
case err := <-errs:
|
|
- return err
|
|
|
|
|
|
+ return filterCloseErr(err)
|
|
case <-c.done:
|
|
case <-c.done:
|
|
return c.err
|
|
return c.err
|
|
}
|
|
}
|
|
@@ -105,6 +115,11 @@ func (c *Client) Close() error {
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// OnClose allows a close func to be called when the server is closed
|
|
|
|
+func (c *Client) OnClose(closer func()) {
|
|
|
|
+ c.closeFunc = closer
|
|
|
|
+}
|
|
|
|
+
|
|
type message struct {
|
|
type message struct {
|
|
messageHeader
|
|
messageHeader
|
|
p []byte
|
|
p []byte
|
|
@@ -150,6 +165,7 @@ func (c *Client) run() {
|
|
|
|
|
|
defer c.conn.Close()
|
|
defer c.conn.Close()
|
|
defer close(c.done)
|
|
defer close(c.done)
|
|
|
|
+ defer c.closeFunc()
|
|
|
|
|
|
for {
|
|
for {
|
|
select {
|
|
select {
|
|
@@ -171,7 +187,14 @@ func (c *Client) run() {
|
|
call.errs <- c.recv(call.resp, msg)
|
|
call.errs <- c.recv(call.resp, msg)
|
|
delete(waiters, msg.StreamID)
|
|
delete(waiters, msg.StreamID)
|
|
case <-shutdown:
|
|
case <-shutdown:
|
|
|
|
+ if shutdownErr != nil {
|
|
|
|
+ shutdownErr = filterCloseErr(shutdownErr)
|
|
|
|
+ } else {
|
|
|
|
+ shutdownErr = ErrClosed
|
|
|
|
+ }
|
|
|
|
+
|
|
shutdownErr = errors.Wrapf(shutdownErr, "ttrpc: client shutting down")
|
|
shutdownErr = errors.Wrapf(shutdownErr, "ttrpc: client shutting down")
|
|
|
|
+
|
|
c.err = shutdownErr
|
|
c.err = shutdownErr
|
|
for _, waiter := range waiters {
|
|
for _, waiter := range waiters {
|
|
waiter.errs <- shutdownErr
|
|
waiter.errs <- shutdownErr
|
|
@@ -179,9 +202,12 @@ func (c *Client) run() {
|
|
c.Close()
|
|
c.Close()
|
|
return
|
|
return
|
|
case <-c.closed:
|
|
case <-c.closed:
|
|
|
|
+ if c.err == nil {
|
|
|
|
+ c.err = ErrClosed
|
|
|
|
+ }
|
|
// broadcast the shutdown error to the remaining waiters.
|
|
// broadcast the shutdown error to the remaining waiters.
|
|
for _, waiter := range waiters {
|
|
for _, waiter := range waiters {
|
|
- waiter.errs <- shutdownErr
|
|
|
|
|
|
+ waiter.errs <- c.err
|
|
}
|
|
}
|
|
return
|
|
return
|
|
}
|
|
}
|
|
@@ -209,3 +235,30 @@ func (c *Client) recv(resp *Response, msg *message) error {
|
|
defer c.channel.putmbuf(msg.p)
|
|
defer c.channel.putmbuf(msg.p)
|
|
return proto.Unmarshal(msg.p, resp)
|
|
return proto.Unmarshal(msg.p, resp)
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+// filterCloseErr rewrites EOF and EPIPE errors to ErrClosed. Use when
|
|
|
|
+// returning from call or handling errors from main read loop.
|
|
|
|
+//
|
|
|
|
+// This purposely ignores errors with a wrapped cause.
|
|
|
|
+func filterCloseErr(err error) error {
|
|
|
|
+ if err == nil {
|
|
|
|
+ return nil
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if err == io.EOF {
|
|
|
|
+ return ErrClosed
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if strings.Contains(err.Error(), "use of closed network connection") {
|
|
|
|
+ return ErrClosed
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // if we have an epipe on a write, we cast to errclosed
|
|
|
|
+ if oerr, ok := err.(*net.OpError); ok && oerr.Op == "write" {
|
|
|
|
+ if serr, ok := oerr.Err.(*os.SyscallError); ok && serr.Err == syscall.EPIPE {
|
|
|
|
+ return ErrClosed
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return err
|
|
|
|
+}
|