|
@@ -8,6 +8,7 @@ import (
|
|
"io/ioutil"
|
|
"io/ioutil"
|
|
"runtime"
|
|
"runtime"
|
|
"sync"
|
|
"sync"
|
|
|
|
+ "sync/atomic"
|
|
"time"
|
|
"time"
|
|
|
|
|
|
"github.com/containerd/containerd/content"
|
|
"github.com/containerd/containerd/content"
|
|
@@ -57,13 +58,15 @@ type SourceOpt struct {
|
|
|
|
|
|
type imageSource struct {
|
|
type imageSource struct {
|
|
SourceOpt
|
|
SourceOpt
|
|
- g flightcontrol.Group
|
|
|
|
|
|
+ g flightcontrol.Group
|
|
|
|
+ resolverCache *resolverCache
|
|
}
|
|
}
|
|
|
|
|
|
// NewSource creates a new image source
|
|
// NewSource creates a new image source
|
|
func NewSource(opt SourceOpt) (source.Source, error) {
|
|
func NewSource(opt SourceOpt) (source.Source, error) {
|
|
is := &imageSource{
|
|
is := &imageSource{
|
|
- SourceOpt: opt,
|
|
|
|
|
|
+ SourceOpt: opt,
|
|
|
|
+ resolverCache: newResolverCache(),
|
|
}
|
|
}
|
|
|
|
|
|
return is, nil
|
|
return is, nil
|
|
@@ -74,6 +77,9 @@ func (is *imageSource) ID() string {
|
|
}
|
|
}
|
|
|
|
|
|
func (is *imageSource) getResolver(ctx context.Context, rfn resolver.ResolveOptionsFunc, ref string) remotes.Resolver {
|
|
func (is *imageSource) getResolver(ctx context.Context, rfn resolver.ResolveOptionsFunc, ref string) remotes.Resolver {
|
|
|
|
+ if res := is.resolverCache.Get(ctx, ref); res != nil {
|
|
|
|
+ return res
|
|
|
|
+ }
|
|
opt := docker.ResolverOptions{
|
|
opt := docker.ResolverOptions{
|
|
Client: tracing.DefaultClient,
|
|
Client: tracing.DefaultClient,
|
|
}
|
|
}
|
|
@@ -82,6 +88,7 @@ func (is *imageSource) getResolver(ctx context.Context, rfn resolver.ResolveOpti
|
|
}
|
|
}
|
|
opt.Credentials = is.getCredentialsFromSession(ctx)
|
|
opt.Credentials = is.getCredentialsFromSession(ctx)
|
|
r := docker.NewResolver(opt)
|
|
r := docker.NewResolver(opt)
|
|
|
|
+ r = is.resolverCache.Add(ctx, ref, r)
|
|
return r
|
|
return r
|
|
}
|
|
}
|
|
|
|
|
|
@@ -380,6 +387,11 @@ func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) {
|
|
return nil, err
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // workaround for GCR bug that requires a request to manifest endpoint for authentication to work.
|
|
|
|
+ // if current resolver has not used manifests do a dummy request.
|
|
|
|
+ // in most cases resolver should be cached and extra request is not needed.
|
|
|
|
+ ensureManifestRequested(ctx, p.resolver, p.ref)
|
|
|
|
+
|
|
var (
|
|
var (
|
|
schema1Converter *schema1.Converter
|
|
schema1Converter *schema1.Converter
|
|
handlers []images.Handler
|
|
handlers []images.Handler
|
|
@@ -791,3 +803,90 @@ func resolveModeToString(rm source.ResolveMode) string {
|
|
}
|
|
}
|
|
return ""
|
|
return ""
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+type resolverCache struct {
|
|
|
|
+ mu sync.Mutex
|
|
|
|
+ m map[string]cachedResolver
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+type cachedResolver struct {
|
|
|
|
+ timeout time.Time
|
|
|
|
+ remotes.Resolver
|
|
|
|
+ counter int64
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (cr *cachedResolver) Resolve(ctx context.Context, ref string) (name string, desc ocispec.Descriptor, err error) {
|
|
|
|
+ atomic.AddInt64(&cr.counter, 1)
|
|
|
|
+ return cr.Resolver.Resolve(ctx, ref)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (r *resolverCache) Add(ctx context.Context, ref string, resolver remotes.Resolver) remotes.Resolver {
|
|
|
|
+ r.mu.Lock()
|
|
|
|
+ defer r.mu.Unlock()
|
|
|
|
+
|
|
|
|
+ ref = r.domain(ref) + "-" + session.FromContext(ctx)
|
|
|
|
+
|
|
|
|
+ cr, ok := r.m[ref]
|
|
|
|
+ cr.timeout = time.Now().Add(time.Minute)
|
|
|
|
+ if ok {
|
|
|
|
+ return &cr
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ cr.Resolver = resolver
|
|
|
|
+ r.m[ref] = cr
|
|
|
|
+ return &cr
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (r *resolverCache) domain(refStr string) string {
|
|
|
|
+ ref, err := distreference.ParseNormalizedNamed(refStr)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return refStr
|
|
|
|
+ }
|
|
|
|
+ return distreference.Domain(ref)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (r *resolverCache) Get(ctx context.Context, ref string) remotes.Resolver {
|
|
|
|
+ r.mu.Lock()
|
|
|
|
+ defer r.mu.Unlock()
|
|
|
|
+
|
|
|
|
+ ref = r.domain(ref) + "-" + session.FromContext(ctx)
|
|
|
|
+
|
|
|
|
+ cr, ok := r.m[ref]
|
|
|
|
+ if !ok {
|
|
|
|
+ return nil
|
|
|
|
+ }
|
|
|
|
+ return &cr
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (r *resolverCache) clean(now time.Time) {
|
|
|
|
+ r.mu.Lock()
|
|
|
|
+ for k, cr := range r.m {
|
|
|
|
+ if now.After(cr.timeout) {
|
|
|
|
+ delete(r.m, k)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ r.mu.Unlock()
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func newResolverCache() *resolverCache {
|
|
|
|
+ rc := &resolverCache{
|
|
|
|
+ m: map[string]cachedResolver{},
|
|
|
|
+ }
|
|
|
|
+ t := time.NewTicker(time.Minute)
|
|
|
|
+ go func() {
|
|
|
|
+ for {
|
|
|
|
+ rc.clean(<-t.C)
|
|
|
|
+ }
|
|
|
|
+ }()
|
|
|
|
+ return rc
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func ensureManifestRequested(ctx context.Context, res remotes.Resolver, ref string) {
|
|
|
|
+ cr, ok := res.(*cachedResolver)
|
|
|
|
+ if !ok {
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ if atomic.LoadInt64(&cr.counter) == 0 {
|
|
|
|
+ res.Resolve(ctx, ref)
|
|
|
|
+ }
|
|
|
|
+}
|