diff --git a/cli/testflinger_cli/__init__.py b/cli/testflinger_cli/__init__.py index e5bcf0f8..23a87eef 100644 --- a/cli/testflinger_cli/__init__.py +++ b/cli/testflinger_cli/__init__.py @@ -343,6 +343,7 @@ def _add_submit_args(self, subparsers): parser.set_defaults(func=self.submit) parser.add_argument("--poll", "-p", action="store_true") parser.add_argument("--quiet", "-q", action="store_true") + parser.add_argument("--wait-for-available-agents", action="store_true") parser.add_argument("filename").completer = ( argcomplete.completers.FilesCompleter( allowednames=("*.yaml", "*.yml", "*.json") @@ -493,6 +494,11 @@ def submit(self): ) auth_headers = None + # Check if agents are available to handle this queue + # and warn or exit depending on options + queue = job_dict.get("job_queue") + self.check_online_agents_available(queue) + attachments_data = self.extract_attachment_data(job_dict) if attachments_data is None: # submit job, no attachments @@ -515,7 +521,6 @@ def submit(self): "failed to submit attachments" ) - queue = job_dict.get("job_queue") self.history.new(job_id, queue) if self.args.quiet: print(job_id) @@ -525,6 +530,30 @@ def submit(self): if self.args.poll: self.do_poll(job_id) + def check_online_agents_available(self, queue: str): + """Exit or warn if no online agents available for a specified queue""" + try: + agents = self.client.get_agents_on_queue(queue) + except client.HTTPError: + agents = [] + online_agents = [ + agent for agent in agents if agent["state"] != "offline" + ] + if len(online_agents) > 0: + # If there are online agents, then we can proceed + return + if not self.args.wait_for_available_agents: + print( + f"ERROR: No online agents available for queue {queue}. " + "If you want to wait for agents to become available, use the " + "--wait-for-available-agents option." + ) + sys.exit(1) + print( + f"WARNING: No online agents available for queue {queue}. " + "Waiting for agents to become available..." + ) + def submit_job_data(self, data: dict, headers: dict = None): """Submit data that was generated or read from a file as a test job""" try: diff --git a/cli/testflinger_cli/client.py b/cli/testflinger_cli/client.py index b0f2a55d..a82bb1a0 100644 --- a/cli/testflinger_cli/client.py +++ b/cli/testflinger_cli/client.py @@ -273,3 +273,9 @@ def get_images(self, queue): return json.loads(data) except ValueError: return {} + + def get_agents_on_queue(self, queue): + """Get the list of all agents listening to a specified queue""" + endpoint = f"/v1/queues/{queue}/agents" + data = self.get(endpoint) + return json.loads(data) diff --git a/cli/testflinger_cli/tests/test_cli.py b/cli/testflinger_cli/tests/test_cli.py index ad35ef0b..6d4e0c37 100644 --- a/cli/testflinger_cli/tests/test_cli.py +++ b/cli/testflinger_cli/tests/test_cli.py @@ -80,11 +80,15 @@ def test_cancel(requests_mock): def test_submit(capsys, tmp_path, requests_mock): """Make sure jobid is read back from submitted job""" jobid = str(uuid.uuid1()) - fake_data = {"queue": "fake", "provision_data": {"distro": "fake"}} + fake_data = {"job_queue": "fake", "provision_data": {"distro": "fake"}} testfile = tmp_path / "test.json" testfile.write_text(json.dumps(fake_data)) fake_return = {"job_id": jobid} requests_mock.post(URL + "/v1/job", json=fake_return) + requests_mock.get( + URL + "/v1/queues/fake/agents", + json=[{"name": "fake_agent", "state": "waiting"}], + ) sys.argv = ["", "submit", str(testfile)] tfcli = testflinger_cli.TestflingerCli() tfcli.submit() @@ -94,11 +98,15 @@ def test_submit(capsys, tmp_path, requests_mock): def test_submit_bad_data(tmp_path, requests_mock): """Ensure a 422 response from bad data shows the returned errors""" - fake_data = {"badkey": "badvalue"} + fake_data = {"badkey": "badvalue", "job_queue": "fake"} testfile = tmp_path / "test.json" testfile.write_text(json.dumps(fake_data)) # return 422 and "expected error" requests_mock.post(URL + "/v1/job", status_code=422, text="expected error") + requests_mock.get( + URL + "/v1/queues/fake/agents", + json=[{"name": "fake_agent", "state": "waiting"}], + ) sys.argv = ["", "submit", str(testfile)] tfcli = testflinger_cli.TestflingerCli() with pytest.raises(SystemExit) as err: @@ -252,7 +260,7 @@ def test_submit_with_attachments(tmp_path): job_id = str(uuid.uuid1()) job_file = tmp_path / "test.json" job_data = { - "queue": "fake", + "job_queue": "fake", "test_data": { "attachments": [ { @@ -273,6 +281,10 @@ def test_submit_with_attachments(tmp_path): mock_response = {"job_id": job_id} mocker.post(f"{URL}/v1/job", json=mock_response) mocker.post(f"{URL}/v1/job/{job_id}/attachments") + mocker.get( + f"{URL}/v1/queues/fake/agents", + json=[{"name": "fake_agent", "state": "waiting"}], + ) # use cli to submit the job (processes `sys.argv` for arguments) tfcli.submit() @@ -281,9 +293,9 @@ def test_submit_with_attachments(tmp_path): # - there is a request to the job submission endpoint # - there a request to the attachment submission endpoint history = mocker.request_history - assert len(history) == 2 - assert history[0].path == "/v1/job" - assert history[1].path == f"/v1/job/{job_id}/attachments" + assert len(history) == 3 + assert history[1].path == "/v1/job" + assert history[2].path == f"/v1/job/{job_id}/attachments" # extract the binary file data from the request # (`requests_mock` only provides access to the `PreparedRequest`) @@ -307,7 +319,7 @@ def test_submit_attachments_retries(tmp_path): job_id = str(uuid.uuid1()) job_file = tmp_path / "test.json" job_data = { - "queue": "fake", + "job_queue": "fake", "test_data": { "attachments": [ { @@ -339,6 +351,10 @@ def test_submit_attachments_retries(tmp_path): {"status_code": 200}, ], ) + mocker.get( + f"{URL}/v1/queues/fake/agents", + json=[{"name": "fake_agent", "state": "waiting"}], + ) # use cli to submit the job (processes `sys.argv` for arguments) tfcli.submit() @@ -347,9 +363,9 @@ def test_submit_attachments_retries(tmp_path): # - there is a request to the job submission endpoint # - there are repeated requests to the attachment submission endpoint history = mocker.request_history - assert len(history) == 5 - assert history[0].path == "/v1/job" - for entry in history[1:]: + assert len(history) == 6 + assert history[1].path == "/v1/job" + for entry in history[2:]: assert entry.path == f"/v1/job/{job_id}/attachments" @@ -359,7 +375,7 @@ def test_submit_attachments_no_retries(tmp_path): job_id = str(uuid.uuid1()) job_file = tmp_path / "test.json" job_data = { - "queue": "fake", + "job_queue": "fake", "test_data": { "attachments": [ { @@ -383,6 +399,10 @@ def test_submit_attachments_no_retries(tmp_path): f"{URL}/v1/job/{job_id}/attachments", [{"status_code": 400}] ) mocker.post(f"{URL}/v1/job/{job_id}/action", [{"status_code": 200}]) + mocker.get( + f"{URL}/v1/queues/fake/agents", + json=[{"name": "fake_agent", "state": "waiting"}], + ) with pytest.raises(SystemExit) as exc_info: # use cli to submit the job (processes `sys.argv` for arguments) @@ -395,10 +415,10 @@ def test_submit_attachments_no_retries(tmp_path): # no retries # - there is a final request to cancel the action history = mocker.request_history - assert len(history) == 3 - assert history[0].path == "/v1/job" - assert history[1].path == f"/v1/job/{job_id}/attachments" - assert history[2].path == f"/v1/job/{job_id}/action" + assert len(history) == 4 + assert history[1].path == "/v1/job" + assert history[2].path == f"/v1/job/{job_id}/attachments" + assert history[3].path == f"/v1/job/{job_id}/action" def test_submit_attachments_timeout(tmp_path): @@ -407,7 +427,7 @@ def test_submit_attachments_timeout(tmp_path): job_id = str(uuid.uuid1()) job_file = tmp_path / "test.json" job_data = { - "queue": "fake", + "job_queue": "fake", "test_data": { "attachments": [ { @@ -438,6 +458,10 @@ def test_submit_attachments_timeout(tmp_path): ], ) mocker.post(f"{URL}/v1/job/{job_id}/action", [{"status_code": 200}]) + mocker.get( + f"{URL}/v1/queues/fake/agents", + json=[{"name": "fake_agent", "state": "waiting"}], + ) with pytest.raises(SystemExit) as exc_info: # use cli to submit the job (processes `sys.argv` for arguments) @@ -448,18 +472,18 @@ def test_submit_attachments_timeout(tmp_path): # - there is a request to the job submission endpoint # - there a request to the attachment submission endpoint history = mocker.request_history - assert len(history) == 4 - assert history[0].path == "/v1/job" - assert history[1].path == f"/v1/job/{job_id}/attachments" + assert len(history) == 5 + assert history[1].path == "/v1/job" assert history[2].path == f"/v1/job/{job_id}/attachments" - assert history[3].path == f"/v1/job/{job_id}/action" + assert history[3].path == f"/v1/job/{job_id}/attachments" + assert history[4].path == f"/v1/job/{job_id}/action" def test_submit_with_priority(tmp_path, requests_mock): """Tests authorization of jobs submitted with priority""" job_id = str(uuid.uuid1()) job_data = { - "queue": "fake", + "job_queue": "fake", "job_priority": 100, } job_file = tmp_path / "test.json" @@ -474,6 +498,10 @@ def test_submit_with_priority(tmp_path, requests_mock): requests_mock.post(f"{URL}/v1/oauth2/token", text=fake_jwt) mock_response = {"job_id": job_id} requests_mock.post(f"{URL}/v1/job", json=mock_response) + requests_mock.get( + URL + "/v1/queues/fake/agents", + json=[{"name": "fake_agent", "state": "waiting"}], + ) tfcli.submit() assert requests_mock.last_request.headers.get("Authorization") == fake_jwt @@ -481,7 +509,7 @@ def test_submit_with_priority(tmp_path, requests_mock): def test_submit_priority_no_credentials(tmp_path): """Tests priority jobs rejected with no specified credentials""" job_data = { - "queue": "fake", + "job_queue": "fake", "job_priority": 100, } job_file = tmp_path / "test.json" @@ -541,3 +569,44 @@ def test_list_queues_connection_error(caplog, requests_mock): with pytest.raises(SystemExit): tfcli.list_queues() assert "Unable to get a list of queues from the server." in caplog.text + + +def test_submit_no_agents_fails(capsys, tmp_path, requests_mock): + """Test that submitting a job without online agents fails""" + requests_mock.get(URL + "/v1/queues/fake/agents", json=[]) + fake_data = {"job_queue": "fake", "provision_data": {"distro": "fake"}} + test_file = tmp_path / "test.json" + test_file.write_text(json.dumps(fake_data)) + sys.argv = ["", "submit", str(test_file)] + tfcli = testflinger_cli.TestflingerCli() + with pytest.raises(SystemExit) as exc_info: + tfcli.submit() + assert exc_info.value.code == 1 + assert ( + "ERROR: No online agents available for queue fake" + in capsys.readouterr().out + ) + + +def test_submit_no_agents_wait(capsys, tmp_path, requests_mock): + """ + Test that submitting a job without online agents succeeds with + --wait-for-available-agents + """ + jobid = str(uuid.uuid1()) + fake_return = {"job_id": jobid} + requests_mock.post(URL + "/v1/job", json=fake_return) + requests_mock.get( + URL + "/v1/queues/fake/agents", + json=[], + ) + fake_data = {"job_queue": "fake", "provision_data": {"distro": "fake"}} + test_file = tmp_path / "test.json" + test_file.write_text(json.dumps(fake_data)) + sys.argv = ["", "submit", str(test_file), "--wait-for-available-agents"] + tfcli = testflinger_cli.TestflingerCli() + tfcli.submit() + assert ( + "WARNING: No online agents available for queue fake" + in capsys.readouterr().out + ) diff --git a/server/README.rst b/server/README.rst index 864aaec9..f44c1e8f 100644 --- a/server/README.rst +++ b/server/README.rst @@ -399,6 +399,22 @@ The job_status_webhook parameter is required for this endpoint. Other parameters $ curl http://localhost:8000/v1/queues/wait_times?queue=foo\&queue=bar +** [GET] /v1/queues//agents** - Get the list of agents listening to a specified queue + +- Parameters: + + - queue_name (string): name of the queue for which to get the agents that are listening to it + +- Returns: + + JSON array of agents listening to the specified queue + +- Example: + + .. code-block:: console + + $ curl http://localhost:8000/v1/queues/foo/agents + **[POST] /v1/oauth2/token** - Authenticate client key and return JWT with permissions - Headers: diff --git a/server/src/api/schemas.py b/server/src/api/schemas.py index 13722f76..4a94d454 100644 --- a/server/src/api/schemas.py +++ b/server/src/api/schemas.py @@ -57,8 +57,9 @@ class AgentIn(Schema): class AgentOut(Schema): - """Agent data input schema""" + """Agent data output schema""" + name = fields.String(required=True) state = fields.String(required=False) queues = fields.List(fields.String(), required=False) location = fields.String(required=False) diff --git a/server/src/api/v1.py b/server/src/api/v1.py index 76ca255f..346c8b1c 100644 --- a/server/src/api/v1.py +++ b/server/src/api/v1.py @@ -553,7 +553,7 @@ def images_post(): @v1.get("/agents/data") -@v1.output(schemas.AgentOut) +@v1.output(schemas.AgentOut(many=True)) def agents_get_all(): """Get all agent data""" agents = database.mongo.db.agents.find({}, {"_id": False, "log": False}) @@ -747,6 +747,13 @@ def queue_wait_time_percentiles_get(): return queue_percentile_data +@v1.get("/queues//agents") +@v1.output(schemas.AgentOut(many=True)) +def get_agents_on_queue(queue_name): + """Get the list of all data for agents listening to a specified queue""" + return database.get_agents_on_queue(queue_name) + + def generate_token(allowed_resources, secret_key): """Generates JWT token with queue permission given a secret key""" expiration_time = datetime.utcnow() + timedelta(seconds=2) diff --git a/server/src/database.py b/server/src/database.py index 5e276803..354000a0 100644 --- a/server/src/database.py +++ b/server/src/database.py @@ -251,6 +251,15 @@ def get_queue_wait_times(queues: list[str] | None = None) -> list[dict]: return list(wait_times) +def get_agents_on_queue(queue: str) -> list[dict]: + """Get the agents that are listening on the specified queue""" + agents = mongo.db.agents.find( + {"queues": {"$in": [queue]}}, + {"_id": 0}, + ) + return list(agents) + + def calculate_percentiles(data: list) -> dict: """ Calculate the percentiles of the wait times for each queue diff --git a/server/tests/test_v1.py b/server/tests/test_v1.py index 99c287c9..9b99973b 100644 --- a/server/tests/test_v1.py +++ b/server/tests/test_v1.py @@ -725,3 +725,23 @@ def test_get_queue_wait_times(mongo_app): assert len(output.json) == 2 assert output.json["queue1"]["50"] == 3.0 assert output.json["queue2"]["50"] == 30.0 + + +def test_get_agents_on_queue(mongo_app): + """Test api to get agents on a queue""" + app, _ = mongo_app + agent_name = "agent1" + agent_data = {"state": "provision", "queues": ["q1", "q2"]} + output = app.post(f"/v1/agents/data/{agent_name}", json=agent_data) + assert 200 == output.status_code + + # Get the agents on the queue + output = app.get("/v1/queues/q1/agents") + assert 200 == output.status_code + assert len(output.json) == 1 + assert output.json[0]["name"] == agent_name + + # Should get an empty list if there are no agents on the queue + output = app.get("/v1/queues/q3/agents") + assert 200 == output.status_code + assert len(output.json) == 0