Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added code to create kafka topic 'TAG_PROP_EVENTS' and push events to… #3978

Open
wants to merge 1 commit into
base: beta
Choose a base branch
from

Conversation

abhijeet-atlan
Copy link

added better config to make sure messages are emitted to the right topic

removed 'assetGuid' from kafka message to remove duplication

Refactor: Wrap ApplicationProperties.get() and KafkaNotification initialization in try-catch blocks

  • Added exception handling for ApplicationProperties.get() and KafkaNotification initialization.
  • Ensures AtlasException is caught and rethrown as RuntimeException for better runtime error propagation.

Refactor: Initialize and in constructor with exception handling

  • Moved initialization of and () into the constructor.
  • Added block to handle during initialization.
  • Ensures any exceptions are wrapped and propagated as for consistency.

Refactor: Initialize 'configuration' and 'kfknotif' in 'EntityGraphMapper' constructor with exception handling

  • Moved initialization of 'configuration' and 'KafkaNotification' ('kfknotif') into the constructor.
  • Added 'try-catch' block to handle 'AtlasException' during initialization.
  • Ensures any exceptions are wrapped and propagated as 'RuntimeException' for consistency.

Implement Enhanced Kafka Topic Management and Partitioned Messaging in Apache Atlas

This commit introduces comprehensive enhancements to Apache Atlas's Kafka integration, focusing on dynamic topic management, partition-specific messaging, and configuration improvements to support scalability and efficient data distribution across Kafka topics.

Detailed Changes:

  1. Dynamic Topic Creation:

    • Introduced a new method createTopics in the KafkaUtils class. This method facilitates the creation of Kafka topics with configurable numbers of partitions and replication factors, derived from a list of topic details. This allows for more granular control over topic configurations directly from the application layer.
    • Added debug logging to provide detailed traceability for topic creation operations, enhancing monitoring and troubleshooting capabilities.
  2. Configuration Management Enhancements:

    • Modified atlas_config.py to automatically include the new TAG_PROP_EVENTS topic in the list of topics initialized at startup, ensuring that this topic is available for event propagation without manual configuration.
    • Introduced a new configuration key NOTIFICATION_PROPAGATION_TOPIC_NAME in AtlasConfiguration.java, standardizing the topic name across the codebase and reducing the risk of hard-coded string errors.
  3. Partition-Specific Messaging Capabilities:

    • Extended the KafkaNotification and AbstractNotification classes to include methods that support sending messages to specific partitions. This feature is critical for directing messages to particular segments of a topic, thereby optimizing the workload distribution and message consumption based on topic partitioning.
    • Implemented partition calculation in EntityGraphMapper using Guava's consistent hashing algorithm. This calculation uses the SHA-256 hash of a parent task's GUID to determine the partition, ensuring that related messages are co-located in the same partition for improved processing efficiency.
  4. Dependency Updates:

    • Added the Google Guava library to the project dependencies to utilize its robust hashing functions, which are essential for implementing consistent hashing for partition determination.
  5. Error Handling and Logging Improvements:

    • Enhanced error handling in the topic creation process to re-throw exceptions, allowing calling methods to handle these exceptions according to the application's error management policies.
    • Improved logging statements to include detailed error messages and contextual information, aiding in faster diagnosis and resolution of issues related to Kafka messaging.

This push has changes related to Kafka topic partitioning and configuration enhancements. Details are below:
1. Introduced a constant TAG_PROP_EVENTS_PARTITION_COUNT for Kafka partition value management.
2. Updated AtlasTopicCreator to use configurable partition counts for topics dynamically.
3. Refactored EntityGraphMapper to use the new constant for partition count initialization.

added code to send kafka message to topic 'TAG_PROP_EVENTS'

Change description

Description here

Type of change

  • Bug fix (fixes an issue)
  • New feature (adds functionality)

Related issues

Fix #1

Checklists

Development

  • Lint rules pass locally
  • Application changes have been tested thoroughly
  • Automated tests covering modified code pass

Security

  • Security impact of change has been considered
  • Code follows company security practices and guidelines

