Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CERTTF-303] Introduce support for file attachments #250

Merged
merged 24 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
0c13b06
[CERTTF-303] Introduce support for file attachments
boukeas Apr 3, 2024
036f2ed
fix: compatibility with Python 3.8
boukeas Apr 10, 2024
c909f25
chore: correct typos
boukeas Apr 11, 2024
40f23b9
style: resolve some review comments
boukeas Apr 11, 2024
1d35713
refactor: use `data` filter for extracting from archive (superset of …
boukeas Apr 11, 2024
1de743d
chore: add/improve some error handling messages
boukeas Apr 11, 2024
4559592
refactor: introduce `testflinger_agent.config` for configuration cons…
boukeas Apr 11, 2024
2680878
refactor: use `tempfile.NamedTemporaryFile` in the CLI for attachments
boukeas Apr 11, 2024
88ba428
fix: actually include `testflinger_agent.config`
boukeas Apr 11, 2024
adcd7ac
refactor: use `tempfile` context managers to handle temporary objects…
boukeas Apr 11, 2024
2b19fdc
refactor: remove redundant if-statement
boukeas Apr 11, 2024
443c55c
feat: CLI retries when submitting attachments
boukeas Apr 12, 2024
2efade1
docs: add documentation for attachments
boukeas Apr 12, 2024
7a71243
refactor: simplify approach to attachment packing and unpacking
boukeas Apr 12, 2024
5141e9d
chore: use `sys.exit` instead of raising `SystemExit` in CLI's modifi…
boukeas Apr 15, 2024
86336bc
style: improve docstring
boukeas Apr 15, 2024
7efb95a
style: improve return statement to be more pythonic
boukeas Apr 15, 2024
774dbb1
fix: correct exponential backoff
boukeas Apr 15, 2024
e3d1be7
refactor: 'get_attachments' method no longer returns anything
boukeas Apr 15, 2024
37fbc38
fix: make unpacking stricter and ensure recovery in case of failure
boukeas Apr 15, 2024
df53f7f
fix: add path resolution to attachment packing, to handle dodgy cases
boukeas Apr 15, 2024
557e902
refactor: rename the derived `attachments` field and modify its seman…
boukeas Apr 15, 2024
7050f20
test: add test for attempting to extract attachments out of phase folder
boukeas Apr 16, 2024
fcf43cd
chore: improve CLI messages in `put_file` in error cases
boukeas Apr 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion agent/testflinger_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,39 @@
import json
import logging
import os
from pathlib import Path
import shutil
import tarfile
import tempfile

from testflinger_agent.job import TestflingerJob
from testflinger_agent.errors import TFServerError
from testflinger_agent.config import ATTACHMENTS_DIR

logger = logging.getLogger(__name__)


def secure_filter(member, path):
"""Combine the `data` filter with custom attachment filtering

Makes sure that the starting folder for all attachments coincides
with one of the supported phases, i.e. that the attachment archive
has been created properly and no attachment will be extracted to an
unexpected location.
"""
try:
resolved = Path(member.name).resolve().relative_to(Path.cwd())
boukeas marked this conversation as resolved.
Show resolved Hide resolved
except ValueError as error:
# essentially trying to extract higher than the attachments folder
raise tarfile.OutsideDestinationError(member, path) from error
if not str(resolved).startswith(
("provision/", "firmware_update/", "test/")
):
# trying to extract in an invalid folder, under the attachments folder
raise tarfile.OutsideDestinationError(member, path)
return tarfile.data_filter(member, path)


class TestflingerAgent:
def __init__(self, client):
self.client = client
Expand Down Expand Up @@ -111,6 +136,18 @@ def mark_device_offline(self):
# Create the offline file, this should work even if it exists
open(self.get_offline_files()[0], "w").close()

def unpack_attachments(self, job_data: dict, cwd: Path):
    """Download and unpack the attachments associated with a job

    :param job_data:
        The job data; its "job_id" entry identifies the archive to download
    :param cwd:
        The folder under which the attachments folder is populated
    """
    job_id = job_data["job_id"]

    # the suffix must carry its own leading dot: `tempfile` appends it
    # verbatim to the generated name
    with tempfile.NamedTemporaryFile(suffix=".tar.gz") as archive_tmp:
        archive_path = Path(archive_tmp.name)
        # download attachment archive
        self.client.get_attachments(job_id, path=archive_path)
        # extract archive into the attachments folder
        with tarfile.open(archive_path, "r:gz") as tar:
            tar.extractall(cwd / ATTACHMENTS_DIR, filter=secure_filter)

def process_jobs(self):
"""Coordinate checking for new jobs and handling them if they exists"""
TEST_PHASES = [
Expand All @@ -136,7 +173,9 @@ def process_jobs(self):
self.client.config.get("execution_basedir"), job.job_id
)
os.makedirs(rundir)

self.client.post_agent_data({"job_id": job.job_id})

# Dump the job data to testflinger.json in our execution dir
with open(os.path.join(rundir, "testflinger.json"), "w") as f:
json.dump(job_data, f)
Expand All @@ -146,14 +185,19 @@ def process_jobs(self):
) as f:
json.dump({}, f)

