diff --git a/pkg/client/client.go b/pkg/client/client.go index f43f6f6..833bd7a 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -166,8 +166,8 @@ func (c *Client) readLoop(ctx context.Context) error { if c.decoder != nil && c.config.Compress { m, err := c.decoder.DecodeAll(msg, nil) if err != nil { - c.logger.Error("failed to decompress message", "error", err) - return fmt.Errorf("failed to decompress message: %w", err) + c.logger.Error("failed to decompress message (ignoring)", "error", err) + continue } msg = m } @@ -175,8 +175,8 @@ func (c *Client) readLoop(ctx context.Context) error { // Unpack the message and pass it to the handler var event models.Event if err := json.Unmarshal(msg, &event); err != nil { - c.logger.Error("failed to unmarshal event", "error", err) - return fmt.Errorf("failed to unmarshal event: %w", err) + c.logger.Error("failed to unmarshal event (ignoring)", "error", err, msg) + continue } if err := c.Scheduler.AddWork(ctx, event.Did, &event); err != nil { diff --git a/pkg/client/schedulers/sequential/sequential.go b/pkg/client/schedulers/sequential/sequential.go index dd16d73..3bba726 100644 --- a/pkg/client/schedulers/sequential/sequential.go +++ b/pkg/client/schedulers/sequential/sequential.go @@ -49,7 +49,9 @@ func (p *Scheduler) Shutdown() { func (s *Scheduler) AddWork(ctx context.Context, repo string, val *models.Event) error { s.itemsAdded.Inc() s.itemsActive.Inc() - err := s.handleEvent(ctx, val) + if err := s.handleEvent(ctx, val); err != nil { + s.logger.Error("event handler failed", "error", err) + } s.itemsProcessed.Inc() - return err + return nil }