Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend free up space and propery check token expiration #716

Merged
merged 10 commits into from
Oct 15, 2024
41 changes: 31 additions & 10 deletions cdci_data_analysis/flask_app/dispatcher_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def __init__(self, app,
self.app = app

temp_scratch_dir = None
temp_job_id = None

self.set_sentry_sdk(getattr(self.app.config.get('conf'), 'sentry_url', None))

Expand Down Expand Up @@ -283,6 +284,7 @@ def __init__(self, app,
self.set_scratch_dir(self.par_dic['session_id'], job_id=self.job_id, verbose=verbose)
# temp_job_id = self.job_id
temp_scratch_dir = self.scratch_dir
temp_job_id = self.job_id
if not data_server_call_back:
try:
self.set_temp_dir(self.par_dic['session_id'], verbose=verbose)
Expand Down Expand Up @@ -363,7 +365,7 @@ def __init__(self, app,
finally:
self.logger.info("==> clean-up temporary directory")
self.log_query_progression("before clear_temp_dir")
self.clear_temp_dir(temp_scratch_dir=temp_scratch_dir)
self.clear_temp_dir(temp_scratch_dir=temp_scratch_dir, temp_job_id=temp_job_id)
self.log_query_progression("after clear_temp_dir")

logger.info("constructed %s:%s for data_server_call_back=%s", self.__class__, self, data_server_call_back)
Expand All @@ -386,7 +388,7 @@ def free_up_space(app):
hard_minimum_folder_age_days = app_config.hard_minimum_folder_age_days
# let's pass the minimum age the folders to be deleted should have
soft_minimum_folder_age_days = request.args.get('soft_minimum_age_days', None)
if soft_minimum_folder_age_days is None or isinstance(soft_minimum_folder_age_days, int):
if soft_minimum_folder_age_days is None:
soft_minimum_folder_age_days = app_config.soft_minimum_folder_age_days
else:
soft_minimum_folder_age_days = int(soft_minimum_folder_age_days)
Expand All @@ -404,8 +406,11 @@ def free_up_space(app):
dict_analysis_parameters = json.load(analysis_parameters_file)
token = dict_analysis_parameters.get('token', None)
token_expired = False
if token is not None and token['exp'] < current_time_secs:
token_expired = True
if token is not None:
try:
tokenHelper.get_decoded_token(token, secret_key)
except jwt.exceptions.ExpiredSignatureError:
token_expired = True

job_monitor_path = os.path.join(scratch_dir, 'job_monitor.json')
with open(job_monitor_path, 'r') as jm_file:
Expand Down Expand Up @@ -433,15 +438,28 @@ def free_up_space(app):
for d in list_scratch_dir_to_delete:
shutil.rmtree(d)

list_lock_files = sorted(glob.glob(".lock_*"), key=os.path.getatime)
num_lock_files_removed = 0
for l in list_lock_files:
lock_file_job_id = l.split('_')[-1]
list_job_id_scratch_dir = glob.glob(f"scratch_sid_*_jid_{lock_file_job_id}*")
if len(list_job_id_scratch_dir) == 0:
os.remove(l)
num_lock_files_removed += 1

post_clean_space_space = shutil.disk_usage(os.getcwd())
post_clean_available_space = format_size(post_clean_space_space.free, format_returned='M')

list_scratch_dir = sorted(glob.glob("scratch_sid_*_jid_*"))
logger.info(f"Number of scratch folder after clean-up: {len(list_scratch_dir)}.\n"
f"Removed {len(list_scratch_dir_to_delete)} scratch directories, "
f"and now the available amount of space is {post_clean_available_space}")

result_scratch_dir_deletion = f"Removed {len(list_scratch_dir_to_delete)} scratch directories"
list_lock_files = sorted(glob.glob(".lock_*"))
logger.info(f"Number of scratch folder after clean-up: {len(list_scratch_dir)}, "
f"number of lock files after clean-up: {len(list_lock_files)}.\n"
f"Removed {len(list_scratch_dir_to_delete)} scratch directories "
f"and {num_lock_files_removed} lock files.\n"
f"Now the available amount of space is {post_clean_available_space}")

result_scratch_dir_deletion = f"Removed {len(list_scratch_dir_to_delete)} scratch directories, " \
f"and {num_lock_files_removed} lock files."
logger.info(result_scratch_dir_deletion)

return jsonify(dict(output_status=result_scratch_dir_deletion))
Expand Down Expand Up @@ -951,11 +969,14 @@ def move_temp_content(self):
file_full_path = os.path.join(self.temp_dir, f)
shutil.copy(file_full_path, self.scratch_dir)

def clear_temp_dir(self, temp_scratch_dir=None):
def clear_temp_dir(self, temp_scratch_dir=None, temp_job_id=None):
if hasattr(self, 'temp_dir') and os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
if temp_scratch_dir is not None and temp_scratch_dir != self.scratch_dir and os.path.exists(temp_scratch_dir):
shutil.rmtree(temp_scratch_dir)
if temp_job_id is not None and os.path.exists(f".lock_{temp_job_id}"):
os.remove(f".lock_{temp_job_id}")


@staticmethod
def validated_download_file_path(basepath, filename, should_exist=True):
Expand Down
9 changes: 9 additions & 0 deletions cdci_data_analysis/pytest_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -1568,6 +1568,15 @@
for d in dir_list:
shutil.rmtree(d)

@staticmethod
def remove_lock_files(job_id=None):
if job_id is None:
lock_files = glob.glob('.lock_*')
else:
lock_files = glob.glob(f'.lock_{job_id}')

Check warning on line 1576 in cdci_data_analysis/pytest_fixtures.py

View check run for this annotation

Codecov / codecov/patch

cdci_data_analysis/pytest_fixtures.py#L1576

Added line #L1576 was not covered by tests
for f in lock_files:
os.remove(f)

@staticmethod
def remove_download_folders(id=None):
if id is None:
Expand Down
11 changes: 8 additions & 3 deletions tests/test_job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -2605,6 +2605,7 @@ def test_email_t1_t2(dispatcher_long_living_fixture,
("hard_minimum_folder_age_days", 60)], indirect=True)
def test_free_up_space(dispatcher_live_fixture, number_folders_to_delete, soft_minimum_age_days):
DispatcherJobState.remove_scratch_folders()
DispatcherJobState.remove_lock_files()

server = dispatcher_live_fixture

Expand All @@ -2622,6 +2623,8 @@ def test_free_up_space(dispatcher_live_fixture, number_folders_to_delete, soft_m
"exp": int(time.time()) - 15
}

expired_token_encoded = jwt.encode(expired_token, secret_key, algorithm='HS256')

params = {
'query_status': 'new',
'product_type': 'dummy',
Expand Down Expand Up @@ -2654,12 +2657,12 @@ def test_free_up_space(dispatcher_live_fixture, number_folders_to_delete, soft_m
analysis_parameters_path = os.path.join(scratch_dir, 'analysis_parameters.json')
with open(analysis_parameters_path) as analysis_parameters_file:
dict_analysis_parameters = json.load(analysis_parameters_file)
dict_analysis_parameters['token'] = expired_token
# dict_analysis_parameters['token'] = expired_token
dict_analysis_parameters['token'] = expired_token_encoded
with open(analysis_parameters_path, 'w') as dict_analysis_parameters_outfile:
my_json_str = json.dumps(dict_analysis_parameters, indent=4)
dict_analysis_parameters_outfile.write(u'%s' % my_json_str)


params = {
'token': encoded_token,
'soft_minimum_age_days': soft_minimum_age_days
Expand All @@ -2674,7 +2677,9 @@ def test_free_up_space(dispatcher_live_fixture, number_folders_to_delete, soft_m

assert 'output_status' in jdata

assert jdata['output_status'] == f"Removed {number_folders_to_delete} scratch directories"
number_lock_files_deleted = 0 if number_folders_to_delete < number_analysis_to_run else 1
assert jdata['output_status'] == (f"Removed {number_folders_to_delete} scratch directories, "
f"and {number_lock_files_deleted} lock files.")

assert len(glob.glob("scratch_sid_*_jid_*")) == number_analysis_to_run - number_folders_to_delete

Expand Down
Loading