# `comapeo_alerts`: Post Alerts to CoMapeo API

This script fetches alerts from a database table and posts them to a CoMapeo server. Alerts that the server already has are skipped: the `alert_id` of each database row is compared against the `sourceId` of the alerts returned by the server.

## Endpoints

The request header must include an access token in the format:

    Authorization: Bearer YOUR_ACCESS_TOKEN
# requirements:
# psycopg2-binary
# requests~=2.32

import logging
from typing import TypedDict

import psycopg2
import requests
from psycopg2 import sql

# type names that refer to Windmill Resources
postgresql = dict


class comapeo_server(TypedDict):
    server_url: str
    access_token: str


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Seconds to wait for the CoMapeo server to connect / send data. Without a
# timeout, `requests` waits forever and a stalled server would hang the job.
_REQUEST_TIMEOUT_S = 60


def _quote_dsn_value(value) -> str:
    """Render one value of a libpq keyword/value connection string.

    Plain values are returned unchanged. Empty values and values containing
    whitespace, single quotes or backslashes are wrapped in single quotes with
    quotes and backslashes backslash-escaped, as libpq requires.
    """
    text = str(value)
    needs_quoting = not text or any(ch.isspace() or ch in "'\\" for ch in text)
    if not needs_quoting:
        return text
    escaped = text.replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def conninfo(db: postgresql):
    """Convert a `postgresql` Windmill Resources to psycopg-style connection string

    Parameters
    ----------
    db : postgresql
        Mapping with the keys `dbname`, `user`, `host` and `port`, and
        optionally `password`.

    Returns
    -------
    str
        A space-separated `keyword=value` connection string.

    Raises
    ------
    KeyError
        If one of the mandatory keys is missing from `db`.
    """
    parts = [
        f"{key}={_quote_dsn_value(db[key])}"
        for key in ("dbname", "user", "host", "port")
    ]
    # password is optional
    if "password" in db:
        parts.append(f"password={_quote_dsn_value(db['password'])}")
    return " ".join(parts)


def main(
    db: postgresql,
    comapeo: comapeo_server,
    comapeo_project: str,
    db_table_name: str = "alerts",
):
    """Post every alert in `db_table_name` that the CoMapeo project lacks.

    Parameters
    ----------
    db : postgresql
        Connection details of the database holding the alerts table.
    comapeo : comapeo_server
        Server URL and access token of the CoMapeo server.
    comapeo_project : str
        ID of the CoMapeo project the alerts are posted to.
    db_table_name : str
        Name of the database table the alerts are read from.
    """
    # Tolerate a trailing slash in the configured URL so the endpoint never
    # contains a double slash.
    comapeo_server_url = comapeo["server_url"].rstrip("/")
    comapeo_alerts_endpoint = (
        f"{comapeo_server_url}/projects/{comapeo_project}/remoteDetectionAlerts"
    )

    comapeo_access_token = comapeo["access_token"]
    comapeo_headers = {
        "Authorization": f"Bearer {comapeo_access_token}",
        "Content-Type": "application/json",
    }

    alerts = get_alerts_from_db(conninfo(db), db_table_name)

    unposted_alerts = filter_alerts(comapeo_alerts_endpoint, comapeo_headers, alerts)

    post_alerts(comapeo_alerts_endpoint, comapeo_headers, unposted_alerts)


def get_alerts_from_db(db_connection_string, db_table_name: str):
    """
    Retrieves alerts from a PostgreSQL database table.

    Parameters
    ----------
    db_connection_string : str
        The connection string for the PostgreSQL database.
    db_table_name : str
        The name of the database table containing the alerts. It is passed to
        the database as a quoted identifier (so it is case-sensitive); a dotted
        name such as `schema.table` is treated as schema-qualified.

    Returns
    -------
    list
        A list of dictionaries, where each dictionary represents an alert row from the database table with keys for the column names.
    """
    logger.info("Fetching alerts from database...")

    # Compose the identifier instead of interpolating it into the SQL text, so
    # a crafted table name cannot inject additional SQL.
    query = sql.SQL("SELECT * FROM {table}").format(
        table=sql.Identifier(*db_table_name.split("."))
    )

    conn = psycopg2.connect(dsn=db_connection_string)
    try:
        with conn.cursor() as cur:
            cur.execute(query)
            # Column names are the same for every row: read them once.
            columns = [col.name for col in cur.description]
            return [dict(zip(columns, row)) for row in cur.fetchall()]
    finally:
        # Close the connection even when the query fails.
        conn.close()


def _get_alerts_from_comapeo(
    comapeo_alerts_endpoint: str, comapeo_headers: dict[str, str]
):
    """
    Fetches alerts from the CoMapeo API.

    Parameters
    ----------
    comapeo_alerts_endpoint : str
        The URL endpoint for retrieving alerts from the CoMapeo API.
    comapeo_headers : dict[str, str]
        The headers to be included in the API request, such as authorization tokens.

    Returns
    -------
    set
        A set of alert source IDs for alerts that have been posted to the CoMapeo API.

    Raises
    ------
    requests.HTTPError
        If the server answers with an error status.
    """
    logger.info("Fetching alerts from CoMapeo API...")
    response = requests.get(
        comapeo_alerts_endpoint,
        headers=comapeo_headers,
        timeout=_REQUEST_TIMEOUT_S,
    )
    response.raise_for_status()

    # The alerts are read from the "data" key of the JSON body; a body without
    # that key is treated as "no alerts posted yet".
    alerts = response.json().get("data", [])

    return {alert["sourceId"] for alert in alerts}


