Skip to content

Commit 3acac95

Browse files
authored
Execution and Partition-Level Environment Variables (#4801)
This PR introduces two key features: ## 1. Enhanced Execution Environment Variables Added support for passing rich job metadata to execution engines via environment variables including: - `BACALHAU_PARTITION_INDEX`: Current partition index (0 to N-1) - `BACALHAU_PARTITION_COUNT`: Total number of partitions - `BACALHAU_JOB_ID`: Unique job identifier - `BACALHAU_JOB_NAME`: User-provided job name - `BACALHAU_JOB_NAMESPACE`: Job namespace - `BACALHAU_JOB_TYPE`: Job type (Batch/Service) - `BACALHAU_EXECUTION_ID`: Unique execution identifier - `BACALHAU_NODE_ID`: ID of executing compute node This allows jobs to: - Be partition-aware and handle their specific partition's work - Access their execution context - Track node assignment ## 2. Test Suite for Partition Scheduling Added comprehensive test suite that validates: - Environment variable propagation to executors - Partition scheduling behavior: - Unique partition indices - Node distribution - Retry behavior - Service job continuous execution <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit Here are the release notes for this pull request: **New Features** - Added environment variable management for job executions - Enhanced support for system and task-level environment variables - Improved job partitioning and execution context handling **Bug Fixes** - Fixed potential nil slice access in job task retrieval - Added validation for environment variable naming conventions **Improvements** - Streamlined executor and job handling interfaces - Added utility functions for environment variable manipulation - Enhanced test coverage for job execution scenarios **Technical Enhancements** - Refactored execution context management - Improved error handling in task and job validation - Added robust environment variable sanitization and merging capabilities <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 559c036 commit 3acac95

21 files changed

+968
-82
lines changed

pkg/compute/envvars.go

+44
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// File: pkg/compute/envvars.go
2+
3+
package compute
4+
5+
import (
6+
"fmt"
7+
8+
"github.com/bacalhau-project/bacalhau/pkg/lib/envvar"
9+
"github.com/bacalhau-project/bacalhau/pkg/models"
10+
)
11+
12+
// GetExecutionEnvVars returns a map of environment variables that should be passed to the execution.
13+
// Task-level environment variables are included but system variables take precedence.
14+
func GetExecutionEnvVars(execution *models.Execution) map[string]string {
15+
if execution == nil {
16+
return make(map[string]string)
17+
}
18+
19+
// Start with task-level environment variables if they exist
20+
taskEnv := make(map[string]string)
21+
if execution.Job != nil && execution.Job.Task() != nil && execution.Job.Task().Env != nil {
22+
taskEnv = execution.Job.Task().Env
23+
}
24+
25+
// Build system environment variables
26+
sysEnv := make(map[string]string)
27+
sysEnv[models.EnvVarPrefix+"EXECUTION_ID"] = execution.ID
28+
sysEnv[models.EnvVarPrefix+"NODE_ID"] = envvar.Sanitize(execution.NodeID)
29+
30+
// Add job-related environment variables if job exists
31+
if execution.Job != nil {
32+
sysEnv[models.EnvVarPrefix+"JOB_ID"] = execution.JobID
33+
sysEnv[models.EnvVarPrefix+"JOB_NAME"] = envvar.Sanitize(execution.Job.Name)
34+
sysEnv[models.EnvVarPrefix+"JOB_NAMESPACE"] = envvar.Sanitize(execution.Job.Namespace)
35+
sysEnv[models.EnvVarPrefix+"JOB_TYPE"] = execution.Job.Type
36+
37+
// Add partition-related environment variables
38+
sysEnv[models.EnvVarPrefix+"PARTITION_INDEX"] = fmt.Sprintf("%d", execution.PartitionIndex)
39+
sysEnv[models.EnvVarPrefix+"PARTITION_COUNT"] = fmt.Sprintf("%d", execution.Job.Count)
40+
}
41+
42+
// Merge task and system variables, with system taking precedence
43+
return envvar.Merge(taskEnv, sysEnv)
44+
}

pkg/compute/envvars_test.go

+140
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
//go:build unit || !integration
2+
3+
package compute
4+
5+
import (
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
10+
"github.com/bacalhau-project/bacalhau/pkg/models"
11+
)
12+
13+
func TestGetExecutionEnvVars(t *testing.T) {
14+
tests := []struct {
15+
name string
16+
execution *models.Execution
17+
want map[string]string
18+
}{
19+
{
20+
name: "nil execution returns empty map",
21+
execution: nil,
22+
want: map[string]string{},
23+
},
24+
{
25+
name: "basic execution without job",
26+
execution: &models.Execution{
27+
ID: "exec-1",
28+
NodeID: "node-1",
29+
},
30+
want: map[string]string{
31+
"BACALHAU_EXECUTION_ID": "exec-1",
32+
"BACALHAU_NODE_ID": "node-1",
33+
},
34+
},
35+
{
36+
name: "execution with job but no task env",
37+
execution: &models.Execution{
38+
ID: "exec-1",
39+
NodeID: "node-1",
40+
JobID: "job-1",
41+
PartitionIndex: 0,
42+
Job: &models.Job{
43+
Name: "test-job",
44+
Namespace: "default",
45+
Type: "batch",
46+
Tasks: []*models.Task{
47+
{
48+
Name: "task-1",
49+
},
50+
},
51+
Count: 3,
52+
},
53+
},
54+
want: map[string]string{
55+
"BACALHAU_EXECUTION_ID": "exec-1",
56+
"BACALHAU_NODE_ID": "node-1",
57+
"BACALHAU_JOB_ID": "job-1",
58+
"BACALHAU_JOB_NAME": "test-job",
59+
"BACALHAU_JOB_NAMESPACE": "default",
60+
"BACALHAU_JOB_TYPE": "batch",
61+
"BACALHAU_PARTITION_INDEX": "0",
62+
"BACALHAU_PARTITION_COUNT": "3",
63+
},
64+
},
65+
{
66+
name: "execution with task env",
67+
execution: &models.Execution{
68+
ID: "exec-1",
69+
NodeID: "node-1",
70+
JobID: "job-1",
71+
PartitionIndex: 0,
72+
Job: &models.Job{
73+
Name: "test-job",
74+
Namespace: "default",
75+
Type: "batch",
76+
Tasks: []*models.Task{
77+
{
78+
Name: "task-1",
79+
Env: map[string]string{
80+
"MY_VAR": "my-value",
81+
"BACALHAU_NODE_ID": "should-not-override", // Should not override system env
82+
"OTHER_VAR": "other-value",
83+
},
84+
},
85+
},
86+
Count: 3,
87+
},
88+
},
89+
want: map[string]string{
90+
"BACALHAU_EXECUTION_ID": "exec-1",
91+
"BACALHAU_NODE_ID": "node-1", // System value takes precedence
92+
"BACALHAU_JOB_ID": "job-1",
93+
"BACALHAU_JOB_NAME": "test-job",
94+
"BACALHAU_JOB_NAMESPACE": "default",
95+
"BACALHAU_JOB_TYPE": "batch",
96+
"BACALHAU_PARTITION_INDEX": "0",
97+
"BACALHAU_PARTITION_COUNT": "3",
98+
"MY_VAR": "my-value",
99+
"OTHER_VAR": "other-value",
100+
},
101+
},
102+
{
103+
name: "execution with special characters in names",
104+
execution: &models.Execution{
105+
ID: "exec-1",
106+
NodeID: "node-1",
107+
JobID: "job-1",
108+
PartitionIndex: 0,
109+
Job: &models.Job{
110+
Name: "test=job with spaces",
111+
Namespace: "test=namespace",
112+
Type: "batch",
113+
Tasks: []*models.Task{
114+
{
115+
Name: "task-1",
116+
},
117+
},
118+
Count: 1,
119+
},
120+
},
121+
want: map[string]string{
122+
"BACALHAU_EXECUTION_ID": "exec-1",
123+
"BACALHAU_NODE_ID": "node-1",
124+
"BACALHAU_JOB_ID": "job-1",
125+
"BACALHAU_JOB_NAME": "test_job_with_spaces", // Sanitized
126+
"BACALHAU_JOB_NAMESPACE": "test_namespace", // Sanitized
127+
"BACALHAU_JOB_TYPE": "batch",
128+
"BACALHAU_PARTITION_INDEX": "0",
129+
"BACALHAU_PARTITION_COUNT": "1",
130+
},
131+
},
132+
}
133+
134+
for _, tt := range tests {
135+
t.Run(tt.name, func(t *testing.T) {
136+
got := GetExecutionEnvVars(tt.execution)
137+
assert.Equal(t, tt.want, got)
138+
})
139+
}
140+
}

pkg/compute/executor.go

+1
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ func PrepareRunArguments(
183183
Inputs: inputVolumes,
184184
ResultsDir: resultsDir,
185185
EngineParams: engineArgs,
186+
Env: GetExecutionEnvVars(execution),
186187
OutputLimits: executor.OutputLimits{
187188
MaxStdoutFileLength: system.MaxStdoutFileLength,
188189
MaxStdoutReturnLength: system.MaxStdoutReturnLength,

pkg/executor/docker/executor.go

+13-25
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/bacalhau-project/bacalhau/pkg/config/types"
2222
"github.com/bacalhau-project/bacalhau/pkg/config_legacy"
2323
dockermodels "github.com/bacalhau-project/bacalhau/pkg/executor/docker/models"
24+
"github.com/bacalhau-project/bacalhau/pkg/lib/envvar"
2425
"github.com/bacalhau-project/bacalhau/pkg/models"
2526
"github.com/bacalhau-project/bacalhau/pkg/models/messages"
2627
pkgUtil "github.com/bacalhau-project/bacalhau/pkg/util"
@@ -137,16 +138,7 @@ func (e *Executor) Start(ctx context.Context, request *executor.RunCommandReques
137138
}
138139
}
139140

140-
jobContainer, err := e.newDockerJobContainer(ctx, &dockerJobContainerParams{
141-
ExecutionID: request.ExecutionID,
142-
JobID: request.JobID,
143-
EngineSpec: request.EngineParams,
144-
NetworkConfig: request.Network,
145-
Resources: request.Resources,
146-
Inputs: request.Inputs,
147-
Outputs: request.Outputs,
148-
ResultsDir: request.ResultsDir,
149-
})
141+
jobContainer, err := e.newDockerJobContainer(ctx, request)
150142
if err != nil {
151143
return err
152144
}
@@ -309,32 +301,28 @@ func (e *Executor) Run(
309301
}
310302
}
311303

312-
type dockerJobContainerParams struct {
313-
ExecutionID string
314-
JobID string
315-
EngineSpec *models.SpecConfig
316-
NetworkConfig *models.NetworkConfig
317-
Resources *models.Resources
318-
Inputs []storage.PreparedStorage
319-
Outputs []*models.ResultPath
320-
ResultsDir string
321-
}
322-
323304
// newDockerJobContainer is an internal method called by Start to set up a new Docker container
324305
// for the job execution. It configures the container based on the provided dockerJobContainerParams.
325306
// This includes decoding engine specifications, setting up environment variables, mounts, resource
326307
// constraints, and network configurations. It then creates the container but does not start it.
327308
// The method returns a container.CreateResponse and an error if any part of the setup fails.
328-
func (e *Executor) newDockerJobContainer(ctx context.Context, params *dockerJobContainerParams) (container.CreateResponse, error) {
309+
func (e *Executor) newDockerJobContainer(ctx context.Context, params *executor.RunCommandRequest) (container.CreateResponse, error) {
329310
// decode the request arguments, bail if they are invalid.
330-
dockerArgs, err := dockermodels.DecodeSpec(params.EngineSpec)
311+
dockerArgs, err := dockermodels.DecodeSpec(params.EngineParams)
331312
if err != nil {
332313
return container.CreateResponse{}, fmt.Errorf("decoding engine spec: %w", err)
333314
}
315+
316+
// merge both the job level and engine level environment variables
317+
envVars := envvar.MergeSlices(
318+
envvar.ToSlice(params.Env),
319+
dockerArgs.EnvironmentVariables,
320+
)
321+
334322
containerConfig := &container.Config{
335323
Image: dockerArgs.Image,
336324
Tty: false,
337-
Env: dockerArgs.EnvironmentVariables,
325+
Env: envVars,
338326
Entrypoint: dockerArgs.Entrypoint,
339327
Cmd: dockerArgs.Parameters,
340328
Labels: e.containerLabels(params.ExecutionID, params.JobID),
@@ -379,7 +367,7 @@ func (e *Executor) newDockerJobContainer(ctx context.Context, params *dockerJobC
379367
}
380368
log.Ctx(ctx).Trace().Msgf("Container: %+v %+v", containerConfig, mounts)
381369
// Create a network if the job requests it, modifying the containerConfig and hostConfig.
382-
err = e.setupNetworkForJob(ctx, params.JobID, params.ExecutionID, params.NetworkConfig, containerConfig, hostConfig)
370+
err = e.setupNetworkForJob(ctx, params.JobID, params.ExecutionID, params.Network, containerConfig, hostConfig)
383371
if err != nil {
384372
return container.CreateResponse{}, fmt.Errorf("setting up network: %w", err)
385373
}

pkg/executor/docker/executor_test.go

+74
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ func (s *ExecutorTestSuite) runJobWithContext(ctx context.Context, spec *models.
172172
Inputs: nil,
173173
ResultsDir: result,
174174
EngineParams: spec.Engine,
175+
Env: spec.Env,
175176
OutputLimits: executor.OutputLimits{
176177
MaxStdoutFileLength: system.MaxStdoutFileLength,
177178
MaxStdoutReturnLength: system.MaxStdoutReturnLength,
@@ -577,3 +578,76 @@ func (s *ExecutorTestSuite) TestDockerOOM() {
577578
require.NoError(s.T(), err)
578579
require.Contains(s.T(), result.ErrorMsg, "memory limit exceeded")
579580
}
581+
582+
func (s *ExecutorTestSuite) TestDockerEnvironmentVariables() {
583+
tests := []struct {
584+
name string
585+
taskEnv map[string]string
586+
engineEnv []string
587+
checkVars []string // variables to check in order
588+
want string
589+
}{
590+
{
591+
name: "task environment variables",
592+
taskEnv: map[string]string{
593+
"TEST_VAR": "test_value",
594+
"ANOTHER_VAR": "another_value",
595+
},
596+
checkVars: []string{"TEST_VAR", "ANOTHER_VAR"},
597+
want: "test_value\nanother_value",
598+
},
599+
{
600+
name: "engine environment variables",
601+
engineEnv: []string{
602+
"TEST_VAR=engine_value",
603+
"ENGINE_VAR=engine_only",
604+
},
605+
checkVars: []string{"TEST_VAR", "ENGINE_VAR"},
606+
want: "engine_value\nengine_only",
607+
},
608+
{
609+
name: "merged environment variables with engine precedence",
610+
taskEnv: map[string]string{
611+
"TEST_VAR": "task_value",
612+
"TASK_VAR": "task_only",
613+
},
614+
engineEnv: []string{
615+
"TEST_VAR=engine_value",
616+
"ENGINE_VAR=engine_only",
617+
},
618+
checkVars: []string{"TEST_VAR", "TASK_VAR", "ENGINE_VAR"},
619+
want: "engine_value\ntask_only\nengine_only",
620+
},
621+
}
622+
623+
for _, tt := range tests {
624+
s.Run(tt.name, func() {
625+
// Create simple script that prints vars in order
626+
script := strings.Builder{}
627+
for _, v := range tt.checkVars {
628+
script.WriteString(fmt.Sprintf("echo $%s\n", v))
629+
}
630+
631+
builder := dockermodels.NewDockerEngineBuilder("busybox:1.37.0").
632+
WithEntrypoint("sh", "-c", script.String())
633+
634+
if len(tt.engineEnv) > 0 {
635+
builder = builder.WithEnvironmentVariables(tt.engineEnv...)
636+
}
637+
638+
es, err := builder.Build()
639+
s.Require().NoError(err)
640+
641+
task := mock.Task()
642+
task.Engine = es
643+
task.Env = tt.taskEnv
644+
645+
result, err := s.runJob(task, uuid.New().String())
646+
require.NoError(s.T(), err)
647+
require.Zero(s.T(), result.ExitCode, result.STDERR)
648+
649+
output := strings.TrimSpace(result.STDOUT)
650+
s.Equal(tt.want, output)
651+
})
652+
}
653+
}

pkg/executor/docker/models/types.go

+10
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,16 @@ func (c EngineSpec) Validate() error {
3939
"must contain absolute path", c.WorkingDirectory)
4040
}
4141
}
42+
// Validate environment variables
43+
for _, env := range c.EnvironmentVariables {
44+
parts := strings.SplitN(env, "=", 2)
45+
if len(parts) > 0 {
46+
if strings.HasPrefix(strings.ToUpper(parts[0]), models.EnvVarPrefix) {
47+
return fmt.Errorf("invalid docker engine param: environment variable '%s' cannot start with %s",
48+
parts[0], models.EnvVarPrefix)
49+
}
50+
}
51+
}
4252
return nil
4353
}
4454

0 commit comments

Comments
 (0)