Source code for titanfe.apps.control_peer.brick

#
# Copyright (c) 2019-present, wobe-systems GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# found in the LICENSE file in the root directory of this source tree.
#

"""A Brick"""

import re
import shutil
from collections import namedtuple
from datetime import datetime
from io import BytesIO
from pathlib import Path
from zipfile import ZipFile
import venv
import os
import subprocess

from titanfe.constants import GET_PIP
from titanfe import log as logging
from .services import package_manager
from ...config import configuration

Flow = namedtuple("Flow", ("uid", "name", "schema"))
Connections = namedtuple("Connections", ("input", "output"))
Ports = namedtuple("Ports", ("input", "output"))

RuntimeParameters = namedtuple(
    "RuntimeParameters",
    ("autoscale_max_instances", "autoscale_queue_level", "exit_after_idle_seconds"),
)

REQUIREMENTS = "requirements.txt"


[docs]def get_venv_exe(directory): if os.name == "nt": return os.path.join(directory, "Scripts", "python.exe") return os.path.join(directory, "bin", "python")
[docs]class EnvBuilder(venv.EnvBuilder): """Builder for the virtual enviroments for each brick """ def __init__(self, logger, *args, **kwargs): self.log = logger self.pip_failed = False self.exe = None super().__init__(*args, **kwargs)
[docs] def post_setup(self, context): """ install platforma and brick requirements during setup of the virtual environment """ self.exe = context.env_exe if self.pip_failed: self.install_pip(context) self.install_requirements(context)
[docs] def install_pip(self, context): """install pip manually""" self.log.info("installing pip") binpath = context.bin_path pip = [context.env_exe, GET_PIP] with subprocess.Popen( pip, cwd=binpath, stdout=subprocess.PIPE, stderr=subprocess.STDOUT ) as process: with process.stdout: self.log_stdout(process.stdout) exitcode = process.wait() if exitcode > 0: raise RuntimeError(f"Failed to install pip. ({exitcode})")
[docs] def log_stdout(self, pipe): for line in pipe.readlines(): if line: self.log.info(line.decode())
[docs] def install_requirements(self, context): """install requirements in virtual environment""" requirements = os.path.join(context.env_dir, "..", REQUIREMENTS) if not os.path.exists(requirements): return self.log.info("installing brick requirements") binpath = context.bin_path get_requirements = [context.env_exe, "-m", "pip", "install", "-r", requirements] with subprocess.Popen( get_requirements, cwd=binpath, stdout=subprocess.PIPE, stderr=subprocess.STDOUT ) as process: with process.stdout: self.log_stdout(process.stdout) exitcode = process.wait() if exitcode > 0: raise RuntimeError(f"Failed to install requirements. ({exitcode})")
[docs]class BrickBaseDefinition: """ The general definition of a brick contains it's name and id, as well as the module itself and possibly a set of default parameters for that module read from the annexed config.yaml """ def __init__(self, uid, name=None, family=None, logger=None, last_modified=None): self.uid = uid self.name = name self.family = family self.last_modified = last_modified or None self.log = logger self.module_path = None self.guess_module_path() module_parent = Path(configuration.brick_folder) destination = module_parent / self.uid self.exe = get_venv_exe(os.path.join(destination, "venv")) def __getstate__(self): _dict = dict(self.__dict__) del _dict["log"] return _dict @property def venv_path(self): if self.module_path: return os.path.join(self.module_path.parent, "venv") return ""
[docs] def guess_module_path(self): """ The module is expected to be found in the configured brick_folder extended with the brick-ID and should be either a folder or python file having the same name as the brick. """ module_parent = Path(configuration.brick_folder) / self.uid try: self.module_path = next( path for path in module_parent.iterdir() if re.match(f"^{self.name}(?:\\.py)?$", path.name, re.IGNORECASE) ) except (FileNotFoundError, StopIteration): self.log.warning( "Missing module `%s/` or `%s.py` in %s", self.name, self.name, module_parent )
[docs] def create_virtual_env(self): """ create a virtual enviroment for the brick""" environment = EnvBuilder( logger=self.log, system_site_packages=True, clear=False, symlinks=True, upgrade=False, with_pip=True, prompt=None, ) try: environment.create(self.venv_path) except subprocess.CalledProcessError: environment.with_pip = False environment.pip_failed = True environment.create(self.venv_path) self.exe = environment.exe
def __repr__(self): return f"Base({self.uid}, {self.name}, " f"module_path={self.module_path})"
[docs] async def install_or_update(self, update=True, force_update=False): """ Get a brick from the package manager and install it""" module_parent = Path(configuration.brick_folder) destination = module_parent / self.uid if destination.exists(): self.log.debug("Brick %s is already present", self.uid) if not update: return if not force_update: last_modified_local = destination.stat().st_mtime if datetime.utcfromtimestamp(last_modified_local) >= datetime.utcfromtimestamp( self.last_modified ): return shutil.rmtree(destination) destination.mkdir(parents=True, exist_ok=True) source = await package_manager.get_source_files(self.uid) if not source: return with ZipFile(BytesIO(source), "r") as compressed: self.log.debug("compressed brick content: %s", compressed.printdir()) compressed.extractall(path=destination) self.log.info( "installed/updated source files for brick %s into %s", self.uid, list(destination.iterdir()), ) self.guess_module_path() self.create_virtual_env()
[docs]class BrickInstanceDefinition: """ The Brick Instance Definition is a fully configured brick in a flow context. It should have it's own name and uid within the flow, precise parameters and possibly connections to other bricks. """ def __init__( # pylint:disable= too-many-arguments self, uid, name, ports: Ports, flow: Flow, base: BrickBaseDefinition, processing_parameters: dict, runtime_parameters: RuntimeParameters, connections: Connections, ): self.flow = flow self.uid = uid self.name = name self.base = base self.ports = ports self.processing_parameters = processing_parameters self.runtime_parameters = runtime_parameters self.connections = connections def __repr__(self): return ( f"Brick({self.uid}, {self.name}, flow={self.flow}, " f"base={self.base}, " f"processing_parameters={self.processing_parameters}, " f"runtime_parameters={self.runtime_parameters}, " f")" ) def __hash__(self): return hash(self.uid) def __eq__(self, other): if isinstance(other, BrickInstanceDefinition): return other.uid == self.uid return False
[docs] @classmethod def from_gridmanager(cls, brick_description): """Add brick configuration using default and flow-specific parameters if available""" config = brick_description["Configuration"] instance_uid = config["instanceId"] instance_name = config["name"] ports = Ports(config["inputPorts"], config["outputPorts"]) flow = Flow( brick_description["FlowID"], brick_description["FlowName"], brick_description["FlowSchema"], ) logger = logging.TitanPlatformLogger( __name__, context=logging.FlowContext(flow.uid, flow.name, instance_uid, instance_name) ) base = BrickBaseDefinition( uid=config["id"], name=config["brick"], family=config["family"], logger=logger ) runtime_params = RuntimeParameters(*[config[f] for f in RuntimeParameters._fields]) processing_params = config["parameters"] connections = Connections(brick_description["Inbound"], brick_description["Outbound"]) instance = cls( instance_uid, instance_name, ports, flow, base, processing_params, runtime_params, connections, ) return instance