- package indexblob
- import (
- "context"
- "encoding/json"
- "time"
- "github.com/pkg/errors"
- "golang.org/x/sync/errgroup"
- "github.com/kopia/kopia/internal/gather"
- "github.com/kopia/kopia/repo/blob"
- "github.com/kopia/kopia/repo/content/index"
- "github.com/kopia/kopia/repo/format"
- "github.com/kopia/kopia/repo/logging"
- )
- // V0IndexBlobPrefix is the prefix for all legacy (v0) index blobs.
- const V0IndexBlobPrefix = "n"
- // V0CompactionLogBlobPrefix is the prefix for all legacy (v0) index compaction log blobs.
- const V0CompactionLogBlobPrefix = "m"
- // V0CleanupBlobPrefix is the prefix for all legacy (v0) index cleanup blobs.
- const V0CleanupBlobPrefix = "l"
- const defaultEventualConsistencySettleTime = 1 * time.Hour
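- // listV0IndexBlobIDs is an illustrative helper, not part of the original manager API:
- // a minimal sketch showing how the v0 prefixes above combine with blob.ListAllBlobs
- // (the same helper used throughout this file).
- func listV0IndexBlobIDs(ctx context.Context, st blob.Storage) ([]blob.ID, error) {
- // enumerate all blobs whose names start with the v0 index prefix ("n").
- mds, err := blob.ListAllBlobs(ctx, st, V0IndexBlobPrefix)
- if err != nil {
- return nil, errors.Wrap(err, "error listing v0 index blobs")
- }
- ids := make([]blob.ID, 0, len(mds))
- for _, md := range mds {
- ids = append(ids, md.BlobID)
- }
- return ids, nil
- }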
- // compactionLogEntry represents the contents of a compaction log entry stored in an `m` blob.
- type compactionLogEntry struct {
- // list of input blob names that were compacted together.
- InputMetadata []blob.Metadata `json:"inputMetadata"`
- // list of blobs that are results of compaction.
- OutputMetadata []blob.Metadata `json:"outputMetadata"`
- // Metadata of the compaction blob itself, not serialized.
- metadata blob.Metadata
- }
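- // For illustration, a compactionLogEntry is built and serialized the same way
- // registerCompaction below does (a sketch; blob IDs and lengths are hypothetical):
- //
- //	entry := &compactionLogEntry{
- //		InputMetadata:  []blob.Metadata{{BlobID: "n01", Length: 100}},
- //		OutputMetadata: []blob.Metadata{{BlobID: "n02", Length: 90}},
- //	}
- //	payload, _ := json.Marshal(entry)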
- // cleanupEntry represents the contents of a cleanup entry stored in an `l` blob.
- type cleanupEntry struct {
- BlobIDs []blob.ID `json:"blobIDs"`
- // CleanupScheduleTime makes cleanup blobs unique, which prevents them from being
- // rewritten. A random value would probably work just as well, as would any other
- // mechanism that prevents deletion of blobs without requiring them to be read
- // first (reading blobs before deleting them breaks the read-after-create promise on S3).
- CleanupScheduleTime time.Time `json:"cleanupScheduleTime"`
- age time.Duration // not serialized, computed on load
- }
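- // For illustration, a cleanupEntry is serialized like this (a sketch; blob IDs
- // and the schedule time are hypothetical):
- //
- //	ce := &cleanupEntry{
- //		BlobIDs:             []blob.ID{"m01", "m02"},
- //		CleanupScheduleTime: time.Now(),
- //	}
- //	payload, _ := json.Marshal(ce)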
- // IndexFormattingOptions provides options for formatting index blobs.
- type IndexFormattingOptions interface {
- GetMutableParameters(ctx context.Context) (format.MutableParameters, error)
- }
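- // A minimal sketch of an IndexFormattingOptions implementation that serves fixed
- // parameters (the type name is illustrative; in kopia the repository's format
- // manager typically provides the real implementation):
- //
- //	type staticFormattingOptions struct{ mp format.MutableParameters }
- //
- //	func (o staticFormattingOptions) GetMutableParameters(ctx context.Context) (format.MutableParameters, error) {
- //		return o.mp, nil
- //	}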
- // ManagerV0 is a V0 (legacy) implementation of index blob manager.
- type ManagerV0 struct {
- st blob.Storage
- enc *EncryptionManager
- timeNow func() time.Time
- formattingOptions IndexFormattingOptions
- log logging.Logger
- }
- // ListIndexBlobInfos lists active blob info structs and also returns the time of the latest content deletion commit.
- func (m *ManagerV0) ListIndexBlobInfos(ctx context.Context) ([]Metadata, time.Time, error) {
- activeIndexBlobs, t0, err := m.ListActiveIndexBlobs(ctx)
- if err != nil {
- return nil, time.Time{}, err
- }
- q := make([]Metadata, 0, len(activeIndexBlobs))
- for _, activeIndexBlob := range activeIndexBlobs {
- // skip the V0 poison blob that is used to prevent client reads.
- if activeIndexBlob.BlobID == format.LegacyIndexPoisonBlobID {
- continue
- }
- q = append(q, activeIndexBlob)
- }
- return q, t0, nil
- }
- // ListActiveIndexBlobs lists the metadata for active index blobs and returns the cut-off time
- // before which all deleted index entries should be treated as non-existent.
- func (m *ManagerV0) ListActiveIndexBlobs(ctx context.Context) ([]Metadata, time.Time, error) {
- var compactionLogMetadata, storageIndexBlobs []blob.Metadata
- var eg errgroup.Group
- // list compaction log and index blobs in parallel.
- eg.Go(func() error {
- v, err := blob.ListAllBlobs(ctx, m.st, V0CompactionLogBlobPrefix)
- compactionLogMetadata = v
- return errors.Wrap(err, "error listing compaction blobs")
- })
- eg.Go(func() error {
- v, err := blob.ListAllBlobs(ctx, m.st, V0IndexBlobPrefix)
- storageIndexBlobs = v
- return errors.Wrap(err, "error listing index blobs")
- })
- if err := eg.Wait(); err != nil {
- return nil, time.Time{}, errors.Wrap(err, "error listing indexes")
- }
- for i, sib := range storageIndexBlobs {
- m.log.Debugf("found-index-blobs[%v] = %v", i, sib)
- }
- for i, clm := range compactionLogMetadata {
- m.log.Debugf("found-compaction-blobs[%v] %v", i, clm)
- }
- indexMap := map[blob.ID]*Metadata{}
- addBlobsToIndex(indexMap, storageIndexBlobs)
- compactionLogs, err := m.getCompactionLogEntries(ctx, compactionLogMetadata)
- if err != nil {
- return nil, time.Time{}, errors.Wrap(err, "error reading compaction log")
- }
- // remove entries from indexMap that have been compacted and replaced by other indexes.
- m.removeCompactedIndexes(indexMap, compactionLogs)
- var results []Metadata
- for _, v := range indexMap {
- results = append(results, *v)
- }
- for i, res := range results {
- m.log.Debugf("active-index-blobs[%v] = %v", i, res)
- }
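- // the V0 manager does not track a deletion-visibility cut-off, so the returned time is always zero.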
- return results, time.Time{}, nil
- }
- // Invalidate invalidates any caches.
- func (m *ManagerV0) Invalidate() {
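- // ManagerV0 does not maintain any in-memory caches, so there is nothing to invalidate.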
- }
- // Compact performs compaction of index blobs by merging smaller ones into larger
- // and registering compaction and cleanup blobs in the repository.
- func (m *ManagerV0) Compact(ctx context.Context, opt CompactOptions) error {
- indexBlobs, _, err := m.ListActiveIndexBlobs(ctx)
- if err != nil {
- return errors.Wrap(err, "error listing active index blobs")
- }
- mp, mperr := m.formattingOptions.GetMutableParameters(ctx)
- if mperr != nil {
- return errors.Wrap(mperr, "mutable parameters")
- }
- blobsToCompact := m.getBlobsToCompact(indexBlobs, opt, mp)
- if err := m.compactIndexBlobs(ctx, blobsToCompact, opt); err != nil {
- return errors.Wrap(err, "error performing compaction")
- }
- if err := m.cleanup(ctx, opt.maxEventualConsistencySettleTime()); err != nil {
- return errors.Wrap(err, "error cleaning up index blobs")
- }
- return nil
- }
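- // Example invocation of Compact (a sketch; the option value is illustrative):
- //
- //	// compact once at least 8 small index blobs have accumulated.
- //	if err := m.Compact(ctx, CompactOptions{MaxSmallBlobs: 8}); err != nil {
- //		return err
- //	}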
- func (m *ManagerV0) registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata, maxEventualConsistencySettleTime time.Duration) error {
- logEntryBytes, err := json.Marshal(&compactionLogEntry{
- InputMetadata: inputs,
- OutputMetadata: outputs,
- })
- if err != nil {
- return errors.Wrap(err, "unable to marshal log entry bytes")
- }
- compactionLogBlobMetadata, err := m.enc.EncryptAndWriteBlob(ctx, gather.FromSlice(logEntryBytes), V0CompactionLogBlobPrefix, "")
- if err != nil {
- return errors.Wrap(err, "unable to write compaction log")
- }
- for i, input := range inputs {
- m.log.Debugf("compacted-input[%v/%v] %v", i, len(inputs), input)
- }
- for i, output := range outputs {
- m.log.Debugf("compacted-output[%v/%v] %v", i, len(outputs), output)
- }
- m.log.Debugf("compaction-log %v %v", compactionLogBlobMetadata.BlobID, compactionLogBlobMetadata.Timestamp)
- if err := m.deleteOldBlobs(ctx, compactionLogBlobMetadata, maxEventualConsistencySettleTime); err != nil {
- return errors.Wrap(err, "error deleting old index blobs")
- }
- return nil
- }
- // WriteIndexBlobs writes the provided data shards into new index blobs optionally appending the provided suffix.
- func (m *ManagerV0) WriteIndexBlobs(ctx context.Context, dataShards []gather.Bytes, suffix blob.ID) ([]blob.Metadata, error) {
- var result []blob.Metadata
- for _, data := range dataShards {
- bm, err := m.enc.EncryptAndWriteBlob(ctx, data, V0IndexBlobPrefix, suffix)
- if err != nil {
- return nil, errors.Wrap(err, "error writing index blob")
- }
- result = append(result, bm)
- }
- return result, nil
- }
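- // For illustration, WriteIndexBlobs is fed shards the way compactIndexBlobs below
- // produces them (a sketch; bld and mp come from the surrounding context):
- //
- //	dataShards, cleanupShards, err := bld.BuildShards(mp.IndexVersion, false, DefaultIndexShardSize)
- //	if err != nil { ... }
- //	defer cleanupShards()
- //	written, err := m.WriteIndexBlobs(ctx, dataShards, "")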
- func (m *ManagerV0) getCompactionLogEntries(ctx context.Context, blobs []blob.Metadata) (map[blob.ID]*compactionLogEntry, error) {
- results := map[blob.ID]*compactionLogEntry{}
- var data gather.WriteBuffer
- defer data.Close()
- for _, cb := range blobs {
- err := m.enc.GetEncryptedBlob(ctx, cb.BlobID, &data)
- if errors.Is(err, blob.ErrBlobNotFound) {
- continue
- }
- if err != nil {
- return nil, errors.Wrapf(err, "unable to read compaction blob %q", cb.BlobID)
- }
- le := &compactionLogEntry{}
- if err := json.NewDecoder(data.Bytes().Reader()).Decode(le); err != nil {
- return nil, errors.Wrapf(err, "unable to read compaction log entry %q", cb.BlobID)
- }
- le.metadata = cb
- results[cb.BlobID] = le
- }
- return results, nil
- }
- func (m *ManagerV0) getCleanupEntries(ctx context.Context, latestServerBlobTime time.Time, blobs []blob.Metadata) (map[blob.ID]*cleanupEntry, error) {
- results := map[blob.ID]*cleanupEntry{}
- var data gather.WriteBuffer
- defer data.Close()
- for _, cb := range blobs {
- data.Reset()
- err := m.enc.GetEncryptedBlob(ctx, cb.BlobID, &data)
- if errors.Is(err, blob.ErrBlobNotFound) {
- continue
- }
- if err != nil {
- return nil, errors.Wrapf(err, "unable to read cleanup blob %q", cb.BlobID)
- }
- le := &cleanupEntry{}
- if err := json.NewDecoder(data.Bytes().Reader()).Decode(le); err != nil {
- return nil, errors.Wrapf(err, "unable to read cleanup entry %q", cb.BlobID)
- }
- le.age = latestServerBlobTime.Sub(le.CleanupScheduleTime)
- results[cb.BlobID] = le
- }
- return results, nil
- }
- func (m *ManagerV0) deleteOldBlobs(ctx context.Context, latestBlob blob.Metadata, maxEventualConsistencySettleTime time.Duration) error {
- allCompactionLogBlobs, err := blob.ListAllBlobs(ctx, m.st, V0CompactionLogBlobPrefix)
- if err != nil {
- return errors.Wrap(err, "error listing compaction log blobs")
- }
- // look for the server-assigned timestamp of the compaction log entry we just wrote as a reference.
- // we're assuming server-generated timestamps are somewhat reasonable and that time moves forward.
- compactionLogServerTimeCutoff := latestBlob.Timestamp.Add(-maxEventualConsistencySettleTime)
- compactionBlobs := blobsOlderThan(allCompactionLogBlobs, compactionLogServerTimeCutoff)
- m.log.Debugf("fetching %v/%v compaction logs older than %v", len(compactionBlobs), len(allCompactionLogBlobs), compactionLogServerTimeCutoff)
- compactionBlobEntries, err := m.getCompactionLogEntries(ctx, compactionBlobs)
- if err != nil {
- return errors.Wrap(err, "unable to get compaction log entries")
- }
- indexBlobsToDelete := m.findIndexBlobsToDelete(latestBlob.Timestamp, compactionBlobEntries, maxEventualConsistencySettleTime)
- // note that we must always delete index blobs first before compaction logs
- // otherwise we may inadvertently resurrect an index blob that should have been removed.
- if err := m.deleteBlobsFromStorageAndCache(ctx, indexBlobsToDelete); err != nil {
- return errors.Wrap(err, "unable to delete old index blobs")
- }
- compactionLogBlobsToDelayCleanup := m.findCompactionLogBlobsToDelayCleanup(compactionBlobs)
- if err := m.delayCleanupBlobs(ctx, compactionLogBlobsToDelayCleanup, latestBlob.Timestamp); err != nil {
- return errors.Wrap(err, "unable to schedule delayed cleanup of blobs")
- }
- return nil
- }
- func (m *ManagerV0) findIndexBlobsToDelete(latestServerBlobTime time.Time, entries map[blob.ID]*compactionLogEntry, maxEventualConsistencySettleTime time.Duration) []blob.ID {
- tmp := map[blob.ID]bool{}
- for _, cl := range entries {
- // are the input index blobs in this compaction eligible for deletion?
- if age := latestServerBlobTime.Sub(cl.metadata.Timestamp); age < maxEventualConsistencySettleTime {
- m.log.Debugf("not deleting compacted index blob used as inputs for compaction %v, because it's too recent: %v < %v", cl.metadata.BlobID, age, maxEventualConsistencySettleTime)
- continue
- }
- for _, b := range cl.InputMetadata {
- m.log.Debugf("will delete old index %v compacted to %v", b, cl.OutputMetadata)
- tmp[b.BlobID] = true
- }
- }
- var result []blob.ID
- for k := range tmp {
- result = append(result, k)
- }
- return result
- }
- func (m *ManagerV0) findCompactionLogBlobsToDelayCleanup(compactionBlobs []blob.Metadata) []blob.ID {
- var result []blob.ID
- for _, cb := range compactionBlobs {
- m.log.Debugf("will delete compaction log blob %v", cb)
- result = append(result, cb.BlobID)
- }
- return result
- }
- func (m *ManagerV0) findBlobsToDelete(entries map[blob.ID]*cleanupEntry, maxEventualConsistencySettleTime time.Duration) (compactionLogs, cleanupBlobs []blob.ID) {
- for k, e := range entries {
- if e.age >= maxEventualConsistencySettleTime {
- compactionLogs = append(compactionLogs, e.BlobIDs...)
- cleanupBlobs = append(cleanupBlobs, k)
- }
- }
- return
- }
- func (m *ManagerV0) delayCleanupBlobs(ctx context.Context, blobIDs []blob.ID, cleanupScheduleTime time.Time) error {
- if len(blobIDs) == 0 {
- return nil
- }
- payload, err := json.Marshal(&cleanupEntry{
- BlobIDs: blobIDs,
- CleanupScheduleTime: cleanupScheduleTime,
- })
- if err != nil {
- return errors.Wrap(err, "unable to marshal cleanup log bytes")
- }
- if _, err := m.enc.EncryptAndWriteBlob(ctx, gather.FromSlice(payload), V0CleanupBlobPrefix, ""); err != nil {
- return errors.Wrap(err, "unable to write cleanup log")
- }
- return nil
- }
- func (m *ManagerV0) deleteBlobsFromStorageAndCache(ctx context.Context, blobIDs []blob.ID) error {
- for _, blobID := range blobIDs {
- if err := m.st.DeleteBlob(ctx, blobID); err != nil && !errors.Is(err, blob.ErrBlobNotFound) {
- m.log.Debugf("delete-blob failed %v %v", blobID, err)
- return errors.Wrapf(err, "unable to delete blob %v", blobID)
- }
- m.log.Debugf("delete-blob succeeded %v", blobID)
- }
- return nil
- }
- func (m *ManagerV0) cleanup(ctx context.Context, maxEventualConsistencySettleTime time.Duration) error {
- allCleanupBlobs, err := blob.ListAllBlobs(ctx, m.st, V0CleanupBlobPrefix)
- if err != nil {
- return errors.Wrap(err, "error listing cleanup blobs")
- }
- // determine latest storage write time of a cleanup blob
- var latestStorageWriteTimestamp time.Time
- for _, cb := range allCleanupBlobs {
- if cb.Timestamp.After(latestStorageWriteTimestamp) {
- latestStorageWriteTimestamp = cb.Timestamp
- }
- }
- // load cleanup entries and compute their age
- cleanupEntries, err := m.getCleanupEntries(ctx, latestStorageWriteTimestamp, allCleanupBlobs)
- if err != nil {
- return errors.Wrap(err, "error loading cleanup blobs")
- }
- // pick cleanup entries to delete that are old enough
- compactionLogsToDelete, cleanupBlobsToDelete := m.findBlobsToDelete(cleanupEntries, maxEventualConsistencySettleTime)
- if err := m.deleteBlobsFromStorageAndCache(ctx, compactionLogsToDelete); err != nil {
- return errors.Wrap(err, "unable to delete compaction log blobs")
- }
- if err := m.deleteBlobsFromStorageAndCache(ctx, cleanupBlobsToDelete); err != nil {
- return errors.Wrap(err, "unable to delete cleanup blobs")
- }
- if err := m.st.FlushCaches(ctx); err != nil {
- m.log.Debugw("error flushing caches", "err", err)
- }
- return nil
- }
- func (m *ManagerV0) getBlobsToCompact(indexBlobs []Metadata, opt CompactOptions, mp format.MutableParameters) []Metadata {
- var (
- nonCompactedBlobs, verySmallBlobs []Metadata
- totalSizeNonCompactedBlobs, totalSizeVerySmallBlobs, totalSizeMediumSizedBlobs int64
- mediumSizedBlobCount int
- )
- for _, b := range indexBlobs {
- if b.Length > int64(mp.MaxPackSize) && !opt.AllIndexes {
- continue
- }
- nonCompactedBlobs = append(nonCompactedBlobs, b)
- totalSizeNonCompactedBlobs += b.Length
- if b.Length < int64(mp.MaxPackSize)/verySmallContentFraction {
- verySmallBlobs = append(verySmallBlobs, b)
- totalSizeVerySmallBlobs += b.Length
- } else {
- mediumSizedBlobCount++
- totalSizeMediumSizedBlobs += b.Length
- }
- }
- if len(nonCompactedBlobs) < opt.MaxSmallBlobs {
- // the current count of small blobs is below the allowed maximum - nothing to do
- m.log.Debug("no small contents to compact")
- return nil
- }
- if len(verySmallBlobs) > len(nonCompactedBlobs)/2 && mediumSizedBlobCount+1 < opt.MaxSmallBlobs {
- m.log.Debugf("compacting %v very small contents", len(verySmallBlobs))
- return verySmallBlobs
- }
- m.log.Debugf("compacting all %v non-compacted contents", len(nonCompactedBlobs))
- return nonCompactedBlobs
- }
- func (m *ManagerV0) compactIndexBlobs(ctx context.Context, indexBlobs []Metadata, opt CompactOptions) error {
- if len(indexBlobs) <= 1 && opt.DropDeletedBefore.IsZero() && len(opt.DropContents) == 0 {
- return nil
- }
- mp, mperr := m.formattingOptions.GetMutableParameters(ctx)
- if mperr != nil {
- return errors.Wrap(mperr, "mutable parameters")
- }
- bld := make(index.Builder)
- var inputs, outputs []blob.Metadata
- for i, indexBlob := range indexBlobs {
- m.log.Debugf("compacting-entries[%v/%v] %v", i, len(indexBlobs), indexBlob)
- if err := addIndexBlobsToBuilder(ctx, m.enc, bld, indexBlob.BlobID); err != nil {
- return errors.Wrap(err, "error adding index to builder")
- }
- inputs = append(inputs, indexBlob.Metadata)
- }
- // after the index map has been built in memory, drop contents from it.
- // this must happen only after all input blobs have been merged, otherwise we may resurrect deleted contents.
- m.dropContentsFromBuilder(bld, opt)
- dataShards, cleanupShards, err := bld.BuildShards(mp.IndexVersion, false, DefaultIndexShardSize)
- if err != nil {
- return errors.Wrap(err, "unable to build an index")
- }
- defer cleanupShards()
- compactedIndexBlobs, err := m.WriteIndexBlobs(ctx, dataShards, "")
- if err != nil {
- return errors.Wrap(err, "unable to write compacted indexes")
- }
- outputs = append(outputs, compactedIndexBlobs...)
- if err := m.registerCompaction(ctx, inputs, outputs, opt.maxEventualConsistencySettleTime()); err != nil {
- return errors.Wrap(err, "unable to register compaction")
- }
- return nil
- }
- func (m *ManagerV0) dropContentsFromBuilder(bld index.Builder, opt CompactOptions) {
- for _, dc := range opt.DropContents {
- if _, ok := bld[dc]; ok {
- m.log.Debugf("manual-drop-from-index %v", dc)
- delete(bld, dc)
- }
- }
- if !opt.DropDeletedBefore.IsZero() {
- m.log.Debugf("drop-content-deleted-before %v", opt.DropDeletedBefore)
- for _, i := range bld {
- if i.Deleted && i.Timestamp().Before(opt.DropDeletedBefore) {
- m.log.Debugf("drop-from-index-old-deleted %v %v", i.ContentID, i.Timestamp())
- delete(bld, i.ContentID)
- }
- }
- m.log.Debugf("finished drop-content-deleted-before %v", opt.DropDeletedBefore)
- }
- }
- func addIndexBlobsToBuilder(ctx context.Context, enc *EncryptionManager, bld index.BuilderCreator, indexBlobID blob.ID) error {
- var data gather.WriteBuffer
- defer data.Close()
- err := enc.GetEncryptedBlob(ctx, indexBlobID, &data)
- if err != nil {
- return errors.Wrapf(err, "error getting index %q", indexBlobID)
- }
- ndx, err := index.Open(data.ToByteSlice(), nil, enc.crypter.Encryptor().Overhead)
- if err != nil {
- return errors.Wrapf(err, "unable to open index blob %q", indexBlobID)
- }
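- // copy all entries into the builder; the callback never fails, so the iteration error is deliberately discarded.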
- _ = ndx.Iterate(index.AllIDs, func(i index.Info) error {
- bld.Add(i)
- return nil
- })
- return nil
- }
- func blobsOlderThan(mds []blob.Metadata, cutoffTime time.Time) []blob.Metadata {
- var res []blob.Metadata
- for _, md := range mds {
- if !md.Timestamp.After(cutoffTime) {
- res = append(res, md)
- }
- }
- return res
- }
- func (m *ManagerV0) removeCompactedIndexes(bimap map[blob.ID]*Metadata, compactionLogs map[blob.ID]*compactionLogEntry) {
- var validCompactionLogs []*compactionLogEntry
- for _, cl := range compactionLogs {
- // only process compaction logs for which we have found all the outputs.
- haveAllOutputs := true
- for _, o := range cl.OutputMetadata {
- if bimap[o.BlobID] == nil {
- haveAllOutputs = false
- m.log.Debugf("blob %v referenced by compaction log is not found", o.BlobID)
- break
- }
- }
- if haveAllOutputs {
- validCompactionLogs = append(validCompactionLogs, cl)
- }
- }
- // now remove all inputs from the set if there's a valid compaction log entry with all the outputs.
- for _, cl := range validCompactionLogs {
- for _, ib := range cl.InputMetadata {
- if md := bimap[ib.BlobID]; md != nil && md.Superseded == nil {
- m.log.Debugf("ignore-index-blob %v compacted to %v", ib, cl.OutputMetadata)
- delete(bimap, ib.BlobID)
- }
- }
- }
- }
- // NewManagerV0 creates a new instance of ManagerV0 with all required parameters set.
- func NewManagerV0(
- st blob.Storage,
- enc *EncryptionManager,
- timeNow func() time.Time,
- formattingOptions IndexFormattingOptions,
- log logging.Logger,
- ) *ManagerV0 {
- return &ManagerV0{st, enc, timeNow, formattingOptions, log}
- }
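- // Example construction (a sketch; st, enc, fo and log come from the surrounding repository setup):
- //
- //	m := NewManagerV0(st, enc, time.Now, fo, log)
- //	blobs, _, err := m.ListActiveIndexBlobs(ctx)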
- var _ Manager = (*ManagerV0)(nil)