Skip to content

Conversation

aws-nageshvh
Copy link

Purpose of the change

[FLINK-35989][Connectors/AWS] Log errors on partially failed requests for AWS Kinesis Stream sink

Verifying this change

This change added tests and can be verified as follows:

  • Added unit tests

Significant changes

(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)

  • Dependencies have been added or upgraded
  • Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • Serializers have been changed
  • New feature has been introduced
    • If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)

Copy link

boring-cyborg bot commented Apr 16, 2025

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

KinesisAsyncClient get();

/** Closes any resources held by this provider. */
void close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the interface extend Closeable? Doing so allows for try-with-resources.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Updated the PR

streamArn,
kinesisClientProperties,
states,
null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have an anonymous function implementation of client provider here which calls buildClient() instead of providing null and handling null in the other constructor method? This will allow for cleaner code in the other constructor by removing the null handling.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended up refactoring the way client/clientprovider are injected and passed between SinkWriter and Sink which should simplify this much further. PTAL

try {
kinesisClientProvider.close();
} catch (IOException e) {
throw new RuntimeException("Failed to close the kinesisClientProvider", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a specific or even generic connector exception which extends RuntimeException that we can use here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found there was a common exception so replaced it with that

Comment on lines 402 to 403
summary.getCount(),
summary.getExampleMessage())));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These can be implemented as ErrorSummary.toString(). Then, instead of using StringBuilder to construct string, calling toString() on the hashmap should produce a readable string. This way, we'd have less code to maintain.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense. was in two minds because the format looked off but ended up changing that for better encapsulation


// Using a single WARN log with aggregated information provides operational
// visibility into errors without flooding logs in high-throughput scenarios
LOG.warn("KDS Sink failed to write, " + errorSummary.toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use full class name here for searchability/debug-ability. e.g. KinesisStreamsSinkWriter failed to write records: ...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentionally didn't change it and kept the same since it was being used in multiple places. I don't think we need the full class name since it would show up in Logger anyway. I have expanded KDS into Kinesis Data Stream so it's more readable

@aws-nageshvh
Copy link
Author

Rebased from main and resolved all conflicts

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants