filesync.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package filesync
  2. import (
  3. "os"
  4. "strings"
  5. "github.com/docker/docker/client/session"
  6. "github.com/pkg/errors"
  7. "github.com/tonistiigi/fsutil"
  8. "golang.org/x/net/context"
  9. "google.golang.org/grpc"
  10. "google.golang.org/grpc/metadata"
  11. )
  12. const (
  13. keyOverrideExcludes = "override-excludes"
  14. keyIncludePatterns = "include-patterns"
  15. )
  16. type fsSyncProvider struct {
  17. root string
  18. excludes []string
  19. p progressCb
  20. doneCh chan error
  21. }
  22. // NewFSSyncProvider creates a new provider for sending files from client
  23. func NewFSSyncProvider(root string, excludes []string) session.Attachable {
  24. p := &fsSyncProvider{
  25. root: root,
  26. excludes: excludes,
  27. }
  28. return p
  29. }
  30. func (sp *fsSyncProvider) Register(server *grpc.Server) {
  31. RegisterFileSyncServer(server, sp)
  32. }
  33. func (sp *fsSyncProvider) DiffCopy(stream FileSync_DiffCopyServer) error {
  34. return sp.handle("diffcopy", stream)
  35. }
  36. func (sp *fsSyncProvider) TarStream(stream FileSync_TarStreamServer) error {
  37. return sp.handle("tarstream", stream)
  38. }
  39. func (sp *fsSyncProvider) handle(method string, stream grpc.ServerStream) error {
  40. var pr *protocol
  41. for _, p := range supportedProtocols {
  42. if method == p.name && isProtoSupported(p.name) {
  43. pr = &p
  44. break
  45. }
  46. }
  47. if pr == nil {
  48. return errors.New("failed to negotiate protocol")
  49. }
  50. opts, _ := metadata.FromContext(stream.Context()) // if no metadata continue with empty object
  51. var excludes []string
  52. if len(opts[keyOverrideExcludes]) == 0 || opts[keyOverrideExcludes][0] != "true" {
  53. excludes = sp.excludes
  54. }
  55. includes := opts[keyIncludePatterns]
  56. var progress progressCb
  57. if sp.p != nil {
  58. progress = sp.p
  59. sp.p = nil
  60. }
  61. var doneCh chan error
  62. if sp.doneCh != nil {
  63. doneCh = sp.doneCh
  64. sp.doneCh = nil
  65. }
  66. err := pr.sendFn(stream, sp.root, includes, excludes, progress)
  67. if doneCh != nil {
  68. if err != nil {
  69. doneCh <- err
  70. }
  71. close(doneCh)
  72. }
  73. return err
  74. }
  75. func (sp *fsSyncProvider) SetNextProgressCallback(f func(int, bool), doneCh chan error) {
  76. sp.p = f
  77. sp.doneCh = doneCh
  78. }
  79. type progressCb func(int, bool)
  80. type protocol struct {
  81. name string
  82. sendFn func(stream grpc.Stream, srcDir string, includes, excludes []string, progress progressCb) error
  83. recvFn func(stream grpc.Stream, destDir string, cu CacheUpdater) error
  84. }
  85. func isProtoSupported(p string) bool {
  86. // TODO: this should be removed after testing if stability is confirmed
  87. if override := os.Getenv("BUILD_STREAM_PROTOCOL"); override != "" {
  88. return strings.EqualFold(p, override)
  89. }
  90. return true
  91. }
  92. var supportedProtocols = []protocol{
  93. {
  94. name: "diffcopy",
  95. sendFn: sendDiffCopy,
  96. recvFn: recvDiffCopy,
  97. },
  98. {
  99. name: "tarstream",
  100. sendFn: sendTarStream,
  101. recvFn: recvTarStream,
  102. },
  103. }
  104. // FSSendRequestOpt defines options for FSSend request
  105. type FSSendRequestOpt struct {
  106. IncludePatterns []string
  107. OverrideExcludes bool
  108. DestDir string
  109. CacheUpdater CacheUpdater
  110. }
  111. // CacheUpdater is an object capable of sending notifications for the cache hash changes
  112. type CacheUpdater interface {
  113. MarkSupported(bool)
  114. HandleChange(fsutil.ChangeKind, string, os.FileInfo, error) error
  115. }
  116. // FSSync initializes a transfer of files
  117. func FSSync(ctx context.Context, c session.Caller, opt FSSendRequestOpt) error {
  118. var pr *protocol
  119. for _, p := range supportedProtocols {
  120. if isProtoSupported(p.name) && c.Supports(session.MethodURL(_FileSync_serviceDesc.ServiceName, p.name)) {
  121. pr = &p
  122. break
  123. }
  124. }
  125. if pr == nil {
  126. return errors.New("no fssync handlers")
  127. }
  128. opts := make(map[string][]string)
  129. if opt.OverrideExcludes {
  130. opts[keyOverrideExcludes] = []string{"true"}
  131. }
  132. if opt.IncludePatterns != nil {
  133. opts[keyIncludePatterns] = opt.IncludePatterns
  134. }
  135. ctx, cancel := context.WithCancel(ctx)
  136. defer cancel()
  137. client := NewFileSyncClient(c.Conn())
  138. var stream grpc.ClientStream
  139. ctx = metadata.NewContext(ctx, opts)
  140. switch pr.name {
  141. case "tarstream":
  142. cc, err := client.TarStream(ctx)
  143. if err != nil {
  144. return err
  145. }
  146. stream = cc
  147. case "diffcopy":
  148. cc, err := client.DiffCopy(ctx)
  149. if err != nil {
  150. return err
  151. }
  152. stream = cc
  153. }
  154. return pr.recvFn(stream, opt.DestDir, opt.CacheUpdater)
  155. }