Skip to content

Commit

Permalink
[CERTTF-303] Introduce support for file attachments
Browse files Browse the repository at this point in the history
  • Loading branch information
boukeas committed Apr 10, 2024
1 parent ee1b7ce commit 0c13b06
Show file tree
Hide file tree
Showing 12 changed files with 657 additions and 121 deletions.
64 changes: 63 additions & 1 deletion agent/testflinger_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,27 @@
import json
import logging
import os
from pathlib import Path
import shutil
import tarfile
import tempfile

from testflinger_agent.job import TestflingerJob
from testflinger_agent.errors import TFServerError

logger = logging.getLogger(__name__)


def tmp_dir() -> Path:
"""Create a temporary directory and return the path to it"""
return Path(tempfile.mkdtemp())


def secure_filter(member, path):
"""Combine the `data` and `tar` filter from `tarfile`"""
return tarfile.tar_filter(tarfile.data_filter(member, path), path)


class TestflingerAgent:
def __init__(self, client):
self.client = client
Expand Down Expand Up @@ -111,6 +124,50 @@ def mark_device_offline(self):
# Create the offline file, this should work even if it exists
open(self.get_offline_files()[0], "w").close()

def unpack_attachments(self, job_data: dict, cwd: Path):
"""Download and unpack the attachments associated with a job"""

# download attachment archive to a unique temporary folder
archive_dir = tmp_dir()
archive_path = self.client.get_attachments(
job_id=job_data["job_id"], path=archive_dir
)
if archive_path is None:
raise FileNotFoundError
# extract archive data to a unique temporary folder and clean up
extracted_dir = tmp_dir()
with tarfile.open(archive_path, "r:gz") as tar:
tar.extractall(extracted_dir, filter=secure_filter)
shutil.rmtree(archive_dir)

# [TODO] clarify if this is an appropriate destination for extraction
attachment_dir = cwd / "attachments"

# move/rename extracted archive files to their specified destinations
for phase in ["provision", "firmware_update", "test"]:
try:
attachments = job_data[f"{phase}_data"]["attachments"]
except KeyError:
continue
for attachment in attachments:
original = Path(attachment["local"])
if original.is_absolute():
# absolute filenames become relative
original = original.relative_to(original.root)
# use renaming destination, if provided
# otherwise use the original one
destination_path = (
attachment_dir / phase / attachment.get("agent", original)
)
try:
# create intermediate path to destination, if required
destination_path.resolve().parent.mkdir(parents=True)
except FileExistsError:
pass
# move file
source_path = extracted_dir / phase / original
shutil.move(source_path, destination_path)

def process_jobs(self):
"""Coordinate checking for new jobs and handling them if they exists"""
TEST_PHASES = [
Expand All @@ -136,7 +193,13 @@ def process_jobs(self):
self.client.config.get("execution_basedir"), job.job_id
)
os.makedirs(rundir)

self.client.post_agent_data({"job_id": job.job_id})

# handle job attachments, if any
if job_data.get("attachments", "none") == "complete":
self.unpack_attachments(job_data, cwd=Path(rundir))

# Dump the job data to testflinger.json in our execution dir
with open(os.path.join(rundir, "testflinger.json"), "w") as f:
json.dump(job_data, f)
Expand All @@ -153,7 +216,6 @@ def process_jobs(self):
break
self.client.post_job_state(job.job_id, phase)
self.set_agent_state(phase)

exitcode = job.run_test_phase(phase, rundir)

self.client.post_influx(phase, exitcode)
Expand Down
25 changes: 25 additions & 0 deletions agent/testflinger_agent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging
import json
import os
from pathlib import Path
import requests
import shutil
import tempfile
Expand Down Expand Up @@ -105,6 +106,30 @@ def check_jobs(self):
# Wait a little extra before trying again
time.sleep(60)

def get_attachments(self, job_id: int, path: Path):
"""Download the attachment archive associated with a job
:param job_id:
Id for the job
:param path:
Where to save the attachment archive. If it is a folder,
the default filename `attachments.tar.gz` is used.
:returns path or None:
Where the attachment archive was saved or `None` id nothing
was retrieved
"""
uri = urljoin(self.server, f"/v1/job/{job_id}/attachments")
if path.is_dir():
path = path / "attachments.tar.gz"
with requests.get(uri, stream=True, timeout=600) as response:
if response.status_code != 200:
return None
with open(path, "wb") as attachments:
for chunk in response.iter_content(chunk_size=4096):
if chunk:
attachments.write(chunk)
return path

def check_job_state(self, job_id):
job_data = self.get_result(job_id)
if job_data:
Expand Down
49 changes: 49 additions & 0 deletions agent/testflinger_agent/tests/test_agent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import json
import os
from pathlib import Path
import re
import shutil
import tarfile
import tempfile
import uuid
import requests_mock as rmock
Expand Down Expand Up @@ -92,6 +95,52 @@ def test_check_and_run_test(self, agent, requests_mock):
).read()
assert "test1" == testlog.splitlines()[-1].strip()

def test_attachments(self, agent, tmp_path):
# create file to be used as attachment
attachment = tmp_path / "random.bin"
attachment.write_bytes(os.urandom(128))
# create gzipped archive containing attachment
archive = tmp_path / "attachments.tar.gz"
with tarfile.open(archive, "w:gz") as attachments:
attachments.add(attachment, arcname=f"test/{attachment}")
# job data specifies how the attachment will be handled
fake_job_data = {
"job_id": str(uuid.uuid1()),
"job_queue": "test",
"test_data": {
"attachments": [
{
"local": str(attachment),
"agent": str(attachment.name),
}
]
},
"attachments": "complete",
}

