Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: change log format for runner and operator #179

Open
wants to merge 3 commits into
base: feat/configurable-disk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ tokio-console = ["telemetry", "dep:console-subscriber"]
[dependencies]
anyhow.workspace = true
console-subscriber = { workspace = true, optional = true }
clap.workspace = true # technically optional to telemetry but required by the rest of the workspace right now
gethostname = "0.4.2"
opentelemetry-otlp = { workspace = true, optional = true }
opentelemetry = { workspace = true, optional = true }
Expand Down
141 changes: 136 additions & 5 deletions common/src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Provides helper functions for initializing telemetry collection and publication.
use std::{convert::Infallible, net::SocketAddr, sync::OnceLock, time::Duration};
use std::{convert::Infallible, net::SocketAddr, str::FromStr, sync::OnceLock, time::Duration};

use anyhow::Result;
use hyper::{
Expand All @@ -19,19 +19,79 @@ use opentelemetry_sdk::{
};
use prometheus::{Encoder, TextEncoder};
use tokio::{sync::oneshot, task::JoinHandle};
use tracing_subscriber::{filter::LevelFilter, prelude::*, EnvFilter, Registry};
use tracing_subscriber::{
filter::LevelFilter,
fmt::{
format::{Compact, DefaultFields, Json, JsonFields, Pretty},
time::SystemTime,
FormatEvent, FormatFields,
},
prelude::*,
EnvFilter, Registry,
};

// Selects which of the supported log output styles is used.
// `clap::ValueEnum` lets the value be parsed from a command-line argument, the serde
// derives let it be (de)serialized, and `Copy` lets it be passed around by value.
#[derive(
Clone,
Copy,
Debug,
PartialEq,
Eq,
Hash,
Default,
clap::ValueEnum,
serde::Deserialize,
serde::Serialize,
)]
/// The format to use for logging
// serde names for the variants are camelCase: `singleLine`, `multiLine`, `json`.
#[serde(rename_all = "camelCase")]
pub enum LogFormat {
/// Compact format
SingleLine,
/// Pretty format
// This is the variant produced by the derived `Default` impl.
#[default]
MultiLine,
/// JSON format
Json,
}

impl FromStr for LogFormat {
    type Err = anyhow::Error;

    /// Parses a log format from its textual name.
    ///
    /// The two multi-word formats are accepted in camelCase, kebab-case and
    /// SCREAMING_SNAKE_CASE (`singleLine` / `single-line` / `SINGLE_LINE`, and the
    /// same three spellings for `multiLine`). The JSON format is accepted only as
    /// `json`. Matching is exact and case-sensitive.
    ///
    /// # Errors
    ///
    /// Returns an error that names the rejected input when it matches none of the
    /// accepted spellings.
    fn from_str(s: &str) -> Result<Self> {
        let parsed = match s {
            "singleLine" | "single-line" | "SINGLE_LINE" => Self::SingleLine,
            "multiLine" | "multi-line" | "MULTI_LINE" => Self::MultiLine,
            "json" => Self::Json,
            unknown => return Err(anyhow::anyhow!("invalid log format: {}", unknown)),
        };
        Ok(parsed)
    }
}

impl std::fmt::Display for LogFormat {
    /// Writes the kebab-case name of the format (`single-line`, `multi-line` or `json`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The names follow the clap value-enum spelling so that the rendered value can be
        // passed straight back in as a command-line argument.
        let name = match self {
            LogFormat::SingleLine => "single-line",
            LogFormat::MultiLine => "multi-line",
            LogFormat::Json => "json",
        };
        f.write_str(name)
    }
}

// create a new prometheus registry
static PROM_REGISTRY: OnceLock<prometheus::Registry> = OnceLock::new();

/// Initialize tracing
pub async fn init_tracing(otlp_endpoint: Option<String>) -> Result<()> {
pub async fn init_tracing(otlp_endpoint: Option<String>, log_format: LogFormat) -> Result<()> {
//// Setup log filter
//// Default to INFO if no env is specified
let log_filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env()?;

let fields_format = FieldsFormat::new(log_format);
let event_format = EventFormat::new(log_format);

// If we have an otlp_endpoint setup export of traces
if let Some(otlp_endpoint) = otlp_endpoint {
let tracer = opentelemetry_otlp::new_pipeline()
Expand Down Expand Up @@ -67,7 +127,8 @@ pub async fn init_tracing(otlp_endpoint: Option<String>) -> Result<()> {
// Setup logging to stdout
let logger = tracing_subscriber::fmt::layer()
.with_ansi(true)
.pretty()
.event_format(event_format)
.fmt_fields(fields_format)
.with_filter(log_filter);

let collector = Registry::default().with(telemetry).with(logger);
Expand All @@ -84,7 +145,8 @@ pub async fn init_tracing(otlp_endpoint: Option<String>) -> Result<()> {
// Setup basic log only tracing
let logger = tracing_subscriber::fmt::layer()
.with_ansi(true)
.pretty()
.event_format(event_format)
.fmt_fields(fields_format)
.with_filter(log_filter);
tracing_subscriber::registry().with(logger).init()
}
Expand Down Expand Up @@ -178,3 +240,72 @@ fn start_metrics_server(addr: &SocketAddr) -> (MetricsServerShutdown, MetricsSer
});
(tx, tokio::spawn(server))
}

// Implement a FormatEvent type that can be configured to one of a set of log formats.
// One formatter per supported format is stored; `kind` decides which one handles events.
struct EventFormat {
// Selects which of the formatters below is used.
kind: LogFormat,
// Formatter used when `kind` is `LogFormat::SingleLine`.
single: tracing_subscriber::fmt::format::Format<Compact, SystemTime>,
// Formatter used when `kind` is `LogFormat::MultiLine`.
multi: tracing_subscriber::fmt::format::Format<Pretty, SystemTime>,
// Formatter used when `kind` is `LogFormat::Json`.
json: tracing_subscriber::fmt::format::Format<Json, SystemTime>,
}

impl EventFormat {
    /// Builds an event formatter that renders events in the style selected by `kind`.
    ///
    /// The compact, pretty and JSON formatters are all constructed here, each from the
    /// default `tracing_subscriber` format; `kind` only records which one is used later.
    fn new(kind: LogFormat) -> Self {
        let single = tracing_subscriber::fmt::format().compact();
        let multi = tracing_subscriber::fmt::format().pretty();
        let json = tracing_subscriber::fmt::format().json();
        Self {
            kind,
            single,
            multi,
            json,
        }
    }
}

impl<S, N> FormatEvent<S, N> for EventFormat
where
    S: tracing::Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
    N: for<'a> tracing_subscriber::fmt::FormatFields<'a> + 'static,
{
    /// Formats `event` by handing it to the formatter that matches the configured kind.
    fn format_event(
        &self,
        ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
        writer: tracing_subscriber::fmt::format::Writer<'_>,
        event: &tracing::Event<'_>,
    ) -> std::fmt::Result {
        let Self {
            kind,
            single,
            multi,
            json,
        } = self;
        // The three formatters have different concrete types, so each arm delegates
        // directly rather than selecting a formatter first.
        match kind {
            LogFormat::SingleLine => single.format_event(ctx, writer, event),
            LogFormat::MultiLine => multi.format_event(ctx, writer, event),
            LogFormat::Json => json.format_event(ctx, writer, event),
        }
    }
}

// Implement a FormatFields type that can be configured to one of a set of log formats.
struct FieldsFormat {
// Selects which of the field formatters below is used.
kind: LogFormat,
// Field formatter used for the `SingleLine` and `MultiLine` kinds.
default_fields: DefaultFields,
// Field formatter used for the `Json` kind.
json_fields: JsonFields,
}

impl FieldsFormat {
    /// Builds a field formatter that renders fields in the style selected by `kind`.
    ///
    /// Both the default and the JSON field formatters are constructed; `kind` records
    /// which of them is used when fields are formatted.
    pub fn new(kind: LogFormat) -> Self {
        let default_fields = DefaultFields::new();
        let json_fields = JsonFields::new();
        Self {
            kind,
            default_fields,
            json_fields,
        }
    }
}

impl<'writer> FormatFields<'writer> for FieldsFormat {
    /// Formats `fields` into `writer` using the field formatter for the configured kind.
    ///
    /// The single-line and multi-line kinds share the default field formatter; the JSON
    /// kind uses the JSON field formatter.
    // The bound names the public `field::RecordFields` trait directly; the
    // `prelude::__tracing_subscriber_field_RecordFields` name is only the prelude's
    // re-export alias for the same trait.
    //
    // NOTE(review): only `format_fields` is forwarded here, so the trait's default
    // `add_fields` is used for every kind. `JsonFields` appears to provide its own
    // `add_fields` to merge later-recorded span fields into the existing JSON object.
    // Confirm that fields added via `Span::record` still yield valid JSON in `Json` mode.
    fn format_fields<R: tracing_subscriber::field::RecordFields>(
        &self,
        writer: tracing_subscriber::fmt::format::Writer<'writer>,
        fields: R,
    ) -> std::fmt::Result {
        match self.kind {
            LogFormat::SingleLine | LogFormat::MultiLine => {
                self.default_fields.format_fields(writer, fields)
            }
            LogFormat::Json => self.json_fields.format_fields(writer, fields),
        }
    }
}
39 changes: 39 additions & 0 deletions keramik/src/advanced_configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,45 @@ and adding the following environment variables to the `spec/template/spec/contai

# Image Resources

## Storage

Nearly all containers (monitoring outstanding) allow configuring the persistent storage size and class. The storage class must be created out of band, but can be included. The storage configuration has two keys (`size` and `class`) and can be used like so:

```yaml
apiVersion: "keramik.3box.io/v1alpha1"
kind: Network
metadata:
name: small
spec:
replicas: 2
bootstrap:
image: keramik/runner:dev
imagePullPolicy: IfNotPresent
cas:
casStorage:
size: "3Gi"
class: "fastDisk" # typically not set
ipfs:
go:
storage:
size: "1Gi"
ganacheStorage:
size: "1Gi"
postgresStorage:
size: "3Gi"
localstackStorage:
size: "5Gi"
ceramic:
- ipfs:
rust:
storage:
size: "3Gi"

```


## Requests / Limits

During local benchmarking, you may not have enough resources to run the cluster. A simple "fix" is to use the `devMode` flag on the network and simulation specs. This will override the resource requests and limits values to be none, which means it doesn't need available resources to deploy, and can consume as much as it desires. This would be problematic in production and should only be used for testing purposes.

```yaml
Expand Down
8 changes: 7 additions & 1 deletion keramik/src/developing_operator.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ Edit `./k8s/operator/manifests/operator.yaml` to use `IfNotPresent` for the `ima
containers:
- name: keramik-operator
image: "keramik/operator"
imagePullPolicy: IfNotPresent
imagePullPolicy: IfNotPresent # Should be IfNotPresent when using imageTag: dev, but Always if using imageTag: latest
command:
- "/usr/bin/keramik-operator"
# you can use json logs like so
# - "--log-format"
# - "json"
- "daemon"
# ...
```

Expand Down
15 changes: 15 additions & 0 deletions operator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,18 @@ pub mod utils;
/// A list of constants used in various K8s resources
#[cfg(feature = "controller")]
const CONTROLLER_NAME: &str = "keramik";

// Process-wide log format used for the network resources created by this crate.
// A `OnceLock` is initialised at most once: whichever of `set_network_log_format` or
// `network_log_format` runs first decides the value for the rest of the process.
static NETWORK_LOG_FORMAT: std::sync::OnceLock<keramik_common::telemetry::LogFormat> =
std::sync::OnceLock::new();

/// Sets the log format for the network.
///
/// Only the first initialisation of the underlying cell takes effect: if a format has
/// already been stored (by an earlier call, or because the format was already read and
/// defaulted), this call leaves the stored value unchanged and reports nothing.
pub fn set_network_log_format(format: keramik_common::telemetry::LogFormat) {
    // An `Err` here only means the cell was already initialised; that is deliberately
    // ignored so the first value wins.
    let _ = NETWORK_LOG_FORMAT.set(format);
}

/// Returns the log format configured for the network.
///
/// If no format has been stored yet, the default format is stored and returned, and it
/// stays in effect for the rest of the process. Visible only within this crate.
pub(crate) fn network_log_format() -> keramik_common::telemetry::LogFormat {
    // `LogFormat` is `Copy`, so copy the value out of the cell with a dereference
    // instead of going through `to_owned`.
    *NETWORK_LOG_FORMAT.get_or_init(keramik_common::telemetry::LogFormat::default)
}
14 changes: 12 additions & 2 deletions operator/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
//! Operator is a long lived process that automates creating and managing Ceramic networks.
#![deny(missing_docs)]
use anyhow::Result;
use clap::{command, Parser, Subcommand};
use clap::{arg, command, Parser, Subcommand};
use keramik_common::telemetry;
use keramik_operator::set_network_log_format;
use opentelemetry::global::{shutdown_meter_provider, shutdown_tracer_provider};
use tracing::info;

Expand All @@ -17,6 +18,9 @@ struct Cli {

#[arg(long, env = "OPERATOR_PROM_BIND", default_value = "0.0.0.0:9464")]
prom_bind: String,

#[arg(long, env = "OPERATOR_LOG_FORMAT")]
log_format: Option<telemetry::LogFormat>,
}

/// Available Subcommands
Expand All @@ -29,7 +33,13 @@ pub enum Command {
#[tokio::main]
async fn main() -> Result<()> {
let args = Cli::parse();
telemetry::init_tracing(args.otlp_endpoint).await?;
let log_format = args
.log_format
.map(telemetry::LogFormat::from)
.unwrap_or_default();
set_network_log_format(log_format);
telemetry::init_tracing(args.otlp_endpoint, log_format).await?;

let (metrics_controller, metrics_server_shutdown, metrics_server_join) =
telemetry::init_metrics_prom(&args.prom_bind.parse()?).await?;
info!("starting operator");
Expand Down
10 changes: 9 additions & 1 deletion operator/src/network/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use k8s_openapi::api::{
},
};

use crate::network::{node_affinity::NodeAffinityConfig, BootstrapSpec, PEERS_CONFIG_MAP_NAME};
use crate::{
network::{node_affinity::NodeAffinityConfig, BootstrapSpec, PEERS_CONFIG_MAP_NAME},
network_log_format,
};

// BootstrapConfig defines which properties of the JobSpec can be customized.
pub struct BootstrapConfig {
Expand Down Expand Up @@ -94,6 +97,11 @@ pub fn bootstrap_job_spec(
value: Some("/keramik-peers/peers.json".to_owned()),
..Default::default()
},
EnvVar {
name: "RUNNER_LOG_FORMAT".to_owned(),
value: Some(network_log_format().to_string()),
..Default::default()
},
]),
volume_mounts: Some(vec![VolumeMount {
mount_path: "/keramik-peers".to_owned(),
Expand Down
Loading