@@ -59,6 +59,8 @@ const (
59
59
defaultMaxLRUCacheEntry = 10
60
60
defaultMaxCacheFds = 10
61
61
defaultPrefetchTimeoutSec = 10
62
+ defaultResolveTimeoutSec = 60
63
+ defaultResolveRequestTimeoutSec = 30
62
64
memoryCacheType = "memory"
63
65
)
64
66
@@ -117,6 +119,8 @@ type Resolver struct {
117
119
rootDir string
118
120
resolver * remote.Resolver
119
121
prefetchTimeout time.Duration
122
+ resolveTimeout time.Duration
123
+ resolveRequestTimeout time.Duration
120
124
layerCache * cacheutil.TTLCache
121
125
layerCacheMu sync.Mutex
122
126
blobCache * cacheutil.TTLCache
@@ -137,6 +141,14 @@ func NewResolver(root string, backgroundTaskManager *task.BackgroundTaskManager,
137
141
if prefetchTimeout == 0 {
138
142
prefetchTimeout = defaultPrefetchTimeoutSec * time .Second
139
143
}
144
+ resolveTimeout := time .Duration (cfg .ResolveTimeoutSec ) * time .Second
145
+ if resolveTimeout == 0 {
146
+ resolveTimeout = time .Duration (defaultResolveTimeoutSec ) * time .Second
147
+ }
148
+ resolveRequestTimeout := time .Duration (cfg .ResolveRequestTimeoutSec ) * time .Second
149
+ if resolveRequestTimeout == 0 {
150
+ resolveRequestTimeout = time .Duration (defaultResolveRequestTimeoutSec ) * time .Second
151
+ }
140
152
141
153
// layerCache caches resolved layers for future use. This is useful in a use-case where
142
154
// the filesystem resolves and caches all layers in an image (not only queried one) in parallel,
@@ -171,6 +183,8 @@ func NewResolver(root string, backgroundTaskManager *task.BackgroundTaskManager,
171
183
layerCache : layerCache ,
172
184
blobCache : blobCache ,
173
185
prefetchTimeout : prefetchTimeout ,
186
+ resolveTimeout : resolveTimeout ,
187
+ resolveRequestTimeout : resolveRequestTimeout ,
174
188
backgroundTaskManager : backgroundTaskManager ,
175
189
config : cfg ,
176
190
resolveLock : new (namedmutex.NamedMutex ),
@@ -236,6 +250,8 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs
236
250
defer r .resolveLock .Unlock (name )
237
251
238
252
ctx = log .WithLogger (ctx , log .G (ctx ).WithField ("src" , name ))
253
+ ctx , cancel := context .WithTimeout (ctx , r .resolveTimeout )
254
+ defer cancel ()
239
255
240
256
// First, try to retrieve this layer from the underlying cache.
241
257
r .layerCacheMu .Lock ()
@@ -256,7 +272,7 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs
256
272
log .G (ctx ).Debugf ("resolving" )
257
273
258
274
// Resolve the blob.
259
- blobR , err := r .resolveBlob (ctx , hosts , refspec , desc )
275
+ blobR , err := r .resolveBlob (ctx , hosts , refspec , desc , name )
260
276
if err != nil {
261
277
return nil , errors .Wrapf (err , "failed to resolve the blob" )
262
278
}
@@ -280,11 +296,51 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs
280
296
// Each file's read operation is a prioritized task and all background tasks
281
297
// will be stopped during the execution so this can avoid being disturbed for
282
298
// NW traffic by background tasks.
299
+ var (
300
+ // Use context during resolving, to make it cancellable
301
+ curR = readerAtFunc (func (p []byte , offset int64 ) (n int , err error ) {
302
+ ctx , cancel := context .WithTimeout (ctx , r .resolveRequestTimeout )
303
+ defer cancel ()
304
+ return blobR .ReadAt (p , offset , remote .WithContext (ctx ))
305
+ })
306
+ curRMu sync.Mutex
307
+ )
283
308
sr := io .NewSectionReader (readerAtFunc (func (p []byte , offset int64 ) (n int , err error ) {
284
309
r .backgroundTaskManager .DoPrioritizedTask ()
285
310
defer r .backgroundTaskManager .DonePrioritizedTask ()
286
- return blobR .ReadAt (p , offset )
311
+ curRMu .Lock ()
312
+ br := curR
313
+ curRMu .Unlock ()
314
+ return br .ReadAt (p , offset )
287
315
}), 0 , blobR .Size ())
316
+ vr , err := r .newReader (sr , desc , fsCache , esgzOpts ... )
317
+ if err != nil {
318
+ if errors .Is (err , context .DeadlineExceeded ) {
319
+ r .blobCacheMu .Lock ()
320
+ r .blobCache .Remove (name )
321
+ r .blobCacheMu .Unlock ()
322
+ }
323
+ return nil , errors .Wrap (err , "failed to read layer" )
324
+ }
325
+ // do not propagate context after resolve is done
326
+ curRMu .Lock ()
327
+ curR = readerAtFunc (func (p []byte , offset int64 ) (n int , err error ) { return blobR .ReadAt (p , offset ) })
328
+ curRMu .Unlock ()
329
+
330
+ // Combine layer information together and cache it.
331
+ l := newLayer (r , desc , blobR , vr )
332
+ r .layerCacheMu .Lock ()
333
+ cachedL , done2 , added := r .layerCache .Add (name , l )
334
+ r .layerCacheMu .Unlock ()
335
+ if ! added {
336
+ l .close () // layer already exists in the cache. discard this.
337
+ }
338
+
339
+ log .G (ctx ).Debugf ("resolved" )
340
+ return & layerRef {cachedL .(* layer ), done2 }, nil
341
+ }
342
+
343
+ func (r * Resolver ) newReader (sr * io.SectionReader , desc ocispec.Descriptor , fsCache cache.BlobCache , esgzOpts ... metadata.Option ) (* reader.VerifiableReader , error ) {
288
344
// define telemetry hooks to measure latency metrics inside estargz package
289
345
telemetry := metadata.Telemetry {
290
346
GetFooterLatency : func (start time.Time ) {
@@ -306,36 +362,24 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs
306
362
if err != nil {
307
363
return nil , errors .Wrap (err , "failed to read layer" )
308
364
}
309
-
310
- // Combine layer information together and cache it.
311
- l := newLayer (r , desc , blobR , vr )
312
- r .layerCacheMu .Lock ()
313
- cachedL , done2 , added := r .layerCache .Add (name , l )
314
- r .layerCacheMu .Unlock ()
315
- if ! added {
316
- l .close () // layer already exists in the cache. discrad this.
317
- }
318
-
319
- log .G (ctx ).Debugf ("resolved" )
320
- return & layerRef {cachedL .(* layer ), done2 }, nil
365
+ return vr , nil
321
366
}
322
367
323
368
// resolveBlob resolves a blob based on the passed layer blob information.
324
- func (r * Resolver ) resolveBlob (ctx context.Context , hosts source.RegistryHosts , refspec reference.Spec , desc ocispec.Descriptor ) (_ * blobRef , retErr error ) {
325
- name := refspec .String () + "/" + desc .Digest .String ()
326
-
369
+ func (r * Resolver ) resolveBlob (ctx context.Context , hosts source.RegistryHosts , refspec reference.Spec , desc ocispec.Descriptor , cacheKey string ) (_ * blobRef , retErr error ) {
327
370
// Try to retrieve the blob from the underlying cache.
328
371
r .blobCacheMu .Lock ()
329
- c , done , ok := r .blobCache .Get (name )
372
+ c , done , ok := r .blobCache .Get (cacheKey )
330
373
r .blobCacheMu .Unlock ()
331
374
if ok {
332
- if blob := c .(remote.Blob ); blob .Check () == nil {
375
+ blob := c .(remote.Blob )
376
+ if err := blob .Check (); err == nil {
333
377
return & blobRef {blob , done }, nil
334
378
}
335
379
// invalid blob. discard this.
336
380
done ()
337
381
r .blobCacheMu .Lock ()
338
- r .blobCache .Remove (name )
382
+ r .blobCache .Remove (cacheKey )
339
383
r .blobCacheMu .Unlock ()
340
384
}
341
385
@@ -355,7 +399,7 @@ func (r *Resolver) resolveBlob(ctx context.Context, hosts source.RegistryHosts,
355
399
return nil , errors .Wrap (err , "failed to resolve the source" )
356
400
}
357
401
r .blobCacheMu .Lock ()
358
- cachedB , done , added := r .blobCache .Add (name , b )
402
+ cachedB , done , added := r .blobCache .Add (cacheKey , b )
359
403
r .blobCacheMu .Unlock ()
360
404
if ! added {
361
405
b .Close () // blob already exists in the cache. discard this.
0 commit comments