diff --git a/.github/workflows/oras-release.yml b/.github/workflows/oras-release.yml index 33de2ff48b4..94e2411c10a 100644 --- a/.github/workflows/oras-release.yml +++ b/.github/workflows/oras-release.yml @@ -4,7 +4,7 @@ on: push: tags: - "v[0-9]+.[0-9]+.[0-9]+-alpha" - - "v[0-9]+.[0-9]+.[0-9]+-rc" + - "v[0-9]+.[0-9]+.[0-9]+-rc.[0-9]+" jobs: publish: diff --git a/docs/extensions.md b/docs/extensions.md new file mode 100644 index 00000000000..966017e4dc7 --- /dev/null +++ b/docs/extensions.md @@ -0,0 +1,117 @@ +--- +description: High level discussion of extensions +keywords: registry, extension, handlers, repository, distribution, artifacts +title: Extensions +--- + +This document serves as a high level discussion of the implementation of the extensions framework defined in the [OCI Distribution spec](https://github.com/opencontainers/distribution-spec/tree/main/extensions). + +## Extension Interface + +The `Extension` interface is introduced in the new `extension` package. It defines methods to access the extension's namespace-specific attributes such as the Name, Url defining the extension namespace, and the Description of the namespace. It defines route enumeration at the Registry and Repository level. It also encases the `ExtendedStorage` interface which defines the methods requires to extend the underlying storage functionality of the registry. + +``` +type Extension interface { + storage.ExtendedStorage + // GetRepositoryRoutes returns a list of extension routes scoped at a repository level + GetRepositoryRoutes() []ExtensionRoute + // GetRegistryRoutes returns a list of extension routes scoped at a registry level + GetRegistryRoutes() []ExtensionRoute + // GetNamespaceName returns the name associated with the namespace + GetNamespaceName() string + // GetNamespaceUrl returns the url link to the documentation where the namespace's extension and endpoints are defined + GetNamespaceUrl() string + // GetNamespaceDescription returns the description associated with the namespace + GetNamespaceDescription() string +} +``` + +The `ExtendedStorage` interface defines methods that specify storage-specific handlers. Each extension will implement a handler extending the functionality. The interface can be expanded in the future to consider new handler types. +`GetManifestHandlers` is used to return new `ManifestHandlers` defined by each of the extensions. +`GetGarbageCollectionHandlers` is used to return `GCExtensionHandler` implemented by each extension. + +``` +type ExtendedStorage interface { + // GetManifestHandlers returns the list of manifest handlers that handle custom manifest formats supported by the extension + GetManifestHandlers( + repo Repository, + blobStore BlobStore) []ManifestHandler + // GetGarbageCollectHandlers returns the GCExtensionHandlers that handles custom garbage collection behavior for the extension. + GetGarbageCollectionHandlers() []GCExtensionHandler +} +``` + +The `GCExtensionHandler` interface defines three methods that are used in the garbage colection mark and sweep process. The `Mark` method is invoked for each `GCExtensionHandler` after the existing mark process finishes in `MarkAndSweep`. It is used to determine if the manifest and blobs should have their temporary ref count incremented in the case of an artifact manifest, or if the manifest and it's referrers should be recursively indexed for deletion in the case of a non-artifact manifest. `OnManifestDelete` is invoked to extend the `RemoveManifest` functionality for the `Vacuum`. New or special-cased manifests may require custom manifest deletion which can be defined with this method. `SweepBlobs` is used to add artifact manifest/blobs to the original `markSet`. These blobs are retained after determining their ref count is still positive. + +``` +type GCExtensionHandler interface { + Mark(ctx context.Context, + repository distribution.Repository, + storageDriver driver.StorageDriver, + registry distribution.Namespace, + manifest distribution.Manifest, + manifestDigest digest.Digest, + dryRun bool, + removeUntagged bool) (bool, error) + OnManifestDelete(ctx context.Context, + storageDriver driver.StorageDriver, + registry distribution.Namespace, + dgst digest.Digest, + repositoryName string) error + SweepBlobs(ctx context.Context, + markSet map[digest.Digest]struct{}) map[digest.Digest]struct{} +} +``` + +## Registering Extensions + +Extensions are defined in the configuration yaml. + +### Sample Extension Configuration YAML +``` +# Configuration for extensions. It follows the below schema +# extensions +# namespace: +# configuration for the extension and its components in any schema specific to that namespace +extensions: + oci: + ext: + - discover # enable the discovery extension +``` + +Each `Extension` defined must call the `RegisterExtension` method to register an extension initialization function with the extension namespace name. The registered extension list is then used during configuration parsing to get and initialize the specified extension. (`GetExtension`) + +``` +// InitExtension is the initialize function for creating the extension namespace +type InitExtension func(ctx context.Context, storageDriver driver.StorageDriver, options configuration.ExtensionConfig) (Extension, error) + +// RegisterExtension is used to register an InitExtension for +// an extension with the given name. +func RegisterExtension(name string, initFunc InitExtension) + +// GetExtension constructs an extension with the given options using the given name. +func GetExtension(ctx context.Context, name string, storageDriver driver.StorageDriver, options configuration.ExtensionConfig) (Extension, error) +``` + +Each `Extension` defines an `ExtensionRoute` which contains the new `//` route attributes. Furthermore, the route `Descriptor` and `Dispatcher` are used to register the new route to the application. + +``` +type ExtensionRoute struct { + // Namespace is the name of the extension namespace + Namespace string + // Extension is the name of the extension under the namespace + Extension string + // Component is the name of the component under the extension + Component string + // Descriptor is the route descriptor that gives its path + Descriptor v2.RouteDescriptor + // Dispatcher if present signifies that the route is http route with a dispatcher + Dispatcher RouteDispatchFunc +} + +// RouteDispatchFunc is the http route dispatcher used by the extension route handlers +type RouteDispatchFunc func(extContext *ExtensionContext, r *http.Request) http.Handler +``` + + + diff --git a/docs/garbage-collection.md b/docs/garbage-collection.md index 928fab9aee8..4196b09f203 100644 --- a/docs/garbage-collection.md +++ b/docs/garbage-collection.md @@ -121,4 +121,4 @@ blob eligible for deletion: sha256:7e15ce58ccb2181a8fced7709e9893206f0937cc9543b blob eligible for deletion: sha256:87192bdbe00f8f2a62527f36bb4c7c7f4eaf9307e4b87e8334fb6abec1765bcb blob eligible for deletion: sha256:b549a9959a664038fc35c155a95742cf12297672ca0ae35735ec027d55bf4e97 blob eligible for deletion: sha256:f251d679a7c61455f06d793e43c06786d7766c88b8c24edf242b2c08e3c3f599 -``` +``` \ No newline at end of file diff --git a/registry/extension/distribution/manifests.go b/registry/extension/distribution/manifests.go index 51a52e8171f..d1974e1c83d 100644 --- a/registry/extension/distribution/manifests.go +++ b/registry/extension/distribution/manifests.go @@ -19,7 +19,7 @@ type manifestsGetAPIResponse struct { // manifestHandler handles requests for manifests under a manifest name. type manifestHandler struct { - *extension.Context + *extension.ExtensionContext storageDriver driver.StorageDriver } diff --git a/registry/extension/distribution/registry.go b/registry/extension/distribution/registry.go index d65b89ec2d2..0f41390c1df 100644 --- a/registry/extension/distribution/registry.go +++ b/registry/extension/distribution/registry.go @@ -34,7 +34,7 @@ type distributionOptions struct { } // newDistNamespace creates a new extension namespace with the name "distribution" -func newDistNamespace(ctx context.Context, storageDriver driver.StorageDriver, options configuration.ExtensionConfig) (extension.Namespace, error) { +func newDistNamespace(ctx context.Context, storageDriver driver.StorageDriver, options configuration.ExtensionConfig) (extension.Extension, error) { optionsYaml, err := yaml.Marshal(options) if err != nil { @@ -67,7 +67,7 @@ func newDistNamespace(ctx context.Context, storageDriver driver.StorageDriver, o func init() { // register the extension namespace. - extension.Register(namespaceName, newDistNamespace) + extension.RegisterExtension(namespaceName, newDistNamespace) } // GetManifestHandlers returns a list of manifest handlers that will be registered in the manifest store. @@ -76,12 +76,17 @@ func (o *distributionNamespace) GetManifestHandlers(repo distribution.Repository return []storage.ManifestHandler{} } +func (o *distributionNamespace) GetGarbageCollectionHandlers() []storage.GCExtensionHandler { + // This extension doesn't extend any garbage collection operations. + return []storage.GCExtensionHandler{} +} + // GetRepositoryRoutes returns a list of extension routes scoped at a repository level -func (d *distributionNamespace) GetRepositoryRoutes() []extension.Route { - var routes []extension.Route +func (d *distributionNamespace) GetRepositoryRoutes() []extension.ExtensionRoute { + var routes []extension.ExtensionRoute if d.manifestsEnabled { - routes = append(routes, extension.Route{ + routes = append(routes, extension.ExtensionRoute{ Namespace: namespaceName, Extension: extensionName, Component: manifestsComponentName, @@ -100,7 +105,7 @@ func (d *distributionNamespace) GetRepositoryRoutes() []extension.Route { } if d.tagHistoryEnabled { - routes = append(routes, extension.Route{ + routes = append(routes, extension.ExtensionRoute{ Namespace: namespaceName, Extension: extensionName, Component: tagHistoryComponentName, @@ -134,7 +139,7 @@ func (d *distributionNamespace) GetRepositoryRoutes() []extension.Route { // GetRegistryRoutes returns a list of extension routes scoped at a registry level // There are no registry scoped routes exposed by this namespace -func (d *distributionNamespace) GetRegistryRoutes() []extension.Route { +func (d *distributionNamespace) GetRegistryRoutes() []extension.ExtensionRoute { return nil } @@ -153,10 +158,10 @@ func (d *distributionNamespace) GetNamespaceDescription() string { return namespaceDescription } -func (d *distributionNamespace) tagHistoryDispatcher(ctx *extension.Context, r *http.Request) http.Handler { +func (d *distributionNamespace) tagHistoryDispatcher(ctx *extension.ExtensionContext, r *http.Request) http.Handler { tagHistoryHandler := &tagHistoryHandler{ - Context: ctx, - storageDriver: d.storageDriver, + ExtensionContext: ctx, + storageDriver: d.storageDriver, } return handlers.MethodHandler{ @@ -164,10 +169,10 @@ func (d *distributionNamespace) tagHistoryDispatcher(ctx *extension.Context, r * } } -func (d *distributionNamespace) manifestsDispatcher(ctx *extension.Context, r *http.Request) http.Handler { +func (d *distributionNamespace) manifestsDispatcher(ctx *extension.ExtensionContext, r *http.Request) http.Handler { manifestsHandler := &manifestHandler{ - Context: ctx, - storageDriver: d.storageDriver, + ExtensionContext: ctx, + storageDriver: d.storageDriver, } return handlers.MethodHandler{ diff --git a/registry/extension/distribution/taghistory.go b/registry/extension/distribution/taghistory.go index 9cb957b87d8..09c24bb6b9c 100644 --- a/registry/extension/distribution/taghistory.go +++ b/registry/extension/distribution/taghistory.go @@ -20,7 +20,7 @@ type tagHistoryAPIResponse struct { // manifestHandler handles requests for manifests under a manifest name. type tagHistoryHandler struct { - *extension.Context + *extension.ExtensionContext storageDriver driver.StorageDriver } diff --git a/registry/extension/extension.go b/registry/extension/extension.go index 4815599ba7f..a589b5bea26 100644 --- a/registry/extension/extension.go +++ b/registry/extension/extension.go @@ -1,7 +1,7 @@ package extension import ( - c "context" + "context" "fmt" "net/http" @@ -13,9 +13,9 @@ import ( "github.com/distribution/distribution/v3/registry/storage/driver" ) -// Context contains the request specific context for use in across handlers. -type Context struct { - c.Context +// ExtensionContext contains the request specific context for use in across handlers. +type ExtensionContext struct { + context.Context // Registry is the base namespace that is used by all extension namespaces Registry distribution.Namespace @@ -26,10 +26,10 @@ type Context struct { } // RouteDispatchFunc is the http route dispatcher used by the extension route handlers -type RouteDispatchFunc func(extContext *Context, r *http.Request) http.Handler +type RouteDispatchFunc func(extContext *ExtensionContext, r *http.Request) http.Handler -// Route describes an extension route. -type Route struct { +// ExtensionRoute describes an extension route. +type ExtensionRoute struct { // Namespace is the name of the extension namespace Namespace string // Extension is the name of the extension under the namespace @@ -42,13 +42,14 @@ type Route struct { Dispatcher RouteDispatchFunc } -// Namespace is the namespace that is used to define extensions to the distribution. -type Namespace interface { +// Extension is the interface that is used to define extensions to the distribution. +type Extension interface { storage.ExtendedStorage + // ExtensionService // GetRepositoryRoutes returns a list of extension routes scoped at a repository level - GetRepositoryRoutes() []Route + GetRepositoryRoutes() []ExtensionRoute // GetRegistryRoutes returns a list of extension routes scoped at a registry level - GetRegistryRoutes() []Route + GetRegistryRoutes() []ExtensionRoute // GetNamespaceName returns the name associated with the namespace GetNamespaceName() string // GetNamespaceUrl returns the url link to the documentation where the namespace's extension and endpoints are defined @@ -57,8 +58,8 @@ type Namespace interface { GetNamespaceDescription() string } -// InitExtensionNamespace is the initialize function for creating the extension namespace -type InitExtensionNamespace func(ctx c.Context, storageDriver driver.StorageDriver, options configuration.ExtensionConfig) (Namespace, error) +// InitExtension is the initialize function for creating the extension namespace +type InitExtension func(ctx context.Context, storageDriver driver.StorageDriver, options configuration.ExtensionConfig) (Extension, error) // EnumerateExtension specifies extension information at the namespace level type EnumerateExtension struct { @@ -68,10 +69,10 @@ type EnumerateExtension struct { Endpoints []string `json:"endpoints"` } -var extensions map[string]InitExtensionNamespace -var extensionsNamespaces map[string]Namespace +var extensions map[string]InitExtension +var extensionsNamespaces map[string]Extension -func EnumerateRegistered(ctx Context) (enumeratedExtensions []EnumerateExtension) { +func EnumerateRegistered(ctx ExtensionContext) (enumeratedExtensions []EnumerateExtension) { for _, namespace := range extensionsNamespaces { enumerateExtension := EnumerateExtension{ Name: fmt.Sprintf("_%s", namespace.GetNamespaceName()), @@ -102,11 +103,11 @@ func EnumerateRegistered(ctx Context) (enumeratedExtensions []EnumerateExtension return enumeratedExtensions } -// Register is used to register an InitExtensionNamespace for -// an extension namespace with the given name. -func Register(name string, initFunc InitExtensionNamespace) { +// RegisterExtension is used to register an InitExtension for +// an extension with the given name. +func RegisterExtension(name string, initFunc InitExtension) { if extensions == nil { - extensions = make(map[string]InitExtensionNamespace) + extensions = make(map[string]InitExtension) } if _, exists := extensions[name]; exists { @@ -116,11 +117,11 @@ func Register(name string, initFunc InitExtensionNamespace) { extensions[name] = initFunc } -// Get constructs an extension namespace with the given options using the given name. -func Get(ctx c.Context, name string, storageDriver driver.StorageDriver, options configuration.ExtensionConfig) (Namespace, error) { +// GetExtension constructs an extension with the given options using the given name. +func GetExtension(ctx context.Context, name string, storageDriver driver.StorageDriver, options configuration.ExtensionConfig) (Extension, error) { if extensions != nil { if extensionsNamespaces == nil { - extensionsNamespaces = make(map[string]Namespace) + extensionsNamespaces = make(map[string]Extension) } if initFunc, exists := extensions[name]; exists { diff --git a/registry/extension/oci/discover.go b/registry/extension/oci/discover.go index 8fca93edb39..ff33893aef0 100644 --- a/registry/extension/oci/discover.go +++ b/registry/extension/oci/discover.go @@ -16,7 +16,7 @@ type discoverGetAPIResponse struct { // extensionHandler handles requests for manifests under a manifest name. type extensionHandler struct { - *extension.Context + *extension.ExtensionContext storageDriver driver.StorageDriver } @@ -26,7 +26,7 @@ func (eh *extensionHandler) getExtensions(w http.ResponseWriter, r *http.Request w.Header().Set("Content-Type", "application/json") // get list of extension information seperated at the namespace level - enumeratedExtensions := extension.EnumerateRegistered(*eh.Context) + enumeratedExtensions := extension.EnumerateRegistered(*eh.ExtensionContext) // remove the oci extension so it's not returned by discover ociExtensionName := fmt.Sprintf("_%s", namespaceName) diff --git a/registry/extension/oci/oci.go b/registry/extension/oci/oci.go index 7c9ca183f0d..3f79e29e6c7 100644 --- a/registry/extension/oci/oci.go +++ b/registry/extension/oci/oci.go @@ -32,7 +32,7 @@ type ociOptions struct { } // newOciNamespace creates a new extension namespace with the name "oci" -func newOciNamespace(ctx context.Context, storageDriver driver.StorageDriver, options configuration.ExtensionConfig) (extension.Namespace, error) { +func newOciNamespace(ctx context.Context, storageDriver driver.StorageDriver, options configuration.ExtensionConfig) (extension.Extension, error) { optionsYaml, err := yaml.Marshal(options) if err != nil { return nil, err @@ -60,7 +60,7 @@ func newOciNamespace(ctx context.Context, storageDriver driver.StorageDriver, op func init() { // register the extension namespace. - extension.Register(namespaceName, newOciNamespace) + extension.RegisterExtension(namespaceName, newOciNamespace) } // GetManifestHandlers returns a list of manifest handlers that will be registered in the manifest store. @@ -69,12 +69,17 @@ func (o *ociNamespace) GetManifestHandlers(repo distribution.Repository, blobSto return []storage.ManifestHandler{} } +func (o *ociNamespace) GetGarbageCollectionHandlers() []storage.GCExtensionHandler { + // This extension doesn't extend any garbage collection operations. + return []storage.GCExtensionHandler{} +} + // GetRepositoryRoutes returns a list of extension routes scoped at a repository level -func (o *ociNamespace) GetRepositoryRoutes() []extension.Route { - var routes []extension.Route +func (o *ociNamespace) GetRepositoryRoutes() []extension.ExtensionRoute { + var routes []extension.ExtensionRoute if o.discoverEnabled { - routes = append(routes, extension.Route{ + routes = append(routes, extension.ExtensionRoute{ Namespace: namespaceName, Extension: extensionName, Component: discoverComponentName, @@ -96,11 +101,11 @@ func (o *ociNamespace) GetRepositoryRoutes() []extension.Route { } // GetRegistryRoutes returns a list of extension routes scoped at a registry level -func (o *ociNamespace) GetRegistryRoutes() []extension.Route { - var routes []extension.Route +func (o *ociNamespace) GetRegistryRoutes() []extension.ExtensionRoute { + var routes []extension.ExtensionRoute if o.discoverEnabled { - routes = append(routes, extension.Route{ + routes = append(routes, extension.ExtensionRoute{ Namespace: namespaceName, Extension: extensionName, Component: discoverComponentName, @@ -136,10 +141,10 @@ func (o *ociNamespace) GetNamespaceDescription() string { return namespaceDescription } -func (o *ociNamespace) discoverDispatcher(ctx *extension.Context, r *http.Request) http.Handler { +func (o *ociNamespace) discoverDispatcher(ctx *extension.ExtensionContext, r *http.Request) http.Handler { extensionHandler := &extensionHandler{ - Context: ctx, - storageDriver: o.storageDriver, + ExtensionContext: ctx, + storageDriver: o.storageDriver, } return handlers.MethodHandler{ diff --git a/registry/extension/oras/artifactgarbagecollectionhandler.go b/registry/extension/oras/artifactgarbagecollectionhandler.go new file mode 100644 index 00000000000..dfeff97ba17 --- /dev/null +++ b/registry/extension/oras/artifactgarbagecollectionhandler.go @@ -0,0 +1,193 @@ +package oras + +import ( + "context" + "fmt" + "path" + + "github.com/distribution/distribution/v3" + dcontext "github.com/distribution/distribution/v3/context" + "github.com/distribution/distribution/v3/reference" + "github.com/distribution/distribution/v3/registry/storage/driver" + "github.com/opencontainers/go-digest" + artifactv1 "github.com/oras-project/artifacts-spec/specs-go/v1" +) + +type orasGCHandler struct { + artifactManifestIndex map[digest.Digest][]digest.Digest + artifactMarkSet map[digest.Digest]int +} + +func (gc *orasGCHandler) Mark(ctx context.Context, + repository distribution.Repository, + storageDriver driver.StorageDriver, + registry distribution.Namespace, + manifest distribution.Manifest, + dgst digest.Digest, + dryRun bool, + removeUntagged bool) (bool, error) { + blobStatter := registry.BlobStatter() + mediaType, _, err := manifest.Payload() + if err != nil { + return false, err + } + referrerRootPath := referrersLinkPath(repository.Named().Name()) + rootPath := path.Join(referrerRootPath, dgst.Algorithm().String(), dgst.Hex()) + + if mediaType == artifactv1.MediaTypeArtifactManifest { + // if the manifest passed is an artifact -> mark the manifest and blobs for now + fmt.Printf("%s: incrementing artifact manifest ref count %s\n", repository.Named().Name(), dgst.String()) + gc.artifactMarkSet[dgst] += 1 + + // mark the artifact blobs + descriptors := manifest.References() + for _, descriptor := range descriptors { + gc.artifactMarkSet[descriptor.Digest] += 1 + fmt.Printf("%s: incrementing artifact blob ref count %s\n", repository.Named().Name(), descriptor.Digest) + } + return false, nil + } else { + // TODO: Add support for untagged root artifact manifest (no subject specified) + // Subsequently, extend GC support for sweeping all referrers to untagged root artifact manifest + + // if the manifest passed isn't an an artifact -> call the sweep ingestor + // find all artifacts linked to manifest and add to artifactManifestIndex for subsequent deletion + gc.artifactManifestIndex[dgst] = make([]digest.Digest, 0) + err := enumerateReferrerLinks(ctx, + rootPath, + storageDriver, + repository, + blobStatter, + dgst, + gc.artifactManifestIndex, + artifactSweepIngestor) + + if err != nil { + switch err.(type) { + case driver.PathNotFoundError: + return true, nil + } + return true, err + } + return true, nil + } +} + +func (gc *orasGCHandler) OnManifestDelete(ctx context.Context, storageDriver driver.StorageDriver, registry distribution.Namespace, dgst digest.Digest, repositoryName string) error { + referrerRootPath := referrersLinkPath(repositoryName) + fullArtifactManifestPath := path.Join(referrerRootPath, dgst.Algorithm().String(), dgst.Hex()) + dcontext.GetLogger(ctx).Infof("deleting manifest ref folder: %s", fullArtifactManifestPath) + err := storageDriver.Delete(ctx, fullArtifactManifestPath) + if err != nil { + if _, ok := err.(driver.PathNotFoundError); !ok { + return err + } + } + + subjectLinkedArtifacts, ok := gc.artifactManifestIndex[dgst] + if ok { + for _, artifactDigest := range subjectLinkedArtifacts { + // get the artifact manifest + named, err := reference.WithName(repositoryName) + if err != nil { + return fmt.Errorf("failed to parse repo name %s: %v", repositoryName, err) + } + repository, err := registry.Repository(ctx, named) + if err != nil { + return fmt.Errorf("failed to construct repository: %v", err) + } + + manifestService, err := repository.Manifests(ctx) + if err != nil { + return fmt.Errorf("failed to construct manifest service: %v", err) + } + artifactManifest, err := manifestService.Get(ctx, artifactDigest) + if err != nil { + return fmt.Errorf("failed to get artifact manifest: %v", err) + } + + // extract the reference + blobs := artifactManifest.References() + + // decrement refcount for the blobs' digests and the manifest digest + gc.artifactMarkSet[artifactDigest] -= 1 + fmt.Printf("%s: decrementing artifact manifest ref count %s\n", repositoryName, dgst) + for _, descriptor := range blobs { + gc.artifactMarkSet[descriptor.Digest] -= 1 + fmt.Printf("%s: decrementing artifact blob ref count %s\n", repositoryName, descriptor.Digest) + } + // delete each artifact manifest's revision + manifestPath := referrersRepositoriesManifestRevisionPath(repositoryName, artifactDigest) + dcontext.GetLogger(ctx).Infof("deleting artifact manifest revision: %s", manifestPath) + err = storageDriver.Delete(ctx, manifestPath) + if err != nil { + if _, ok := err.(driver.PathNotFoundError); !ok { + return err + } + } + // delete each artifact manifest's ref folder + fullArtifactManifestPath = path.Join(referrerRootPath, artifactDigest.Algorithm().String(), artifactDigest.Hex()) + dcontext.GetLogger(ctx).Infof("deleting artifact manifest ref folder: %s", fullArtifactManifestPath) + err = storageDriver.Delete(ctx, fullArtifactManifestPath) + if err != nil { + if _, ok := err.(driver.PathNotFoundError); !ok { + return err + } + } + } + } + + return nil +} + +func (gc *orasGCHandler) SweepBlobs(ctx context.Context, markSet map[digest.Digest]struct{}) map[digest.Digest]struct{} { + for key, refCount := range gc.artifactMarkSet { + if refCount > 0 { + markSet[key] = struct{}{} + } + } + return markSet +} + +// ingestor method used in EnumerateReferrerLinks +// indexes each artifact manifest and adds ArtifactManifestDel struct to index +func artifactSweepIngestor(ctx context.Context, + referrerRevision digest.Digest, + subjectRevision digest.Digest, + artifactManifestIndex map[digest.Digest][]digest.Digest, + repository distribution.Repository, + blobstatter distribution.BlobStatter, + storageDriver driver.StorageDriver) error { + repoName := repository.Named().Name() + // index the manifest + fmt.Printf("%s: indexing artifact manifest %s\n", repoName, referrerRevision.String()) + // if artifact is tagged, we don't add artifact and descendants to artifact manifest index + tags, err := repository.Tags(ctx).Lookup(ctx, distribution.Descriptor{Digest: referrerRevision}) + if err != nil { + return fmt.Errorf("failed to retrieve tags for artifact digest %v: %v", referrerRevision, err) + } + if len(tags) > 0 { + return nil + } + artifactManifestIndex[subjectRevision] = append(artifactManifestIndex[subjectRevision], referrerRevision) + + referrerRootPath := referrersLinkPath(repoName) + + rootPath := path.Join(referrerRootPath, referrerRevision.Algorithm().String(), referrerRevision.Hex()) + _, err = storageDriver.Stat(ctx, rootPath) + if err != nil { + switch err.(type) { + case driver.PathNotFoundError: + return nil + } + return err + } + return enumerateReferrerLinks(ctx, + rootPath, + storageDriver, + repository, + blobstatter, + subjectRevision, + artifactManifestIndex, + artifactSweepIngestor) +} diff --git a/registry/extension/oras/artifactgarbagecollectionhandler_test.go b/registry/extension/oras/artifactgarbagecollectionhandler_test.go new file mode 100644 index 00000000000..aaca7bffbe4 --- /dev/null +++ b/registry/extension/oras/artifactgarbagecollectionhandler_test.go @@ -0,0 +1,166 @@ +package oras + +import ( + "context" + "encoding/json" + "testing" + + "github.com/distribution/distribution/v3" + "github.com/distribution/distribution/v3/manifest" + "github.com/distribution/distribution/v3/manifest/schema2" + "github.com/distribution/distribution/v3/registry/storage" + "github.com/distribution/distribution/v3/registry/storage/driver/inmemory" + "github.com/opencontainers/go-digest" + orasartifacts "github.com/oras-project/artifacts-spec/specs-go/v1" +) + +func allManifests(t *testing.T, manifestService distribution.ManifestService) map[digest.Digest]struct{} { + ctx := context.Background() + allManMap := make(map[digest.Digest]struct{}) + manifestEnumerator, ok := manifestService.(distribution.ManifestEnumerator) + if !ok { + t.Fatalf("unable to convert ManifestService into ManifestEnumerator") + } + err := manifestEnumerator.Enumerate(ctx, func(dgst digest.Digest) error { + allManMap[dgst] = struct{}{} + return nil + }) + if err != nil { + t.Fatalf("Error getting all manifests: %v", err) + } + return allManMap +} + +func allBlobs(t *testing.T, registry distribution.Namespace) map[digest.Digest]struct{} { + ctx := context.Background() + blobService := registry.Blobs() + allBlobsMap := make(map[digest.Digest]struct{}) + err := blobService.Enumerate(ctx, func(dgst digest.Digest) error { + allBlobsMap[dgst] = struct{}{} + return nil + }) + if err != nil { + t.Fatalf("Error getting all blobs: %v", err) + } + return allBlobsMap +} + +func TestReferrersBlobsDeleted(t *testing.T) { + ctx := context.Background() + inmemoryDriver := inmemory.New() + registry, orasExtension := createRegistry(t, inmemoryDriver) + repo := makeRepository(t, registry, "test") + manifestService := makeManifestService(t, repo) + tagService := repo.Tags(ctx) + + artifactBlob, err := repo.Blobs(ctx).Put(ctx, orasartifacts.MediaTypeDescriptor, nil) + if err != nil { + t.Fatal(err) + } + + config, err := repo.Blobs(ctx).Put(ctx, schema2.MediaTypeImageConfig, nil) + if err != nil { + t.Fatal(err) + } + + layer, err := repo.Blobs(ctx).Put(ctx, schema2.MediaTypeLayer, nil) + if err != nil { + t.Fatal(err) + } + + subjectManifest := schema2.Manifest{ + Versioned: manifest.Versioned{ + SchemaVersion: 2, + MediaType: schema2.MediaTypeManifest, + }, + Config: config, + Layers: []distribution.Descriptor{ + layer, + }, + } + + dm, err := schema2.FromStruct(subjectManifest) + if err != nil { + t.Fatalf("failed to marshal subject manifest: %v", err) + } + _, dmPayload, err := dm.Payload() + if err != nil { + t.Fatalf("failed to get subject manifest payload: %v", err) + } + + dg, err := manifestService.Put(ctx, dm) + if err != nil { + t.Fatalf("failed to put subject manifest with err: %v", err) + } + + artifactBlobDescriptor := orasartifacts.Descriptor{ + MediaType: artifactBlob.MediaType, + Digest: artifactBlob.Digest, + Size: artifactBlob.Size, + } + + artifactManifest := orasartifacts.Manifest{ + MediaType: orasartifacts.MediaTypeArtifactManifest, + ArtifactType: "test_artifactType", + Blobs: []orasartifacts.Descriptor{ + artifactBlobDescriptor, + }, + Subject: &orasartifacts.Descriptor{ + MediaType: schema2.MediaTypeManifest, + Size: int64(len(dmPayload)), + Digest: dg, + }, + } + + marshalledMan, err := json.Marshal(artifactManifest) + if err != nil { + t.Fatalf("artifact manifest could not be serialized to byte array: %v", err) + } + // upload manifest + artifactManifestDigest, err := manifestService.Put(ctx, &DeserializedManifest{ + Manifest: Manifest{ + inner: artifactManifest, + }, + raw: marshalledMan, + }) + if err != nil { + t.Fatalf("artifact manifest upload failed: %v", err) + } + + // the tags folder doesn't exist for this repo until a tag is added + // this leads to an error in Mark and Sweep if tags folder not found + err = tagService.Tag(ctx, "test", distribution.Descriptor{Digest: dg}) + if err != nil { + t.Fatalf("failed to tag subject image: %v", err) + } + err = tagService.Untag(ctx, "test") + if err != nil { + t.Fatalf("failed to untag subject image: %v", err) + } + + // Run GC + err = storage.MarkAndSweep(ctx, inmemoryDriver, registry, storage.GCOpts{ + DryRun: false, + RemoveUntagged: true, + GCExtensionHandlers: orasExtension.GetGarbageCollectionHandlers(), + }) + if err != nil { + t.Fatalf("Failed mark and sweep: %v", err) + } + + manifests := allManifests(t, manifestService) + blobs := allBlobs(t, registry) + + if _, exists := manifests[artifactManifestDigest]; exists { + t.Fatalf("artifact manifest with digest %s should have been deleted", artifactManifestDigest.String()) + } + + if _, exists := blobs[artifactManifestDigest]; exists { + t.Fatalf("artifact manifest blob with digest %s should have been deleted", artifactManifestDigest.String()) + } + + blobDigest := artifactManifest.Blobs[0].Digest + if _, exists := blobs[blobDigest]; exists { + t.Fatalf("artifact blob with digest %s should have been deleted", blobDigest) + } +} diff --git a/registry/extension/oras/artifactmanifesthandler.go b/registry/extension/oras/artifactmanifesthandler.go index 1188b10e143..27119c340a6 100644 --- a/registry/extension/oras/artifactmanifesthandler.go +++ b/registry/extension/oras/artifactmanifesthandler.go @@ -20,6 +20,8 @@ var ( errInvalidCreatedAnnotation = errors.New("failed to parse created time") ) +const rootPath = "/docker/registry/v2" + // artifactManifestHandler is a ManifestHandler that covers ORAS Artifacts. type artifactManifestHandler struct { repository distribution.Repository @@ -156,6 +158,18 @@ func (amh *artifactManifestHandler) indexReferrers(ctx context.Context, dm Deser return nil } +// TODO: Should be removed and paths package used +func referrersRepositoriesRootPath(name string) string { + return path.Join(rootPath, "repositories", name) +} + +// TODO: Should be removed and paths package used +func referrersRepositoriesManifestRevisionPath(name string, dgst digest.Digest) string { + return path.Join(referrersRepositoriesRootPath(name), "_manifests", "revisions", dgst.Algorithm().String(), dgst.Hex()) +} + +// TODO: Should be removed and defined instead in paths package +// Requires paths package to be exported func referrersLinkPath(name string) string { - return path.Join("/docker/registry/", "v2", "repositories", name, "_refs", "subjects") + return path.Join(referrersRepositoriesRootPath(name), "_refs", "subjects") } diff --git a/registry/extension/oras/artifactmanifesthandler_test.go b/registry/extension/oras/artifactmanifesthandler_test.go index bcb43cd53a5..4f286a7057b 100644 --- a/registry/extension/oras/artifactmanifesthandler_test.go +++ b/registry/extension/oras/artifactmanifesthandler_test.go @@ -17,22 +17,22 @@ import ( orasartifacts "github.com/oras-project/artifacts-spec/specs-go/v1" ) -func createRegistry(t *testing.T, driver driver.StorageDriver, options ...storage.RegistryOption) distribution.Namespace { +func createRegistry(t *testing.T, driver driver.StorageDriver, options ...storage.RegistryOption) (distribution.Namespace, extension.Extension) { ctx := context.Background() options = append([]storage.RegistryOption{storage.EnableDelete}, options...) extensionConfig := OrasOptions{ ArtifactsExtComponents: []string{"referrers"}, } - ns, err := extension.Get(ctx, "oras", driver, extensionConfig) + ns, err := extension.GetExtension(ctx, "oras", driver, extensionConfig) if err != nil { t.Fatalf("unable to configure extension namespace (%s): %s", "oras", err) } - options = append(options, storage.AddExtendedStorage(ns)) + options = append(options, storage.AddExtendedNamespace(ns)) registry, err := storage.NewRegistry(ctx, driver, options...) if err != nil { t.Fatalf("failed to construct namespace") } - return registry + return registry, ns } func makeRepository(t *testing.T, registry distribution.Namespace, name string) distribution.Repository { @@ -62,7 +62,7 @@ func makeManifestService(t *testing.T, repository distribution.Repository) distr func TestVerifyArtifactManifestPut(t *testing.T) { ctx := context.Background() inmemoryDriver := inmemory.New() - registry := createRegistry(t, inmemoryDriver) + registry, _ := createRegistry(t, inmemoryDriver) repo := makeRepository(t, registry, "test") manifestService := makeManifestService(t, repo) diff --git a/registry/extension/oras/artifactservice.go b/registry/extension/oras/artifactservice.go index bdc690067f4..cf4ede0922a 100644 --- a/registry/extension/oras/artifactservice.go +++ b/registry/extension/oras/artifactservice.go @@ -21,7 +21,7 @@ type ArtifactService interface { // referrersHandler handles http operations on manifest referrers. type referrersHandler struct { - extContext *extension.Context + extContext *extension.ExtensionContext storageDriver driver.StorageDriver // Digest is the target manifest's digest. @@ -51,49 +51,62 @@ func (h *referrersHandler) Referrers(ctx context.Context, revision digest.Digest blobStatter := h.extContext.Registry.BlobStatter() rootPath := path.Join(referrersLinkPath(repo.Named().Name()), revision.Algorithm().String(), revision.Hex()) - err = h.enumerateReferrerLinks(ctx, rootPath, func(referrerRevision digest.Digest) error { - man, err := manifests.Get(ctx, referrerRevision) - if err != nil { - return err - } - - ArtifactMan, ok := man.(*DeserializedManifest) - if !ok { - // The PUT handler would guard against this situation. Skip this manifest. - return nil - } - - extractedArtifactType := ArtifactMan.ArtifactType() - - // filtering by artifact type or bypass if no artifact type specified - if artifactType == "" || extractedArtifactType == artifactType { - desc, err := blobStatter.Stat(ctx, referrerRevision) + err = enumerateReferrerLinks(ctx, + rootPath, + h.storageDriver, + repo, + blobStatter, + revision, + map[digest.Digest][]digest.Digest{}, + func(ctx context.Context, + referrerRevision digest.Digest, + subjectRevision digest.Digest, + artifactManifestIndex map[digest.Digest][]digest.Digest, + repository distribution.Repository, + blobstatter distribution.BlobStatter, + storageDriver driver.StorageDriver) error { + man, err := manifests.Get(ctx, referrerRevision) if err != nil { return err } - desc.MediaType, _, _ = man.Payload() - artifactDesc := artifactv1.Descriptor{ - MediaType: desc.MediaType, - Size: desc.Size, - Digest: desc.Digest, - ArtifactType: extractedArtifactType, + + artifactManifest, ok := man.(*DeserializedManifest) + if !ok { + // The PUT handler would guard against this situation. Skip this manifest. + return nil } - if annotation, ok := ArtifactMan.Annotations()[createAnnotationName]; !ok { - referrersUnsorted = append(referrersUnsorted, artifactDesc) - } else { - extractedTimestamp, err := time.Parse(createAnnotationTimestampFormat, annotation) + extractedArtifactType := artifactManifest.ArtifactType() + + // filtering by artifact type or bypass if no artifact type specified + if artifactType == "" || extractedArtifactType == artifactType { + desc, err := blobStatter.Stat(ctx, referrerRevision) if err != nil { - return fmt.Errorf("failed to parse created annotation timestamp: %v", err) + return err + } + desc.MediaType, _, _ = man.Payload() + artifactDesc := artifactv1.Descriptor{ + MediaType: desc.MediaType, + Size: desc.Size, + Digest: desc.Digest, + ArtifactType: extractedArtifactType, + } + + if annotation, ok := artifactManifest.Annotations()[createAnnotationName]; !ok { + referrersUnsorted = append(referrersUnsorted, artifactDesc) + } else { + extractedTimestamp, err := time.Parse(createAnnotationTimestampFormat, annotation) + if err != nil { + return fmt.Errorf("failed to parse created annotation timestamp: %v", err) + } + referrersWrappers = append(referrersWrappers, referrersSortedWrapper{ + createdAt: extractedTimestamp, + descriptor: artifactDesc, + }) } - referrersWrappers = append(referrersWrappers, referrersSortedWrapper{ - createdAt: extractedTimestamp, - descriptor: artifactDesc, - }) } - } - return nil - }) + return nil + }) if err != nil { switch err.(type) { @@ -116,10 +129,23 @@ func (h *referrersHandler) Referrers(ctx context.Context, revision digest.Digest referrersSorted = append(referrersSorted, referrersUnsorted...) return referrersSorted, nil } -func (h *referrersHandler) enumerateReferrerLinks(ctx context.Context, rootPath string, ingestor func(digest.Digest) error) error { - blobStatter := h.extContext.Registry.BlobStatter() - return h.storageDriver.Walk(ctx, rootPath, func(fileInfo driver.FileInfo) error { +func enumerateReferrerLinks(ctx context.Context, + rootPath string, + stDriver driver.StorageDriver, + repository distribution.Repository, + blobstatter distribution.BlobStatter, + subjectRevision digest.Digest, + artifactManifestIndex map[digest.Digest][]digest.Digest, + ingestor func(ctx context.Context, + digest digest.Digest, + subjectRevision digest.Digest, + artifactManifestIndex map[digest.Digest][]digest.Digest, + repository distribution.Repository, + blobstatter distribution.BlobStatter, + storageDriver driver.StorageDriver) error) error { + + return stDriver.Walk(ctx, rootPath, func(fileInfo driver.FileInfo) error { // exit early if directory... if fileInfo.IsDir() { return nil @@ -133,13 +159,13 @@ func (h *referrersHandler) enumerateReferrerLinks(ctx context.Context, rootPath } // read the digest found in link - digest, err := h.readlink(ctx, filePath) + digest, err := readlink(ctx, filePath, stDriver) if err != nil { return err } // ensure this conforms to the linkPathFns - _, err = blobStatter.Stat(ctx, digest) + _, err = blobstatter.Stat(ctx, digest) if err != nil { // we expect this error to occur so we move on if err == distribution.ErrBlobUnknown { @@ -148,7 +174,13 @@ func (h *referrersHandler) enumerateReferrerLinks(ctx context.Context, rootPath return err } - err = ingestor(digest) + err = ingestor(ctx, + digest, + subjectRevision, + artifactManifestIndex, + repository, + blobstatter, + stDriver) if err != nil { return err } @@ -157,8 +189,8 @@ func (h *referrersHandler) enumerateReferrerLinks(ctx context.Context, rootPath }) } -func (h *referrersHandler) readlink(ctx context.Context, path string) (digest.Digest, error) { - content, err := h.storageDriver.GetContent(ctx, path) +func readlink(ctx context.Context, path string, stDriver driver.StorageDriver) (digest.Digest, error) { + content, err := stDriver.GetContent(ctx, path) if err != nil { return "", err } diff --git a/registry/extension/oras/oras.go b/registry/extension/oras/oras.go index 14001a8d2af..f12dbebe118 100644 --- a/registry/extension/oras/oras.go +++ b/registry/extension/oras/oras.go @@ -27,6 +27,7 @@ const ( type orasNamespace struct { storageDriver driver.StorageDriver referrersEnabled bool + gcHandler orasGCHandler } type OrasOptions struct { @@ -34,7 +35,7 @@ type OrasOptions struct { } // newOrasNamespace creates a new extension namespace with the name "oras" -func newOrasNamespace(ctx context.Context, storageDriver driver.StorageDriver, options configuration.ExtensionConfig) (extension.Namespace, error) { +func newOrasNamespace(ctx context.Context, storageDriver driver.StorageDriver, options configuration.ExtensionConfig) (extension.Extension, error) { optionsYaml, err := yaml.Marshal(options) if err != nil { return nil, err @@ -54,14 +55,20 @@ func newOrasNamespace(ctx context.Context, storageDriver driver.StorageDriver, o } } + orasGCHandler := orasGCHandler{ + artifactManifestIndex: make(map[digest.Digest][]digest.Digest), + artifactMarkSet: make(map[digest.Digest]int), + } + return &orasNamespace{ referrersEnabled: referrersEnabled, storageDriver: storageDriver, + gcHandler: orasGCHandler, }, nil } func init() { - extension.Register(namespaceName, newOrasNamespace) + extension.RegisterExtension(namespaceName, newOrasNamespace) } // GetManifestHandlers returns a list of manifest handlers that will be registered in the manifest store. @@ -78,12 +85,22 @@ func (o *orasNamespace) GetManifestHandlers(repo distribution.Repository, blobSt return []storage.ManifestHandler{} } +func (o *orasNamespace) GetGarbageCollectionHandlers() []storage.GCExtensionHandler { + if o.referrersEnabled { + return []storage.GCExtensionHandler{ + &o.gcHandler, + } + } + + return []storage.GCExtensionHandler{} +} + // GetRepositoryRoutes returns a list of extension routes scoped at a repository level -func (d *orasNamespace) GetRepositoryRoutes() []extension.Route { - var routes []extension.Route +func (d *orasNamespace) GetRepositoryRoutes() []extension.ExtensionRoute { + var routes []extension.ExtensionRoute if d.referrersEnabled { - routes = append(routes, extension.Route{ + routes = append(routes, extension.ExtensionRoute{ Namespace: namespaceName, Extension: extensionName, Component: referrersComponentName, @@ -106,7 +123,7 @@ func (d *orasNamespace) GetRepositoryRoutes() []extension.Route { // GetRegistryRoutes returns a list of extension routes scoped at a registry level // There are no registry scoped routes exposed by this namespace -func (d *orasNamespace) GetRegistryRoutes() []extension.Route { +func (d *orasNamespace) GetRegistryRoutes() []extension.ExtensionRoute { return nil } @@ -125,7 +142,7 @@ func (d *orasNamespace) GetNamespaceDescription() string { return namespaceDescription } -func (o *orasNamespace) referrersDispatcher(extCtx *extension.Context, r *http.Request) http.Handler { +func (o *orasNamespace) referrersDispatcher(extCtx *extension.ExtensionContext, r *http.Request) http.Handler { handler := &referrersHandler{ storageDriver: o.storageDriver, diff --git a/registry/extension/oras/referrers.md b/registry/extension/oras/referrers.md new file mode 100644 index 00000000000..194baf14a0d --- /dev/null +++ b/registry/extension/oras/referrers.md @@ -0,0 +1,103 @@ +# ORAS Artifacts Distribution + +This document describes an experimental prototype that implements the +[ORAS Artifact Manifest](https://github.com/oras-project/artifacts-spec) spec. + +## Implementation + +To power the [/referrers](https://github.com/oras-project/artifacts-spec/blob/main/manifest-referrers-api.md) API, the +referrers of a manifest are indexed in the repository store. The following example illustrates the creation of this +index. + +The `nginx:v1` image is already persisted: + +- repository: `nginx` +- digest: `sha256:111ma2d22ae5ef400769fa51c84717264cd1520ac8d93dc071374c1be49a111m` +- tag: `v1.0` + +The repository store layout is represented as: + +```bash + +└── v2 + └── repositories + └── nginx + └── _manifests + └── revisions + └── sha256 + └── 111ma2d22ae5ef400769fa51c84717264cd1520ac8d93dc071374c1be49a111m + └── link +``` + +Push a signature as blob and an ORAS Artifact that contains a blobs property referencing the signature, with the +following properties: + +- digest: `sha256:222ibbf80b44ce6be8234e6ff90a1ac34acbeb826903b02cfa0da11c82cb222i` +- `subjectManifest` digest: `sha256:111ma2d22ae5ef400769fa51c84717264cd1520ac8d93dc071374c1be49a111m` +- `artifactType`: `application/vnd.example.artifact` + +On `PUT`, the artifact appears as a manifest revision. Additionally, an index entry is created under +the subject ref folder to facilitate a lookup to the referrer. The index path where the entry is added is +`/_refs/subjects/sha256/`, as shown below. + +``` + +└── v2 + └── repositories + └── nginx + ├── _manifests + │ └── _revisions + │ └── sha256 + │ ├── 111ma2d22ae5ef400769fa51c84717264cd1520ac8d93dc071374c1be49a111m + │ │ └── link + │ └── 222ibbf80b44ce6be8234e6ff90a1ac34acbeb826903b02cfa0da11c82cb222i + │ └── link + └── _refs + └── subjects + └── sha256 + └── 111ma2d22ae5ef400769fa51c84717264cd1520ac8d93dc071374c1be49a111m + └── sha256 + └── 222ibbf80b44ce6be8234e6ff90a1ac34acbeb826903b02cfa0da11c82cb222i + └── link +``` + +Push another ORAS artifact with the following properties: + +- digest: `sha256:333ic0c33ebc4a74a0a554c86ac2b28ddf3454a5ad9cf90ea8cea9f9e75c333i` +- `subjectManifest` digest: `sha256:111ma2d22ae5ef400769fa51c84717264cd1520ac8d93dc071374c1be49a111m` +- `artifactType`: `application/vnd.another.example.artifact` + +This results in an addition to the index as shown below. + +``` + +└── v2 + └── repositories + └── nginx + ├── _manifests + │ └── _revisions + │ └── sha256 + │ ├── 111ma2d22ae5ef400769fa51c84717264cd1520ac8d93dc071374c1be49a111m + │ │ └── link + │ ├── 222ibbf80b44ce6be8234e6ff90a1ac34acbeb826903b02cfa0da11c82cb222i + │ │ └── link + │ └── 333ic0c33ebc4a74a0a554c86ac2b28ddf3454a5ad9cf90ea8cea9f9e75c333i + │ └── link + └── _refs + └── subjects + └── sha256 + └── 111ma2d22ae5ef400769fa51c84717264cd1520ac8d93dc071374c1be49a111m + └── sha256 + ├── 222ibbf80b44ce6be8234e6ff90a1ac34acbeb826903b02cfa0da11c82cb222i + │ └── link + └── 333ic0c33ebc4a74a0a554c86ac2b28ddf3454a5ad9cf90ea8cea9f9e75c333i + └── link +``` + +## Garbage Collection With Referrers + +The life of a referrer artifact is directly linked to its subject. When a referrer artifact's subject manifest is deleted, the artifact's referrers are also deleted. + +Manifest garbage collection is extended to include referrer artifact collection. The marking process begins with the normal marking behavior which consists of enumerating every manifest in every repository. If the manifest is untagged, we must consider the manifest for deletion. As we cannot guarantee that artifact manifests (tagged or untagged) will be traversed before their subjects, we must temporarily mark all artifact manifests and their blobs using a separate reference count map. If we encounter an untagged non-artifact manifest, then we proceed by adding the manifest to a deletion list, traversing it's referrers, and then indexing each artifact manifest for deletion. + +During the Sweep phase, each manifest in the deletion list has its contents and link files deleted. Then each of the indexed artifact manifests referring to the deleted subject will have its corresponding manifest and blobs' ref counts decremented. Furthermore, the artifact manifest revision, and `_refs` directories are removed. The final step is the vacuum of the blobs. Based on the final ref count map, we add each blob with a positive ref count back to the original `markSet` map. All unmarked blobs are then safely deleted. \ No newline at end of file diff --git a/registry/handlers/app.go b/registry/handlers/app.go index da337e5ed5a..0e37e14bc60 100644 --- a/registry/handlers/app.go +++ b/registry/handlers/app.go @@ -98,7 +98,7 @@ type App struct { repositoryExtensions []string // extensionNamespaces is a list of namespaces that are configured as extensions to the distribution - extensionNamespaces []extension.Namespace + extensionNamespaces []extension.Extension } // NewApp takes a configuration and returns a configured app, ready to serve @@ -280,7 +280,7 @@ func NewApp(ctx context.Context, config *configuration.Configuration) *App { // add the extended storage for every namespace to the new registry options for _, ns := range app.extensionNamespaces { - options = append(options, storage.AddExtendedStorage(ns)) + options = append(options, storage.AddExtendedNamespace(ns)) } // configure storage caches @@ -927,9 +927,9 @@ func (app *App) nameRequired(r *http.Request) bool { func (app *App) initializeExtensionNamespaces(ctx context.Context, extensions map[string]configuration.ExtensionConfig) error { - extensionNamespaces := []extension.Namespace{} + extensionNamespaces := []extension.Extension{} for key, options := range extensions { - ns, err := extension.Get(ctx, key, app.driver, options) + ns, err := extension.GetExtension(ctx, key, app.driver, options) if err != nil { return fmt.Errorf("unable to configure extension namespace (%s): %s", key, err) } @@ -979,7 +979,7 @@ func (app *App) registerExtensionRoutes(ctx context.Context) error { return nil } -func (app *App) registerExtensionRoute(route extension.Route, nameRequired bool) error { +func (app *App) registerExtensionRoute(route extension.ExtensionRoute, nameRequired bool) error { if route.Dispatcher == nil { return nil } @@ -997,7 +997,7 @@ func (app *App) registerExtensionRoute(route extension.Route, nameRequired bool) dispatch := route.Dispatcher app.register(desc.Name, func(ctx *Context, r *http.Request) http.Handler { return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - extCtx := &extension.Context{ + extCtx := &extension.ExtensionContext{ Context: ctx.Context, Repository: ctx.Repository, Errors: ctx.Errors, diff --git a/registry/root.go b/registry/root.go index e32e0984cb2..141366b294f 100644 --- a/registry/root.go +++ b/registry/root.go @@ -5,6 +5,7 @@ import ( "os" dcontext "github.com/distribution/distribution/v3/context" + "github.com/distribution/distribution/v3/registry/extension" "github.com/distribution/distribution/v3/registry/storage" "github.com/distribution/distribution/v3/registry/storage/driver/factory" "github.com/distribution/distribution/v3/version" @@ -71,15 +72,35 @@ var GCCmd = &cobra.Command{ os.Exit(1) } - registry, err := storage.NewRegistry(ctx, driver, storage.Schema1SigningKey(k)) + extensions := config.Extensions + extensionNamespaces := []extension.Extension{} + gcExtensionhandlers := []storage.GCExtensionHandler{} + for key, options := range extensions { + ns, err := extension.GetExtension(ctx, key, driver, options) + if err != nil { + fmt.Fprintf(os.Stderr, "unable to configure extension namespace (%s): %s", key, err) + os.Exit(1) + } + extensionNamespaces = append(extensionNamespaces, ns) + gcExtensionhandlers = append(gcExtensionhandlers, ns.GetGarbageCollectionHandlers()...) + } + + options := []storage.RegistryOption{storage.Schema1SigningKey(k)} + // add all the extended namespaces to the new registry options + for _, ns := range extensionNamespaces { + options = append(options, storage.AddExtendedNamespace(ns)) + } + + registry, err := storage.NewRegistry(ctx, driver, options...) if err != nil { fmt.Fprintf(os.Stderr, "failed to construct registry: %v", err) os.Exit(1) } err = storage.MarkAndSweep(ctx, driver, registry, storage.GCOpts{ - DryRun: dryRun, - RemoveUntagged: removeUntagged, + DryRun: dryRun, + RemoveUntagged: removeUntagged, + GCExtensionHandlers: gcExtensionhandlers, }) if err != nil { fmt.Fprintf(os.Stderr, "failed to garbage collect: %v", err) diff --git a/registry/storage/extension.go b/registry/storage/extension.go index edd2af4ac0d..2a7d06152eb 100644 --- a/registry/storage/extension.go +++ b/registry/storage/extension.go @@ -4,6 +4,7 @@ import ( "context" "github.com/distribution/distribution/v3" + "github.com/distribution/distribution/v3/registry/storage/driver" storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" "github.com/opencontainers/go-digest" ) @@ -15,12 +16,32 @@ type ReadOnlyBlobStore interface { distribution.BlobProvider } +type GCExtensionHandler interface { + Mark(ctx context.Context, + repository distribution.Repository, + storageDriver driver.StorageDriver, + registry distribution.Namespace, + manifest distribution.Manifest, + manifestDigest digest.Digest, + dryRun bool, + removeUntagged bool) (bool, error) + OnManifestDelete(ctx context.Context, + storageDriver driver.StorageDriver, + registry distribution.Namespace, + dgst digest.Digest, + repositoryName string) error + SweepBlobs(ctx context.Context, + markSet map[digest.Digest]struct{}) map[digest.Digest]struct{} +} + // ExtendedStorage defines extensions to store operations like manifest for example. type ExtendedStorage interface { // GetManifestHandlers returns the list of manifest handlers that handle custom manifest formats supported by the extensions. GetManifestHandlers( repo distribution.Repository, blobStore distribution.BlobStore) []ManifestHandler + // GetGarbageCollectHandlers returns the list of GC handlers that handle custom garbage collection behavior for the extensions + GetGarbageCollectionHandlers() []GCExtensionHandler } // GetManifestLinkReadOnlyBlobStore will enable extensions to access the underlying linked blob store for readonly operations. diff --git a/registry/storage/garbagecollect.go b/registry/storage/garbagecollect.go index 13c9b180c89..06c425e88c4 100644 --- a/registry/storage/garbagecollect.go +++ b/registry/storage/garbagecollect.go @@ -16,8 +16,9 @@ func emit(format string, a ...interface{}) { // GCOpts contains options for garbage collector type GCOpts struct { - DryRun bool - RemoveUntagged bool + DryRun bool + RemoveUntagged bool + GCExtensionHandlers []GCExtensionHandler } // ManifestDel contains manifest structure which will be deleted @@ -68,7 +69,6 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis return fmt.Errorf("failed to retrieve tags for digest %v: %v", dgst, err) } if len(tags) == 0 { - emit("manifest eligible for deletion: %s", dgst) // fetch all tags from repository // all of these tags could contain manifest in history // which means that we need check (and delete) those references when deleting manifest @@ -76,7 +76,22 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis if err != nil { return fmt.Errorf("failed to retrieve tags %v", err) } + // call GC extension handlers' mark + manifest, err := manifestService.Get(ctx, dgst) + if err != nil { + return fmt.Errorf("failed to retrieve manifest: %v", err) + } + for _, gcHandler := range opts.GCExtensionHandlers { + deleteEligible, err := gcHandler.Mark(ctx, repository, storageDriver, registry, manifest, dgst, opts.DryRun, opts.RemoveUntagged) + if err != nil { + return fmt.Errorf("failed to mark using extension handler: %v", err) + } + if !deleteEligible { + return nil + } + } manifestArr = append(manifestArr, ManifestDel{Name: repoName, Digest: dgst, Tags: allTags}) + emit("manifest eligible for deletion: %s", dgst) return nil } } @@ -122,8 +137,18 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis if err != nil { return fmt.Errorf("failed to delete manifest %s: %v", obj.Digest, err) } + for _, gcHandler := range opts.GCExtensionHandlers { + err := gcHandler.OnManifestDelete(ctx, storageDriver, registry, obj.Digest, obj.Name) + if err != nil { + return fmt.Errorf("failed to call remove manifest extension handler: %v", err) + } + } } } + // GC extension will add final saved blobs into markset for retention + for _, gcHandler := range opts.GCExtensionHandlers { + markSet = gcHandler.SweepBlobs(ctx, markSet) + } blobService := registry.Blobs() deleteSet := make(map[digest.Digest]struct{}) err = blobService.Enumerate(ctx, func(dgst digest.Digest) error { @@ -147,6 +172,5 @@ func MarkAndSweep(ctx context.Context, storageDriver driver.StorageDriver, regis return fmt.Errorf("failed to delete blob %s: %v", dgst, err) } } - return err } diff --git a/registry/storage/registry.go b/registry/storage/registry.go index 5eedb4433b1..0e31827e466 100644 --- a/registry/storage/registry.go +++ b/registry/storage/registry.go @@ -25,7 +25,7 @@ type registry struct { blobDescriptorServiceFactory distribution.BlobDescriptorServiceFactory manifestURLs manifestURLs driver storagedriver.StorageDriver - extendedStorages []ExtendedStorage + extendedNamespaces []ExtendedStorage } // manifestURLs holds regular expressions for controlling manifest URL whitelisting @@ -37,11 +37,11 @@ type manifestURLs struct { // RegistryOption is the type used for functional options for NewRegistry. type RegistryOption func(*registry) error -// AddExtendedStorage is a functional option for NewRegistry. It adds the given -// extended storage to the list of extended storages in the registry. -func AddExtendedStorage(extendedStorage ExtendedStorage) RegistryOption { +// AddExtendedNamespace is a functional option for NewRegistry. It adds the given +// extended namespace to the list of extended namespaces in the registry. +func AddExtendedNamespace(extendedNamespace ExtendedStorage) RegistryOption { return func(registry *registry) error { - registry.extendedStorages = append(registry.extendedStorages, extendedStorage) + registry.extendedNamespaces = append(registry.extendedNamespaces, extendedNamespace) return nil } } @@ -277,7 +277,7 @@ func (repo *repository) Manifests(ctx context.Context, options ...distribution.M } var extensionManifestHandlers []ManifestHandler - for _, ext := range repo.registry.extendedStorages { + for _, ext := range repo.registry.extendedNamespaces { handlers := ext.GetManifestHandlers(repo, blobStore) if len(handlers) > 0 { extensionManifestHandlers = append(extensionManifestHandlers, handlers...) diff --git a/registry/storage/vacuum.go b/registry/storage/vacuum.go index 749fb31906b..112237754e3 100644 --- a/registry/storage/vacuum.go +++ b/registry/storage/vacuum.go @@ -51,6 +51,7 @@ func (v Vacuum) RemoveBlob(dgst string) error { } // RemoveManifest removes a manifest from the filesystem +// Invokes each GCExtensionHandler's RemoveManifestVacuum func (v Vacuum) RemoveManifest(name string, dgst digest.Digest, tags []string) error { // remove a tag manifest reference, in case of not found continue to next one for _, tag := range tags { @@ -81,7 +82,14 @@ func (v Vacuum) RemoveManifest(name string, dgst digest.Digest, tags []string) e return err } dcontext.GetLogger(v.ctx).Infof("deleting manifest: %s", manifestPath) - return v.driver.Delete(v.ctx, manifestPath) + err = v.driver.Delete(v.ctx, manifestPath) + if err != nil { + if _, ok := err.(driver.PathNotFoundError); !ok { + return err + } + } + + return nil } // RemoveRepository removes a repository directory from the