Source code for titanfe.brick

#
# Copyright (c) 2019-present, wobe-systems GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# found in the LICENSE file in the root directory of this source tree.
#

"""Abstract base classes for building Bricks"""

from abc import ABCMeta, abstractmethod
from typing import Type, Optional, Dict

from ujotypes import UjoBase

from titanfe.apps.brick_runner.adapter import BrickAdapter


[docs]class ConfigurationError(Exception): pass
[docs]class BrickBase(metaclass=ABCMeta): """An abstract base class for building Bricks""" def __init__(self, adapter: BrickAdapter, parameters: Optional[Dict] = None): """ Initialize the Brick Arguments: adapter (BrickAdapter): the BrickAdapter provides an interface between the BrickRunner and the Brick parameters (Optional[Dict]): if any parameters are required, the module should be accompanied with a `config.yml` this will be the base for parameters and their default values which are then updated with values from the current flow configuration """ self.adapter = adapter self.parameters = parameters def __enter__(self): self.setup() return self def __exit__(self, exc_type, exc_val, exc_tb): self.teardown()
[docs] def setup(self): """ Upon loading the Brick in the BrickRunner the setup-method is run once and can be used to e.g. open connections that will be held persistent. """
[docs] def teardown(self): """ When unloading the Brick from the BrickRunner the teardown-method is run once, implement it to e.g. close connections opened during `setup`"""
[docs] @abstractmethod def process(self, input: Type[UjoBase], port: str): # pylint: disable=redefined-builtin """ Do the input processing. To modify the payload of the current packet simply return a new value. Use the adapter's `emit_new_packet` to create a data packet and insert it into the flow. Arguments: input (Type[UjoBase]): the input data to be processed Returns: Optional[UjoBase]: the new payload for current data packet traveling in the flow. When returning `None` the current packet get's dropped. """
[docs]class InletBrickBase(BrickBase, metaclass=ABCMeta): """An abstract base class for building bricks that will run a continous `process`"""
[docs] @abstractmethod def stop_processing(self): """ The BrickRunner needs a way to properly end continuously running bricks. It will call this method upon receiving a termination request and expect the processing to be aborted/terminated. """