Skip to content

Commit

Permalink
Uncertainty agent clustering (#26)
Browse files Browse the repository at this point in the history
* methods for cluster analysis

* Restructuring

* Renaming

* Mean sample plot data works

* Uncertainty Cluster Analysis

* Downloading Uncertainty Analysis Data

* Cleanup

* Spelling fixed
  • Loading branch information
ErikRZH authored Nov 26, 2021
1 parent aa30353 commit 2b17bcd
Show file tree
Hide file tree
Showing 9 changed files with 333 additions and 2 deletions.
3 changes: 3 additions & 0 deletions processing-api/app/controllers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
# Just load the module so that the scheduler can start
import app.controllers.sensitivity_analysis_agent

import app.controllers.uncertainty_mean_sample_agent
import app.controllers.uncertainty_clustering_agent
import app.controllers.uncertainty_cluster_mean_sample_agent

router = APIRouter()

Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import pandas as pd
from sandu.data_types import UncertaintyInput
import json
import ujson
import numpy as np
from tslearn.clustering import TimeSeriesKMeans
from typing import Tuple, List, Callable


def function_on_clusters(raw_cluster_data: str, analysis_function: Callable[[UncertaintyInput], str]):
"""
Evaluates a function taking an UncertaintyInputObject, on clustered raw data.
Returns the result for each cluster in a list with the same style as the list of input objects for the cluster.
Args:
raw_cluster_data: JSON string which must contain:
k: number of clusters.
clusters: a list with the UncertaintyInput object for each cluster in a JSON format.
analysis_function: Function which is applied to each clusters UncertaintyInput object.
Returns:
processed_cluster_data: List with the result of the function evaluation on each cluster, as JSON strings.
"""
processed_cluster_data = []
for i in range(raw_cluster_data["k"]):
object_string = json.dumps(raw_cluster_data["clusters"][i])
cluster_input_object = json.loads(object_string, object_hook=lambda d: UncertaintyInput(**d))
processed_cluster_data.append(analysis_function(cluster_input_object))
return processed_cluster_data


def make_time_series_dataset(df_in: pd.DataFrame, run_name_in: str, time_name_in: str, quantity_name_in: str) -> \
Tuple[np.ndarray, dict]:
"""
Returns a numpy array of the form of ts-learn input data,
and a dictionary relating run_names to the index of that time series in the output.
Args:
df_in: input dataframe with all time series. Columns: "run_name","time_name","quantity_name",...
run_name_in: Name of column containing the model run indices.
quantity_name_in: Name of column containing the model run indices.
time_name_in: Name of column containing time values.
Returns:
time_series: Numpy array of the time series dataset format ts-learn takes as input.
run_time_series: Dictionary associating the run_name in the df_in to the index of the time series in time_series
"""
runs_present = df_in[run_name_in].unique()
time_samples_present = df_in[time_name_in].unique()

# making numpy matrix
nr_of_time_samples = len(time_samples_present)
nr_of_runs = len(runs_present)

time_series = np.zeros((nr_of_runs, nr_of_time_samples, 1))
g = df_in.groupby([run_name_in])
run_cluster_index = {} # Associates the run_name with the index in the array clusters, which contains the cluster for each run
for idx, val in enumerate(runs_present):
g_df = g.get_group(runs_present[idx]).sort_values(
time_name_in) # Get all sampled values with the same index and then sort it according to the time
g_np = g_df[quantity_name_in].to_numpy()
time_series[idx, :, 0] = g_np[:]
run_cluster_index[val] = idx
return time_series, run_cluster_index


def get_k_mean_clusters(df_in: pd.DataFrame, run_name_in: str, time_name_in: str, quantity_name_in: str, k_in,
metric_in='euclidean', seed=42) -> pd.DataFrame:
"""
Returns a dataframe with an added column containing the cluster each time-series is assigned to
Args:
df_in: input dataframe with all time series. Columns: "run_name","time_name","quantity_name",...
run_name_in: Name of column containing the model run indices.
quantity_name_in: Name of column containing the quantity of interest.
time_name_in: Name of column containing time values.
k_in: Number of clusters.
metric_in: Metric used for cluster assignment. Options are: “euclidean”, “dtw”, “softdtw”.
seed: seed used for random number generator. Default: 42.
Returns:
df_out: The df_in with an added column containing the cluster index.
"""
my_time_series, run_time_series = make_time_series_dataset(df_in, run_name_in, time_name_in, quantity_name_in)
clusters = TimeSeriesKMeans(n_clusters=k_in, verbose=False, random_state=seed, metric=metric_in).fit_predict(
my_time_series)
df_out = df_in
df_out["cluster"] = df_out[run_name_in].map(run_time_series)
df_out["cluster"] = clusters[df_out["cluster"]]
return df_out


def form_input_clusters(df_clusters_in, run_name, time_name, quantity_name) -> List[str]:
"""Form a list with containing one input object for each cluster
Args:
df_clusters_in: Dataframe containing a column "clusters" with the cluster index for that data
run_name: Name of column containing the model run indices.
time_name: Name of column containing time values.
quantity_name: Name of column containing the quantity of interest.
Returns:
clusters: List containing the uncertainty input objects for each cluster as a dictionary
"""
g = df_clusters_in.groupby(["cluster"])
unique_clusters = df_clusters_in["cluster"].unique()
clusters = []
for i, cluster in enumerate(unique_clusters):
g_df = g.get_group(cluster)
new_uncertainty_input = UncertaintyInput(g_df.to_json(index=False, orient="split"), quantity_name, time_name,
run_name)
clusters.append(new_uncertainty_input.__dict__)
return clusters


def save_cluster_data(cluster_data, cluster_data_name, filename, metric, k, model):
"""Saves the data associated with a cluster in a dictionary as a JSON file.
Args:
cluster_data: List with k entries, containing the data, can be an object, associated with each cluster.
cluster_data_name: The key associated with cluster_data
filename: Name of the created JSON file.
metric: metric used for clustering, may be euclidean, DTW etc...
k: number of clusters
model: Name of the model/input data, to be able to associate clustered data with the model it is from.
"""
# make a cluster data object
output = {"metric": metric, "k": k, "model": model, cluster_data_name: cluster_data}
with open(filename, "w", encoding="utf-8") as f:
ujson.dump(output, f, ensure_ascii=False, indent=4)
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import numpy as np

from sandu.data_types import UncertaintyInput
from sandu.uncertainty_quantification import mean_time_series


def get_subset(df_in, run_name, desired_samples):
all_runs = df_in[run_name].unique()
n_samples = desired_samples if len(all_runs) > desired_samples else len(all_runs)
kept_runs = np.random.choice(all_runs, n_samples, replace=False)
df_out = df_in[df_in[run_name].isin(kept_runs)]
return df_out


def mean_and_sample(x_in: UncertaintyInput):
df_mean = mean_time_series.get_mean(x_in.df(), x_in.time_name, x_in.quantity_name)
df_all = get_subset(x_in.df(), x_in.run_name, 100)
data_dict = {"dataAll": df_all.to_dict(orient="records"),
"dataMean": df_mean.to_dict(orient="records"),
"runName": x_in.run_name, # Name of column denoting the different runs
"timeName": x_in.time_name, # Name of column with the time unit
"quantityName": x_in.quantity_name # Name of column with the quantity of interest
}
return data_dict
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from pathlib import Path
import threading
import json
from loguru import logger
from apscheduler.schedulers.background import BackgroundScheduler

import app.controllers.uncertainty_analysis.clustering_tools as ct
import app.controllers.uncertainty_analysis.uncertainty_mean_sample as ums

from app.core.settings import DATA_PATH_LIVE

raw_clusters = [{"filename": "models/uncertainty/example/raw-k3-E.json",
"output_filename": "models/uncertainty/example/mean_all_k3_E.json"},
{"filename": "models/uncertainty/example/raw-k4-E.json",
"output_filename": "models/uncertainty/example/mean_all_k4_E.json"},
{"filename": "models/uncertainty/example/raw-k5-E.json",
"output_filename": "models/uncertainty/example/mean_all_k5_E.json"},
]


def clusters_mean_sample():
"""Computes the mean of a group of time series and draws a random sample for plotting.
"""

for values in raw_clusters:
folder = Path(DATA_PATH_LIVE)
filename = folder / values["filename"]
output_filename = folder / values["output_filename"]

# Check if file exists
if not filename.is_file():
print("CANNOT FIND ", filename)
return

with open(filename, "r") as read_file:
imported = json.load(read_file)

processed_clusters = ct.function_on_clusters(imported, ums.mean_and_sample) # Evaluate the function mean_all on all clusters

ct.save_cluster_data(processed_clusters, "processed", output_filename, imported["metric"], imported["k"],
imported["model"]) # Save the processed data


# A recurrent job
scheduler = BackgroundScheduler(daemon=True)

# Cron runs at 1am daily
scheduler.add_job(clusters_mean_sample, "cron", hour=1, minute=0, second=0)

scheduler.start()
logger.info('Uncertainty-clustering-agent starts. Will run immediately now and every 1am.')

# Run immediately after server starts
threading.Thread(target=clusters_mean_sample).start()
71 changes: 71 additions & 0 deletions processing-api/app/controllers/uncertainty_clustering_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from pathlib import Path
import threading
import json
import ujson
import numpy as np
from loguru import logger
from apscheduler.schedulers.background import BackgroundScheduler

from sandu.data_types import UncertaintyInput

import app.controllers.uncertainty_analysis.clustering_tools as ct

from app.core.settings import DATA_PATH_LIVE

clusters = [{"metric": 'euclidean',
"filename": "models/uncertainty/example/raw.json",
"output_filename": "models/uncertainty/example/raw-k3-E.json",
"k": 3,
"model": "example"},

{"metric": 'euclidean',
"filename": "models/uncertainty/example/raw.json",
"output_filename": "models/uncertainty/example/raw-k4-E.json",
"k": 4,
"model": "example"},

{"metric": 'euclidean',
"filename": "models/uncertainty/example/raw.json",
"output_filename": "models/uncertainty/example/raw-k5-E.json",
"k": 5,
"model": "example"
}
]


def uncertainty_form_clusters():
"""Clusters raw input data for cluster wise analysis.
"""

for values in clusters:
folder = Path(DATA_PATH_LIVE)
metric = values["metric"]
filename = folder / values["filename"]
output_filename = folder / values["output_filename"]
k = values["k"]
model = values["model"]

# Check if file exists
if not filename.is_file():
print("CANNOT FIND ", filename)
return
print("Clustering Uncertainty Data")
with open(filename, "r") as read_file:
x = json.load(read_file, object_hook=lambda d: UncertaintyInput(**d))
print("Calculating K mean clusters")
df_clusters = ct.get_k_mean_clusters(x.df(), x.run_name, x.time_name, x.quantity_name, k, metric)
input_clusters = ct.form_input_clusters(df_clusters, x.run_name, x.time_name, x.quantity_name)
ct.save_cluster_data(input_clusters, "clusters", output_filename, metric, k, model)


# A recurrent job
scheduler = BackgroundScheduler(daemon=True)

# Cron runs at 1am daily
scheduler.add_job(uncertainty_form_clusters, "cron", hour=1, minute=0, second=0)

scheduler.start()
logger.info('Uncertainty-clustering-agent starts. Will run immediately now and every 1am.')

# Run immediately after server starts
threading.Thread(target=uncertainty_form_clusters).start()
44 changes: 44 additions & 0 deletions processing-api/app/controllers/uncertainty_mean_sample_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from pathlib import Path
import threading
import json
import ujson
from loguru import logger
from apscheduler.schedulers.background import BackgroundScheduler

from sandu.data_types import UncertaintyInput

import app.controllers.uncertainty_analysis.uncertainty_mean_sample as ums

from app.core.settings import DATA_PATH_LIVE


def compute_mean_sample():
"""Computes the mean of a group of time series and draws a random sample for plotting.
"""
folder = Path(DATA_PATH_LIVE)
filename = folder / "models/uncertainty/example/raw.json"
output_filename = folder / "models/uncertainty/example/mean_all.json"

# Check if file exists
if not filename.is_file():
print("CANNOT FIND ", filename)
return

with open(filename, "r") as read_file:
x = json.load(read_file, object_hook=lambda d: UncertaintyInput(**d))
data_dict = ums.mean_and_sample(x)
with open(output_filename, "w", encoding="utf-8") as f:
ujson.dump(data_dict, f, ensure_ascii=False, indent=4)


# A recurrent job
scheduler = BackgroundScheduler(daemon=True)

# Cron runs at 1am daily
scheduler.add_job(compute_mean_sample, "cron", hour=1, minute=0, second=0)

scheduler.start()
logger.info('Uncertainty-mean-sample-analysis agent starts. Will run immediately now and every 1am.')

# Run immediately after server starts
threading.Thread(target=compute_mean_sample).start()
5 changes: 5 additions & 0 deletions processing-api/manifest/urls.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
"name": "sobol",
"url": "https://gist.githubusercontent.com/rampvisdevelopment/a73ff62e4b648541e5fa75ce0a5edae4/raw/example_sensitivity_input.json",
"save_to": "models/sobol/raw.json"
},
{
"name": "example_structured_data",
"url": "https://github.com/rampvisdevelopment/example/archive/main.zip",
"save_to": "models/uncertainty/example"
},
{
"name": "ensemble",
Expand Down
5 changes: 3 additions & 2 deletions processing-api/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ pydantic
loguru
dependency-injector
python-multipart
sandu
tslearn
sandu >= 0.1.0
tslearn
ujson

0 comments on commit 2b17bcd

Please sign in to comment.