Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor run with log #225

Merged
merged 6 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions agent/testflinger_agent/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright (C) 2024 Canonical
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>

from .client import TestflingerClient


class LiveOutputHandler:
def __init__(self, client: TestflingerClient, job_id: str):
self.client = client
self.job_id = job_id

def __call__(self, data: str):
self.client.post_live_output(self.job_id, data)


class LogUpdateHandler:
def __init__(self, log_file: str):
self.log_file = log_file

def __call__(self, data: str):
with open(self.log_file, "a") as log:
log.write(data)
150 changes: 37 additions & 113 deletions agent/testflinger_agent/job.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (C) 2017 Canonical
# Copyright (C) 2017-2024 Canonical
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
Expand All @@ -16,12 +16,16 @@
import json
import logging
import os
import signal
import sys
import subprocess
import time

from testflinger_agent.errors import TFServerError
from .runner import CommandRunner
from .handlers import LiveOutputHandler, LogUpdateHandler
from .stop_condition_checkers import (
JobCancelledChecker,
GlobalTimeoutChecker,
OutputTimeoutChecker,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -76,16 +80,41 @@ def run_test_phase(self, phase, rundir):
results_file = os.path.join(rundir, "testflinger-outcome.json")
output_log = os.path.join(rundir, phase + ".log")
serial_log = os.path.join(rundir, phase + "-serial.log")

logger.info("Running %s_command: %s", phase, cmd)
# Set the exitcode to some failed status in case we get interrupted
exitcode = 99
runner = CommandRunner(cwd=rundir, env=self.client.config)
output_log_handler = LogUpdateHandler(output_log)
live_output_handler = LiveOutputHandler(self.client, self.job_id)
runner.register_output_handler(output_log_handler)
runner.register_output_handler(live_output_handler)

# Reserve phase uses a separate timeout handler
if phase != "reserve":
global_timeout_checker = GlobalTimeoutChecker(
self.get_global_timeout()
)
runner.register_stop_condition_checker(global_timeout_checker)

# We only need to check for output timeouts during the test phase
if phase == "test":
output_timeout_checker = OutputTimeoutChecker(
self.get_output_timeout()
)
runner.register_stop_condition_checker(output_timeout_checker)

# Do not allow cancellation during provision for safety reasons
if phase != "provision":
job_cancelled_checker = JobCancelledChecker(
self.client, self.job_id
)
runner.register_stop_condition_checker(job_cancelled_checker)

for line in self.banner(
"Starting testflinger {} phase on {}".format(phase, node)
):
self.run_with_log("echo '{}'".format(line), output_log, rundir)
runner.run(f"echo '{line}'")
try:
exitcode = self.run_with_log(cmd, output_log, rundir)
exitcode = runner.run(cmd)
except Exception as e:
logger.exception(e)
finally:
Expand Down Expand Up @@ -191,111 +220,6 @@ def _set_truncate(self, f, size=1024 * 1024):
else:
f.seek(0, 0)

def run_with_log(self, cmd, logfile, cwd=None):
"""Execute command in a subprocess and log the output

:param cmd:
Command to run
:param logfile:
Filename to save the output in
:param cwd:
Path to run the command from
:return:
returncode from the process
"""
env = os.environ.copy()
# Make sure there all values we add are strings
env.update(
{k: v for k, v in self.client.config.items() if isinstance(v, str)}
)
global_timeout = self.get_global_timeout()
output_timeout = self.get_output_timeout()
start_time = time.time()
with open(logfile, "a", encoding="utf-8") as f:
live_output_buffer = ""
buffer_timeout = time.time()
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=True,
cwd=cwd,
env=env,
)

def cleanup(signum, frame):
process.kill()

signal.signal(signal.SIGTERM, cleanup)
set_nonblock(process.stdout.fileno())

while True:
line = process.stdout.read()
if not line and process.poll() is not None:
# Process exited
break

if line:
# Write the latest output to the log file, stdout, and
# the live output buffer
buf = line.decode(sys.stdout.encoding, errors="replace")
sys.stdout.write(buf)
live_output_buffer += buf
f.write(buf)
f.flush()
else:
if (
self.phase == "test"
and time.time() - buffer_timeout > output_timeout
):
buf = (
"\nERROR: Output timeout reached! "
"({}s)\n".format(output_timeout)
)
live_output_buffer += buf
f.write(buf)
process.kill()
break

# Check if it's time to send the output buffer to the server
if live_output_buffer and time.time() - buffer_timeout > 10:
if self.client.post_live_output(
self.job_id, live_output_buffer
):
live_output_buffer = ""
buffer_timeout = time.time()

# Check global timeout
if (
self.phase != "reserve"
and time.time() - start_time > global_timeout
):
buf = "\nERROR: Global timeout reached! ({}s)\n".format(
global_timeout
)
live_output_buffer += buf
f.write(buf)
process.kill()
break

