session.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. package agent
  2. import (
  3. "errors"
  4. "sync"
  5. "time"
  6. "github.com/Sirupsen/logrus"
  7. "github.com/docker/swarmkit/api"
  8. "github.com/docker/swarmkit/connectionbroker"
  9. "github.com/docker/swarmkit/log"
  10. "golang.org/x/net/context"
  11. "google.golang.org/grpc"
  12. "google.golang.org/grpc/codes"
  13. )
  // dispatcherRPCTimeout bounds dialing the manager, waiting for the first
  // session message, and each individual heartbeat RPC.
  var dispatcherRPCTimeout = 5 * time.Second

  // Sentinel errors produced by the session machinery.
  var (
  	// errSessionDisconnect is used when the session was instructed to disconnect.
  	errSessionDisconnect = errors.New("agent: session disconnect")

  	// errSessionClosed is returned by the session loops once s.closed is closed.
  	errSessionClosed = errors.New("agent: session closed")
  )
  19. // session encapsulates one round of registration with the manager. session
  20. // starts the registration and heartbeat control cycle. Any failure will result
  21. // in a complete shutdown of the session and it must be reestablished.
  22. //
  23. // All communication with the master is done through session. Changes that
  24. // flow into the agent, such as task assignment, are called back into the
  25. // agent through errs, messages and tasks.
  26. type session struct {
  27. conn *connectionbroker.Conn
  28. agent *Agent
  29. sessionID string
  30. session api.Dispatcher_SessionClient
  31. errs chan error
  32. messages chan *api.SessionMessage
  33. assignments chan *api.AssignmentsMessage
  34. subscriptions chan *api.SubscriptionMessage
  35. cancel func() // this is assumed to be never nil, and set whenever a session is created
  36. registered chan struct{} // closed registration
  37. closed chan struct{}
  38. closeOnce sync.Once
  39. }
  40. func newSession(ctx context.Context, agent *Agent, delay time.Duration, sessionID string, description *api.NodeDescription) *session {
  41. sessionCtx, sessionCancel := context.WithCancel(ctx)
  42. s := &session{
  43. agent: agent,
  44. sessionID: sessionID,
  45. errs: make(chan error, 1),
  46. messages: make(chan *api.SessionMessage),
  47. assignments: make(chan *api.AssignmentsMessage),
  48. subscriptions: make(chan *api.SubscriptionMessage),
  49. registered: make(chan struct{}),
  50. closed: make(chan struct{}),
  51. cancel: sessionCancel,
  52. }
  53. // TODO(stevvooe): Need to move connection management up a level or create
  54. // independent connection for log broker client.
  55. cc, err := agent.config.ConnBroker.Select(
  56. grpc.WithTransportCredentials(agent.config.Credentials),
  57. grpc.WithTimeout(dispatcherRPCTimeout),
  58. )
  59. if err != nil {
  60. s.errs <- err
  61. return s
  62. }
  63. s.conn = cc
  64. go s.run(sessionCtx, delay, description)
  65. return s
  66. }
  67. func (s *session) run(ctx context.Context, delay time.Duration, description *api.NodeDescription) {
  68. timer := time.NewTimer(delay) // delay before registering.
  69. defer timer.Stop()
  70. select {
  71. case <-timer.C:
  72. case <-ctx.Done():
  73. return
  74. }
  75. if err := s.start(ctx, description); err != nil {
  76. select {
  77. case s.errs <- err:
  78. case <-s.closed:
  79. case <-ctx.Done():
  80. }
  81. return
  82. }
  83. ctx = log.WithLogger(ctx, log.G(ctx).WithField("session.id", s.sessionID))
  84. go runctx(ctx, s.closed, s.errs, s.heartbeat)
  85. go runctx(ctx, s.closed, s.errs, s.watch)
  86. go runctx(ctx, s.closed, s.errs, s.listen)
  87. go runctx(ctx, s.closed, s.errs, s.logSubscriptions)
  88. close(s.registered)
  89. }
  90. // start begins the session and returns the first SessionMessage.
  91. func (s *session) start(ctx context.Context, description *api.NodeDescription) error {
  92. log.G(ctx).Debugf("(*session).start")
  93. errChan := make(chan error, 1)
  94. var (
  95. msg *api.SessionMessage
  96. stream api.Dispatcher_SessionClient
  97. err error
  98. )
  99. // Note: we don't defer cancellation of this context, because the
  100. // streaming RPC is used after this function returned. We only cancel
  101. // it in the timeout case to make sure the goroutine completes.
  102. // We also fork this context again from the `run` context, because on
  103. // `dispatcherRPCTimeout`, we want to cancel establishing a session and
  104. // return an error. If we cancel the `run` context instead of forking,
  105. // then in `run` it's possible that we just terminate the function because
  106. // `ctx` is done and hence fail to propagate the timeout error to the agent.
  107. // If the error is not propogated to the agent, the agent will not close
  108. // the session or rebuild a new sesssion.
  109. sessionCtx, cancelSession := context.WithCancel(ctx)
  110. // Need to run Session in a goroutine since there's no way to set a
  111. // timeout for an individual Recv call in a stream.
  112. go func() {
  113. client := api.NewDispatcherClient(s.conn.ClientConn)
  114. stream, err = client.Session(sessionCtx, &api.SessionRequest{
  115. Description: description,
  116. SessionID: s.sessionID,
  117. })
  118. if err != nil {
  119. errChan <- err
  120. return
  121. }
  122. msg, err = stream.Recv()
  123. errChan <- err
  124. }()
  125. select {
  126. case err := <-errChan:
  127. if err != nil {
  128. return err
  129. }
  130. case <-time.After(dispatcherRPCTimeout):
  131. cancelSession()
  132. return errors.New("session initiation timed out")
  133. }
  134. s.sessionID = msg.SessionID
  135. s.session = stream
  136. return s.handleSessionMessage(ctx, msg)
  137. }
  138. func (s *session) heartbeat(ctx context.Context) error {
  139. log.G(ctx).Debugf("(*session).heartbeat")
  140. client := api.NewDispatcherClient(s.conn.ClientConn)
  141. heartbeat := time.NewTimer(1) // send out a heartbeat right away
  142. defer heartbeat.Stop()
  143. for {
  144. select {
  145. case <-heartbeat.C:
  146. heartbeatCtx, cancel := context.WithTimeout(ctx, dispatcherRPCTimeout)
  147. resp, err := client.Heartbeat(heartbeatCtx, &api.HeartbeatRequest{
  148. SessionID: s.sessionID,
  149. })
  150. cancel()
  151. if err != nil {
  152. if grpc.Code(err) == codes.NotFound {
  153. err = errNodeNotRegistered
  154. }
  155. return err
  156. }
  157. heartbeat.Reset(resp.Period)
  158. case <-s.closed:
  159. return errSessionClosed
  160. case <-ctx.Done():
  161. return ctx.Err()
  162. }
  163. }
  164. }
  165. func (s *session) listen(ctx context.Context) error {
  166. defer s.session.CloseSend()
  167. log.G(ctx).Debugf("(*session).listen")
  168. for {
  169. msg, err := s.session.Recv()
  170. if err != nil {
  171. return err
  172. }
  173. if err := s.handleSessionMessage(ctx, msg); err != nil {
  174. return err
  175. }
  176. }
  177. }
  178. func (s *session) handleSessionMessage(ctx context.Context, msg *api.SessionMessage) error {
  179. select {
  180. case s.messages <- msg:
  181. return nil
  182. case <-s.closed:
  183. return errSessionClosed
  184. case <-ctx.Done():
  185. return ctx.Err()
  186. }
  187. }
  188. func (s *session) logSubscriptions(ctx context.Context) error {
  189. log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).logSubscriptions"})
  190. log.Debugf("")
  191. client := api.NewLogBrokerClient(s.conn.ClientConn)
  192. subscriptions, err := client.ListenSubscriptions(ctx, &api.ListenSubscriptionsRequest{})
  193. if err != nil {
  194. return err
  195. }
  196. defer subscriptions.CloseSend()
  197. for {
  198. resp, err := subscriptions.Recv()
  199. if grpc.Code(err) == codes.Unimplemented {
  200. log.Warning("manager does not support log subscriptions")
  201. // Don't return, because returning would bounce the session
  202. select {
  203. case <-s.closed:
  204. return errSessionClosed
  205. case <-ctx.Done():
  206. return ctx.Err()
  207. }
  208. }
  209. if err != nil {
  210. return err
  211. }
  212. select {
  213. case s.subscriptions <- resp:
  214. case <-s.closed:
  215. return errSessionClosed
  216. case <-ctx.Done():
  217. return ctx.Err()
  218. }
  219. }
  220. }
  221. func (s *session) watch(ctx context.Context) error {
  222. log := log.G(ctx).WithFields(logrus.Fields{"method": "(*session).watch"})
  223. log.Debugf("")
  224. var (
  225. resp *api.AssignmentsMessage
  226. assignmentWatch api.Dispatcher_AssignmentsClient
  227. tasksWatch api.Dispatcher_TasksClient
  228. streamReference string
  229. tasksFallback bool
  230. err error
  231. )
  232. client := api.NewDispatcherClient(s.conn.ClientConn)
  233. for {
  234. // If this is the first time we're running the loop, or there was a reference mismatch
  235. // attempt to get the assignmentWatch
  236. if assignmentWatch == nil && !tasksFallback {
  237. assignmentWatch, err = client.Assignments(ctx, &api.AssignmentsRequest{SessionID: s.sessionID})
  238. if err != nil {
  239. return err
  240. }
  241. }
  242. // We have an assignmentWatch, let's try to receive an AssignmentMessage
  243. if assignmentWatch != nil {
  244. // If we get a code = 12 desc = unknown method Assignments, try to use tasks
  245. resp, err = assignmentWatch.Recv()
  246. if err != nil {
  247. if grpc.Code(err) != codes.Unimplemented {
  248. return err
  249. }
  250. tasksFallback = true
  251. assignmentWatch = nil
  252. log.WithError(err).Infof("falling back to Tasks")
  253. }
  254. }
  255. // This code is here for backwards compatibility (so that newer clients can use the
  256. // older method Tasks)
  257. if tasksWatch == nil && tasksFallback {
  258. tasksWatch, err = client.Tasks(ctx, &api.TasksRequest{SessionID: s.sessionID})
  259. if err != nil {
  260. return err
  261. }
  262. }
  263. if tasksWatch != nil {
  264. // When falling back to Tasks because of an old managers, we wrap the tasks in assignments.
  265. var taskResp *api.TasksMessage
  266. var assignmentChanges []*api.AssignmentChange
  267. taskResp, err = tasksWatch.Recv()
  268. if err != nil {
  269. return err
  270. }
  271. for _, t := range taskResp.Tasks {
  272. taskChange := &api.AssignmentChange{
  273. Assignment: &api.Assignment{
  274. Item: &api.Assignment_Task{
  275. Task: t,
  276. },
  277. },
  278. Action: api.AssignmentChange_AssignmentActionUpdate,
  279. }
  280. assignmentChanges = append(assignmentChanges, taskChange)
  281. }
  282. resp = &api.AssignmentsMessage{Type: api.AssignmentsMessage_COMPLETE, Changes: assignmentChanges}
  283. }
  284. // If there seems to be a gap in the stream, let's break out of the inner for and
  285. // re-sync (by calling Assignments again).
  286. if streamReference != "" && streamReference != resp.AppliesTo {
  287. assignmentWatch = nil
  288. } else {
  289. streamReference = resp.ResultsIn
  290. }
  291. select {
  292. case s.assignments <- resp:
  293. case <-s.closed:
  294. return errSessionClosed
  295. case <-ctx.Done():
  296. return ctx.Err()
  297. }
  298. }
  299. }
  300. // sendTaskStatus uses the current session to send the status of a single task.
  301. func (s *session) sendTaskStatus(ctx context.Context, taskID string, status *api.TaskStatus) error {
  302. client := api.NewDispatcherClient(s.conn.ClientConn)
  303. if _, err := client.UpdateTaskStatus(ctx, &api.UpdateTaskStatusRequest{
  304. SessionID: s.sessionID,
  305. Updates: []*api.UpdateTaskStatusRequest_TaskStatusUpdate{
  306. {
  307. TaskID: taskID,
  308. Status: status,
  309. },
  310. },
  311. }); err != nil {
  312. // TODO(stevvooe): Dispatcher should not return this error. Status
  313. // reports for unknown tasks should be ignored.
  314. if grpc.Code(err) == codes.NotFound {
  315. return errTaskUnknown
  316. }
  317. return err
  318. }
  319. return nil
  320. }
  321. func (s *session) sendTaskStatuses(ctx context.Context, updates ...*api.UpdateTaskStatusRequest_TaskStatusUpdate) ([]*api.UpdateTaskStatusRequest_TaskStatusUpdate, error) {
  322. if len(updates) < 1 {
  323. return nil, nil
  324. }
  325. const batchSize = 1024
  326. select {
  327. case <-s.registered:
  328. select {
  329. case <-s.closed:
  330. return updates, ErrClosed
  331. default:
  332. }
  333. case <-s.closed:
  334. return updates, ErrClosed
  335. case <-ctx.Done():
  336. return updates, ctx.Err()
  337. }
  338. client := api.NewDispatcherClient(s.conn.ClientConn)
  339. n := batchSize
  340. if len(updates) < n {
  341. n = len(updates)
  342. }
  343. if _, err := client.UpdateTaskStatus(ctx, &api.UpdateTaskStatusRequest{
  344. SessionID: s.sessionID,
  345. Updates: updates[:n],
  346. }); err != nil {
  347. log.G(ctx).WithError(err).Errorf("failed sending task status batch size of %d", len(updates[:n]))
  348. return updates, err
  349. }
  350. return updates[n:], nil
  351. }
  352. // sendError is used to send errors to errs channel and trigger session recreation
  353. func (s *session) sendError(err error) {
  354. select {
  355. case s.errs <- err:
  356. case <-s.closed:
  357. }
  358. }
  359. // close closing session. It should be called only in <-session.errs branch
  360. // of event loop.
  361. func (s *session) close() error {
  362. s.closeOnce.Do(func() {
  363. s.cancel()
  364. if s.conn != nil {
  365. s.conn.Close(false)
  366. }
  367. close(s.closed)
  368. })
  369. return nil
  370. }