index_blob_manager_v0.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632
  1. package indexblob
  2. import (
  3. "context"
  4. "encoding/json"
  5. "time"
  6. "github.com/pkg/errors"
  7. "golang.org/x/sync/errgroup"
  8. "github.com/kopia/kopia/internal/gather"
  9. "github.com/kopia/kopia/repo/blob"
  10. "github.com/kopia/kopia/repo/content/index"
  11. "github.com/kopia/kopia/repo/format"
  12. "github.com/kopia/kopia/repo/logging"
  13. )
  14. // V0IndexBlobPrefix is the prefix for all legacy (v0) index blobs.
  15. const V0IndexBlobPrefix = "n"
  16. // V0CompactionLogBlobPrefix is the prefix for all legacy (v0) index compactions blobs.
  17. const V0CompactionLogBlobPrefix = "m"
  18. // V0CleanupBlobPrefix is the prefix for all legacy (v0) index cleanup blobs.
  19. const V0CleanupBlobPrefix = "l"
  20. const defaultEventualConsistencySettleTime = 1 * time.Hour
// compactionLogEntry represents contents of compaction log entry stored in `m` blob.
// It records which index blobs were merged (inputs) and which blobs replaced them
// (outputs), so superseded inputs can later be identified and removed.
type compactionLogEntry struct {
	// InputMetadata is the list of input blob names that were compacted together.
	InputMetadata []blob.Metadata `json:"inputMetadata"`

	// OutputMetadata is the list of blobs that are results of compaction.
	OutputMetadata []blob.Metadata `json:"outputMetadata"`

	// metadata of the compaction blob itself, not serialized
	// (populated when the entry is loaded from storage).
	metadata blob.Metadata
}
// cleanupEntry represents contents of cleanup entry stored in `l` blob.
// It names blobs whose deletion has been deferred until storage settles.
type cleanupEntry struct {
	// BlobIDs lists the blobs scheduled for delayed deletion.
	BlobIDs []blob.ID `json:"blobIDs"`

	// We're adding cleanup schedule time to make cleanup blobs unique which prevents them
	// from being rewritten, random would probably work just as well or another mechanism to prevent
	// deletion of blobs that does not require reading them in the first place (which messes up
	// read-after-create promise in S3).
	CleanupScheduleTime time.Time `json:"cleanupScheduleTime"`

	// age is not serialized; computed on load as (latest server blob time - CleanupScheduleTime).
	age time.Duration
}
// IndexFormattingOptions provides options for formatting index blobs.
type IndexFormattingOptions interface {
	// GetMutableParameters returns the current mutable format parameters
	// (e.g. index version and maximum pack size) used when building indexes.
	GetMutableParameters(ctx context.Context) (format.MutableParameters, error)
}
// ManagerV0 is a V0 (legacy) implementation of index blob manager.
type ManagerV0 struct {
	st  blob.Storage       // underlying blob storage used for listing and deleting blobs
	enc *EncryptionManager // encrypts blobs on write and decrypts them on read

	timeNow           func() time.Time       // clock source, injectable for testing
	formattingOptions IndexFormattingOptions // source of mutable index format parameters
	log               logging.Logger         // destination for debug logging
}
  52. // ListIndexBlobInfos list active blob info structs. Also returns time of latest content deletion commit.
  53. func (m *ManagerV0) ListIndexBlobInfos(ctx context.Context) ([]Metadata, time.Time, error) {
  54. activeIndexBlobs, t0, err := m.ListActiveIndexBlobs(ctx)
  55. if err != nil {
  56. return nil, time.Time{}, err
  57. }
  58. q := make([]Metadata, 0, len(activeIndexBlobs))
  59. for _, activeIndexBlob := range activeIndexBlobs {
  60. // skip the V0 blob poison that is used to prevent client reads.
  61. if activeIndexBlob.BlobID == format.LegacyIndexPoisonBlobID {
  62. continue
  63. }
  64. q = append(q, activeIndexBlob)
  65. }
  66. return q, t0, nil
  67. }
  68. // ListActiveIndexBlobs lists the metadata for active index blobs and returns the cut-off time
  69. // before which all deleted index entries should be treated as non-existent.
  70. func (m *ManagerV0) ListActiveIndexBlobs(ctx context.Context) ([]Metadata, time.Time, error) {
  71. var compactionLogMetadata, storageIndexBlobs []blob.Metadata
  72. var eg errgroup.Group
  73. // list index and cleanup blobs in parallel.
  74. eg.Go(func() error {
  75. v, err := blob.ListAllBlobs(ctx, m.st, V0CompactionLogBlobPrefix)
  76. compactionLogMetadata = v
  77. return errors.Wrap(err, "error listing compaction blobs")
  78. })
  79. eg.Go(func() error {
  80. v, err := blob.ListAllBlobs(ctx, m.st, V0IndexBlobPrefix)
  81. storageIndexBlobs = v
  82. return errors.Wrap(err, "error listing index blobs")
  83. })
  84. if err := eg.Wait(); err != nil {
  85. return nil, time.Time{}, errors.Wrap(err, "error listing indexes")
  86. }
  87. for i, sib := range storageIndexBlobs {
  88. m.log.Debugf("found-index-blobs[%v] = %v", i, sib)
  89. }
  90. for i, clm := range compactionLogMetadata {
  91. m.log.Debugf("found-compaction-blobs[%v] %v", i, clm)
  92. }
  93. indexMap := map[blob.ID]*Metadata{}
  94. addBlobsToIndex(indexMap, storageIndexBlobs)
  95. compactionLogs, err := m.getCompactionLogEntries(ctx, compactionLogMetadata)
  96. if err != nil {
  97. return nil, time.Time{}, errors.Wrap(err, "error reading compaction log")
  98. }
  99. // remove entries from indexMap that have been compacted and replaced by other indexes.
  100. m.removeCompactedIndexes(indexMap, compactionLogs)
  101. var results []Metadata
  102. for _, v := range indexMap {
  103. results = append(results, *v)
  104. }
  105. for i, res := range results {
  106. m.log.Debugf("active-index-blobs[%v] = %v", i, res)
  107. }
  108. return results, time.Time{}, nil
  109. }