def filter_alerts(
    comapeo_alerts_endpoint: str,
    comapeo_headers: dict[str, str],
    alerts: list[dict],
):
    """
    Filters a list of alerts to find those that have not been posted to the CoMapeo API.

    Parameters
    ----------
    comapeo_alerts_endpoint : str
        The URL endpoint for retrieving alerts from the CoMapeo API.
    comapeo_headers : dict[str, str]
        The headers to be included in the API request, such as authorization tokens.
    alerts : list[dict]
        A list of dictionaries, where each dictionary represents an alert.

    Returns
    -------
    list[dict]
        A list of dictionaries, where each dictionary represents an alert that has not been posted to the CoMapeo API.
    """
    logger.info("Filtering alerts...")

    alerts_posted_to_comapeo = _get_alerts_from_comapeo(
        comapeo_alerts_endpoint, comapeo_headers
    )

    # alert_id in the database matches sourceId on CoMapeo
    return [
        alert
        for alert in alerts
        if alert.get("alert_id") not in alerts_posted_to_comapeo
    ]


def post_alerts(
    comapeo_alerts_endpoint: str,
    comapeo_headers: dict[str, str],
    unposted_alerts,
):
    """
    Posts a list of alerts to the CoMapeo API.

    Each alert is sent in its own request; the first error status aborts the
    run, so alerts after the failing one are not posted.

    Parameters
    ----------
    comapeo_alerts_endpoint : str
        The URL endpoint for posting alerts to the CoMapeo API.
    comapeo_headers : dict[str, str]
        The headers to be included in the API request, such as authorization tokens.
    unposted_alerts : list
        A list of dictionaries, where each dictionary represents an alert to be posted to the CoMapeo API.

    Raises
    ------
    requests.HTTPError
        If the server answers a POST with an error status.
    """
    logger.info("Posting alerts to CoMapeo API...")

    for alert in unposted_alerts:
        # NOTE(review): rows are sent exactly as read from the database, so
        # every column value must be JSON-serializable - confirm for date and
        # numeric columns of the real alerts table.
        response = requests.post(
            url=comapeo_alerts_endpoint,
            headers=comapeo_headers,
            json=alert,
            timeout=_REQUEST_TIMEOUT_S,
        )
        response.raise_for_status()

    logger.info("%d alerts posted successfully.", len(unposted_alerts))
import json
from typing import NamedTuple

import psycopg2
import pytest

from f.connectors.comapeo.comapeo_alerts import (
    main,
)


class Alert(NamedTuple):
    # Field names mirror the columns of the `fake_alerts` table below.
    alert_id: str
    alert_type: str


@pytest.fixture
def fake_alerts_table(pg_database):
    """Create and fill a `fake_alerts` table; yield the alerts it contains."""
    alerts = [
        Alert("abc123", "gold_mining"),
        Alert("def456", "illegal_fishing"),
    ]

    conn = psycopg2.connect(**pg_database)
    conn.autocommit = True
    cur = conn.cursor()

    try:
        cur.execute("""
            CREATE TABLE fake_alerts (
                alert_id TEXT PRIMARY KEY,
                alert_type TEXT
            )
        """)  # there are more alert columns than these, but it is not necessary for this test to include them

        values = [(a.alert_id, a.alert_type) for a in alerts]
        cur.executemany("INSERT INTO fake_alerts VALUES (%s, %s)", values)

        yield alerts
    finally:
        cur.close()
        conn.close()


def test_script_e2e(
    comapeoserver_alerts, pg_database, fake_alerts_table, mocked_responses
):
    """Only alerts the server does not already have must be posted."""
    main(
        pg_database,
        comapeoserver_alerts.comapeo_server,
        "forest_expedition",
        "fake_alerts",
    )

    # Inspect the requests the script actually sent to the mocked server,
    # rather than re-checking the fixture data.
    posted_alerts = [
        json.loads(call.request.body)
        for call in mocked_responses.calls
        if call.request.method == "POST"
    ]

    # The mocked server already reports "abc123", so only "def456" is new.
    assert posted_alerts == [
        {"alert_id": "def456", "alert_type": "illegal_fishing"},
    ]
"http://comapeo.example.org" + project_id = "forest_expedition" + comapeo_alerts_endpoint = ( + f"{server_url}/projects/{project_id}/remoteDetectionAlerts" + ) + access_token = "MapYourWorldTogether!" + + mocked_responses.get( + comapeo_alerts_endpoint, + json=server_responses.comapeo_alerts(), + status=201, + ) + + mocked_responses.post( + comapeo_alerts_endpoint, + status=201, + ) + + server: comapeo_server = dict(server_url=server_url, access_token=access_token) + + return CoMapeoServer( + server, + ) + + @pytest.fixture def pg_database(): db = testing.postgresql.Postgresql(port=7654) diff --git a/tox.ini b/tox.ini index bb2078d..14cd360 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,6 @@ [tox] skipsdist = True -env_list = alerts, comapeo_observations, kobotoolbox_responses, postgres_to_geojson +env_list = alerts, comapeo, kobotoolbox_responses, postgres_to_geojson [testenv] setenv = @@ -22,9 +22,10 @@ environment = expose = TOX_DOCKER_GCS_PORT=10010/tcp -[testenv:comapeo_observations] +[testenv:comapeo] deps = -r{toxinidir}/f/connectors/comapeo/comapeo_observations.script.lock + -r{toxinidir}/f/connectors/comapeo/comapeo_alerts.script.lock -r{toxinidir}/f/connectors/comapeo/tests/requirements-test.txt commands = pytest {posargs} f/connectors/comapeo