From 7b62927f4f084fa59e22159b5c9887c4bb6ec24c Mon Sep 17 00:00:00 2001 From: messense Date: Mon, 2 Dec 2019 18:28:13 +0800 Subject: [PATCH 1/4] Add member assignments parsing --- Cargo.toml | 4 ++-- src/error.rs | 8 +++++++ src/groups.rs | 53 +++++++++++++++++++++++++++++++++++++++-------- src/lib.rs | 2 -- src/statistics.rs | 2 ++ src/util.rs | 12 +++++++++++ 6 files changed, 68 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d04cda20a..b8e822ef0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,9 +15,9 @@ rdkafka-sys = { path = "rdkafka-sys", version = "1.2.1", default-features = fals futures = "0.1.21" libc = "0.2.0" log = "0.4.8" -serde = "1.0.0" -serde_derive = "1.0.0" +serde = { version = "1.0.0", features = ["derive"] } serde_json = "1.0.0" +byteorder = "1.3.2" [dev-dependencies] backoff = "0.1.5" diff --git a/src/error.rs b/src/error.rs index c18a3908c..980001c33 100644 --- a/src/error.rs +++ b/src/error.rs @@ -54,6 +54,8 @@ pub enum KafkaError { MessageProduction(RDKafkaError), /// Metadata fetch error. MetadataFetch(RDKafkaError), + /// Member assignment parsing failed + MemberAssignment(String), /// No message was received. NoMessageReceived, /// Unexpected null pointer @@ -106,6 +108,9 @@ impl fmt::Debug for KafkaError { KafkaError::MetadataFetch(err) => { write!(f, "KafkaError (Metadata fetch error: {})", err) } + KafkaError::MemberAssignment(ref err) => { + write!(f, "KafkaError (Member assignment parsing error: {})", err) + } KafkaError::NoMessageReceived => { write!(f, "No message received within the given poll interval") } @@ -145,6 +150,7 @@ impl fmt::Display for KafkaError { KafkaError::MessageConsumption(err) => write!(f, "Message consumption error: {}", err), KafkaError::MessageProduction(err) => write!(f, "Message production error: {}", err), KafkaError::MetadataFetch(err) => write!(f, "Meta data fetch error: {}", err), + KafkaError::MemberAssignment(ref err) => write!(f, "Member assignment parsing error: {}", err), KafkaError::NoMessageReceived => { write!(f, "No message received within the given poll interval") } @@ -174,6 +180,7 @@ impl error::Error for KafkaError { KafkaError::MessageConsumption(_) => "Message consumption error", KafkaError::MessageProduction(_) => "Message production error", KafkaError::MetadataFetch(_) => "Meta data fetch error", + KafkaError::MemberAssignment(_) => "Member assignment parsing error", KafkaError::NoMessageReceived => "No message received within the given poll interval", KafkaError::Nul(_) => "FFI nul error", KafkaError::OffsetFetch(_) => "Offset fetch error", @@ -200,6 +207,7 @@ impl error::Error for KafkaError { KafkaError::MessageConsumption(ref err) => Some(err), KafkaError::MessageProduction(ref err) => Some(err), KafkaError::MetadataFetch(ref err) => Some(err), + KafkaError::MemberAssignment(_) => None, KafkaError::NoMessageReceived => None, KafkaError::Nul(_) => None, KafkaError::OffsetFetch(ref err) => Some(err), diff --git a/src/groups.rs b/src/groups.rs index 041dd3688..ab4aee239 100644 --- a/src/groups.rs +++ b/src/groups.rs @@ -1,10 +1,25 @@ //! Group membership API. use crate::rdsys; use crate::rdsys::types::*; +use crate::error::{KafkaResult, KafkaError}; +use crate::util::read_str; use std::ffi::CStr; use std::fmt; use std::slice; +use std::io::Cursor; + +use byteorder::{BigEndian, ReadBytesExt}; +use serde::{Serialize, Deserialize}; + +/// Group member assignment +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct MemberAssignment { + /// Kafka topic name + pub topic: String, + /// Assigned partitions + pub partitions: Vec, +} /// Group member information container. pub struct GroupMemberInfo(RDKafkaGroupMemberInfo); @@ -52,17 +67,37 @@ impl GroupMemberInfo { } /// Return the assignment of the member - pub fn assignment(&self) -> Option<&[u8]> { - unsafe { - if self.0.member_assignment.is_null() { - None - } else { - Some(slice::from_raw_parts::( - self.0.member_assignment as *const u8, - self.0.member_assignment_size as usize, - )) + pub fn assignment(&self) -> KafkaResult> { + if self.0.member_assignment.is_null() { + return Ok(Vec::new()); + } + let payload = unsafe { + slice::from_raw_parts::( + self.0.member_assignment as *const u8, + self.0.member_assignment_size as usize, + ) + }; + let mut cursor = Cursor::new(payload); + let _version = cursor.read_i16::() + .map_err(|e| KafkaError::MemberAssignment(format!("{}", e)))?; + let assign_len = cursor.read_i32::() + .map_err(|e| KafkaError::MemberAssignment(format!("{}", e)))?; + let mut assigns = Vec::with_capacity(assign_len as usize); + for _ in 0..assign_len { + let topic = read_str(&mut cursor) + .map_err(|e| KafkaError::MemberAssignment(format!("{}", e)))? + .to_string(); + let partition_len = cursor.read_i32::() + .map_err(|e| KafkaError::MemberAssignment(format!("{}", e)))?; + let mut partitions = Vec::with_capacity(partition_len as usize); + for _ in 0..partition_len { + let partition = cursor.read_i32::() + .map_err(|e| KafkaError::MemberAssignment(format!("{}", e)))?; + partitions.push(partition); } + assigns.push(MemberAssignment { topic, partitions }) } + Ok(assigns) } } diff --git a/src/lib.rs b/src/lib.rs index b7c67a41b..52ab633a7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -217,8 +217,6 @@ #[macro_use] extern crate log; -#[macro_use] -extern crate serde_derive; extern crate futures; extern crate serde_json; diff --git a/src/statistics.rs b/src/statistics.rs index 9b101d148..a641acd0a 100644 --- a/src/statistics.rs +++ b/src/statistics.rs @@ -3,6 +3,8 @@ use std::collections::HashMap; +use serde::Deserialize; + /// Statistics from librdkafka. Refer to the [librdkafka documentation](https://github.com/edenhill/librdkafka/wiki/Statistics) /// for details. #[derive(Deserialize, Debug)] diff --git a/src/util.rs b/src/util.rs index 716817ebf..2016c587f 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,14 +1,18 @@ //! Utility functions use crate::rdsys; +use std::io::{Cursor, BufRead}; use std::ffi::CStr; use std::os::raw::c_char; use std::os::raw::c_void; use std::ptr; use std::slice; +use std::str; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use byteorder::{BigEndian, ReadBytesExt}; + /// Return a tuple representing the version of `librdkafka` in /// hexadecimal and string format. pub fn get_rdkafka_version() -> (u16, String) { @@ -214,6 +218,14 @@ impl AsCArray for Vec { } } +pub(crate) fn read_str<'a>(rdr: &'a mut Cursor<&[u8]>) -> Result<&'a str, Box> { + let len = (rdr.read_i16::())? as usize; + let pos = rdr.position() as usize; + let slice = str::from_utf8(&rdr.get_ref()[pos..(pos + len)])?; + rdr.consume(len); + Ok(slice) +} + #[cfg(test)] mod tests { use super::*; From cfa1e38af5fb024410c3bd811a388b5119e6a785 Mon Sep 17 00:00:00 2001 From: Ian Date: Sun, 28 May 2023 20:01:15 +0200 Subject: [PATCH 2/4] test: expand test_group_membership test to check on member assignment --- tests/test_metadata.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/test_metadata.rs b/tests/test_metadata.rs index 2a5cee53a..e04df7e18 100644 --- a/tests/test_metadata.rs +++ b/tests/test_metadata.rs @@ -8,6 +8,7 @@ use futures::*; use rdkafka::config::ClientConfig; use rdkafka::consumer::{Consumer, StreamConsumer}; +use rdkafka::groups::MemberAssignment; use rdkafka::topic_partition_list::TopicPartitionList; use std::time::Duration; @@ -160,4 +161,15 @@ fn test_group_membership() { consumer_member.client_id(), "rdkafka_integration_test_client" ); + + assert_eq!(consumer_member.assignment().unwrap().len(), 1); + + let assignment_topic = &consumer_member.assignment().unwrap()[0]; + assert_eq!( + assignment_topic, + &MemberAssignment { + partitions: vec![0, 1, 2], + topic: topic_name + } + ); } From 79cb1500894340e7bfed06477306f787bf679fb7 Mon Sep 17 00:00:00 2001 From: Ian Date: Sun, 28 May 2023 22:13:40 +0200 Subject: [PATCH 3/4] cargo lock --- Cargo.lock | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 75212b7c1..b93361ba3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1070,10 +1070,11 @@ dependencies = [ [[package]] name = "rdkafka" -version = "0.30.0" +version = "0.31.0" dependencies = [ "async-std", "backoff", + "byteorder", "chrono", "clap", "env_logger", From ec6ce78538e2a0280191f585f40b1c07de173825 Mon Sep 17 00:00:00 2001 From: Ian Date: Sun, 28 May 2023 22:18:11 +0200 Subject: [PATCH 4/4] fix: remove unnecessary imports --- src/groups.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/groups.rs b/src/groups.rs index 25aa98919..7127b81ab 100644 --- a/src/groups.rs +++ b/src/groups.rs @@ -1,9 +1,3 @@ -//! Group membership API. -use crate::rdsys; -use crate::rdsys::types::*; -use crate::error::{KafkaResult, KafkaError}; -use crate::util::read_str; - use std::ffi::CStr; use std::fmt; use std::slice; @@ -24,6 +18,9 @@ pub struct MemberAssignment { use rdkafka_sys as rdsys; use rdkafka_sys::types::*; +use crate::error::KafkaError; +use crate::error::KafkaResult; +use crate::util::read_str; use crate::util::{KafkaDrop, NativePtr}; /// Group member information container.