// Invalidate invalidates any caches.
//
// The V0 manager keeps no in-memory caches, so this is intentionally a no-op.
func (m *ManagerV0) Invalidate() {
}
  113. // Compact performs compaction of index blobs by merging smaller ones into larger
  114. // and registering compaction and cleanup blobs in the repository.
  115. func (m *ManagerV0) Compact(ctx context.Context, opt CompactOptions) error {
  116. indexBlobs, _, err := m.ListActiveIndexBlobs(ctx)
  117. if err != nil {
  118. return errors.Wrap(err, "error listing active index blobs")
  119. }
  120. mp, mperr := m.formattingOptions.GetMutableParameters(ctx)
  121. if mperr != nil {
  122. return errors.Wrap(mperr, "mutable parameters")
  123. }
  124. blobsToCompact := m.getBlobsToCompact(indexBlobs, opt, mp)
  125. if err := m.compactIndexBlobs(ctx, blobsToCompact, opt); err != nil {
  126. return errors.Wrap(err, "error performing compaction")
  127. }
  128. if err := m.cleanup(ctx, opt.maxEventualConsistencySettleTime()); err != nil {
  129. return errors.Wrap(err, "error cleaning up index blobs")
  130. }
  131. return nil
  132. }
  133. func (m *ManagerV0) registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata, maxEventualConsistencySettleTime time.Duration) error {
  134. logEntryBytes, err := json.Marshal(&compactionLogEntry{
  135. InputMetadata: inputs,
  136. OutputMetadata: outputs,
  137. })
  138. if err != nil {
  139. return errors.Wrap(err, "unable to marshal log entry bytes")
  140. }
  141. compactionLogBlobMetadata, err := m.enc.EncryptAndWriteBlob(ctx, gather.FromSlice(logEntryBytes), V0CompactionLogBlobPrefix, "")
  142. if err != nil {
  143. return errors.Wrap(err, "unable to write compaction log")
  144. }
  145. for i, input := range inputs {
  146. m.log.Debugf("compacted-input[%v/%v] %v", i, len(inputs), input)
  147. }
  148. for i, output := range outputs {
  149. m.log.Debugf("compacted-output[%v/%v] %v", i, len(outputs), output)
  150. }
  151. m.log.Debugf("compaction-log %v %v", compactionLogBlobMetadata.BlobID, compactionLogBlobMetadata.Timestamp)
  152. if err := m.deleteOldBlobs(ctx, compactionLogBlobMetadata, maxEventualConsistencySettleTime); err != nil {
  153. return errors.Wrap(err, "error deleting old index blobs")
  154. }
  155. return nil
  156. }
  157. // WriteIndexBlobs writes the provided data shards into new index blobs optionally appending the provided suffix.
  158. func (m *ManagerV0) WriteIndexBlobs(ctx context.Context, dataShards []gather.Bytes, suffix blob.ID) ([]blob.Metadata, error) {
  159. var result []blob.Metadata
  160. for _, data := range dataShards {
  161. bm, err := m.enc.EncryptAndWriteBlob(ctx, data, V0IndexBlobPrefix, suffix)
  162. if err != nil {
  163. return nil, errors.Wrap(err, "error writing index blob")
  164. }
  165. result = append(result, bm)
  166. }
  167. return result, nil
  168. }
  169. func (m *ManagerV0) getCompactionLogEntries(ctx context.Context, blobs []blob.Metadata) (map[blob.ID]*compactionLogEntry, error) {
  170. results := map[blob.ID]*compactionLogEntry{}
  171. var data gather.WriteBuffer
  172. defer data.Close()
  173. for _, cb := range blobs {
  174. err := m.enc.GetEncryptedBlob(ctx, cb.BlobID, &data)
  175. if errors.Is(err, blob.ErrBlobNotFound) {
  176. continue
  177. }
  178. if err != nil {
  179. return nil, errors.Wrapf(err, "unable to read compaction blob %q", cb.BlobID)
  180. }
  181. le := &compactionLogEntry{}
  182. if err := json.NewDecoder(data.Bytes().Reader()).Decode(le); err != nil {
  183. return nil, errors.Wrap(err, "unable to read compaction log entry %q")
  184. }
  185. le.metadata = cb
  186. results[cb.BlobID] = le
  187. }
  188. return results, nil
  189. }
  190. func (m *ManagerV0) getCleanupEntries(ctx context.Context, latestServerBlobTime time.Time, blobs []blob.Metadata) (map[blob.ID]*cleanupEntry, error) {
  191. results := map[blob.ID]*cleanupEntry{}
  192. var data gather.WriteBuffer
  193. defer data.Close()
  194. for _, cb := range blobs {
  195. data.Reset()
  196. err := m.enc.GetEncryptedBlob(ctx, cb.BlobID, &data)
  197. if errors.Is(err, blob.ErrBlobNotFound) {
  198. continue
  199. }
  200. if err != nil {
  201. return nil, errors.Wrapf(err, "unable to read compaction blob %q", cb.BlobID)
  202. }
  203. le := &cleanupEntry{}
  204. if err := json.NewDecoder(data.Bytes().Reader()).Decode(le); err != nil {
  205. return nil, errors.Wrap(err, "unable to read compaction log entry %q")
  206. }
  207. le.age = latestServerBlobTime.Sub(le.CleanupScheduleTime)
  208. results[cb.BlobID] = le
  209. }
  210. return results, nil
  211. }
  212. func (m *ManagerV0) deleteOldBlobs(ctx context.Context, latestBlob blob.Metadata, maxEventualConsistencySettleTime time.Duration) error {
  213. allCompactionLogBlobs, err := blob.ListAllBlobs(ctx, m.st, V0CompactionLogBlobPrefix)
  214. if err != nil {
  215. return errors.Wrap(err, "error listing compaction log blobs")
  216. }
  217. // look for server-assigned timestamp of the compaction log entry we just wrote as a reference.
  218. // we're assuming server-generated timestamps are somewhat reasonable and time is moving
  219. compactionLogServerTimeCutoff := latestBlob.Timestamp.Add(-maxEventualConsistencySettleTime)
  220. compactionBlobs := blobsOlderThan(allCompactionLogBlobs, compactionLogServerTimeCutoff)
  221. m.log.Debugf("fetching %v/%v compaction logs older than %v", len(compactionBlobs), len(allCompactionLogBlobs), compactionLogServerTimeCutoff)
  222. compactionBlobEntries, err := m.getCompactionLogEntries(ctx, compactionBlobs)
  223. if err != nil {
  224. return errors.Wrap(err, "unable to get compaction log entries")
  225. }
  226. indexBlobsToDelete := m.findIndexBlobsToDelete(latestBlob.Timestamp, compactionBlobEntries, maxEventualConsistencySettleTime)
  227. // note that we must always delete index blobs first before compaction logs
  228. // otherwise we may inadvertently resurrect an index blob that should have been removed.
  229. if err := m.deleteBlobsFromStorageAndCache(ctx, indexBlobsToDelete); err != nil {
  230. return errors.Wrap(err, "unable to delete compaction logs")
  231. }
  232. compactionLogBlobsToDelayCleanup := m.findCompactionLogBlobsToDelayCleanup(compactionBlobs)
  233. if err := m.delayCleanupBlobs(ctx, compactionLogBlobsToDelayCleanup, latestBlob.Timestamp); err != nil {
  234. return errors.Wrap(err, "unable to schedule delayed cleanup of blobs")
  235. }
  236. return nil
  237. }
  238. func (m *ManagerV0) findIndexBlobsToDelete(latestServerBlobTime time.Time, entries map[blob.ID]*compactionLogEntry, maxEventualConsistencySettleTime time.Duration) []blob.ID {
  239. tmp := map[blob.ID]bool{}
  240. for _, cl := range entries {
  241. // are the input index blobs in this compaction eligible for deletion?
  242. if age := latestServerBlobTime.Sub(cl.metadata.Timestamp); age < maxEventualConsistencySettleTime {
  243. m.log.Debugf("not deleting compacted index blob used as inputs for compaction %v, because it's too recent: %v < %v", cl.metadata.BlobID, age, maxEventualConsistencySettleTime)
  244. continue
  245. }
  246. for _, b := range cl.InputMetadata {
  247. m.log.Debugf("will delete old index %v compacted to %v", b, cl.OutputMetadata)
  248. tmp[b.BlobID] = true
  249. }
  250. }
  251. var result []blob.ID
  252. for k := range tmp {
  253. result = append(result, k)
  254. }
  255. return result
  256. }
  257. func (m *ManagerV0) findCompactionLogBlobsToDelayCleanup(compactionBlobs []blob.Metadata) []blob.ID {
  258. var result []blob.ID
  259. for _, cb := range compactionBlobs {
  260. m.log.Debugf("will delete compaction log blob %v", cb)
  261. result = append(result, cb.BlobID)
  262. }
  263. return result
  264. }
  265. func (m *ManagerV0) findBlobsToDelete(entries map[blob.ID]*cleanupEntry, maxEventualConsistencySettleTime time.Duration) (compactionLogs, cleanupBlobs []blob.ID) {
  266. for k, e := range entries {
  267. if e.age >= maxEventualConsistencySettleTime {
  268. compactionLogs = append(compactionLogs, e.BlobIDs...)
  269. cleanupBlobs = append(cleanupBlobs, k)
  270. }
  271. }
  272. return
  273. }
  274. func (m *ManagerV0) delayCleanupBlobs(ctx context.Context, blobIDs []blob.ID, cleanupScheduleTime time.Time) error {
  275. if len(blobIDs) == 0 {
  276. return nil
  277. }
  278. payload, err := json.Marshal(&cleanupEntry{
  279. BlobIDs: blobIDs,
  280. CleanupScheduleTime: cleanupScheduleTime,
  281. })
  282. if err != nil {
  283. return errors.Wrap(err, "unable to marshal cleanup log bytes")
  284. }
  285. if _, err := m.enc.EncryptAndWriteBlob(ctx, gather.FromSlice(payload), V0CleanupBlobPrefix, ""); err != nil {
  286. return errors.Wrap(err, "unable to cleanup log")
  287. }
  288. return nil
  289. }
  290. func (m *ManagerV0) deleteBlobsFromStorageAndCache(ctx context.Context, blobIDs []blob.ID) error {
  291. for _, blobID := range blobIDs {
  292. if err := m.st.DeleteBlob(ctx, blobID); err != nil && !errors.Is(err, blob.ErrBlobNotFound) {
  293. m.log.Debugf("delete-blob failed %v %v", blobID, err)
  294. return errors.Wrapf(err, "unable to delete blob %v", blobID)
  295. }
  296. m.log.Debugf("delete-blob succeeded %v", blobID)
  297. }
  298. return nil
  299. }
  300. func (m *ManagerV0) cleanup(ctx context.Context, maxEventualConsistencySettleTime time.Duration) error {
  301. allCleanupBlobs, err := blob.ListAllBlobs(ctx, m.st, V0CleanupBlobPrefix)
  302. if err != nil {
  303. return errors.Wrap(err, "error listing cleanup blobs")
  304. }
  305. // determine latest storage write time of a cleanup blob
  306. var latestStorageWriteTimestamp time.Time
  307. for _, cb := range allCleanupBlobs {
  308. if cb.Timestamp.After(latestStorageWriteTimestamp) {
  309. latestStorageWriteTimestamp = cb.Timestamp
  310. }
  311. }
  312. // load cleanup entries and compute their age
  313. cleanupEntries, err := m.getCleanupEntries(ctx, latestStorageWriteTimestamp, allCleanupBlobs)
  314. if err != nil {
  315. return errors.Wrap(err, "error loading cleanup blobs")
  316. }
  317. // pick cleanup entries to delete that are old enough
  318. compactionLogsToDelete, cleanupBlobsToDelete := m.findBlobsToDelete(cleanupEntries, maxEventualConsistencySettleTime)
  319. if err := m.deleteBlobsFromStorageAndCache(ctx, compactionLogsToDelete); err != nil {
  320. return errors.Wrap(err, "unable to delete cleanup blobs")
  321. }
  322. if err := m.deleteBlobsFromStorageAndCache(ctx, cleanupBlobsToDelete); err != nil {
  323. return errors.Wrap(err, "unable to delete cleanup blobs")
  324. }
  325. if err := m.st.FlushCaches(ctx); err != nil {
  326. m.log.Debugw("error flushing caches", "err", err)
  327. }
  328. return nil
  329. }
  330. func (m *ManagerV0) getBlobsToCompact(indexBlobs []Metadata, opt CompactOptions, mp format.MutableParameters) []Metadata {
  331. var (
  332. nonCompactedBlobs, verySmallBlobs []Metadata
  333. totalSizeNonCompactedBlobs, totalSizeVerySmallBlobs, totalSizeMediumSizedBlobs int64
  334. mediumSizedBlobCount int
  335. )
  336. for _, b := range indexBlobs {
  337. if b.Length > int64(mp.MaxPackSize) && !opt.AllIndexes {
  338. continue
  339. }
  340. nonCompactedBlobs = append(nonCompactedBlobs, b)
  341. totalSizeNonCompactedBlobs += b.Length
  342. if b.Length < int64(mp.MaxPackSize)/verySmallContentFraction {
  343. verySmallBlobs = append(verySmallBlobs, b)
  344. totalSizeVerySmallBlobs += b.Length
  345. } else {
  346. mediumSizedBlobCount++
  347. totalSizeMediumSizedBlobs += b.Length
  348. }
  349. }
  350. if len(nonCompactedBlobs) < opt.MaxSmallBlobs {
  351. // current count is below min allowed - nothing to do
  352. m.log.Debug("no small contents to Compact")
  353. return nil
  354. }
  355. if len(verySmallBlobs) > len(nonCompactedBlobs)/2 && mediumSizedBlobCount+1 < opt.MaxSmallBlobs {
  356. m.log.Debugf("compacting %v very small contents", len(verySmallBlobs))
  357. return verySmallBlobs
  358. }
  359. m.log.Debugf("compacting all %v non-compacted contents", len(nonCompactedBlobs))
  360. return nonCompactedBlobs
  361. }
