From 2b17bcd442863f4f6a8dd90112866de3705693da Mon Sep 17 00:00:00 2001
From: Erik
Date: Fri, 26 Nov 2021 22:49:09 +0000
Subject: [PATCH] Uncertainty agent clustering (#26)

* methods for cluster analysis

* Restructuring

* Renaming

* Mean sample plot data works

* Uncertainty Cluster Analysis

* Downloading Uncertainty Analysis Data

* Cleanup

* Spelling fixed
---
 processing-api/app/controllers/__init__.py   |   3 +
 .../uncertainty_analysis/__init__.py         |   0
 .../uncertainty_analysis/clustering_tools.py | 129 ++++++++++++++++++
 .../uncertainty_mean_sample.py               |  24 ++++
 .../uncertainty_cluster_mean_sample_agent.py |  54 ++++++++
 .../uncertainty_clustering_agent.py          |  71 ++++++++++
 .../uncertainty_mean_sample_agent.py         |  44 ++++++
 processing-api/manifest/urls.json            |   5 +
 processing-api/requirements.txt              |   5 +-
 9 files changed, 333 insertions(+), 2 deletions(-)
 create mode 100644 processing-api/app/controllers/uncertainty_analysis/__init__.py
 create mode 100644 processing-api/app/controllers/uncertainty_analysis/clustering_tools.py
 create mode 100644 processing-api/app/controllers/uncertainty_analysis/uncertainty_mean_sample.py
 create mode 100644 processing-api/app/controllers/uncertainty_cluster_mean_sample_agent.py
 create mode 100644 processing-api/app/controllers/uncertainty_clustering_agent.py
 create mode 100644 processing-api/app/controllers/uncertainty_mean_sample_agent.py

diff --git a/processing-api/app/controllers/__init__.py b/processing-api/app/controllers/__init__.py
index 7e5dcdc8..bc6bef9a 100644
--- a/processing-api/app/controllers/__init__.py
+++ b/processing-api/app/controllers/__init__.py
@@ -19,6 +19,9 @@
 # Just load the module so that the scheduler can start
 import app.controllers.sensitivity_analysis_agent
+import app.controllers.uncertainty_mean_sample_agent
+import app.controllers.uncertainty_clustering_agent
+import app.controllers.uncertainty_cluster_mean_sample_agent
 
 router = APIRouter()
diff --git a/processing-api/app/controllers/uncertainty_analysis/__init__.py b/processing-api/app/controllers/uncertainty_analysis/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/processing-api/app/controllers/uncertainty_analysis/clustering_tools.py b/processing-api/app/controllers/uncertainty_analysis/clustering_tools.py
new file mode 100644
index 00000000..ebdd3759
--- /dev/null
+++ b/processing-api/app/controllers/uncertainty_analysis/clustering_tools.py
@@ -0,0 +1,129 @@
+import json
+from typing import Callable, List, Tuple
+
+import numpy as np
+import pandas as pd
+import ujson
+from sandu.data_types import UncertaintyInput
+from tslearn.clustering import TimeSeriesKMeans
+
+
+def function_on_clusters(raw_cluster_data: dict, analysis_function: Callable[[UncertaintyInput], dict]) -> list:
+    """
+    Evaluates a function which takes an UncertaintyInput object on clustered raw data.
+    Returns the result for each cluster, in the same order as the list of input objects for the clusters.
+
+    Args:
+        raw_cluster_data: Dictionary (parsed from JSON) which must contain:
+            k: number of clusters.
+            clusters: a list with the UncertaintyInput object for each cluster in JSON format.
+        analysis_function: Function which is applied to each cluster's UncertaintyInput object.
+
+    Returns:
+        processed_cluster_data: List with the result of the function evaluation on each cluster.
+    """
+    processed_cluster_data = []
+    for i in range(raw_cluster_data["k"]):
+        # Round-trip through a JSON string so object_hook can rebuild the UncertaintyInput object
+        object_string = json.dumps(raw_cluster_data["clusters"][i])
+        cluster_input_object = json.loads(object_string, object_hook=lambda d: UncertaintyInput(**d))
+        processed_cluster_data.append(analysis_function(cluster_input_object))
+    return processed_cluster_data
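+
+
+# Illustrative usage sketch (not executed by the pipeline). The file name
+# "raw-k3-E.json" is an assumption taken from the agents below; any file
+# written by save_cluster_data has the required "k" and "clusters" keys:
+#
+#   with open("raw-k3-E.json", "r") as f:
+#       raw = json.load(f)
+#   results = function_on_clusters(raw, lambda x: x.df().describe().to_dict())
+#   assert len(results) == raw["k"]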
+
+
+def make_time_series_dataset(df_in: pd.DataFrame, run_name_in: str, time_name_in: str, quantity_name_in: str) -> \
+        Tuple[np.ndarray, dict]:
+    """
+    Returns a numpy array in the ts-learn time series dataset format,
+    and a dictionary relating run names to the index of that time series in the output.
+
+    Args:
+        df_in: Input dataframe with all time series. Columns: "run_name", "time_name", "quantity_name", ...
+        run_name_in: Name of column containing the model run indices.
+        time_name_in: Name of column containing time values.
+        quantity_name_in: Name of column containing the quantity of interest.
+
+    Returns:
+        time_series: Numpy array in the time series dataset format which ts-learn takes as input.
+        run_cluster_index: Dictionary mapping each run name in df_in to the index of its time series in time_series.
+    """
+    runs_present = df_in[run_name_in].unique()
+    time_samples_present = df_in[time_name_in].unique()
+
+    # Build the (n_runs, n_time_samples, 1) matrix that ts-learn expects
+    nr_of_time_samples = len(time_samples_present)
+    nr_of_runs = len(runs_present)
+
+    time_series = np.zeros((nr_of_runs, nr_of_time_samples, 1))
+    g = df_in.groupby(run_name_in)
+    run_cluster_index = {}  # Maps each run name to its row index in time_series
+    for idx, val in enumerate(runs_present):
+        # Get all sampled values belonging to one run, sorted by time
+        g_df = g.get_group(val).sort_values(time_name_in)
+        time_series[idx, :, 0] = g_df[quantity_name_in].to_numpy()
+        run_cluster_index[val] = idx
+    return time_series, run_cluster_index
+
+
+def get_k_mean_clusters(df_in: pd.DataFrame, run_name_in: str, time_name_in: str, quantity_name_in: str, k_in: int,
+                        metric_in: str = "euclidean", seed: int = 42) -> pd.DataFrame:
+    """
+    Returns a dataframe with an added column containing the cluster each time series is assigned to.
+
+    Args:
+        df_in: Input dataframe with all time series. Columns: "run_name", "time_name", "quantity_name", ...
+        run_name_in: Name of column containing the model run indices.
+        time_name_in: Name of column containing time values.
+        quantity_name_in: Name of column containing the quantity of interest.
+        k_in: Number of clusters.
+        metric_in: Metric used for cluster assignment. Options are: "euclidean", "dtw", "softdtw".
+        seed: Seed used for the random number generator. Default: 42.
+
+    Returns:
+        df_out: df_in with an added "cluster" column containing the cluster index.
+    """
+    my_time_series, run_cluster_index = make_time_series_dataset(df_in, run_name_in, time_name_in, quantity_name_in)
+    cluster_labels = TimeSeriesKMeans(n_clusters=k_in, verbose=False, random_state=seed, metric=metric_in).fit_predict(
+        my_time_series)
+    df_out = df_in.copy()  # Work on a copy so the caller's dataframe is not mutated
+    df_out["cluster"] = df_out[run_name_in].map(run_cluster_index)  # run name -> time series index
+    df_out["cluster"] = cluster_labels[df_out["cluster"]]  # time series index -> cluster label
+    return df_out
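+
+
+# Minimal sketch of the two helpers above (illustrative only; the column names
+# "run", "day" and "cases" are invented for this example):
+#
+#   toy = pd.DataFrame({"run": [0, 0, 1, 1], "day": [0, 1, 0, 1],
+#                       "cases": [1.0, 2.0, 10.0, 20.0]})
+#   clustered = get_k_mean_clusters(toy, "run", "day", "cases", k_in=2)
+#   print(clustered["cluster"].unique())  # two labels, e.g. [0 1]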
+
+
+def form_input_clusters(df_clusters_in: pd.DataFrame, run_name: str, time_name: str, quantity_name: str) -> List[dict]:
+    """Forms a list containing one uncertainty input object per cluster.
+
+    Args:
+        df_clusters_in: Dataframe containing a column "cluster" with the cluster index for each row.
+        run_name: Name of column containing the model run indices.
+        time_name: Name of column containing time values.
+        quantity_name: Name of column containing the quantity of interest.
+
+    Returns:
+        clusters: List containing the UncertaintyInput object for each cluster, as a dictionary.
+    """
+    g = df_clusters_in.groupby("cluster")
+    unique_clusters = df_clusters_in["cluster"].unique()
+    clusters = []
+    for cluster in unique_clusters:
+        g_df = g.get_group(cluster)
+        new_uncertainty_input = UncertaintyInput(g_df.to_json(index=False, orient="split"), quantity_name, time_name,
+                                                 run_name)
+        clusters.append(new_uncertainty_input.__dict__)
+    return clusters
+
+
+def save_cluster_data(cluster_data: list, cluster_data_name: str, filename, metric: str, k: int, model: str):
+    """Saves the data associated with each cluster, wrapped in a dictionary, as a JSON file.
+
+    Args:
+        cluster_data: List with k entries containing the data (any JSON-serialisable object) associated with each cluster.
+        cluster_data_name: The key under which cluster_data is stored.
+        filename: Name of the created JSON file.
+        metric: Metric used for clustering, e.g. "euclidean" or "dtw".
+        k: Number of clusters.
+        model: Name of the model/input data, so clustered data can be associated with the model it came from.
+    """
+    # Make a cluster data object and write it to disk
+    output = {"metric": metric, "k": k, "model": model, cluster_data_name: cluster_data}
+    with open(filename, "w", encoding="utf-8") as f:
+        ujson.dump(output, f, ensure_ascii=False, indent=4)
\ No newline at end of file
diff --git a/processing-api/app/controllers/uncertainty_analysis/uncertainty_mean_sample.py b/processing-api/app/controllers/uncertainty_analysis/uncertainty_mean_sample.py
new file mode 100644
index 00000000..23e7eec6
--- /dev/null
+++ b/processing-api/app/controllers/uncertainty_analysis/uncertainty_mean_sample.py
@@ -0,0 +1,24 @@
+import numpy as np
+
+from sandu.data_types import UncertaintyInput
+from sandu.uncertainty_quantification import mean_time_series
+
+
+def get_subset(df_in, run_name, desired_samples):
+    """Returns the rows of df_in belonging to (at most) desired_samples randomly chosen runs."""
+    all_runs = df_in[run_name].unique()
+    n_samples = min(desired_samples, len(all_runs))
+    kept_runs = np.random.choice(all_runs, n_samples, replace=False)
+    df_out = df_in[df_in[run_name].isin(kept_runs)]
+    return df_out
+
+
+def mean_and_sample(x_in: UncertaintyInput) -> dict:
+    """Computes the mean time series and draws a random sample of runs for plotting."""
+    df_mean = mean_time_series.get_mean(x_in.df(), x_in.time_name, x_in.quantity_name)
+    df_all = get_subset(x_in.df(), x_in.run_name, 100)
+    data_dict = {"dataAll": df_all.to_dict(orient="records"),
+                 "dataMean": df_mean.to_dict(orient="records"),
+                 "runName": x_in.run_name,  # Name of column denoting the different runs
+                 "timeName": x_in.time_name,  # Name of column with the time unit
+                 "quantityName": x_in.quantity_name  # Name of column with the quantity of interest
+                 }
+    return data_dict
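+
+
+# Rough usage sketch (assumes "raw.json" holds a serialised UncertaintyInput,
+# as downloaded by the manifest entry added in this patch; not executed here):
+#
+#   import json
+#   with open("raw.json", "r") as f:
+#       x = json.load(f, object_hook=lambda d: UncertaintyInput(**d))
+#   out = mean_and_sample(x)
+#   list(out)  # ['dataAll', 'dataMean', 'runName', 'timeName', 'quantityName']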
"models/uncertainty/example/raw-k4-E.json", + "output_filename": "models/uncertainty/example/mean_all_k4_E.json"}, + {"filename": "models/uncertainty/example/raw-k5-E.json", + "output_filename": "models/uncertainty/example/mean_all_k5_E.json"}, + ] + + +def clusters_mean_sample(): + """Computes the mean of a group of time series and draws a random sample for plotting. + """ + + for values in raw_clusters: + folder = Path(DATA_PATH_LIVE) + filename = folder / values["filename"] + output_filename = folder / values["output_filename"] + + # Check if file exists + if not filename.is_file(): + print("CANNOT FIND ", filename) + return + + with open(filename, "r") as read_file: + imported = json.load(read_file) + + processed_clusters = ct.function_on_clusters(imported, ums.mean_and_sample) # Evaluate the function mean_all on all clusters + + ct.save_cluster_data(processed_clusters, "processed", output_filename, imported["metric"], imported["k"], + imported["model"]) # Save the processed data + + +# A recurrent job +scheduler = BackgroundScheduler(daemon=True) + +# Cron runs at 1am daily +scheduler.add_job(clusters_mean_sample, "cron", hour=1, minute=0, second=0) + +scheduler.start() +logger.info('Uncertainty-clustering-agent starts. Will run immediately now and every 1am.') + +# Run immediately after server starts +threading.Thread(target=clusters_mean_sample).start() diff --git a/processing-api/app/controllers/uncertainty_clustering_agent.py b/processing-api/app/controllers/uncertainty_clustering_agent.py new file mode 100644 index 00000000..3cd80d15 --- /dev/null +++ b/processing-api/app/controllers/uncertainty_clustering_agent.py @@ -0,0 +1,71 @@ +from pathlib import Path +import threading +import json +import ujson +import numpy as np +from loguru import logger +from apscheduler.schedulers.background import BackgroundScheduler + +from sandu.data_types import UncertaintyInput + +import app.controllers.uncertainty_analysis.clustering_tools as ct + +from app.core.settings import DATA_PATH_LIVE + +clusters = [{"metric": 'euclidean', + "filename": "models/uncertainty/example/raw.json", + "output_filename": "models/uncertainty/example/raw-k3-E.json", + "k": 3, + "model": "example"}, + + {"metric": 'euclidean', + "filename": "models/uncertainty/example/raw.json", + "output_filename": "models/uncertainty/example/raw-k4-E.json", + "k": 4, + "model": "example"}, + + {"metric": 'euclidean', + "filename": "models/uncertainty/example/raw.json", + "output_filename": "models/uncertainty/example/raw-k5-E.json", + "k": 5, + "model": "example" + } + ] + + +def uncertainty_form_clusters(): + """Clusters raw input data for cluster wise analysis. 
+
+
+# A recurrent job
+scheduler = BackgroundScheduler(daemon=True)
+
+# Cron runs at 1am daily
+scheduler.add_job(uncertainty_form_clusters, "cron", hour=1, minute=0, second=0)
+
+scheduler.start()
+logger.info('Uncertainty-clustering-agent starts. Will run immediately now and daily at 1am.')
+
+# Run immediately after server starts
+threading.Thread(target=uncertainty_form_clusters).start()
diff --git a/processing-api/app/controllers/uncertainty_mean_sample_agent.py b/processing-api/app/controllers/uncertainty_mean_sample_agent.py
new file mode 100644
index 00000000..0433b1f7
--- /dev/null
+++ b/processing-api/app/controllers/uncertainty_mean_sample_agent.py
@@ -0,0 +1,44 @@
+from pathlib import Path
+import threading
+import json
+
+import ujson
+from loguru import logger
+from apscheduler.schedulers.background import BackgroundScheduler
+
+from sandu.data_types import UncertaintyInput
+
+import app.controllers.uncertainty_analysis.uncertainty_mean_sample as ums
+from app.core.settings import DATA_PATH_LIVE
+
+
+def compute_mean_sample():
+    """Computes the mean of a group of time series and draws a random sample for plotting."""
+    folder = Path(DATA_PATH_LIVE)
+    filename = folder / "models/uncertainty/example/raw.json"
+    output_filename = folder / "models/uncertainty/example/mean_all.json"
+
+    # Check if the raw input file exists
+    if not filename.is_file():
+        logger.warning("Cannot find {}", filename)
+        return
+
+    with open(filename, "r") as read_file:
+        x = json.load(read_file, object_hook=lambda d: UncertaintyInput(**d))
+    data_dict = ums.mean_and_sample(x)
+    with open(output_filename, "w", encoding="utf-8") as f:
+        ujson.dump(data_dict, f, ensure_ascii=False, indent=4)
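+
+
+# Note: the object_hook above assumes raw.json is the flat __dict__ form of an
+# UncertaintyInput (the same form form_input_clusters stores per cluster), i.e.
+# roughly the inverse of:
+#
+#   with open(filename, "w", encoding="utf-8") as f:
+#       ujson.dump(x.__dict__, f, ensure_ascii=False, indent=4)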
+
+
+# A recurrent job
+scheduler = BackgroundScheduler(daemon=True)
+
+# Cron runs at 1am daily
+scheduler.add_job(compute_mean_sample, "cron", hour=1, minute=0, second=0)
+
+scheduler.start()
+logger.info('Uncertainty-mean-sample-analysis agent starts. Will run immediately now and daily at 1am.')
+
+# Run immediately after server starts
+threading.Thread(target=compute_mean_sample).start()
diff --git a/processing-api/manifest/urls.json b/processing-api/manifest/urls.json
index 703fc9f5..c3b16647 100644
--- a/processing-api/manifest/urls.json
+++ b/processing-api/manifest/urls.json
@@ -3,6 +3,11 @@
         "name": "sobol",
         "url": "https://gist.githubusercontent.com/rampvisdevelopment/a73ff62e4b648541e5fa75ce0a5edae4/raw/example_sensitivity_input.json",
         "save_to": "models/sobol/raw.json"
+    },
+    {
+        "name": "example_structured_data",
+        "url": "https://github.com/rampvisdevelopment/example/archive/main.zip",
+        "save_to": "models/uncertainty/example"
     },
     {
         "name": "ensemble",
diff --git a/processing-api/requirements.txt b/processing-api/requirements.txt
index 7990ca4f..590a5417 100644
--- a/processing-api/requirements.txt
+++ b/processing-api/requirements.txt
@@ -23,5 +23,6 @@ pydantic
 loguru
 dependency-injector
 python-multipart
-sandu
-tslearn
\ No newline at end of file
+sandu >= 0.1.0
+tslearn
+ujson
\ No newline at end of file