Explorar el Código

feat(cli): implementation for 'kopia snapshot fix' (#1930)

* feat(cli): implementation for 'kopia snapshot fix'

This allows modifications and fixes to the snapshots after they have
been taken.

Supported are:

* `kopia snapshot fix remove-invalid-files [--verify-files-percent=X]`

Removes all directory entries where the underlying files cannot be
read based on index analysis (this does not read the files, only index
structures so is reasonably quick).

`--verify-files-percent=100` can be used to trigger full read for
all files.

* `kopia snapshot fix remove-files --object-id=<object-id>`

Removes the object with a given ID from the entire snapshot tree.
Useful when you accidentally snapshot a sensitive file.

* `kopia snapshot fix remove-files --filename=<wildcard>`

Removes the files with a given name from the entire snapshot tree.
Useful when you accidentally snapshot a sensitive file.

By default all snapshots are analyzed and rewritten. To limit the scope
use:

--source=user@host:/path
--manifest-id=manifestID

By default the rewrite operation writes new directory entries but
does not replace the manifests. To do that pass `--commit`.

Related #1906
Fixes #799

reorganized CLI per PR suggestion

* additional logging for diff command

* added Clone() method to snapshot manifest and directory entry

* added a comprehensive test, moved DirRewriter to separate file

* pr feedback

* more pr feedback

* improved logging output

* disable test in -race configuration since it's way too slow

* pr feedback
Jarek Kowalski hace 3 años
padre
commit
f49bcdd883

+ 2 - 0
cli/command_snapshot.go

@@ -7,6 +7,7 @@ type commandSnapshot struct {
 	delete      commandSnapshotDelete
 	estimate    commandSnapshotEstimate
 	expire      commandSnapshotExpire
+	fix         commandSnapshotFix
 	gc          commandSnapshotGC
 	list        commandSnapshotList
 	migrate     commandSnapshotMigrate
@@ -23,6 +24,7 @@ func (c *commandSnapshot) setup(svc advancedAppServices, parent commandParent) {
 	c.delete.setup(svc, cmd)
 	c.estimate.setup(svc, cmd)
 	c.expire.setup(svc, cmd)
+	c.fix.setup(svc, cmd)
 	c.gc.setup(svc, cmd)
 	c.list.setup(svc, cmd)
 	c.migrate.setup(svc, cmd)

+ 193 - 0
cli/command_snapshot_fix.go

@@ -0,0 +1,193 @@
+package cli
+
+import (
+	"context"
+
+	"github.com/alecthomas/kingpin"
+	"github.com/pkg/errors"
+
+	"github.com/kopia/kopia/internal/units"
+	"github.com/kopia/kopia/repo"
+	"github.com/kopia/kopia/repo/manifest"
+	"github.com/kopia/kopia/snapshot"
+	"github.com/kopia/kopia/snapshot/snapshotfs"
+)
+
+type commandSnapshotFix struct {
+	invalidFiles commandSnapshotFixInvalidFiles
+	removeFiles  commandSnapshotFixRemoveFiles
+}
+
+func (c *commandSnapshotFix) setup(svc appServices, parent commandParent) {
+	cmd := parent.Command("fix", "Commands to fix snapshot consistency issues")
+
+	c.invalidFiles.setup(svc, cmd)
+	c.removeFiles.setup(svc, cmd)
+}
+
+type commonRewriteSnapshots struct {
+	manifestIDs        []string
+	sources            []string
+	commit             bool
+	parallel           int
+	invalidDirHandling string
+}
+
+const (
+	invalidEntryKeep   = "keep"   // keep unreadable file/directory
+	invalidEntryStub   = "stub"   // replaces unreadable file/directory with a stub file
+	invalidEntryFail   = "fail"   // fail the command
+	invalidEntryRemove = "remove" // removes unreadable file/directory
+)
+
+func (c *commonRewriteSnapshots) setup(svc appServices, cmd *kingpin.CmdClause) {
+	_ = svc
+
+	cmd.Flag("manifest-id", "Manifest IDs").StringsVar(&c.manifestIDs)
+	cmd.Flag("source", "Source to target (username@hostname:/path)").StringsVar(&c.sources)
+	cmd.Flag("commit", "Update snapshot manifests").BoolVar(&c.commit)
+	cmd.Flag("parallel", "Parallelism").IntVar(&c.parallel)
+	cmd.Flag("invalid-directory-handling", "Handling of invalid directories").Default(invalidEntryStub).EnumVar(&c.invalidDirHandling, invalidEntryFail, invalidEntryStub, invalidEntryKeep)
+}
+
+func failedEntryCallback(rep repo.RepositoryWriter, enumVal string) snapshotfs.RewriteFailedEntryCallback {
+	switch enumVal {
+	default:
+		return snapshotfs.RewriteFail
+	case invalidEntryStub:
+		return snapshotfs.RewriteAsStub(rep)
+	case invalidEntryRemove:
+		return snapshotfs.RewriteRemove
+	case invalidEntryKeep:
+		return snapshotfs.RewriteKeep
+	}
+}
+
+func (c *commonRewriteSnapshots) rewriteMatchingSnapshots(ctx context.Context, rep repo.RepositoryWriter, rewrite snapshotfs.RewriteDirEntryCallback) error {
+	rw := snapshotfs.NewDirRewriter(rep, snapshotfs.DirRewriterOptions{
+		Parallel:               c.parallel,
+		RewriteEntry:           rewrite,
+		OnDirectoryReadFailure: failedEntryCallback(rep, c.invalidDirHandling),
+	})
+	defer rw.Close()
+
+	var updatedSnapshots int
+
+	manifestIDs, err := c.listManifestIDs(ctx, rep)
+	if err != nil {
+		return err
+	}
+
+	manifests, err := snapshot.LoadSnapshots(ctx, rep, manifestIDs)
+	if err != nil {
+		return errors.Wrap(err, "error loading snapshots")
+	}
+
+	for _, mg := range snapshot.GroupBySource(manifests) {
+		log(ctx).Infof("Processing snapshot %v", mg[0].Source)
+
+		for _, man := range snapshot.SortByTime(mg, false) {
+			log(ctx).Debugf("  %v (%v)", formatTimestamp(man.StartTime), man.ID)
+
+			old := man.Clone()
+
+			changed, err := rw.RewriteSnapshotManifest(ctx, man)
+			if err != nil {
+				return errors.Wrap(err, "error rewriting manifest")
+			}
+
+			if !changed {
+				log(ctx).Infof("  %v unchanged (%v)", formatTimestamp(man.StartTime), man.ID)
+
+				continue
+			}
+
+			if c.commit {
+				if err := snapshot.UpdateSnapshot(ctx, rep, man); err != nil {
+					return errors.Wrap(err, "error updating snapshot")
+				}
+			}
+
+			log(ctx).Infof("  %v replaced manifest from %v to %v", formatTimestamp(man.StartTime), old.ID, man.ID)
+			log(ctx).Infof("    diff %v %v", old.RootEntry.ObjectID, man.RootEntry.ObjectID)
+
+			if d := snapshotSizeDelta(old, man); d != "" {
+				log(ctx).Infof("    delta:%v", d)
+			}
+
+			updatedSnapshots++
+		}
+	}
+
+	if updatedSnapshots > 0 {
+		if !c.commit {
+			log(ctx).Infof("Fixed %v snapshots, but snapshot manifests were not updated. Pass --commit to update snapshots.", updatedSnapshots)
+		} else {
+			log(ctx).Infof("Fixed and committed %v snapshots.", updatedSnapshots)
+		}
+	}
+
+	if updatedSnapshots == 0 {
+		log(ctx).Infof("No changes.")
+	}
+
+	return nil
+}
+
+func snapshotSizeDelta(m1, m2 *snapshot.Manifest) string {
+	if m1.RootEntry == nil || m2.RootEntry == nil {
+		return ""
+	}
+
+	if m1.RootEntry.DirSummary == nil || m2.RootEntry.DirSummary == nil {
+		return ""
+	}
+
+	deltaBytes := m2.RootEntry.DirSummary.TotalFileSize - m1.RootEntry.DirSummary.TotalFileSize
+	if deltaBytes < 0 {
+		return "-" + units.BytesStringBase10(-deltaBytes)
+	}
+
+	if deltaBytes > 0 {
+		return "+" + units.BytesStringBase10(deltaBytes)
+	}
+
+	return ""
+}
+
+func (c *commonRewriteSnapshots) listManifestIDs(ctx context.Context, rep repo.Repository) ([]manifest.ID, error) {
+	manifests := toManifestIDs(c.manifestIDs)
+
+	for _, src := range c.sources {
+		log(ctx).Infof("Listing snapshots for source %q...", src)
+
+		si, err := snapshot.ParseSourceInfo(src, rep.ClientOptions().Hostname, rep.ClientOptions().Username)
+		if err != nil {
+			return nil, errors.Wrap(err, "unable to parse source")
+		}
+
+		m, err := snapshot.ListSnapshotManifests(ctx, rep, &si, nil)
+		if err != nil {
+			return nil, errors.Wrap(err, "unable to list manifests")
+		}
+
+		if len(m) == 0 {
+			return nil, errors.Errorf("no snapshots for %v", src)
+		}
+
+		manifests = append(manifests, m...)
+	}
+
+	if len(manifests) == 0 {
+		log(ctx).Infof("Listing all snapshots...")
+
+		m, err := snapshot.ListSnapshotManifests(ctx, rep, nil, nil)
+		if err != nil {
+			return nil, errors.Wrap(err, "unable to list snapshot manifests")
+		}
+
+		manifests = append(manifests, m...)
+	}
+
+	return manifests, nil
+}

+ 68 - 0
cli/command_snapshot_fix_invalid_files.go

@@ -0,0 +1,68 @@
+package cli
+
+import (
+	"context"
+
+	"github.com/pkg/errors"
+
+	"github.com/kopia/kopia/repo"
+	"github.com/kopia/kopia/repo/blob"
+	"github.com/kopia/kopia/snapshot"
+	"github.com/kopia/kopia/snapshot/snapshotfs"
+)
+
+type commandSnapshotFixInvalidFiles struct {
+	common commonRewriteSnapshots
+
+	verifyFilesPercent float64
+	verifier           *snapshotfs.Verifier
+
+	invalidFileHandling string
+
+	failedFileCallback snapshotfs.RewriteFailedEntryCallback
+}
+
+func (c *commandSnapshotFixInvalidFiles) setup(svc appServices, parent commandParent) {
+	cmd := parent.Command("invalid-files", "Remove references to any invalid (unreadable) files from snapshots.")
+	c.common.setup(svc, cmd)
+
+	cmd.Flag("invalid-file-handling", "How to handle invalid files").Default(invalidEntryStub).EnumVar(&c.invalidFileHandling, invalidEntryFail, invalidEntryStub, invalidEntryKeep, invalidEntryRemove)
+	cmd.Flag("verify-files-percent", "Verify a percentage of files by fully downloading them [0.0 .. 100.0]").Default("0").Float64Var(&c.verifyFilesPercent)
+
+	cmd.Action(svc.repositoryWriterAction(c.run))
+}
+
+func (c *commandSnapshotFixInvalidFiles) rewriteEntry(ctx context.Context, dirRelativePath string, ent *snapshot.DirEntry) (*snapshot.DirEntry, error) {
+	fname := dirRelativePath + "/" + ent.Name
+
+	if ent.Type != snapshot.EntryTypeDirectory {
+		if err := c.verifier.VerifyFile(ctx, ent.ObjectID, fname); err != nil {
+			log(ctx).Warnf("removing invalid file %v due to: %v", fname, err)
+
+			return c.failedFileCallback(ctx, dirRelativePath, ent, err)
+		}
+	}
+
+	return ent, nil
+}
+
+func (c *commandSnapshotFixInvalidFiles) run(ctx context.Context, rep repo.RepositoryWriter) error {
+	c.failedFileCallback = failedEntryCallback(rep, c.invalidFileHandling)
+
+	opts := snapshotfs.VerifierOptions{
+		VerifyFilesPercent: c.verifyFilesPercent,
+	}
+
+	if dr, ok := rep.(repo.DirectRepository); ok {
+		blobMap, err := blob.ReadBlobMap(ctx, dr.BlobReader())
+		if err != nil {
+			return errors.Wrap(err, "unable to read blob map")
+		}
+
+		opts.BlobMap = blobMap
+	}
+
+	c.verifier = snapshotfs.NewVerifier(ctx, rep, opts)
+
+	return c.common.rewriteMatchingSnapshots(ctx, rep, c.rewriteEntry)
+}

+ 61 - 0
cli/command_snapshot_fix_remove_files.go

@@ -0,0 +1,61 @@
+package cli
+
+import (
+	"context"
+	"path"
+
+	"github.com/pkg/errors"
+
+	"github.com/kopia/kopia/repo"
+	"github.com/kopia/kopia/snapshot"
+)
+
+type commandSnapshotFixRemoveFiles struct {
+	common commonRewriteSnapshots
+
+	removeObjectIDs   []string
+	removeFilesByName []string
+}
+
+func (c *commandSnapshotFixRemoveFiles) setup(svc appServices, parent commandParent) {
+	cmd := parent.Command("remove-files", "Remove references to the specified files from snapshots.")
+	c.common.setup(svc, cmd)
+
+	cmd.Flag("object-id", "Remove files by their object ID").StringsVar(&c.removeObjectIDs)
+	cmd.Flag("filename", "Remove files by filename (wildcards are supported)").StringsVar(&c.removeFilesByName)
+
+	cmd.Action(svc.repositoryWriterAction(c.run))
+}
+
+func (c *commandSnapshotFixRemoveFiles) rewriteEntry(ctx context.Context, dirRelativePath string, ent *snapshot.DirEntry) (*snapshot.DirEntry, error) {
+	for _, id := range c.removeObjectIDs {
+		if string(ent.ObjectID) == id {
+			log(ctx).Infof("will remove file %v", path.Join(dirRelativePath, ent.Name))
+
+			return nil, nil
+		}
+	}
+
+	for _, n := range c.removeFilesByName {
+		matched, err := path.Match(n, ent.Name)
+		if err != nil {
+			return nil, errors.Wrap(err, "invalid wildcard")
+		}
+
+		if matched {
+			log(ctx).Infof("will remove file %v", path.Join(dirRelativePath, ent.Name))
+
+			return nil, nil
+		}
+	}
+
+	return ent, nil
+}
+
+func (c *commandSnapshotFixRemoveFiles) run(ctx context.Context, rep repo.RepositoryWriter) error {
+	if len(c.removeObjectIDs)+len(c.removeFilesByName) == 0 {
+		return errors.Errorf("must specify files to remove")
+	}
+
+	return c.common.rewriteMatchingSnapshots(ctx, rep, c.rewriteEntry)
+}

+ 439 - 0
cli/command_snapshot_fix_test.go

@@ -0,0 +1,439 @@
+package cli_test
+
+import (
+	"bytes"
+	"os"
+	"path/filepath"
+	"sort"
+	"strings"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/kopia/kopia/internal/testutil"
+	"github.com/kopia/kopia/repo/content"
+	"github.com/kopia/kopia/repo/content/index"
+	"github.com/kopia/kopia/repo/object"
+	"github.com/kopia/kopia/snapshot"
+	"github.com/kopia/kopia/snapshot/snapshotfs"
+	"github.com/kopia/kopia/tests/testenv"
+)
+
+// nolint:maintidx
+func TestSnapshotFix(t *testing.T) {
+	srcDir1 := testutil.TempDirectory(t)
+
+	if testutil.ShouldReduceTestComplexity() {
+		return
+	}
+
+	// 300 bytes
+	mustWriteFileWithRepeatedData(t, filepath.Join(srcDir1, "small-file1"), 1, bytes.Repeat([]byte{1, 2, 3}, 100))
+	mustWriteFileWithRepeatedData(t, filepath.Join(srcDir1, "small-file1-dup"), 1, bytes.Repeat([]byte{1, 2, 3}, 100))
+	mustWriteFileWithRepeatedData(t, filepath.Join(srcDir1, "small-file2"), 1, bytes.Repeat([]byte{1, 2, 4}, 100))
+
+	require.NoError(t, os.MkdirAll(filepath.Join(srcDir1, "dir1"), 0o700))
+	require.NoError(t, os.MkdirAll(filepath.Join(srcDir1, "dir2"), 0o700))
+
+	// 3 x 3 x 1_000_000 bytes = 9 MB
+	mustWriteFileWithRepeatedData(t, filepath.Join(srcDir1, "large-file1"), 3, bytes.Repeat([]byte{1, 2, 3}, 1000000))
+	mustWriteFileWithRepeatedData(t, filepath.Join(srcDir1, "large-file1-dup"), 3, bytes.Repeat([]byte{1, 2, 3}, 1000000))
+	mustWriteFileWithRepeatedData(t, filepath.Join(srcDir1, "large-file2"), 3, bytes.Repeat([]byte{1, 2, 4}, 1000000))
+
+	mustWriteFileWithRepeatedData(t, filepath.Join(srcDir1, "dir1", "small-file1"), 1, bytes.Repeat([]byte{1, 1, 2, 3}, 100))
+	mustWriteFileWithRepeatedData(t, filepath.Join(srcDir1, "dir1", "small-file1-dup"), 1, bytes.Repeat([]byte{1, 1, 2, 3}, 100))
+	mustWriteFileWithRepeatedData(t, filepath.Join(srcDir1, "dir1", "small-file2"), 1, bytes.Repeat([]byte{1, 1, 2, 4}, 100))
+	mustWriteFileWithRepeatedData(t, filepath.Join(srcDir1, "dir1", "large-file1"), 3, bytes.Repeat([]byte{1, 1, 2, 3}, 1000000))
+	mustWriteFileWithRepeatedData(t, filepath.Join(srcDir1, "dir1", "large-file1-dup"), 3, bytes.Repeat([]byte{1, 1, 2, 3}, 1000000))
+	mustWriteFileWithRepeatedData(t, filepath.Join(srcDir1, "dir1", "large-file2"), 3, bytes.Repeat([]byte{1, 1, 2, 4}, 1000000))
+
+	mustWriteFileWithRepeatedData(t, filepath.Join(srcDir1, "dir2", "small-file1"), 1, bytes.Repeat([]byte{2, 1, 2, 3}, 100))
+	mustWriteFileWithRepeatedData(t, filepath.Join(srcDir1, "dir2", "small-file1-dup"), 1, bytes.Repeat([]byte{2, 1, 2, 3}, 100))
+	mustWriteFileWithRepeatedData(t, filepath.Join(srcDir1, "dir2", "small-file2"), 1, bytes.Repeat([]byte{2, 1, 2, 4}, 100))
+	mustWriteFileWithRepeatedData(t, filepath.Join(srcDir1, "dir2", "large-file1"), 3, bytes.Repeat([]byte{2, 1, 2, 3}, 1000000))
+	mustWriteFileWithRepeatedData(t, filepath.Join(srcDir1, "dir2", "large-file1-dup"), 3, bytes.Repeat([]byte{2, 1, 2, 3}, 1000000))
+	mustWriteFileWithRepeatedData(t, filepath.Join(srcDir1, "dir2", "large-file2"), 3, bytes.Repeat([]byte{2, 1, 2, 4}, 1000000))
+
+	cases := []struct {
+		name                    string
+		flags                   []string
+		modifyRepoAfterSnapshot func(env *testenv.CLITest, man *snapshot.Manifest, fileMap map[string]*snapshot.DirEntry)
+		initiallyCorrupted      bool
+		wantRecoveredFiles      []string
+		wantRootStub            bool
+		wantFixFail             bool
+		wantFailVerify          bool
+	}{
+		{
+			name:                    "FixInvalidFiles_NoOp",
+			modifyRepoAfterSnapshot: func(env *testenv.CLITest, man *snapshot.Manifest, fileMap map[string]*snapshot.DirEntry) {},
+			flags:                   []string{"invalid-files"},
+			wantRecoveredFiles: []string{
+				"dir1",
+				"dir1/large-file1",
+				"dir1/large-file1-dup",
+				"dir1/large-file2",
+				"dir1/small-file1",
+				"dir1/small-file1-dup",
+				"dir1/small-file2",
+				"dir2",
+				"dir2/large-file1",
+				"dir2/large-file1-dup",
+				"dir2/large-file2",
+				"dir2/small-file1",
+				"dir2/small-file1-dup",
+				"dir2/small-file2",
+				"large-file1",
+				"large-file1-dup",
+				"large-file2",
+				"small-file1",
+				"small-file1-dup",
+				"small-file2",
+			},
+		},
+		{
+			name: "FixInvalidFiles_MissingRootDirStub",
+			modifyRepoAfterSnapshot: func(env *testenv.CLITest, man *snapshot.Manifest, fileMap map[string]*snapshot.DirEntry) {
+				forgetContents(t, env,
+					string(man.RootObjectID()))
+			},
+			initiallyCorrupted: true,
+			flags:              []string{"invalid-files"},
+			wantRootStub:       true,
+		},
+		{
+			name: "FixInvalidFiles_MissingRootDirFail",
+			modifyRepoAfterSnapshot: func(env *testenv.CLITest, man *snapshot.Manifest, fileMap map[string]*snapshot.DirEntry) {
+				forgetContents(t, env,
+					string(man.RootObjectID()))
+			},
+			initiallyCorrupted: true,
+			flags:              []string{"invalid-files", "--invalid-directory-handling=fail"},
+			wantFixFail:        true,
+		},
+		{
+			name: "FixInvalidFiles_MissingRootDirKeep",
+			modifyRepoAfterSnapshot: func(env *testenv.CLITest, man *snapshot.Manifest, fileMap map[string]*snapshot.DirEntry) {
+				forgetContents(t, env,
+					string(man.RootObjectID()))
+			},
+			initiallyCorrupted: true,
+			flags:              []string{"invalid-files", "--invalid-directory-handling=keep"},
+			wantFailVerify:     true,
+		},
+		{
+			name: "FixInvalidFiles_MissingShortContentFileRemove",
+			modifyRepoAfterSnapshot: func(env *testenv.CLITest, man *snapshot.Manifest, fileMap map[string]*snapshot.DirEntry) {
+				forgetContents(t, env,
+					string(fileMap["small-file1"].ObjectID),
+					string(fileMap["dir1/small-file1"].ObjectID))
+			},
+			initiallyCorrupted: true,
+			// recovered files
+			flags: []string{"invalid-files", "--invalid-file-handling=remove"},
+			wantRecoveredFiles: []string{
+				"dir1",
+				"dir1/large-file1",
+				"dir1/large-file1-dup",
+				"dir1/large-file2",
+				"dir1/small-file2",
+				"dir2",
+				"dir2/large-file1",
+				"dir2/large-file1-dup",
+				"dir2/large-file2",
+				"dir2/small-file1",
+				"dir2/small-file1-dup",
+				"dir2/small-file2",
+				"large-file1",
+				"large-file1-dup",
+				"large-file2",
+				"small-file2",
+			},
+		},
+		{
+			name: "FixInvalidFiles_MissingShortContentFileStub",
+			modifyRepoAfterSnapshot: func(env *testenv.CLITest, man *snapshot.Manifest, fileMap map[string]*snapshot.DirEntry) {
+				forgetContents(t, env,
+					string(fileMap["small-file1"].ObjectID),
+					string(fileMap["dir1/small-file1"].ObjectID))
+			},
+			initiallyCorrupted: true,
+			flags:              []string{"invalid-files"},
+			// recovered files
+			wantRecoveredFiles: []string{
+				".INVALID.small-file1",
+				".INVALID.small-file1-dup",
+				"dir1",
+				"dir1/.INVALID.small-file1",
+				"dir1/.INVALID.small-file1-dup",
+				"dir1/large-file1",
+				"dir1/large-file1-dup",
+				"dir1/large-file2",
+				"dir1/small-file2",
+				"dir2",
+				"dir2/large-file1",
+				"dir2/large-file1-dup",
+				"dir2/large-file2",
+				"dir2/small-file1",
+				"dir2/small-file1-dup",
+				"dir2/small-file2",
+				"large-file1",
+				"large-file1-dup",
+				"large-file2",
+				"small-file2",
+			},
+		},
+		{
+			name: "FixInvalidFiles_MissingShortContentFileKeep",
+			modifyRepoAfterSnapshot: func(env *testenv.CLITest, man *snapshot.Manifest, fileMap map[string]*snapshot.DirEntry) {
+				forgetContents(t, env,
+					string(fileMap["small-file1"].ObjectID),
+					string(fileMap["dir1/small-file1"].ObjectID))
+			},
+			initiallyCorrupted: true,
+			flags:              []string{"invalid-files", "--invalid-file-handling=keep"},
+			wantFailVerify:     true,
+		},
+		{
+			name: "FixInvalidFiles_MissingShortContentDir",
+			modifyRepoAfterSnapshot: func(env *testenv.CLITest, man *snapshot.Manifest, fileMap map[string]*snapshot.DirEntry) {
+				forgetContents(t, env,
+					string(fileMap["dir1"].ObjectID))
+			},
+			initiallyCorrupted: true,
+			flags:              []string{"invalid-files", "--invalid-directory-handling=stub"},
+			wantRecoveredFiles: []string{
+				".INVALID.dir1",
+				"dir2",
+				"dir2/large-file1",
+				"dir2/large-file1-dup",
+				"dir2/large-file2",
+				"dir2/small-file1",
+				"dir2/small-file1-dup",
+				"dir2/small-file2",
+				"large-file1",
+				"large-file1-dup",
+				"large-file2",
+				"small-file1",
+				"small-file1-dup",
+				"small-file2",
+			},
+		},
+		{
+			name: "FixInvalidFiles_MissingLargeFileIndex",
+			modifyRepoAfterSnapshot: func(env *testenv.CLITest, man *snapshot.Manifest, fileMap map[string]*snapshot.DirEntry) {
+				forgetContents(t, env,
+					strings.TrimPrefix(string(fileMap["large-file1"].ObjectID), "I"),
+					strings.TrimPrefix(string(fileMap["dir1/large-file1"].ObjectID), "I"))
+			},
+			initiallyCorrupted: true,
+			flags:              []string{"invalid-files", "--invalid-file-handling=remove"},
+			wantRecoveredFiles: []string{
+				"dir1",
+				"dir1/large-file2",
+				"dir1/small-file1",
+				"dir1/small-file1-dup",
+				"dir1/small-file2",
+				"dir2",
+				"dir2/large-file1",
+				"dir2/large-file1-dup",
+				"dir2/large-file2",
+				"dir2/small-file1",
+				"dir2/small-file1-dup",
+				"dir2/small-file2",
+				"large-file2",
+				"small-file1",
+				"small-file1-dup",
+				"small-file2",
+			},
+		},
+		{
+			name:                    "FixRemoveFiles_ByFileName",
+			modifyRepoAfterSnapshot: func(env *testenv.CLITest, man *snapshot.Manifest, fileMap map[string]*snapshot.DirEntry) {},
+			flags:                   []string{"remove-files", "--filename=small-file2", "--filename=large-file1-dup"},
+			wantRecoveredFiles: []string{
+				"dir1",
+				"dir1/large-file1",
+				"dir1/large-file2",
+				"dir1/small-file1",
+				"dir1/small-file1-dup",
+				"dir2",
+				"dir2/large-file1",
+				"dir2/large-file2",
+				"dir2/small-file1",
+				"dir2/small-file1-dup",
+				"large-file1",
+				"large-file2",
+				"small-file1",
+				"small-file1-dup",
+			},
+		},
+		{
+			name:                    "FixRemoveFiles_ByWildcard",
+			modifyRepoAfterSnapshot: func(env *testenv.CLITest, man *snapshot.Manifest, fileMap map[string]*snapshot.DirEntry) {},
+			flags:                   []string{"remove-files", "--filename=small-*", "--filename=*-dup"},
+			wantRecoveredFiles: []string{
+				"dir1",
+				"dir1/large-file1",
+				"dir1/large-file2",
+				"dir2",
+				"dir2/large-file1",
+				"dir2/large-file2",
+				"large-file1",
+				"large-file2",
+			},
+		},
+	}
+
+	for _, tc := range cases {
+		tc := tc
+
+		t.Run(tc.name, func(t *testing.T) {
+			runner := testenv.NewInProcRunner(t)
+			env := testenv.NewCLITest(t, testenv.RepoFormatNotImportant, runner)
+
+			env.RunAndExpectSuccess(t, "repo", "create", "filesystem", "--path", env.RepoDir)
+
+			var man1, man2 snapshot.Manifest
+
+			testutil.MustParseJSONLines(t, env.RunAndExpectSuccess(t, "snapshot", "create", srcDir1, "--json"), &man1)
+			testutil.MustParseJSONLines(t, env.RunAndExpectSuccess(t, "snapshot", "create", srcDir1, "--json"), &man2)
+
+			fileMap := mustGetFileMap(t, env, man1.RootObjectID())
+
+			tc.modifyRepoAfterSnapshot(env, &man1, fileMap)
+
+			if tc.initiallyCorrupted {
+				env.RunAndExpectFailure(t, "snapshot", "verify")
+			} else {
+				env.RunAndExpectFailure(t, "snapshot", "success")
+			}
+
+			if tc.wantFixFail {
+				env.RunAndExpectFailure(t, append([]string{"snapshot", "fix"}, tc.flags...)...)
+				env.RunAndExpectFailure(t, append(append([]string{"snapshot", "fix"}, tc.flags...), "--commit")...)
+				env.RunAndExpectFailure(t, "snapshot", "verify")
+				return
+			}
+
+			// this does not commit fixes
+			env.RunAndExpectSuccess(t, append([]string{"snapshot", "fix"}, tc.flags...)...)
+
+			if tc.initiallyCorrupted {
+				// snapshot verify still fails
+				env.RunAndExpectFailure(t, "snapshot", "verify")
+			} else {
+				env.RunAndExpectFailure(t, "snapshot", "success")
+			}
+
+			env.RunAndExpectSuccess(t, append(append([]string{"snapshot", "fix"}, tc.flags...), "--commit")...)
+
+			if tc.wantFailVerify {
+				env.RunAndExpectFailure(t, "snapshot", "verify")
+				return
+			}
+
+			env.RunAndExpectSuccess(t, "snapshot", "verify")
+
+			var manifests []snapshot.Manifest
+
+			testutil.MustParseJSONLines(t, env.RunAndExpectSuccess(t, "snapshot", "list", "--json"), &manifests)
+			require.Len(t, manifests, 2)
+
+			// make sure all root entries have been fixed the same way
+			require.Equal(t, manifests[0].RootEntry, manifests[1].RootEntry)
+
+			switch {
+			case tc.wantRecoveredFiles != nil:
+				var remainingFiles []string
+
+				for f := range mustGetFileMap(t, env, manifests[0].RootObjectID()) {
+					remainingFiles = append(remainingFiles, f)
+				}
+
+				sort.Strings(remainingFiles)
+				require.Equal(t, tc.wantRecoveredFiles, remainingFiles)
+
+			case tc.wantRootStub:
+				var stub snapshotfs.UnreadableDirEntryReplacement
+
+				testutil.MustParseJSONLines(t, env.RunAndExpectSuccess(t, "show", string(manifests[0].RootObjectID())), &stub)
+			}
+		})
+	}
+}
+
+// forgetContents rewrites contents into a new blob and deletes the blob
+// making index entries dangling.
+func forgetContents(t *testing.T, env *testenv.CLITest, contentIDs ...string) {
+	t.Helper()
+
+	before := mustGetContentMap(t, env)
+
+	env.RunAndExpectSuccess(t, append([]string{"content", "rewrite", "--safety=none"}, contentIDs...)...)
+
+	after := mustGetContentMap(t, env)
+
+	var blobIDs []string
+
+	for _, cid := range contentIDs {
+		require.NotEqual(t, before[content.ID(cid)].PackBlobID, after[content.ID(cid)].PackBlobID)
+		blobIDs = append(blobIDs, string(after[index.ID(cid)].PackBlobID))
+	}
+
+	env.RunAndExpectSuccess(t, append([]string{"blob", "rm"}, blobIDs...)...)
+}
+
+func mustGetContentMap(t *testing.T, env *testenv.CLITest) map[content.ID]content.InfoStruct {
+	t.Helper()
+
+	var contents1 []content.InfoStruct
+
+	testutil.MustParseJSONLines(t, env.RunAndExpectSuccess(t, "content", "ls", "--json"), &contents1)
+
+	contentMap := map[content.ID]content.InfoStruct{}
+	for _, v := range contents1 {
+		contentMap[v.ContentID] = v
+	}
+
+	return contentMap
+}
+
+func mustGetFileMap(t *testing.T, env *testenv.CLITest, root object.ID) map[string]*snapshot.DirEntry {
+	t.Helper()
+
+	fileMap := map[string]*snapshot.DirEntry{}
+	mustListDirEntries(t, env, fileMap, root, "")
+
+	return fileMap
+}
+
+func mustListDirEntries(t *testing.T, env *testenv.CLITest, out map[string]*snapshot.DirEntry, root object.ID, prefix string) {
+	t.Helper()
+
+	var dir1 snapshot.DirManifest
+
+	testutil.MustParseJSONLines(t, env.RunAndExpectSuccess(t, "show", string(root)), &dir1)
+
+	for _, v := range dir1.Entries {
+		out[prefix+v.Name] = v
+
+		if v.Type == snapshot.EntryTypeDirectory {
+			mustListDirEntries(t, env, out, v.ObjectID, prefix+v.Name+"/")
+		}
+	}
+}
+
+func mustWriteFileWithRepeatedData(t *testing.T, fname string, repeat int, data []byte) {
+	t.Helper()
+
+	f, err := os.Create(fname)
+	require.NoError(t, err)
+
+	defer f.Close()
+
+	for i := 0; i < repeat; i++ {
+		_, err := f.Write(data)
+		require.NoError(t, err)
+	}
+}

+ 9 - 1
internal/diff/diff.go

@@ -41,8 +41,16 @@ func (c *Comparer) Close() error {
 	return os.RemoveAll(c.tmpDir)
 }
 
+func maybeOID(e fs.Entry) string {
+	if h, ok := e.(object.HasObjectID); ok {
+		return h.ObjectID().String()
+	}
+
+	return ""
+}
+
 func (c *Comparer) compareDirectories(ctx context.Context, dir1, dir2 fs.Directory, parent string) error {
-	log(ctx).Debugf("comparing directories %v", parent)
+	log(ctx).Debugf("comparing directories %v (%v and %v)", parent, maybeOID(dir1), maybeOID(dir2))
 
 	var entries1, entries2 fs.Entries
 

+ 24 - 0
snapshot/manifest.go

@@ -132,6 +132,19 @@ type DirEntry struct {
 	DirSummary  *fs.DirectorySummary `json:"summ,omitempty"`
 }
 
+// Clone returns a clone of the entry.
+func (e *DirEntry) Clone() *DirEntry {
+	e2 := *e
+
+	if s := e2.DirSummary; s != nil {
+		s2 := s.Clone()
+
+		e2.DirSummary = &s2
+	}
+
+	return &e2
+}
+
 // HasDirEntry is implemented by objects that have a DirEntry associated with them.
 type HasDirEntry interface {
 	DirEntry() *DirEntry
@@ -161,6 +174,17 @@ func (m *Manifest) RootObjectID() object.ID {
 	return ""
 }
 
+// Clone returns a clone of the manifest.
+func (m *Manifest) Clone() *Manifest {
+	m2 := *m
+
+	if m2.RootEntry != nil {
+		m2.RootEntry = m2.RootEntry.Clone()
+	}
+
+	return &m2
+}
+
 // StorageStats encapsulates snapshot storage usage information and running totals.
 type StorageStats struct {
 	// amount of new unique data in this snapshot that wasn't there before.

+ 308 - 0
snapshot/snapshotfs/dir_rewriter.go

@@ -0,0 +1,308 @@
+package snapshotfs
+
+import (
+	"bytes"
+	"context"
+	"crypto/sha1"
+	"encoding/json"
+	"path"
+	"runtime"
+	"sync"
+
+	"github.com/pkg/errors"
+
+	"github.com/kopia/kopia/internal/impossible"
+	"github.com/kopia/kopia/internal/workshare"
+	"github.com/kopia/kopia/repo"
+	"github.com/kopia/kopia/repo/logging"
+	"github.com/kopia/kopia/repo/object"
+	"github.com/kopia/kopia/snapshot"
+)
+
var dirRewriterLog = logging.Module("dirRewriter")

// dirRewriterCacheKey is the SHA1 digest of a JSON-serialized DirEntry,
// used to deduplicate rewrites of identical entries.
type dirRewriterCacheKey [sha1.Size]byte

// RewriteDirEntryCallback returns a replacement for the provided directory entry in the provided path.
// nil indicates that the entry should be removed.
type RewriteDirEntryCallback func(ctx context.Context, parentPath string, input *snapshot.DirEntry) (*snapshot.DirEntry, error)

// RewriteFailedEntryCallback is invoked when rewriting a file or directory fails;
// it decides what (if anything) replaces the unreadable entry.
type RewriteFailedEntryCallback func(ctx context.Context, parentPath string, input *snapshot.DirEntry, err error) (*snapshot.DirEntry, error)

// UnreadableDirEntryReplacement is serialized as a stub object replacing unreadable file or directory.
type UnreadableDirEntryReplacement struct {
	Info  string             `json:"info"`  // human-readable explanation of why the stub exists
	Error string             `json:"error"` // error that made the original entry unreadable
	Entry *snapshot.DirEntry `json:"entry"` // the original, unreadable entry
}
+
// DirRewriterOptions provides options for directory rewriter.
type DirRewriterOptions struct {
	// Parallel is the maximum number of entries rewritten concurrently;
	// zero selects runtime.NumCPU().
	Parallel int

	// RewriteEntry is invoked for every entry in the tree and returns its
	// replacement, or nil to remove the entry. Required.
	RewriteEntry RewriteDirEntryCallback

	// when != nil will be invoked to replace directory that can't be read,
	// by default RewriteFail (aborts the rewrite).
	OnDirectoryReadFailure RewriteFailedEntryCallback
}
+
// DirRewriter rewrites contents of directories by walking the snapshot tree recursively.
type DirRewriter struct {
	ws   *workshare.Pool // worker pool shared by recursive rewrites
	opts DirRewriterOptions

	// cache maps dirRewriterCacheKey -> *snapshot.DirEntry so that identical
	// entries (e.g. deduplicated subtrees) are rewritten only once.
	cache sync.Map

	rep repo.RepositoryWriter
}

// dirRewriterRequest carries one entry-rewrite work item through the
// workshare pool; result and err are populated by processRequest.
type dirRewriterRequest struct {
	ctx        context.Context // nolint:containedctx
	parentPath string
	input      *snapshot.DirEntry
	result     *snapshot.DirEntry // replacement entry; nil means removed
	err        error
}
+
+func (rw *DirRewriter) processRequest(pool *workshare.Pool, req0 interface{}) {
+	req, _ := req0.(*dirRewriterRequest)
+
+	req.result, req.err = rw.getCachedReplacement(req.ctx, req.parentPath, req.input)
+}
+
+func (rw *DirRewriter) getCacheKey(input *snapshot.DirEntry) dirRewriterCacheKey {
+	// cache key = SHA1 hash of the input as JSON (20 bytes)
+	h := sha1.New()
+
+	if err := json.NewEncoder(h).Encode(input); err != nil {
+		impossible.PanicOnError(err)
+	}
+
+	var out dirRewriterCacheKey
+
+	h.Sum(out[:0])
+
+	return out
+}
+
// getCachedReplacement computes (or retrieves from the cache) the replacement
// for a single directory entry, recursing into subdirectories as needed.
// A (nil, nil) result means the entry should be removed from its parent.
func (rw *DirRewriter) getCachedReplacement(ctx context.Context, parentPath string, input *snapshot.DirEntry) (*snapshot.DirEntry, error) {
	key := rw.getCacheKey(input)

	// see if we already processed this exact directory entry
	if v, ok := rw.cache.Load(key); ok {
		// return a Clone so callers can safely mutate their copy.
		// nolint:forcetypeassert
		return v.(*snapshot.DirEntry).Clone(), nil
	}

	// entry not cached yet, run rewriter
	result, err := rw.opts.RewriteEntry(ctx, parentPath, input)

	// if the rewriter did not return the entry, return any error.
	// NOTE: when the rewriter returns both a non-nil entry and an error,
	// the entry wins and the error is dropped here.
	if result == nil {
		return nil, err
	}

	// the rewriter returned a directory, we must recursively process it.
	if result.Type == snapshot.EntryTypeDirectory {
		rep2, subdirErr := rw.processDirectory(ctx, parentPath, result)
		if rep2 == nil {
			return nil, errors.Wrap(subdirErr, input.Name)
		}

		result = rep2
	}

	// LoadOrStore resolves races between concurrent rewrites of the same key;
	// whichever value was stored first wins and is returned to all callers.
	actual, _ := rw.cache.LoadOrStore(key, result.Clone())

	// nolint:forcetypeassert
	return actual.(*snapshot.DirEntry), nil
}
+
+func (rw *DirRewriter) processDirectory(ctx context.Context, pathFromRoot string, entry *snapshot.DirEntry) (*snapshot.DirEntry, error) {
+	dirRewriterLog(ctx).Debugf("processDirectory", "path", pathFromRoot)
+
+	r, err := rw.rep.OpenObject(ctx, entry.ObjectID)
+	if err != nil {
+		return rw.opts.OnDirectoryReadFailure(ctx, pathFromRoot, entry, errors.Wrapf(err, "unable to open directory object %v", entry.ObjectID))
+	}
+	defer r.Close() //nolint:errcheck
+
+	entries, _, err := readDirEntries(r)
+	if err != nil {
+		return rw.opts.OnDirectoryReadFailure(ctx, pathFromRoot, entry, errors.Wrap(err, "unable to read directory entries"))
+	}
+
+	return rw.processDirectoryEntries(ctx, pathFromRoot, entry, entries)
+}
+
+func (rw *DirRewriter) processDirectoryEntries(ctx context.Context, parentPath string, entry *snapshot.DirEntry, entries []*snapshot.DirEntry) (*snapshot.DirEntry, error) {
+	var (
+		builder DirManifestBuilder
+		wg      workshare.AsyncGroup
+	)
+
+	for _, child := range entries {
+		if wg.CanShareWork(rw.ws) {
+			// see if we can run this child in a goroutine
+			wg.RunAsync(rw.ws, rw.processRequest, &dirRewriterRequest{
+				ctx,
+				path.Join(parentPath, child.Name),
+				child,
+				nil,
+				nil,
+			})
+
+			continue
+		}
+
+		// run in current goroutine
+		replacement, repErr := rw.getCachedReplacement(ctx, path.Join(parentPath, child.Name), child)
+		if repErr != nil {
+			return nil, errors.Wrap(repErr, child.Name)
+		}
+
+		if replacement == nil {
+			continue
+		}
+
+		builder.AddEntry(replacement)
+	}
+
+	// now wait for all asynchronous work to complete and add resulting entries to
+	// the builder
+	for _, req := range wg.Wait() {
+		if req, ok := req.(*dirRewriterRequest); ok {
+			if req.result != nil {
+				builder.AddEntry(req.result)
+			}
+		}
+	}
+
+	dm := builder.Build(entry.ModTime, entry.DirSummary.IncompleteReason)
+
+	oid, err := writeDirManifest(ctx, rw.rep, string(entry.ObjectID), dm)
+	if err != nil {
+		return nil, errors.Wrap(err, "unable to write directory manifest")
+	}
+
+	result := *entry
+	result.DirSummary = dm.Summary
+	result.ObjectID = oid
+
+	return &result, nil
+}
+
+func (rw *DirRewriter) equalEntries(e1, e2 *snapshot.DirEntry) bool {
+	if e1 == nil {
+		return e2 == nil
+	}
+
+	if e2 == nil {
+		return false
+	}
+
+	return rw.getCacheKey(e1) == rw.getCacheKey(e2)
+}
+
// RewriteSnapshotManifest rewrites the directory tree starting at a given manifest.
// On change it mutates man.RootEntry in place and returns true; otherwise the
// manifest is left untouched and false is returned.
func (rw *DirRewriter) RewriteSnapshotManifest(ctx context.Context, man *snapshot.Manifest) (bool, error) {
	newEntry, err := rw.getCachedReplacement(ctx, ".", man.RootEntry)
	if err != nil {
		return false, errors.Wrapf(err, "error processing snapshot %v", man.ID)
	}

	// a removed root is not allowed - give the failure callback a chance to
	// substitute a replacement (e.g. a stub) for it.
	if newEntry == nil {
		newEntry, err = rw.opts.OnDirectoryReadFailure(ctx, ".", man.RootEntry, errors.Errorf("invalid root directory %v", man.ID))
		if err != nil {
			return false, err
		}
	}

	if !rw.equalEntries(newEntry, man.RootEntry) {
		man.RootEntry = newEntry
		return true, nil
	}

	return false, nil
}
+
// Close closes the rewriter and releases its worker pool.
// It must be called exactly once when the rewriter is no longer needed.
func (rw *DirRewriter) Close() {
	rw.ws.Close()
}

// RewriteKeep is a RewriteFailedEntryCallback that keeps the unreadable entry unchanged.
func RewriteKeep(ctx context.Context, parentPath string, input *snapshot.DirEntry, err error) (*snapshot.DirEntry, error) {
	return input, nil
}
+
+// RewriteAsStub returns a callback that replaces the invalid entry with a stub that describes
+// the error.
+func RewriteAsStub(rep repo.RepositoryWriter) RewriteFailedEntryCallback {
+	return func(ctx context.Context, parentPath string, input *snapshot.DirEntry, originalErr error) (*snapshot.DirEntry, error) {
+		var buf bytes.Buffer
+
+		e := json.NewEncoder(&buf)
+		e.SetIndent("  ", "    ")
+
+		if err := e.Encode(UnreadableDirEntryReplacement{
+			"Kopia replaced the original entry with this stub because of an error.",
+			originalErr.Error(),
+			input,
+		}); err != nil {
+			return nil, errors.Wrap(err, "error writing stub contents")
+		}
+
+		w := rep.NewObjectWriter(ctx, object.WriterOptions{})
+
+		n, err := buf.WriteTo(w)
+		if err != nil {
+			return nil, errors.Wrap(err, "error writing stub")
+		}
+
+		oid, err := w.Result()
+		if err != nil {
+			return nil, errors.Wrap(err, "error writing stub")
+		}
+
+		return &snapshot.DirEntry{
+			Name:        ".INVALID." + input.Name,
+			Type:        snapshot.EntryTypeFile,
+			ModTime:     input.ModTime,
+			FileSize:    n,
+			UserID:      input.UserID,
+			GroupID:     input.GroupID,
+			ObjectID:    oid,
+			Permissions: input.Permissions,
+		}, nil
+	}
+}
+
// RewriteFail is a RewriteFailedEntryCallback that fails the entire rewrite
// process when a directory is unreadable.
func RewriteFail(ctx context.Context, parentPath string, entry *snapshot.DirEntry, err error) (*snapshot.DirEntry, error) {
	return nil, err
}

// RewriteRemove is a RewriteFailedEntryCallback that silently removes the
// entire failed entry from its parent directory.
func RewriteRemove(ctx context.Context, parentPath string, entry *snapshot.DirEntry, err error) (*snapshot.DirEntry, error) {
	return nil, nil
}
+
+// NewDirRewriter creates a new directory rewriter.
+func NewDirRewriter(rep repo.RepositoryWriter, opts DirRewriterOptions) *DirRewriter {
+	if opts.Parallel == 0 {
+		opts.Parallel = runtime.NumCPU()
+	}
+
+	if opts.OnDirectoryReadFailure == nil {
+		opts.OnDirectoryReadFailure = RewriteFail
+	}
+
+	return &DirRewriter{
+		ws:   workshare.NewPool(opts.Parallel - 1),
+		opts: opts,
+		rep:  rep,
+	}
+}