filesync.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package filesync
  2. import (
  3. "os"
  4. "strings"
  5. "github.com/docker/docker/client/session"
  6. "github.com/pkg/errors"
  7. "github.com/tonistiigi/fsutil"
  8. "golang.org/x/net/context"
  9. "google.golang.org/grpc"
  10. "google.golang.org/grpc/metadata"
  11. )
  12. type fsSyncProvider struct {
  13. root string
  14. excludes []string
  15. p progressCb
  16. doneCh chan error
  17. }
  18. // NewFSSyncProvider creates a new provider for sending files from client
  19. func NewFSSyncProvider(root string, excludes []string) session.Attachable {
  20. p := &fsSyncProvider{
  21. root: root,
  22. excludes: excludes,
  23. }
  24. return p
  25. }
  26. func (sp *fsSyncProvider) Register(server *grpc.Server) {
  27. RegisterFileSyncServer(server, sp)
  28. }
  29. func (sp *fsSyncProvider) DiffCopy(stream FileSync_DiffCopyServer) error {
  30. return sp.handle("diffcopy", stream)
  31. }
  32. func (sp *fsSyncProvider) TarStream(stream FileSync_TarStreamServer) error {
  33. return sp.handle("tarstream", stream)
  34. }
  35. func (sp *fsSyncProvider) handle(method string, stream grpc.ServerStream) error {
  36. var pr *protocol
  37. for _, p := range supportedProtocols {
  38. if method == p.name && isProtoSupported(p.name) {
  39. pr = &p
  40. break
  41. }
  42. }
  43. if pr == nil {
  44. return errors.New("failed to negotiate protocol")
  45. }
  46. opts, _ := metadata.FromContext(stream.Context()) // if no metadata continue with empty object
  47. var excludes []string
  48. if len(opts["Override-Excludes"]) == 0 || opts["Override-Excludes"][0] != "true" {
  49. excludes = sp.excludes
  50. }
  51. var progress progressCb
  52. if sp.p != nil {
  53. progress = sp.p
  54. sp.p = nil
  55. }
  56. var doneCh chan error
  57. if sp.doneCh != nil {
  58. doneCh = sp.doneCh
  59. sp.doneCh = nil
  60. }
  61. err := pr.sendFn(stream, sp.root, excludes, progress)
  62. if doneCh != nil {
  63. if err != nil {
  64. doneCh <- err
  65. }
  66. close(doneCh)
  67. }
  68. return err
  69. }
  70. func (sp *fsSyncProvider) SetNextProgressCallback(f func(int, bool), doneCh chan error) {
  71. sp.p = f
  72. sp.doneCh = doneCh
  73. }
  74. type progressCb func(int, bool)
  75. type protocol struct {
  76. name string
  77. sendFn func(stream grpc.Stream, srcDir string, excludes []string, progress progressCb) error
  78. recvFn func(stream grpc.Stream, destDir string, cu CacheUpdater) error
  79. }
  80. func isProtoSupported(p string) bool {
  81. // TODO: this should be removed after testing if stability is confirmed
  82. if override := os.Getenv("BUILD_STREAM_PROTOCOL"); override != "" {
  83. return strings.EqualFold(p, override)
  84. }
  85. return true
  86. }
  87. var supportedProtocols = []protocol{
  88. {
  89. name: "diffcopy",
  90. sendFn: sendDiffCopy,
  91. recvFn: recvDiffCopy,
  92. },
  93. {
  94. name: "tarstream",
  95. sendFn: sendTarStream,
  96. recvFn: recvTarStream,
  97. },
  98. }
  99. // FSSendRequestOpt defines options for FSSend request
  100. type FSSendRequestOpt struct {
  101. SrcPaths []string
  102. OverrideExcludes bool
  103. DestDir string
  104. CacheUpdater CacheUpdater
  105. }
  106. // CacheUpdater is an object capable of sending notifications for the cache hash changes
  107. type CacheUpdater interface {
  108. MarkSupported(bool)
  109. HandleChange(fsutil.ChangeKind, string, os.FileInfo, error) error
  110. }
  111. // FSSync initializes a transfer of files
  112. func FSSync(ctx context.Context, c session.Caller, opt FSSendRequestOpt) error {
  113. var pr *protocol
  114. for _, p := range supportedProtocols {
  115. if isProtoSupported(p.name) && c.Supports(session.MethodURL(_FileSync_serviceDesc.ServiceName, p.name)) {
  116. pr = &p
  117. break
  118. }
  119. }
  120. if pr == nil {
  121. return errors.New("no fssync handlers")
  122. }
  123. opts := make(map[string][]string)
  124. if opt.OverrideExcludes {
  125. opts["Override-Excludes"] = []string{"true"}
  126. }
  127. ctx, cancel := context.WithCancel(ctx)
  128. defer cancel()
  129. client := NewFileSyncClient(c.Conn())
  130. var stream grpc.ClientStream
  131. ctx = metadata.NewContext(ctx, opts)
  132. switch pr.name {
  133. case "tarstream":
  134. cc, err := client.TarStream(ctx)
  135. if err != nil {
  136. return err
  137. }
  138. stream = cc
  139. case "diffcopy":
  140. cc, err := client.DiffCopy(ctx)
  141. if err != nil {
  142. return err
  143. }
  144. stream = cc
  145. }
  146. return pr.recvFn(stream, opt.DestDir, opt.CacheUpdater)
  147. }