diff --git a/services/ui_backend_service/api/card.py b/services/ui_backend_service/api/card.py
index 3095043c..39f925eb 100644
--- a/services/ui_backend_service/api/card.py
+++ b/services/ui_backend_service/api/card.py
@@ -10,17 +10,39 @@
     DBPagination,
     DBResponse,
 )
-from services.ui_backend_service.data.cache.card_cache_manager import wait_until_card_is_ready, CARD_API_HTML_WAIT_TIME
 from services.ui_backend_service.data.cache.card_cache_manager import list_cards as list_cards_from_cache
-import time
+from services.ui_backend_service.data.cache.card_datastore_gets import DynamicCardGetClients
+import json
 from aiohttp import web
-import asyncio
+
+
+async def stream_html_response(request, html_file_path):
+    # Create a response object
+    response = web.StreamResponse(
+        status=200,
+        headers={"Content-Type": "text/html", "Content-Disposition": "inline"},
+    )
+    # Prepare the response for streaming
+    await response.prepare(request)
+
+    # Stream the file content to the response
+    with open(html_file_path, "rb") as file:
+        while True:
+            chunk = file.read(4096)
+            if not chunk:
+                break
+            await response.write(chunk)
+
+    return response
 
 
 class CardsApi(object):
     def __init__(self, app, db, cache=None):
         self.db = db
         self.cache = getattr(cache, "card_cache", None)
+        self.card_db_client = DynamicCardGetClients()
+        self._metadata_table = self.db.metadata_table_postgres
+
         app.router.add_route(
             "GET",
             "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/cards",
@@ -28,43 +50,50 @@ def __init__(self, app, db, cache=None):
         )
         app.router.add_route(
             "GET",
-            "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/cards/{hash}",
-            self.get_card_content_by_hash,
+            "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/cards/{type}/{uuid}",
+            self.get_cards_direct,
         )
+
         app.router.add_route(
             "GET",
-            "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/cards/{hash}/data",
-            self.get_card_data_by_hash,
+            "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/cards/{type}/{uuid}/data",
+            self.get_cards_data_direct,
         )
 
-    async def get_task_by_request(self, request):
+
+    async def extract_ds_root_from_metadata(self, request):
         flow_id, run_number, step_name, task_id, _ = get_pathspec_from_request(request)
 
         run_id_key, run_id_value = translate_run_key(run_number)
         task_id_key, task_id_value = translate_task_key(task_id)
+        step_name = request.match_info.get("step_name")
+
         conditions = [
-            "flow_id = %s",
-            "{run_id_key} = %s".format(run_id_key=run_id_key),
-            "step_name = %s",
-            "{task_id_key} = %s".format(task_id_key=task_id_key),
+            "flow_id = %s",
+            "{run_id_key} = %s".format(run_id_key=run_id_key),
+            "step_name = %s",
+            "{task_id_key} = %s".format(task_id_key=task_id_key),
+            "field_name = 'ds-root'",
         ]
