Skip to content

Commit

Permalink
fix(vectordotdev#22007): add content encoding header when compression…
Browse files Browse the repository at this point in the history
… is enabled
  • Loading branch information
10x-mattsearle authored and ChocPanda committed Dec 13, 2024
1 parent 7e93489 commit 0a70104
Showing 1 changed file with 35 additions and 12 deletions.
47 changes: 35 additions & 12 deletions src/sinks/gcp_chronicle/chronicle_unstructured.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use bytes::{Bytes, BytesMut};

use futures_util::{future::BoxFuture, task::Poll};
use goauth::scopes::Scope;
use http::{header::HeaderValue, Request, StatusCode, Uri};
use http::header::{self, HeaderName, HeaderValue};
use http::{Request, StatusCode, Uri};
use hyper::Body;
use indoc::indoc;
use serde::Serialize;
Expand Down Expand Up @@ -318,6 +319,7 @@ impl ChronicleUnstructuredConfig {
pub struct ChronicleRequest {
pub body: Bytes,
pub finalizers: EventFinalizers,
pub headers: HashMap<HeaderName, HeaderValue>,
metadata: RequestMetadata,
}

Expand Down Expand Up @@ -471,7 +473,33 @@ impl RequestBuilder<(ChroniclePartitionKey, Vec<Event>)> for ChronicleRequestBui
metadata: RequestMetadata,
payload: EncodeResult<Self::Payload>,
) -> Self::Request {
ChronicleRequest {
let mut headers = HashMap::new();
headers.insert(
header::CONTENT_TYPE,
HeaderValue::from_static("application/json"),
);

match payload.compressed_byte_size {
Some(compressed_byte_size) => {
headers.insert(
header::CONTENT_LENGTH,
HeaderValue::from_str(&compressed_byte_size.to_string()).unwrap(),
);
headers.insert(
header::CONTENT_ENCODING,
HeaderValue::from_str(&self.compression.content_encoding().unwrap()).unwrap(),
);
}
None => {
headers.insert(
header::CONTENT_LENGTH,
HeaderValue::from_str(&payload.uncompressed_byte_size.to_string()).unwrap(),
);
}
}

return ChronicleRequest {
headers: headers,
body: payload.into_payload().bytes,
finalizers,
metadata,
Expand Down Expand Up @@ -547,18 +575,13 @@ impl Service<ChronicleRequest> for ChronicleService {

fn call(&mut self, request: ChronicleRequest) -> Self::Future {
let mut builder = Request::post(&self.base_url);
let headers = builder.headers_mut().unwrap();
headers.insert(
"content-type",
HeaderValue::from_str("application/json").unwrap(),
);
headers.insert(
"content-length",
HeaderValue::from_str(&request.body.len().to_string()).unwrap(),
);

let metadata = request.get_metadata().clone();

let headers = builder.headers_mut().unwrap();
for (name, value) in request.headers {
headers.insert(name, value);
}

let mut http_request = builder.body(Body::from(request.body)).unwrap();
self.creds.apply(&mut http_request);

Expand Down

0 comments on commit 0a70104

Please sign in to comment.