sequence.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679
// Package bitseq provides a structure and utilities for representing a long bitmask
// as a sequence of run-length encoded blocks. It operates directly on the encoded
// representation; it does not decode/encode.
  4. package bitseq
  5. import (
  6. "encoding/binary"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "sync"
  11. "github.com/Sirupsen/logrus"
  12. "github.com/docker/libnetwork/datastore"
  13. "github.com/docker/libnetwork/types"
  14. )
  // Block sequence constants. If needed, these could be made configurable.
const (
	// blockLen is the number of bits held by one block.
	blockLen uint32 = 32
	// blockBytes is the number of bytes held by one block.
	blockBytes uint64 = uint64(blockLen) / 8
	// blockMAX is a block value with every bit set (a fully allocated block).
	blockMAX uint32 = 1<<blockLen - 1
	// blockFirstBit selects the most significant bit of a block, which is
	// the first ordinal inside that block.
	blockFirstBit uint32 = 1 << (blockLen - 1)
	// invalidPos is returned in place of a position when none can be given.
	invalidPos uint64 = 1<<64 - 1
)
  // ErrNoBitAvailable is returned when no more bits are available to set.
var ErrNoBitAvailable = errors.New("no bit available")

// ErrBitAllocated is returned when the specific bit requested is already set.
var ErrBitAllocated = errors.New("requested bit is already allocated")
  30. // Handle contains the sequece representing the bitmask and its identifier
  31. type Handle struct {
  32. bits uint64
  33. unselected uint64
  34. head *sequence
  35. app string
  36. id string
  37. dbIndex uint64
  38. dbExists bool
  39. store datastore.DataStore
  40. sync.Mutex
  41. }
  42. // NewHandle returns a thread-safe instance of the bitmask handler
  43. func NewHandle(app string, ds datastore.DataStore, id string, numElements uint64) (*Handle, error) {
  44. h := &Handle{
  45. app: app,
  46. id: id,
  47. store: ds,
  48. bits: numElements,
  49. unselected: numElements,
  50. head: &sequence{
  51. block: 0x0,
  52. count: getNumBlocks(numElements),
  53. },
  54. }
  55. if h.store == nil {
  56. return h, nil
  57. }
  58. // Get the initial status from the ds if present.
  59. if err := h.store.GetObject(datastore.Key(h.Key()...), h); err != nil && err != datastore.ErrKeyNotFound {
  60. return nil, err
  61. }
  62. // If the handle is not in store, write it.
  63. if !h.Exists() {
  64. if err := h.writeToStore(); err != nil {
  65. return nil, fmt.Errorf("failed to write bitsequence to store: %v", err)
  66. }
  67. }
  68. return h, nil
  69. }
  // sequence is one node of the run-length encoded bitmask: count consecutive
