Skip to content

Commit

Permalink
feat(new sink): Keep
Browse files Browse the repository at this point in the history
  • Loading branch information
sainad2222 committed Jan 11, 2025
1 parent 269e862 commit 5f874b2
Show file tree
Hide file tree
Showing 8 changed files with 397 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ sinks-logs = [
"sinks-humio",
"sinks-influxdb",
"sinks-kafka",
"sinks-keep",
"sinks-loki",
"sinks-mezmo",
"sinks-mqtt",
Expand Down Expand Up @@ -780,6 +781,7 @@ sinks-http = []
sinks-humio = ["sinks-splunk_hec", "transforms-metric_to_log"]
sinks-influxdb = []
sinks-kafka = ["dep:rdkafka"]
sinks-keep = []
sinks-mezmo = []
sinks-loki = ["loki-logproto"]
sinks-mqtt = ["dep:rumqttc"]
Expand Down
175 changes: 175 additions & 0 deletions src/sinks/keep/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
//! Configuration for the `keep` sink.
use bytes::Bytes;
use futures::FutureExt;
use http::{Request, StatusCode, Uri};
use vector_lib::configurable::configurable_component;
use vector_lib::sensitive_string::SensitiveString;
use vrl::value::Kind;

use crate::{
http::HttpClient,
sinks::{
prelude::*,
util::{
http::{http_response_retry_logic, HttpService},
BatchConfig, BoxedRawValue,
},
},
};

use super::{
encoder::KeepEncoder, request_builder::KeepRequestBuilder, service::KeepSvcRequestBuilder,
sink::KeepSink,
};

pub(super) const HTTP_HEADER_KEEP_API_KEY: &str = "x-api-key";

/// Configuration for the `keep` sink.
#[configurable_component(sink("keep", "Deliver log events to Keep."))]
#[derive(Clone, Debug)]
pub struct KeepConfig {
/// Keep's endpoint to send logs to
#[serde(default = "default_endpoint")]
#[configurable(metadata(
docs::examples = "https://backend.keep.com:8081/alerts/event/vectordev?provider_id=test",
))]
#[configurable(validation(format = "uri"))]
pub(super) endpoint: String,

/// The API key that is used to authenticate against Keep.
#[configurable(metadata(docs::examples = "${KEEP_API_KEY}"))]
#[configurable(metadata(docs::examples = "keepappkey"))]
api_key: SensitiveString,

#[configurable(derived)]
#[serde(default)]
batch: BatchConfig<KeepDefaultBatchSettings>,

#[configurable(derived)]
#[serde(default)]
request: TowerRequestConfig,

#[configurable(derived)]
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
encoding: Transformer,

#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::is_default"
)]
acknowledgements: AcknowledgementsConfig,
}

fn default_endpoint() -> String {
"http://localhost:8080/alerts/event/vectordev?provider_id=test".to_string()
}

#[derive(Clone, Copy, Debug, Default)]
struct KeepDefaultBatchSettings;

impl SinkBatchSettings for KeepDefaultBatchSettings {
const MAX_EVENTS: Option<usize> = None;
const MAX_BYTES: Option<usize> = Some(100_000);
const TIMEOUT_SECS: f64 = 1.0;
}

impl GenerateConfig for KeepConfig {
fn generate_config() -> toml::Value {
toml::from_str(
r#"api_key = "${KEEP_API_KEY}"
"#,
)
.unwrap()
}
}

#[async_trait::async_trait]
#[typetag::serde(name = "keep")]
impl SinkConfig for KeepConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let batch_settings = self.batch.validate()?.into_batcher_settings()?;

let request_builder = KeepRequestBuilder {
encoder: KeepEncoder {
transformer: self.encoding.clone(),
},
// TODO: add compression support
compression: Compression::None,
};

let uri: Uri = self.endpoint.clone().try_into()?;
let keep_service_request_builder = KeepSvcRequestBuilder {
uri: uri.clone(),
api_key: self.api_key.clone(),
};

let client = HttpClient::new(None, cx.proxy())?;

let service = HttpService::new(client.clone(), keep_service_request_builder);

let request_limits = self.request.into_settings();

let service = ServiceBuilder::new()
.settings(request_limits, http_response_retry_logic())
.service(service);

let sink = KeepSink::new(service, batch_settings, request_builder);

let healthcheck = healthcheck(uri, self.api_key.clone(), client).boxed();

Ok((VectorSink::from_event_streamsink(sink), healthcheck))
}

fn input(&self) -> Input {
let requirement = Requirement::empty().optional_meaning("timestamp", Kind::timestamp());

Input::log().with_schema_requirement(requirement)
}

fn acknowledgements(&self) -> &AcknowledgementsConfig {
&self.acknowledgements
}
}