-        values = [flow_id, run_id_value, step_name, task_id_value]
-        db_response, *_ = await self.db.task_table_postgres.find_records(
+
+        db_response, *_ = await self._metadata_table.find_records(
             fetch_single=True,
             conditions=conditions,
-            values=values,
+            values=[flow_id, run_id_value, step_name, task_id_value],
             expanded=True,
         )
         if db_response.response_code == 200:
-            return db_response.body
+            return db_response.body.get("value", None)
         return None
 
     @handle_exceptions
-    async def get_cards_list_for_task(self, request):
+    async def get_cards_direct(self, request):
         """
         ---
-        description: Get all identifiers of cards for a task
+        description: Get a specific card of a task
         tags:
             - Card
         parameters:
@@ -72,56 +101,57 @@ async def get_cards_list_for_task(self, request):
            - $ref: '#/definitions/Params/Path/run_number'
            - $ref: '#/definitions/Params/Path/step_name'
            - $ref: '#/definitions/Params/Path/task_id'
-           - $ref: '#/definitions/Params/Builtin/_page'
-           - $ref: '#/definitions/Params/Builtin/_limit'
-           - $ref: '#/definitions/Params/Custom/invalidate'
+           - $ref: '#/definitions/Params/Path/type'
+           - $ref: '#/definitions/Params/Path/hash'
+           - $ref: '#/definitions/Params/Custom/user_set_id'
         produces:
-            - application/json
+            - text/html
         responses:
             "200":
-                description: Returns a list of cards for the specified task
-                schema:
-                  $ref: '#/definitions/ResponsesCardList'
+                description: Returns the HTML content of a card with the specified uuid
             "404":
-                description: Task was not found.
+                description: Card was not found.
             "405":
                 description: invalid HTTP Method
                 schema:
                     $ref: '#/definitions/ResponsesError405'
         """
         task = await self.get_task_by_request(request)
         if not task:
-            return web_response(404, {"data": []})
-
-        cards = await get_card_list(self.cache, task, max_wait_time=1)
-
-        if cards is None:
-            # Handle edge: Cache failed to return anything, even errors.
-            # NOTE: choice of status 200 here is quite arbitrary, as the cache returning None is usually
-            # caused by a premature request, and cards are not permanently missing.
-            return web_response(200, {"data": []})
-
-        card_hashes = [
-            {"id": data["id"], "hash": hash, "type": data["type"]}
-            for hash, data in cards.items()
-        ]
-        # paginate list of cards
-        limit, page, offset = get_pagination_params(request)
-        _pages = max(len(card_hashes) // limit, 1)
-        limited_set = card_hashes[offset:][:limit]
-
-        response = DBResponse(200, limited_set)
-        pagination = DBPagination(limit, limit * (page - 1), len(response.body), page)
-        status, body = format_response_list(request, response, pagination, page, _pages)
-
-        return web_response(status, body)
+            return web.Response(
+                content_type="text/html", status=404, body="Task not found."
+            )
+        datastore_root = await self.extract_ds_root_from_metadata(request)
+        if datastore_root is None:
+            return web.Response(
+                content_type="text/html", status=404, body="No datastore found for task"
+            )
+
+        pathspec = "{flow_id}/{run_id}/{step_name}/{task_id}".format(
+            flow_id=task.get("flow_id"),
+            run_id=task.get("run_id") or task.get("run_number"),
+            step_name=task.get("step_name"),
+            task_id=task.get("task_name") or task.get("task_id"),
+        )
+        card_type = request.match_info.get("type")
+        card_uuid = request.match_info.get("uuid")
+        # card_user_id is set in the query parameters
+        card_user_id = request.query.get("user_set_id")
+        with self.card_db_client[datastore_root].fetch(pathspec, card_type, card_uuid, card_user_id, object_type="card") as ff:
+            for key, path, metadata in ff:
+                if path is None:
+                    return web.Response(
+                        content_type="text/html",
+                        status=404,
+                        body="Card not found for task.",
+                    )
+                return await stream_html_response(request, path)
 
     @handle_exceptions
-    async def get_card_content_by_hash(self, request):
+    async def get_cards_data_direct(self, request):
         """
         ---
-        description: Get specific card of a task
+        description: Get the data of a card created for a task. Contains any additional updates needed by the card.
         tags:
             - Card
         parameters:
@@ -129,51 +159,84 @@ async def get_card_content_by_hash(self, request):
            - $ref: '#/definitions/Params/Path/run_number'
            - $ref: '#/definitions/Params/Path/step_name'
            - $ref: '#/definitions/Params/Path/task_id'
-           - $ref: '#/definitions/Params/Custom/invalidate'
+           - $ref: '#/definitions/Params/Path/type'
+           - $ref: '#/definitions/Params/Path/hash'
+           - $ref: '#/definitions/Params/Custom/user_set_id'
+
         produces:
-            - text/html
+            - application/json
         responses:
             "200":
-                description: Returns the HTML content of a card with the specific hash
+                description: Returns the data object created by the realtime card with the specified uuid
             "404":
-                description: Card was not found.
+                description: Card data was not found.
             "405":
                 description: invalid HTTP Method
                 schema:
                     $ref: '#/definitions/ResponsesError405'
         """
-        hash = request.match_info.get("hash")
         task = await self.get_task_by_request(request)
         if not task:
             return web.Response(
                 content_type="text/html", status=404, body="Task not found."
             )
-        cards = await get_card_html_for_task_async(
-            self.cache,
-            task,
-            hash,
+        datastore_root = await self.extract_ds_root_from_metadata(request)
+
+        if datastore_root is None:
+            return web_response(404, "No datastore found for task")
+
+        pathspec = "{flow_id}/{run_id}/{step_name}/{task_id}".format(
+            flow_id=task.get("flow_id"),
+            run_id=task.get("run_id") or task.get("run_number"),
+            step_name=task.get("step_name"),
+            task_id=task.get("task_name") or task.get("task_id"),
         )
-        if cards is None:
-            return web.Response(
-                content_type="text/html",
-                status=404,
-                body="Card not found for task. Possibly still being processed. Please refresh page to check again.",
-            )
+        card_type = request.match_info.get("type")
+        card_uuid = request.match_info.get("uuid")
+        # card_user_id is set in the query parameters
+        card_user_id = request.query.get("user_set_id")
+        with self.card_db_client[datastore_root].fetch(pathspec, card_type, card_uuid, card_user_id, object_type="data") as ff:
+            for key, path, metadata in ff:
+                if path is None:
+                    return web_response(404, {"error": "Card data not found for task"})
+                with open(path) as f:
+                    # TODO: This can become super slow if the file is VERY large.
+                    # Add a layer of optimization where we read the file in
+                    # chunks and return the data incrementally.
+                    return web_response(200, {
+                        "id": card_user_id,
+                        "type": card_type,
+                        "data": json.loads(f.read())
+                    })
 
-        if cards and hash in cards:
-            return web.Response(content_type="text/html", body=cards[hash]["html"])
-        else:
-            return web.Response(
-                content_type="text/html",
-                status=404,
-                body="Card not found for task.",
-            )
+    async def get_task_by_request(self, request):
+        flow_id, run_number, step_name, task_id, _ = get_pathspec_from_request(request)
+
+        run_id_key, run_id_value = translate_run_key(run_number)
+        task_id_key, task_id_value = translate_task_key(task_id)
+
+        conditions = [
+            "flow_id = %s",
+            "{run_id_key} = %s".format(run_id_key=run_id_key),
+            "step_name = %s",
+            "{task_id_key} = %s".format(task_id_key=task_id_key),
+        ]
+        values = [flow_id, run_id_value, step_name, task_id_value]
+        db_response, *_ = await self.db.task_table_postgres.find_records(
+            fetch_single=True,
+            conditions=conditions,
+            values=values,
+            expanded=True,
+        )
+        if db_response.response_code == 200:
+            return db_response.body
+        return None
 
     @handle_exceptions
-    async def get_card_data_by_hash(self, request):
+    async def get_cards_list_for_task(self, request):
         """
         ---
-        description: Get the data of a card created for a task. Contains any additional updates needed by the card.
+        description: Get all identifiers of cards for a task
         tags:
             - Card
         parameters:
@@ -181,108 +244,50 @@ async def get_card_data_by_hash(self, request):
            - $ref: '#/definitions/Params/Path/run_number'
            - $ref: '#/definitions/Params/Path/step_name'
            - $ref: '#/definitions/Params/Path/task_id'
-           - $ref: '#/definitions/Params/Path/hash'
-
+           - $ref: '#/definitions/Params/Builtin/_page'
+           - $ref: '#/definitions/Params/Builtin/_limit'
+           - $ref: '#/definitions/Params/Custom/invalidate'
         produces:
             - application/json
         responses:
             "200":
-                description: Returns the data object created by the realtime card with the specific hash
+                description: Returns a list of cards for the specified task
+                schema:
+                  $ref: '#/definitions/ResponsesCardList'
             "404":
-                description: Card data was not found.
+                description: Task was not found.
             "405":
                 description: invalid HTTP Method
                 schema:
                     $ref: '#/definitions/ResponsesError405'
         """
-        _hash = request.match_info.get("hash")
+
         task = await self.get_task_by_request(request)
         if not task:
-            return web.Response(
-                content_type="text/html", status=404, body="Task not found."
-            )
-        data = await get_card_data_for_task_async(
-            self.cache,
-            task,
-            _hash,
-        )
+            return web_response(404, {"data": []})
 
-        if data is None:
-            return web_response(404, {"error": "Card data not found for task"})
-        else:
-            return web_response(200, data)
+        cards = await get_card_list(self.cache, task, max_wait_time=1)
 
+        if cards is None:
+            # Handle edge: Cache failed to return anything, even errors.
+            # NOTE: choice of status 200 here is quite arbitrary, as the cache returning None is usually
+            # caused by a premature request, and cards are not permanently missing.
+            return web_response(200, {"data": []})
 
-def _card_data_from_cache(local_cache):
-    data = local_cache.read_data()
-    if data is None:
-        return None
-    return {
-        "data": data["data"],
-        "id": local_cache.card_id,
-        "type": local_cache.card_type,
-    }
-
-
-async def get_card_html_for_task_async(
-    cache_client,
-    task,
-    card_hash,
-) -> Optional[Dict[str, Dict]]:
-    """
-    Return the card-data from the cache, or nothing.
-
-    Example:
-    --------
-    {
-        "id": 1,
-        "hash": "abc123",
-        "data": {}
-    }
-    """
-    pathspec = "{flow_id}/{run_id}/{step_name}/{task_id}".format(
-        flow_id=task.get("flow_id"),
-        run_id=task.get("run_id") or task.get("run_number"),
-        step_name=task.get("step_name"),
-        task_id=task.get("task_name") or task.get("task_id"),
-    )
-    _local_cache = cache_client.cache_manager.get_local_cache(pathspec, card_hash)
-    _html = await wait_until_card_is_ready(
-        cache_client.cache_manager, _local_cache, max_wait_time=CARD_API_HTML_WAIT_TIME
-    )
-    return _html
-
-
-async def get_card_data_for_task_async(
-    cache_client,
-    task,
-    card_hash,
-) -> Optional[Dict[str, Dict]]:
-    """
-    Return the card-data from the cache, or nothing.
-
-    Example:
-    --------
-    {
-        "id": 1,
-        "hash": "abc123",
-        "data": {}
-    }
-    """
-    pathspec = "{flow_id}/{run_id}/{step_name}/{task_id}".format(
-        flow_id=task.get("flow_id"),
-        run_id=task.get("run_id") or task.get("run_number"),
-        step_name=task.get("step_name"),
-        task_id=task.get("task_name") or task.get("task_id"),
-    )
-    await cache_client.cache_manager.register(pathspec)
-    _local_cache = cache_client.cache_manager.get_local_cache(pathspec, card_hash)
-    if not _local_cache.read_ready():
-        # Since this is a data update call we can return a 404 and the client
-        # should handle calling back so we only await at registration.
-        return None
+        card_hashes = [
+            {"id": data["id"], "hash": hash, "type": data["type"]}
+            for hash, data in cards.items()
+        ]
+        # paginate list of cards
+        limit, page, offset = get_pagination_params(request)
+        _pages = max(len(card_hashes) // limit, 1)
+        limited_set = card_hashes[offset:][:limit]
 
-    return _card_data_from_cache(_local_cache)
+        response = DBResponse(200, limited_set)
+        pagination = DBPagination(limit, limit * (page - 1), len(response.body), page)
+        status, body = format_response_list(request, response, pagination, page, _pages)
+
+        return web_response(status, body)
 
 
 async def get_card_list(
diff --git a/services/ui_backend_service/data/cache/card_cache_manager.py b/services/ui_backend_service/data/cache/card_cache_manager.py
index 39173f40..fd05d25e 100644
--- a/services/ui_backend_service/data/cache/card_cache_manager.py
+++ b/services/ui_backend_service/data/cache/card_cache_manager.py
@@ -1,12 +1,11 @@
 import time
-from subprocess import Popen
 import os
 import sys
 from services.utils import logging
 import uuid
 from asyncio.subprocess import Process
 import asyncio
-from .card_cache_service import CardCache, cleanup_non_running_caches
+from .card_cache_service import CardCache, safe_wipe_dir
 import contextlib
 
 
@@ -26,15 +25,15 @@
     "CARD_CACHE_PROCESS_MAX_UPTIME", 3 * 60  # 3 minutes
 )
 CARD_CACHE_PROCESS_NO_CARD_WAIT_TIME = os.environ.get(
-    "CARD_CACHE_PROCESS_NO_CARD_WAIT_TIME", 4  # 4 seconds
+    "CARD_CACHE_PROCESS_NO_CARD_WAIT_TIME", 20  # 20 seconds
 )
-DEFAULT_CACHE_STORAGE_PATH = "/tmp"
-CACHE_STORAGE_PATH = os.environ.get(
-    "CARD_CACHE_STORAGE_PATH", DEFAULT_CACHE_STORAGE_PATH
+DEFAULT_CACHE_STORAGE_PATH_ROOT = "/tmp"
+CACHE_STORAGE_PATH_ROOT = os.environ.get(
+    "CARD_CACHE_STORAGE_PATH_ROOT", DEFAULT_CACHE_STORAGE_PATH_ROOT
 )
 CACHE_SERVICE_LOG_STORAGE_ROOT = os.environ.get("CACHE_SERVICE_LOG_STORAGE_ROOT", None)
-CARD_API_HTML_WAIT_TIME = float(os.environ.get("CARD_API_HTML_WAIT_TIME", 5))
+CARD_API_HTML_WAIT_TIME = float(os.environ.get("CARD_API_HTML_WAIT_TIME", 3))
 
 
 async def _get_latest_return_code(process: Process):
@@ -57,179 +56,212 @@ async def process_is_running(process: Process):
     return status == "running"
 
 
-class AsyncProcessManager:
+class AsyncCardCacheProcessManager:
     processes = {
-        # "procid": {
-        #     "proc": asyncio.subprocess.Process,
-        #     "started": time.time()
+        # "<context-id>": {
+        #     "processes": {
+        #         "<proc-id>": {
+        #             "proc": asyncio.subprocess.Process,
+        #             "started": time.time()
+        #         }
+        #     },
+        #     "write_directory": "<write-directory-path>"
        # }
    }
 
+    def get_context_dict(self, context):
+        _x = self.processes.get(context, None)
+        if _x is None:
+            return _x
+        return _x.copy()
+
+    @property
+    def current_process_dict(self):
+        return self.processes[self._current_context]["processes"]
+
+    @property
+    def current_write_directory(self):
+        return self.processes[self._current_context]["write_directory"]
+
     def __init__(self, logger) -> None:
-        self.lock = asyncio.Lock()
+        self.context_lock = asyncio.Lock()
         self.logger = logger
+        self._current_context = None
+        self.update_context()
+
+    async def set_new_context(self):
+        async with self.context_lock:
+            old_context = self._current_context
+            self.update_context()
+            return old_context
+
+    async def remove_context(self, context):
+        async with self.context_lock:
+            if context in self.processes:
+                del self.processes[context]
+
+    def update_context(self):
+        _ctx_dict, _ctx = self.create_context_dict()
+        self.processes.update(_ctx_dict)
+        self._current_context = _ctx
+
+    def create_context_dict(self):
+        _ctx = uuid.uuid4().hex[:8]
+        return {
+            _ctx: {
+                "processes": {},
+ "write_directory": os.path.join(CACHE_STORAGE_PATH_ROOT, _ctx), + } + }, _ctx def _register_process(self, procid, proc): - self.processes[procid] = { + self.current_process_dict[procid] = { "proc": proc, "started": time.time(), } + asyncio.create_task(proc.wait()) - async def add(self, procid, cmd, logs_file_path=CACHE_SERVICE_LOG_STORAGE_ROOT): - running_proc = await self.is_running(procid) - if running_proc: - return procid, "running" + def _make_command(self, pathspec): + return [ + str(i) + for i in [ + sys.executable, + PATH_TO_CACHE_SERVICE, + "task-updates", + pathspec, + "--uptime-seconds", + CARD_CACHE_PROCESS_MAX_UPTIME, + "--list-frequency", + CARD_LIST_POLLING_FREQUENCY, + "--cache-path", + self.current_write_directory, + "--max-no-card-wait-time", + CARD_CACHE_PROCESS_NO_CARD_WAIT_TIME, + ] + ] + + async def add(self, procid, pathspec, lock_timeout=0.5): # The lock helps to ensure that the processes only get added one at a time # This is important because the processes are added to a shared dictionary - async with self.lock: + procid, status = procid, None + _acquired_lock = False + try: + await asyncio.wait_for(self.context_lock.acquire(), timeout=lock_timeout) + _acquired_lock = True + running_proc = await self.is_running(procid) + if running_proc: + return procid, "running" proc, started_on = self.get(procid) if proc is not None: - await self.remove(procid, delete_item=True) + self.remove_current(procid, delete_item=True) - logs_file = None - if logs_file_path is not None: - logs_file = open( - os.path.join( - logs_file_path, - "card_cache_service_%s.log" % (procid), - ), - "w", - ) - - await self.spawn(procid, cmd, logs_file, logs_file) - return procid, "started" + cmd = self._make_command(pathspec) + await self.spawn(procid, cmd) + status = "started" + except asyncio.TimeoutError: + status = "add-timeout" + except Exception as e: + status = "add-exception" + finally: + if _acquired_lock and self.context_lock.locked(): + self.context_lock.release() + return procid, status def get(self, procid): - proc_dict = self.processes.get(procid, None) + proc_dict = self.current_process_dict.get(procid, None) if proc_dict is not None: return proc_dict["proc"], proc_dict["started"] return None, None - async def spawn(self, procid, cmd, stdout_file, std_err_file=None): + async def spawn(self, procid, cmd,): proc = await asyncio.create_subprocess_exec( *cmd, - stdout=stdout_file, - stderr=std_err_file, shell=False, ) - self._register_process(procid, proc) + self._register_process(procid, proc,) - async def remove(self, procid, delete_item=True): - if procid not in self.processes: + def remove_current(self, procid, delete_item=True): + if procid not in self.current_process_dict: return - self.logger.info("Removing process: %s" % procid) - await self.processes[procid]["proc"].wait() - self.logger.info("Process removed: %s" % procid) - if self.processes[procid]["proc"].stdout is not None: - self.processes[procid]["proc"].stdout.close() if delete_item: - del self.processes[procid] + del self.current_process_dict[procid] async def cleanup(self): - # The lock ensures that when the dictionary is being modified, - # no other process can modify it at the same time. 
-        async with self.lock:
-            removal_keys = []
-            for procid in self.processes:
-                running_proc = await self.is_running(procid)
-                if running_proc:
-                    continue
-                removal_keys.append(procid)
-                await self.remove(procid, delete_item=False)
-            for procid in removal_keys:
-                del self.processes[procid]
-            return removal_keys
+        old_context = await self.set_new_context()
+        _ctx_dict = self.get_context_dict(old_context)
+        if _ctx_dict is None:
+            return []
+        # Two things to remove: old keys and old directories
+        wait_keys = []
+        for pid in _ctx_dict["processes"]:
+            status = await process_status(_ctx_dict["processes"][pid]["proc"])
+            if status not in ["completed", "failed"]:
+                wait_keys.append(pid)
+
+        if len(wait_keys) > 0:
+            # Sleep for CARD_CACHE_PROCESS_MAX_UPTIME so that all the processes
+            # that are still writing have finished, AND any upstream request
+            # accessing the cache directory has been attended to.
+            await asyncio.sleep(CARD_CACHE_PROCESS_MAX_UPTIME)
+
+        _write_dir = _ctx_dict["write_directory"]
+        safe_wipe_dir(_write_dir)  # TODO: Make this happen in a different thread.
+        await self.remove_context(old_context)
 
     async def is_running(self, procid):
-        if procid not in self.processes:
+        if procid not in self.current_process_dict:
             return False
-        return await process_is_running(self.processes[procid]["proc"])
+        return await process_is_running(self.current_process_dict[procid]["proc"])
 
     async def get_status(self, procid):
-        if procid not in self.processes:
+        if procid not in self.current_process_dict:
             return None
-        return await process_status(self.processes[procid]["proc"])
+        return await process_status(self.current_process_dict[procid]["proc"])
 
     async def running_processes(self):
-        return [procid for procid in self.processes if await self.is_running(procid)]
+        return [
+            procid
+            for procid in self.current_process_dict
+            if await self.is_running(procid)
+        ]
 
 
 class CardCacheManager:
     def __init__(self) -> None:
         self.logger = logging.getLogger("CardCacheManager")
-        self._process_manager = AsyncProcessManager(self.logger)
+        self._process_manager = AsyncCardCacheProcessManager(self.logger)
         self._manager_id = uuid.uuid4().hex
         self.logger.info("CardCacheManager initialized")
 
-    def _make_task_command(self, pathspec):
-        return [
-            str(i)
-            for i in [
-                sys.executable,
-                PATH_TO_CACHE_SERVICE,
-                "task-updates",
-                pathspec,
-                "--uptime-seconds",
-                CARD_CACHE_PROCESS_MAX_UPTIME,
-                "--list-frequency",
-                CARD_LIST_POLLING_FREQUENCY,
-                "--data-update-frequency",
-                DATA_UPDATE_POLLING_FREQUENCY,
-                "--html-update-frequency",
-                CARD_UPDATE_POLLING_FREQUENCY,
-                "--cache-path",
-                CACHE_STORAGE_PATH,
-                "--max-no-card-wait-time",
-                CARD_CACHE_PROCESS_NO_CARD_WAIT_TIME
-            ]
-        ]
-
     def get_local_cache(self, pathspec, card_hash):
         cache = CardCache.load_from_disk(
-            pathspec, card_hash, CACHE_STORAGE_PATH,
+            pathspec,
+            card_hash,
+            self._process_manager.current_write_directory,
         )
         return cache
 
-    async def register(self, pathspec):
+    async def register(self, pathspec, lock_timeout=0.5):
         proc_id = pathspec
         is_running = await self._process_manager.is_running(proc_id)
         if is_running:
             return proc_id, "running"
-
-        cmd = self._make_task_command(pathspec)
-        self.logger.info(
-            "Registering task [%s]" % (pathspec)
+        self.logger.info("Registering task [%s]" % (pathspec))
+        _id, status = await self._process_manager.add(
+            proc_id, pathspec, lock_timeout=lock_timeout
         )
-        _id, status = await self._process_manager.add(proc_id, cmd, logs_file_path=CACHE_SERVICE_LOG_STORAGE_ROOT)
         return _id, status
 
     async def get_status(self, pathspec):
         return await self._process_manager.get_status(pathspec)
 
-    async def start_process_cleanup_routine(self, interval=60):
-        try:
-            while True:
-                cleanup_keys = await self._process_manager.cleanup()  # Perform the cleanup
-                if len(cleanup_keys) > 0:
-                    self.logger.info(
-                        "Cleaned up processes: %s" % ", ".join(cleanup_keys)
-                    )
-                await asyncio.sleep(interval)  # Wait for a specified interval before the next cleanup
-        except asyncio.CancelledError:
-            self.logger.info("Process cleanup routine cancelled")
-
-    async def cleanup_disk_routine(self, interval=60 * 60 * 4):
+    async def regular_cleanup_routine(self, interval=60 * 60 * 24 * 5):
         try:
             while True:
                 await asyncio.sleep(interval)
-                # The lock ensure that new processes are not getting created or
-                # processes are not getting removed when the disk cleanup is happening.
-                async with self._process_manager.lock:
-                    running_proc_ids = await self._process_manager.running_processes()
-                    cleanup_non_running_caches(CACHE_STORAGE_PATH, CardCache.CACHE_DIR, running_proc_ids)
+                await self._process_manager.cleanup()
         except asyncio.CancelledError:
-            self.logger.info("Disk cleanup routine cancelled")
+            self.logger.info("Process/Directory cleanup routine cancelled")
 
 
 async def verify_process_has_crashed(cache_manager: CardCacheManager, pathspec):
@@ -239,40 +271,6 @@ async def verify_process_has_crashed(cache_manager: CardCacheManager, pathspec):
     return False
 
 
-def _get_html_or_refresh(local_cache: CardCache):
-    if local_cache.read_ready():
-        _html = local_cache.read_html()
-        if _html is not None:
-            return {
-                local_cache.card_hash: {
-                    "html": _html,
-                }
-            }
-    else:
-        local_cache.refresh()
-    return None
-
-
-async def wait_until_card_is_ready(
-    cache_manager: CardCacheManager, local_cache: CardCache, max_wait_time=3, frequency=0.1
-):
-    html = None
-    start_time = time.time()
-    await cache_manager.register(local_cache.pathspec)
-    # At this point in the function the process should aleady be running
-    while time.time() - start_time < max_wait_time:
-        html = _get_html_or_refresh(local_cache)
-        if html is not None:
-            break
-        process_failed = await verify_process_has_crashed(cache_manager, local_cache.pathspec)
-        if process_failed:
-            cache_manager.logger.error(
-                f"Card {local_cache.card_hash} has crashed for {local_cache.pathspec}"
-            )
-            break
-        await asyncio.sleep(frequency)
-    return html  # We ONLY return None if the card is not found after max_wait_time; This is because cards may not be ready
-
 
 async def list_cards(cache_manager: CardCacheManager, pathspec, max_wait_time=3):
     await cache_manager.register(pathspec)
diff --git a/services/ui_backend_service/data/cache/card_cache_service.py b/services/ui_backend_service/data/cache/card_cache_service.py
index 20472404..15a2e1dc 100644
--- a/services/ui_backend_service/data/cache/card_cache_service.py
+++ b/services/ui_backend_service/data/cache/card_cache_service.py
@@ -57,13 +57,13 @@ def _make_hash(_str):
     return hashlib.md5(_str.encode()).hexdigest()
 
 
-def cleanup_non_running_caches(cache_path, cache_dir, pathspecs):
-    task_dirs = os.listdir(os.path.join(cache_path, cache_dir))
-    task_dir_names = [_make_hash(p) for p in pathspecs]
-    for task_dir in task_dirs:
-        if task_dir in task_dir_names:
-            continue
-        shutil.rmtree(os.path.join(cache_path, cache_dir, task_dir), ignore_errors=True)
+def safe_wipe_dir(path):
+    try:
+        if os.path.exists(path):
+            shutil.rmtree(path, ignore_errors=True)
+        return None
+    except Exception as e:
+        return e
 
 
 class CardCache:
@@ -172,19 +172,6 @@ def _eligible_for_refresh(update_timings, update_frequency):
     return False
 
 
-def _update_card_cache(cache: CardCache, update_type: str, card: Card):
-    if update_type == "data":
-        data = card.get_data()
-        cache._write_data(data)
-    elif update_type == "html":
-        card._html = None
-        html = card.get()
-        cache._write_html(html)
-    else:
-        raise Exception(f"Invalid update type {update_type}")
-    # def update_cache(self, card_hash, update_type):
-
-
 class PeriodicLogger:
     def __init__(self, logger, n_seconds=5, log_level=logging.INFO):
         self.logger = logger
@@ -204,18 +191,12 @@ class TaskCardCacheService:
 
     LIST_FREQUENCY_SECONDS = 5
 
-    DATA_UPDATE_FREQUENCY = 0.2
-
-    HTML_UPDATE_FREQUENCY = 2
-
     def __init__(
         self,
         task_pathspec,
         cache_path="./",
         uptime_seconds=600,
         list_frequency_seconds=5,
-        data_update_frequency=0.2,
-        html_update_frequency=2,
         max_no_card_wait_time=10
     ) -> None:
         self._task_pathspec = task_pathspec
@@ -233,8 +214,6 @@ def __init__(
             raise MetaflowNotFound(f"Task with pathspec {task_pathspec} not found")
 
         self.LIST_FREQUENCY_SECONDS = list_frequency_seconds
-        self.DATA_UPDATE_FREQUENCY = data_update_frequency
-        self.HTML_UPDATE_FREQUENCY = html_update_frequency
         self._max_no_card_wait_time = max_no_card_wait_time
 
     @property
@@ -270,15 +249,6 @@ def load_all_cards(self):
 
         return status, resolved_cards.unresolvable
 
-    def update_card_cache(self, card_hash, update_type):
-        if card_hash not in self._cache:
-            raise Exception(
-                f"Card with hash {card_hash} not found for task {self._task_pathspec}"
-            )
-        cache = self._cache[card_hash]
-        card = self._cards[card_hash]
-        _update_card_cache(cache, update_type, card)
-
     def write_available_cards(self):
         _cardinfo = {}
         for chash in self._cards:
@@ -312,17 +282,19 @@ def _get_cards_safely(self) -> ResolvedCards:
         return ResolvedCards([], True,)  # On other errors fail away too!
 
     def refresh_loop(self):
-        timings = {"data": None, "html": None, "list": None}
+        timings = {"card_info": {}, "list": None}
         start_time = time.time()
         self.logger.info("Starting cache refresh loop for %s" % self._task_pathspec)
         cards_are_unresolvable = False
         _sleep_time = 0.25
+
         while True:
             if time.time() - start_time > self._uptime_seconds:  # exit condition
                 break
             if _eligible_for_refresh(timings["list"], self.LIST_FREQUENCY_SECONDS):
                 list_status, cards_are_unresolvable = self.load_all_cards()
                 if list_status:
+                    timings["list"] = time.time()
                     self.write_available_cards()
 
             cache_is_empty = len(self._cache) == 0
@@ -337,13 +309,6 @@ def refresh_loop(self):
                 self.logger.error(f"Cache is empty for {self._task_pathspec} and no cards were unresolvable")
                 break
 
-            for card_hash in self._cache:
-                if _eligible_for_refresh(timings["html"], self.HTML_UPDATE_FREQUENCY):
-                    self.update_card_cache(card_hash, "html")
-
-                if _eligible_for_refresh(timings["data"], self.DATA_UPDATE_FREQUENCY):
-                    self.update_card_cache(card_hash, "data")
-
             time.sleep(_sleep_time)
 
 
@@ -357,16 +322,12 @@ def cli():
 @click.option("--cache-path", default="./", help="Path to the cache")
 @click.option("--uptime-seconds", default=600, help="Timeout for the cache service")
 @click.option("--list-frequency", default=5, help="Frequency for the listing cards to populate the cache")
-@click.option("--data-update-frequency", default=0.2, help="Frequency for the data update")
-@click.option("--html-update-frequency", default=2, help="Frequency for the html update")
 @click.option("--max-no-card-wait-time", default=10, help="Maximum time to wait a card to be present")
 def task_updates(
     pathspec,
     cache_path,
     uptime_seconds,
     list_frequency,
-    data_update_frequency,
-    html_update_frequency,
     max_no_card_wait_time,
 ):
     cache_service = TaskCardCacheService(
@@ -374,8 +335,6 @@ def task_updates(
         cache_path=cache_path,
         uptime_seconds=uptime_seconds,
         list_frequency_seconds=list_frequency,
-        data_update_frequency=data_update_frequency,
-        html_update_frequency=html_update_frequency,
         max_no_card_wait_time=max_no_card_wait_time,
     )
     cache_service.refresh_loop()
diff --git a/services/ui_backend_service/data/cache/card_datastore_gets.py b/services/ui_backend_service/data/cache/card_datastore_gets.py
new file mode 100644
index 00000000..ce6c5b5a
--- /dev/null
+++ b/services/ui_backend_service/data/cache/card_datastore_gets.py
@@ -0,0 +1,143 @@
+from metaflow.plugins.datastores.azure_storage import AzureStorage
+from metaflow.plugins.datastores.gs_storage import GSStorage
+from metaflow.plugins.datastores.s3_storage import S3Storage
+from metaflow.metaflow_config import DEFAULT_DATASTORE, CARD_SUFFIX
+from metaflow.plugins.cards.card_datastore import (
+    CardPathSuffix,
+    CardDatastore,
+    CardNameSuffix,
+)
+import time
+from typing import Tuple, Any
+
+import os
+
+
+def _make_path(base_pth, pathspec=None, with_steps=False, suffix=CardPathSuffix.CARD):
+    sysroot = base_pth
+    flow_name, run_id, step_name, task_id = pathspec.split("/")
+
+    # We have a condition that checks for `with_steps` because,
+    # when cards were introduced, there was an assumption made
+    # about task-ids being unique.
+    # This assumption is incorrect: a pathspec needs to be
+    # unique, but there are no such guarantees on task-ids.
+    # This is why we have a `with_steps` flag that allows
+    # constructing the path with or without steps so that
+    # older cards (cards with a path without `steps/` in them)
+    # can also be accessed by the card CLI and the card client.
+    if with_steps:
+        pth_arr = [
+            sysroot,
+            flow_name,
+            "runs",
+            run_id,
+            "steps",
+            step_name,
+            "tasks",
+            task_id,
+            suffix,
+        ]
+    else:
+        pth_arr = [
+            sysroot,
+            flow_name,
+            "runs",
+            run_id,
+            "tasks",
+            task_id,
+            suffix,
+        ]
+    if sysroot == "" or sysroot is None:
+        pth_arr.pop(0)
+    return os.path.join(*pth_arr)
+
+
+def get_storage_client(storage_type, storage_root):
+    _base_root = os.path.join(storage_root, CARD_SUFFIX)
+    if storage_type == "s3":
+        return S3Storage(_base_root)
+    if storage_type == "gs":
+        return GSStorage(_base_root)
+    if storage_type == "azure":
+        return AzureStorage(_base_root)
+
+
+class DynamicCardGetClients:
+
+    def __init__(self, client_refresh_timings=60 * 60) -> None:
+        self._init_time = time.time()
+        self.client_refresh_timings = client_refresh_timings
+        self._clients = {}
+
+    def __getitem__(self, storage_root) -> "CardGetClient":
+        if storage_root not in self._clients:
+            self._clients[storage_root] = CardGetClient(storage_root, self.client_refresh_timings)
+        return self._clients[storage_root]
+
+
+class CardGetClient:
+
+    def __init__(self, datastore_root, client_refresh_timings=60 * 60) -> None:
+        self._init_time = time.time()
+        self.client_refresh_timings = client_refresh_timings
+        self._datastore_root = datastore_root
+        self._setup_client(datastore_root)
+
+    def _setup_client(self, datastore_root):
+        self._client = get_storage_client(DEFAULT_DATASTORE, datastore_root)
+        self._init_time = time.time()
+
+    @property
+    def client(self):
+        if time.time() - self._init_time > self.client_refresh_timings:
+            self._setup_client(
+                self._datastore_root
+            )
+        return self._client
+
+    def _make_card_path(
+        self, pathspec, card_type, card_uuid, card_user_id, object_type="card"
+    ):
+        path_suffix, name_suffix = CardPathSuffix.CARD, CardNameSuffix.CARD
+        if object_type == "data":
object_type == "data": + path_suffix, name_suffix = CardPathSuffix.DATA, CardNameSuffix.DATA + + path = _make_path( + "", + pathspec, + with_steps=True, + suffix=path_suffix, + ) + + return CardDatastore.get_card_location( + path, + card_type, + card_uuid, + card_user_id, + name_suffix, + ) + + def download( + self, pathspec, card_type, card_uuid, card_user_id, object_type="card" + ): + final_path = self._make_card_path( + pathspec, card_type, card_uuid, card_user_id, object_type + ) + with self.client.load_bytes([final_path]) as ff: + for key, file_path, metadata in ff: + with open(file_path) as f: + return f.read() + + def fetch( + self, + pathspec, + card_type, + card_uuid, + card_user_id, + object_type="card", + ) -> Tuple[str, str, Any]: + final_path = self._make_card_path( + pathspec, card_type, card_uuid, card_user_id, object_type + ) + return self.client.load_bytes([final_path]) diff --git a/services/ui_backend_service/data/cache/store.py b/services/ui_backend_service/data/cache/store.py index 037dc894..25166f52 100644 --- a/services/ui_backend_service/data/cache/store.py +++ b/services/ui_backend_service/data/cache/store.py @@ -37,7 +37,9 @@ CACHE_DAG_STORAGE_LIMIT = int(os.environ.get("CACHE_DAG_STORAGE_LIMIT", DISK_SIZE // 4)) CACHE_LOG_MAX_ACTIONS = int(os.environ.get("CACHE_LOG_MAX_ACTIONS", 8)) CACHE_LOG_STORAGE_LIMIT = int(os.environ.get("CACHE_LOG_STORAGE_LIMIT", DISK_SIZE // 5)) -CARD_CACHE_DISK_CLEANUP_INTERVAL = int(os.environ.get("CARD_CACHE_DISK_CLEANUP_INTERVAL", 60 * 60 * 4)) +CARD_CACHE_REGULAR_CLEANUP_INTERVAL = int( + os.environ.get("CARD_CACHE_PROCESS_CLEANUP_INTERVAL", 60 * 60 * 24 * 5) +) # 5 Days class CacheStore(object): @@ -123,18 +125,15 @@ def __init__(self, event_emitter): self.cache_manager = CardCacheManager() async def start_cache(self): - self._cleanup_coroutine = asyncio.create_task( - self.cache_manager.start_process_cleanup_routine(120) - ) - self._disk_cleanup_coroutine = asyncio.create_task( - self.cache_manager.cleanup_disk_routine(CARD_CACHE_DISK_CLEANUP_INTERVAL) + self._regualar_cleanup_routine = asyncio.create_task( + self.cache_manager.regular_cleanup_routine( + CARD_CACHE_REGULAR_CLEANUP_INTERVAL + ) ) async def stop_cache(self): - self._cleanup_coroutine.cancel() - await self._cleanup_coroutine - self._disk_cleanup_coroutine.cancel() - await self._disk_cleanup_coroutine + self._regualar_cleanup_routine.cancel() + await self._regualar_cleanup_routine class ArtifactCacheStore(object):