Skip to content

Commit

Permalink
Check for online agents when submitting a job
Browse files Browse the repository at this point in the history
  • Loading branch information
plars committed Dec 4, 2024
1 parent 750fe0b commit ec8dadc
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 25 deletions.
31 changes: 30 additions & 1 deletion cli/testflinger_cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ def _add_submit_args(self, subparsers):
parser.set_defaults(func=self.submit)
parser.add_argument("--poll", "-p", action="store_true")
parser.add_argument("--quiet", "-q", action="store_true")
parser.add_argument("--wait-for-available-agents", action="store_true")
parser.add_argument("filename").completer = (
argcomplete.completers.FilesCompleter(
allowednames=("*.yaml", "*.yml", "*.json")
Expand Down Expand Up @@ -493,6 +494,11 @@ def submit(self):
)
auth_headers = None

# Check if agents are available to handle this queue
# and warn or exit depending on options
queue = job_dict.get("job_queue")
self.check_online_agents_available(queue)

attachments_data = self.extract_attachment_data(job_dict)
if attachments_data is None:
# submit job, no attachments
Expand All @@ -515,7 +521,6 @@ def submit(self):
"failed to submit attachments"
)

queue = job_dict.get("job_queue")
self.history.new(job_id, queue)
if self.args.quiet:
print(job_id)
Expand All @@ -525,6 +530,30 @@ def submit(self):
if self.args.poll:
self.do_poll(job_id)

def check_online_agents_available(self, queue: str):
"""Exit or warn if no online agents available for a specified queue"""
try:
agents = self.client.get_agents_on_queue(queue)
except client.HTTPError:
agents = []
online_agents = [
agent for agent in agents if agent["state"] != "offline"
]
if len(online_agents) > 0:
# If there are online agents, then we can proceed
return
if not self.args.wait_for_available_agents:
print(
f"ERROR: No online agents available for queue {queue}. "
"If you want to wait for agents to become available, use the "
"--wait-for-available-agents option."
)
sys.exit(1)
print(
f"WARNING: No online agents available for queue {queue}. "
"Waiting for agents to become available..."
)

def submit_job_data(self, data: dict, headers: dict = None):
"""Submit data that was generated or read from a file as a test job"""
try:
Expand Down
6 changes: 6 additions & 0 deletions cli/testflinger_cli/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,9 @@ def get_images(self, queue):
return json.loads(data)
except ValueError:
return {}

def get_agents_on_queue(self, queue):
"""Get the list of all agents listening to a specified queue"""
endpoint = f"/v1/queues/{queue}/agents"
data = self.get(endpoint)
return json.loads(data)
106 changes: 84 additions & 22 deletions cli/testflinger_cli/tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,15 @@ def test_cancel(requests_mock):
def test_submit(capsys, tmp_path, requests_mock):
"""Make sure jobid is read back from submitted job"""
jobid = str(uuid.uuid1())
fake_data = {"queue": "fake", "provision_data": {"distro": "fake"}}
fake_data = {"job_queue": "fake", "provision_data": {"distro": "fake"}}
testfile = tmp_path / "test.json"
testfile.write_text(json.dumps(fake_data))
fake_return = {"job_id": jobid}
requests_mock.post(URL + "/v1/job", json=fake_return)
requests_mock.get(
URL + "/v1/queues/fake/agents",
json=[{"name": "fake_agent", "state": "waiting"}],
)
sys.argv = ["", "submit", str(testfile)]
tfcli = testflinger_cli.TestflingerCli()
tfcli.submit()
Expand All @@ -94,11 +98,15 @@ def test_submit(capsys, tmp_path, requests_mock):