# handle job attachments, if any
# (always after creating "testflinger.json", for reporting
# in case of an unpacking error)
if job_data.get("attachments_status") == "complete":
self.unpack_attachments(job_data, cwd=Path(rundir))

for phase in TEST_PHASES:
# First make sure the job hasn't been cancelled
if self.client.check_job_state(job.job_id) == "cancelled":
logger.info("Job cancellation was requested, exiting.")
break
self.client.post_job_state(job.job_id, phase)
self.set_agent_state(phase)

exitcode = job.run_test_phase(phase, rundir)

self.client.post_influx(phase, exitcode)
Expand Down
21 changes: 21 additions & 0 deletions agent/testflinger_agent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging
import json
import os
from pathlib import Path
import requests
import shutil
import tempfile
Expand Down Expand Up @@ -105,6 +106,26 @@ def check_jobs(self):
# Wait a little extra before trying again
time.sleep(60)

def get_attachments(self, job_id: str, path: Path):
    """Download the attachment archive associated with a job

    The archive is streamed from the server and written to `path`
    in chunks.

    :param job_id:
        Id for the job
    :param path:
        Where to save the attachment archive
    :raises TFServerError:
        If the server responds with a status code other than 200
    """
    endpoint = urljoin(self.server, f"/v1/job/{job_id}/attachments")
    with requests.get(endpoint, stream=True, timeout=600) as response:
        status = response.status_code
        if status != 200:
            logger.error(
                f"Unable to retrieve attachments for job {job_id} "
                f"(error: {status})"
            )
            raise TFServerError(status)
        # stream the body to disk instead of holding it in memory
        with open(path, "wb") as archive:
            for block in response.iter_content(chunk_size=4096):
                archive.write(block)