with rmock.Mocker() as mocker:
mocker.post(rmock.ANY, status_code=200)
# mock response to requesting jobs
mocker.get(
re.compile(r"/v1/job\?queue=\w+"),
[{"text": json.dumps(fake_job_data)}, {"text": "{}"}],
)
# mock response to requesting job attachments
mocker.get(
re.compile(r"/v1/job/[-a-z0-9]+/attachments"),
content=archive.read_bytes(),
)
# mock response to results request
mocker.get(re.compile(r"/v1/result/"))

# request and process the job (should unpack the archive)
with patch("shutil.rmtree"):
agent.process_jobs()

# check that the attachment is where it's supposed to be
basepath = Path(self.tmpdir) / fake_job_data["job_id"]
assert (basepath / "attachments/test" / attachment.name).exists()

def test_config_vars_in_env(self, agent, requests_mock):
self.config["test_command"] = (
"bash -c 'echo test_string is $test_string'"
Expand Down
97 changes: 93 additions & 4 deletions cli/testflinger_cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import logging
import os
import sys
from pathlib import Path
import tarfile
import tempfile
import time
from argparse import ArgumentParser
from datetime import datetime
Expand Down Expand Up @@ -119,6 +122,11 @@ def _print_queue_message():
)


def tmp_dir() -> Path:
"""Create a temporary directory and return the path to it"""
return Path(tempfile.mkdtemp())


class TestflingerCli:
"""Class for handling the Testflinger CLI"""

Expand Down Expand Up @@ -303,6 +311,53 @@ def configure(self):
print("{} = {}".format(k, v))
print()

@staticmethod
def pack_attachments(job_data: dict) -> Path | None:
"""Return the path to a compressed tarball of attachments"""

# pull together the attachement data per phase
phases = ["provision", "firmware_update", "test"]
attachment_data = {}
for phase in phases:
phase_str = f"{phase}_data"
try:
attachment_data[phase] = job_data[phase_str]["attachments"]
except KeyError:
pass
if not attachment_data:
return None

# create archive in a unique temporary folder
archive_dir = tmp_dir()
archive_path = archive_dir / "archive.tar.gz"

# use `tarfile` instead of `shutil` because:
# > [it] handles directories, regular files, hardlinks, symbolic links,
# > fifos, character devices and block devices and is able to acquire
# > and restore file information like timestamp, access permissions and
# > owner.
# ref: https://docs.python.org/3/library/tarfile.html

with tarfile.open(archive_path, "w:gz") as tar:
for phase, attachments in attachment_data.items():
phase_path = Path(phase)
# generate filenames for the current phase
filenames = (
Path(attachment["local"]) for attachment in attachments
)
# add the corresponding files to the archive
# using a phase-prefixed archive filename
for filename in filenames:
if filename.is_absolute():
# absolute filenames become relative
relative = filename.relative_to(filename.root)
archive_name = phase_path / relative
else:
archive_name = phase_path / filename
tar.add(filename, arcname=archive_name)

return archive_path

def submit(self):
"""Submit a new test job to the server"""
if self.args.filename == "-":
Expand All @@ -317,8 +372,16 @@ def submit(self):
raise SystemExit(
"File not found: {}".format(self.args.filename)
) from exc
job_id = self.submit_job_data(data)
queue = yaml.safe_load(data).get("job_queue")
job_dict = yaml.safe_load(data)

archive_path = self.pack_attachments(job_dict)
job_id = self.submit_job_data(job_dict)
if archive_path is not None:
# follow job submission with attachment submission, if required
self.submit_job_attachments(job_id, archive_path)
archive_path.unlink()

queue = job_dict.get("job_queue")
self.history.new(job_id, queue)
if self.args.quiet:
print(job_id)
Expand All @@ -328,10 +391,10 @@ def submit(self):
if self.args.poll:
self.do_poll(job_id)

def submit_job_data(self, data):
def submit_job_data(self, data_dict):
"""Submit data that was generated or read from a file as a test job"""
try:
job_id = self.client.submit_job(data)
job_id = self.client.submit_job(data_dict)
except client.HTTPError as exc:
if exc.status == 400:
raise SystemExit(
Expand All @@ -351,6 +414,32 @@ def submit_job_data(self, data):
) from exc
return job_id

def submit_job_attachments(self, job_id: str, path: Path):
"""Submit attachments archive for a job to the server
:param job_id:
ID for the test job
:param path:
The path to the attachment archive
"""
try:
self.client.post_attachment(job_id, path)
except client.HTTPError as exc:
if exc.status == 400:
raise SystemExit(
f"Unable to submit attachment archive for {job_id}"
) from exc
if exc.status == 404:
raise SystemExit(
"Received 404 error from server. Are you "
"sure this is a testflinger server?"
) from exc
# This shouldn't happen, so let's get more information
raise SystemExit(
"Unexpected error status from testflinger "
"server: {}".format(exc.status)
) from exc

def show(self):
"""Show the requested job JSON for a specified JOB_ID"""
try:
Expand Down
Loading

0 comments on commit 0c13b06

Please sign in to comment.