123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- package filesync
- import (
- "context"
- "fmt"
- io "io"
- "os"
- "strings"
- "github.com/moby/buildkit/session"
- "github.com/pkg/errors"
- "github.com/tonistiigi/fsutil"
- "google.golang.org/grpc"
- "google.golang.org/grpc/metadata"
- )
- const (
- keyOverrideExcludes = "override-excludes"
- keyIncludePatterns = "include-patterns"
- keyExcludePatterns = "exclude-patterns"
- keyFollowPaths = "followpaths"
- keyDirName = "dir-name"
- )
- type fsSyncProvider struct {
- dirs map[string]SyncedDir
- p progressCb
- doneCh chan error
- }
- type SyncedDir struct {
- Name string
- Dir string
- Excludes []string
- Map func(*fsutil.Stat) bool
- }
- // NewFSSyncProvider creates a new provider for sending files from client
- func NewFSSyncProvider(dirs []SyncedDir) session.Attachable {
- p := &fsSyncProvider{
- dirs: map[string]SyncedDir{},
- }
- for _, d := range dirs {
- p.dirs[d.Name] = d
- }
- return p
- }
- func (sp *fsSyncProvider) Register(server *grpc.Server) {
- RegisterFileSyncServer(server, sp)
- }
- func (sp *fsSyncProvider) DiffCopy(stream FileSync_DiffCopyServer) error {
- return sp.handle("diffcopy", stream)
- }
- func (sp *fsSyncProvider) TarStream(stream FileSync_TarStreamServer) error {
- return sp.handle("tarstream", stream)
- }
- func (sp *fsSyncProvider) handle(method string, stream grpc.ServerStream) (retErr error) {
- var pr *protocol
- for _, p := range supportedProtocols {
- if method == p.name && isProtoSupported(p.name) {
- pr = &p
- break
- }
- }
- if pr == nil {
- return errors.New("failed to negotiate protocol")
- }
- opts, _ := metadata.FromIncomingContext(stream.Context()) // if no metadata continue with empty object
- dirName := ""
- name, ok := opts[keyDirName]
- if ok && len(name) > 0 {
- dirName = name[0]
- }
- dir, ok := sp.dirs[dirName]
- if !ok {
- return errors.Errorf("no access allowed to dir %q", dirName)
- }
- excludes := opts[keyExcludePatterns]
- if len(dir.Excludes) != 0 && (len(opts[keyOverrideExcludes]) == 0 || opts[keyOverrideExcludes][0] != "true") {
- excludes = dir.Excludes
- }
- includes := opts[keyIncludePatterns]
- followPaths := opts[keyFollowPaths]
- var progress progressCb
- if sp.p != nil {
- progress = sp.p
- sp.p = nil
- }
- var doneCh chan error
- if sp.doneCh != nil {
- doneCh = sp.doneCh
- sp.doneCh = nil
- }
- err := pr.sendFn(stream, dir.Dir, includes, excludes, followPaths, progress, dir.Map)
- if doneCh != nil {
- if err != nil {
- doneCh <- err
- }
- close(doneCh)
- }
- return err
- }
- func (sp *fsSyncProvider) SetNextProgressCallback(f func(int, bool), doneCh chan error) {
- sp.p = f
- sp.doneCh = doneCh
- }
- type progressCb func(int, bool)
- type protocol struct {
- name string
- sendFn func(stream grpc.Stream, srcDir string, includes, excludes, followPaths []string, progress progressCb, _map func(*fsutil.Stat) bool) error
- recvFn func(stream grpc.Stream, destDir string, cu CacheUpdater, progress progressCb) error
- }
- func isProtoSupported(p string) bool {
- // TODO: this should be removed after testing if stability is confirmed
- if override := os.Getenv("BUILD_STREAM_PROTOCOL"); override != "" {
- return strings.EqualFold(p, override)
- }
- return true
- }
- var supportedProtocols = []protocol{
- {
- name: "diffcopy",
- sendFn: sendDiffCopy,
- recvFn: recvDiffCopy,
- },
- }
- // FSSendRequestOpt defines options for FSSend request
- type FSSendRequestOpt struct {
- Name string
- IncludePatterns []string
- ExcludePatterns []string
- FollowPaths []string
- OverrideExcludes bool // deprecated: this is used by docker/cli for automatically loading .dockerignore from the directory
- DestDir string
- CacheUpdater CacheUpdater
- ProgressCb func(int, bool)
- }
- // CacheUpdater is an object capable of sending notifications for the cache hash changes
- type CacheUpdater interface {
- MarkSupported(bool)
- HandleChange(fsutil.ChangeKind, string, os.FileInfo, error) error
- ContentHasher() fsutil.ContentHasher
- }
- // FSSync initializes a transfer of files
- func FSSync(ctx context.Context, c session.Caller, opt FSSendRequestOpt) error {
- var pr *protocol
- for _, p := range supportedProtocols {
- if isProtoSupported(p.name) && c.Supports(session.MethodURL(_FileSync_serviceDesc.ServiceName, p.name)) {
- pr = &p
- break
- }
- }
- if pr == nil {
- return errors.New("no fssync handlers")
- }
- opts := make(map[string][]string)
- if opt.OverrideExcludes {
- opts[keyOverrideExcludes] = []string{"true"}
- }
- if opt.IncludePatterns != nil {
- opts[keyIncludePatterns] = opt.IncludePatterns
- }
- if opt.ExcludePatterns != nil {
- opts[keyExcludePatterns] = opt.ExcludePatterns
- }
- if opt.FollowPaths != nil {
- opts[keyFollowPaths] = opt.FollowPaths
- }
- opts[keyDirName] = []string{opt.Name}
- ctx, cancel := context.WithCancel(ctx)
- defer cancel()
- client := NewFileSyncClient(c.Conn())
- var stream grpc.ClientStream
- ctx = metadata.NewOutgoingContext(ctx, opts)
- switch pr.name {
- case "tarstream":
- cc, err := client.TarStream(ctx)
- if err != nil {
- return err
- }
- stream = cc
- case "diffcopy":
- cc, err := client.DiffCopy(ctx)
- if err != nil {
- return err
- }
- stream = cc
- default:
- panic(fmt.Sprintf("invalid protocol: %q", pr.name))
- }
- return pr.recvFn(stream, opt.DestDir, opt.CacheUpdater, opt.ProgressCb)
- }
- // NewFSSyncTargetDir allows writing into a directory
- func NewFSSyncTargetDir(outdir string) session.Attachable {
- p := &fsSyncTarget{
- outdir: outdir,
- }
- return p
- }
- // NewFSSyncTarget allows writing into an io.WriteCloser
- func NewFSSyncTarget(w io.WriteCloser) session.Attachable {
- p := &fsSyncTarget{
- outfile: w,
- }
- return p
- }
- type fsSyncTarget struct {
- outdir string
- outfile io.WriteCloser
- }
- func (sp *fsSyncTarget) Register(server *grpc.Server) {
- RegisterFileSendServer(server, sp)
- }
- func (sp *fsSyncTarget) DiffCopy(stream FileSend_DiffCopyServer) error {
- if sp.outdir != "" {
- return syncTargetDiffCopy(stream, sp.outdir)
- }
- if sp.outfile == nil {
- return errors.New("empty outfile and outdir")
- }
- defer sp.outfile.Close()
- return writeTargetFile(stream, sp.outfile)
- }
- func CopyToCaller(ctx context.Context, srcPath string, c session.Caller, progress func(int, bool)) error {
- method := session.MethodURL(_FileSend_serviceDesc.ServiceName, "diffcopy")
- if !c.Supports(method) {
- return errors.Errorf("method %s not supported by the client", method)
- }
- client := NewFileSendClient(c.Conn())
- cc, err := client.DiffCopy(ctx)
- if err != nil {
- return err
- }
- return sendDiffCopy(cc, srcPath, nil, nil, nil, progress, nil)
- }
- func CopyFileWriter(ctx context.Context, c session.Caller) (io.WriteCloser, error) {
- method := session.MethodURL(_FileSend_serviceDesc.ServiceName, "diffcopy")
- if !c.Supports(method) {
- return nil, errors.Errorf("method %s not supported by the client", method)
- }
- client := NewFileSendClient(c.Conn())
- cc, err := client.DiffCopy(ctx)
- if err != nil {
- return nil, err
- }
- return newStreamWriter(cc), nil
- }
|