Titan Flow Engine

The Titan Flow Engine provides the backend services for running data flows on a node.

The core flow engine is composed of two services:

  • Control Peer: One peer per node that takes care of managing Brick Runners on that node.

  • Brick Runner: Virtual machine for executing the Bricks that the Control Peer has started on its node. The Brick Runner is generic and can process all types of Bricks, but each runner executes only one type of Brick at a time.

Additionally, version 0.1.3 and onward requires the titan services (sources) of a full deployment of the titan platform.

Choose How to Install

Titan Flow Engine requires Python 3.7 to be installed.

Depending on whether you want to contribute to the Titan Flow Engine source code, choose one of the following options to install:

Installing via PyPI

The flow engine is available on the Python Package Index (PyPI) for Linux and Windows x64 systems. To install it without having to build it yourself, open a command line and run:

pip install titanfe

Building the Titan Flow Engine (Advanced Install)

Required packages for building or developing the project can be installed via the requirements_dev.txt in the project’s root folder:

pip install -r requirements_dev.txt

To build and install the Flow Engine open a command line and run:

python setup.py build
pip install .

For those who also want to work on the documentation and build it locally, please use the requirements_doc.txt file to install the necessary Python packages:

pip install -r requirements_doc.txt

Running the flow engine

The flow engine is started by launching the Control Peer, which takes care of starting Brick Runners as needed.

Parameters for starting the Control Peer on the command line are:

brick_folder: Folder to store installed bricks in (default: ~/titanfe/bricks)

config_file: Path to the ControlPeer configuration file

The ControlPeer's configuration file contains the following parameters:

Kafka (string): Network address of the Kafka instance for metrics and logging, as hostname:port (default: localhost:9092)

GridManager (string): Network address of the GridManager, as hostname:port (default: localhost:8080)

FlowManager (string): Network address of the FlowManager, as hostname:port (default: localhost:9002)

EndpointProvider (string): Network address of the EndpointProvider (default: "tcp://127.0.0.1:9021")

SecretKey (string): Secret key for brick parameter decryption (default: None); alternatively, use the TITAN_SECRET_KEY environment variable

If no secret key is given, the ControlPeer will stop immediately. The secret key needs to be identical to the one used in the FlowManager for encrypting parameters.
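Putting the parameters above together, a ControlPeer configuration file might look like the following sketch. The key names are taken from the table above; the exact spelling and structure should be verified against the shipped titanfe/apps/control_peer/config.yaml:

```yaml
Kafka: "localhost:9092"
GridManager: "localhost:8080"
FlowManager: "localhost:9002"
EndpointProvider: "tcp://127.0.0.1:9021"
SecretKey: null  # or set the TITAN_SECRET_KEY environment variable instead
```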

To try out the flow engine, first download and start the services GridManager, Repository service, User service, PackageManager, and FlowManager. These services are needed for the flow engine to receive flows to process.

Adjust the ControlPeer's configuration file (titanfe/apps/control_peer/config.yaml) or create a new one. Then open a command line and run:

python -m titanfe.apps.control_peer -brick_folder ~/titanfe/bricks -config_file path_to_config_file/config.yaml

Or run it from the root directory of the flow engine by providing the path to the example:

python -m titanfe.apps.control_peer -brick_folder ~/titanfe/bricks -config_file titanfe/apps/control_peer/config.yaml

Code Quality

Code quality within the project is checked using pylint and flake8.

pylint

Linting is performed with pylint. The intended checks are configured in the project's .pylintrc file.

To run pylint on the Python code in this project, use:

pylint --rcfile=.pylintrc titanfe

Linting the tests is done by running the command:

pylint --rcfile=.pylintrc --disable=duplicate-code ./test

flake8

To make sure the PEP8 standard is applied to the code, flake8 can be added to the static tests.

For this project we exclude various errors, warnings, and notifications because they do not make sense at this time. This may change as refactoring is considered.

You can run flake8 with:

flake8

It checks all Python files in this project. The configuration is read from .flake8 in the project's root directory.

How to configure the Logging for titanfe

The titan FlowEngine utilizes Python's built-in logging module. It can be configured via a configuration file, which is located in ./titanfe/logutils/log_config.yml.

The following provides a brief overview; a more detailed explanation can be found in the Python documentation:

Loggers

The loggers section contains two loggers that can be configured separately:

  • titanfe

  • titanfe.bricks

The “titanfe” logger will capture any titanfe internal log messages, whereas the “titanfe.bricks” logger will capture any logging from within a brick.

For each logger you can set a log level and one or more specific handlers:

loggers:
  titanfe:
    level: INFO
    handlers: [console]

The above would capture all logging on INFO level and above (WARNING, ERROR, CRITICAL) and output it to the console.

Furthermore, there is a global "root" logger that captures LogRecords from all other loggers unless they are explicitly set to propagate: False. If enabled, the root logger allows you to capture logs from Python internals, such as the debug output of the asyncio package. To enable the root logger, set:

root:
 level: DEBUG
 handlers:
 - console

Handlers

The handlers section allows specifying different handlers. Note: An instance of each configured handler is created on initialization of the Python logging module, regardless of whether it is assigned to any logger.

A handler could for example output to the console, write to a file or stream the log records to a remote system. See logging.handlers for the builtin options.

Each handler has the following options:

handlers:
  console:
    class: logging.StreamHandler  # the python class to instantiate
    level: INFO  # minimum log level for this handler
    formatter: default  # a formatter
    filters: [hostname]  # additional filters for the formatter to use
    stream: ext://sys.stdout  # parameters for the class to instantiate, will be passed in as keywords

Formatter

A formatter is used to specify how the log message is to be built up.

formatters:
  default:
    format: "%(asctime)s %(hostname)s (%(process)5d|%(thread)-5d) %(name)s  %(levelname)-8s %(message)s"

The above renders to: 2019-09-25 14:06:29,961 COMPUTER (15032|3448 ) titanfe.bricks.generator INFO Finished generating

See LogRecord Attributes in the Python documentation for the available options.
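The format string above can be tried out directly with Python's logging module. The hostname field is not a standard LogRecord attribute; the sketch below injects it with a filter, which is presumably what the [hostname] filter in the handler configuration does (an assumption, not verified against the titanfe source):

```python
import logging
import socket

class HostnameFilter(logging.Filter):
    # inject a `hostname` attribute into every record (assumed to mirror
    # the [hostname] filter referenced in the handler configuration)
    def filter(self, record):
        record.hostname = socket.gethostname()
        return True

formatter = logging.Formatter(
    "%(asctime)s %(hostname)s (%(process)5d|%(thread)-5d) "
    "%(name)s  %(levelname)-8s %(message)s"
)

# build a sample record and format it the way the "default" formatter would
record = logging.LogRecord(
    "titanfe.bricks.generator", logging.INFO, "example.py", 1,
    "Finished generating", None, None,
)
HostnameFilter().filter(record)
print(formatter.format(record))
```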

Communication within the Titan Flow Engine

Between ControlPeer and Grid Manager

The Grid Manager provides a REST API with the endpoint address/gridmanager/controlpeers. During its setup, the ControlPeer registers at address/gridmanager/controlpeers by posting the "address:port" of its own REST interface for later communication with the Grid Manager. When a flow run state is to be changed, the Grid Manager sends a request to the ControlPeer's REST API at the endpoint /api/v1/flows/<string:name>.

Between ControlPeer and Package Manager

The Package Manager provides a REST API with the endpoint address/packagemanager/controlpeers. During its setup, the ControlPeer registers at address/packagemanager/controlpeers by posting the "address:port" of its own REST interface for later communication with the Package Manager. After registering, it requests the list of bricks at address/packagemanager/bricks. For the installation of bricks, the Package Manager provides a REST API to download brick code zip archives. The ControlPeer fetches the code of not-yet-installed bricks from the endpoint address/packagemanager/bricks/<string:BrickId> and unpacks it locally. In turn, when a brick package is installed, the Package Manager sends the list of installed bricks to the ControlPeer's REST API at the endpoint /api/v1/bricks/install. During shutdown, the ControlPeer deregisters from the PackageManager by sending a delete request to address/packagemanager/controlpeers.

Between ControlPeer and Flow Manager

The Flow Manager provides a REST API with the endpoint address/flows/<string:name>/config. When the Grid Manager starts a flow, the ControlPeer requests the flow configuration from this endpoint using the flow name as provided by the Grid Manager.

Between BrickRunners

BrickRunners connect to each other on their Input/Output: a BrickRunner (A) creates a server for its Output. This server address is then passed on to the ControlPeer when requesting an assignment. A successor BrickRunner (B) gets this address as an input target in the answer to its own assignment request. B then opens a TCP/IP connection to A on this address.

Once the connection is established, B orders A to deliver a batch of packets and then processes incoming packets until that batch size is reached. As soon as the number of unprocessed packets drops below the low queue limit, a new batch of packets is requested.
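The refill logic described above can be sketched in a few lines. This is a minimal illustration, not the actual titanfe implementation; the class name, batch size, and threshold are all assumptions:

```python
BATCH_SIZE = 25   # packets per requested batch (illustrative value)
LOW_LIMIT = 5     # "low queue limit" refill threshold (illustrative value)

class BatchingInput:
    """Request a batch of packets from upstream and refill once the
    number of unprocessed packets drops below the low queue limit."""

    def __init__(self, request_batch):
        self.request_batch = request_batch  # asks the upstream runner (A)
        self.unprocessed = []

    def next_packet(self):
        # request a new batch when the queue runs low, then hand out
        # the oldest unprocessed packet
        if len(self.unprocessed) < LOW_LIMIT:
            self.unprocessed.extend(self.request_batch(BATCH_SIZE))
        return self.unprocessed.pop(0)
```

In the real engine this happens asynchronously over the TCP/IP connection; the sketch only shows when a new batch request is triggered.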

Autoscaling

If the criterion for triggering autoscaling of a successor brick (details in Autoscaling.md) is met, the BrickRunner sends a scaling request message to the ControlPeer. The scaling request contains the name of the successor brick. If autoscaling was successful, the new BrickRunner instance sends a Scaling message containing the output server address as a new input target to its successor BrickRunners.

Messaging and MessageTypes

On each connection, a four-byte integer (network byte order) is sent first, announcing the size of the following message. The message itself is then sent as a binary representation of a two-item UjoList. The first item always identifies the type of the message; the second item is the actual content, another UjoContainer.

The following MessageTypes exist:

# BrickRunner-BrickRunner:

    Scaling = 6
    Packet = 20
    PacketRequest = 21
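The length-prefixed framing described above can be sketched in Python. The payload here is a placeholder byte string standing in for the binary Ujo representation of a two-item UjoList (message type, content):

```python
import struct

def frame(message: bytes) -> bytes:
    # prefix the message with a four-byte unsigned integer
    # (network byte order, "!I") announcing its size
    return struct.pack("!I", len(message)) + message

def read_frame(data: bytes) -> bytes:
    # read the four-byte size prefix, then extract exactly that many bytes
    (size,) = struct.unpack("!I", data[:4])
    return data[4:4 + size]

payload = b"placeholder-ujo-bytes"  # stand-in for a serialized UjoList
assert read_frame(frame(payload)) == payload
```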

The Containerized Flow Engine

The services composing a titan Flow Engine deployment are containerized into Docker images. They can be used for production and testing environments as well as for development.

Available image labels:

  • stable: Images built from the master branch

  • latest: Images built from the development branch

The Flow Engine on Docker Hub:

Control Peer Service Image

Images provided for running the services are based on Debian Buster (Slim), mainly for reasons of image size.

The Control Peer image is built in a two-step Docker build process. The first step is equipped with a gcc build environment that enables building all the Python packages required for running the flow engine. In addition, the ZeroMQ Python site-package is installed.

The second Docker build step then packages the Python titan Flow Engine using the build artifacts from the first step.

The titan Control Peer requires a Python 3.7 runtime and the ujotypes-py Python site-package. The flow engine itself is installed as the titanfe Python site-package.

Configuration files for the process are expected to be at /etc/titanfe/flowengine/config.yaml and should be mounted from a folder on the host into the running container:

docker run -v /titanfe/flowengine:/etc/titanfe/flowengine industrialdevops/flow-engine

When using the Flow Engine with docker-compose, the configuration file should be mounted from a location on the host. Additionally, the file controlling how the Control Peer and Brick Runner log can be mounted to replace the config file that comes with the titanfe Python site-package:

titan-controlpeer:
  image: industrialdevops/flow-engine:stable
  restart: always
  volumes:
    - ./etc/titanfe/flowengine:/etc/titanfe/flowengine
    - ./etc/titanfe/flowengine-log/log_config.yml:/usr/local/lib/python3.7/site-packages/titanfe/log_config.yml
  environment:
    SVC_CONFIG: /etc/titanfe/flowengine/config.yaml

titanfe package

Subpackages
titanfe.apps package
Subpackages
titanfe.apps.brick_runner package
Subpackages
titanfe.apps.brick_runner.output package
Submodules
titanfe.apps.brick_runner.output.consumer module

a Consumer represents a connection made to the output server

class titanfe.apps.brick_runner.output.consumer.Consumer(port_name, brick_instance_id, connection)[source]

Bases: object

wrap incoming connections and handle sending packets

async close_connection()[source]
async is_receptive()[source]
async listen()[source]

wait for packet requests, set disconnected-Event if the connection gets closed

async send(packet)[source]

send a packet

titanfe.apps.brick_runner.output.group module
titanfe.apps.brick_runner.output.output module
titanfe.apps.brick_runner.output.port module
Submodules
titanfe.apps.brick_runner.adapter module

The BrickAdapter gets passed into the brick's module on execution

class titanfe.apps.brick_runner.adapter.AdapterMeta(brick: titanfe.apps.brick_runner.adapter.MetaData, flow: titanfe.apps.brick_runner.adapter.MetaData)[source]

Bases: object

flow/brick meta data to be made available for access inside a brick

brick: titanfe.apps.brick_runner.adapter.MetaData
flow: titanfe.apps.brick_runner.adapter.MetaData
class titanfe.apps.brick_runner.adapter.BrickAdapter(meta_data: titanfe.apps.brick_runner.adapter.AdapterMeta, result_put_callback, log, default_port)[source]

Bases: object

The BrickAdapter gets passed into the brick's module on execution

Parameters
  • result_put_callback (Callable) – callback to output a result to the runner

  • log (logging.Logger) – the logger instance of the parent runner

Attributes
log: a logging.Logger instance to be used from within the brick's module if one wants to log something to the general application log.

decrypt_parameter(parameter)[source]

Decrypt a secret parameter using AES GCM

Parameters

parameter (String) – hex-encoded encrypted parameter

emit_new_packet(value, port=None)[source]

A new packet will be created from the given value and infused into the flow of data.

Note

This will create and output a new packet into the flow. To modify the payload of an already travelling packet, simply return a value from the brick processing method.

Parameters

value (Any) – Any value

class titanfe.apps.brick_runner.adapter.MetaData(uid, name)

Bases: tuple

property name

Alias for field number 1

property uid

Alias for field number 0

class titanfe.apps.brick_runner.adapter.State(flow_id, brick_uid, log)[source]

Bases: object

State allows bricks to persist/get state and the brick runner to reset it during teardown

get()[source]

Getting brick state

reset()[source]

Delete brick state

set(value)[source]

Store brick state

Parameters

value (Any) – state to be stored

titanfe.apps.brick_runner.brick module

A Brick within the brick runner

class titanfe.apps.brick_runner.brick.Brick(instance_definition: titanfe.apps.control_peer.brick.BrickInstanceDefinition, metric_emitter, logger)[source]

Bases: object

Wraps all the Brick-Handling

create_instance()[source]

create an instance of the actual Brick

enqueue_result_as_packet(result, port=None, parent_packet=None)[source]

create a packet with an empty buffer if needed and add the brick's result to the packet's payload

async execute_brick(packet)[source]

run the brick module for the given packet in a separate thread

property execution_time
get_results()[source]

async generator over the results from the brick

async process(packet)[source]
run_instance_processing(payload, port)[source]

do the actual execution of the brick module and return its result

terminate()[source]
class titanfe.apps.brick_runner.brick.PortMapping(rules, type)

Bases: tuple

property rules

Alias for field number 0

property type

Alias for field number 1

titanfe.apps.brick_runner.connection module

Connection objects and their methods: Buffer, Mapping, ...

class titanfe.apps.brick_runner.connection.BasicConstant(constant)[source]

Bases: titanfe.apps.brick_runner.connection.Constant

A constant of a basic type

to_ujo()[source]

convert the constant's value to ujo

class titanfe.apps.brick_runner.connection.Buffer(ujoBuffer=None)[source]

Bases: collections.abc.MutableMapping

A connections buffer of memorized upstream values

classmethod from_dict(buffer_dict)[source]
update_from_result(result, buffer_description)[source]

update the buffer using the information given in the buffer_description

class titanfe.apps.brick_runner.connection.BufferDescription(description_dict)[source]

Bases: collections.abc.Mapping

A connections description of a buffer object

class titanfe.apps.brick_runner.connection.Constant(constant)[source]

Bases: abc.ABC

A constant

abstract to_ujo()[source]

convert the constant's value to ujo

class titanfe.apps.brick_runner.connection.MappingRules(rules)[source]

Bases: object

A connections mapping rules

apply(buffer, source, target)[source]

convert ujo types according to its mapping rules

class titanfe.apps.brick_runner.connection.ObjectConstant(constant)[source]

Bases: titanfe.apps.brick_runner.connection.Constant

A constant of type object

to_ujo()[source]

convert the constant's value to ujo

class titanfe.apps.brick_runner.connection.RecordConstant(constant)[source]

Bases: titanfe.apps.brick_runner.connection.Constant

A constant of type record

to_ujo()[source]

convert the constant's value to ujo

class titanfe.apps.brick_runner.connection.Rule(rule)[source]

Bases: object

A mapping rule

property is_buffer
property is_const
titanfe.apps.brick_runner.connection.ensure_ujo_key(key)[source]
titanfe.apps.brick_runner.connection.get_constant(constant)[source]

get a constant as a class based on the constant's type

titanfe.apps.brick_runner.grid_manager module

GridManager communication

class titanfe.apps.brick_runner.grid_manager.GridManager(runner_uid, brick_uid)[source]

Bases: object

property address
async deregister_runner()[source]

deregister brick runner at grid manager

async register_runner(runner_address)[source]

register brick runner at grid manager

async request_scaling(consumer_uid)[source]

send brick scaling request

titanfe.apps.brick_runner.input module

The INPUT side of a brick (runner)

class titanfe.apps.brick_runner.input.Input(runner)[source]

Bases: object

The Input side of a brick runner requests new packets from the previous BrickRunner's OutputServer until its QueueLimit is exceeded, and again once the "low level" is reached. The Input will also emit queue metrics every 0.1 sec if there are packets in the queue.

Parameters
  • runner (BrickRunner) – instance of a parent brick runner

  • address (NetworkAddress) – (host, port) of the source BrickRunner's OutputServer

add_source(source)[source]
add_sources(sources)[source]
async close()[source]

Stop the input

async get()[source]

awaitable to get the next available packet from the input queue

async get_input(address, port_name, target_port)[source]

Connect to and retrieve packets from the given address

handle_input_loss(address, port_name, task)[source]

if we lose a connection to some input source, we handle removing the appropriate task here. Any CancelledError will be ignored; all other exceptions are unexpected and will be logged.

property is_empty
async put(packet)[source]
titanfe.apps.brick_runner.metrics module

Handle creation of metric data and streaming it to Kafka

class titanfe.apps.brick_runner.metrics.BrickMetrics(flow: str = 'FlowName?', brick: str = 'BrickName?', brick_type: str = 'BrickType?', brick_family: str = 'BrickFamily?', runner: str = 'RunnerUid?', host: str = 'build-17650203-project-510666-titanfe', timestamp: str = <factory>, content_type: str = 'titan-brick-metrics', execution_time: float = 0.0)[source]

Bases: titanfe.apps.brick_runner.metrics.MetricsBase

Metric data for brick executions

content_type: str = 'titan-brick-metrics'
execution_time: float = 0.0
class titanfe.apps.brick_runner.metrics.MetricEmitter(metrics_metadata, logger)[source]

Bases: object

The MetricEmitter encapsulates creation of metric data and sending them to a Kafka instance

Parameters
  • metrics_metadata (dict) – base meta data of metrics emitted

  • logger (logging.logger) – the parent’s logger instance

async classmethod create_from_brick_runner(runner) → titanfe.apps.brick_runner.metrics.MetricEmitter[source]

Creates, starts and returns a MetricEmitter instance

async emit(metrics_dict)[source]
async emit_brick_metrics(execution_time)[source]
async emit_packet_metrics(packet, duration)[source]
async emit_queue_metrics(queue_name, queue_length)[source]
set_metadata_from_runner(runner)[source]

assigns flowname and brickname after the brickrunner has gotten its assignment

async start()[source]

creates and starts the internal Kafka producer

async stop()[source]
class titanfe.apps.brick_runner.metrics.MetricsBase(flow: str = 'FlowName?', brick: str = 'BrickName?', brick_type: str = 'BrickType?', brick_family: str = 'BrickFamily?', runner: str = 'RunnerUid?', host: str = 'build-17650203-project-510666-titanfe', timestamp: str = <factory>)[source]

Bases: titanfe.utils.DictConvertable, abc.ABC

Information that every “metric” should contain

brick: str = 'BrickName?'
brick_family: str = 'BrickFamily?'
brick_type: str = 'BrickType?'
static extract_from_runner(runner)[source]

extract the basic information from a brick runner instance

flow: str = 'FlowName?'
host: str = 'build-17650203-project-510666-titanfe'
runner: str = 'RunnerUid?'
timestamp: str
class titanfe.apps.brick_runner.metrics.PacketMetricsAtBrick(flow: str = 'FlowName?', brick: str = 'BrickName?', brick_type: str = 'BrickType?', brick_family: str = 'BrickFamily?', runner: str = 'RunnerUid?', host: str = 'build-17650203-project-510666-titanfe', timestamp: str = <factory>, content_type: str = 'titan-packet-metrics', packet: str = 'PacketUid?', execution_time: float = 0.0, traveling_time: float = 0.0, time_in_input: float = 0.0, time_in_output: float = 0.0, time_on_wire: float = 0.0, at_outlet: bool = False)[source]

Bases: titanfe.apps.brick_runner.metrics.MetricsBase

Metric data for a packet being processed at a Brick

at_outlet: bool = False
content_type: str = 'titan-packet-metrics'
execution_time: float = 0.0
packet: str = 'PacketUid?'
time_in_input: float = 0.0
time_in_output: float = 0.0
time_on_wire: float = 0.0
traveling_time: float = 0.0
class titanfe.apps.brick_runner.metrics.QueueMetrics(flow: str = 'FlowName?', brick: str = 'BrickName?', brick_type: str = 'BrickType?', brick_family: str = 'BrickFamily?', runner: str = 'RunnerUid?', host: str = 'build-17650203-project-510666-titanfe', timestamp: str = <factory>, content_type: str = 'titan-queue-metrics', queue_name: str = 'QueueName?', queue_length: int = 0)[source]

Bases: titanfe.apps.brick_runner.metrics.MetricsBase

Metric data for Input/Output-queues

content_type: str = 'titan-queue-metrics'
queue_length: int = 0
queue_name: str = 'QueueName?'
class titanfe.apps.brick_runner.metrics.QueueWithMetrics(emitter, name, interval=0.1, maxsize=0)[source]

Bases: asyncio.queues.Queue

an asyncio.Queue that emits metrics (queue length)

async close()[source]
async emit_metrics(emitter, interval=0.1)[source]

automatically scheduled as task

async put(item)[source]

Put an item into the queue. If the queue is full, wait until a free slot is available before adding the item.

property unfinished_tasks
titanfe.apps.brick_runner.packet module

An information packet passed between Bricks

class titanfe.apps.brick_runner.packet.Packet(uid: str = <factory>, started: float = <factory>, port: str = '', payload: ujotypes.variants.base.UjoBase = UjoNone(None), buffer: titanfe.apps.brick_runner.connection.Buffer = <factory>, input_entry: float = 0.0, input_exit: float = 0.0, output_entry: float = 0.0, output_exit: float = 0.0)[source]

Bases: titanfe.utils.DictConvertable

Represents an information packet (IP) passing through a flow

as_message()[source]
buffer: titanfe.apps.brick_runner.connection.Buffer
input_entry: float = 0.0
input_exit: float = 0.0
output_entry: float = 0.0
output_exit: float = 0.0
payload: ujotypes.variants.base.UjoBase = UjoNone(None)
port: str = ''
property queue_times
started: float
property traveling_time
uid: str
update_input_entry()[source]
update_input_exit()[source]
update_output_entry()[source]
update_output_exit()[source]
titanfe.apps.brick_runner.runner module
titanfe.apps.control_peer package
Subpackages
titanfe.apps.control_peer.webapi package

RESTlike interface to control the control peer

Submodules
titanfe.apps.control_peer.webapi.app module

Provide a RESTlike interface to manage the ControlPeer remotely

class titanfe.apps.control_peer.webapi.app.HelloWorld(*, message: str = 'Hello, World!')[source]

Bases: pydantic.main.BaseModel

message: str
class titanfe.apps.control_peer.webapi.app.WebApi(control_peer)[source]

Bases: object

Provide a RESTlike interface to manage the ControlPeer remotely

Parameters

control_peer (ControlPeer) – an instance of the ControlPeer

Usage:

create an instance of the WebAPI (glued together with FastAPI/Starlette and uvicorn) and use run to create an endlessly running asyncio task, or use the serve coroutine to run it manually

property address
async serve()[source]

serve the api using uvicorn

async stop()[source]
titanfe.apps.control_peer.webapi.bricks module

Routes for Flow management

class titanfe.apps.control_peer.webapi.bricks.RequestBrickStart(*, brick: dict)[source]

Bases: pydantic.main.BaseModel

brick: dict
class titanfe.apps.control_peer.webapi.bricks.RequestInstallBricks(*, bricks: list)[source]

Bases: pydantic.main.BaseModel

bricks: list
titanfe.apps.control_peer.webapi.bricks.create_brick_router(control_peer)[source]

Setup the routing for flow management

Parameters

control_peer (ControlPeer) – an instance of the ControlPeer

Returns

router/routes to manage the control peer’s flows

Return type

APIRouter

titanfe.apps.control_peer.webapi.flows module

Routes for Flow management

titanfe.apps.control_peer.webapi.flows.create_flow_router(control_peer)[source]

Setup the routing for flow management

Parameters

control_peer (ControlPeer) – an instance of the ControlPeer

Returns

router/routes to manage the control peer’s flows

Return type

APIRouter

titanfe.apps.control_peer.webapi.state module

Routes for Flow management

titanfe.apps.control_peer.webapi.state.create_state_router(control_peer)[source]

Create a router for state

Parameters

control_peer (ControlPeer) – an instance of the ControlPeer

Returns

router/routes to manage the control peer’s flows

Return type

APIRouter

titanfe.apps.control_peer.webapi.state.get_state(control_peer) → List[Dict][source]

Create a router for state

Parameters

control_peer (ControlPeer) – an instance of the ControlPeer

Returns

list of brick information

Submodules
titanfe.apps.control_peer.brick module

A Brick

class titanfe.apps.control_peer.brick.BrickBaseDefinition(uid, name=None, family=None, logger=None, last_modified=None)[source]

Bases: object

The general definition of a brick contains its name and id, as well as the module itself and possibly a set of default parameters for that module, read from the annexed config.yaml

create_virtual_env()[source]

create a virtual environment for the brick

guess_module_path()[source]

The module is expected to be found in the configured brick_folder, extended with the brick ID, and should be either a folder or a Python file having the same name as the brick.

async install_or_update(update=True, force_update=False)[source]

Get a brick from the package manager and install it

property venv_path
class titanfe.apps.control_peer.brick.BrickInstanceDefinition(uid, name, ports: titanfe.apps.control_peer.brick.Ports, flow: titanfe.apps.control_peer.brick.Flow, base: titanfe.apps.control_peer.brick.BrickBaseDefinition, processing_parameters: dict, runtime_parameters: titanfe.apps.control_peer.brick.RuntimeParameters, connections: titanfe.apps.control_peer.brick.Connections)[source]

Bases: object

The Brick Instance Definition is a fully configured brick in a flow context. It should have its own name and uid within the flow, precise parameters, and possibly connections to other bricks.

classmethod from_gridmanager(brick_description)[source]

Add brick configuration using default and flow-specific parameters if available

class titanfe.apps.control_peer.brick.Connections(input, output)

Bases: tuple

property input

Alias for field number 0

property output

Alias for field number 1

class titanfe.apps.control_peer.brick.EnvBuilder(logger, *args, **kwargs)[source]

Bases: venv.EnvBuilder

Builder for the virtual environments for each brick

install_pip(context)[source]

install pip manually

install_requirements(context)[source]

install requirements in virtual environment

log_stdout(pipe)[source]
post_setup(context)[source]

install platform and brick requirements during setup of the virtual environment

class titanfe.apps.control_peer.brick.Flow(uid, name, schema)

Bases: tuple

property name

Alias for field number 1

property schema

Alias for field number 2

property uid

Alias for field number 0

class titanfe.apps.control_peer.brick.Ports(input, output)

Bases: tuple

property input

Alias for field number 0

property output

Alias for field number 1

class titanfe.apps.control_peer.brick.RuntimeParameters(autoscale_max_instances, autoscale_queue_level, exit_after_idle_seconds)

Bases: tuple

property autoscale_max_instances

Alias for field number 0

property autoscale_queue_level

Alias for field number 1

property exit_after_idle_seconds

Alias for field number 2

titanfe.apps.control_peer.brick.get_venv_exe(directory)[source]
titanfe.apps.control_peer.control_peer module

the actual control peer

class titanfe.apps.control_peer.control_peer.ControlPeer[source]

Bases: object

The control peer application will start runners as required for the flows/bricks described in the given config file. Once the runners have registered themselves, they will get the corresponding assignments.

classmethod create()[source]

Create control peer

async static install_bricks()[source]

get brick list from PackageManager and install bricks if necessary

install_signal_handlers()[source]
remove_runner(runner)[source]
async run()[source]

run the application

schedule_shutdown(sig, _)[source]
async shutdown()[source]

shut down the controlpeer

async start_new_runner(brick)[source]

update the configuration and start the flow

async stop_runners(flow_uid=None)[source]

stop all runners or all runners for the given flow.uid

titanfe.apps.control_peer.runner module

Encapsulate brick runner related things

class titanfe.apps.control_peer.runner.BrickRunner(brick_instance, on_termination_cb=<function BrickRunner.<lambda>>)[source]

Bases: object

The BrickRunner can be used to start brick runner processes and hold corresponding data

Parameters

controlpeer_address (NetworkAddress) – the address on which the control peer is listening

async check_termination()[source]

do cyclic checks for an exit code of the brick runner's process to detect its termination

start()[source]

Start a new brick runner process

async stop()[source]

request and await runner termination

titanfe.apps.control_peer.services module

Install a Brick

class titanfe.apps.control_peer.services.ControlPeerServiceRegistration[source]

Bases: abc.ABC

BaseClass to handle control peer registration of various services

abstract property control_peer_endpoint
async deregister(own_api_address)[source]

Cancel registration at target_address

async register(own_api_address)[source]

Inquire registration at target_address

class titanfe.apps.control_peer.services.GridManager[source]

Bases: titanfe.apps.control_peer.services.ControlPeerServiceRegistration

handle all requests to the grid manager

property address
property control_peer_endpoint
class titanfe.apps.control_peer.services.PackageManager[source]

Bases: titanfe.apps.control_peer.services.ControlPeerServiceRegistration

handle all requests to the package manager

property address
property brick_code_endpoint
property brick_endpoint
property control_peer_endpoint
async static get(endpoint, context)[source]

get endpoint

async get_bricks()[source]

get bricks

async get_source_files(brick_id)[source]

get the source files archive from the package manager

exception titanfe.apps.control_peer.services.ServiceError[source]

Bases: Exception

titanfe.apps.kafka_to_elastic package
titanfe.apps.kafka_viewer package
titanfe.testing package
Submodules
titanfe.testing.testrunner module
Submodules
titanfe.brick module

Abstract base classes for building Bricks

class titanfe.brick.BrickBase(adapter: titanfe.apps.brick_runner.adapter.BrickAdapter, parameters: Optional[Dict] = None)[source]

Bases: object

An abstract base class for building Bricks

abstract process(input: Type[ujotypes.variants.base.UjoBase], port: str)[source]

Do the input processing.

To modify the payload of the current packet simply return a new value. Use the adapter’s emit_new_packet to create a data packet and insert it into the flow.

Parameters

input (Type[UjoBase]) – the input data to be processed

Returns

the new payload for the current data packet traveling in the flow. When returning None, the current packet gets dropped.

Return type

Optional[UjoBase]

setup()[source]

Upon loading the Brick in the BrickRunner the setup-method is run once and can be used to e.g. open connections that will be held persistent.

teardown()[source]

When unloading the Brick from the BrickRunner the teardown-method is run once, implement it to e.g. close connections opened during setup
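A minimal sketch of the Brick lifecycle described above. `FakeAdapter` and `UpperCaseBrick` are hypothetical stand-ins so the example runs without titanfe installed; a real Brick would subclass `titanfe.brick.BrickBase` and receive a `BrickAdapter` with `emit_new_packet`:

```python
from typing import Optional

class FakeAdapter:
    """Hypothetical stand-in for titanfe's BrickAdapter."""
    def __init__(self):
        self.emitted = []

    def emit_new_packet(self, value, port="A"):
        # a real adapter would insert a new data packet into the flow
        self.emitted.append((port, value))

class UpperCaseBrick:
    """Illustrative brick: returns a new payload; returning None drops the packet."""
    def __init__(self, adapter, parameters: Optional[dict] = None):
        self.adapter = adapter
        self.parameters = parameters or {}

    def setup(self):
        pass  # run once on load, e.g. open a persistent connection

    def process(self, input, port: str):
        if not input:
            return None            # drop empty packets
        return str(input).upper()  # new payload for the current packet

    def teardown(self):
        pass  # run once on unload, e.g. close connections opened in setup

brick = UpperCaseBrick(FakeAdapter())
brick.setup()
print(brick.process("hello", port="A"))  # -> HELLO
brick.teardown()
```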

exception titanfe.brick.ConfigurationError[source]

Bases: Exception

class titanfe.brick.InletBrickBase(adapter: titanfe.apps.brick_runner.adapter.BrickAdapter, parameters: Optional[Dict] = None)[source]

Bases: titanfe.brick.BrickBase

An abstract base class for building bricks that will run a continuous process

abstract stop_processing()[source]

The BrickRunner needs a way to properly end continuously running bricks. It will call this method upon receiving a termination request and expect the processing to be aborted/terminated.

titanfe.config module

the global configuration

class titanfe.config.Configuration[source]

Bases: object

Current Configuration

option_aliases = {'IP': 'IP', 'brick_folder': 'BrickFolder', 'endpoint_provider': 'EndpointProvider', 'flowmanager_address': 'FlowManager', 'gridmanager_address': 'GridManager', 'kafka_bootstrap_servers': 'Kafka', 'kafka_log_topic': 'KafkaLogTopic', 'packagemanager_address': 'PackageManager', 'reposervice_address': 'RepositoryService', 'secret_key': 'SecretKey'}
update(config: Union[Configuration, dict])[source]

update config from dict or other config

update_from_yaml(file_path)[source]

Read and update the configuration from a yaml file
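Judging by the `option_aliases` mapping above, a configuration YAML uses the alias names as keys. The option names below come from that mapping; the values are purely hypothetical and must be adjusted to your deployment:

```yaml
GridManager: http://localhost:8080/gridmanager
PackageManager: http://localhost:8087/packagemanager
Kafka: localhost:9092
BrickFolder: ~/titanfe/bricks
```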

class titanfe.config.NotFound[source]

Bases: object

titanfe.connection module

Encapsulate asyncio connections by wrapping them into a Connection

class titanfe.connection.Connection(reader, writer, log=None, encoding='UJO')[source]

Bases: object

Wrap an asyncio StreamReader/Writer combination into a connection object.

Parameters
  • reader (asyncio.StreamReader) – the stream reader

  • writer (asyncio.StreamWriter) – the stream writer

  • log (logging.logger) – a parent logger

  • encoding – “PICKLE” or “UJO”

async close()[source]

close the connection by closing its reader and writer

async classmethod open(address: titanfe.connection.NetworkAddress, log: Optional[logging.Logger] = None) → titanfe.connection.Connection[source]

open an asyncio connection to the given address (host, port)

async receive()[source]

wait until a message comes through and return its content after decoding

Returns

a message or None if the connection was closed remotely

Return type

Message

async send(message)[source]

encode and send the content as a message

class titanfe.connection.NetworkAddress(host, port)

Bases: tuple

property host

Alias for field number 0

property port

Alias for field number 1
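Since `NetworkAddress` is a `(host, port)` namedtuple, it unpacks like a plain tuple while exposing named fields. A stdlib stand-in (assuming the documented field layout) behaves the same way:

```python
from collections import namedtuple

# stand-in mirroring titanfe.connection.NetworkAddress: field 0 is host, field 1 is port
NetworkAddress = namedtuple("NetworkAddress", ("host", "port"))

addr = NetworkAddress("127.0.0.1", 9001)
host, port = addr                # tuple unpacking still works
print(addr.host, addr.port)      # -> 127.0.0.1 9001
```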

titanfe.connection.decode_ujo_message(ujo_bytes)[source]

Decode ujo bytes into a corresponding python object, but keep an existing “Payload” as Ujo.

titanfe.constants module

a collection of constants

titanfe.get-pip module
titanfe.log module

setup the logging with a custom metric-level

class titanfe.log.FlowContext(flowuid: str = '', flowname: str = '', brickuid: str = '', brickname: str = '')[source]

Bases: object

The Flow Context

asdict()[source]
brickname: str = ''
brickuid: str = ''
flowname: str = ''
flowuid: str = ''
classmethod from_brick(brick: titanfe.apps.control_peer.brick.Brick)[source]
classmethod from_flow(flow: titanfe.apps.control_peer.flow.Flow)[source]
class titanfe.log.KafkaLogHandler(bootstrap_server, topic)[source]

Bases: logging.Handler

Stream LogRecords to Kafka

Parameters
  • bootstrap_server (str) – ‘Host:Port’ of a kafka bootstrap server

  • topic (str) – the kafka topic to produce into

close()[source]

Tidy up any resources used by the handler.

This version removes the handler from an internal map of handlers, _handlers, which is used for handler lookup by name. Subclasses should ensure that this gets called from overridden close() methods.

emit(record)[source]

emits the record

flush()[source]

Ensure all logging output has been flushed.

This version does nothing and is intended to be implemented by subclasses.

class titanfe.log.TitanLogAdapter(logger, extra)[source]

Bases: logging.LoggerAdapter

The Log Adapter wraps a logger and adds some context to each log record

property context
getChild(suffix)[source]
metric(message, *args, **kwargs)
class titanfe.log.TitanLogRecord(name, level, pathname, lineno, msg, args, exc_info, func=None, sinfo=None, **kwargs)[source]

Bases: logging.LogRecord

A log record - Titan style

brickname = ''
brickuid = ''
flowname = ''
flowuid = ''
hostname = 'build-17650203-project-510666-titanfe'
servicename = ''
class titanfe.log.TitanPlatformLogger(name, context: Optional[titanfe.log.FlowContext] = None)[source]

Bases: logging.Logger

to write contextual logging information use e.g. log.with_context.info

getChild(suffix) → titanfe.log.TitanPlatformLogger[source]

Get a logger which is a descendant to this one.

This is a convenience method, such that

logging.getLogger(‘abc’).getChild(‘def.ghi’)

is the same as

logging.getLogger(‘abc.def.ghi’)

It’s useful, for example, when the parent logger is named using __name__ rather than a literal string.

property with_context
class titanfe.log.UjoBinFormatter(fmt=None, datefmt=None, style='%')[source]

Bases: logging.Formatter

Format log records as an UjoBinary

format(record)[source]

Format a log record as an UjoBinary

Parameters

record (logging.Record) – the log record

Returns

binary UjoMap

Return type

bytes

titanfe.log.add_logging_level(level, level_name, method_name=None)[source]

add a level to the logging module

Parameters
  • level (int) – level number

  • level_name – name of the level

  • method_name – name of the method that gets attached to logging
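A hedged, stdlib-only sketch of what such a helper typically does (the level number 25 and the name "METRIC" are assumptions for illustration, not titanfe's actual values): register a numeric level with the logging module and attach a convenience method to `logging.Logger`.

```python
import logging

def add_logging_level(level, level_name, method_name=None):
    """Register a custom level and a matching Logger method (illustrative sketch)."""
    method_name = method_name or level_name.lower()
    logging.addLevelName(level, level_name)

    def log_for_level(self, message, *args, **kwargs):
        if self.isEnabledFor(level):
            self._log(level, message, args, **kwargs)

    setattr(logging.Logger, method_name, log_for_level)
    setattr(logging, level_name, level)

add_logging_level(25, "METRIC")   # hypothetical: between INFO (20) and WARNING (30)
log = logging.getLogger("demo")
log.setLevel(logging.METRIC)
log.metric("42 packets/s")        # behaves like log.info, but at level 25
print(logging.getLevelName(25))   # -> METRIC
```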

titanfe.log.flush_kafka_log_handler()[source]

Flush messages sent to KafkaLogHandler and suppress warnings from kafka -> called during shutdown of the brick runner

titanfe.log.getLogger(name: str, context: Optional[titanfe.log.FlowContext] = None) → logging.Logger[source]

Get a Logger

Parameters
  • name – the logger name

  • context – a flow context (if available)

Returns

a Logger

Return type

logging.Logger

titanfe.log.initialize(service='')[source]

initialize the titan logging module, e.g. set up a KafkaLogHandler

Parameters

service – name of the current service

titanfe.messages module

Messages within titanfe

class titanfe.messages.BrickDescription(flowuid, flowname, name, brick_type, brick_family, parameters, uid, path_to_module, is_inlet, exit_after_idle_seconds, default_port)

Bases: tuple

property brick_family

Alias for field number 4

property brick_type

Alias for field number 3

property default_port

Alias for field number 10

property exit_after_idle_seconds

Alias for field number 9

property flowname

Alias for field number 1

property flowuid

Alias for field number 0

property is_inlet

Alias for field number 8

property name

Alias for field number 2

property parameters

Alias for field number 5

property path_to_module

Alias for field number 7

property uid

Alias for field number 6

class titanfe.messages.InputSource(name, port, address)

Bases: tuple

property address

Alias for field number 2

property name

Alias for field number 0

property port

Alias for field number 1

class titanfe.messages.Message(type, content)

Bases: tuple

property content

Alias for field number 1

property type

Alias for field number 0

class titanfe.messages.MessageType(value)[source]

Bases: enum.IntEnum

Types of Messages used within titanfe

ConsumerRegistration = 22
Packet = 20
PacketRequest = 21
class titanfe.messages.OutputTarget(name, port, autoscale_queue_level)

Bases: tuple

property autoscale_queue_level

Alias for field number 2

property name

Alias for field number 0

property port

Alias for field number 1

titanfe.repository module

Repository can be used to connect to the titan repository service

class titanfe.repository.RepositoryService(logger, reposervice_address=None)[source]

Bases: object

Repository service implements a connection to the titan repository service

Parameters
  • brick_name (string) – the name of the brick instance

  • logger – the logger instance of the parent

  • repo_service (string) – optional, address of the repository service

delete(collection, document)[source]

delete data using the repository service

get(collection, document, find)[source]

get data using the repository service

store(collection, document, value)[source]

store data using the repository service

class titanfe.repository.Request(address: str, method: Callable, content: titanfe.repository.RequestData, log: logging.LoggerAdapter, response: Any = <factory>)[source]

Bases: object

Request object

Parameters
  • address – str Target address

  • method – Callable requests method(get, put, delete..)

  • logger – the logger instance of the parent

  • content – RequestData request content to be sent

address: str
content: titanfe.repository.RequestData
log: logging.LoggerAdapter
method: Callable
response: Any
send()[source]

send request

class titanfe.repository.RequestData(collection: str, document: str, value: Any, find: Optional[dict] = <factory>, database: str = 'BrickRunner')[source]

Bases: object

Request data object sent to the Repository service

collection: str
database: str = 'BrickRunner'
document: str
find: Optional[dict]
classmethod from_dict(kvs: Optional[Union[dict, list, str, int, float, bool]], *, infer_missing=False) → A
classmethod from_json(s: Union[str, bytes, bytearray], *, parse_float=None, parse_int=None, parse_constant=None, infer_missing=False, **kw) → A
classmethod schema(*, infer_missing: bool = False, only=None, exclude=(), many: bool = False, context=None, load_only=(), dump_only=(), partial: bool = False, unknown=None) → dataclasses_json.mm.SchemaF[A]
to_dict(encode_json=False) → Dict[str, Optional[Union[dict, list, str, int, float, bool]]]
to_json(*, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Optional[Union[int, str]] = None, separators: Tuple[str, str] = None, default: Callable = None, sort_keys: bool = False, **kw) → str
value: Any
titanfe.ujo_helper module

simplify ujo conversions (eventually we’d want to use an UjoScheme instead though)

titanfe.ujo_helper.get_ujo_value(value, type_name)[source]

get value in ujo based on given type_name and value

titanfe.ujo_helper.py_to_ujo_bytes(py_obj)[source]
titanfe.ujo_helper.python_to_ujo(py_obj)[source]

convert python objects recursively into corresponding UJO

int, float, etc. will be converted to Int64, Float64, etc. If you actually want e.g. an Int8 do a manual conversion for that specific item beforehand.
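The recursion pattern behind such a converter can be sketched with toy classes standing in for the ujotypes variants (`Int64`, `Float64`, lists); this is an illustration of the shape of the conversion, not titanfe's implementation:

```python
class Int64:
    def __init__(self, value): self.value = value

class Float64:
    def __init__(self, value): self.value = value

class UjoList(list):
    pass

def python_to_toy_ujo(obj):
    """Recursively map python values onto toy Ujo stand-ins."""
    if isinstance(obj, float):
        return Float64(obj)
    if isinstance(obj, int):
        return Int64(obj)  # ints default to Int64; narrower types need manual conversion
    if isinstance(obj, (list, tuple)):
        return UjoList(python_to_toy_ujo(item) for item in obj)
    raise TypeError(f"no toy mapping for {type(obj)!r}")

converted = python_to_toy_ujo([1, 2.5])
print(type(converted[0]).__name__, type(converted[1]).__name__)  # -> Int64 Float64
```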

titanfe.ujo_helper.ujo_bytes_to_py(bytes_obj)[source]
titanfe.utils module

a collection of useful helper functions

class titanfe.utils.DictConvertable[source]

Bases: abc.ABC

Mixin to make a dataclass convert from/into a dictionary

dicts_to_dataclasses()[source]

Convert each field whose declared type is a dataclass into an instance of that dataclass if the field's current value is a dict.

classmethod from_dict(_dict)[source]
to_dict()[source]
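The idea behind `dicts_to_dataclasses` can be sketched with plain stdlib dataclasses: if a field's declared type is itself a dataclass but its current value is a dict, re-hydrate it. `Address` and `Service` here are hypothetical examples, not titanfe classes:

```python
from dataclasses import dataclass, asdict, fields, is_dataclass

@dataclass
class Address:
    host: str
    port: int

@dataclass
class Service:
    name: str
    address: Address

    def __post_init__(self):
        # dicts_to_dataclasses equivalent: turn nested dicts into their declared dataclass
        for field in fields(self):
            value = getattr(self, field.name)
            if is_dataclass(field.type) and isinstance(value, dict):
                setattr(self, field.name, field.type(**value))

svc = Service(name="gm", address={"host": "127.0.0.1", "port": 8080})
print(svc.address.port)  # -> 8080
print(asdict(svc))       # asdict converts nested dataclasses back into plain dicts
```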
class titanfe.utils.Flag(*, loop=None)[source]

Bases: asyncio.locks.Event

Extends the asyncio.Event to be a tad more convenient

class titanfe.utils.Timer[source]

Bases: object

a simple Timer using the performance counters from the “time”-module

>>> with Timer() as t:
...     # do_something()
...     print(t.elapsed)
...     # do_something_else()
...     print(t.elapsed)
...     print(t.elapsed_total)
property elapsed
property elapsed_total
property perf_counter_since_last
property perf_counter_total
property process_time_since_last
property process_time_total
update_last_access()[source]
async titanfe.utils.cancel_tasks(tasks: Sequence[_asyncio.Task], wait_cancelled=True)[source]

Cancel all tasks in sequence

Parameters
  • tasks (Sequence[asyncio.Task]) – tasks to cancel

  • wait_cancelled (bool) – True (default): wait until all tasks returned / False: well, don’t.
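The cancel-then-await pattern can be reproduced with stdlib asyncio alone; this sketch re-creates the documented behavior so it runs without titanfe installed (the real helper is `titanfe.utils.cancel_tasks`):

```python
import asyncio

async def cancel_tasks(tasks, wait_cancelled=True):
    """Cancel every task; optionally wait until all of them have returned."""
    for task in tasks:
        task.cancel()
    if wait_cancelled:
        # return_exceptions=True swallows the CancelledError each task raises
        await asyncio.gather(*tasks, return_exceptions=True)

async def main():
    async def sleeper():
        await asyncio.sleep(3600)

    tasks = [asyncio.create_task(sleeper()) for _ in range(3)]
    await asyncio.sleep(0)   # let the tasks start before cancelling
    await cancel_tasks(tasks)
    return [t.cancelled() for t in tasks]

print(asyncio.run(main()))  # -> [True, True, True]
```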

titanfe.utils.create_uid(prefix='')[source]
titanfe.utils.first(iterable, default=None, key=None)[source]

Return first element of iterable that evaluates to True, else return None or optional default. Similar to one().

>>> first([0, False, None, [], (), 42])
42
>>> first([0, False, None, [], ()]) is None
True
>>> first([0, False, None, [], ()], default='ohai')
'ohai'
>>> import re
>>> m = first(re.match(regex, 'abc') for regex in ['b.*', 'a(.*)'])
>>> m.group(1)
'bc'

The optional key argument specifies a one-argument predicate function like that used for filter(). The key argument, if supplied, should be in keyword form. For example, finding the first even number in an iterable:

>>> first([1, 1, 3, 4, 5], key=lambda x: x % 2 == 0)
4

By Hynek Schlawack, author of the original standalone module.

titanfe.utils.flatten(iterable)[source]

flatten yields all the elements from iterable while collapsing any nested iterables.

>>> nested = [[1, 2], [[3], [4, 5]]]
>>> list(flatten(nested))
[1, 2, 3, 4, 5]
titanfe.utils.generate_key(secret_key, salt)[source]
titanfe.utils.get_ip_address() → str[source]

try to get the public IPv4 address

titanfe.utils.get_module(location: Union[str, pathlib.Path]) → module[source]

Get the Brick content module

If the Brick content module cannot be found, None is returned.

Returns

The loaded Brick content module

Return type

(module or None)

titanfe.utils.iso_utc_time_string()[source]
titanfe.utils.ns_to_ms(ns)[source]
titanfe.utils.pairwise(iterable)[source]
titanfe.utils.time_delta_in_ms(time_ns)[source]

Index and Tables