Source code for titanfe.apps.control_peer.runner

#
# Copyright (c) 2019-present, wobe-systems GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# found in the LICENSE file in the root directory of this source tree.
#

"""Encapsulate brick runner related things"""
import asyncio
import os
import pickle
import signal
import subprocess

from titanfe import log as logging
from titanfe.config import configuration
from titanfe.utils import create_uid, Flag


class BrickRunner:
    """Start a brick runner process and hold the data that belongs to it.

    Arguments:
        brick_instance: the brick to be run; it is pickled into the runner's
            command line and supplies ``base.exe`` (the interpreter to launch)
            and ``base.venv_path`` (the working directory of the process)
        on_termination_cb (callable): invoked with this BrickRunner as its only
            argument once the process has been observed to exit;
            defaults to a no-op
    """

    def __init__(self, brick_instance, on_termination_cb=lambda *args, **kwargs: None):
        self.uid = create_uid(prefix="R-")
        self.brick = brick_instance
        self.log = logging.TitanPlatformLogger(
            __name__, context=logging.FlowContext.from_brick(brick_instance)
        )
        self.process = None  # subprocess.Popen handle, assigned in start()
        self.terminated = Flag()  # set by check_termination() after the process exited
        self.on_termination_cb = on_termination_cb
        # Strong reference to the task running check_termination(): the event loop
        # itself only keeps weak references to tasks.
        self._termination_task = None

    def __hash__(self):
        return hash(self.uid)

    def __eq__(self, other):
        # runners are identified by their uid only
        if isinstance(other, BrickRunner):
            return other.uid == self.uid
        return False

    def __repr__(self):
        return f"BrickRunner(uid={self.uid}, brick={self.brick})"

    def start(self):
        """Start a new brick runner process

        The runner's uid, the current configuration and the brick are handed to
        the child on its command line (the latter two as hex encoded pickles).
        Afterwards a background task is scheduled that watches for the process
        to exit, which requires a running asyncio event loop.

        Raises:
            RuntimeError: if the process has already exited right after launch
        """
        # pylint: disable=consider-using-with
        br_command = [
            self.brick.base.exe,
            "-m",
            "titanfe.apps.brick_runner",
            "-id",
            str(self.uid),
            "-configuration",
            pickle.dumps(configuration).hex(),
            "-brick",
            pickle.dumps(self.brick).hex(),
        ]

        self.log.debug("command: %r", br_command)
        if os.name == "nt":
            self.process = subprocess.Popen(
                br_command,
                cwd=self.brick.base.venv_path,
                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
                # to allow SIGBREAK on windows
            )
        else:
            self.process = subprocess.Popen(br_command, cwd=self.brick.base.venv_path)

        br_exitcode = self.process.poll()
        if br_exitcode is not None:
            raise RuntimeError(f"Failed to start runner. ({br_exitcode})")

        # Keep the returned task: asyncio only holds weak references to tasks, so
        # a task nobody else references may be garbage collected before it is done.
        self._termination_task = asyncio.create_task(self.check_termination())

    async def check_termination(self):
        """Poll the brick runner's process once per second until it has an exitcode.

        Once its termination is detected, the ``terminated`` flag is set, the
        exitcode is logged and ``on_termination_cb`` is called with this runner.
        """
        exitcode = None
        while exitcode is None:
            await asyncio.sleep(1)
            exitcode = self.process.poll()

        self.terminated.set()
        self.log.info("Runner terminated (%s) - %s", exitcode, self)
        self.on_termination_cb(self)

    async def stop(self):
        """request and await runner termination

        Sends CTRL_BREAK_EVENT (Windows) or SIGINT (elsewhere) to the process and
        waits up to 5 seconds for the ``terminated`` flag. When the flag is not
        set in time a warning is logged; the process is not killed forcefully.
        """
        self.log.info("stop %r", self)
        if os.name == "nt":
            self.process.send_signal(signal.CTRL_BREAK_EVENT)  # pylint: disable=no-member
        else:
            self.process.send_signal(signal.SIGINT)

        try:
            await asyncio.wait_for(self.terminated.wait(), timeout=5)
        except asyncio.TimeoutError:
            self.log.with_context.warning("BrickRunner did not stop in a timely manner: %s", self)