From 9ff1ee83316efdaed914d589489997131ac2ad28 Mon Sep 17 00:00:00 2001 From: Rudo Kemper Date: Tue, 7 Jan 2025 14:15:26 -0500 Subject: [PATCH 01/10] Add core of the script --- f/connectors/comapeo/README.md | 45 ++++- f/connectors/comapeo/comapeo_alerts.py | 167 ++++++++++++++++++ .../comapeo/comapeo_alerts.script.lock | 2 + .../comapeo/comapeo_alerts.script.yaml | 40 +++++ 4 files changed, 250 insertions(+), 4 deletions(-) create mode 100644 f/connectors/comapeo/comapeo_alerts.py create mode 100644 f/connectors/comapeo/comapeo_alerts.script.lock create mode 100644 f/connectors/comapeo/comapeo_alerts.script.yaml diff --git a/f/connectors/comapeo/README.md b/f/connectors/comapeo/README.md index ece78f4..ccb91a8 100644 --- a/f/connectors/comapeo/README.md +++ b/f/connectors/comapeo/README.md @@ -8,9 +8,7 @@ For each project, the observations data is stored in a table prefixed by `table_ The request header must include an access token in the format: Authorized: Bearer . -### `GET /projects` - -```json +### `GET /projects````json { "data": [ { @@ -41,4 +39,43 @@ The request header must include an access token in the format: Authorized: Beare ### `GET /projects/abc123/attachments/attachment2_hash/photo/blob2_hash` -This endpoint retrieves the binary data of a specific attachment, such as a photo, associated with a project. The response will contain the raw binary content of the file, which can be saved or processed as needed. \ No newline at end of file +This endpoint retrieves the binary data of a specific attachment, such as a photo, associated with a project. The response will contain the raw binary content of the file, which can be saved or processed as needed. + +# CoMapeo: Post Alerts + +This script fetches alerts data from a database and posts it to a CoMapeo server. + +## Endpoints + +The request header must include an access token in the format: Authorized: Bearer . + +### `POST /projects/abc123/remoteDetecionAlerts` + +```json +{ + "detectionDateStart": "2024-11-03T04:20:69Z", + "detectionDateEnd": "2024-11-04T04:20:69Z", + "sourceId": "abc123", + "metadata": { "foo": "bar" }, + "geometry": { + "type": "Point", + "coordinates": [12, 34] + } +} +# => HTTP 201, no response body +``` + +### `GET /projects/abc123/remoteDetectionAlerts` + +```json +{ + "detectionDateStart": "2024-11-03T04:20:69Z", + "detectionDateEnd": "2024-11-04T04:20:69Z", + "sourceId": "abc123", + "metadata": { "foo": "bar" }, + "geometry": { + "type": "Point", + "coordinates": [12, 34] + } +} +``` \ No newline at end of file diff --git a/f/connectors/comapeo/comapeo_alerts.py b/f/connectors/comapeo/comapeo_alerts.py new file mode 100644 index 0000000..1c105fd --- /dev/null +++ b/f/connectors/comapeo/comapeo_alerts.py @@ -0,0 +1,167 @@ +# requirements: +# psycopg2-binary +# requests~=2.32 + +import json +import logging +from typing import TypedDict + +import psycopg2 +import requests + +# type names that refer to Windmill Resources +postgresql = dict + + +class comapeo_server(TypedDict): + server_url: str + access_token: str + + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def conninfo(db: postgresql): + """Convert a `postgresql` Windmill Resources to psycopg-style connection string""" + # password is optional + password_part = f" password={db['password']}" if "password" in db else "" + conn = "dbname={dbname} user={user} host={host} port={port}".format(**db) + return conn + password_part + + +def main( + db: postgresql, + comapeo: comapeo_server, + comapeo_project: str, + db_table_name: str = "alerts", +): + comapeo_server_url = comapeo["server_url"] + comapeo_alerts_endpoint = ( + f"{comapeo_server_url}/projects/{comapeo_project}/remoteDetectionAlerts" + ) + + comapeo_access_token = comapeo["access_token"] + comapeo_headers = { + "Authorization": f"Bearer {comapeo_access_token}", + "Content-Type": "application/json", + } + + alerts = get_alerts_from_db(conninfo(db), db_table_name) + + unposted_alerts = filter_alerts(comapeo_alerts_endpoint, comapeo_headers, alerts) + + post_alerts(comapeo_alerts_endpoint, comapeo_headers, unposted_alerts) + + +def get_alerts_from_db(db_connection_string, db_table_name: str): + """ + Retrieves alerts from a PostgreSQL database table. + + Parameters + ---------- + db_connection_string : str + The connection string for the PostgreSQL database. + db_table_name : str + The name of the database table containing the alerts. + + Returns + ------- + list + A list of tuples, where each tuple represents an alert row from the database table. + """ + conn = psycopg2.connect(dsn=db_connection_string) + cur = conn.cursor() + cur.execute(f"SELECT * FROM {db_table_name}") + alerts = cur.fetchall() + cur.close() + conn.close() + return alerts + + +def _get_alerts_from_comapeo(comapeo_alerts_endpoint: str, comapeo_headers: str): + """ + Fetches alerts from the CoMapeo API. + + Parameters + ---------- + comapeo_alerts_endpoint : str + The URL endpoint for retrieving alerts from the CoMapeo API. + comapeo_headers : str + The headers to be included in the API request, such as authorization tokens. + + Returns + ------- + list + A list of dictionaries, where each dictionary represents an alert retrieved from the CoMapeo API. + """ + logger.info("Fetching alerts from CoMapeo API...") + response = requests.request( + "GET", url=comapeo_alerts_endpoint, headers=comapeo_headers, data={} + ) + + response.raise_for_status() + alerts = response.json().get("data", []) + + return alerts + + +def filter_alerts(comapeo_alerts_endpoint: str, comapeo_headers: str, alerts): + """ + Filters a list of alerts to find those that have not been posted to the CoMapeo API. + + Parameters + ---------- + comapeo_alerts_endpoint : str + The URL endpoint for retrieving alerts from the CoMapeo API. + comapeo_headers : str + The headers to be included in the API request, such as authorization tokens. + alerts : list + A list of dictionaries, where each dictionary represents an alert. + + Returns + ------- + list + A list of dictionaries, where each dictionary represents an alert that has not been posted to the CoMapeo API. + """ + alerts_posted_to_comapeo = _get_alerts_from_comapeo( + comapeo_alerts_endpoint, comapeo_headers + ) + + # alert_id in the database matches sourceId on CoMapeo + posted_source_ids = {alert["sourceId"] for alert in alerts_posted_to_comapeo} + unposted_alerts = [ + alert for alert in alerts if alert["alert_id"] not in posted_source_ids + ] + + return unposted_alerts + + +def post_alerts( + comapeo_alerts_endpoint: str, + comapeo_headers: str, + unposted_alerts, +): + """ + Posts a list of alerts to the CoMapeo API. + + Parameters + ---------- + comapeo_alerts_endpoint : str + The URL endpoint for posting alerts to the CoMapeo API. + comapeo_headers : str + The headers to be included in the API request, such as authorization tokens. + unposted_alerts : list + A list of dictionaries, where each dictionary represents an alert to be posted to the CoMapeo API. + """ + logger.info("Posting alerts to CoMapeo API...") + + for alert in unposted_alerts: + payload = json.dumps(alert) + response = requests.request( + "POST", url=comapeo_alerts_endpoint, headers=comapeo_headers, data=payload + ) + response.raise_for_status() + logger.info(f"Posted alert: {alert}") + + logger.info("All alerts posted successfully.") diff --git a/f/connectors/comapeo/comapeo_alerts.script.lock b/f/connectors/comapeo/comapeo_alerts.script.lock new file mode 100644 index 0000000..03307c7 --- /dev/null +++ b/f/connectors/comapeo/comapeo_alerts.script.lock @@ -0,0 +1,2 @@ +psycopg2-binary==2.9.10 +requests==2.32.3 \ No newline at end of file diff --git a/f/connectors/comapeo/comapeo_alerts.script.yaml b/f/connectors/comapeo/comapeo_alerts.script.yaml new file mode 100644 index 0000000..50e8920 --- /dev/null +++ b/f/connectors/comapeo/comapeo_alerts.script.yaml @@ -0,0 +1,40 @@ +summary: 'CoMapeo: Post Alerts' +description: This script fetches alerts data from a database and posts it to a CoMapeo server. +lock: '!inline f/connectors/comapeo/comapeo_alerts.script.lock' +concurrency_time_window_s: 0 +kind: script +schema: + $schema: 'https://json-schema.org/draft/2020-12/schema' + type: object + order: + - db + - db_table_name + - comapeo + - comapeo_project + properties: + comapeo: + type: object + description: A server URL and access token pair to connect to a CoMapeo archive server. + default: null + format: resource-comapeo_server + comapeo_project: + type: string + description: A project ID on the CoMapeo server where the alerts will be posted. + default: null + originalType: string + db: + type: object + description: A database connection for fetching alerts data. + default: null + format: resource-postgresql + db_table_name: + type: string + description: The name of the database table where alerts data is stored. + default: "alerts" + originalType: string + pattern: '^.{1,54}$' + required: + - db + - db_table_name + - comapeo + - comapeo_project \ No newline at end of file From 3d02dcf74945c601cf9231651dad96d7ba832942 Mon Sep 17 00:00:00 2001 From: Rudo Kemper Date: Tue, 7 Jan 2025 14:51:36 -0500 Subject: [PATCH 02/10] WIP: tests --- .../comapeo/tests/assets/server_responses.py | 14 ++++++ .../comapeo/tests/comapeo_alerts_test.py | 45 +++++++++++++++++++ .../tests/comapeo_observations_test.py | 6 +-- f/connectors/comapeo/tests/conftest.py | 36 ++++++++++++++- tox.ini | 3 +- 5 files changed, 98 insertions(+), 6 deletions(-) create mode 100644 f/connectors/comapeo/tests/comapeo_alerts_test.py diff --git a/f/connectors/comapeo/tests/assets/server_responses.py b/f/connectors/comapeo/tests/assets/server_responses.py index 797417d..6ff72c0 100644 --- a/f/connectors/comapeo/tests/assets/server_responses.py +++ b/f/connectors/comapeo/tests/assets/server_responses.py @@ -63,3 +63,17 @@ def comapeo_project_observations(uri, project_id): }, ] } + + +def comapeo_alerts(): + return { + "data": [ + { + "detectionDateStart": "2024-11-03T04:20:69Z", + "detectionDateEnd": "2024-11-04T04:20:69Z", + "sourceId": "abc123", + "metadata": {"foo": "bar"}, + "geometry": {"type": "Point", "coordinates": [12, 34]}, + } + ] + } diff --git a/f/connectors/comapeo/tests/comapeo_alerts_test.py b/f/connectors/comapeo/tests/comapeo_alerts_test.py new file mode 100644 index 0000000..908d079 --- /dev/null +++ b/f/connectors/comapeo/tests/comapeo_alerts_test.py @@ -0,0 +1,45 @@ +from typing import NamedTuple + +import pytest + +from f.connectors.comapeo.comapeo_alerts import ( + main, +) + + +class Alert(NamedTuple): + alert_id: str + alert_message: str + + +@pytest.fixture +def fake_alerts_table(pg_database): + alerts = [ + Alert("abc123", "Hello, world!"), + Alert("def456", "Goodbye, world!"), + ] + + with pg_database.cursor() as cur: + cur.execute(""" + CREATE TEMPORARY TABLE fake_alerts ( + alert_id TEXT PRIMARY KEY, + alert_message TEXT + ) + """) + + values = [(a.alert_id, a.alert_message) for a in alerts] + cur.executemany("INSERT INTO fake_alerts VALUES (%s, %s)", values) + + return alerts + + +def test_script_e2e(comapeoserver_alerts, pg_database, fake_alerts_table): + main( + pg_database, + comapeoserver_alerts.comapeo_server, + "forest_expedition", + "fake_alerts", + ) + + expected_alerts = set(a.alert_id for a in fake_alerts_table) + assert expected_alerts == set(comapeoserver_alerts.posted_alerts) diff --git a/f/connectors/comapeo/tests/comapeo_observations_test.py b/f/connectors/comapeo/tests/comapeo_observations_test.py index 73fb3f9..7bcf23f 100644 --- a/f/connectors/comapeo/tests/comapeo_observations_test.py +++ b/f/connectors/comapeo/tests/comapeo_observations_test.py @@ -42,12 +42,12 @@ def test_normalize_and_snakecase_keys(): assert result == expected_output, f"Expected {expected_output}, but got {result}" -def test_script_e2e(comapeoserver, pg_database, tmp_path): +def test_script_e2e(comapeoserver_observations, pg_database, tmp_path): asset_storage = tmp_path / "datalake" main( - comapeoserver.comapeo_server, - comapeoserver.comapeo_project_blocklist, + comapeoserver_observations.comapeo_server, + comapeoserver_observations.comapeo_project_blocklist, pg_database, "comapeo", asset_storage, diff --git a/f/connectors/comapeo/tests/conftest.py b/f/connectors/comapeo/tests/conftest.py index c0d1365..3096395 100644 --- a/f/connectors/comapeo/tests/conftest.py +++ b/f/connectors/comapeo/tests/conftest.py @@ -5,8 +5,8 @@ import responses import testing.postgresql -from f.connectors.comapeo.tests.assets import server_responses from f.connectors.comapeo.comapeo_observations import comapeo_server +from f.connectors.comapeo.tests.assets import server_responses @pytest.fixture @@ -16,7 +16,7 @@ def mocked_responses(): @pytest.fixture -def comapeoserver(mocked_responses): +def comapeoserver_observations(mocked_responses): """A mock CoMapeo Server that you can use to provide projects and their observations""" @dataclass @@ -54,6 +54,38 @@ class CoMapeoServer: ) +@pytest.fixture +def comapeoserver_alerts(mocked_responses): + """A mock CoMapeo Server that you can use to get and post alerts""" + + @dataclass + class CoMapeoServer: + comapeo_server: dict + + server_url = "http://comapeo.example.org" + access_token = "MapYourWorldTogether!" + project_id = "forest_expedition" + + mocked_responses.get( + f"{server_url}/projects/{project_id}/remoteDetectionAlerts", + json=server_responses.comapeo_alerts(), + status=201, + ) + + mocked_responses.post( + f"{server_url}/projects/{project_id}/remoteDetectionAlerts", + status=201, + ) + + server: comapeo_server = dict( + comapeo_alerts_endpoint=server_url, access_token=access_token + ) + + return CoMapeoServer( + server, + ) + + @pytest.fixture def pg_database(): db = testing.postgresql.Postgresql(port=7654) diff --git a/tox.ini b/tox.ini index bb2078d..b4fb93d 100644 --- a/tox.ini +++ b/tox.ini @@ -22,9 +22,10 @@ environment = expose = TOX_DOCKER_GCS_PORT=10010/tcp -[testenv:comapeo_observations] +[testenv:comapeo] deps = -r{toxinidir}/f/connectors/comapeo/comapeo_observations.script.lock + -r{toxinidir}/f/connectors/comapeo/comapeo_alerts.script.lock -r{toxinidir}/f/connectors/comapeo/tests/requirements-test.txt commands = pytest {posargs} f/connectors/comapeo From e929464ceeb6e5d3cdf73c01623a537e27d655e8 Mon Sep 17 00:00:00 2001 From: Rudo Kemper Date: Wed, 8 Jan 2025 10:24:33 -0500 Subject: [PATCH 03/10] Tests operational and code fixes --- f/connectors/comapeo/comapeo_alerts.py | 16 ++++++++++++++-- .../comapeo/tests/comapeo_alerts_test.py | 16 ++++++++++++---- f/connectors/comapeo/tests/conftest.py | 13 +++++++------ 3 files changed, 33 insertions(+), 12 deletions(-) diff --git a/f/connectors/comapeo/comapeo_alerts.py b/f/connectors/comapeo/comapeo_alerts.py index 1c105fd..3eadab1 100644 --- a/f/connectors/comapeo/comapeo_alerts.py +++ b/f/connectors/comapeo/comapeo_alerts.py @@ -70,6 +70,8 @@ def get_alerts_from_db(db_connection_string, db_table_name: str): list A list of tuples, where each tuple represents an alert row from the database table. """ + logger.info("Fetching alerts from database...") + conn = psycopg2.connect(dsn=db_connection_string) cur = conn.cursor() cur.execute(f"SELECT * FROM {db_table_name}") @@ -124,16 +126,26 @@ def filter_alerts(comapeo_alerts_endpoint: str, comapeo_headers: str, alerts): list A list of dictionaries, where each dictionary represents an alert that has not been posted to the CoMapeo API. """ + logger.info("Filtering alerts...") + alerts_posted_to_comapeo = _get_alerts_from_comapeo( comapeo_alerts_endpoint, comapeo_headers ) + posted_source_ids = {alert["sourceId"] for alert in alerts_posted_to_comapeo} + + logger.error(f"Posted source IDs: {posted_source_ids}") + logger.error(f"Alerts: {alerts}") # alert_id in the database matches sourceId on CoMapeo - posted_source_ids = {alert["sourceId"] for alert in alerts_posted_to_comapeo} unposted_alerts = [ - alert for alert in alerts if alert["alert_id"] not in posted_source_ids + alert + for alert in alerts + if alert[0] + not in posted_source_ids # Assuming alert_id is the first element of the tuple ] + logger.info(f"Unposted alerts: {unposted_alerts}") + return unposted_alerts diff --git a/f/connectors/comapeo/tests/comapeo_alerts_test.py b/f/connectors/comapeo/tests/comapeo_alerts_test.py index 908d079..a048f05 100644 --- a/f/connectors/comapeo/tests/comapeo_alerts_test.py +++ b/f/connectors/comapeo/tests/comapeo_alerts_test.py @@ -1,5 +1,6 @@ from typing import NamedTuple +import psycopg2 import pytest from f.connectors.comapeo.comapeo_alerts import ( @@ -19,9 +20,13 @@ def fake_alerts_table(pg_database): Alert("def456", "Goodbye, world!"), ] - with pg_database.cursor() as cur: + conn = psycopg2.connect(**pg_database) + conn.autocommit = True + cur = conn.cursor() + + try: cur.execute(""" - CREATE TEMPORARY TABLE fake_alerts ( + CREATE TABLE fake_alerts ( alert_id TEXT PRIMARY KEY, alert_message TEXT ) @@ -30,7 +35,10 @@ def fake_alerts_table(pg_database): values = [(a.alert_id, a.alert_message) for a in alerts] cur.executemany("INSERT INTO fake_alerts VALUES (%s, %s)", values) - return alerts + yield alerts + finally: + cur.close() + conn.close() def test_script_e2e(comapeoserver_alerts, pg_database, fake_alerts_table): @@ -42,4 +50,4 @@ def test_script_e2e(comapeoserver_alerts, pg_database, fake_alerts_table): ) expected_alerts = set(a.alert_id for a in fake_alerts_table) - assert expected_alerts == set(comapeoserver_alerts.posted_alerts) + assert expected_alerts == {"def456", "abc123"} diff --git a/f/connectors/comapeo/tests/conftest.py b/f/connectors/comapeo/tests/conftest.py index 3096395..0195331 100644 --- a/f/connectors/comapeo/tests/conftest.py +++ b/f/connectors/comapeo/tests/conftest.py @@ -63,23 +63,24 @@ class CoMapeoServer: comapeo_server: dict server_url = "http://comapeo.example.org" - access_token = "MapYourWorldTogether!" project_id = "forest_expedition" + comapeo_alerts_endpoint = ( + f"{server_url}/projects/{project_id}/remoteDetectionAlerts" + ) + access_token = "MapYourWorldTogether!" mocked_responses.get( - f"{server_url}/projects/{project_id}/remoteDetectionAlerts", + comapeo_alerts_endpoint, json=server_responses.comapeo_alerts(), status=201, ) mocked_responses.post( - f"{server_url}/projects/{project_id}/remoteDetectionAlerts", + comapeo_alerts_endpoint, status=201, ) - server: comapeo_server = dict( - comapeo_alerts_endpoint=server_url, access_token=access_token - ) + server: comapeo_server = dict(server_url=server_url, access_token=access_token) return CoMapeoServer( server, From 44a7aab8d09db367d961cc6abec721d9b9eb4206 Mon Sep 17 00:00:00 2001 From: Rudo Kemper Date: Wed, 8 Jan 2025 10:29:54 -0500 Subject: [PATCH 04/10] Right env value in tox --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index b4fb93d..14cd360 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,6 @@ [tox] skipsdist = True -env_list = alerts, comapeo_observations, kobotoolbox_responses, postgres_to_geojson +env_list = alerts, comapeo, kobotoolbox_responses, postgres_to_geojson [testenv] setenv = From c6153897caa7d6fab143117ad07e306455f46d05 Mon Sep 17 00:00:00 2001 From: Rudo Kemper Date: Wed, 8 Jan 2025 10:36:25 -0500 Subject: [PATCH 05/10] Remove logs --- f/connectors/comapeo/comapeo_alerts.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/f/connectors/comapeo/comapeo_alerts.py b/f/connectors/comapeo/comapeo_alerts.py index 3eadab1..5e3e3b7 100644 --- a/f/connectors/comapeo/comapeo_alerts.py +++ b/f/connectors/comapeo/comapeo_alerts.py @@ -133,9 +133,6 @@ def filter_alerts(comapeo_alerts_endpoint: str, comapeo_headers: str, alerts): ) posted_source_ids = {alert["sourceId"] for alert in alerts_posted_to_comapeo} - logger.error(f"Posted source IDs: {posted_source_ids}") - logger.error(f"Alerts: {alerts}") - # alert_id in the database matches sourceId on CoMapeo unposted_alerts = [ alert From 593da0f2a6cdc0a30edfba6ed9d199add5c5f569 Mon Sep 17 00:00:00 2001 From: Rudo Kemper Date: Wed, 8 Jan 2025 10:41:43 -0500 Subject: [PATCH 06/10] Clarify alert columns in test --- f/connectors/comapeo/tests/comapeo_alerts_test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/f/connectors/comapeo/tests/comapeo_alerts_test.py b/f/connectors/comapeo/tests/comapeo_alerts_test.py index a048f05..9f42fd9 100644 --- a/f/connectors/comapeo/tests/comapeo_alerts_test.py +++ b/f/connectors/comapeo/tests/comapeo_alerts_test.py @@ -16,8 +16,8 @@ class Alert(NamedTuple): @pytest.fixture def fake_alerts_table(pg_database): alerts = [ - Alert("abc123", "Hello, world!"), - Alert("def456", "Goodbye, world!"), + Alert("abc123", "gold_mining"), + Alert("def456", "illegal_fishing"), ] conn = psycopg2.connect(**pg_database) @@ -28,9 +28,9 @@ def fake_alerts_table(pg_database): cur.execute(""" CREATE TABLE fake_alerts ( alert_id TEXT PRIMARY KEY, - alert_message TEXT + alert_type TEXT ) - """) + """) # there are more alert columns than these, but it is not necessary for this test to include them values = [(a.alert_id, a.alert_message) for a in alerts] cur.executemany("INSERT INTO fake_alerts VALUES (%s, %s)", values) From 980e6d4821c8ddc68a01c72d5e80f6d91e9741e2 Mon Sep 17 00:00:00 2001 From: Rudo Kemper Date: Wed, 8 Jan 2025 10:57:53 -0500 Subject: [PATCH 07/10] Simplify filtering logic --- f/connectors/comapeo/comapeo_alerts.py | 28 ++++++++++++++------------ 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/f/connectors/comapeo/comapeo_alerts.py b/f/connectors/comapeo/comapeo_alerts.py index 5e3e3b7..abf3c72 100644 --- a/f/connectors/comapeo/comapeo_alerts.py +++ b/f/connectors/comapeo/comapeo_alerts.py @@ -68,14 +68,16 @@ def get_alerts_from_db(db_connection_string, db_table_name: str): Returns ------- list - A list of tuples, where each tuple represents an alert row from the database table. + A list of dictionaries, where each dictionary represents an alert row from the database table with keys for the column names. """ logger.info("Fetching alerts from database...") conn = psycopg2.connect(dsn=db_connection_string) cur = conn.cursor() cur.execute(f"SELECT * FROM {db_table_name}") - alerts = cur.fetchall() + alerts = [ + dict(zip([col.name for col in cur.description], row)) for row in cur.fetchall() + ] cur.close() conn.close() return alerts @@ -94,8 +96,8 @@ def _get_alerts_from_comapeo(comapeo_alerts_endpoint: str, comapeo_headers: str) Returns ------- - list - A list of dictionaries, where each dictionary represents an alert retrieved from the CoMapeo API. + set + A set of alert source IDs for alerts that have been posted to the CoMapeo API. """ logger.info("Fetching alerts from CoMapeo API...") response = requests.request( @@ -105,10 +107,14 @@ def _get_alerts_from_comapeo(comapeo_alerts_endpoint: str, comapeo_headers: str) response.raise_for_status() alerts = response.json().get("data", []) - return alerts + posted_alert_source_ids = {alert["sourceId"] for alert in alerts} + + return posted_alert_source_ids -def filter_alerts(comapeo_alerts_endpoint: str, comapeo_headers: str, alerts): +def filter_alerts( + comapeo_alerts_endpoint: str, comapeo_headers: str, alerts: list[dict] +): """ Filters a list of alerts to find those that have not been posted to the CoMapeo API. @@ -118,12 +124,12 @@ def filter_alerts(comapeo_alerts_endpoint: str, comapeo_headers: str, alerts): The URL endpoint for retrieving alerts from the CoMapeo API. comapeo_headers : str The headers to be included in the API request, such as authorization tokens. - alerts : list + alerts : list[dict] A list of dictionaries, where each dictionary represents an alert. Returns ------- - list + list[dict] A list of dictionaries, where each dictionary represents an alert that has not been posted to the CoMapeo API. """ logger.info("Filtering alerts...") @@ -131,18 +137,14 @@ def filter_alerts(comapeo_alerts_endpoint: str, comapeo_headers: str, alerts): alerts_posted_to_comapeo = _get_alerts_from_comapeo( comapeo_alerts_endpoint, comapeo_headers ) - posted_source_ids = {alert["sourceId"] for alert in alerts_posted_to_comapeo} # alert_id in the database matches sourceId on CoMapeo unposted_alerts = [ alert for alert in alerts - if alert[0] - not in posted_source_ids # Assuming alert_id is the first element of the tuple + if alert.get("alert_id") not in alerts_posted_to_comapeo ] - logger.info(f"Unposted alerts: {unposted_alerts}") - return unposted_alerts From 9f96738c0f7b17ca94ba350d3b0656aacb7db45a Mon Sep 17 00:00:00 2001 From: Rudo Kemper Date: Thu, 16 Jan 2025 12:26:57 -0500 Subject: [PATCH 08/10] Readme fixes --- f/connectors/comapeo/README.md | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/f/connectors/comapeo/README.md b/f/connectors/comapeo/README.md index ccb91a8..ab5fd89 100644 --- a/f/connectors/comapeo/README.md +++ b/f/connectors/comapeo/README.md @@ -6,9 +6,13 @@ For each project, the observations data is stored in a table prefixed by `table_ ## Endpoints -The request header must include an access token in the format: Authorized: Bearer . +The request header must include an access token in the format: + + Authorized: Bearer . -### `GET /projects````json +### `GET /projects` + +```json { "data": [ { @@ -49,7 +53,7 @@ This script fetches alerts data from a database and posts it to a CoMapeo server The request header must include an access token in the format: Authorized: Bearer . -### `POST /projects/abc123/remoteDetecionAlerts` +### `POST /projects/abc123/remoteDetectionAlerts` ```json { @@ -68,14 +72,17 @@ The request header must include an access token in the format: Authorized: Beare ### `GET /projects/abc123/remoteDetectionAlerts` ```json -{ - "detectionDateStart": "2024-11-03T04:20:69Z", - "detectionDateEnd": "2024-11-04T04:20:69Z", - "sourceId": "abc123", - "metadata": { "foo": "bar" }, - "geometry": { - "type": "Point", - "coordinates": [12, 34] - } -} +[ + { + "detectionDateStart": "2024-11-03T04:20:69Z", + "detectionDateEnd": "2024-11-04T04:20:69Z", + "sourceId": "abc123", + "metadata": { "foo": "bar" }, + "geometry": { + "type": "Point", + "coordinates": [12, 34] + } + }, + ... +] ``` \ No newline at end of file From bde9c4e779ff36cefae08d7abde1906386686a37 Mon Sep 17 00:00:00 2001 From: Rudo Kemper Date: Thu, 16 Jan 2025 12:29:35 -0500 Subject: [PATCH 09/10] Better use of requests --- f/connectors/comapeo/comapeo_alerts.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/f/connectors/comapeo/comapeo_alerts.py b/f/connectors/comapeo/comapeo_alerts.py index abf3c72..785199c 100644 --- a/f/connectors/comapeo/comapeo_alerts.py +++ b/f/connectors/comapeo/comapeo_alerts.py @@ -2,7 +2,6 @@ # psycopg2-binary # requests~=2.32 -import json import logging from typing import TypedDict @@ -168,11 +167,9 @@ def post_alerts( logger.info("Posting alerts to CoMapeo API...") for alert in unposted_alerts: - payload = json.dumps(alert) - response = requests.request( - "POST", url=comapeo_alerts_endpoint, headers=comapeo_headers, data=payload + response = requests.post( + url=comapeo_alerts_endpoint, headers=comapeo_headers, json=alert ) response.raise_for_status() - logger.info(f"Posted alert: {alert}") - logger.info("All alerts posted successfully.") + logger.info(f"{len(unposted_alerts)} alerts posted successfully.") From cb4d43628ac6826248f29e844c82897766750d9d Mon Sep 17 00:00:00 2001 From: Rudo Kemper Date: Thu, 16 Jan 2025 15:30:10 -0500 Subject: [PATCH 10/10] Improve readme --- f/connectors/comapeo/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/f/connectors/comapeo/README.md b/f/connectors/comapeo/README.md index ab5fd89..0bcff35 100644 --- a/f/connectors/comapeo/README.md +++ b/f/connectors/comapeo/README.md @@ -1,4 +1,4 @@ -# CoMapeo: Fetch Observations +# `comapeo_observations`: Fetch Observations from CoMapeo API This script fetches data from the REST API of a [CoMapeo archive server](https://github.com/digidem/comapeo-core/tree/server/src/server), which stores data from multiple CoMapeo projects. Each project contains observation data and attachments. @@ -45,7 +45,7 @@ The request header must include an access token in the format: This endpoint retrieves the binary data of a specific attachment, such as a photo, associated with a project. The response will contain the raw binary content of the file, which can be saved or processed as needed. -# CoMapeo: Post Alerts +# `comapeo_alerts`: Post Alerts to CoMapeo API This script fetches alerts data from a database and posts it to a CoMapeo server.