Code review

  • Pull request has a descriptive title and context useful to a reviewer. Screenshots or screencasts are attached as necessary
  • "Ready for review" label attached and reviewers assigned
  • Changes have been reviewed by at least one other contributor
  • Pull request linked to task tracker where applicable

… it for add propogation

added better config to make sure messages are emitted to the right topic

removed 'assetGuid' from kafka message to remove duplication

Refactor: Wrap `ApplicationProperties.get()` and `KafkaNotification` initialization in try-catch blocks

- Added exception handling for `ApplicationProperties.get()` and `KafkaNotification` initialization.
- Ensures `AtlasException` is caught and rethrown as `RuntimeException` for better runtime error propagation.

Refactor: Initialize  and  in  constructor with exception handling

- Moved initialization of  and  () into the constructor.
- Added  block to handle  during initialization.
- Ensures any exceptions are wrapped and propagated as  for consistency.

Refactor: Initialize 'configuration' and 'kfknotif' in 'EntityGraphMapper' constructor with exception handling

- Moved initialization of 'configuration' and 'KafkaNotification' ('kfknotif') into the constructor.
- Added 'try-catch' block to handle 'AtlasException' during initialization.
- Ensures any exceptions are wrapped and propagated as 'RuntimeException' for consistency.

Implement Enhanced Kafka Topic Management and Partitioned Messaging in Apache Atlas

This commit introduces comprehensive enhancements to Apache Atlas's Kafka integration, focusing on dynamic topic management, partition-specific messaging, and configuration improvements to support scalability and efficient data distribution across Kafka topics.

Detailed Changes:

1. **Dynamic Topic Creation**:
   - Introduced a new method `createTopics` in the `KafkaUtils` class. This method facilitates the creation of Kafka topics with configurable numbers of partitions and replication factors, derived from a list of topic details. This allows for more granular control over topic configurations directly from the application layer.
   - Added debug logging to provide detailed traceability for topic creation operations, enhancing monitoring and troubleshooting capabilities.

2. **Configuration Management Enhancements**:
   - Modified `atlas_config.py` to automatically include the new `TAG_PROP_EVENTS` topic in the list of topics initialized at startup, ensuring that this topic is available for event propagation without manual configuration.
   - Introduced a new configuration key `NOTIFICATION_PROPAGATION_TOPIC_NAME` in `AtlasConfiguration.java`, standardizing the topic name across the codebase and reducing the risk of hard-coded string errors.

3. **Partition-Specific Messaging Capabilities**:
   - Extended the `KafkaNotification` and `AbstractNotification` classes to include methods that support sending messages to specific partitions. This feature is critical for directing messages to particular segments of a topic, thereby optimizing the workload distribution and message consumption based on topic partitioning.
   - Implemented partition calculation in `EntityGraphMapper` using Guava's consistent hashing algorithm. This calculation uses the SHA-256 hash of a parent task's GUID to determine the partition, ensuring that related messages are co-located in the same partition for improved processing efficiency.

4. **Dependency Updates**:
   - Added the Google Guava library to the project dependencies to utilize its robust hashing functions, which are essential for implementing consistent hashing for partition determination.

5. **Error Handling and Logging Improvements**:
   - Enhanced error handling in the topic creation process to re-throw exceptions, allowing calling methods to handle these exceptions according to the application's error management policies.
   - Improved logging statements to include detailed error messages and contextual information, aiding in faster diagnosis and resolution of issues related to Kafka messaging.

These enhancements aim to bolster Apache Atlas's capabilities in handling high-throughput Kafka topics like `TAG_PROP_EVENTS` by introducing advanced topic management features and partition-specific messaging. This aligns with the needs of large-scale, distributed environments where efficient data handling and scalability are paramount.

This push has changes related to Kafka topic partitioning and configuration enhancements. Details are below:
	1.	Introduced a constant TAG_PROP_EVENTS_PARTITION_COUNT for Kafka partition value management.
	2.	Updated AtlasTopicCreator to use configurable partition counts for topics dynamically.
	3.	Refactored EntityGraphMapper to use the new constant for partition count initialization.

added code to send kafka message to topic 'TAG_PROP_EVENTS'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant