feat: add generator code #203

Merged
merged 14 commits on Jul 30, 2024
1,299 changes: 646 additions & 653 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions Makefile
@@ -4,7 +4,7 @@ CARGO = CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse RUSTFLAGS="--cfg tokio_unstab
all: build check-fmt check-clippy test

.PHONY: test
test:
test: update
Contributor commented:

I don't think we want to run `update` as part of these tasks. Adding an `update` target is fine, but we should run `make update` and commit the lockfile separately. As written, I suspect this will cause unstaged changes in CI that could fail the pipeline.
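A sketch of the workflow being suggested here (the commit message is illustrative; `make update` is the target added in this diff):

```shell
# Refresh the lockfile deliberately and commit it, rather than updating it during `make test`.
make update
git add Cargo.lock
git commit -m "chore: update Cargo.lock"
```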

# Test with default features
${CARGO} test --locked
# Test with all features
@@ -16,7 +16,11 @@ test:
check-fmt:
cargo fmt --all -- --check

.PHONY: check-clippy
.PHONY: update
update:
${CARGO} update

.PHONY: check-clippy
check-clippy:
# Check with default features
${CARGO} clippy --workspace
2 changes: 1 addition & 1 deletion k8s/operator/manifests/operator.yaml
@@ -36,7 +36,7 @@ rules:
resources: ["clusterroles", "clusterrolebindings"]
verbs: ["create", "get", "patch"]
- apiGroups: ["keramik.3box.io"]
resources: ["networks", "networks/status", "simulations", "simulations/status"]
resources: ["networks", "networks/status", "simulations", "simulations/status", "loadgenerators", "loadgenerators/status"]
verbs: ["get", "list", "watch", "patch", "delete"]
- apiGroups: ["monitoring.coreos.com"]
resources: ["podmonitors"]
18 changes: 18 additions & 0 deletions keramik/src/developing_runner.md
@@ -59,3 +59,21 @@ spec:
image: keramik/runner:dev
imagePullPolicy: IfNotPresent
```

## Setup Load Generator with the runner image
```yaml
# Custom load generator
---
apiVersion: "keramik.3box.io/v1alpha1"
kind: LoadGenerator
metadata:
name: load-gen
namespace: keramik-lgen-demo
spec:
scenario: "CreateModelInstancesSynced"
runTime: 3
image: "keramik/runner:dev"
imagePullPolicy: "IfNotPresent"
throttleRequests: 20
tasks: 2
```
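Once the operator picks this up, a quick way to confirm the job is running the custom image is to inspect the pods it created (the pod name below is a placeholder; list the pods in the namespace to find the actual one):

```shell
# List pods in the demo namespace, then check which image the load generator pod is using.
kubectl get pods -n keramik-lgen-demo
kubectl describe pod <load-gen-pod-name> -n keramik-lgen-demo | grep -i image
```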
43 changes: 43 additions & 0 deletions keramik/src/load_generator.md
@@ -0,0 +1,43 @@
# Load Generation

To run a load generator, create a `LoadGenerator` resource. This resource is similar to a `Simulation` resource, but a load generator can run for up to a week, making it useful for generating sustained load on the system over a longer period of time.

## Parameters

- **`scenario`**: The scenario to run. Supported scenarios are:
- `CreateModelInstancesSynced`: Requires at least two ceramic instances. Creates models on one node and has the other node sync them.
- **`runTime`**: The duration to run the load generator, in hours.
- **`image`**: The image to use for the load generator. This is the same as the `image` in the `Simulation` resource.
- **`throttleRequests`**: WIP, not ready yet. The number of requests to send per second. This is the same as `throttleRequests` in the `Simulation` resource.
- **`tasks`**: The number of tasks to run. Increasing the number of tasks increases the load on the node. A value of 2 generates a steady load of 20 requests per second; values between 2 and 100 are recommended. Keep in mind that the relationship between tasks and throughput is non-linear: a value of 100 generates what we consider high load, roughly 200 TPS.

## Sample configuration

```yaml
apiVersion: "keramik.3box.io/v1alpha1"
kind: LoadGenerator
metadata:
name: load-gen
# Namespace of the network you wish to run against
namespace: keramik-<unique-name>-small
spec:
scenario: CreateModelInstancesSynced
runTime: 3
image: "keramik/runner:dev"
throttleRequests: 20
tasks: 2
```

If you want to run this against a defined network, set the namespace to match that network's namespace. In this example, the namespace matches the one used when [the network was set up](./setup_network.md).
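Assuming the sample above is saved as `load-gen.yaml` (the file name is only illustrative), it can be applied with:

```shell
# The target namespace is taken from the manifest's metadata.
kubectl apply -f load-gen.yaml
```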

The load generator stops automatically once the `runTime` has elapsed. You should see success and error metrics at the end of the run. To see the metrics, use `kubectl` to get the logs of the load generator:

```shell
kubectl logs load-gen-<unique-string-for-each-run> -n keramik-<unique-name>-small
```
You can get the name of the load-gen pod by running:

```shell
kubectl get pods -n keramik-<unique-name>-small
```
6 changes: 4 additions & 2 deletions operator/src/crdgen.rs
@@ -1,10 +1,12 @@
use kube::CustomResourceExt;

use keramik_operator::lgen::spec::LoadGenerator;
use keramik_operator::network::Network;
use keramik_operator::simulation::Simulation;
use kube::CustomResourceExt;

fn main() {
print!("{}", serde_yaml::to_string(&Network::crd()).unwrap());
println!("---");
print!("{}", serde_yaml::to_string(&Simulation::crd()).unwrap());
println!("---");
print!("{}", serde_yaml::to_string(&LoadGenerator::crd()).unwrap());
}
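With this change the generated manifest also contains the `LoadGenerator` CRD. A hedged sketch of regenerating it, assuming the crate exposes this file as a `crdgen` binary (not confirmed by this diff):

```shell
# Print the Network, Simulation, and LoadGenerator CRDs as YAML documents separated by `---`.
cargo run --bin crdgen
```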
200 changes: 200 additions & 0 deletions operator/src/lgen/controller.rs
@@ -0,0 +1,200 @@
use std::{sync::Arc, time::Duration};

use futures::stream::StreamExt;
use k8s_openapi::api::batch::v1::Job;
use kube::{
api::{Patch, PatchParams},
client::Client,
core::object::HasSpec,
runtime::Controller,
Api,
};
use kube::{
runtime::{
controller::Action,
watcher::{self, Config},
},
Resource, ResourceExt,
};
use opentelemetry::{global, KeyValue};
use rand::{distributions::Alphanumeric, thread_rng, Rng, RngCore};

use tracing::{debug, error, info};

use crate::{
labels::MANAGED_BY_LABEL_SELECTOR,
lgen::{
job::{job_spec, JobConfig, JobImageConfig},
spec::{LoadGenerator, LoadGeneratorState},
},
simulation::controller::monitoring_ready,
utils::Clock,
};

use crate::network::ipfs_rpc::{HttpRpcClient, IpfsRpcClient};

use crate::utils::{apply_job, Context};

/// The name of the load generator job.
pub const LOAD_GENERATOR_JOB_NAME: &str = "load-gen-job";

/// Handle errors during reconciliation.
fn on_error(
_network: Arc<LoadGenerator>,
_error: &Error,
_context: Arc<Context<impl IpfsRpcClient, impl RngCore, impl Clock>>,
) -> Action {
Action::requeue(Duration::from_secs(5))
}

/// Errors produced by the reconcile function.
#[derive(Debug, thiserror::Error)]
enum Error {
#[error("App error: {source}")]
App {
#[from]
source: anyhow::Error,
},
#[error("Kube error: {source}")]
Kube {
#[from]
source: kube::Error,
},
}

/// Start a controller for the LoadGenerator CRD.
pub async fn run() {
let k_client = Client::try_default().await.unwrap();
let context = Arc::new(
Context::new(k_client.clone(), HttpRpcClient).expect("should be able to create context"),
);

let load_generators: Api<LoadGenerator> = Api::all(k_client.clone());
let jobs = Api::<Job>::all(k_client.clone());

Controller::new(load_generators.clone(), Config::default())
.owns(
jobs,
watcher::Config::default().labels(MANAGED_BY_LABEL_SELECTOR),
)
.run(reconcile, on_error, context)
.for_each(|rec_res| async move {
match rec_res {
Ok((load_generator, _)) => {
info!(load_generator.name, "reconcile success");
}
Err(err) => {
error!(?err, "reconcile error")
}
}
})
.await;
}

/// Perform a reconcile pass for the LoadGenerator CRD
async fn reconcile(
load_generator: Arc<LoadGenerator>,
cx: Arc<Context<impl IpfsRpcClient, impl RngCore, impl Clock>>,
) -> Result<Action, Error> {
let meter = global::meter("keramik");
let runs = meter
.u64_counter("load_generator_reconcile_count")
.with_description("Number of load generator reconciles")
.init();

match reconcile_(load_generator, cx).await {
Ok(action) => {
runs.add(
1,
&[KeyValue {
key: "result".into(),
value: "ok".into(),
}],
);
Ok(action)
}
Err(err) => {
runs.add(
1,
&[KeyValue {
key: "result".into(),
value: "err".into(),
}],
);
Err(err)
}
}
}

/// Perform a reconcile pass for the LoadGenerator CRD
async fn reconcile_(
load_generator: Arc<LoadGenerator>,
cx: Arc<Context<impl IpfsRpcClient, impl RngCore, impl Clock>>,
) -> Result<Action, Error> {
let spec = load_generator.spec();

let status = if let Some(status) = &load_generator.status {
status.clone()
} else {
// Generate new status with random name and nonce
LoadGeneratorState {
nonce: thread_rng().gen(),
name: "load-gen-"
.chars()
.chain(
thread_rng()
.sample_iter(&Alphanumeric)
.take(6)
.map(char::from),
)
.collect::<String>(),
}
};
debug!(?spec, ?status, "reconcile");

let ns = load_generator.namespace().unwrap();

// The load generator does not deploy the monitoring resources but they must exist in order to
// collect the results of load generators.
let ready = monitoring_ready(cx.clone(), &ns).await?;

if !ready {
return Ok(Action::requeue(Duration::from_secs(10)));
}

let job_image_config = JobImageConfig::from(spec);

let job_config = JobConfig {
name: status.name.clone(),
scenario: spec.scenario.to_owned(),
tasks: spec.tasks.to_owned(),
run_time: spec.run_time.to_owned(),
nonce: status.nonce,
job_image_config: job_image_config.clone(),
throttle_requests: spec.throttle_requests,
};
let orefs = load_generator
.controller_owner_ref(&())
.map(|oref| vec![oref])
.unwrap_or_default();

apply_job(
cx.clone(),
&ns,
orefs.clone(),
LOAD_GENERATOR_JOB_NAME,
job_spec(job_config),
)
.await?;

let load_generators: Api<LoadGenerator> = Api::namespaced(cx.k_client.clone(), &ns);
let _patched = load_generators
.patch_status(
&load_generator.name_any(),
&PatchParams::default(),
&Patch::Merge(serde_json::json!({ "status": status })),
)
.await?;

Ok(Action::requeue(Duration::from_secs(10)))
}