storage.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. package agent
  2. import (
  3. "github.com/boltdb/bolt"
  4. "github.com/docker/swarmkit/api"
  5. "github.com/gogo/protobuf/proto"
  6. )
  7. // Layout:
  8. //
  9. // bucket(v1.tasks.<id>) ->
  10. // data (task protobuf)
  11. // status (task status protobuf)
  12. // assigned (key present)
  13. var (
  14. bucketKeyStorageVersion = []byte("v1")
  15. bucketKeyTasks = []byte("tasks")
  16. bucketKeyAssigned = []byte("assigned")
  17. bucketKeyData = []byte("data")
  18. bucketKeyStatus = []byte("status")
  19. )
  20. // InitDB prepares a database for writing task data.
  21. //
  22. // Proper buckets will be created if they don't already exist.
  23. func InitDB(db *bolt.DB) error {
  24. return db.Update(func(tx *bolt.Tx) error {
  25. _, err := createBucketIfNotExists(tx, bucketKeyStorageVersion, bucketKeyTasks)
  26. return err
  27. })
  28. }
  29. // GetTask retrieves the task with id from the datastore.
  30. func GetTask(tx *bolt.Tx, id string) (*api.Task, error) {
  31. var t api.Task
  32. if err := withTaskBucket(tx, id, func(bkt *bolt.Bucket) error {
  33. p := bkt.Get([]byte("data"))
  34. if p == nil {
  35. return errTaskUnknown
  36. }
  37. return proto.Unmarshal(p, &t)
  38. }); err != nil {
  39. return nil, err
  40. }
  41. return &t, nil
  42. }
  43. // WalkTasks walks all tasks in the datastore.
  44. func WalkTasks(tx *bolt.Tx, fn func(task *api.Task) error) error {
  45. bkt := getTasksBucket(tx)
  46. if bkt == nil {
  47. return nil
  48. }
  49. return bkt.ForEach(func(k, v []byte) error {
  50. tbkt := bkt.Bucket(k)
  51. p := tbkt.Get(bucketKeyData)
  52. var t api.Task
  53. if err := proto.Unmarshal(p, &t); err != nil {
  54. return err
  55. }
  56. return fn(&t)
  57. })
  58. }
  59. // TaskAssigned returns true if the task is assigned to the node.
  60. func TaskAssigned(tx *bolt.Tx, id string) bool {
  61. bkt := getTaskBucket(tx, id)
  62. if bkt == nil {
  63. return false
  64. }
  65. return len(bkt.Get(bucketKeyAssigned)) > 0
  66. }
  67. // GetTaskStatus returns the current status for the task.
  68. func GetTaskStatus(tx *bolt.Tx, id string) (*api.TaskStatus, error) {
  69. var ts api.TaskStatus
  70. if err := withTaskBucket(tx, id, func(bkt *bolt.Bucket) error {
  71. p := bkt.Get(bucketKeyStatus)
  72. if p == nil {
  73. return errTaskUnknown
  74. }
  75. return proto.Unmarshal(p, &ts)
  76. }); err != nil {
  77. return nil, err
  78. }
  79. return &ts, nil
  80. }
  81. // WalkTaskStatus calls fn for the status of each task.
  82. func WalkTaskStatus(tx *bolt.Tx, fn func(id string, status *api.TaskStatus) error) error {
  83. bkt := getTasksBucket(tx)
  84. if bkt == nil {
  85. return nil
  86. }
  87. return bkt.ForEach(func(k, v []byte) error {
  88. tbkt := bkt.Bucket(k)
  89. p := tbkt.Get(bucketKeyStatus)
  90. var ts api.TaskStatus
  91. if err := proto.Unmarshal(p, &ts); err != nil {
  92. return err
  93. }
  94. return fn(string(k), &ts)
  95. })
  96. }
  97. // PutTask places the task into the database.
  98. func PutTask(tx *bolt.Tx, task *api.Task) error {
  99. return withCreateTaskBucketIfNotExists(tx, task.ID, func(bkt *bolt.Bucket) error {
  100. taskCopy := *task
  101. taskCopy.Status = api.TaskStatus{} // blank out the status.
  102. p, err := proto.Marshal(&taskCopy)
  103. if err != nil {
  104. return err
  105. }
  106. return bkt.Put(bucketKeyData, p)
  107. })
  108. }
  109. // PutTaskStatus updates the status for the task with id.
  110. func PutTaskStatus(tx *bolt.Tx, id string, status *api.TaskStatus) error {
  111. return withCreateTaskBucketIfNotExists(tx, id, func(bkt *bolt.Bucket) error {
  112. p, err := proto.Marshal(status)
  113. if err != nil {
  114. return err
  115. }
  116. return bkt.Put([]byte("status"), p)
  117. })
  118. }
  119. // DeleteTask completely removes the task from the database.
  120. func DeleteTask(tx *bolt.Tx, id string) error {
  121. bkt := getTasksBucket(tx)
  122. if bkt == nil {
  123. return nil
  124. }
  125. return bkt.DeleteBucket([]byte(id))
  126. }
  127. // SetTaskAssignment sets the current assignment state.
  128. func SetTaskAssignment(tx *bolt.Tx, id string, assigned bool) error {
  129. return withTaskBucket(tx, id, func(bkt *bolt.Bucket) error {
  130. if assigned {
  131. return bkt.Put([]byte("assigned"), []byte{0xFF})
  132. }
  133. return bkt.Delete([]byte("assigned"))
  134. })
  135. }
  136. func createBucketIfNotExists(tx *bolt.Tx, keys ...[]byte) (*bolt.Bucket, error) {
  137. bkt, err := tx.CreateBucketIfNotExists(keys[0])
  138. if err != nil {
  139. return nil, err
  140. }
  141. for _, key := range keys[1:] {
  142. bkt, err = bkt.CreateBucketIfNotExists(key)
  143. if err != nil {
  144. return nil, err
  145. }
  146. }
  147. return bkt, nil
  148. }
  149. func withCreateTaskBucketIfNotExists(tx *bolt.Tx, id string, fn func(bkt *bolt.Bucket) error) error {
  150. bkt, err := createBucketIfNotExists(tx, bucketKeyStorageVersion, bucketKeyTasks, []byte(id))
  151. if err != nil {
  152. return err
  153. }
  154. return fn(bkt)
  155. }
  156. func withTaskBucket(tx *bolt.Tx, id string, fn func(bkt *bolt.Bucket) error) error {
  157. bkt := getTaskBucket(tx, id)
  158. if bkt == nil {
  159. return errTaskUnknown
  160. }
  161. return fn(bkt)
  162. }
  163. func getTaskBucket(tx *bolt.Tx, id string) *bolt.Bucket {
  164. return getBucket(tx, bucketKeyStorageVersion, bucketKeyTasks, []byte(id))
  165. }
  166. func getTasksBucket(tx *bolt.Tx) *bolt.Bucket {
  167. return getBucket(tx, bucketKeyStorageVersion, bucketKeyTasks)
  168. }
  169. func getBucket(tx *bolt.Tx, keys ...[]byte) *bolt.Bucket {
  170. bkt := tx.Bucket(keys[0])
  171. for _, key := range keys[1:] {
  172. if bkt == nil {
  173. break
  174. }
  175. bkt = bkt.Bucket(key)
  176. }
  177. return bkt
  178. }