# Check if job was canceled
if (
self.client.check_job_state(self.job_id) == "cancelled"
and self.phase != "provision"
):
logger.info("Job cancellation was requested, exiting.")
process.kill()
break

if live_output_buffer:
self.client.post_live_output(self.job_id, live_output_buffer)

try:
status = process.wait(10) # process.returncode
except TimeoutError:
status = 99 # Default in case something goes wrong
return status

def get_global_timeout(self):
"""Get the global timeout for the test run in seconds"""
# Default timeout is 4 hours
Expand Down
128 changes: 128 additions & 0 deletions agent/testflinger_agent/runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Copyright (C) 2024 Canonical
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>

import logging
import os
import fcntl
import sys
import shlex
import signal
import subprocess
import threading
import time

from typing import Callable, Optional, List

logger = logging.getLogger(__name__)


class CommandRunner:
def __init__(self, cwd: Optional[str], env: Optional[dict]):
self.output_handlers: List[Callable] = []
self.stop_condition_checkers: List[Callable] = []
self.process: Optional[subprocess.Popen] = None
self.cwd = cwd
self.env = os.environ.copy()
if env:
self.env.update(
{k: str(v) for k, v in env.items() if isinstance(v, str)}
)

def register_output_handler(self, handler: Callable[[str], None]):
self.output_handlers.append(handler)
omar-selo marked this conversation as resolved.
Show resolved Hide resolved

def post_output(self, data: str):
for handler in self.output_handlers:
handler(data)

def register_stop_condition_checker(
self, checker: Callable[[], Optional[str]]
):
self.stop_condition_checkers.append(checker)
omar-selo marked this conversation as resolved.
Show resolved Hide resolved

def check_stop_conditions(self) -> bool:
for checker in self.stop_condition_checkers:
output = checker()
if output:
# This shouldn't happen, but makes mypy happy
assert self.process is not None
omar-selo marked this conversation as resolved.
Show resolved Hide resolved
self.post_output(output)
omar-selo marked this conversation as resolved.
Show resolved Hide resolved
return True
return False

def check_output(self):
raw_output = self.process.stdout.read()
if not raw_output:
return

output = raw_output.decode(sys.stdout.encoding, errors="replace")
self.post_output(output)
omar-selo marked this conversation as resolved.
Show resolved Hide resolved

def run_command_thread(self, cmd: str):
self.process = subprocess.Popen(
shlex.split(cmd),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
cwd=self.cwd,
env=self.env,
)
# Ensure that the output doesn't get buffered on our end
if self.process.stdout is not None:
set_nonblock(self.process.stdout.fileno())
self.process.wait()

def cleanup(self):
if self.process is not None:
self.process.kill()

def run(self, cmd: str) -> int:

signal.signal(signal.SIGTERM, lambda signum, frame: self.cleanup())

run_cmd_thread = threading.Thread(
target=self.run_command_thread, args=(cmd,)
)
run_cmd_thread.start()

# Make sure to wait until the process actually starts
while self.process is None:
time.sleep(1)

while self.process.poll() is None:
time.sleep(10)

if self.check_stop_conditions():
self.cleanup()
omar-selo marked this conversation as resolved.
Show resolved Hide resolved
break

self.check_output()
# Check for any final output before exiting

run_cmd_thread.join()
self.check_output()

signal.signal(signal.SIGTERM, signal.SIG_DFL)
omar-selo marked this conversation as resolved.
Show resolved Hide resolved

return self.process.returncode


def set_nonblock(fd: int):
"""Set the specified fd to nonblocking output

:param fd:
File descriptor that should be set to nonblocking mode
"""

fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
omar-selo marked this conversation as resolved.
Show resolved Hide resolved
51 changes: 51 additions & 0 deletions agent/testflinger_agent/stop_condition_checkers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Copyright (C) 2024 Canonical
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>

import time
from typing import Optional

from .client import TestflingerClient


class JobCancelledChecker:
def __init__(self, client: TestflingerClient, job_id: str):
self.client = client
self.job_id = job_id

def __call__(self) -> Optional[str]:
if self.client.check_job_state(self.job_id) == "cancelled":
return "\nJob cancellation was requested, exiting.\n"
return None


class GlobalTimeoutChecker:
def __init__(self, timeout: int):
self.timeout = timeout
self.start_time = time.time()

def __call__(self) -> Optional[str]:
if time.time() - self.start_time > self.timeout:
return f"\nERROR: Global timeout reached! ({self.timeout}s)\n"
return None


class OutputTimeoutChecker:
def __init__(self, timeout: int):
self.timeout = timeout
self.start_time = time.time()

def __call__(self) -> Optional[str]:
if time.time() - self.start_time > self.timeout:
return f"\nERROR: Output timeout reached! ({self.timeout}s)\n"
return None
omar-selo marked this conversation as resolved.
Show resolved Hide resolved
Loading