Skip to content

Commit

Permalink
Remove VAI provider cloud scheduler support (#322)
Browse files Browse the repository at this point in the history
  • Loading branch information
grahamia authored Mar 14, 2024
1 parent 8a214b1 commit fcfc03d
Show file tree
Hide file tree
Showing 25 changed files with 298 additions and 661 deletions.
5 changes: 2 additions & 3 deletions argo/providers/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ go 1.20
require (
cloud.google.com/go/aiplatform v1.48.0
cloud.google.com/go/pubsub v1.33.0
cloud.google.com/go/scheduler v1.10.1
cloud.google.com/go/storage v1.30.1
github.com/argoproj/argo-events v1.8.1
github.com/argoproj/argo-workflows/v3 v3.3.9
github.com/go-logr/logr v1.2.3
github.com/go-openapi/runtime v0.25.0
github.com/go-openapi/strfmt v0.21.3
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
github.com/googleapis/gax-go/v2 v2.12.0
github.com/hashicorp/go-bexpr v0.1.13
github.com/kubeflow/pipelines v0.0.0-20220721222832-061905b6df39
Expand All @@ -24,7 +22,6 @@ require (
github.com/urfave/cli v1.22.10
go.uber.org/zap v1.24.0
google.golang.org/api v0.136.0
google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5
google.golang.org/grpc v1.57.0
google.golang.org/protobuf v1.31.0
gopkg.in/yaml.v2 v2.4.0
Expand Down Expand Up @@ -73,6 +70,7 @@ require (
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.5 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
Expand Down Expand Up @@ -139,6 +137,7 @@ require (
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230803162519-f966b187b2e5 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
Expand Down
2 changes: 0 additions & 2 deletions argo/providers/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjp
cloud.google.com/go/pubsub v1.9.0/go.mod h1:G3o6/kJvEMIEAN5urdkaP4be49WQsjNiykBIto9LFtY=
cloud.google.com/go/pubsub v1.33.0 h1:6SPCPvWav64tj0sVX/+npCBKhUi/UjJehy9op/V3p2g=
cloud.google.com/go/pubsub v1.33.0/go.mod h1:f+w71I33OMyxf9VpMVcZbnG5KSUkCOUHYpFd5U1GdRc=
cloud.google.com/go/scheduler v1.10.1 h1:yoZbZR8880KgPGLmACOMCiY2tPk+iX4V/dkxqTirlz8=
cloud.google.com/go/scheduler v1.10.1/go.mod h1:R63Ldltd47Bs4gnhQkmNDse5w8gBRrhObZ54PxgR2Oo=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos=
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
Expand Down
2 changes: 2 additions & 0 deletions argo/providers/go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ cloud.google.com/go/retail v1.14.1/go.mod h1:y3Wv3Vr2k54dLNIrCzenyKG8g8dhvhncT2N
cloud.google.com/go/run v0.3.0/go.mod h1:TuyY1+taHxTjrD0ZFk2iAR+xyOXEA0ztb7U3UNA0zBo=
cloud.google.com/go/run v1.2.0 h1:kHeIG8q+N6Zv0nDkBjSOYfK2eWqa5FnaiDPH/7/HirE=
cloud.google.com/go/run v1.2.0/go.mod h1:36V1IlDzQ0XxbQjUx6IYbw8H3TJnWvhii963WW3B/bo=
cloud.google.com/go/scheduler v1.10.1 h1:yoZbZR8880KgPGLmACOMCiY2tPk+iX4V/dkxqTirlz8=
cloud.google.com/go/scheduler v1.10.1/go.mod h1:R63Ldltd47Bs4gnhQkmNDse5w8gBRrhObZ54PxgR2Oo=
cloud.google.com/go/secretmanager v1.9.0/go.mod h1:b71qH2l1yHmWQHt9LC80akm86mX8AL6X1MA01dW8ht4=
cloud.google.com/go/secretmanager v1.11.1 h1:cLTCwAjFh9fKvU6F13Y4L9vPcx9yiWPyWXE4+zkuEQs=
cloud.google.com/go/secretmanager v1.11.1/go.mod h1:znq9JlXgTNdBeQk9TBW/FnR/W4uChEKGeqQWAJ8SXFw=
Expand Down
43 changes: 1 addition & 42 deletions argo/providers/vai/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,50 +3,9 @@ package main
import (
. "github.com/sky-uk/kfp-operator/argo/providers/base"
"github.com/sky-uk/kfp-operator/argo/providers/vai"
"github.com/urfave/cli"
)

func main() {
app := NewProviderApp[vai.VAIProviderConfig]()
provider := vai.VAIProvider{}

app.Run(provider, cli.Command{
Name: "vai-run",
Subcommands: []cli.Command{
{
Name: "enqueue",
Flags: []cli.Flag{cli.StringFlag{
Name: "run-intent",
Required: true,
}},
Action: func(c *cli.Context) error {
providerConfig, err := app.LoadProviderConfig(c)
if err != nil {
return err
}
runIntent, err := LoadJsonFromFile[vai.RunIntent](c.String("run-intent"))
if err != nil {
return err
}
_, err = provider.EnqueueRun(app.Context, providerConfig, runIntent)
return err
},
},
{
Name: "submit",
Flags: []cli.Flag{cli.StringFlag{
Name: "run",
Required: true,
}},
Action: func(c *cli.Context) error {
providerConfig, err := app.LoadProviderConfig(c)
vaiRun, err := LoadJsonFromFile[vai.VAIRun](c.String("run"))
if err != nil {
return err
}
return provider.SubmitRun(app.Context, providerConfig, vaiRun)
},
},
},
})
app.Run(vai.VAIProvider{})
}
14 changes: 0 additions & 14 deletions argo/providers/vai/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ type VAIProviderConfig struct {
VaiJobServiceAccount string `yaml:"vaiJobServiceAccount"`
GcsEndpoint string `yaml:"gcsEndpoint"`
PipelineBucket string `yaml:"pipelineBucket"`
RunIntentsTopic string `yaml:"runIntentsTopic"`
RunsTopic string `yaml:"runsTopic"`
EventsourcePipelineEventsSubscription string `yaml:"eventsourcePipelineEventsSubscription"`
MaxConcurrentRunCount int64 `yaml:"maxConcurrentRunCount"`
}
Expand All @@ -30,18 +28,6 @@ func (vaipc VAIProviderConfig) pipelineJobName(name string) string {
return fmt.Sprintf("%s/pipelineJobs/%s", vaipc.parent(), name)
}

func (vaipc VAIProviderConfig) schedulerJobName(name string) string {
return fmt.Sprintf("%s/jobs/%s", vaipc.parent(), name)
}

func (vaipc VAIProviderConfig) runIntentsTopicFullName() string {
return vaipc.topicFullName(vaipc.RunIntentsTopic)
}

func (vaipc VAIProviderConfig) topicFullName(topicName string) string {
return fmt.Sprintf("projects/%s/topics/%s", vaipc.VaiProject, topicName)
}

func (vaipc VAIProviderConfig) pipelineStorageObject(pipelineName string, pipelineVersion string) string {
return fmt.Sprintf("%s/%s", pipelineName, pipelineVersion)
}
Expand Down
52 changes: 17 additions & 35 deletions argo/providers/vai/eventing_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,10 @@ func (es *VaiEventingServer) StartEventSource(source *generic.EventSource, strea
m.Nack()
return
}
run := VAIRun{
Labels: logEntry.Labels,
RunId: pipelineJobId,
Artifacts: artifactDefs,
}

event := es.runCompletionEventForRun(stream.Context(), run)
event := es.runCompletionEventForRun(stream.Context(), pipelineJobId, artifactDefs)
if event == nil {
es.Logger.Error(err, fmt.Sprintf("failed to convert to run completion event %+v", run))
es.Logger.Error(err, fmt.Sprintf("failed to convert to run completion event %s, %+v", pipelineJobId, artifactDefs))
m.Nack()
return
}
Expand Down Expand Up @@ -120,20 +115,20 @@ func (es *VaiEventingServer) StartEventSource(source *generic.EventSource, strea
return nil
}

func (es *VaiEventingServer) runCompletionEventForRun(ctx context.Context, run VAIRun) *common.RunCompletionEvent {
func (es *VaiEventingServer) runCompletionEventForRun(ctx context.Context, runId string, artifacts []pipelinesv1.OutputArtifact) *common.RunCompletionEvent {
job, err := es.PipelineJobClient.GetPipelineJob(ctx, &aiplatformpb.GetPipelineJobRequest{
Name: es.ProviderConfig.pipelineJobName(run.RunId),
Name: es.ProviderConfig.pipelineJobName(runId),
})
if err != nil {
es.Logger.Error(err, "could not fetch pipeline job")
return nil
}
if job == nil {
es.Logger.Error(nil, "expected pipeline job not found", "run-id", run.RunId)
es.Logger.Error(nil, "expected pipeline job not found", "run-id", runId)
return nil
}

return es.toRunCompletionEvent(job, run)
return es.toRunCompletionEvent(job, runId, artifacts)
}

func modelServingArtifactsForJob(job *aiplatformpb.PipelineJob) []common.Artifact {
Expand Down Expand Up @@ -240,51 +235,38 @@ func gvrAndNamespacedNameForRunLabels(runLabels map[string]string) (schema.Group
return schema.GroupVersionResource{}, types.NamespacedName{}, fmt.Errorf("neither %s or %s provided in labels", labels.RunConfigurationName, labels.RunName)
}

func (es *VaiEventingServer) toRunCompletionEvent(job *aiplatformpb.PipelineJob, run VAIRun) *common.RunCompletionEvent {
func (es *VaiEventingServer) toRunCompletionEvent(job *aiplatformpb.PipelineJob, runId string, artifacts []pipelinesv1.OutputArtifact) *common.RunCompletionEvent {
runCompletionStatus, completed := runCompletionStatus(job)

if !completed {
es.Logger.Error(nil, "expected pipeline job to have finished", "run-id", run.RunId)
es.Logger.Error(nil, "expected pipeline job to have finished", "run-id", runId)
return nil
}

var runName, runConfigurationName, pipelineName common.NamespacedName
var pipelineName common.NamespacedName

pipelineName.Name = job.Labels[labels.PipelineName]
if pipelineNamespace, ok := job.Labels[labels.PipelineNamespace]; ok {
pipelineName.Namespace = pipelineNamespace
}

if legacyNamespace, ok := job.Labels[labels.LegacyNamespace]; ok {
// For compatability with resources created with v0.3.0 and older
runName = common.NamespacedName{
Name: run.RunId,
Namespace: legacyNamespace,
}
} else {
runName = common.NamespacedName{
Name: job.Labels[labels.RunName],
Namespace: job.Labels[labels.RunNamespace]}
runName := common.NamespacedName{
Name: job.Labels[labels.RunName],
Namespace: job.Labels[labels.RunNamespace],
}

if legacyRunConfiguration, ok := job.Labels[labels.LegacyRunConfiguration]; ok {
// For compatability with resources created with v0.3.0 and older
runConfigurationName = common.NamespacedName{
Name: legacyRunConfiguration,
}
} else {
runConfigurationName = common.NamespacedName{
Name: job.Labels[labels.RunConfigurationName],
Namespace: job.Labels[labels.RunConfigurationNamespace]}
runConfigurationName := common.NamespacedName{
Name: job.Labels[labels.RunConfigurationName],
Namespace: job.Labels[labels.RunConfigurationNamespace],
}

return &common.RunCompletionEvent{
Status: runCompletionStatus,
PipelineName: pipelineName,
RunConfigurationName: runConfigurationName.NonEmptyPtr(),
RunName: runName.NonEmptyPtr(),
RunId: run.RunId,
Artifacts: artifactsForJob(job, run.Artifacts),
RunId: runId,
Artifacts: artifactsForJob(job, artifacts),
ServingModelArtifacts: modelServingArtifactsForJob(job),
Provider: es.ProviderConfig.Name,
}
Expand Down
Loading

0 comments on commit fcfc03d

Please sign in to comment.