Skip to content

Commit 83260da

Browse files
committed
Make resolving timeout configurable
Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
1 parent 38baee4 commit 83260da

File tree

4 files changed

+91
-29
lines changed

4 files changed

+91
-29
lines changed

fs/config/config.go

+2
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ type Config struct {
4242
ResolveResultEntry int `toml:"resolve_result_entry"` // deprecated
4343
PrefetchSize int64 `toml:"prefetch_size"`
4444
PrefetchTimeoutSec int64 `toml:"prefetch_timeout_sec"`
45+
ResolveTimeoutSec int64 `toml:"resolve_timeout_sec"`
46+
ResolveRequestTimeoutSec int64 `toml:"resolve_request_timeout_sec"`
4547
NoPrefetch bool `toml:"noprefetch"`
4648
NoBackgroundFetch bool `toml:"no_background_fetch"`
4749
Debug bool `toml:"debug"`

fs/fs.go

+13-5
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,10 @@ import (
6868
)
6969

7070
const (
71-
defaultFuseTimeout = time.Second
72-
defaultMaxConcurrency = 2
73-
fusermountBin = "fusermount"
71+
defaultFuseTimeout = time.Second
72+
defaultResolveTimeoutSec = 60
73+
defaultMaxConcurrency = 2
74+
fusermountBin = "fusermount"
7475
)
7576

7677
type Option func(*options)
@@ -122,6 +123,11 @@ func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.F
122123
entryTimeout = defaultFuseTimeout
123124
}
124125

126+
resolveTimeout := time.Duration(cfg.ResolveTimeoutSec) * time.Second
127+
if resolveTimeout == 0 {
128+
resolveTimeout = time.Duration(defaultResolveTimeoutSec) * time.Second
129+
}
130+
125131
metadataStore := fsOpts.metadataStore
126132
if metadataStore == nil {
127133
metadataStore = memorymetadata.NewReader
@@ -163,6 +169,7 @@ func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.F
163169
metricsController: c,
164170
attrTimeout: attrTimeout,
165171
entryTimeout: entryTimeout,
172+
resolveTimeout: resolveTimeout,
166173
}, nil
167174
}
168175

@@ -181,6 +188,7 @@ type filesystem struct {
181188
metricsController *layermetrics.Controller
182189
attrTimeout time.Duration
183190
entryTimeout time.Duration
191+
resolveTimeout time.Duration
184192
}
185193