// copies of the same 32-bit block, linked to the following node.
type sequence struct {
	// block is the 4-byte allocation bitmask repeated by this node.
	block uint32
	// count is how many consecutive blocks carry this bitmask.
	count uint64
	// next is the following node, or nil at the end of the list.
	next *sequence
}
  76. // String returns a string representation of the block sequence starting from this block
  77. func (s *sequence) toString() string {
  78. var nextBlock string
  79. if s.next == nil {
  80. nextBlock = "end"
  81. } else {
  82. nextBlock = s.next.toString()
  83. }
  84. return fmt.Sprintf("(0x%x, %d)->%s", s.block, s.count, nextBlock)
  85. }
  86. // GetAvailableBit returns the position of the first unset bit in the bitmask represented by this sequence
  87. func (s *sequence) getAvailableBit(from uint64) (uint64, uint64, error) {
  88. if s.block == blockMAX || s.count == 0 {
  89. return invalidPos, invalidPos, ErrNoBitAvailable
  90. }
  91. bits := from
  92. bitSel := blockFirstBit >> from
  93. for bitSel > 0 && s.block&bitSel != 0 {
  94. bitSel >>= 1
  95. bits++
  96. }
  97. return bits / 8, bits % 8, nil
  98. }
  99. // GetCopy returns a copy of the linked list rooted at this node
  100. func (s *sequence) getCopy() *sequence {
  101. n := &sequence{block: s.block, count: s.count}
  102. pn := n
  103. ps := s.next
  104. for ps != nil {
  105. pn.next = &sequence{block: ps.block, count: ps.count}
  106. pn = pn.next
  107. ps = ps.next
  108. }
  109. return n
  110. }
  111. // Equal checks if this sequence is equal to the passed one
  112. func (s *sequence) equal(o *sequence) bool {
  113. this := s
  114. other := o
  115. for this != nil {
  116. if other == nil {
  117. return false
  118. }
  119. if this.block != other.block || this.count != other.count {
  120. return false
  121. }
  122. this = this.next
  123. other = other.next
  124. }
  125. // Check if other is longer than this
  126. if other != nil {
  127. return false
  128. }
  129. return true
  130. }
  131. // ToByteArray converts the sequence into a byte array
  132. func (s *sequence) toByteArray() ([]byte, error) {
  133. var bb []byte
  134. p := s
  135. for p != nil {
  136. b := make([]byte, 12)
  137. binary.BigEndian.PutUint32(b[0:], p.block)
  138. binary.BigEndian.PutUint64(b[4:], p.count)
  139. bb = append(bb, b...)
  140. p = p.next
  141. }
  142. return bb, nil
  143. }
  144. // fromByteArray construct the sequence from the byte array
  145. func (s *sequence) fromByteArray(data []byte) error {
  146. l := len(data)
  147. if l%12 != 0 {
  148. return fmt.Errorf("cannot deserialize byte sequence of length %d (%v)", l, data)
  149. }
  150. p := s
  151. i := 0
  152. for {
  153. p.block = binary.BigEndian.Uint32(data[i : i+4])
  154. p.count = binary.BigEndian.Uint64(data[i+4 : i+12])
  155. i += 12
  156. if i == l {
  157. break
  158. }
  159. p.next = &sequence{}
  160. p = p.next
  161. }
  162. return nil
  163. }
  164. func (h *Handle) getCopy() *Handle {
  165. return &Handle{
  166. bits: h.bits,
  167. unselected: h.unselected,
  168. head: h.head.getCopy(),
  169. app: h.app,
  170. id: h.id,
  171. dbIndex: h.dbIndex,
  172. dbExists: h.dbExists,
  173. store: h.store,
  174. }
  175. }
  176. // SetAnyInRange atomically sets the first unset bit in the specified range in the sequence and returns the corresponding ordinal
  177. func (h *Handle) SetAnyInRange(start, end uint64) (uint64, error) {
  178. if end < start || end >= h.bits {
  179. return invalidPos, fmt.Errorf("invalid bit range [%d, %d]", start, end)
  180. }
  181. if h.Unselected() == 0 {
  182. return invalidPos, ErrNoBitAvailable
  183. }
  184. return h.set(0, start, end, true, false)
  185. }
  186. // SetAny atomically sets the first unset bit in the sequence and returns the corresponding ordinal
  187. func (h *Handle) SetAny() (uint64, error) {
  188. if h.Unselected() == 0 {
  189. return invalidPos, ErrNoBitAvailable
  190. }
  191. return h.set(0, 0, h.bits-1, true, false)
  192. }
  193. // Set atomically sets the corresponding bit in the sequence
  194. func (h *Handle) Set(ordinal uint64) error {
  195. if err := h.validateOrdinal(ordinal); err != nil {
  196. return err
  197. }
  198. _, err := h.set(ordinal, 0, 0, false, false)
  199. return err
  200. }
  201. // Unset atomically unsets the corresponding bit in the sequence
  202. func (h *Handle) Unset(ordinal uint64) error {
  203. if err := h.validateOrdinal(ordinal); err != nil {
  204. return err
  205. }
  206. _, err := h.set(ordinal, 0, 0, false, true)
  207. return err
  208. }
  209. // IsSet atomically checks if the ordinal bit is set. In case ordinal
  210. // is outside of the bit sequence limits, false is returned.
  211. func (h *Handle) IsSet(ordinal uint64) bool {
  212. if err := h.validateOrdinal(ordinal); err != nil {
  213. return false
  214. }
  215. h.Lock()
  216. _, _, err := checkIfAvailable(h.head, ordinal)
  217. h.Unlock()
  218. return err != nil
  219. }
  220. func (h *Handle) runConsistencyCheck() bool {
  221. corrupted := false
  222. for p, c := h.head, h.head.next; c != nil; c = c.next {
  223. if c.count == 0 {
  224. corrupted = true
  225. p.next = c.next
  226. continue // keep same p
  227. }
  228. p = c
  229. }
  230. return corrupted
  231. }
  232. // CheckConsistency checks if the bit sequence is in an inconsistent state and attempts to fix it.
  233. // It looks for a corruption signature that may happen in docker 1.9.0 and 1.9.1.
  234. func (h *Handle) CheckConsistency() error {
  235. for {
  236. h.Lock()
  237. store := h.store
  238. h.Unlock()
  239. if store != nil {
  240. if err := store.GetObject(datastore.Key(h.Key()...), h); err != nil && err != datastore.ErrKeyNotFound {
  241. return err
  242. }
  243. }
  244. h.Lock()
  245. nh := h.getCopy()
  246. h.Unlock()
  247. if !nh.runConsistencyCheck() {
  248. return nil
  249. }
  250. if err := nh.writeToStore(); err != nil {
  251. if _, ok := err.(types.RetryError); !ok {
  252. return fmt.Errorf("internal failure while fixing inconsistent bitsequence: %v", err)
  253. }
  254. continue
  255. }
  256. logrus.Infof("Fixed inconsistent bit sequence in datastore:\n%s\n%s", h, nh)
  257. h.Lock()
  258. h.head = nh.head
  259. h.Unlock()
  260. return nil
  261. }
  262. }
  263. // set/reset the bit
  264. func (h *Handle) set(ordinal, start, end uint64, any bool, release bool) (uint64, error) {
  265. var (
  266. bitPos uint64
  267. bytePos uint64
  268. ret uint64
  269. err error
  270. )
  271. for {
  272. var store datastore.DataStore
  273. h.Lock()
  274. store = h.store
  275. h.Unlock()
  276. if store != nil {
  277. if err := store.GetObject(datastore.Key(h.Key()...), h); err != nil && err != datastore.ErrKeyNotFound {
  278. return ret, err
  279. }
  280. }
  281. h.Lock()
  282. // Get position if available
  283. if release {
  284. bytePos, bitPos = ordinalToPos(ordinal)
  285. } else {
  286. if any {
  287. bytePos, bitPos, err = getFirstAvailable(h.head, start)
  288. ret = posToOrdinal(bytePos, bitPos)
  289. if end < ret {
  290. err = ErrNoBitAvailable
  291. }
  292. } else {
  293. bytePos, bitPos, err = checkIfAvailable(h.head, ordinal)
  294. ret = ordinal
  295. }
  296. }
  297. if err != nil {
  298. h.Unlock()
  299. return ret, err
  300. }
  301. // Create a private copy of h and work on it
  302. nh := h.getCopy()
  303. h.Unlock()
  304. nh.head = pushReservation(bytePos, bitPos, nh.head, release)
  305. if release {
  306. nh.unselected++
  307. } else {
  308. nh.unselected--
  309. }
  310. // Attempt to write private copy to store
  311. if err := nh.writeToStore(); err != nil {
  312. if _, ok := err.(types.RetryError); !ok {
  313. return ret, fmt.Errorf("internal failure while setting the bit: %v", err)
  314. }
  315. // Retry
  316. continue
  317. }
  318. // Previous atomic push was succesfull. Save private copy to local copy
  319. h.Lock()
  320. defer h.Unlock()
  321. h.unselected = nh.unselected
  322. h.head = nh.head
  323. h.dbExists = nh.dbExists
  324. h.dbIndex = nh.dbIndex
  325. return ret, nil
  326. }
  327. }
  328. // checks is needed because to cover the case where the number of bits is not a multiple of blockLen
  329. func (h *Handle) validateOrdinal(ordinal uint64) error {
  330. h.Lock()
  331. defer h.Unlock()
  332. if ordinal >= h.bits {
  333. return errors.New("bit does not belong to the sequence")
  334. }
  335. return nil
  336. }
  337. // Destroy removes from the datastore the data belonging to this handle
  338. func (h *Handle) Destroy() error {
  339. for {
  340. if err := h.deleteFromStore(); err != nil {
  341. if _, ok := err.(types.RetryError); !ok {
  342. return fmt.Errorf("internal failure while destroying the sequence: %v", err)
  343. }
  344. // Fetch latest
  345. if err := h.store.GetObject(datastore.Key(h.Key()...), h); err != nil {
  346. if err == datastore.ErrKeyNotFound { // already removed
  347. return nil
  348. }
  349. return fmt.Errorf("failed to fetch from store when destroying the sequence: %v", err)
  350. }
  351. continue
  352. }
  353. return nil
  354. }
  355. }
  356. // ToByteArray converts this handle's data into a byte array
  357. func (h *Handle) ToByteArray() ([]byte, error) {
  358. h.Lock()
  359. defer h.Unlock()
  360. ba := make([]byte, 16)
  361. binary.BigEndian.PutUint64(ba[0:], h.bits)
  362. binary.BigEndian.PutUint64(ba[8:], h.unselected)
  363. bm, err := h.head.toByteArray()
  364. if err != nil {
  365. return nil, fmt.Errorf("failed to serialize head: %s", err.Error())
  366. }
  367. ba = append(ba, bm...)
  368. return ba, nil
  369. }
  370. // FromByteArray reads his handle's data from a byte array
  371. func (h *Handle) FromByteArray(ba []byte) error {
  372. if ba == nil {
  373. return errors.New("nil byte array")
  374. }
  375. nh := &sequence{}
  376. err := nh.fromByteArray(ba[16:])
  377. if err != nil {
  378. return fmt.Errorf("failed to deserialize head: %s", err.Error())
  379. }
  380. h.Lock()
  381. h.head = nh
  382. h.bits = binary.BigEndian.Uint64(ba[0:8])
  383. h.unselected = binary.BigEndian.Uint64(ba[8:16])
  384. h.Unlock()
  385. return nil
  386. }
  387. // Bits returns the length of the bit sequence
  388. func (h *Handle) Bits() uint64 {
  389. return h.bits
  390. }
  391. // Unselected returns the number of bits which are not selected
  392. func (h *Handle) Unselected() uint64 {
  393. h.Lock()
  394. defer h.Unlock()
  395. return h.unselected
  396. }
  397. func (h *Handle) String() string {
  398. h.Lock()
  399. defer h.Unlock()
  400. return fmt.Sprintf("App: %s, ID: %s, DBIndex: 0x%x, bits: %d, unselected: %d, sequence: %s",
  401. h.app, h.id, h.dbIndex, h.bits, h.unselected, h.head.toString())
  402. }
  403. // MarshalJSON encodes Handle into json message
  404. func (h *Handle) MarshalJSON() ([]byte, error) {
  405. m := map[string]interface{}{
  406. "id": h.id,
  407. }
  408. b, err := h.ToByteArray()
  409. if err != nil {
  410. return nil, err
  411. }
  412. m["sequence"] = b
  413. return json.Marshal(m)
  414. }
  415. // UnmarshalJSON decodes json message into Handle
  416. func (h *Handle) UnmarshalJSON(data []byte) error {
  417. var (
  418. m map[string]interface{}
  419. b []byte
  420. err error
  421. )
  422. if err = json.Unmarshal(data, &m); err != nil {
  423. return err
  424. }
  425. h.id = m["id"].(string)
  426. bi, _ := json.Marshal(m["sequence"])
  427. if err := json.Unmarshal(bi, &b); err != nil {
  428. return err
  429. }
  430. return h.FromByteArray(b)
  431. }
  432. // getFirstAvailable looks for the first unset bit in passed mask starting from start
  433. func getFirstAvailable(head *sequence, start uint64) (uint64, uint64, error) {
  434. // Find sequence which contains the start bit
  435. byteStart, bitStart := ordinalToPos(start)
  436. current, _, _, inBlockBytePos := findSequence(head, byteStart)
  437. // Derive the this sequence offsets
  438. byteOffset := byteStart - inBlockBytePos
  439. bitOffset := inBlockBytePos*8 + bitStart
  440. for current != nil {
  441. if current.block != blockMAX {
  442. bytePos, bitPos, err := current.getAvailableBit(bitOffset)
  443. return byteOffset + bytePos, bitPos, err
  444. }
  445. // Moving to next block: Reset bit offset.
  446. bitOffset = 0
  447. byteOffset += current.count * blockBytes
  448. current = current.next
  449. }
  450. return invalidPos, invalidPos, ErrNoBitAvailable
  451. }
  452. // checkIfAvailable checks if the bit correspondent to the specified ordinal is unset
  453. // If the ordinal is beyond the sequence limits, a negative response is returned
  454. func checkIfAvailable(head *sequence, ordinal uint64) (uint64, uint64, error) {
  455. bytePos, bitPos := ordinalToPos(ordinal)
  456. // Find the sequence containing this byte
  457. current, _, _, inBlockBytePos := findSequence(head, bytePos)
  458. if current != nil {
  459. // Check whether the bit corresponding to the ordinal address is unset
  460. bitSel := blockFirstBit >> (inBlockBytePos*8 + bitPos)
  461. if current.block&bitSel == 0 {
  462. return bytePos, bitPos, nil
  463. }
  464. }
  465. return invalidPos, invalidPos, ErrBitAllocated
  466. }
  467. // Given the byte position and the sequences list head, return the pointer to the
  468. // sequence containing the byte (current), the pointer to the previous sequence,
  469. // the number of blocks preceding the block containing the byte inside the current sequence.
  470. // If bytePos is outside of the list, function will return (nil, nil, 0, invalidPos)
  471. func findSequence(head *sequence, bytePos uint64) (*sequence, *sequence, uint64, uint64) {
  472. // Find the sequence containing this byte
  473. previous := head
  474. current := head
  475. n := bytePos
  476. for current.next != nil && n >= (current.count*blockBytes) { // Nil check for less than 32 addresses masks
  477. n -= (current.count * blockBytes)
  478. previous = current
  479. current = current.next
  480. }
  481. // If byte is outside of the list, let caller know
  482. if n >= (current.count * blockBytes) {
  483. return nil, nil, 0, invalidPos
  484. }
  485. // Find the byte position inside the block and the number of blocks
  486. // preceding the block containing the byte inside this sequence
  487. precBlocks := n / blockBytes
  488. inBlockBytePos := bytePos % blockBytes
  489. return current, previous, precBlocks, inBlockBytePos
  490. }
  491. // PushReservation pushes the bit reservation inside the bitmask.
  492. // Given byte and bit positions, identify the sequence (current) which holds the block containing the affected bit.
  493. // Create a new block with the modified bit according to the operation (allocate/release).
  494. // Create a new sequence containing the new block and insert it in the proper position.
  495. // Remove current sequence if empty.
  496. // Check if new sequence can be merged with neighbour (previous/next) sequences.
  497. //
  498. //
  499. // Identify "current" sequence containing block:
  500. // [prev seq] [current seq] [next seq]
  501. //
  502. // Based on block position, resulting list of sequences can be any of three forms:
  503. //
  504. // block position Resulting list of sequences
  505. // A) block is first in current: [prev seq] [new] [modified current seq] [next seq]
  506. // B) block is last in current: [prev seq] [modified current seq] [new] [next seq]
  507. // C) block is in the middle of current: [prev seq] [curr pre] [new] [curr post] [next seq]
  508. func pushReservation(bytePos, bitPos uint64, head *sequence, release bool) *sequence {
  509. // Store list's head
  510. newHead := head
  511. // Find the sequence containing this byte
  512. current, previous, precBlocks, inBlockBytePos := findSequence(head, bytePos)
  513. if current == nil {
  514. return newHead
  515. }
  516. // Construct updated block
  517. bitSel := blockFirstBit >> (inBlockBytePos*8 + bitPos)
  518. newBlock := current.block
  519. if release {
  520. newBlock &^= bitSel
  521. } else {
  522. newBlock |= bitSel
  523. }
  524. // Quit if it was a redundant request
  525. if current.block == newBlock {
  526. return newHead
  527. }
  528. // Current sequence inevitably looses one block, upadate count
  529. current.count--
  530. // Create new sequence
  531. newSequence := &sequence{block: newBlock, count: 1}
  532. // Insert the new sequence in the list based on block position
  533. if precBlocks == 0 { // First in sequence (A)
  534. newSequence.next = current
  535. if current == head {
  536. newHead = newSequence
  537. previous = newHead
  538. } else {
  539. previous.next = newSequence
  540. }
  541. removeCurrentIfEmpty(&newHead, newSequence, current)
  542. mergeSequences(previous)
  543. } else if precBlocks == current.count { // Last in sequence (B)
  544. newSequence.next = current.next
  545. current.next = newSequence
  546. mergeSequences(current)
  547. } else { // In between the sequence (C)
  548. currPre := &sequence{block: current.block, count: precBlocks, next: newSequence}
  549. currPost := current
  550. currPost.count -= precBlocks
  551. newSequence.next = currPost
  552. if currPost == head {
  553. newHead = currPre
  554. } else {
  555. previous.next = currPre
  556. }
  557. // No merging or empty current possible here
  558. }
  559. return newHead
  560. }
  561. // Removes the current sequence from the list if empty, adjusting the head pointer if needed
  562. func removeCurrentIfEmpty(head **sequence, previous, current *sequence) {
  563. if current.count == 0 {
  564. if current == *head {
  565. *head = current.next
  566. } else {
  567. previous.next = current.next
  568. current = current.next
  569. }
  570. }
  571. }
  572. // Given a pointer to a sequence, it checks if it can be merged with any following sequences
  573. // It stops when no more merging is possible.
  574. // TODO: Optimization: only attempt merge from start to end sequence, no need to scan till the end of the list
  575. func mergeSequences(seq *sequence) {
  576. if seq != nil {
  577. // Merge all what possible from seq
  578. for seq.next != nil && seq.block == seq.next.block {
  579. seq.count += seq.next.count
  580. seq.next = seq.next.next
  581. }
  582. // Move to next
  583. mergeSequences(seq.next)
  584. }
  585. }
  586. func getNumBlocks(numBits uint64) uint64 {
  587. numBlocks := numBits / uint64(blockLen)
  588. if numBits%uint64(blockLen) != 0 {
  589. numBlocks++
  590. }
  591. return numBlocks
  592. }
  // ordinalToPos splits a bit ordinal into (byte index, bit index within byte).
func ordinalToPos(ordinal uint64) (uint64, uint64) {
	return ordinal >> 3, ordinal & 7
}
  // posToOrdinal combines a byte index and a bit index within that byte into a
// bit ordinal (bytePos*8 + bitPos, with uint64 wrap-around).
func posToOrdinal(bytePos, bitPos uint64) uint64 {
	ordinal := bytePos * 8
	ordinal += bitPos
	return ordinal
}