Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Create workflow_v2 processor #2607

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions cmd/benthos/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
input:
stdin: {}

pipeline:
processors:
- workflow_v2:

# /--> B -------------|--> D
# / /
# A --| /--> E --|
# \--> C --| \
# \----------|--> F

branches:
A:
processors:
- sleep:
duration: "3s"
- log:
level: INFO
message: STAGE A FINISHED

B:
dependency_list: ["A"]
processors:
- sleep:
duration: "15s"
- log:
level: INFO
message: STAGE B FINISHED

C:
dependency_list: ["A"]
processors:
- sleep:
duration: "3s"
- log:
level: INFO
message: STAGE C FINISHED

D:
dependency_list: ["B", "E"]
processors:
- sleep:
duration: "3s"
- log:
level: INFO
message: STAGE D FINISHED

E:
dependency_list: ["C"]
processors:
- sleep:
duration: "3s"
- log:
level: INFO
message: STAGE E FINISHED

F:
dependency_list: ["C", "E"]
processors:
- sleep:
duration: "3s"
- log:
level: INFO
message: STAGE F FINISHED

output:
stdout: {}
86 changes: 86 additions & 0 deletions cmd/benthos/config_old.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
input:
stdin: {}

pipeline:
processors:
- workflow:
# meta_path: meta.workflow
order: [ [ A ], [ B, C ], [ E ], [ D, F ] ]

# /--> B -------------|--> D
# / /
# A --| /--> E --|
# \--> C --| \
# \----------|--> F

branches:
A:
processors:
# - log:
# level: INFO
# message: STAGE A STARTED
- sleep:
duration: "3s"
- log:
level: INFO
message: STAGE A FINISHED

B:
processors:
# - log:
# level: INFO
# message: STAGE B STARTED
- sleep:
duration: "15s"
- log:
level: INFO
message: STAGE B FINISHED

C:
processors:
# - log:
# level: INFO
# message: STAGE C STARTED
- sleep:
duration: "3s"
- log:
level: INFO
message: STAGE C FINISHED

D:
processors:
# - log:
# level: INFO
# message: STAGE D STARTED
- sleep:
duration: "3s"
- log:
level: INFO
message: STAGE D FINISHED

# Stage E won't exec until B has finished even though there is
# no direct dependency on B.
E:
processors:
# - log:
# level: INFO
# message: STAGE E STARTED
- sleep:
duration: "3s"
- log:
level: INFO
message: STAGE E FINISHED

F:
processors:
# - log:
# level: INFO
# message: STAGE F STARTED
- sleep:
duration: "3s"
- log:
level: INFO
message: STAGE F FINISHED

output:
stdout: {}
69 changes: 69 additions & 0 deletions internal/impl/pure/processor_workflow_branch_map_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package pure

import (
"context"
"fmt"
"regexp"

"github.com/benthosdev/benthos/v4/internal/bundle"
"github.com/benthosdev/benthos/v4/public/service"
)

type workflowBranchMapV2 struct {
Branches map[string]*Branch
dependencies map[string][]string
}

// Locks all branches contained in the branch map and returns the latest DAG, a
// map of resources, and a func to unlock the resources that were locked. If
// any error occurs in locked each branch (the resource is missing, or the DAG
// is malformed) then an error is returned instead.
func (w *workflowBranchMapV2) LockV2() (branches map[string]*Branch, dependencies map[string][]string, unlockFn func(), err error) {
return w.Branches, w.dependencies, func() {}, nil
}

func (w *workflowBranchMapV2) Close(ctx context.Context) error {
for _, c := range w.Branches {
if err := c.Close(ctx); err != nil {
return err
}
}
return nil
}

var processDAGStageNameV2 = regexp.MustCompile("[a-zA-Z0-9-_]+")

func newWorkflowBranchMapV2(conf *service.ParsedConfig, mgr bundle.NewManagement) (*workflowBranchMapV2, error) {
branchObjMap, err := conf.FieldObjectMap(wflowProcFieldBranchesV2)
if err != nil {
return nil, err
}

branches := map[string]*Branch{}
for k, v := range branchObjMap {
if len(processDAGStageNameV2.FindString(k)) != len(k) {
return nil, fmt.Errorf("workflow branch name '%v' contains invalid characters", k)
}

child, err := newBranchFromParsed(v, mgr.IntoPath("workflow", "branches", k))
if err != nil {
return nil, err
}
branches[k] = child
}

dependencies := make(map[string][]string)

for k, v := range branchObjMap {
depList, _ := v.FieldStringList("dependency_list")
dependencies[k] = append(dependencies[k], depList...)
if len(depList) == 0 {
dependencies[k] = nil
}
}

return &workflowBranchMapV2{
Branches: branches,
dependencies: dependencies,
}, nil
}
Loading
Loading