session.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. package agent
  2. import (
  3. "errors"
  4. "sync"
  5. "time"
  6. "github.com/Sirupsen/logrus"
  7. "github.com/docker/swarmkit/api"
  8. "github.com/docker/swarmkit/connectionbroker"
  9. "github.com/docker/swarmkit/log"
  10. "golang.org/x/net/context"
  11. "google.golang.org/grpc"
  12. "google.golang.org/grpc/codes"
  13. )
// dispatcherRPCTimeout is the timeout applied to dispatcher interactions in
// this file: it is passed as grpc.WithTimeout when selecting a connection,
// bounds the wait for the first SessionMessage in start, and is the deadline
// of every Heartbeat call.
const dispatcherRPCTimeout = 5 * time.Second

var (
	// errSessionDisconnect signals that the agent was instructed to disconnect.
	errSessionDisconnect = errors.New("agent: session disconnect") // instructed to disconnect

	// errSessionClosed is returned by session methods that observe the
	// session's closed channel being closed.
	errSessionClosed = errors.New("agent: session closed")
)
// session encapsulates one round of registration with the manager. session
// starts the registration and heartbeat control cycle. Any failure will result
// in a complete shutdown of the session and it must be reestablished.
//
// All communication with the manager is done through session. Changes that
// flow into the agent, such as task assignments, are handed back to the
// agent through the errs, messages, assignments and subscriptions channels.
type session struct {
	// conn is the connection obtained from the agent's connection broker in
	// newSession; it stays nil when selecting a connection failed.
	conn *connectionbroker.Conn

	// agent is the agent this session was created for.
	agent *Agent
	// sessionID is sent with every dispatcher request. start overwrites it
	// with the ID carried by the first SessionMessage.
	sessionID string
	// session is the dispatcher Session stream; set by start, read by listen.
	session api.Dispatcher_SessionClient
	// errs carries session failures (buffered with capacity 1 in newSession).
	errs chan error
	// messages receives every SessionMessage (see handleSessionMessage).
	messages chan *api.SessionMessage
	// assignments receives the AssignmentsMessages produced by watch.
	assignments chan *api.AssignmentsMessage
	// subscriptions receives the SubscriptionMessages read by logSubscriptions.
	subscriptions chan *api.SubscriptionMessage

	// registered is closed by run once start succeeded and the background
	// loops have been launched.
	registered chan struct{}
	// closed is closed by close, exactly once (guarded by closeOnce).
	closed    chan struct{}
	closeOnce sync.Once
}
  39. func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionID string, description *api.NodeDescription) *session {
  40. s := &session{
  41. agent: agent,
  42. sessionID: sessionID,
  43. errs: make(chan error, 1),
  44. messages: make(chan *api.SessionMessage),
  45. assignments: make(chan *api.AssignmentsMessage),
  46. subscriptions: make(chan *api.SubscriptionMessage),
  47. registered: make(chan struct{}),
  48. closed: make(chan struct{}),
  49. }
  50. // TODO(stevvooe): Need to move connection management up a level or create
  51. // independent connection for log broker client.
  52. cc, err := agent.config.ConnBroker.Select(
  53. grpc.WithTransportCredentials(agent.config.Credentials),
  54. grpc.WithTimeout(dispatcherRPCTimeout),
  55. )
  56. if err != nil {
  57. s.errs <- err
  58. return s
  59. }
  60. s.conn = cc
  61. go s.run(ctx, delay, description)
  62. return s
  63. }
  64. func (s *session) run(ctx context.Context, delay time.Duration, description *api.NodeDescription) {
  65. timer := time.NewTimer(delay) // delay before registering.
  66. defer timer.Stop()
  67. select {
  68. case <-timer.C:
  69. case <-ctx.Done():
  70. return
  71. }
  72. if err := s.start(ctx, description); err != nil {
  73. select {
  74. case s.errs <- err:
  75. case <-s.closed:
  76. case <-ctx.Done():
  77. }
  78. return
  79. }
  80. ctx = log.WithLogger(ctx, log.G(ctx).WithField("session.id", s.sessionID))
  81. go runctx(ctx, s.closed, s.errs, s.heartbeat)
  82. go runctx(ctx, s.closed, s.errs, s.watch)
  83. go runctx(ctx, s.closed, s.errs, s.listen)
  84. go runctx(ctx, s.closed, s.errs, s.logSubscriptions)
  85. close(s.registered)
  86. }
  87. // start begins the session and returns the first SessionMessage.
  88. func (s *session) start(ctx context.Context, description *api.NodeDescription) error {
  89. log.G(ctx).Debugf("(*session).start")
  90. errChan := make(chan error, 1)
  91. var (
  92. msg *api.SessionMessage
  93. stream api.Dispatcher_SessionClient
  94. err error
  95. )
  96. // Note: we don't defer cancellation of this context, because the
  97. // streaming RPC is used after this function returned. We only cancel
  98. // it in the timeout case to make sure the goroutine completes.
  99. sessionCtx, cancelSession := context.WithCancel(ctx)
  100. // Need to run Session in a goroutine since there's no way to set a
  101. // timeout for an individual Recv call in a stream.
  102. go func() {
  103. client := api.NewDispatcherClient(s.conn.ClientConn)
  104. stream, err = client.Session(sessionCtx, &api.SessionRequest{
  105. Description: description,
  106. SessionID: s.sessionID,
  107. })
  108. if err != nil {
  109. errChan <- err
  110. return
  111. }
  112. msg, err = stream.Recv()
  113. errChan <- err
  114. }()
  115. select {
  116. case err := <-errChan:
  117. if err != nil {
  118. return err
  119. }
  120. case <-time.After(dispatcherRPCTimeout):
  121. cancelSession()
  122. return errors.New("session initiation timed out")
  123. }
  124. s.sessionID = msg.SessionID
  125. s.session = stream
  126. return s.handleSessionMessage(ctx, msg)
  127. }
  128. func (s *session) heartbeat(ctx context.Context) error {
  129. log.G(ctx).Debugf("(*session).heartbeat")
  130. client := api.NewDispatcherClient(s.conn.ClientConn)
  131. heartbeat := time.NewTimer(1) // send out a heartbeat right away
  132. defer heartbeat.Stop()
  133. for {
  134. select {
  135. case <-heartbeat.C:
  136. heartbeatCtx, cancel := context.WithTimeout(ctx, dispatcherRPCTimeout)
  137. resp, err := client.Heartbeat(heartbeatCtx, &api.HeartbeatRequest{
  138. SessionID: s.sessionID,
  139. })
  140. cancel()
  141. if err != nil {
  142. if grpc.Code(err) == codes.NotFound {
  143. err = errNodeNotRegistered
  144. }
  145. return err
  146. }
  147. heartbeat.Reset(resp.Period)
  148. case <-s.closed:
  149. return errSessionClosed
  150. case <-ctx.Done():
  151. return ctx.Err()
  152. }
  153. }
  154. }
  155. func (s *session) listen(ctx context.Context) error {
  156. defer s.session.CloseSend()
  157. log.G(ctx).Debugf("(*session).listen")
  158. for {
  159. msg, err := s.session.Recv()
  160. if err != nil {
  161. return err
  162. }
  163. if err := s.handleSessionMessage(ctx, msg); err != nil {
  164. return err
  165. }
  166. }
  167. }
  168. func (s *session) handleSessionMessage(ctx context.Context, msg *api.SessionMessage) error {
  169. select {
  170. case s.messages <- msg:
  171. return nil
  172. case <-s.closed:
  173. return errSessionClosed
  174. case <-ctx.Done():
  175. return ctx.Err()
  176. }
  177. }
  178. func (s *session) logSubscriptions(ctx context.Context) error {
  179. log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).logSubscriptions"})
  180. log.Debugf("")
  181. client := api.NewLogBrokerClient(s.conn.ClientConn)
  182. subscriptions, err := client.ListenSubscriptions(ctx, &api.ListenSubscriptionsRequest{})
  183. if err != nil {
  184. return err
  185. }
  186. defer subscriptions.CloseSend()
  187. for {
  188. resp, err := subscriptions.Recv()
  189. if grpc.Code(err) == codes.Unimplemented {
  190. log.Warning("manager does not support log subscriptions")
  191. // Don't return, because returning would bounce the session
  192. select {
  193. case <-s.closed:
  194. return errSessionClosed
  195. case <-ctx.Done():
  196. return ctx.Err()
  197. }
  198. }
  199. if err != nil {
  200. return err
  201. }
  202. select {
  203. case s.subscriptions <- resp:
  204. case <-s.closed:
  205. return errSessionClosed
  206. case <-ctx.Done():
  207. return ctx.Err()
  208. }
  209. }
  210. }
  211. func (s *session) watch(ctx context.Context) error {
  212. log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).watch"})
  213. log.Debugf("")
  214. var (
  215. resp *api.AssignmentsMessage
  216. assignmentWatch api.Dispatcher_AssignmentsClient
  217. tasksWatch api.Dispatcher_TasksClient
  218. streamReference string
  219. tasksFallback bool
  220. err error
  221. )
  222. client := api.NewDispatcherClient(s.conn.ClientConn)
  223. for {
  224. // If this is the first time we're running the loop, or there was a reference mismatch
  225. // attempt to get the assignmentWatch
  226. if assignmentWatch == nil && !tasksFallback {
  227. assignmentWatch, err = client.Assignments(ctx, &api.AssignmentsRequest{SessionID: s.sessionID})
  228. if err != nil {
  229. return err
  230. }
  231. }
  232. // We have an assignmentWatch, let's try to receive an AssignmentMessage
  233. if assignmentWatch != nil {
  234. // If we get a code = 12 desc = unknown method Assignments, try to use tasks
  235. resp, err = assignmentWatch.Recv()
  236. if err != nil {
  237. if grpc.Code(err) != codes.Unimplemented {
  238. return err
  239. }
  240. tasksFallback = true
  241. assignmentWatch = nil
  242. log.WithError(err).Infof("falling back to Tasks")
  243. }
  244. }
  245. // This code is here for backwards compatibility (so that newer clients can use the
  246. // older method Tasks)
  247. if tasksWatch == nil && tasksFallback {
  248. tasksWatch, err = client.Tasks(ctx, &api.TasksRequest{SessionID: s.sessionID})
  249. if err != nil {
  250. return err
  251. }
  252. }
  253. if tasksWatch != nil {
  254. // When falling back to Tasks because of an old managers, we wrap the tasks in assignments.
  255. var taskResp *api.TasksMessage
  256. var assignmentChanges []*api.AssignmentChange
  257. taskResp, err = tasksWatch.Recv()
  258. if err != nil {
  259. return err
  260. }
  261. for _, t := range taskResp.Tasks {
  262. taskChange := &api.AssignmentChange{
  263. Assignment: &api.Assignment{
  264. Item: &api.Assignment_Task{
  265. Task: t,
  266. },
  267. },
  268. Action: api.AssignmentChange_AssignmentActionUpdate,
  269. }
  270. assignmentChanges = append(assignmentChanges, taskChange)
  271. }
  272. resp = &api.AssignmentsMessage{Type: api.AssignmentsMessage_COMPLETE, Changes: assignmentChanges}
  273. }
  274. // If there seems to be a gap in the stream, let's break out of the inner for and
  275. // re-sync (by calling Assignments again).
  276. if streamReference != "" && streamReference != resp.AppliesTo {
  277. assignmentWatch = nil
  278. } else {
  279. streamReference = resp.ResultsIn
  280. }
  281. select {
  282. case s.assignments <- resp:
  283. case <-s.closed:
  284. return errSessionClosed
  285. case <-ctx.Done():
  286. return ctx.Err()
  287. }
  288. }
  289. }
  290. // sendTaskStatus uses the current session to send the status of a single task.
  291. func (s *session) sendTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {
  292. client := api.NewDispatcherClient(s.conn.ClientConn)
  293. if _, err := client.UpdateTaskStatus(ctx, &api.UpdateTaskStatusRequest{
  294. SessionID: s.sessionID,
  295. Updates: []*api.UpdateTaskStatusRequest_TaskStatusUpdate{
  296. {
  297. TaskID: taskID,
  298. Status: status,
  299. },
  300. },
  301. }); err != nil {
  302. // TODO(stevvooe): Dispatcher should not return this error. Status
  303. // reports for unknown tasks should be ignored.
  304. if grpc.Code(err) == codes.NotFound {
  305. return errTaskUnknown
  306. }
  307. return err
  308. }
  309. return nil
  310. }
  311. func (s *session) sendTaskStatuses(ctx context.Context, updates ...*api.UpdateTaskStatusRequest_TaskStatusUpdate) ([]*api.UpdateTaskStatusRequest_TaskStatusUpdate, error) {
  312. if len(updates) < 1 {
  313. return nil, nil
  314. }
  315. const batchSize = 1024
  316. select {
  317. case <-s.registered:
  318. select {
  319. case <-s.closed:
  320. return updates, ErrClosed
  321. default:
  322. }
  323. case <-s.closed:
  324. return updates, ErrClosed
  325. case <-ctx.Done():
  326. return updates, ctx.Err()
  327. }
  328. client := api.NewDispatcherClient(s.conn.ClientConn)
  329. n := batchSize
  330. if len(updates) < n {
  331. n = len(updates)
  332. }
  333. if _, err := client.UpdateTaskStatus(ctx, &api.UpdateTaskStatusRequest{
  334. SessionID: s.sessionID,
  335. Updates: updates[:n],
  336. }); err != nil {
  337. log.G(ctx).WithError(err).Errorf("failed sending task status batch size of %d", len(updates[:n]))
  338. return updates, err
  339. }
  340. return updates[n:], nil
  341. }
  342. // sendError is used to send errors to errs channel and trigger session recreation
  343. func (s *session) sendError(err error) {
  344. select {
  345. case s.errs <- err:
  346. case <-s.closed:
  347. }
  348. }
  349. // close closing session. It should be called only in <-session.errs branch
  350. // of event loop.
  351. func (s *session) close() error {
  352. s.closeOnce.Do(func() {
  353. if s.conn != nil {
  354. s.conn.Close(false)
  355. }
  356. close(s.closed)
  357. })
  358. return nil
  359. }