pwalkdir.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. //go:build go1.16
  2. // +build go1.16
  3. package pwalkdir
  4. import (
  5. "fmt"
  6. "io/fs"
  7. "path/filepath"
  8. "runtime"
  9. "sync"
  10. )
  11. // Walk is a wrapper for filepath.WalkDir which can call multiple walkFn
  12. // in parallel, allowing to handle each item concurrently. A maximum of
  13. // twice the runtime.NumCPU() walkFn will be called at any one time.
  14. // If you want to change the maximum, use WalkN instead.
  15. //
  16. // The order of calls is non-deterministic.
  17. //
  18. // Note that this implementation only supports primitive error handling:
  19. //
  20. // - no errors are ever passed to walkFn;
  21. //
  22. // - once a walkFn returns any error, all further processing stops
  23. // and the error is returned to the caller of Walk;
  24. //
  25. // - filepath.SkipDir is not supported;
  26. //
  27. // - if more than one walkFn instance will return an error, only one
  28. // of such errors will be propagated and returned by Walk, others
  29. // will be silently discarded.
  30. func Walk(root string, walkFn fs.WalkDirFunc) error {
  31. return WalkN(root, walkFn, runtime.NumCPU()*2)
  32. }
  // WalkN is a wrapper for filepath.WalkDir which can call multiple walkFn
  // in parallel, allowing to handle each item concurrently. A maximum of
  // num walkFn will be called at any one time.
  //
  // WalkN returns an error without walking anything if num is less than 1.
  // The root entry is passed to walkFn last, and only if walking the tree
  // and processing every other entry produced no error.
  //
  // Please see Walk documentation for caveats of using this function.
  func WalkN(root string, walkFn fs.WalkDirFunc, num int) error {
  	// make sure limit is sensible
  	if num < 1 {
  		return fmt.Errorf("walk(%q): num must be > 0", root)
  	}

  	files := make(chan *walkArgs, 2*num)
  	errCh := make(chan error, 1) // Get the first error, ignore others.

  	var (
  		err       error // result of filepath.WalkDir
  		cbErr     error // walkFn error picked up by the walker goroutine
  		wg        sync.WaitGroup
  		rootEntry *walkArgs
  	)

  	// Start walking a tree asap.
  	wg.Add(1)
  	go func() {
  		defer wg.Done()
  		// Close the queue exactly once, after WalkDir has returned, so
  		// the callback can never send on (or re-close) a closed channel.
  		defer close(files)

  		err = filepath.WalkDir(root, func(p string, entry fs.DirEntry, walkErr error) error {
  			if walkErr != nil {
  				return walkErr
  			}
  			if rootEntry == nil {
  				// WalkDir always reports root first. It is processed
  				// separately below. Comparing path lengths instead would
  				// misidentify children of an uncleaned root such as
  				// "dir//" or "./".
  				rootEntry = &walkArgs{path: p, entry: entry}
  				return nil
  			}
  			if cbErr != nil {
  				// A callback already failed. Keep returning its error so
  				// the walk unwinds even when that error is fs.SkipDir,
  				// which WalkDir treats as "skip and continue".
  				return cbErr
  			}
  			// Add a file to the queue unless a callback sent an error.
  			select {
  			case cbErr = <-errCh:
  				return cbErr
  			default:
  				files <- &walkArgs{path: p, entry: entry}
  				return nil
  			}
  		})
  	}()

  	wg.Add(num)
  	for i := 0; i < num; i++ {
  		go func() {
  			defer wg.Done()
  			for file := range files {
  				if e := walkFn(file.path, file.entry, nil); e != nil {
  					select {
  					case errCh <- e: // sent ok
  					default: // buffer full
  					}
  				}
  			}
  		}()
  	}

  	wg.Wait()

  	if err == nil {
  		// WalkDir swallows fs.SkipDir, so a callback error consumed by
  		// the walker may not be reflected in its return value.
  		err = cbErr
  	}
  	if err == nil {
  		// A callback may have failed after the walker last polled errCh
  		// (e.g. on the final queued entries); do not lose that error.
  		select {
  		case err = <-errCh:
  		default:
  		}
  	}
  	if err != nil {
  		return err
  	}

  	return walkFn(rootEntry.path, rootEntry.entry, nil)
  }

  // walkArgs holds the arguments for a single walkFn invocation: the path
  // of a visited entry and its directory entry.
  type walkArgs struct {
  	entry fs.DirEntry
  	path  string
  }