Skip to content

Commit

Permalink
Propogating centralized errors (#4412)
Browse files Browse the repository at this point in the history
  • Loading branch information
udsamani authored Sep 16, 2024
1 parent 01be858 commit b125085
Show file tree
Hide file tree
Showing 13 changed files with 232 additions and 65 deletions.
8 changes: 5 additions & 3 deletions pkg/compute/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,10 @@ func (e *BaseExecutor) Run(ctx context.Context, state store.LocalExecutionState)
stopwatch := telemetry.Timer(ctx, jobDurationMilliseconds, state.Execution.Job.MetricAttributes()...)
topic := EventTopicExecutionRunning
defer func() {
if err != nil && err.Error() != executor.ErrAlreadyCancelled.Error() {
e.handleFailure(ctx, state, err, topic)
if err != nil {
if !models.IsErrorWithCode(err, executor.ExecutionAlreadyCancelled) {
e.handleFailure(ctx, state, err, topic)
}
}
dur := stopwatch()
log.Ctx(ctx).Debug().
Expand All @@ -320,7 +322,7 @@ func (e *BaseExecutor) Run(ctx context.Context, state store.LocalExecutionState)
}
}()
if err := res.Err; err != nil {
if errors.Is(err, executor.ErrAlreadyStarted) {
if models.IsErrorWithCode(err, executor.ExecutionAlreadyStarted) {
// by not returning this error to the caller when the execution has already been started/is already running
// we allow duplicate calls to `Run` to be idempotent and fall through to the below `Wait` call.
log.Ctx(ctx).Warn().Err(err).Str("execution", execution.ID).
Expand Down
32 changes: 16 additions & 16 deletions pkg/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type Client struct {
func NewDockerClient() (*Client, error) {
client, err := tracing.NewTracedClient()
if err != nil {
return nil, err
return nil, NewDockerError(err)
}
return &Client{
client,
Expand All @@ -102,10 +102,10 @@ func (c *Client) IsInstalled(ctx context.Context) bool {
func (c *Client) HostGatewayIP(ctx context.Context) (net.IP, error) {
response, err := c.NetworkInspect(ctx, "bridge", network.InspectOptions{})
if err != nil {
return net.IP{}, err
return net.IP{}, NewDockerError(err)
}
if configs := response.IPAM.Config; len(configs) < 1 {
return net.IP{}, fmt.Errorf("bridge network unattached")
return net.IP{}, NewCustomDockerError(DockerBridgeNetworkUnattached, "bridge network unattached")
} else {
return net.ParseIP(configs[0].Gateway), nil
}
Expand All @@ -114,7 +114,7 @@ func (c *Client) HostGatewayIP(ctx context.Context) (net.IP, error) {
func (c *Client) removeContainers(ctx context.Context, filterz filters.Args) error {
containers, err := c.ContainerList(ctx, container.ListOptions{All: true, Filters: filterz})
if err != nil {
return err
return NewDockerError(err)
}

wg := multierrgroup.Group{}
Expand All @@ -130,7 +130,7 @@ func (c *Client) removeContainers(ctx context.Context, filterz filters.Args) err
func (c *Client) removeNetworks(ctx context.Context, filterz filters.Args) error {
networks, err := c.NetworkList(ctx, network.ListOptions{Filters: filterz})
if err != nil {
return err
return NewDockerError(err)
}

wg := multierrgroup.Group{}
Expand All @@ -157,7 +157,7 @@ func (c *Client) RemoveObjectsWithLabel(ctx context.Context, labelName, labelVal
func (c *Client) FindContainer(ctx context.Context, label string, value string) (string, error) {
containers, err := c.ContainerList(ctx, container.ListOptions{All: true})
if err != nil {
return "", err
return "", NewDockerError(err)
}

for _, ctr := range containers {
Expand All @@ -166,13 +166,13 @@ func (c *Client) FindContainer(ctx context.Context, label string, value string)
}
}

return "", fmt.Errorf("unable to find container for %s=%s", label, value)
return "", NewCustomDockerError(DockerContainerNotFound, fmt.Sprintf("unable to find container for %s=%s", label, value))
}

func (c *Client) FollowLogs(ctx context.Context, id string) (stdout, stderr io.Reader, err error) {
cont, err := c.ContainerInspect(ctx, id)
if err != nil {
return nil, nil, pkgerrors.Wrap(err, "failed to get container")
return nil, nil, NewDockerError(err)
}

logOptions := container.LogsOptions{
Expand All @@ -184,7 +184,7 @@ func (c *Client) FollowLogs(ctx context.Context, id string) (stdout, stderr io.R
ctx = log.Ctx(ctx).With().Str("ContainerID", cont.ID).Str("Image", cont.Image).Logger().WithContext(ctx)
logsReader, err := c.ContainerLogs(ctx, cont.ID, logOptions)
if err != nil {
return nil, nil, pkgerrors.Wrap(err, "failed to get container logs")
return nil, nil, NewDockerError(err)
}

stdoutReader, stdoutWriter := io.Pipe()
Expand All @@ -210,11 +210,11 @@ func (c *Client) FollowLogs(ctx context.Context, id string) (stdout, stderr io.R
func (c *Client) GetOutputStream(ctx context.Context, id string, since string, follow bool) (io.ReadCloser, error) {
cont, err := c.ContainerInspect(ctx, id)
if err != nil {
return nil, pkgerrors.Wrap(err, "failed to get container")
return nil, NewDockerError(err)
}

if !cont.State.Running {
return nil, pkgerrors.Wrap(err, "cannot get logs when container is not running")
return nil, NewCustomDockerError(DockerContainerNotRunning, "cannot get logs when container is not running")
}

logOptions := container.LogsOptions{
Expand All @@ -229,7 +229,7 @@ func (c *Client) GetOutputStream(ctx context.Context, id string, since string, f
ctx = log.Ctx(ctx).With().Str("ContainerID", cont.ID).Str("Image", cont.Image).Logger().WithContext(ctx)
logsReader, err := c.ContainerLogs(ctx, cont.ID, logOptions)
if err != nil {
return nil, pkgerrors.Wrap(err, "failed to get container logs")
return nil, NewDockerError(err)
}

return logsReader, nil
Expand Down Expand Up @@ -261,7 +261,7 @@ func (c *Client) ImagePlatforms(ctx context.Context, image string, dockerCreds c
} else if !dockerclient.IsErrNotFound(err) {
// The only error we wanted to see was a not found error which means we don't have
// the image being requested.
return nil, err
return nil, NewDockerError(err)
}

authToken := getAuthToken(ctx, image, dockerCreds)
Expand Down Expand Up @@ -363,7 +363,7 @@ func (c *Client) ImageDistribution(
digestParts := strings.Split(repos[0], "@")
digest, err := digest.Parse(digestParts[1])
if err != nil {
return nil, err
return nil, NewCustomDockerError(DockerImageDigestMismatch, "image digest mismatch")
}

return &ImageManifest{
Expand Down Expand Up @@ -405,7 +405,7 @@ func (c *Client) PullImage(ctx context.Context, img string, dockerCreds config_l
if !dockerclient.IsErrNotFound(err) {
// The only error we wanted to see was a not found error which means we don't have
// the image being requested.
return err
return NewDockerError(err)
}

log.Ctx(ctx).Debug().Str("image", img).Msg("Pulling image as it wasn't found")
Expand All @@ -416,7 +416,7 @@ func (c *Client) PullImage(ctx context.Context, img string, dockerCreds config_l

output, err := c.ImagePull(ctx, img, pullOptions)
if err != nil {
return err
return NewDockerError(err)
}

defer closer.CloseWithLogOnError("image-pull", output)
Expand Down
113 changes: 113 additions & 0 deletions pkg/docker/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package docker

import (
"net/http"
"strings"

"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/docker/docker/errdefs"
)

const DockerComponent = "Docker"

// Docker-specific error codes
const (
DockerContainerNotFound = "DockerContainerNotFound"
DockerImageNotFound = "DockerImageNotFound"
DockerNetworkNotFound = "DockerNetworkNotFound"
DockerVolumeNotFound = "DockerVolumeNotFound"
DockerConflict = "DockerConflict"
DockerUnauthorized = "DockerUnauthorized"
DockerForbidden = "DockerForbidden"
DockerDataLoss = "DockerDataLoss"
DockerDeadline = "DockerDeadline"
DockerCancelled = "DockerCancelled"
DockerUnavailable = "DockerUnavailable"
DockerSystemError = "DockerSystemError"
DockerNotImplemented = "DockerNotImplemented"
DockerUnknownError = "DockerUnknownError"
)

// Custom Docker error codes
const (
DockerBridgeNetworkUnattached = "DockerBridgeNetworkUnattached"
DockerContainerNotRunning = "DockerContainerNotRunning"
DockerImageDigestMismatch = "DockerImageDigestMismatch"
)

func NewDockerError(err error) *models.BaseError {
switch {
case errdefs.IsNotFound(err):
return handleNotFoundError(err)
case errdefs.IsConflict(err):
return models.NewBaseError(err.Error()).
WithCode(DockerConflict).
WithHTTPStatusCode(http.StatusConflict).
WithComponent(DockerComponent)
case errdefs.IsUnauthorized(err):
return models.NewBaseError(err.Error()).
WithCode(DockerUnauthorized).
WithHTTPStatusCode(http.StatusUnauthorized).
WithComponent(DockerComponent)
case errdefs.IsForbidden(err):
return models.NewBaseError(err.Error()).
WithCode(DockerForbidden).
WithHTTPStatusCode(http.StatusForbidden).
WithComponent(DockerComponent)
case errdefs.IsDataLoss(err):
return models.NewBaseError(err.Error()).
WithCode(DockerDataLoss).
WithHTTPStatusCode(http.StatusInternalServerError).
WithComponent(DockerComponent)
case errdefs.IsDeadline(err):
return models.NewBaseError(err.Error()).
WithCode(DockerDeadline).
WithHTTPStatusCode(http.StatusGatewayTimeout).
WithComponent(DockerComponent)
case errdefs.IsCancelled(err):
return models.NewBaseError(err.Error()).
WithCode(DockerCancelled).
WithHTTPStatusCode(http.StatusRequestTimeout).
WithComponent(DockerComponent)
case errdefs.IsUnavailable(err):
return models.NewBaseError(err.Error()).
WithCode(DockerUnavailable).
WithHTTPStatusCode(http.StatusServiceUnavailable).
WithComponent(DockerComponent)
case errdefs.IsSystem(err):
return models.NewBaseError(err.Error()).
WithCode(DockerSystemError).
WithHTTPStatusCode(http.StatusInternalServerError).
WithComponent(DockerComponent)
case errdefs.IsNotImplemented(err):
return models.NewBaseError(err.Error()).
WithCode(DockerNotImplemented).
WithHTTPStatusCode(http.StatusNotImplemented).
WithComponent(DockerComponent)
default:
return models.NewBaseError(err.Error()).
WithCode(DockerUnknownError).
WithHTTPStatusCode(http.StatusInternalServerError).
WithComponent(DockerComponent)
}
}

func NewCustomDockerError(code models.ErrorCode, message string) *models.BaseError {
return models.NewBaseError(message).
WithCode(code).
WithComponent(DockerComponent)
}

func handleNotFoundError(err error) *models.BaseError {
errorLower := strings.ToLower(err.Error())
if strings.Contains(errorLower, "no such container") {
return models.NewBaseError(err.Error()).
WithCode(DockerContainerNotFound).
WithHTTPStatusCode(http.StatusNotFound).
WithComponent(DockerComponent)
}
return models.NewBaseError(err.Error()).
WithCode(DockerUnknownError).
WithHTTPStatusCode(http.StatusNotFound).
WithComponent(DockerComponent)
}
16 changes: 8 additions & 8 deletions pkg/executor/docker/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (e *Executor) Shutdown(ctx context.Context) error {
logLevel := map[bool]zerolog.Level{true: zerolog.DebugLevel, false: zerolog.ErrorLevel}[err == nil]
log.Ctx(ctx).WithLevel(logLevel).Err(err).Msg("Cleaned up all Docker resources")

return nil
return err
}

// IsInstalled checks if docker itself is installed.
Expand Down Expand Up @@ -129,9 +129,9 @@ func (e *Executor) Start(ctx context.Context, request *executor.RunCommandReques
// failing that will create a new container.
if handler, found := e.handlers.Get(request.ExecutionID); found {
if handler.active() {
return fmt.Errorf("starting execution (%s): %w", request.ExecutionID, executor.ErrAlreadyStarted)
return executor.NewExecutorError(executor.ExecutionAlreadyStarted, fmt.Sprintf("starting execution (%s)", request.ExecutionID))
} else {
return fmt.Errorf("starting execution (%s): %w", request.ExecutionID, executor.ErrAlreadyComplete)
return executor.NewExecutorError(executor.ExecutionAlreadyComplete, fmt.Sprintf("starting execution (%s)", request.ExecutionID))
}
}

Expand All @@ -146,7 +146,7 @@ func (e *Executor) Start(ctx context.Context, request *executor.RunCommandReques
ResultsDir: request.ResultsDir,
})
if err != nil {
return fmt.Errorf("failed to create docker job container: %w", err)
return err
}

containerID = jobContainer.ID
Expand Down Expand Up @@ -193,7 +193,7 @@ func (e *Executor) Wait(ctx context.Context, executionID string) (<-chan *models
errCh := make(chan error, 1)

if !found {
errCh <- fmt.Errorf("waiting on execution (%s): %w", executionID, executor.ErrNotFound)
errCh <- executor.NewExecutorError(executor.ExecutionNotFound, fmt.Sprintf("waiting on execution (%s)", executionID))
return resultCh, errCh
}

Expand Down Expand Up @@ -233,9 +233,9 @@ func (e *Executor) doWait(ctx context.Context, out chan *models.RunCommandResult
func (e *Executor) Cancel(ctx context.Context, executionID string) error {
handler, found := e.handlers.Get(executionID)
if !found {
return fmt.Errorf("canceling execution (%s): %w", executionID, executor.ErrNotFound)
return executor.NewExecutorError(executor.ExecutionNotFound, fmt.Sprintf("canceling execution (%s)", executionID))
}
handler.cancelFunc(executor.ErrAlreadyCancelled)
handler.cancelFunc(executor.NewExecutorError(executor.ExecutionAlreadyCancelled, "execution already cancelled"))
return nil
}

Expand Down Expand Up @@ -281,7 +281,7 @@ func (e *Executor) GetLogStream(ctx context.Context, request executor.LogStreamR
chExit <- struct{}{}
}

return nil, fmt.Errorf("getting outputs for execution (%s): %w", request.ExecutionID, executor.ErrNotFound)
return nil, executor.NewExecutorError(executor.ExecutionNotFound, fmt.Sprintf("getting outputs for execution (%s)", request.ExecutionID))
}

// Run initiates and waits for the completion of an execution in one call.
Expand Down
1 change: 0 additions & 1 deletion pkg/executor/docker/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,6 @@ func (s *ExecutorTestSuite) TestDockerExecutionCancellation() {
s.Require().Failf("Executor run should have returned a result, but instead returned err: %w", err.Error())
case result := <-resultC:
s.Require().NotNil(result)
s.Require().Equal(executor.ErrAlreadyCancelled.Error(), result.ErrorMsg)
}
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/executor/noop/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package noop

import (
"github.com/bacalhau-project/bacalhau/pkg/models"
)

const NoopExecutorComponent = "Executor/Noop"

func NewNoopExecutorError(code models.ErrorCode, message string) *models.BaseError {
return models.NewBaseError(message).
WithCode(code).
WithComponent(NoopExecutorComponent)
}
2 changes: 1 addition & 1 deletion pkg/executor/noop/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (e *NoopExecutor) Wait(ctx context.Context, executionID string) (<-chan *mo
errC := make(chan error, 1)

if !found {
errC <- fmt.Errorf("waiting on execution (%s): %w", executionID, executor.ErrNotFound)
errC <- NewNoopExecutorError(executor.ExecutionNotFound, fmt.Sprintf("waiting on execution (%s)", executionID))
return resultC, errC
}

Expand Down
Loading

0 comments on commit b125085

Please sign in to comment.