Skip to content

Commit

Permalink
Merge pull request #3002 from redpanda-data/snow-polish
Browse files Browse the repository at this point in the history
snowflake: improve timestamp error message
  • Loading branch information
rockwotj authored Nov 13, 2024
2 parents e4f8636 + d4994a4 commit 499953f
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 2 deletions.
9 changes: 8 additions & 1 deletion internal/impl/snowflake/output_snowflake_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ func (o *snowflakeStreamerOutput) WriteBatchInternal(ctx context.Context, batch
// Keep around the same channel just in case so we don't keep creating new channels.
o.channelPool.Put(channel)
}
return err
return wrapInsertError(err)
}
polls, err := channel.WaitUntilCommitted(ctx)
if err == nil {
Expand Down Expand Up @@ -775,3 +775,10 @@ func validateColumnType(v string) error {
}
return fmt.Errorf("invalid Snowflake column data type: %s", v)
}

func wrapInsertError(err error) error {
if errors.Is(err, streaming.InvalidTimestampFormatError{}) {
return fmt.Errorf("%w; if a custom format is required use a `%s` and bloblang functions `ts_parse` or `ts_strftime` to convert a custom format into a timestamp", err, ssoFieldMapping)
}
return err
}
11 changes: 11 additions & 0 deletions internal/impl/snowflake/streaming/schema_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,14 @@ func (e MissingColumnError) Value() any {
func (e MissingColumnError) Error() string {
return fmt.Sprintf("new data %+v with the name %q does not have an associated column", e.val, e.columnName)
}

// InvalidTimestampFormatError is when a timestamp column has a string value not in RFC3339 format.
type InvalidTimestampFormatError struct {
columnType string
val string
}

// Error implements the error interface
func (e InvalidTimestampFormatError) Error() string {
return fmt.Sprintf("unable to parse %s value from %q - string time values must be in RFC 3339 format", e.columnType, e.val)
}
8 changes: 7 additions & 1 deletion internal/impl/snowflake/streaming/userdata_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func (c timestampConverter) ValidateAndConvert(stats *statsBuffer, val any, buf
location := c.defaultTZ
t, err = time.ParseInLocation(time.RFC3339Nano, s, location)
if err != nil {
return fmt.Errorf("unable to parse timestamp value from %q", s)
return InvalidTimestampFormatError{"timestamp", s}
}
}
if c.trimTZ {
Expand Down Expand Up @@ -404,6 +404,9 @@ func (c timeConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typed
}
t, err := bloblang.ValueAsTimestamp(val)
if err != nil {
if s, ok := val.(string); ok {
return InvalidTimestampFormatError{"time", s}
}
return err
}
t = t.In(time.UTC)
Expand Down Expand Up @@ -433,6 +436,9 @@ func (c dateConverter) ValidateAndConvert(stats *statsBuffer, val any, buf typed
}
t, err := bloblang.ValueAsTimestamp(val)
if err != nil {
if s, ok := val.(string); ok {
return InvalidTimestampFormatError{"date", s}
}
return err
}
t = t.UTC()
Expand Down

0 comments on commit 499953f

Please sign in to comment.