From a12f505de73bdd1f8fd5bf63f2f46ac5a313d2c2 Mon Sep 17 00:00:00 2001 From: phil Date: Sun, 17 Nov 2024 23:39:19 -0500 Subject: [PATCH] Ignore unprocessable events + only log handler failures The client can't assume a jetstream server will always produce valid messages, or that it won't forward evil messages that break parsing [like a JSON value nested 10,000 levels deep, `[[[[[[]]]]]]`](https://github.com/bluesky-social/jetstream/issues/24). This change makes the reader continue after encountering any message that can't be decompressed or unmarshalled from JSON -- such messages are logged as errors, but the client keeps running. The handler will not be called for these events. The sequential scheduler is also modified to no longer quit with an error if its handler func returns an error -- it just logs the error. This matches the behaviour of the parallel scheduler. Maybe different behaviour would be fine; I guess it's a matter of taste. --- pkg/client/client.go | 8 ++++---- pkg/client/schedulers/sequential/sequential.go | 6 ++++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/client/client.go b/pkg/client/client.go index f43f6f6..833bd7a 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -166,8 +166,8 @@ func (c *Client) readLoop(ctx context.Context) error { if c.decoder != nil && c.config.Compress { m, err := c.decoder.DecodeAll(msg, nil) if err != nil { - c.logger.Error("failed to decompress message", "error", err) - return fmt.Errorf("failed to decompress message: %w", err) + c.logger.Error("failed to decompress message (ignoring)", "error", err) + continue } msg = m } @@ -175,8 +175,8 @@ func (c *Client) readLoop(ctx context.Context) error { // Unpack the message and pass it to the handler var event models.Event if err := json.Unmarshal(msg, &event); err != nil { - c.logger.Error("failed to unmarshal event", "error", err) - return fmt.Errorf("failed to unmarshal event: %w", err) + c.logger.Error("failed to unmarshal event (ignoring)", "error", err, msg)
continue } if err := c.Scheduler.AddWork(ctx, event.Did, &event); err != nil { diff --git a/pkg/client/schedulers/sequential/sequential.go b/pkg/client/schedulers/sequential/sequential.go index dd16d73..3bba726 100644 --- a/pkg/client/schedulers/sequential/sequential.go +++ b/pkg/client/schedulers/sequential/sequential.go @@ -49,7 +49,9 @@ func (p *Scheduler) Shutdown() { func (s *Scheduler) AddWork(ctx context.Context, repo string, val *models.Event) error { s.itemsAdded.Inc() s.itemsActive.Inc() - err := s.handleEvent(ctx, val) + if err := s.handleEvent(ctx, val); err != nil { + s.logger.Error("event handler failed", "error", err) + } s.itemsProcessed.Inc() - return err + return nil }