// compactIndexBlobs merges the given index blobs into a new, smaller set of
// index blobs, optionally dropping contents, and registers the compaction so
// that superseded inputs can later be deleted.
func (m *ManagerV0) compactIndexBlobs(ctx context.Context, indexBlobs []Metadata, opt CompactOptions) error {
	// nothing to do: at most one input and no content-dropping requested.
	if len(indexBlobs) <= 1 && opt.DropDeletedBefore.IsZero() && len(opt.DropContents) == 0 {
		return nil
	}

	mp, mperr := m.formattingOptions.GetMutableParameters(ctx)
	if mperr != nil {
		return errors.Wrap(mperr, "mutable parameters")
	}

	bld := make(index.Builder)

	var inputs, outputs []blob.Metadata

	// merge all input index blobs into a single in-memory builder.
	for i, indexBlob := range indexBlobs {
		m.log.Debugf("compacting-entries[%v/%v] %v", i, len(indexBlobs), indexBlob)

		if err := addIndexBlobsToBuilder(ctx, m.enc, bld, indexBlob.BlobID); err != nil {
			return errors.Wrap(err, "error adding index to builder")
		}

		inputs = append(inputs, indexBlob.Metadata)
	}

	// after we built index map in memory, drop contents from it
	// we must do it after all input blobs have been merged, otherwise we may resurrect contents.
	m.dropContentsFromBuilder(bld, opt)

	dataShards, cleanupShards, err := bld.BuildShards(mp.IndexVersion, false, DefaultIndexShardSize)
	if err != nil {
		return errors.Wrap(err, "unable to build an index")
	}

	defer cleanupShards()

	// write the merged shards as new index blobs before registering the
	// compaction, so outputs exist before inputs become eligible for deletion.
	compactedIndexBlobs, err := m.WriteIndexBlobs(ctx, dataShards, "")
	if err != nil {
		return errors.Wrap(err, "unable to write compacted indexes")
	}

	outputs = append(outputs, compactedIndexBlobs...)

	if err := m.registerCompaction(ctx, inputs, outputs, opt.maxEventualConsistencySettleTime()); err != nil {
		return errors.Wrap(err, "unable to register compaction")
	}

	return nil
}
  397. func (m *ManagerV0) dropContentsFromBuilder(bld index.Builder, opt CompactOptions) {
  398. for _, dc := range opt.DropContents {
  399. if _, ok := bld[dc]; ok {
  400. m.log.Debugf("manual-drop-from-index %v", dc)
  401. delete(bld, dc)
  402. }
  403. }
  404. if !opt.DropDeletedBefore.IsZero() {
  405. m.log.Debugf("drop-content-deleted-before %v", opt.DropDeletedBefore)
  406. for _, i := range bld {
  407. if i.Deleted && i.Timestamp().Before(opt.DropDeletedBefore) {
  408. m.log.Debugf("drop-from-index-old-deleted %v %v", i.ContentID, i.Timestamp())
  409. delete(bld, i.ContentID)
  410. }
  411. }
  412. m.log.Debugf("finished drop-content-deleted-before %v", opt.DropDeletedBefore)
  413. }
  414. }
  415. func addIndexBlobsToBuilder(ctx context.Context, enc *EncryptionManager, bld index.BuilderCreator, indexBlobID blob.ID) error {
  416. var data gather.WriteBuffer
  417. defer data.Close()
  418. err := enc.GetEncryptedBlob(ctx, indexBlobID, &data)
  419. if err != nil {
  420. return errors.Wrapf(err, "error getting index %q", indexBlobID)
  421. }
  422. ndx, err := index.Open(data.ToByteSlice(), nil, enc.crypter.Encryptor().Overhead)
  423. if err != nil {
  424. return errors.Wrapf(err, "unable to open index blob %q", indexBlobID)
  425. }
  426. _ = ndx.Iterate(index.AllIDs, func(i index.Info) error {
  427. bld.Add(i)
  428. return nil
  429. })
  430. return nil
  431. }
  432. func blobsOlderThan(m []blob.Metadata, cutoffTime time.Time) []blob.Metadata {
  433. var res []blob.Metadata
  434. for _, m := range m {
  435. if !m.Timestamp.After(cutoffTime) {
  436. res = append(res, m)
  437. }
  438. }
  439. return res
  440. }
  441. func (m *ManagerV0) removeCompactedIndexes(bimap map[blob.ID]*Metadata, compactionLogs map[blob.ID]*compactionLogEntry) {
  442. var validCompactionLogs []*compactionLogEntry
  443. for _, cl := range compactionLogs {
  444. // only process compaction logs for which we have found all the outputs.
  445. haveAllOutputs := true
  446. for _, o := range cl.OutputMetadata {
  447. if bimap[o.BlobID] == nil {
  448. haveAllOutputs = false
  449. m.log.Debugf("blob %v referenced by compaction log is not found", o.BlobID)
  450. break
  451. }
  452. }
  453. if haveAllOutputs {
  454. validCompactionLogs = append(validCompactionLogs, cl)
  455. }
  456. }
  457. // now remove all inputs from the set if there's a valid compaction log entry with all the outputs.
  458. for _, cl := range validCompactionLogs {
  459. for _, ib := range cl.InputMetadata {
  460. if md := bimap[ib.BlobID]; md != nil && md.Superseded == nil {
  461. m.log.Debugf("ignore-index-blob %v compacted to %v", ib, cl.OutputMetadata)
  462. delete(bimap, ib.BlobID)
  463. }
  464. }
  465. }
  466. }
  467. // NewManagerV0 creates new instance of ManagerV0 with all required parameters set.
  468. func NewManagerV0(
  469. st blob.Storage,
  470. enc *EncryptionManager,
  471. timeNow func() time.Time,
  472. formattingOptions IndexFormattingOptions,
  473. log logging.Logger,
  474. ) *ManagerV0 {
  475. return &ManagerV0{st, enc, timeNow, formattingOptions, log}
  476. }
// Compile-time check that ManagerV0 satisfies the Manager interface.
var _ Manager = (*ManagerV0)(nil)