186194
func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[string]string) (retErr error) {
@@ -255,8 +263,8 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
255263
case err := <-errChan:
256264
log.G(ctx).WithError(err).Debug("failed to resolve layer")
257265
return errors.Wrapf(err, "failed to resolve layer")
258-
case <-time.After(30 * time.Second):
259-
log.G(ctx).Debug("failed to resolve layer (timeout)")
266+
case <-time.After(fs.resolveTimeout):
267+
log.G(ctx).WithField("timeout(sec)", fs.resolveTimeout.Seconds()).Debug("failed to resolve layer (timeout)")
260268
return fmt.Errorf("failed to resolve layer (timeout)")
261269
}
262270
defer func() {

fs/layer/layer.go

+66-21
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ const (
5959
defaultMaxLRUCacheEntry = 10
6060
defaultMaxCacheFds = 10
6161
defaultPrefetchTimeoutSec = 10
62+
defaultResolveTimeoutSec = 60
63+
defaultResolveRequestTimeoutSec = 30
6264
memoryCacheType = "memory"
6365
)
6466

@@ -117,6 +119,8 @@ type Resolver struct {
117119
rootDir string
118120
resolver *remote.Resolver
119121
prefetchTimeout time.Duration
122+
resolveTimeout time.Duration
123+
resolveRequestTimeout time.Duration
120124
layerCache *cacheutil.TTLCache
121125
layerCacheMu sync.Mutex
122126
blobCache *cacheutil.TTLCache
@@ -137,6 +141,14 @@ func NewResolver(root string, backgroundTaskManager *task.BackgroundTaskManager,
137141
if prefetchTimeout == 0 {
138142
prefetchTimeout = defaultPrefetchTimeoutSec * time.Second
139143
}
144+
resolveTimeout := time.Duration(cfg.ResolveTimeoutSec) * time.Second
145+
if resolveTimeout == 0 {
146+
resolveTimeout = time.Duration(defaultResolveTimeoutSec) * time.Second
147+
}
148+
resolveRequestTimeout := time.Duration(cfg.ResolveRequestTimeoutSec) * time.Second
149+
if resolveRequestTimeout == 0 {
150+
resolveRequestTimeout = time.Duration(defaultResolveRequestTimeoutSec) * time.Second
151+
}
140152

141153
// layerCache caches resolved layers for future use. This is useful in a use-case where
142154
// the filesystem resolves and caches all layers in an image (not only queried one) in parallel,
@@ -171,6 +183,8 @@ func NewResolver(root string, backgroundTaskManager *task.BackgroundTaskManager,
171183
layerCache: layerCache,
172184
blobCache: blobCache,
173185
prefetchTimeout: prefetchTimeout,
186+
resolveTimeout: resolveTimeout,
187+
resolveRequestTimeout: resolveRequestTimeout,
174188
backgroundTaskManager: backgroundTaskManager,
175189
config: cfg,
176190
resolveLock: new(namedmutex.NamedMutex),
@@ -236,6 +250,8 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs
236250
defer r.resolveLock.Unlock(name)
237251

238252
ctx = log.WithLogger(ctx, log.G(ctx).WithField("src", name))
253+
ctx, cancel := context.WithTimeout(ctx, r.resolveTimeout)
254+
defer cancel()
239255

240256
// First, try to retrieve this layer from the underlying cache.
241257
r.layerCacheMu.Lock()
@@ -256,7 +272,7 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs
256272
log.G(ctx).Debugf("resolving")
257273

258274
// Resolve the blob.
259-
blobR, err := r.resolveBlob(ctx, hosts, refspec, desc)
275+
blobR, err := r.resolveBlob(ctx, hosts, refspec, desc, name)
260276
if err != nil {
261277
return nil, errors.Wrapf(err, "failed to resolve the blob")
262278
}
@@ -280,11 +296,52 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs
280296
// Each file's read operation is a prioritized task and all background tasks
281297
// will be stopped during the execution so this can avoid being disturbed for
282298
// NW traffic by background tasks.
299+
var (
300+
// Use context during resolving, to make it cancellable
301+
curR = readerAtFunc(func(p []byte, offset int64) (n int, err error) {
302+
ctx, cancel := context.WithTimeout(ctx, r.resolveRequestTimeout)
303+
defer cancel()
304+
return blobR.ReadAt(p, offset, remote.WithContext(ctx))
305+
})
306+
curRMu sync.Mutex
307+
)
283308
sr := io.NewSectionReader(readerAtFunc(func(p []byte, offset int64) (n int, err error) {
284309
r.backgroundTaskManager.DoPrioritizedTask()
285310
defer r.backgroundTaskManager.DonePrioritizedTask()
286-
return blobR.ReadAt(p, offset)
311+
curRMu.Lock()
312+
br := curR
313+
curRMu.Unlock()
314+
return br.ReadAt(p, offset)
287315
}), 0, blobR.Size())
316+
vr, err := r.newReader(sr, desc, fsCache, esgzOpts...)
317+
if err != nil {
318+
cErr := ctx.Err()
319+
if errors.Is(cErr, context.DeadlineExceeded) {
320+
r.blobCacheMu.Lock()
321+
r.blobCache.Remove(name)
322+
r.blobCacheMu.Unlock()
323+
}
324+
return nil, errors.Wrap(err, "failed to read layer")
325+
}
326+
// do not propagate context after resolve is done
327+
curRMu.Lock()
328+
curR = readerAtFunc(func(p []byte, offset int64) (n int, err error) { return blobR.ReadAt(p, offset) })
329+
curRMu.Unlock()
330+
331+
// Combine layer information together and cache it.
332+
l := newLayer(r, desc, blobR, vr)
333+
r.layerCacheMu.Lock()
334+
cachedL, done2, added := r.layerCache.Add(name, l)
335+
r.layerCacheMu.Unlock()
336+
if !added {
337+
l.close() // layer already exists in the cache. discrad this.
338+
}
339+
340+
log.G(ctx).Debugf("resolved")
341+
return &layerRef{cachedL.(*layer), done2}, nil
342+
}
343+
344+
func (r *Resolver) newReader(sr *io.SectionReader, desc ocispec.Descriptor, fsCache cache.BlobCache, esgzOpts ...metadata.Option) (*reader.VerifiableReader, error) {
288345
// define telemetry hooks to measure latency metrics inside estargz package
289346
telemetry := metadata.Telemetry{
290347
GetFooterLatency: func(start time.Time) {
@@ -306,36 +363,24 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs
306363
if err != nil {
307364
return nil, errors.Wrap(err, "failed to read layer")
308365
}
309-
310-
// Combine layer information together and cache it.
311-
l := newLayer(r, desc, blobR, vr)
312-
r.layerCacheMu.Lock()
313-
cachedL, done2, added := r.layerCache.Add(name, l)
314-
r.layerCacheMu.Unlock()
315-
if !added {
316-
l.close() // layer already exists in the cache. discrad this.
317-
}
318-
319-
log.G(ctx).Debugf("resolved")
320-
return &layerRef{cachedL.(*layer), done2}, nil
366+
return vr, nil
321367
}
322368

323369
// resolveBlob resolves a blob based on the passed layer blob information.
324-
func (r *Resolver) resolveBlob(ctx context.Context, hosts source.RegistryHosts, refspec reference.Spec, desc ocispec.Descriptor) (_ *blobRef, retErr error) {
325-
name := refspec.String() + "/" + desc.Digest.String()
326-
370+
func (r *Resolver) resolveBlob(ctx context.Context, hosts source.RegistryHosts, refspec reference.Spec, desc ocispec.Descriptor, cacheKey string) (_ *blobRef, retErr error) {
327371
// Try to retrieve the blob from the underlying cache.
328372
r.blobCacheMu.Lock()
329-
c, done, ok := r.blobCache.Get(name)
373+
c, done, ok := r.blobCache.Get(cacheKey)
330374
r.blobCacheMu.Unlock()
331375
if ok {
332-
if blob := c.(remote.Blob); blob.Check() == nil {
376+
blob := c.(remote.Blob)
377+
if err := blob.Check(); err == nil {
333378
return &blobRef{blob, done}, nil
334379
}
335380
// invalid blob. discard this.
336381
done()
337382
r.blobCacheMu.Lock()
338-
r.blobCache.Remove(name)
383+
r.blobCache.Remove(cacheKey)
339384
r.blobCacheMu.Unlock()
340385
}
341386

@@ -355,7 +400,7 @@ func (r *Resolver) resolveBlob(ctx context.Context, hosts source.RegistryHosts,
355400
return nil, errors.Wrap(err, "failed to resolve the source")
356401
}
357402
r.blobCacheMu.Lock()
358-
cachedB, done, added := r.blobCache.Add(name, b)
403+
cachedB, done, added := r.blobCache.Add(cacheKey, b)
359404
r.blobCacheMu.Unlock()
360405
if !added {
361406
b.Close() // blob already exists in the cache. discard this.

store/manager.go

+10-3
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ const (
4646
prepareSucceeded = "true"
4747
prepareFailed = "false"
4848

49-
defaultMaxConcurrency = 2
49+
defaultMaxConcurrency = 2
50+
defaultResolveTimeoutSec = 60
5051
)
5152

5253
func NewLayerManager(ctx context.Context, root string, hosts source.RegistryHosts, metadataStore metadata.Store, cfg config.Config) (*LayerManager, error) {
@@ -71,6 +72,10 @@ func NewLayerManager(ctx context.Context, root string, hosts source.RegistryHost
7172
if ns != nil {
7273
metrics.Register(ns)
7374
}
75+
resolveTimeout := time.Duration(cfg.ResolveTimeoutSec) * time.Second
76+
if resolveTimeout == 0 {
77+
resolveTimeout = time.Duration(defaultResolveTimeoutSec) * time.Second
78+
}
7479
return &LayerManager{
7580
refPool: refPool,
7681
hosts: hosts,
@@ -85,6 +90,7 @@ func NewLayerManager(ctx context.Context, root string, hosts source.RegistryHost
8590
resolveLock: new(namedmutex.NamedMutex),
8691
layer: make(map[string]map[string]layer.Layer),
8792
refcounter: make(map[string]map[string]int),
93+
resolveTimeout: resolveTimeout,
8894
}, nil
8995
}
9096

@@ -102,6 +108,7 @@ type LayerManager struct {
102108
disableVerification bool
103109
metricsController *layermetrics.Controller
104110
resolveLock *namedmutex.NamedMutex
111+
resolveTimeout time.Duration
105112

106113
layer map[string]map[string]layer.Layer
107114
refcounter map[string]map[string]int
@@ -220,8 +227,8 @@ func (r *LayerManager) getLayer(ctx context.Context, refspec reference.Spec, dgs
220227
case err := <-errChan:
221228
log.G(ctx).WithError(err).Debug("failed to resolve layer")
222229
return nil, errors.Wrapf(err, "failed to resolve layer")
223-
case <-time.After(30 * time.Second):
224-
log.G(ctx).Debug("failed to resolve layer (timeout)")
230+
case <-time.After(r.resolveTimeout):
231+
log.G(ctx).WithField("timeout(sec)", r.resolveTimeout.Seconds()).Debug("failed to resolve layer (timeout)")
225232
return nil, fmt.Errorf("failed to resolve layer (timeout)")
226233
}
227234

0 commit comments

Comments
 (0)