async fn healthcheck(uri: Uri, api_key: SensitiveString, client: HttpClient) -> crate::Result<()> {
let request = Request::post(uri).header(HTTP_HEADER_KEEP_API_KEY, api_key.inner());
let body = crate::serde::json::to_bytes(&Vec::<BoxedRawValue>::new())
.unwrap()
.freeze();
let req: Request<Bytes> = request.body(body)?;
let req = req.map(hyper::Body::from);

let res = client.send(req).await?;

let status = res.status();
let body = hyper::body::to_bytes(res.into_body()).await?;

match status {
StatusCode::OK => Ok(()), // Healthcheck passed
StatusCode::BAD_REQUEST => Ok(()), // Healthcheck failed due to client error but is still considered valid
StatusCode::ACCEPTED => Ok(()), // Consider healthcheck passed if server accepted request
StatusCode::UNAUTHORIZED => {
// Handle unauthorized errors
let json: serde_json::Value = serde_json::from_slice(&body[..])?;
let message = json
.as_object()
.and_then(|o| o.get("error"))
.and_then(|s| s.as_str())
.unwrap_or("Token is not valid, 401 returned.")
.to_string();
Err(message.into())
}
_ => {
// Handle other unexpected statuses
let body = String::from_utf8_lossy(&body[..]);
Err(format!(
"Server returned unexpected error status: {} body: {}",
status, body
)
.into())
}
}
}
50 changes: 50 additions & 0 deletions src/sinks/keep/encoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//! Encoding for the `keep` sink.
use bytes::Bytes;
use serde_json::{json, to_vec};
use std::io;

use crate::sinks::{
prelude::*,
util::encoding::{write_all, Encoder as SinkEncoder},
};

pub(super) struct KeepEncoder {
pub(super) transformer: Transformer,
}

impl SinkEncoder<Vec<Event>> for KeepEncoder {
fn encode_input(
&self,
events: Vec<Event>,
writer: &mut dyn io::Write,
) -> io::Result<(usize, GroupedCountByteSize)> {
let mut byte_size = telemetry().create_request_count_byte_size();
let n_events = events.len();
let mut json_events: Vec<serde_json::Value> = Vec::with_capacity(n_events);

for mut event in events {
self.transformer.transform(&mut event);

byte_size.add_event(&event, event.estimated_json_encoded_size_of());

let mut data = json!(event.as_log());
if let Some(message) = data.get("message") {
if let Some(message_str) = message.as_str() {
// Parse the JSON string in `message`
let parsed_message: serde_json::Value = serde_json::from_str(message_str)?;

// Reassign the parsed JSON back to `message`
data["message"] = parsed_message;
}
}
data["keep_source_type"] = json!(event.source_id());

json_events.push(data);
}

let body = Bytes::from(to_vec(&serde_json::Value::Array(json_events))?);

write_all(writer, n_events, body.as_ref()).map(|()| (body.len(), byte_size))
}
}
10 changes: 10 additions & 0 deletions src/sinks/keep/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
//! The Keep [`vector_lib::sink::VectorSink`].
//!
//! This module contains the [`vector_lib::sink::VectorSink`] instance that is responsible for
//! taking a stream of [`vector_lib::event::Event`]s and forwarding them to the Keep service.
mod config;
mod encoder;
mod request_builder;
mod service;
mod sink;
48 changes: 48 additions & 0 deletions src/sinks/keep/request_builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//! `RequestBuilder` implementation for the `keep` sink.
use bytes::Bytes;
use std::io;

use crate::sinks::{prelude::*, util::http::HttpRequest};

use super::encoder::KeepEncoder;

pub(super) struct KeepRequestBuilder {
pub(super) encoder: KeepEncoder,
pub(super) compression: Compression,
}

impl RequestBuilder<Vec<Event>> for KeepRequestBuilder {
type Metadata = EventFinalizers;
type Events = Vec<Event>;
type Encoder = KeepEncoder;
type Payload = Bytes;
type Request = HttpRequest<()>;
type Error = io::Error;

fn compression(&self) -> Compression {
self.compression
}

fn encoder(&self) -> &Self::Encoder {
&self.encoder
}

fn split_input(
&self,
mut events: Vec<Event>,
) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
let finalizers = events.take_finalizers();
let builder = RequestMetadataBuilder::from_events(&events);
(finalizers, builder, events)
}

fn build_request(
&self,
metadata: Self::Metadata,
request_metadata: RequestMetadata,
payload: EncodeResult<Self::Payload>,
) -> Self::Request {
HttpRequest::new(payload.into_payload(), metadata, request_metadata, ())
}
}
33 changes: 33 additions & 0 deletions src/sinks/keep/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//! Service implementation for the `keep` sink.
use bytes::Bytes;
use http::{Request, Uri};
use vector_lib::sensitive_string::SensitiveString;

use crate::sinks::{
util::http::{HttpRequest, HttpServiceRequestBuilder},
HTTPRequestBuilderSnafu,
};
use snafu::ResultExt;

use super::config::HTTP_HEADER_KEEP_API_KEY;

#[derive(Debug, Clone)]
pub(super) struct KeepSvcRequestBuilder {
pub(super) uri: Uri,
pub(super) api_key: SensitiveString,
}

impl HttpServiceRequestBuilder<()> for KeepSvcRequestBuilder {
fn build(&self, mut request: HttpRequest<()>) -> Result<Request<Bytes>, crate::Error> {
let builder =
Request::post(&self.uri).header(HTTP_HEADER_KEEP_API_KEY, self.api_key.inner());

let builder = builder.header("Content-Type".to_string(), "application/json".to_string());

builder
.body(request.take_payload())
.context(HTTPRequestBuilderSnafu)
.map_err(Into::into)
}
}
Loading

0 comments on commit 5f874b2

Please sign in to comment.