
Implemented epoch-based index manager (#1174)

* epoch: misc fixes and logging

* blob: misc helpers

* cli: removed useless 'repository upgrade', replaced by 'repository set-parameters'

* content: implemented indexBlobManagerV1 which uses epoch manager

* cli: commands to manipulate repository epoch parameters

* cli: commands to examine epoch-based indexes

* content: added test suite that uses epoch-based index manager

* content: fixed a ton of test data races caused by sharing blobtesting.DataMap

* cli: additional tests and validation for 'repository set-params'

* testing: replaced the use of suite with our own, since suite is not parallelizable
Jarek Kowalski 4 years ago
parent
commit
62ad437bb6

+ 3 - 0
cli/command_index.go

@@ -1,6 +1,8 @@
 package cli
 
 type commandIndex struct {
+	epoch commandIndexEpoch
+
 	inspect  commandIndexInspect
 	list     commandIndexList
 	optimize commandIndexOptimize
@@ -10,6 +12,7 @@ type commandIndex struct {
 func (c *commandIndex) setup(svc appServices, parent commandParent) {
 	cmd := parent.Command("index", "Commands to manipulate content index.").Hidden()
 
+	c.epoch.setup(svc, cmd)
 	c.inspect.setup(svc, cmd)
 	c.list.setup(svc, cmd)
 	c.optimize.setup(svc, cmd)

+ 11 - 0
cli/command_index_epoch.go

@@ -0,0 +1,11 @@
+package cli
+
+type commandIndexEpoch struct {
+	list commandIndexEpochList
+}
+
+func (c *commandIndexEpoch) setup(svc appServices, parent commandParent) {
+	cmd := parent.Command("epoch", "Manage index manager epochs").Hidden()
+
+	c.list.setup(svc, cmd)
+}

+ 82 - 0
cli/command_index_epoch_list.go

@@ -0,0 +1,82 @@
+package cli
+
+import (
+	"context"
+	"time"
+
+	"github.com/pkg/errors"
+
+	"github.com/kopia/kopia/internal/units"
+	"github.com/kopia/kopia/repo"
+	"github.com/kopia/kopia/repo/blob"
+)
+
+type commandIndexEpochList struct {
+	out textOutput
+}
+
+func (c *commandIndexEpochList) setup(svc appServices, parent commandParent) {
+	cmd := parent.Command("list", "List the status of epochs.")
+	cmd.Action(svc.directRepositoryReadAction(c.run))
+
+	c.out.setup(svc)
+}
+
+func (c *commandIndexEpochList) run(ctx context.Context, rep repo.DirectRepository) error {
+	emgr, ok := rep.ContentReader().EpochManager()
+	if !ok {
+		return errors.Errorf("epoch manager is not active")
+	}
+
+	snap, err := emgr.Current(ctx)
+	if err != nil {
+		return errors.Wrap(err, "unable to determine current epoch")
+	}
+
+	c.out.printStdout("Current Epoch: %v\n", snap.WriteEpoch)
+
+	if est := snap.EpochStartTime[snap.WriteEpoch]; !est.IsZero() {
+		c.out.printStdout("Epoch Started  %v\n", formatTimestamp(est))
+	}
+
+	firstNonRangeCompacted := 0
+	if len(snap.LongestRangeCheckpointSets) > 0 {
+		firstNonRangeCompacted = snap.LongestRangeCheckpointSets[len(snap.LongestRangeCheckpointSets)-1].MaxEpoch + 1
+	}
+
+	for e := snap.WriteEpoch; e >= firstNonRangeCompacted; e-- {
+		if uces := snap.UncompactedEpochSets[e]; len(uces) > 0 {
+			min := blob.MinTimestamp(uces)
+			max := blob.MaxTimestamp(uces)
+
+			c.out.printStdout("%v %v ... %v, %v blobs, %v, span %v\n",
+				e,
+				formatTimestamp(min),
+				formatTimestamp(max),
+				len(uces),
+				units.BytesStringBase2(blob.TotalLength(uces)),
+				max.Sub(min).Round(time.Second),
+			)
+		}
+
+		if secs := snap.SingleEpochCompactionSets[e]; secs != nil {
+			c.out.printStdout("%v: %v single-epoch %v blobs, %v\n",
+				e,
+				formatTimestamp(secs[0].Timestamp),
+				len(secs),
+				units.BytesStringBase2(blob.TotalLength(secs)),
+			)
+		}
+	}
+
+	for _, cs := range snap.LongestRangeCheckpointSets {
+		c.out.printStdout("%v-%v: range, %v blobs, %v\n",
+			cs.MinEpoch,
+			cs.MaxEpoch,
+			len(cs.Blobs),
+			units.BytesStringBase2(blob.TotalLength(cs.Blobs)),
+		)
+	}
+
+	return nil
+}
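
For reference, a minimal sketch of exercising the new command from a CLI test, using the testenv helpers that appear later in this change (the epoch manager must be active, e.g. by creating the repository with the hidden --enable-index-epochs flag added below):

	env := testenv.NewCLITest(t, testenv.NewInProcRunner(t))
	env.RunAndExpectSuccess(t, "repo", "create", "filesystem", "--path", env.RepoDir, "--enable-index-epochs")
	env.RunAndExpectSuccess(t, "index", "epoch", "list")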

+ 1 - 1
cli/command_index_list.go

@@ -58,7 +58,7 @@ func (c *commandIndexList) run(ctx context.Context, rep repo.DirectRepository) e
 		if c.jo.jsonOutput {
 			jl.emit(b)
 		} else {
-			c.out.printStdout("%-40v %10v %v %v\n", b.BlobID, b.Length, formatTimestampPrecise(b.Timestamp), b.Superseded)
+			c.out.printStdout("%-60v %10v %v %v\n", b.BlobID, b.Length, formatTimestampPrecise(b.Timestamp), b.Superseded)
 		}
 	}
 

+ 0 - 2
cli/command_repository.go

@@ -9,7 +9,6 @@ type commandRepository struct {
 	setParameters commandRepositorySetParameters
 	status        commandRepositoryStatus
 	syncTo        commandRepositorySyncTo
-	upgrade       commandRepositoryUpgrade
 }
 
 func (c *commandRepository) setup(svc advancedAppServices, parent commandParent) {
@@ -23,5 +22,4 @@ func (c *commandRepository) setup(svc advancedAppServices, parent commandParent)
 	c.setParameters.setup(svc, cmd)
 	c.status.setup(svc, cmd)
 	c.syncTo.setup(svc, cmd)
-	c.upgrade.setup(svc, cmd)
 }

+ 13 - 1
cli/command_repository_create.go

@@ -6,6 +6,7 @@ import (
 	"github.com/alecthomas/kingpin"
 	"github.com/pkg/errors"
 
+	"github.com/kopia/kopia/internal/epoch"
 	"github.com/kopia/kopia/repo"
 	"github.com/kopia/kopia/repo/blob"
 	"github.com/kopia/kopia/repo/content"
@@ -22,6 +23,7 @@ type commandRepositoryCreate struct {
 	createSplitter              string
 	createOnly                  bool
 	createIndexVersion          int
+	createIndexEpochs           bool
 
 	co  connectOptions
 	svc advancedAppServices
@@ -36,6 +38,7 @@ func (c *commandRepositoryCreate) setup(svc advancedAppServices, parent commandP
 	cmd.Flag("object-splitter", "The splitter to use for new objects in the repository").Default(splitter.DefaultAlgorithm).EnumVar(&c.createSplitter, splitter.SupportedAlgorithms()...)
 	cmd.Flag("create-only", "Create repository, but don't connect to it.").Short('c').BoolVar(&c.createOnly)
 	cmd.Flag("index-version", "Force particular index version").Hidden().Envar("KOPIA_CREATE_INDEX_VERSION").IntVar(&c.createIndexVersion)
+	cmd.Flag("enable-index-epochs", "Enable index epochs").Hidden().BoolVar(&c.createIndexEpochs)
 
 	c.co.setup(cmd)
 	c.svc = svc
@@ -62,13 +65,22 @@ func (c *commandRepositoryCreate) setup(svc advancedAppServices, parent commandP
 	}
 }
 
+func (c *commandRepositoryCreate) epochParametersFromFlags() epoch.Parameters {
+	if !c.createIndexEpochs {
+		return epoch.Parameters{}
+	}
+
+	return epoch.DefaultParameters
+}
+
 func (c *commandRepositoryCreate) newRepositoryOptionsFromFlags() *repo.NewRepositoryOptions {
 	return &repo.NewRepositoryOptions{
 		BlockFormat: content.FormattingOptions{
 			Hash:       c.createBlockHashFormat,
 			Encryption: c.createBlockEncryptionFormat,
 			MutableParameters: content.MutableParameters{
-				IndexVersion: c.createIndexVersion,
+				IndexVersion:    c.createIndexVersion,
+				EpochParameters: c.epochParametersFromFlags(),
 			},
 		},
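
A sketch of what the new flag amounts to at repository creation time (mirrors newRepositoryOptionsFromFlags above and repo/initialize.go below; epoch.DefaultParameters carries Enabled: true, while the zero-value epoch.Parameters{} leaves epochs disabled):

	opts := &repo.NewRepositoryOptions{
		BlockFormat: content.FormattingOptions{
			MutableParameters: content.MutableParameters{
				EpochParameters: epoch.DefaultParameters,
			},
		},
	}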
 

+ 91 - 10
cli/command_repository_set_parameters.go

@@ -2,9 +2,11 @@ package cli
 
 import (
 	"context"
+	"time"
 
 	"github.com/pkg/errors"
 
+	"github.com/kopia/kopia/internal/epoch"
 	"github.com/kopia/kopia/internal/units"
 	"github.com/kopia/kopia/repo"
 )
@@ -13,42 +15,121 @@ type commandRepositorySetParameters struct {
 	maxPackSizeMB      int
 	indexFormatVersion int
 
+	epochRefreshFrequency    time.Duration
+	epochMinDuration         time.Duration
+	epochCleanupSafetyMargin time.Duration
+	epochAdvanceOnCount      int
+	epochAdvanceOnSizeMB     int64
+	epochDeleteParallelism   int
+	epochCheckpointFrequency int
+
+	upgradeRepositoryFormat bool
+
 	svc appServices
 }
 
 func (c *commandRepositorySetParameters) setup(svc appServices, parent commandParent) {
-	cmd := parent.Command("set-parameters", "Set repository parameters.")
+	cmd := parent.Command("set-parameters", "Set repository parameters.").Alias("set-params")
 
 	cmd.Flag("max-pack-size-mb", "Set max pack file size").PlaceHolder("MB").IntVar(&c.maxPackSizeMB)
 	cmd.Flag("index-version", "Set version of index format used for writing").IntVar(&c.indexFormatVersion)
+
+	cmd.Flag("upgrade", "Upgrade repository to the latest format").BoolVar(&c.upgradeRepositoryFormat)
+
+	cmd.Flag("epoch-refresh-frequency", "Epoch refresh frequency").DurationVar(&c.epochRefreshFrequency)
+	cmd.Flag("epoch-min-duration", "Minimal duration of a single epoch").DurationVar(&c.epochMinDuration)
+	cmd.Flag("epoch-cleanup-safety-margin", "Epoch cleanup safety margin").DurationVar(&c.epochCleanupSafetyMargin)
+	cmd.Flag("epoch-advance-on-count", "Advance epoch if the number of indexes exceeds given threshold").IntVar(&c.epochAdvanceOnCount)
+	cmd.Flag("epoch-advance-on-size-mb", "Advance epoch if the total size of indexes exceeds given threshold").Int64Var(&c.epochAdvanceOnSizeMB)
+	cmd.Flag("epoch-delete-parallelism", "Epoch delete parallelism").IntVar(&c.epochDeleteParallelism)
+	cmd.Flag("epoch-checkpoint-frequency", "Checkpoint frequency").IntVar(&c.epochCheckpointFrequency)
+
 	cmd.Action(svc.directRepositoryWriteAction(c.run))
 
 	c.svc = svc
 }
 
+func (c *commandRepositorySetParameters) setSizeMBParameter(ctx context.Context, v int, desc string, dst *int, anyChange *bool) {
+	if v == 0 {
+		return
+	}
+
+	*dst = v << 20 //nolint:gomnd
+	*anyChange = true
+
+	log(ctx).Infof(" - setting %v to %v.\n", desc, units.BytesStringBase2(int64(v)<<20)) // nolint:gomnd
+}
+
+func (c *commandRepositorySetParameters) setInt64SizeMBParameter(ctx context.Context, v int64, desc string, dst *int64, anyChange *bool) {
+	if v == 0 {
+		return
+	}
+
+	*dst = v << 20 //nolint:gomnd
+	*anyChange = true
+
+	log(ctx).Infof(" - setting %v to %v.\n", desc, units.BytesStringBase2(v<<20)) // nolint:gomnd
+}
+
+func (c *commandRepositorySetParameters) setIntParameter(ctx context.Context, v int, desc string, dst *int, anyChange *bool) {
+	if v == 0 {
+		return
+	}
+
+	*dst = v
+	*anyChange = true
+
+	log(ctx).Infof(" - setting %v to %v.\n", desc, v)
+}
+
+func (c *commandRepositorySetParameters) setDurationParameter(ctx context.Context, v time.Duration, desc string, dst *time.Duration, anyChange *bool) {
+	if v == 0 {
+		return
+	}
+
+	*dst = v
+	*anyChange = true
+
+	log(ctx).Infof(" - setting %v to %v.\n", desc, v)
+}
+
 func (c *commandRepositorySetParameters) run(ctx context.Context, rep repo.DirectRepositoryWriter) error {
 	var anyChange bool
 
 	mp := rep.ContentReader().ContentFormat().MutableParameters
 
-	if c.maxPackSizeMB != 0 {
-		mp.MaxPackSize = c.maxPackSizeMB << 20 // nolint:gomnd
-		anyChange = true
+	upgradeToEpochManager := false
 
-		log(ctx).Infof(" - setting maximum pack size to %v.\n", units.BytesStringBase2(int64(mp.MaxPackSize)))
+	if c.upgradeRepositoryFormat && !mp.EpochParameters.Enabled {
+		mp.EpochParameters = epoch.DefaultParameters
+		upgradeToEpochManager = true
+		mp.IndexVersion = 2
+		anyChange = true
 	}
 
-	if c.indexFormatVersion != 0 {
-		mp.IndexVersion = c.indexFormatVersion
-		anyChange = true
+	c.setSizeMBParameter(ctx, c.maxPackSizeMB, "maximum pack size", &mp.MaxPackSize, &anyChange)
+	c.setIntParameter(ctx, c.indexFormatVersion, "index format version", &mp.IndexVersion, &anyChange)
 
-		log(ctx).Infof(" - setting index format version to %v.\n", c.indexFormatVersion)
-	}
+	c.setDurationParameter(ctx, c.epochMinDuration, "minimum epoch duration", &mp.EpochParameters.MinEpochDuration, &anyChange)
+	c.setDurationParameter(ctx, c.epochRefreshFrequency, "epoch refresh frequency", &mp.EpochParameters.EpochRefreshFrequency, &anyChange)
+	c.setDurationParameter(ctx, c.epochCleanupSafetyMargin, "epoch cleanup safety margin", &mp.EpochParameters.CleanupSafetyMargin, &anyChange)
+	c.setIntParameter(ctx, c.epochAdvanceOnCount, "epoch advance on count", &mp.EpochParameters.EpochAdvanceOnCountThreshold, &anyChange)
+	c.setInt64SizeMBParameter(ctx, c.epochAdvanceOnSizeMB, "epoch advance on total size", &mp.EpochParameters.EpochAdvanceOnTotalSizeBytesThreshold, &anyChange)
+	c.setIntParameter(ctx, c.epochDeleteParallelism, "epoch delete parallelism", &mp.EpochParameters.DeleteParallelism, &anyChange)
+	c.setIntParameter(ctx, c.epochCheckpointFrequency, "epoch checkpoint frequency", &mp.EpochParameters.FullCheckpointFrequency, &anyChange)
 
 	if !anyChange {
 		return errors.Errorf("no changes")
 	}
 
+	if upgradeToEpochManager {
+		log(ctx).Infof("migrating current indexes to epoch format")
+
+		if err := rep.ContentManager().PrepareUpgradeToIndexBlobManagerV1(ctx, mp.EpochParameters); err != nil {
+			return errors.Wrap(err, "error upgrading indexes")
+		}
+	}
+
 	if err := rep.SetParameters(ctx, mp); err != nil {
 		return errors.Wrap(err, "error setting parameters")
 	}
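
Condensed, the new --upgrade path boils down to the following sequence (a sketch of the run method above, error handling elided):

	mp.EpochParameters = epoch.DefaultParameters
	mp.IndexVersion = 2

	// migrate existing v0 index blobs into epoch 0, then persist the new parameters
	rep.ContentManager().PrepareUpgradeToIndexBlobManagerV1(ctx, mp.EpochParameters)
	rep.SetParameters(ctx, mp)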

+ 41 - 4
cli/command_repository_set_parameters_test.go

@@ -19,10 +19,10 @@ func TestRepositorySetParameters(t *testing.T) {
 	require.Contains(t, out, "Index Format:        v1")
 
 	// failure cases
-	env.RunAndExpectFailure(t, "repository", "set-params")
-	env.RunAndExpectFailure(t, "repository", "set-params", "--index-version=33")
-	env.RunAndExpectFailure(t, "repository", "set-params", "--max-pack-size-mb=9")
-	env.RunAndExpectFailure(t, "repository", "set-params", "--max-pack-size-mb=121")
+	env.RunAndExpectFailure(t, "repository", "set-parameters")
+	env.RunAndExpectFailure(t, "repository", "set-parameters", "--index-version=33")
+	env.RunAndExpectFailure(t, "repository", "set-parameters", "--max-pack-size-mb=9")
+	env.RunAndExpectFailure(t, "repository", "set-parameters", "--max-pack-size-mb=121")
 
 	env.RunAndExpectSuccess(t, "repository", "set-parameters", "--index-version=2", "--max-pack-size-mb=33")
 	out = env.RunAndExpectSuccess(t, "repository", "status")
@@ -33,3 +33,40 @@ func TestRepositorySetParameters(t *testing.T) {
 	out = env.RunAndExpectSuccess(t, "repository", "status")
 	require.Contains(t, out, "Max pack length:     44 MiB")
 }
+
+func TestRepositorySetParametersUpgrade(t *testing.T) {
+	env := testenv.NewCLITest(t, testenv.NewInProcRunner(t))
+
+	env.RunAndExpectSuccess(t, "repo", "create", "filesystem", "--path", env.RepoDir)
+	out := env.RunAndExpectSuccess(t, "repository", "status")
+
+	// default values
+	require.Contains(t, out, "Max pack length:     20 MiB")
+	require.Contains(t, out, "Index Format:        v1")
+	require.Contains(t, out, "Epoch Manager:       disabled")
+
+	env.RunAndExpectFailure(t, "index", "epoch", "list")
+
+	env.RunAndExpectSuccess(t, "repository", "set-parameters", "--upgrade")
+
+	env.RunAndExpectSuccess(t, "repository", "set-parameters", "--epoch-min-duration", "3h")
+	env.RunAndExpectSuccess(t, "repository", "set-parameters", "--epoch-cleanup-safety-margin", "23h")
+	env.RunAndExpectSuccess(t, "repository", "set-parameters", "--epoch-advance-on-size-mb", "77")
+	env.RunAndExpectSuccess(t, "repository", "set-parameters", "--epoch-advance-on-count", "22")
+	env.RunAndExpectSuccess(t, "repository", "set-parameters", "--epoch-checkpoint-frequency", "9")
+
+	env.RunAndExpectFailure(t, "repository", "set-parameters", "--epoch-min-duration", "1s")
+	env.RunAndExpectFailure(t, "repository", "set-parameters", "--epoch-refresh-frequency", "10h")
+	env.RunAndExpectFailure(t, "repository", "set-parameters", "--epoch-checkpoint-frequency", "-10")
+	env.RunAndExpectFailure(t, "repository", "set-parameters", "--epoch-cleanup-safety-margin", "10s")
+	env.RunAndExpectFailure(t, "repository", "set-parameters", "--epoch-advance-on-count", "1")
+
+	out = env.RunAndExpectSuccess(t, "repository", "status")
+	require.Contains(t, out, "Epoch Manager:       enabled")
+	require.Contains(t, out, "Index Format:        v2")
+	require.Contains(t, out, "Epoch cleanup margin:    23h0m0s")
+	require.Contains(t, out, "Epoch advance on:        22 blobs or 77 MiB, minimum 3h0m0s")
+	require.Contains(t, out, "Epoch checkpoint every:  9 epochs")
+
+	env.RunAndExpectSuccess(t, "index", "epoch", "list")
+}
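
The failure cases above appear to map one-to-one onto the Validate checks added in internal/epoch/epoch_manager.go below: --epoch-min-duration=1s trips the 10-minute floor, --epoch-refresh-frequency=10h violates the refresh-at-most-1/3-of-min-duration rule, --epoch-checkpoint-frequency=-10 must be positive, --epoch-cleanup-safety-margin=10s is less than 3x the refresh frequency, and --epoch-advance-on-count=1 is below the minimum of 10.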

+ 19 - 0
cli/command_repository_status.go

@@ -61,9 +61,28 @@ func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository)
 	c.out.printStdout("Splitter:            %v\n", dr.ObjectFormat().Splitter)
 	c.out.printStdout("Format version:      %v\n", dr.ContentReader().ContentFormat().Version)
 	c.out.printStdout("Content compression: %v\n", dr.ContentReader().SupportsContentCompression())
+
 	c.out.printStdout("Max pack length:     %v\n", units.BytesStringBase2(int64(dr.ContentReader().ContentFormat().MaxPackSize)))
 	c.out.printStdout("Index Format:        v%v\n", dr.ContentReader().ContentFormat().IndexVersion)
 
+	if emgr, ok := dr.ContentReader().EpochManager(); ok {
+		c.out.printStdout("\n")
+		c.out.printStdout("Epoch Manager:       enabled\n")
+
+		snap, err := emgr.Current(ctx)
+		if err == nil {
+			c.out.printStdout("Current Epoch: %v\n", snap.WriteEpoch)
+		}
+
+		c.out.printStdout("\n")
+		c.out.printStdout("Epoch refresh frequency: %v\n", emgr.Params.EpochRefreshFrequency)
+		c.out.printStdout("Epoch advance on:        %v blobs or %v, minimum %v\n", emgr.Params.EpochAdvanceOnCountThreshold, units.BytesStringBase2(emgr.Params.EpochAdvanceOnTotalSizeBytesThreshold), emgr.Params.MinEpochDuration)
+		c.out.printStdout("Epoch cleanup margin:    %v\n", emgr.Params.CleanupSafetyMargin)
+		c.out.printStdout("Epoch checkpoint every:  %v epochs\n", emgr.Params.FullCheckpointFrequency)
+	} else {
+		c.out.printStdout("Epoch Manager:       disabled\n")
+	}
+
 	if !c.statusReconnectToken {
 		return nil
 	}

+ 0 - 19
cli/command_repository_upgrade.go

@@ -1,19 +0,0 @@
-package cli
-
-import (
-	"context"
-
-	"github.com/kopia/kopia/repo"
-)
-
-type commandRepositoryUpgrade struct{}
-
-func (c *commandRepositoryUpgrade) setup(svc appServices, parent commandParent) {
-	cmd := parent.Command("upgrade", "Upgrade repository format.")
-	cmd.Action(svc.directRepositoryWriteAction(c.run))
-}
-
-func (c *commandRepositoryUpgrade) run(ctx context.Context, rep repo.DirectRepositoryWriter) error {
-	// nolint:wrapcheck
-	return rep.Upgrade(ctx)
-}

+ 93 - 26
internal/epoch/epoch_manager.go

@@ -31,6 +31,9 @@ const (
 
 // Parameters encapsulates all parameters that influence the behavior of epoch manager.
 type Parameters struct {
+	// whether epoch manager is enabled, must be true.
+	Enabled bool
+
 	// how frequently each client will list blobs to determine the current epoch.
 	EpochRefreshFrequency time.Duration
 
@@ -53,9 +56,44 @@ type Parameters struct {
 	DeleteParallelism int
 }
 
+// Validate validates epoch parameters.
+// nolint:gomnd
+func (p *Parameters) Validate() error {
+	if !p.Enabled {
+		return nil
+	}
+
+	if p.MinEpochDuration < 10*time.Minute {
+		return errors.Errorf("minimum epoch duration too low: %v", p.MinEpochDuration)
+	}
+
+	if p.EpochRefreshFrequency*3 > p.MinEpochDuration {
+		return errors.Errorf("epoch refresh frequency too high, must be 1/3 of minimal epoch duration or less")
+	}
+
+	if p.FullCheckpointFrequency <= 0 {
+		return errors.Errorf("invalid epoch checkpoint frequency")
+	}
+
+	if p.CleanupSafetyMargin*3 < p.EpochRefreshFrequency {
+		return errors.Errorf("invalid cleanup safety margin, must be at least 3x epoch refresh frequency")
+	}
+
+	if p.EpochAdvanceOnCountThreshold < 10 {
+		return errors.Errorf("epoch advance on count too low")
+	}
+
+	if p.EpochAdvanceOnTotalSizeBytesThreshold < 1<<20 {
+		return errors.Errorf("epoch advance on size too low")
+	}
+
+	return nil
+}
+
 // DefaultParameters contains default epoch manager parameters.
 // nolint:gomnd
 var DefaultParameters = Parameters{
+	Enabled:                               true,
 	EpochRefreshFrequency:                 20 * time.Minute,
 	FullCheckpointFrequency:               7,
 	CleanupSafetyMargin:                   1 * time.Hour,
@@ -103,15 +141,19 @@ type Manager struct {
 	writeIndexTooSlow            *int32
 }
 
+// Index blob prefixes.
 const (
-	epochMarkerIndexBlobPrefix      blob.ID = "xe"
-	uncompactedIndexBlobPrefix      blob.ID = "xn"
-	singleEpochCompactionBlobPrefix blob.ID = "xs"
-	rangeCheckpointIndexBlobPrefix  blob.ID = "xr"
-
-	numUnsettledEpochs = 2
+	EpochMarkerIndexBlobPrefix      blob.ID = "xe"
+	UncompactedIndexBlobPrefix      blob.ID = "xn"
+	SingleEpochCompactionBlobPrefix blob.ID = "xs"
+	RangeCheckpointIndexBlobPrefix  blob.ID = "xr"
 )
 
+// FirstEpoch is the number of the first epoch in a repository.
+const FirstEpoch = 0
+
+const numUnsettledEpochs = 2
+
 // CompactionFunc merges the given set of index blobs into a new index blob set with a given prefix
 // and writes them out as a set following naming convention established in 'complete_set.go'.
 type CompactionFunc func(ctx context.Context, blobIDs []blob.ID, outputPrefix blob.ID) error
@@ -190,7 +232,7 @@ func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot) error
 	eg.Go(func() error {
 		var toDelete []blob.ID
 
-		if err := e.st.ListBlobs(ctx, epochMarkerIndexBlobPrefix, func(bm blob.Metadata) error {
+		if err := e.st.ListBlobs(ctx, EpochMarkerIndexBlobPrefix, func(bm blob.Metadata) error {
 			if n, ok := epochNumberFromBlobID(bm.BlobID); ok {
 				if n < cs.WriteEpoch-1 {
 					toDelete = append(toDelete, bm.BlobID)
@@ -208,7 +250,7 @@ func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot) error
 	// delete uncompacted indexes for epochs that already have single-epoch compaction
 	// that was written sufficiently long ago.
 	eg.Go(func() error {
-		blobs, err := blob.ListAllBlobs(ctx, e.st, uncompactedIndexBlobPrefix)
+		blobs, err := blob.ListAllBlobs(ctx, e.st, UncompactedIndexBlobPrefix)
 		if err != nil {
 			return errors.Wrap(err, "error listing uncompacted blobs")
 		}
@@ -247,6 +289,10 @@ func blobSetWrittenEarlyEnough(replacementSet []blob.Metadata, maxReplacementTim
 func (e *Manager) refreshLocked(ctx context.Context) error {
 	nextDelayTime := initiaRefreshAttemptSleep
 
+	if !e.Params.Enabled {
+		return errors.Errorf("epoch manager not enabled")
+	}
+
 	for err := e.refreshAttemptLocked(ctx); err != nil; err = e.refreshAttemptLocked(ctx) {
 		e.log.Debugf("refresh attempt failed: %v, sleeping %v before next retry", err, nextDelayTime)
 
@@ -261,7 +307,7 @@ func (e *Manager) refreshLocked(ctx context.Context) error {
 }
 
 func (e *Manager) loadWriteEpoch(ctx context.Context, cs *CurrentSnapshot) error {
-	blobs, err := blob.ListAllBlobs(ctx, e.st, epochMarkerIndexBlobPrefix)
+	blobs, err := blob.ListAllBlobs(ctx, e.st, EpochMarkerIndexBlobPrefix)
 	if err != nil {
 		return errors.Wrap(err, "error loading write epoch")
 	}
@@ -278,11 +324,13 @@ func (e *Manager) loadWriteEpoch(ctx context.Context, cs *CurrentSnapshot) error
 }
 
 func (e *Manager) loadRangeCheckpoints(ctx context.Context, cs *CurrentSnapshot) error {
-	blobs, err := blob.ListAllBlobs(ctx, e.st, rangeCheckpointIndexBlobPrefix)
+	blobs, err := blob.ListAllBlobs(ctx, e.st, RangeCheckpointIndexBlobPrefix)
 	if err != nil {
 		return errors.Wrap(err, "error loading full checkpoints")
 	}
 
+	e.log.Debugf("ranges: %v", blobs)
+
 	var rangeCheckpointSets []*RangeMetadata
 
 	for epoch1, m := range groupByEpochRanges(blobs) {
@@ -305,7 +353,7 @@ func (e *Manager) loadRangeCheckpoints(ctx context.Context, cs *CurrentSnapshot)
 }
 
 func (e *Manager) loadSingleEpochCompactions(ctx context.Context, cs *CurrentSnapshot) error {
-	blobs, err := blob.ListAllBlobs(ctx, e.st, singleEpochCompactionBlobPrefix)
+	blobs, err := blob.ListAllBlobs(ctx, e.st, SingleEpochCompactionBlobPrefix)
 	if err != nil {
 		return errors.Wrap(err, "error loading single-epoch compactions")
 	}
@@ -331,9 +379,13 @@ func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs
 	}
 
 	if latestSettled-firstNonRangeCompacted < e.Params.FullCheckpointFrequency {
+		e.log.Debugf("not generating range checkpoint")
+
 		return
 	}
 
+	e.log.Debugf("generating range checkpoint")
+
 	e.backgroundWork.Add(1)
 
 	go func() {
@@ -374,7 +426,7 @@ func (e *Manager) loadUncompactedEpochs(ctx context.Context, min, max int) (map[
 		}
 
 		eg.Go(func() error {
-			bm, err := blob.ListAllBlobs(ctx, e.st, uncompactedEpochBlobPrefix(n))
+			bm, err := blob.ListAllBlobs(ctx, e.st, UncompactedEpochBlobPrefix(n))
 			if err != nil {
 				return errors.Wrapf(err, "error listing uncompacted epoch %v", n)
 			}
@@ -405,6 +457,8 @@ func (e *Manager) refreshAttemptLocked(ctx context.Context) error {
 		ValidUntil:                e.timeFunc().Add(e.Params.EpochRefreshFrequency),
 	}
 
+	e.log.Infof("refreshAttemptLocked")
+
 	eg, ctx := errgroup.WithContext(ctx)
 	eg.Go(func() error {
 		return e.loadWriteEpoch(ctx, &cs)
@@ -459,7 +513,7 @@ func (e *Manager) refreshAttemptLocked(ctx context.Context) error {
 }
 
 func (e *Manager) advanceEpoch(ctx context.Context, cs CurrentSnapshot) error {
-	blobID := blob.ID(fmt.Sprintf("%v%v", string(epochMarkerIndexBlobPrefix), cs.WriteEpoch+1))
+	blobID := blob.ID(fmt.Sprintf("%v%v", string(EpochMarkerIndexBlobPrefix), cs.WriteEpoch+1))
 
 	if err := e.st.PutBlob(ctx, blobID, gather.FromSlice([]byte("epoch-marker"))); err != nil {
 		return errors.Wrap(err, "error writing epoch marker")
@@ -473,6 +527,8 @@ func (e *Manager) committedState(ctx context.Context) (CurrentSnapshot, error) {
 	defer e.mu.Unlock()
 
 	if e.timeFunc().After(e.lastKnownState.ValidUntil) {
+		e.log.Debugf("refreshing committed state because it's no longer valid")
+
 		if err := e.refreshLocked(ctx); err != nil {
 			return CurrentSnapshot{}, err
 		}
@@ -513,17 +569,28 @@ func (e *Manager) GetCompleteIndexSet(ctx context.Context, maxEpoch int) ([]blob
 }
 
 // WriteIndex writes new index blob by picking the appropriate prefix based on current epoch.
-func (e *Manager) WriteIndex(ctx context.Context, unprefixedBlobID blob.ID, data blob.Bytes) (blob.Metadata, error) {
+func (e *Manager) WriteIndex(ctx context.Context, dataShards map[blob.ID]blob.Bytes) ([]blob.Metadata, error) {
 	for {
 		cs, err := e.committedState(ctx)
 		if err != nil {
-			return blob.Metadata{}, errors.Wrap(err, "error getting committed state")
+			return nil, errors.Wrap(err, "error getting committed state")
 		}
 
-		blobID := uncompactedEpochBlobPrefix(cs.WriteEpoch) + unprefixedBlobID
+		var results []blob.Metadata
+
+		for unprefixedBlobID, data := range dataShards {
+			blobID := UncompactedEpochBlobPrefix(cs.WriteEpoch) + unprefixedBlobID
+
+			if err := e.st.PutBlob(ctx, blobID, data); err != nil {
+				return nil, errors.Wrap(err, "error writing index blob")
+			}
+
+			bm, err := e.st.GetMetadata(ctx, blobID)
+			if err != nil {
+				return nil, errors.Wrap(err, "error getting index metadata")
+			}
 
-		if err := e.st.PutBlob(ctx, blobID, data); err != nil {
-			return blob.Metadata{}, errors.Wrap(err, "error writing index blob")
+			results = append(results, bm)
 		}
 
 		if !e.timeFunc().Before(cs.ValidUntil) {
@@ -535,8 +602,7 @@ func (e *Manager) WriteIndex(ctx context.Context, unprefixedBlobID blob.ID, data
 
 		e.Invalidate()
 
-		// nolint:wrapcheck
-		return e.st.GetMetadata(ctx, blobID)
+		return results, nil
 	}
 }
 
@@ -554,7 +620,7 @@ func (e *Manager) getCompleteIndexSetForCommittedState(ctx context.Context, cs C
 	startEpoch := minEpoch
 
 	for _, c := range cs.LongestRangeCheckpointSets {
-		if c.MaxEpoch > maxEpoch {
+		if c.MaxEpoch > startEpoch {
 			result = append(result, c.Blobs...)
 			startEpoch = c.MaxEpoch + 1
 		}
@@ -602,7 +668,7 @@ func (e *Manager) getIndexesFromEpochInternal(ctx context.Context, cs CurrentSna
 	}
 
 	// load uncompacted blobs for this epoch
-	uncompactedBlobs, err := blob.ListAllBlobs(ctx, e.st, uncompactedEpochBlobPrefix(epoch))
+	uncompactedBlobs, err := blob.ListAllBlobs(ctx, e.st, UncompactedEpochBlobPrefix(epoch))
 	if err != nil {
 		return nil, errors.Wrapf(err, "error listing uncompacted indexes for epoch %v", epoch)
 	}
@@ -659,16 +725,17 @@ func (e *Manager) generateRangeCheckpointFromCommittedState(ctx context.Context,
 	return nil
 }
 
-func uncompactedEpochBlobPrefix(epoch int) blob.ID {
-	return blob.ID(fmt.Sprintf("%v%v_", uncompactedIndexBlobPrefix, epoch))
+// UncompactedEpochBlobPrefix returns the prefix for uncompacted blobs of a given epoch.
+func UncompactedEpochBlobPrefix(epoch int) blob.ID {
+	return blob.ID(fmt.Sprintf("%v%v_", UncompactedIndexBlobPrefix, epoch))
 }
 
 func compactedEpochBlobPrefix(epoch int) blob.ID {
-	return blob.ID(fmt.Sprintf("%v%v_", singleEpochCompactionBlobPrefix, epoch))
+	return blob.ID(fmt.Sprintf("%v%v_", SingleEpochCompactionBlobPrefix, epoch))
 }
 
 func rangeCheckpointBlobPrefix(epoch1, epoch2 int) blob.ID {
-	return blob.ID(fmt.Sprintf("%v%v_%v_", rangeCheckpointIndexBlobPrefix, epoch1, epoch2))
+	return blob.ID(fmt.Sprintf("%v%v_%v_", RangeCheckpointIndexBlobPrefix, epoch1, epoch2))
 }
 
 // NewManager creates new epoch manager.
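
Taken together, the prefix helpers above produce the following blob naming scheme (illustrative, derived from the format strings in this diff):

	// epoch markers:            "xe<N>"          e.g. "xe1", written by advanceEpoch with an "epoch-marker" payload
	// uncompacted indexes:      "xn<N>_<id>"     e.g. UncompactedEpochBlobPrefix(0) == "xn0_"
	// single-epoch compactions: "xs<N>_<id>"     e.g. compactedEpochBlobPrefix(0) == "xs0_"
	// range checkpoints:        "xr<N>_<M>_<id>" e.g. rangeCheckpointBlobPrefix(0, 6) == "xr0_6_"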

+ 21 - 9
internal/epoch/epoch_manager_test.go

@@ -95,6 +95,7 @@ func newTestEnv(t *testing.T) *epochManagerTestEnv {
 	st = logging.NewWrapper(st, t.Logf, "[STORAGE] ")
 	te := &epochManagerTestEnv{unloggedst: unloggedst, st: st, ft: ft}
 	m := NewManager(te.st, Parameters{
+		Enabled:                 true,
 		EpochRefreshFrequency:   20 * time.Minute,
 		FullCheckpointFrequency: 7,
 		// increased safety margin because we're moving fake clock very fast
@@ -109,6 +110,8 @@ func newTestEnv(t *testing.T) *epochManagerTestEnv {
 	te.faultyStorage = fs
 	te.data = data
 
+	t.Cleanup(te.mgr.Flush)
+
 	return te
 }
 
@@ -174,7 +177,9 @@ func TestIndexEpochManager_Parallel(t *testing.T) {
 
 				ndx := newFakeIndexWithEntries(indexNum)
 
-				if _, err := te2.mgr.WriteIndex(ctx, blob.ID(fmt.Sprintf("w%vr%x", worker, rnd)), gather.FromSlice(ndx.Bytes())); err != nil {
+				if _, err := te2.mgr.WriteIndex(ctx, map[blob.ID]blob.Bytes{
+					blob.ID(fmt.Sprintf("w%vr%x", worker, rnd)): gather.FromSlice(ndx.Bytes()),
+				}); err != nil {
 					return errors.Wrap(err, "error writing")
 				}
 
@@ -252,9 +257,9 @@ func TestIndexEpochManager_RogueBlobs(t *testing.T) {
 
 	te := newTestEnv(t)
 
-	te.data[epochMarkerIndexBlobPrefix+"zzzz"] = []byte{1}
-	te.data[singleEpochCompactionBlobPrefix+"zzzz"] = []byte{1}
-	te.data[rangeCheckpointIndexBlobPrefix+"zzzz"] = []byte{1}
+	te.data[EpochMarkerIndexBlobPrefix+"zzzz"] = []byte{1}
+	te.data[SingleEpochCompactionBlobPrefix+"zzzz"] = []byte{1}
+	te.data[RangeCheckpointIndexBlobPrefix+"zzzz"] = []byte{1}
 
 	verifySequentialWrites(t, te)
 	te.mgr.Cleanup(testlogging.Context(t))
@@ -327,7 +332,6 @@ func TestIndexEpochManager_DeletionFailing(t *testing.T) {
 
 func TestRefreshRetriesIfTakingTooLong(t *testing.T) {
 	te := newTestEnv(t)
-	defer te.mgr.Flush()
 
 	te.faultyStorage.Faults = map[string][]*blobtesting.Fault{
 		"ListBlobs": {
@@ -351,7 +355,6 @@ func TestRefreshRetriesIfTakingTooLong(t *testing.T) {
 
 func TestGetCompleteIndexSetRetriesIfTookTooLong(t *testing.T) {
 	te := newTestEnv(t)
-	defer te.mgr.Flush()
 
 	ctx := testlogging.Context(t)
 
@@ -386,7 +389,6 @@ func TestGetCompleteIndexSetRetriesIfTookTooLong(t *testing.T) {
 
 func TestSlowWrite(t *testing.T) {
 	te := newTestEnv(t)
-	defer te.mgr.Flush()
 
 	ctx := testlogging.Context(t)
 
@@ -410,7 +412,6 @@ func TestSlowWrite(t *testing.T) {
 
 func TestForceAdvanceEpoch(t *testing.T) {
 	te := newTestEnv(t)
-	defer te.mgr.Flush()
 
 	ctx := testlogging.Context(t)
 	cs, err := te.mgr.Current(ctx)
@@ -471,6 +472,15 @@ func verifySequentialWrites(t *testing.T, te *epochManagerTestEnv) {
 	t.Logf("total remaining %v", len(te.data))
 }
 
+func TestIndexEpochManager_Disabled(t *testing.T) {
+	te := newTestEnv(t)
+
+	te.mgr.Params.Enabled = false
+
+	_, err := te.mgr.Current(testlogging.Context(t))
+	require.Error(t, err)
+}
+
 func randomTime(min, max time.Duration) time.Duration {
 	return time.Duration(float64(max-min)*rand.Float64() + float64(min))
 }
@@ -516,6 +526,8 @@ func (te *epochManagerTestEnv) mustWriteIndexFile(ctx context.Context, t *testin
 
 	rand.Read(rnd[:])
 
-	_, err := te.mgr.WriteIndex(ctx, blob.ID(hex.EncodeToString(rnd[:])), gather.FromSlice(ndx.Bytes()))
+	_, err := te.mgr.WriteIndex(ctx, map[blob.ID]blob.Bytes{
+		blob.ID(hex.EncodeToString(rnd[:])): gather.FromSlice(ndx.Bytes()),
+	})
 	require.NoError(t, err)
 }

+ 1 - 1
internal/epoch/epoch_range.go

@@ -35,7 +35,7 @@ func findLongestRangeCheckpointStartingAt(startEpoch int, byMin, memo map[int][]
 	for _, cp := range byMin[startEpoch] {
 		combined := append([]*RangeMetadata{cp}, findLongestRangeCheckpointStartingAt(cp.MaxEpoch+1, byMin, memo)...)
 
-		if max := combined[len(combined)-1].MaxEpoch; max > longest {
+		if max := combined[len(combined)-1].MaxEpoch; (max > longest) || (max == longest && len(combined) < len(longestMetadata)) {
 			longest = max
 			longestMetadata = combined
 		}

+ 6 - 0
internal/epoch/epoch_range_test.go

@@ -12,6 +12,7 @@ func TestLongestRangeCheckpoint(t *testing.T) {
 	m10_19 := newEpochRangeMetadataForTesting(10, 19)
 	m20_29 := newEpochRangeMetadataForTesting(20, 29)
 	m30_39 := newEpochRangeMetadataForTesting(30, 39)
+	m40_49 := newEpochRangeMetadataForTesting(40, 49)
 	m50_59 := newEpochRangeMetadataForTesting(50, 59)
 	m10_59 := newEpochRangeMetadataForTesting(10, 59)
 
@@ -51,6 +52,11 @@ func TestLongestRangeCheckpoint(t *testing.T) {
 			input: []*RangeMetadata{m0_9, m0_9, m0_29, m10_59, m30_39},
 			want:  []*RangeMetadata{m0_9, m10_59},
 		},
+		{
+			// two equivalent sequences, shorter one wins
+			input: []*RangeMetadata{m10_59, m30_39, m50_59, m40_49, m0_9, m0_29},
+			want:  []*RangeMetadata{m0_9, m10_59},
+		},
 	}
 
 	for _, tc := range cases {

+ 19 - 0
internal/testutil/testutil.go

@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"os"
+	"reflect"
 	"runtime"
 	"strings"
 	"testing"
@@ -81,3 +82,21 @@ func MustParseJSONLines(t *testing.T, lines []string, v interface{}) {
 		t.Fatalf("failed to parse JSON %v: %v", allJSON, err)
 	}
 }
+
+// RunAllTestsWithParam uses reflection to run all test methods starting with 'Test' on the provided object.
+// nolint:thelper
+func RunAllTestsWithParam(t *testing.T, v interface{}) {
+	m := reflect.ValueOf(v)
+	typ := m.Type()
+
+	for i := 0; i < typ.NumMethod(); i++ {
+		i := i
+		meth := typ.Method(i)
+
+		if strings.HasPrefix(meth.Name, "Test") {
+			t.Run(meth.Name, func(t *testing.T) {
+				m.Method(i).Call([]reflect.Value{reflect.ValueOf(t)})
+			})
+		}
+	}
+}
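
A minimal usage sketch for the new helper (hypothetical suite type; each method must take exactly one *testing.T parameter to satisfy the reflective Call above, mirroring how contentManagerSuite is run in repo/content):

	type mySuite struct{ indexVersion int }

	func (s *mySuite) TestRoundTrip(t *testing.T) {
		// exercises code under test with s.indexVersion
	}

	func TestMySuiteV2(t *testing.T) {
		testutil.RunAllTestsWithParam(t, &mySuite{indexVersion: 2})
	}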

+ 25 - 1
repo/blob/storage.go

@@ -194,7 +194,31 @@ func IDsFromMetadata(mds []Metadata) []ID {
 	return ids
 }
 
-// MaxTimestamp returns IDs for blobs in Metadata slice.
+// TotalLength returns the total length of blobs in Metadata slice.
+func TotalLength(mds []Metadata) int64 {
+	var total int64
+
+	for _, md := range mds {
+		total += md.Length
+	}
+
+	return total
+}
+
+// MinTimestamp returns minimum timestamp for blobs in Metadata slice.
+func MinTimestamp(mds []Metadata) time.Time {
+	min := time.Time{}
+
+	for _, md := range mds {
+		if min.IsZero() || md.Timestamp.Before(min) {
+			min = md.Timestamp
+		}
+	}
+
+	return min
+}
+
+// MaxTimestamp returns maximum timestamp for blobs in Metadata slice.
 func MaxTimestamp(mds []Metadata) time.Time {
 	max := time.Time{}
 

+ 25 - 0
repo/blob/storage_test.go

@@ -162,6 +162,31 @@ func TestMaxTimestamp(t *testing.T) {
 	require.Equal(t, time.Time{}, blob.MaxTimestamp([]blob.Metadata{}))
 }
 
+func TestMinTimestamp(t *testing.T) {
+	t0 := time.Date(2020, 1, 2, 3, 4, 5, 6, time.UTC)
+	t1 := t0.Add(1 * time.Hour)
+	t2 := t0.Add(-1 * time.Hour)
+
+	require.Equal(t,
+		t2,
+		blob.MinTimestamp([]blob.Metadata{
+			{BlobID: "foo", Timestamp: t0},
+			{BlobID: "bar", Timestamp: t1},
+			{BlobID: "baz", Timestamp: t2},
+		}))
+
+	require.Equal(t, time.Time{}, blob.MinTimestamp([]blob.Metadata{}))
+}
+
+func TestTotalLength(t *testing.T) {
+	require.Equal(t,
+		int64(357),
+		blob.TotalLength([]blob.Metadata{
+			{BlobID: "foo", Length: 123},
+			{BlobID: "bar", Length: 234},
+		}))
+}
+
 func TestDeleteMultiple(t *testing.T) {
 	data := blobtesting.DataMap{
 		"foo": []byte{1, 2, 3},

+ 51 - 5
repo/content/committed_read_manager.go

@@ -14,6 +14,7 @@ import (
 	"github.com/kopia/kopia/internal/buf"
 	"github.com/kopia/kopia/internal/cache"
 	"github.com/kopia/kopia/internal/clock"
+	"github.com/kopia/kopia/internal/epoch"
 	"github.com/kopia/kopia/internal/listcache"
 	"github.com/kopia/kopia/internal/ownwrites"
 	"github.com/kopia/kopia/repo/blob"
@@ -29,7 +30,16 @@ const indexRecoverPostambleSize = 8192
 
 const ownWritesCacheDuration = 15 * time.Minute
 
-var cachedIndexBlobPrefixes = []blob.ID{IndexBlobPrefix, compactionLogBlobPrefix, cleanupBlobPrefix}
+var cachedIndexBlobPrefixes = []blob.ID{
+	IndexBlobPrefix,
+	compactionLogBlobPrefix,
+	cleanupBlobPrefix,
+
+	epoch.UncompactedIndexBlobPrefix,
+	epoch.EpochMarkerIndexBlobPrefix,
+	epoch.SingleEpochCompactionBlobPrefix,
+	epoch.RangeCheckpointIndexBlobPrefix,
+}
 
 // indexBlobManager is the API of index blob manager as used by content manager.
 type indexBlobManager interface {
@@ -44,9 +54,13 @@ type SharedManager struct {
 	refCount int32 // number of Manager objects that refer to this SharedManager
 	closed   int32 // set to 1 if shared manager has been closed
 
-	Stats             *Stats
-	st                blob.Storage
-	indexBlobManager  indexBlobManager
+	Stats *Stats
+	st    blob.Storage
+
+	indexBlobManager   indexBlobManager // points at either indexBlobManagerV0 or indexBlobManagerV1
+	indexBlobManagerV0 *indexBlobManagerV0
+	indexBlobManagerV1 *indexBlobManagerV1
+
 	contentCache      contentCache
 	metadataCache     contentCache
 	committedContents *committedContentIndex
@@ -355,7 +369,8 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca
 		log:            logging.WithPrefix("[encrypted-blob-manager] ", sm.sharedBaseLogger),
 	}
 
-	sm.indexBlobManager = &indexBlobManagerV0{
+	// set up legacy index blob manager
+	sm.indexBlobManagerV0 = &indexBlobManagerV0{
 		st:             cachedSt,
 		enc:            sm.enc,
 		timeNow:        sm.timeNow,
@@ -365,6 +380,25 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca
 		log:            logging.WithPrefix("[index-blob-manager] ", sm.sharedBaseLogger),
 	}
 
+	// set up new index blob manager
+	sm.indexBlobManagerV1 = &indexBlobManagerV1{
+		st:             cachedSt,
+		enc:            sm.enc,
+		timeNow:        sm.timeNow,
+		maxPackSize:    sm.maxPackSize,
+		indexShardSize: sm.indexShardSize,
+		indexVersion:   sm.indexVersion,
+		log:            logging.WithPrefix("[index-blob-manager] ", sm.sharedBaseLogger),
+	}
+	sm.indexBlobManagerV1.epochMgr = epoch.NewManager(cachedSt, sm.format.EpochParameters, sm.indexBlobManagerV1.compactEpoch, sm.sharedBaseLogger)
+
+	// select active index blob manager based on parameters
+	if sm.format.EpochParameters.Enabled {
+		sm.indexBlobManager = sm.indexBlobManagerV1
+	} else {
+		sm.indexBlobManager = sm.indexBlobManagerV0
+	}
+
 	// once everything is ready, set it up
 	sm.contentCache = dataCache
 	sm.metadataCache = metadataCache
@@ -373,6 +407,16 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca
 	return nil
 }
 
+// EpochManager returns the epoch manager.
+func (sm *SharedManager) EpochManager() (*epoch.Manager, bool) {
+	ibm1, ok := sm.indexBlobManager.(*indexBlobManagerV1)
+	if !ok {
+		return nil, false
+	}
+
+	return ibm1.epochMgr, true
+}
+
 // AddRef adds a reference to shared manager to prevents its closing on Release().
 func (sm *SharedManager) addRef() {
 	if atomic.LoadInt32(&sm.closed) != 0 {
@@ -414,6 +458,8 @@ func (sm *SharedManager) release(ctx context.Context) error {
 
 	sm.internalLogManager.Close(ctx)
 
+	sm.indexBlobManagerV1.epochMgr.Flush()
+
 	return errors.Wrap(sm.st.Close(ctx), "error closing storage")
 }
 

+ 8 - 16
repo/content/content_cache_test.go

@@ -28,8 +28,8 @@ func newUnderlyingStorageForContentCacheTesting(t *testing.T) blob.Storage {
 	ctx := testlogging.Context(t)
 	data := blobtesting.DataMap{}
 	st := blobtesting.NewMapStorage(data, nil, nil)
-	assertNoError(t, st.PutBlob(ctx, "content-1", gather.FromSlice([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})))
-	assertNoError(t, st.PutBlob(ctx, "content-4k", gather.FromSlice(bytes.Repeat([]byte{1, 2, 3, 4}, 1000)))) // 4000 bytes
+	require.NoError(t, st.PutBlob(ctx, "content-1", gather.FromSlice([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})))
+	require.NoError(t, st.PutBlob(ctx, "content-4k", gather.FromSlice(bytes.Repeat([]byte{1, 2, 3, 4}, 1000)))) // 4000 bytes
 
 	return st
 }
@@ -70,13 +70,13 @@ func TestCacheExpiration(t *testing.T) {
 	defer cc.close(ctx)
 
 	_, err = cc.getContent(ctx, "00000a", "content-4k", 0, -1) // 4k
-	assertNoError(t, err)
+	require.NoError(t, err)
 	_, err = cc.getContent(ctx, "00000b", "content-4k", 0, -1) // 4k
-	assertNoError(t, err)
+	require.NoError(t, err)
 	_, err = cc.getContent(ctx, "00000c", "content-4k", 0, -1) // 4k
-	assertNoError(t, err)
+	require.NoError(t, err)
 	_, err = cc.getContent(ctx, "00000d", "content-4k", 0, -1) // 4k
-	assertNoError(t, err)
+	require.NoError(t, err)
 
 	// wait for a sweep
 	time.Sleep(2 * time.Second)
@@ -84,7 +84,7 @@ func TestCacheExpiration(t *testing.T) {
 	// 00000a and 00000b will be removed from cache because it's the oldest.
 	// to verify, let's remove content-4k from the underlying storage and make sure we can still read
 	// 00000c and 00000d from the cache but not 00000a nor 00000b
-	assertNoError(t, underlyingStorage.DeleteBlob(ctx, "content-4k"))
+	require.NoError(t, underlyingStorage.DeleteBlob(ctx, "content-4k"))
 
 	cases := []struct {
 		blobID        blob.ID
@@ -309,7 +309,7 @@ func verifyStorageContentList(t *testing.T, st blob.Storage, expectedContents ..
 
 	var foundContents []blob.ID
 
-	assertNoError(t, st.ListBlobs(testlogging.Context(t), "", func(bm blob.Metadata) error {
+	require.NoError(t, st.ListBlobs(testlogging.Context(t), "", func(bm blob.Metadata) error {
 		foundContents = append(foundContents, bm.BlobID)
 		return nil
 	}))
@@ -323,14 +323,6 @@ func verifyStorageContentList(t *testing.T, st blob.Storage, expectedContents ..
 	}
 }
 
-func assertNoError(t *testing.T, err error) {
-	t.Helper()
-
-	if err != nil {
-		t.Errorf("err: %v", err)
-	}
-}
-
 type withoutTouchBlob struct {
 	blob.Storage
 }

+ 8 - 2
repo/content/content_formatting_options.go

@@ -3,6 +3,7 @@ package content
 import (
 	"github.com/pkg/errors"
 
+	"github.com/kopia/kopia/internal/epoch"
 	"github.com/kopia/kopia/internal/units"
 )
 
@@ -24,8 +25,9 @@ type FormattingOptions struct {
 // MutableParameters represents parameters of the content manager that can be mutated after the repository
 // is created.
 type MutableParameters struct {
-	MaxPackSize  int `json:"maxPackSize,omitempty"`  // maximum size of a pack object
-	IndexVersion int `json:"indexVersion,omitempty"` // force particular index format version (1,2,..)
+	MaxPackSize     int              `json:"maxPackSize,omitempty"`     // maximum size of a pack object
+	IndexVersion    int              `json:"indexVersion,omitempty"`    // force particular index format version (1,2,..)
+	EpochParameters epoch.Parameters `json:"epochParameters,omitempty"` // epoch manager parameters
 }
 
 // Validate validates the parameters.
@@ -42,6 +44,10 @@ func (v *MutableParameters) Validate() error {
 		return errors.Errorf("invalid index version, supported versions are 1 & 2")
 	}
 
+	if err := v.EpochParameters.Validate(); err != nil {
+		return errors.Wrap(err, "invalid epoch parameters")
+	}
+
 	return nil
 }
 

+ 13 - 4
repo/content/content_index_recovery_test.go

@@ -4,16 +4,20 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/require"
+
 	"github.com/kopia/kopia/internal/blobtesting"
 	"github.com/kopia/kopia/internal/testlogging"
 	"github.com/kopia/kopia/repo/blob"
 )
 
-func TestContentIndexRecovery(t *testing.T) {
+func (s *contentManagerSuite) TestContentIndexRecovery(t *testing.T) {
 	ctx := testlogging.Context(t)
 	data := blobtesting.DataMap{}
 	keyTime := map[blob.ID]time.Time{}
-	bm := newTestContentManagerWithCustomTime(t, data, keyTime, nil)
+	st := blobtesting.NewMapStorage(data, keyTime, nil)
+
+	bm := s.newTestContentManagerWithCustomTime(t, st, nil)
 
 	content1 := writeContentAndVerify(ctx, t, bm, seededRandomData(10, 100))
 	content2 := writeContentAndVerify(ctx, t, bm, seededRandomData(11, 100))
@@ -24,7 +28,12 @@ func TestContentIndexRecovery(t *testing.T) {
 	}
 
 	// delete all index blobs
-	assertNoError(t, bm.st.ListBlobs(ctx, IndexBlobPrefix, func(bi blob.Metadata) error {
+	require.NoError(t, bm.st.ListBlobs(ctx, IndexBlobPrefix, func(bi blob.Metadata) error {
+		t.Logf("deleting %v", bi.BlobID)
+		return bm.st.DeleteBlob(ctx, bi.BlobID)
+	}))
+
+	require.NoError(t, bm.st.ListBlobs(ctx, "x", func(bi blob.Metadata) error {
 		t.Logf("deleting %v", bi.BlobID)
 		return bm.st.DeleteBlob(ctx, bi.BlobID)
 	}))
@@ -32,7 +41,7 @@ func TestContentIndexRecovery(t *testing.T) {
 	bm.Close(ctx)
 
 	// now with index blobs gone, all contents appear to not be found
-	bm = newTestContentManagerWithCustomTime(t, data, keyTime, nil)
+	bm = s.newTestContentManagerWithCustomTime(t, st, nil)
 	defer bm.Close(ctx)
 
 	verifyContentNotFound(ctx, t, bm, content1)

File diff suppressed because it is too large
+ 213 - 137
repo/content/content_manager_test.go


+ 3 - 0
repo/content/content_reader.go

@@ -2,6 +2,8 @@ package content
 
 import (
 	"context"
+
+	"github.com/kopia/kopia/internal/epoch"
 )
 
 // Reader defines content read API.
@@ -13,4 +15,5 @@ type Reader interface {
 	IterateContents(ctx context.Context, opts IterateOptions, callback IterateCallback) error
 	IteratePacks(ctx context.Context, opts IteratePackOptions, callback IteratePacksCallback) error
 	ListActiveSessions(ctx context.Context) (map[SessionID]*SessionInfo, error)
+	EpochManager() (*epoch.Manager, bool)
 }

+ 6 - 6
repo/content/index_blob_manager_v0.go

@@ -445,7 +445,7 @@ func (m *indexBlobManagerV0) compactIndexBlobs(ctx context.Context, indexBlobs [
 	for i, indexBlob := range indexBlobs {
 		m.log.Debugf("compacting-entries[%v/%v] %v", i, len(indexBlobs), indexBlob)
 
-		if err := m.addIndexBlobsToBuilder(ctx, bld, indexBlob); err != nil {
+		if err := addIndexBlobsToBuilder(ctx, m.enc, bld, indexBlob.BlobID); err != nil {
 			return errors.Wrap(err, "error adding index to builder")
 		}
 
@@ -497,15 +497,15 @@ func (m *indexBlobManagerV0) dropContentsFromBuilder(bld packIndexBuilder, opt C
 	}
 }
 
-func (m *indexBlobManagerV0) addIndexBlobsToBuilder(ctx context.Context, bld packIndexBuilder, indexBlob IndexBlobInfo) error {
-	data, err := m.getIndexBlob(ctx, indexBlob.BlobID)
+func addIndexBlobsToBuilder(ctx context.Context, enc *encryptedBlobMgr, bld packIndexBuilder, indexBlobID blob.ID) error {
+	data, err := enc.getEncryptedBlob(ctx, indexBlobID)
 	if err != nil {
-		return errors.Wrapf(err, "error getting index %q", indexBlob.BlobID)
+		return errors.Wrapf(err, "error getting index %q", indexBlobID)
 	}
 
-	index, err := openPackIndex(bytes.NewReader(data), uint32(m.enc.crypter.Encryptor.Overhead()))
+	index, err := openPackIndex(bytes.NewReader(data), uint32(enc.crypter.Encryptor.Overhead()))
 	if err != nil {
-		return errors.Wrapf(err, "unable to open index blob %q", indexBlob)
+		return errors.Wrapf(err, "unable to open index blob %q", indexBlobID)
 	}
 
 	_ = index.Iterate(AllIDs, func(i Info) error {

+ 133 - 0
repo/content/index_blob_manager_v1.go

@@ -0,0 +1,133 @@
+package content
+
+import (
+	"context"
+	"crypto/rand"
+	"fmt"
+	"time"
+
+	"github.com/pkg/errors"
+
+	"github.com/kopia/kopia/internal/epoch"
+	"github.com/kopia/kopia/internal/gather"
+	"github.com/kopia/kopia/repo/blob"
+	"github.com/kopia/kopia/repo/logging"
+)
+
+type indexBlobManagerV1 struct {
+	st             blob.Storage
+	enc            *encryptedBlobMgr
+	epochMgr       *epoch.Manager
+	timeNow        func() time.Time
+	log            logging.Logger
+	maxPackSize    int
+	indexVersion   int
+	indexShardSize int
+}
+
+func (m *indexBlobManagerV1) listActiveIndexBlobs(ctx context.Context) ([]IndexBlobInfo, error) {
+	active, err := m.epochMgr.GetCompleteIndexSet(ctx, epoch.LatestEpoch)
+	if err != nil {
+		return nil, errors.Wrap(err, "error getting index set")
+	}
+
+	var result []IndexBlobInfo
+
+	for _, bm := range active {
+		result = append(result, IndexBlobInfo{Metadata: bm})
+	}
+
+	m.log.Errorf("active indexes %v", blob.IDsFromMetadata(active))
+
+	return result, nil
+}
+
+func (m *indexBlobManagerV1) flushCache(ctx context.Context) {
+	if err := m.st.FlushCaches(ctx); err != nil {
+		m.log.Debugf("error flushing caches: %v", err)
+	}
+}
+
+func (m *indexBlobManagerV1) compact(ctx context.Context, opt CompactOptions) error {
+	return nil
+}
+
+func (m *indexBlobManagerV1) compactEpoch(ctx context.Context, blobIDs []blob.ID, outputPrefix blob.ID) error {
+	tmpbld := make(packIndexBuilder)
+
+	for _, indexBlob := range blobIDs {
+		if err := addIndexBlobsToBuilder(ctx, m.enc, tmpbld, indexBlob); err != nil {
+			return errors.Wrap(err, "error adding index to builder")
+		}
+	}
+
+	dataShards, err := tmpbld.buildShards(m.indexVersion, true, m.indexShardSize)
+	if err != nil {
+		return errors.Wrap(err, "unable to build index dataShards")
+	}
+
+	var rnd [8]byte
+
+	if _, err := rand.Read(rnd[:]); err != nil {
+		return errors.Wrap(err, "error getting random session ID")
+	}
+
+	sessionID := fmt.Sprintf("s%x-c%v", rnd[:], len(dataShards))
+
+	for _, data := range dataShards {
+		blobID, data2, err := m.enc.crypter.EncryptBLOB(data, outputPrefix, SessionID(sessionID))
+		if err != nil {
+			return errors.Wrap(err, "error encrypting")
+		}
+
+		if err := m.st.PutBlob(ctx, blobID, gather.FromSlice(data2)); err != nil {
+			return errors.Wrap(err, "error writing index blob")
+		}
+	}
+
+	return nil
+}
+
+func (m *indexBlobManagerV1) writeIndexBlobs(ctx context.Context, dataShards [][]byte, sessionID SessionID) ([]blob.Metadata, error) {
+	shards := map[blob.ID]blob.Bytes{}
+
+	sessionID = SessionID(fmt.Sprintf("%v-c%v", sessionID, len(dataShards)))
+
+	for _, data := range dataShards {
+		unprefixedBlobID, data2, err := m.enc.crypter.EncryptBLOB(data, "", sessionID)
+		if err != nil {
+			return nil, errors.Wrap(err, "error encrypting")
+		}
+
+		shards[unprefixedBlobID] = gather.FromSlice(data2)
+	}
+
+	// nolint:wrapcheck
+	return m.epochMgr.WriteIndex(ctx, shards)
+}
+
+var _ indexBlobManager = (*indexBlobManagerV1)(nil)
+
+// PrepareUpgradeToIndexBlobManagerV1 prepares the repository for migrating to IndexBlobManagerV1.
+func (sm *SharedManager) PrepareUpgradeToIndexBlobManagerV1(ctx context.Context, params epoch.Parameters) error {
+	sm.indexBlobManagerV1.epochMgr.Params = params
+
+	ibl, err := sm.indexBlobManagerV0.listActiveIndexBlobs(ctx)
+	if err != nil {
+		return errors.Wrap(err, "error listing active index blobs")
+	}
+
+	var blobIDs []blob.ID
+
+	for _, ib := range ibl {
+		blobIDs = append(blobIDs, ib.BlobID)
+	}
+
+	if err := sm.indexBlobManagerV1.compactEpoch(ctx, blobIDs, epoch.UncompactedEpochBlobPrefix(epoch.FirstEpoch)); err != nil {
+		return errors.Wrap(err, "unable to generate initial epoch")
+	}
+
+	sm.indexBlobManager = sm.indexBlobManagerV1
+
+	return nil
+}

+ 3 - 2
repo/content/merged_test.go

@@ -6,6 +6,7 @@ import (
 	"testing"
 
 	"github.com/pkg/errors"
+	"github.com/stretchr/testify/require"
 )
 
 func TestMerged(t *testing.T) {
@@ -49,7 +50,7 @@ func TestMerged(t *testing.T) {
 		t.Errorf("invalid pack offset %v, wanted %v", got, want)
 	}
 
-	assertNoError(t, m.Iterate(AllIDs, func(i Info) error {
+	require.NoError(t, m.Iterate(AllIDs, func(i Info) error {
 		if i.GetContentID() == "de1e1e" {
 			if i.GetDeleted() {
 				t.Errorf("iteration preferred deleted content over non-deleted")
@@ -138,7 +139,7 @@ func iterateIDRange(t *testing.T, m packIndex, r IDRange) []ID {
 
 	var inOrder []ID
 
-	assertNoError(t, m.Iterate(r, func(i Info) error {
+	require.NoError(t, m.Iterate(r, func(i Info) error {
 		inOrder = append(inOrder, i.GetContentID())
 		return nil
 	}))

+ 2 - 2
repo/content/packindex_test.go

@@ -208,7 +208,7 @@ func testPackIndex(t *testing.T, version int) {
 
 	cnt := 0
 
-	assertNoError(t, ndx.Iterate(AllIDs, func(info2 Info) error {
+	require.NoError(t, ndx.Iterate(AllIDs, func(info2 Info) error {
 		want := infoMap[info2.GetContentID()]
 		if version == 1 {
 			// v1 does not preserve original length.
@@ -244,7 +244,7 @@ func testPackIndex(t *testing.T, version int) {
 	for _, prefix := range prefixes {
 		cnt2 := 0
 		prefix := prefix
-		assertNoError(t, ndx.Iterate(PrefixRange(prefix), func(info2 Info) error {
+		require.NoError(t, ndx.Iterate(PrefixRange(prefix), func(info2 Info) error {
 			cnt2++
 			if !strings.HasPrefix(string(info2.GetContentID()), string(prefix)) {
 				t.Errorf("unexpected item %v when iterating prefix %v", info2.GetContentID(), prefix)

+ 3 - 2
repo/initialize.go

@@ -100,8 +100,9 @@ func repositoryObjectFormatFromOptions(opt *NewRepositoryOptions) *repositoryObj
 			HMACSecret: applyDefaultRandomBytes(opt.BlockFormat.HMACSecret, hmacSecretLength),
 			MasterKey:  applyDefaultRandomBytes(opt.BlockFormat.MasterKey, masterKeyLength),
 			MutableParameters: content.MutableParameters{
-				MaxPackSize:  applyDefaultInt(opt.BlockFormat.MaxPackSize, 20<<20), //nolint:gomnd
-				IndexVersion: applyDefaultInt(opt.BlockFormat.IndexVersion, content.DefaultIndexVersion),
+				MaxPackSize:     applyDefaultInt(opt.BlockFormat.MaxPackSize, 20<<20), //nolint:gomnd
+				IndexVersion:    applyDefaultInt(opt.BlockFormat.IndexVersion, content.DefaultIndexVersion),
+				EpochParameters: opt.BlockFormat.EpochParameters,
 			},
 		},
 		Format: object.Format{

+ 0 - 1
repo/repository.go

@@ -74,7 +74,6 @@ type DirectRepositoryWriter interface {
 	BlobStorage() blob.Storage
 	ContentManager() *content.WriteManager
 	SetParameters(ctx context.Context, m content.MutableParameters) error
-	Upgrade(ctx context.Context) error
 }
 
 type directRepositoryParameters struct {

+ 0 - 12
repo/repository_test.go

@@ -156,18 +156,6 @@ func TestHMAC(t *testing.T) {
 	}
 }
 
-func TestUpgrade(t *testing.T) {
-	ctx, env := repotesting.NewEnvironment(t)
-
-	if err := env.RepositoryWriter.Upgrade(ctx); err != nil {
-		t.Errorf("upgrade error: %v", err)
-	}
-
-	if err := env.RepositoryWriter.Upgrade(ctx); err != nil {
-		t.Errorf("2nd upgrade error: %v", err)
-	}
-}
-
 func TestReaderStoredBlockNotFound(t *testing.T) {
 	ctx, env := repotesting.NewEnvironment(t)
 

+ 0 - 33
repo/upgrade.go

@@ -1,33 +0,0 @@
-package repo
-
-import (
-	"context"
-
-	"github.com/pkg/errors"
-)
-
-// Upgrade upgrades repository data structures to the latest version.
-func (r *directRepository) Upgrade(ctx context.Context) error {
-	f := r.formatBlob
-
-	repoConfig, err := f.decryptFormatBytes(r.masterKey)
-	if err != nil {
-		return errors.Wrap(err, "unable to decrypt repository config")
-	}
-
-	var migrated bool
-
-	// add migration code here
-	if !migrated {
-		log(ctx).Infof("nothing to do")
-		return nil
-	}
-
-	if err := encryptFormatBytes(f, repoConfig, r.masterKey, f.UniqueID); err != nil {
-		return errors.Errorf("unable to encrypt format bytes")
-	}
-
-	log(ctx).Infof("writing updated format content...")
-
-	return writeFormatBlob(ctx, r.blobs, f)
-}

Some files were not shown because too many files changed in this diff