-
Notifications
You must be signed in to change notification settings - Fork 847
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat: implement change stream reader from GCP SpannerChangeStream #3066
base: main
Are you sure you want to change the base?
Conversation
func newSpannerChangeStreamInputConfig() *service.ConfigSpec { | ||
return service.NewConfigSpec(). | ||
Beta(). | ||
Version("3.43.0"). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
very aware this is most likely not correct...
Field(service.NewIntField("start_time_epoch").Advanced().Optional().Default(0).Description("Optional microsecond accurate epoch timestamp to start reading from. If empty time.Now() will be used.")). | ||
Field(service.NewStringField("partition_dsn").Optional().Description("Field used to set the DSN for the metadata partition table, can be the same as stream_dsn.").Example("projects/<project_id>/instances/<instance_id>/databases/<database_id>")). | ||
Field(service.NewStringField("partition_table").Optional().Description("Name of the table to create/use in spanner to track change stream partition metadata.")). | ||
Field(service.NewBoolField("use_in_mememory_partition").Description("use an in memory partition table for tracking the partitions.").Default(false)). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo: use_in_mememory_partition
Version("3.43.0"). | ||
Categories("Services", "GCP"). | ||
Summary("Creates an input that consumes from a spanner change stream."). | ||
Field(service.NewStringField("stream_dsn").Description("Required field to use to connect to spanner for the change stream.").Example("projects/<project_id>/instances/<instance_id>/databases/<database_id>")). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we are now creating the fields as constants so it's harder to mess them up.
|
||
func init() { | ||
err := service.RegisterInput( | ||
"gcp_spanner_change_stream", newSpannerChangeStreamInputConfig(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"gcp_spanner_change_stream", newSpannerChangeStreamInputConfig(), | |
"gcp_spanner_cdc", newSpannerChangeStreamInputConfig(), |
if rerr := i.reader.Stream(jobctx, i.changeChannel); rerr != nil { | ||
i.log.Errorf("Subscription error: %v\n", rerr) | ||
close(i.changeChannel) | ||
panic(rerr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't panic in Connect, please gracefully mark the input as disconnected, then return service.ErrNotConnected
in Read
so that the input is restarted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you want to see an example of doing this - see the postgres_cdc input.
} | ||
|
||
func (i *spannerStreamInput) Close(_ context.Context) error { | ||
close(i.changeChannel) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Multiple different places close this - generally only the writer of a channel should close it.
If someone tries to write on this channel there will be a panic.
@@ -25,6 +26,7 @@ require ( | |||
github.com/Masterminds/squirrel v1.5.4 | |||
github.com/PaesslerAG/gval v1.2.2 | |||
github.com/PaesslerAG/jsonpath v0.1.1 | |||
github.com/anicoll/screamer v0.0.0-20241206035431-9ba919b54dfd |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We try not to take dependencies on libraries that aren't stable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on this comment and the one below.
raised this PR.
#3083
|
||
func (i *spannerStreamInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) { | ||
msg := <-i.changeChannel | ||
data, err := json.Marshal(msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a struct from an external library - which I don't feel comfortable taking it's model as the same format that we using in our pipeline - the library could make a change and that would be a breaking change in Connect.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#3083
Does this make a difference?
No description provided.