Skip to content

Commit

Permalink
feat: ijinus device data extracted from influxdb
Browse files Browse the repository at this point in the history
  • Loading branch information
ymarcon committed Oct 29, 2024
1 parent a472bad commit 9453dc8
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 62 deletions.
5 changes: 4 additions & 1 deletion backend/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,7 @@ redis-up:
docker compose up -d redis

redis-stop:
docker compose stop redis
docker compose stop redis

redis-down:
docker compose down redis
46 changes: 26 additions & 20 deletions backend/api/data/datasets.json
Original file line number Diff line number Diff line change
Expand Up @@ -319,33 +319,39 @@
},
{
"name": "C1",
"file_spec": {
"file": "C1.csv",
"columns": [
{
"name": "Date",
"measure": "timestamp",
"format": "%d.%m.%y %H:%M"
},
{
"name": "Hauteur d'eau[0] (mm)",
"db_spec": {
"measurement": "ijinus",
"aggregate": "1h",
"location": {
"field": "ID",
"value": "IJA0102-00006283"
},
"measures": [
{
"filter": {
"field": "_field",
"value": "15[0] mm"
},
"measure": "water_level"
}
]
}
},
{
"name": "C2",
"file_spec": {
"file": "C2.csv",
"columns": [
{
"name": "Date",
"measure": "timestamp",
"format": "%d.%m.%y %H:%M"
},
{
"name": "Hauteur d'eau[0] (mm)",
"db_spec": {
"measurement": "ijinus",
"aggregate": "1h",
"location": {
"field": "ID",
"value": "IJA0102-00006736"
},
"measures": [
{
"filter": {
"field": "_field",
"value": "15[0] mm"
},
"measure": "water_level"
}
]
Expand Down
14 changes: 14 additions & 0 deletions backend/api/models/measures.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,25 @@ class Column(BaseModel):

class DataFileSpec(BaseModel):
    """Specification of a dataset backed by a CSV file in object storage."""
    # Name of the CSV file (e.g. "C1.csv") to read from S3.
    file: str
    # pandas resample rule used when aggregating rows (default: hourly).
    aggregate: Optional[str] = 'h'
    # Mapping of CSV columns to logical measures.
    columns: List[Column]


class DBFilter(BaseModel):
    """A single InfluxDB equality filter: record key `field` must equal `value`."""
    field: str
    value: str


class DBMeasure(BaseModel):
    """Maps one filtered InfluxDB series to a logical measure name."""
    # Series selector, e.g. _field == "15[0] mm".
    filter: DBFilter
    # Logical measure name exposed to clients, e.g. "water_level".
    measure: str


class DataDBSpec(BaseModel):
    """Specification of a dataset backed by an InfluxDB measurement."""
    # InfluxDB measurement name, e.g. "ijinus".
    measurement: str
    # Flux aggregateWindow "every" duration (default: daily).
    aggregate: Optional[str] = '1d'
    # Tag filter selecting the device, e.g. ID == "IJA0102-00006283".
    location: DBFilter
    # One entry per series to extract from the measurement.
    measures: List[DBMeasure]


class SensorDataSpec(BaseModel):
Expand Down
54 changes: 33 additions & 21 deletions backend/api/services/db.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import datetime
from influxdb_client import InfluxDBClient
from api.config import config
from fastapi.logger import logger


class DBClient:
Expand All @@ -10,37 +12,47 @@ def __init__(self):
def close(self):
self.client.close()

def query(self, measurement):
self.query_str = f"from(bucket: '{config.INFLUXDB_BUCKET}')"
self.query_str += f" |> filter(fn: (r) => r._measurement == '{measurement}')"
return self
def execute(self, query_str: str):
    """Run a Flux query and return parallel (times, values) lists.

    Flattens every record of every table returned by the query API
    into two lists, preserving record order.
    """
    logger.info(f"Executing query: {query_str}")
    tables = self.client.query_api().query(query_str, org=config.INFLUXDB_ORG)
    records = [record for table in tables for record in table.records]
    times = [record.get_time() for record in records]
    values = [record.get_value() for record in records]
    return times, values


class DBQuery:

def range(self, start: str, stop: str = None):
def __init__(self, measurement, start: str, stop: str = None):
    """Build the base Flux query: bucket source, time range and measurement filter.

    `start`/`stop` may be Flux literals (e.g. "-7d", "now()") or datetime
    objects; `_stringify` renders either as a Flux-compatible literal.
    When `stop` is omitted the range is open-ended.
    """
    # Fix: drop the stale pre-refactor lines left by the diff (the old
    # `range()` appends and its `return self`) which would append the
    # range stage twice and reference unconverted bounds.
    self.query_str = f"from(bucket: \"{config.INFLUXDB_BUCKET}\")"
    if stop:
        self.query_str += f" |> range(start: {self._stringify(start)}, stop: {self._stringify(stop)})"
    else:
        self.query_str += f" |> range(start: {self._stringify(start)})"
    self.query_str += f" |> filter(fn: (r) => r._measurement == \"{measurement}\")"

def aggregate(self, every: str, fn: str = "mean"):
    """Append an aggregateWindow + yield stage to the Flux query.

    Fix: keep only the double-quoted yield form — Flux strings require
    double quotes; the stale single-quoted duplicate line from the diff
    would have emitted both stages.
    Returns self so calls can be chained.
    """
    self.query_str += f" |> aggregateWindow(every: {every}, fn: {fn}, createEmpty: false)"
    self.query_str += f" |> yield(name: \"{fn}\")"
    return self

def filter(self, field: str, value: str):
    """Append an equality filter stage on a record key (tag or field).

    Fix: keep only the new signature/body — the stale diff remnant used
    `r.'{key}'` with single quotes, which is not valid Flux syntax.
    Returns self so calls can be chained.
    """
    self.query_str += f" |> filter(fn: (r) => r.{field} == \"{value}\")"
    return self

def execute(self):
query_api = self.client.query_api()
tables = query_api.query(self.query_str, org=config.INFLUXDB_ORG)
time = []
value = []
for table in tables:
for record in table.records:
time.append(record.get_time())
value.append(record.get_value())
return time, value
def to_string(self):
return self.query_str

def _stringify(self, value):
if isinstance(value, str):
return value
if isinstance(value, datetime.datetime):
return value.strftime("%Y-%m-%dT%H:%M:%SZ")
return str(value)


db_client = DBClient()
70 changes: 50 additions & 20 deletions backend/api/services/measures.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import io
import json
import datetime
import pkg_resources
from api.services.s3 import s3_client
from api.services.db import db_client
from api.services.db import db_client, DBQuery
from fastapi.exceptions import HTTPException
from api.models.measures import Datasets, SensorData, SensorDataSpec, DatasetFile
from api.models.measures import Datasets, SensorData, SensorDataSpec, DatasetFile, Vector
import pandas as pd
import numpy as np
from fastapi.logger import logger
Expand All @@ -13,6 +14,8 @@

lock = redis.lock("s3_measures", timeout=10)

START_DATETIME = datetime.datetime(2024, 4, 8)


class MeasuresService:

Expand Down Expand Up @@ -48,7 +51,7 @@ async def get_dataset(self, name: str, from_date=None, to_date=None) -> SensorDa
raise HTTPException(status_code=404,
detail="Sensor not found")

async def get_dataset_from_file(self, datasets: Datasets, sensor: SensorDataSpec, from_date=None, to_date=None) -> SensorData:
async def get_dataset_from_file(self, datasets: Datasets, sensor: SensorDataSpec, from_date: datetime.datetime = None, to_date: datetime.datetime = None) -> SensorData:
file_specs = self.get_file_specs(
datasets, sensor.file_spec.file)
df = await self.read_dataset_file_concurrently(file_specs)
Expand All @@ -65,35 +68,62 @@ async def get_dataset_from_file(self, datasets: Datasets, sensor: SensorDataSpec

if from_date is None or to_date is None:
# no (or partial) time range defined: sample per hour mean
df = df.resample('h').mean()
df = df.resample(sensor.file_spec.aggregate).mean()
else:
# if the time range is less than a threshold, keep the original data
df = df[from_date:to_date]
difference = to_date - from_date
hours_difference = difference.total_seconds() / 3600
if hours_difference > config.RESAMPLE_THRESHOLD:
df = df.resample('h').mean()
if self.to_resample(from_date, to_date):
df = df.resample(sensor.file_spec.aggregate).mean()
df = df.replace({np.nan: None})

vectors = []
for column in sensor.file_spec.columns:
if column.name in df.columns:
if column.measure == "timestamp":
vectors.append({
"measure": column.measure,
"values": df[column.name].astype(str).tolist()
})
vectors.append(Vector(measure=column.measure,
values=df[column.name].astype(str).tolist()))
else:
vectors.append({
"measure": column.measure,
"values": df[column.name].tolist()
})
vectors.append(Vector(measure=column.measure,
values=df[column.name].tolist()))
return SensorData(name=sensor.name, vectors=vectors)

async def get_dataset_from_db(self, sensor: SensorDataSpec, from_date: datetime.datetime = None, to_date: datetime.datetime = None) -> SensorData:
    """Fetch a sensor dataset from InfluxDB, caching results in Redis.

    Builds a Flux query from the sensor's db_spec, uses the query string
    itself as the cache key, and returns a SensorData holding a timestamp
    vector plus one value vector.
    """
    # Default the window to [START_DATETIME, now] when bounds are missing.
    from_datetime = from_date if from_date else START_DATETIME
    to_datetime = to_date if to_date else datetime.datetime.now()
    query = DBQuery(
        sensor.db_spec.measurement, from_datetime, to_date if to_date else "now()")
    # Select the device by its location tag (e.g. ID == serial number).
    query.filter(sensor.db_spec.location.field,
                 sensor.db_spec.location.value)
    # TODO: handle multiple measures
    measure_spec = sensor.db_spec.measures[0]
    query.filter(measure_spec.filter.field,
                 measure_spec.filter.value)

    # Aggregate (mean) only when the window exceeds the resample threshold.
    if self.to_resample(from_datetime, to_datetime):
        query.aggregate(sensor.db_spec.aggregate, "mean")

    # The full Flux query string doubles as the Redis cache key.
    content = await redis.get(query.to_string())
    vectors = []
    if not content:
        logger.info(
            f"Query result not found in cache, getting it from InfluxDB: {query.to_string()}")
        time, value = db_client.execute(query.to_string())
        vectors = [
            Vector(measure="timestamp", values=[
                t.strftime("%Y-%m-%d %H:%M:%S") for t in time]),
            Vector(measure=measure_spec.measure, values=value)
        ]
        # Cache the serialized vectors with an expiry so stale data ages out.
        await redis.set(query.to_string(), json.dumps([vector.model_dump() for vector in vectors]), ex=config.CACHE_SOURCE_EXPIRY)
    else:
        # Cache hit: rebuild the Vector models from the stored JSON.
        vectors = [Vector(**vector_dict)
                   for vector_dict in json.loads(content)]

    return SensorData(name=sensor.name, vectors=vectors)

# Fix: the superseded stub `get_dataset_from_db` (dead db_client.query(...)
# call followed by `pass`) was still interleaved here by the diff; defined
# after the real implementation it would shadow it, so it is dropped.

def to_resample(self, from_date: datetime.datetime, to_date: datetime.datetime):
    """Return True when the requested time window exceeds the resample threshold.

    The threshold (config.RESAMPLE_THRESHOLD) is expressed in hours.
    """
    hours_difference = (to_date - from_date).total_seconds() / 3600
    return hours_difference > config.RESAMPLE_THRESHOLD

def get_file_specs(self, datasets: Datasets, name: str) -> DatasetFile:
"""Get the file specs from the S3 storage
Expand Down

0 comments on commit 9453dc8

Please sign in to comment.