def test_submit_bad_data(tmp_path, requests_mock):
"""Ensure a 422 response from bad data shows the returned errors"""
fake_data = {"badkey": "badvalue"}
fake_data = {"badkey": "badvalue", "job_queue": "fake"}
testfile = tmp_path / "test.json"
testfile.write_text(json.dumps(fake_data))
# return 422 and "expected error"
requests_mock.post(URL + "/v1/job", status_code=422, text="expected error")
requests_mock.get(
URL + "/v1/queues/fake/agents",
json=[{"name": "fake_agent", "state": "waiting"}],
)
sys.argv = ["", "submit", str(testfile)]
tfcli = testflinger_cli.TestflingerCli()
with pytest.raises(SystemExit) as err:
Expand Down Expand Up @@ -252,7 +260,7 @@ def test_submit_with_attachments(tmp_path):
job_id = str(uuid.uuid1())
job_file = tmp_path / "test.json"
job_data = {
"queue": "fake",
"job_queue": "fake",
"test_data": {
"attachments": [
{
Expand All @@ -273,6 +281,10 @@ def test_submit_with_attachments(tmp_path):
mock_response = {"job_id": job_id}
mocker.post(f"{URL}/v1/job", json=mock_response)
mocker.post(f"{URL}/v1/job/{job_id}/attachments")
mocker.get(
f"{URL}/v1/queues/fake/agents",
json=[{"name": "fake_agent", "state": "waiting"}],
)

# use cli to submit the job (processes `sys.argv` for arguments)
tfcli.submit()
Expand All @@ -281,9 +293,9 @@ def test_submit_with_attachments(tmp_path):
# - there is a request to the job submission endpoint
# - there a request to the attachment submission endpoint
history = mocker.request_history
assert len(history) == 2
assert history[0].path == "/v1/job"
assert history[1].path == f"/v1/job/{job_id}/attachments"
assert len(history) == 3
assert history[1].path == "/v1/job"
assert history[2].path == f"/v1/job/{job_id}/attachments"

# extract the binary file data from the request
# (`requests_mock` only provides access to the `PreparedRequest`)
Expand All @@ -307,7 +319,7 @@ def test_submit_attachments_retries(tmp_path):
job_id = str(uuid.uuid1())
job_file = tmp_path / "test.json"
job_data = {
"queue": "fake",
"job_queue": "fake",
"test_data": {
"attachments": [
{
Expand Down Expand Up @@ -339,6 +351,10 @@ def test_submit_attachments_retries(tmp_path):
{"status_code": 200},
],
)
mocker.get(
f"{URL}/v1/queues/fake/agents",
json=[{"name": "fake_agent", "state": "waiting"}],
)

# use cli to submit the job (processes `sys.argv` for arguments)
tfcli.submit()
Expand All @@ -347,9 +363,9 @@ def test_submit_attachments_retries(tmp_path):
# - there is a request to the job submission endpoint
# - there are repeated requests to the attachment submission endpoint
history = mocker.request_history
assert len(history) == 5
assert history[0].path == "/v1/job"
for entry in history[1:]:
assert len(history) == 6
assert history[1].path == "/v1/job"
for entry in history[2:]:
assert entry.path == f"/v1/job/{job_id}/attachments"


Expand All @@ -359,7 +375,7 @@ def test_submit_attachments_no_retries(tmp_path):
job_id = str(uuid.uuid1())
job_file = tmp_path / "test.json"
job_data = {
"queue": "fake",
"job_queue": "fake",
"test_data": {
"attachments": [
{
Expand All @@ -383,6 +399,10 @@ def test_submit_attachments_no_retries(tmp_path):
f"{URL}/v1/job/{job_id}/attachments", [{"status_code": 400}]
)
mocker.post(f"{URL}/v1/job/{job_id}/action", [{"status_code": 200}])
mocker.get(
f"{URL}/v1/queues/fake/agents",
json=[{"name": "fake_agent", "state": "waiting"}],
)

with pytest.raises(SystemExit) as exc_info:
# use cli to submit the job (processes `sys.argv` for arguments)
Expand All @@ -395,10 +415,10 @@ def test_submit_attachments_no_retries(tmp_path):
# no retries
# - there is a final request to cancel the action
history = mocker.request_history
assert len(history) == 3
assert history[0].path == "/v1/job"
assert history[1].path == f"/v1/job/{job_id}/attachments"
assert history[2].path == f"/v1/job/{job_id}/action"
assert len(history) == 4
assert history[1].path == "/v1/job"
assert history[2].path == f"/v1/job/{job_id}/attachments"
assert history[3].path == f"/v1/job/{job_id}/action"


def test_submit_attachments_timeout(tmp_path):
Expand All @@ -407,7 +427,7 @@ def test_submit_attachments_timeout(tmp_path):
job_id = str(uuid.uuid1())
job_file = tmp_path / "test.json"
job_data = {
"queue": "fake",
"job_queue": "fake",
"test_data": {
"attachments": [
{
Expand Down Expand Up @@ -438,6 +458,10 @@ def test_submit_attachments_timeout(tmp_path):
],
)
mocker.post(f"{URL}/v1/job/{job_id}/action", [{"status_code": 200}])
mocker.get(
f"{URL}/v1/queues/fake/agents",
json=[{"name": "fake_agent", "state": "waiting"}],
)

with pytest.raises(SystemExit) as exc_info:
# use cli to submit the job (processes `sys.argv` for arguments)
Expand All @@ -448,18 +472,18 @@ def test_submit_attachments_timeout(tmp_path):
# - there is a request to the job submission endpoint
# - there a request to the attachment submission endpoint
history = mocker.request_history
assert len(history) == 4
assert history[0].path == "/v1/job"
assert history[1].path == f"/v1/job/{job_id}/attachments"
assert len(history) == 5
assert history[1].path == "/v1/job"
assert history[2].path == f"/v1/job/{job_id}/attachments"
assert history[3].path == f"/v1/job/{job_id}/action"
assert history[3].path == f"/v1/job/{job_id}/attachments"
assert history[4].path == f"/v1/job/{job_id}/action"


def test_submit_with_priority(tmp_path, requests_mock):
"""Tests authorization of jobs submitted with priority"""
job_id = str(uuid.uuid1())
job_data = {
"queue": "fake",
"job_queue": "fake",
"job_priority": 100,
}
job_file = tmp_path / "test.json"
Expand All @@ -474,14 +498,18 @@ def test_submit_with_priority(tmp_path, requests_mock):
requests_mock.post(f"{URL}/v1/oauth2/token", text=fake_jwt)
mock_response = {"job_id": job_id}
requests_mock.post(f"{URL}/v1/job", json=mock_response)
requests_mock.get(
URL + "/v1/queues/fake/agents",
json=[{"name": "fake_agent", "state": "waiting"}],
)
tfcli.submit()
assert requests_mock.last_request.headers.get("Authorization") == fake_jwt


def test_submit_priority_no_credentials(tmp_path):
"""Tests priority jobs rejected with no specified credentials"""
job_data = {
"queue": "fake",
"job_queue": "fake",
"job_priority": 100,
}
job_file = tmp_path / "test.json"
Expand Down Expand Up @@ -541,3 +569,37 @@ def test_list_queues_connection_error(caplog, requests_mock):
with pytest.raises(SystemExit):
tfcli.list_queues()
assert "Unable to get a list of queues from the server." in caplog.text


def test_submit_no_agents_fails(capsys, tmp_path, requests_mock):
"""Test that submitting a job without online agents fails"""
requests_mock.get(URL + "/v1/queues/fake/agents", json=[])
fake_data = {"job_queue": "fake", "provision_data": {"distro": "fake"}}
test_file = tmp_path / "test.json"
test_file.write_text(json.dumps(fake_data))
sys.argv = ["", "submit", str(test_file)]
tfcli = testflinger_cli.TestflingerCli()
with pytest.raises(SystemExit) as exc_info:
tfcli.submit()
assert exc_info.value.code == 1
assert (
"ERROR: No online agents available for queue fake"
in capsys.readouterr().out
)


def test_submit_no_agents_wait(capsys, tmp_path):
"""
Test that submitting a job without online agents succeeds with
--wait-for-available-agents
"""
fake_data = {"job_queue": "fake", "provision_data": {"distro": "fake"}}
test_file = tmp_path / "test.json"
test_file.write_text(json.dumps(fake_data))
sys.argv = ["", "submit", str(test_file), "--wait-for-available-agents"]
tfcli = testflinger_cli.TestflingerCli()
tfcli.submit()
assert (
"WARNING: No online agents available for queue fake"
in capsys.readouterr().out
)
16 changes: 16 additions & 0 deletions server/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,22 @@ The job_status_webhook parameter is required for this endpoint. Other parameters
$ curl http://localhost:8000/v1/queues/wait_times?queue=foo\&queue=bar
** [GET] /v1/queues/<queue_name>/agents** - Get the list of agents listening to a specified queue

- Parameters:

- queue_name (string): name of the queue for which to get the agents that are listening to it

- Returns:

JSON array of agents listening to the specified queue

- Example:

.. code-block:: console
$ curl http://localhost:8000/v1/queues/foo/agents
**[POST] /v1/oauth2/token** - Authenticate client key and return JWT with permissions

- Headers:
Expand Down
3 changes: 2 additions & 1 deletion server/src/api/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ class AgentIn(Schema):


class AgentOut(Schema):
"""Agent data input schema"""
"""Agent data output schema"""

name = fields.String(required=True)
state = fields.String(required=False)
queues = fields.List(fields.String(), required=False)
location = fields.String(required=False)
Expand Down
9 changes: 8 additions & 1 deletion server/src/api/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ def images_post():


@v1.get("/agents/data")
@v1.output(schemas.AgentOut)
@v1.output(schemas.AgentOut(many=True))
def agents_get_all():
"""Get all agent data"""
agents = database.mongo.db.agents.find({}, {"_id": False, "log": False})
Expand Down Expand Up @@ -747,6 +747,13 @@ def queue_wait_time_percentiles_get():
return queue_percentile_data


@v1.get("/queues/<queue_name>/agents")
@v1.output(schemas.AgentOut(many=True))
def get_agents_on_queue(queue_name):
"""Get the list of all data for agents listening to a specified queue"""
return database.get_agents_on_queue(queue_name)


def generate_token(allowed_resources, secret_key):
"""Generates JWT token with queue permission given a secret key"""
expiration_time = datetime.utcnow() + timedelta(seconds=2)
Expand Down
9 changes: 9 additions & 0 deletions server/src/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,15 @@ def get_queue_wait_times(queues: list[str] | None = None) -> list[dict]:
return list(wait_times)


def get_agents_on_queue(queue: str) -> list[dict]:
"""Get the agents that are listening on the specified queue"""
agents = mongo.db.agents.find(
{"queues": {"$in": [queue]}},
{"_id": 0},
)
return list(agents)


def calculate_percentiles(data: list) -> dict:
"""
Calculate the percentiles of the wait times for each queue
Expand Down
Loading

0 comments on commit ec8dadc

Please sign in to comment.