def check_job_state(self, job_id):
job_data = self.get_result(job_id)
if job_data:
Expand Down
17 changes: 17 additions & 0 deletions agent/testflinger_agent/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright (C) 2024 Canonical
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Configuration constants for the Testflinger agent"""

# Name of the folder, inside a job's execution directory, into which
# the agent extracts the job's attachment archive
ATTACHMENTS_DIR = "attachments"
135 changes: 125 additions & 10 deletions agent/testflinger_agent/tests/test_agent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import json
import os
from pathlib import Path
import re
import shutil
import tarfile
import tempfile
import uuid
import requests_mock as rmock
Expand All @@ -9,6 +12,7 @@
from mock import patch

import testflinger_agent
from testflinger_agent.config import ATTACHMENTS_DIR
from testflinger_agent.errors import TFServerError
from testflinger_agent.client import TestflingerClient as _TestflingerClient
from testflinger_agent.agent import TestflingerAgent as _TestflingerAgent
Expand Down Expand Up @@ -92,46 +96,157 @@ def test_check_and_run_test(self, agent, requests_mock):
).read()
assert "test1" == testlog.splitlines()[-1].strip()

def test_attachments(self, agent, tmp_path):
    """Check that a well-formed attachment archive is unpacked for a job"""
    # file that plays the role of the attachment
    source_file = tmp_path / "random.bin"
    source_file.write_bytes(os.urandom(128))
    # gzipped archive holding the attachment under the `test` phase folder
    archive = tmp_path / "attachments.tar.gz"
    with tarfile.open(archive, "w:gz") as tar:
        tar.add(source_file, arcname="test/random.bin")
    # job data specifies how the attachment will be handled
    job_id = str(uuid.uuid1())
    attachment_spec = {
        "local": str(source_file),
        "agent": str(source_file.name),
    }
    mock_job_data = {
        "job_id": job_id,
        "job_queue": "test",
        "test_data": {"attachments": [attachment_spec]},
        "attachments_status": "complete",
    }
    job_responses = [{"text": json.dumps(mock_job_data)}, {"text": "{}"}]

    with rmock.Mocker() as mocker:
        mocker.post(rmock.ANY, status_code=200)
        # the job endpoint serves the job once, then nothing
        mocker.get(re.compile(r"/v1/job\?queue=\w+"), job_responses)
        # the attachments endpoint serves the archive
        mocker.get(
            re.compile(r"/v1/job/[-a-z0-9]+/attachments"),
            content=archive.read_bytes(),
        )
        # the results endpoint serves an empty response
        mocker.get(re.compile(r"/v1/result/"))

        # request and process the job (should unpack the archive)
        with patch("shutil.rmtree"):
            agent.process_jobs()

        # the request history should show:
        # - a request to the job retrieval endpoint
        # - a request to the attachment retrieval endpoint
        history = mocker.request_history
        assert history[0].path == "/v1/job"
        assert history[2].path == f"/v1/job/{job_id}/attachments"

        # the attachment should have landed in the `test` phase folder
        job_dir = Path(self.tmpdir) / mock_job_data["job_id"]
        unpacked = job_dir / ATTACHMENTS_DIR / "test" / source_file.name
        assert unpacked.exists()

def test_attachments_insecure(self, agent, tmp_path):
    """Check that an attachment outside the phase folders is not unpacked"""
    # file that plays the role of the attachment
    source_file = tmp_path / "random.bin"
    source_file.write_bytes(os.urandom(128))
    # gzipped archive holding the attachment
    archive = tmp_path / "attachments.tar.gz"
    with tarfile.open(archive, "w:gz") as tar:
        # note: the archive name deliberately lacks the phase folder
        # that a properly created archive would start with
        tar.add(source_file, arcname="random.bin")
    # job data specifies how the attachment will be handled
    job_id = str(uuid.uuid1())
    attachment_spec = {
        "local": str(source_file),
        "agent": str(source_file.name),
    }
    mock_job_data = {
        "job_id": job_id,
        "job_queue": "test",
        "test_data": {"attachments": [attachment_spec]},
        "attachments_status": "complete",
    }
    job_responses = [{"text": json.dumps(mock_job_data)}, {"text": "{}"}]

    with rmock.Mocker() as mocker:
        mocker.post(rmock.ANY, status_code=200)
        # the job endpoint serves the job once, then nothing
        mocker.get(re.compile(r"/v1/job\?queue=\w+"), job_responses)
        # the attachments endpoint serves the archive
        mocker.get(
            re.compile(r"/v1/job/[-a-z0-9]+/attachments"),
            content=archive.read_bytes(),
        )
        # the results endpoint serves an empty response
        mocker.get(re.compile(r"/v1/result/"))

        # request and process the job (should attempt to unpack the archive)
        with patch("shutil.rmtree"):
            agent.process_jobs()

        # the request history should show:
        # - a request to the job retrieval endpoint
        # - a request to the attachment retrieval endpoint
        history = mocker.request_history
        assert history[0].path == "/v1/job"
        assert history[2].path == f"/v1/job/{job_id}/attachments"

        # the attachment must *not* have landed in the `test` phase folder
        job_dir = Path(self.tmpdir) / mock_job_data["job_id"]
        unpacked = job_dir / ATTACHMENTS_DIR / "test" / source_file.name
        assert not unpacked.exists()

def test_config_vars_in_env(self, agent, requests_mock):
self.config["test_command"] = (
"bash -c 'echo test_string is $test_string'"
)
fake_job_data = {
mock_job_data = {
"job_id": str(uuid.uuid1()),
"job_queue": "test",
"test_data": {"test_cmds": "foo"},
}
requests_mock.get(
rmock.ANY, [{"text": json.dumps(fake_job_data)}, {"text": "{}"}]
rmock.ANY, [{"text": json.dumps(mock_job_data)}, {"text": "{}"}]
)
requests_mock.post(rmock.ANY, status_code=200)
with patch("shutil.rmtree"):
agent.process_jobs()
testlog = open(
os.path.join(self.tmpdir, fake_job_data.get("job_id"), "test.log")
os.path.join(self.tmpdir, mock_job_data.get("job_id"), "test.log")
).read()
assert "ThisIsATest" in testlog

def test_phase_failed(self, agent, requests_mock):
# Make sure we stop running after a failed phase
self.config["provision_command"] = "/bin/false"
self.config["test_command"] = "echo test1"
fake_job_data = {
mock_job_data = {
"job_id": str(uuid.uuid1()),
"job_queue": "test",
"provision_data": {"url": "foo"},
"test_data": {"test_cmds": "foo"},
}
requests_mock.get(
rmock.ANY, [{"text": json.dumps(fake_job_data)}, {"text": "{}"}]
rmock.ANY, [{"text": json.dumps(mock_job_data)}, {"text": "{}"}]
)
requests_mock.post(rmock.ANY, status_code=200)
with patch("shutil.rmtree"), patch("os.unlink"):
agent.process_jobs()
outcome_file = os.path.join(
os.path.join(
self.tmpdir,
fake_job_data.get("job_id"),
mock_job_data.get("job_id"),
"testflinger-outcome.json",
)
)
Expand All @@ -144,12 +259,12 @@ def test_retry_transmit(self, agent, requests_mock):
# Make sure we retry sending test results
self.config["provision_command"] = "/bin/false"
self.config["test_command"] = "echo test1"
fake_job_data = {"job_id": str(uuid.uuid1()), "job_queue": "test"}
mock_job_data = {"job_id": str(uuid.uuid1()), "job_queue": "test"}
# Send an extra empty data since we will be calling get 3 times
requests_mock.get(
rmock.ANY,
[
{"text": json.dumps(fake_job_data)},
{"text": json.dumps(mock_job_data)},
{"text": "{}"},
{"text": "{}"},
],
Expand All @@ -163,13 +278,13 @@ def test_retry_transmit(self, agent, requests_mock):
agent.process_jobs()
first_dir = os.path.join(
self.config.get("execution_basedir"),
fake_job_data.get("job_id"),
mock_job_data.get("job_id"),
)
mock_transmit_job_outcome.assert_called_with(first_dir)
# Try processing jobs again, now it should be in results_basedir
agent.process_jobs()
retry_dir = os.path.join(
self.config.get("results_basedir"), fake_job_data.get("job_id")
self.config.get("results_basedir"), mock_job_data.get("job_id")
)
mock_transmit_job_outcome.assert_called_with(retry_dir)

Expand Down
Loading