Titan Flow Engine¶
The Titan Flow Engine provides the backend services for running data flows on a node.
The core flow engine is composed of two services:
Control Peer: One peer per node that takes care of managing Brick Runners on that node.
Brick Runner: Virtual machine for executing the Bricks that the Control Peer has started on its node. The Brick Runner is generic and can process all types of Bricks. Each runner will only execute one type of Brick at a time.
Additionally, version 0.1.3 and onward require the titan services (sources) to run a full deployment of the titan platform.
Choose How to Install¶
Titan Flow Engine requires Python 3.7 to be installed.
Depending on whether you want to contribute to the Titan Flow Engine source code, choose one of the following installation options:
Installing via PyPI¶
The flow engine is available on the Python Package Index (PyPI). It is available for Linux and Windows x64 systems. To install it without having to build it yourself, open a command line and run:
pip install titanfe
Building the Titan Flow Engine (Advanced Install)¶
Required packages for building or developing the project can be installed via the requirements_dev.txt in the project’s root folder:
pip install -r requirements_dev.txt
To build and install the Flow Engine open a command line and run:
python setup.py build
pip install .
For those who also want to work on the documentation and build it locally, please use the requirements_doc.txt file to install the necessary Python packages:
pip install -r requirements_doc.txt
Running the flow engine¶
The flow engine is started by starting the Control Peer. The Control Peer takes care of starting Brick Runners as needed.
Parameters for starting the Control Peer on the command line are:
Name | Purpose |
---|---|
brick_folder | Folder to store installed bricks in (default: ~/titanfe/bricks) |
config_file | Path to ControlPeer configuration file |
The ControlPeer’s configuration file contains the following parameters:
Name | Type | Description |
---|---|---|
Kafka | string | Network address of Kafka instance for metrics and logging, hostname:port (default: localhost:9092) |
GridManager | string | Network address of the GridManager, hostname:port (default: localhost:8080) |
FlowManager | string | Network address of the FlowManager, hostname:port (default: localhost:9002) |
EndpointProvider | string | Network address of the EndpointProvider, hostname:port (default: “tcp://127.0.0.1:9021”) |
SecretKey | string | Secret key for brick parameter decryption (default: None); alternatively use the TITAN_SECRET_KEY environment variable |
If no secret key is given, the ControlPeer will stop immediately. The secret key needs to be identical to the one used in the FlowManager for encrypting parameters.
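The secret key lookup described above could be sketched as follows. This is a minimal illustration, not the ControlPeer's actual code: the function name `resolve_secret_key` and the precedence of the config value over the environment variable are assumptions; only the `SecretKey` option and the `TITAN_SECRET_KEY` variable come from the description above.

```python
import os
from typing import Mapping, Optional


def resolve_secret_key(config: Mapping[str, object],
                       environ: Optional[Mapping[str, str]] = None) -> str:
    """Return the secret key from the config or from TITAN_SECRET_KEY.

    Hypothetical helper: the config value is assumed to take precedence.
    """
    environ = os.environ if environ is None else environ
    key = config.get("SecretKey") or environ.get("TITAN_SECRET_KEY")
    if not key:
        # Mirrors the documented behaviour: without a key the ControlPeer stops.
        raise SystemExit("no secret key given: set SecretKey or TITAN_SECRET_KEY")
    return str(key)
```

Passing the environment as an argument keeps the helper easy to test; calling it with only the config falls back to the process environment.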
To try out the flow engine, first download and start the services GridManager, Repository service, User service, PackageManager, and FlowManager. These services are needed for the flow engine to receive flows to process.
Adjust the ControlPeer’s configuration file (titanfe/apps/control_peer/config.yaml) or create a new one. Then open a command line and run:
python -m titanfe.apps.control_peer -brick_folder ~/titanfe/bricks -config_file path_to_config_file/config.yaml
Or run it from the root directory of the flow engine by providing the path to the example configuration file:
python -m titanfe.apps.control_peer -brick_folder ~/titanfe/bricks -config_file titanfe/apps/control_peer/config.yaml
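For illustration, the two parameters could be parsed with argparse roughly like this. The parser below is a hypothetical sketch and not the ControlPeer's actual entry point; only the option names and the default brick folder are taken from the parameter table above.

```python
import argparse
from pathlib import Path


def parse_args(argv):
    """Parse the two documented ControlPeer parameters (illustrative only)."""
    parser = argparse.ArgumentParser(prog="control_peer")
    # Single-dash long options, matching the command lines shown above.
    parser.add_argument("-brick_folder", default="~/titanfe/bricks",
                        help="folder to store installed bricks in")
    parser.add_argument("-config_file", default=None,
                        help="path to the ControlPeer configuration file")
    args = parser.parse_args(argv)
    # Expand "~" so the folder can be used directly.
    args.brick_folder = Path(args.brick_folder).expanduser()
    return args
```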
Code Quality¶
Code quality within the project is checked using pylint and flake8.
pylint¶
Linting is performed with pylint. The checks for this project are configured in .pylintrc.
To run pylint on the project’s Python code, use the following command:
pylint --rcfile=.pylintrc titanfe
To lint the tests, run:
pylint --rcfile=.pylintrc --disable=duplicate-code ./test
flake8¶
To make sure the code follows the PEP 8 standard, flake8 can be added to the static tests.
For this project we exclude various errors, warnings and notifications because they do not make sense at this time. This may change in the course of future refactoring.
You can run flake8 with:
flake8
It finds all the Python files in this project.
The configuration for this project is read from .flake8 in the project’s root directory.
How to configure the Logging for titanfe¶
The titan Flow Engine uses Python’s built-in logging module.
It can be configured via a configuration file, which is located at ./titanfe/logutils/log_config.yml.
The following provides a brief overview; a more detailed explanation can be found in the Python documentation.
Loggers¶
The loggers section contains two loggers that can be configured separately:
titanfe
titanfe.bricks
The “titanfe” logger will capture any titanfe internal log messages, whereas the “titanfe.bricks” logger will capture any logging from within a brick.
For each logger you can set a log level and one or more specific handlers:
loggers:
  titanfe:
    level: INFO
    handlers: [console]
The above would capture all logging at INFO level and above (for example WARNING) and output it to the console.
Furthermore, there is a global “root” logger that captures LogRecords from all other loggers unless they are explicitly set to propagate: False.
If enabled, the root logger allows you to capture logs from Python internals like the debug output of the asyncio package.
To enable the root logger set:
root:
  level: DEBUG
  handlers:
    - console
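As a rough illustration of how the logger and root settings above interact, the sketch below passes an equivalent dictionary to Python's `logging.config.dictConfig`. It assumes the YAML file is consumed that way; the dictionary is written inline instead of being loaded from log_config.yml, and the handler writes to an in-memory stream (a hypothetical stand-in for the console) so the result can be inspected.

```python
import io
import logging
import logging.config

stream = io.StringIO()  # stand-in for the console, for demonstration only

LOG_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        "console": {"class": "logging.StreamHandler", "level": "DEBUG", "stream": stream},
    },
    "loggers": {
        # titanfe.bricks is a child of titanfe and hands its records up to it
        "titanfe": {"level": "INFO", "handlers": ["console"], "propagate": False},
    },
    # the root logger sees records of every logger that still propagates
    "root": {"level": "DEBUG", "handlers": ["console"]},
}

logging.config.dictConfig(LOG_CONFIG)

logging.getLogger("titanfe.bricks").info("from a brick")  # emitted via titanfe
logging.getLogger("titanfe").debug("too detailed")        # dropped: below INFO
logging.getLogger("asyncio").debug("python internals")    # emitted via root

print(stream.getvalue(), end="")
```

Because the titanfe logger is set to `propagate: False` here, its records reach the console handler only once instead of a second time through the root logger.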
Handlers¶
The handlers section allows specifying different handlers. Note: An instance of each configured handler gets created on initialization of the Python logging module, regardless of whether it gets assigned to any logger or not.
A handler could for example output to the console, write to a file or stream the log records to a remote system. See logging.handlers for the builtin options.
Each handler has the following options:
handlers:
  console:
    class: logging.StreamHandler # the python class to instantiate
    level: INFO # minimum log level for this handler
    formatter: default # a formatter
    filters: [hostname] # additional filters for the formatter to use
    stream: ext://sys.stdout # parameters for the class to instantiate, will be passed in as keywords
Formatter¶
A formatter specifies how the log message is built.
formatters:
  default:
    format: "%(asctime)s %(hostname)s (%(process)5d|%(thread)-5d) %(name)s %(levelname)-8s %(message)s"
The above renders to: 2019-09-25 14:06:29,961 COMPUTER (15032|3448 ) titanfe.bricks.generator INFO     Finished generating
See LogRecord attributes for the available options.
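The `%(hostname)s` field is not one of the standard LogRecord attributes, so something has to add it to each record; in the handler example above that is the role of the `hostname` filter. The following sketch shows one way this could work. The `HostnameFilter` class is a hypothetical illustration, not the filter shipped with titanfe.

```python
import io
import logging
import socket


class HostnameFilter(logging.Filter):
    """Attach the local hostname to every record (illustrative only)."""

    def filter(self, record):
        record.hostname = socket.gethostname()
        return True  # never drop a record, only enrich it


FORMAT = ("%(asctime)s %(hostname)s (%(process)5d|%(thread)-5d) "
          "%(name)s %(levelname)-8s %(message)s")

stream = io.StringIO()  # stand-in for sys.stdout so the output can be inspected
handler = logging.StreamHandler(stream)
handler.addFilter(HostnameFilter())
handler.setFormatter(logging.Formatter(FORMAT))

log = logging.getLogger("titanfe.bricks.generator")
log.setLevel(logging.INFO)
log.propagate = False  # keep the demo output limited to this handler
log.addHandler(handler)

log.info("Finished generating")
print(stream.getvalue(), end="")
```

Without the filter, formatting would fail because the record has no `hostname` attribute.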
Communication within the Titan Flow Engine¶
Between ControlPeer and Grid Manager¶
The Grid Manager provides a REST API with the endpoint address/gridmanager/controlpeers. During its setup, the ControlPeer registers at address/gridmanager/controlpeers by posting the “address:port” of its own REST interface for later communication with the Grid Manager. When the run state of a flow is to be changed, the Grid Manager sends a request to the endpoint /api/v1/flows/<string:name> of the ControlPeer’s REST API.
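The registration step might look roughly like the following sketch, which only builds the HTTP request without sending it. The helper name `build_registration_request`, the `http://` scheme, the JSON encoding of the body and the example addresses are assumptions made for illustration; the text above only states that the ControlPeer posts its own “address:port” to this endpoint.

```python
import json
import urllib.request


def build_registration_request(gridmanager_address: str,
                               own_address: str) -> urllib.request.Request:
    """Build (but do not send) the POST that registers a ControlPeer.

    Both arguments are "hostname:port" strings.
    """
    url = f"http://{gridmanager_address}/gridmanager/controlpeers"
    body = json.dumps(own_address).encode("utf-8")  # body encoding is assumed
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


# Example values only: the Grid Manager default address and a made-up peer address.
request = build_registration_request("localhost:8080", "192.168.0.10:8085")
print(request.get_method(), request.full_url)
```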
Between ControlPeer and Package Manager¶
The Package Manager provides a REST API with the endpoint address/packagemanager/controlpeers. During its setup, the ControlPeer registers at address/packagemanager/controlpeers by posting the “address:port” of its own REST interface for later communication with the Package Manager. After registering, it requests the list of bricks at address/packagemanager/bricks. For the installation of bricks, the Package Manager provides a REST API to download brick code as zip archives. The ControlPeer fetches the code of bricks that are not yet installed from the endpoint address/packagemanager/bricks/<string:BrickId> and unpacks it locally. In turn, when a brick package is installed, the Package Manager sends the list of installed bricks to the endpoint /api/v1/bricks/install of the ControlPeer’s REST API. During shutdown, the ControlPeer deregisters at the Package Manager by sending a DELETE request to address/packagemanager/controlpeers.
Between ControlPeer and Flow Manager¶
The Flow Manager provides a REST API with the endpoint address/flows/<string:name>/config. When the Grid Manager starts a flow, the ControlPeer requests the flow configuration from this endpoint using the flow name provided by the Grid Manager.
Between BrickRunners¶
BrickRunners connect to each other on their Input/Output: A BrickRunner (A) creates a server for its Output. This server address is then passed on to the ControlPeer when requesting an assignment. A successor BrickRunner (B) will get this address as an input target in the answer to its own assignment request. B then opens a TCP/IP connection to A on this address.
Once the connection is established, B orders A to deliver a batch of packets and then processes incoming packets until that batch size is reached. As soon as the number of unprocessed packets drops below the low queue limit, a new batch of packets is requested.
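The request logic can be pictured with a small synchronous sketch. The class name `BatchingInput` and the parameters `batch_size` and `low_queue_limit` are hypothetical, and the real Input works asynchronously over a network connection; the sketch only shows when a new batch would be requested.

```python
from collections import deque


class BatchingInput:
    """Track unprocessed packets and decide when to request another batch."""

    def __init__(self, batch_size: int, low_queue_limit: int):
        self.batch_size = batch_size
        self.low_queue_limit = low_queue_limit
        self.queue = deque()
        self.pending = 0        # packets requested but not yet received
        self.requests_sent = 0

    def maybe_request_batch(self) -> bool:
        """Request a batch if nothing is outstanding and the queue runs low."""
        if self.pending == 0 and len(self.queue) < self.low_queue_limit:
            self.pending = self.batch_size
            self.requests_sent += 1
            return True
        return False

    def receive(self, packet) -> None:
        """Store a packet delivered by the predecessor."""
        self.queue.append(packet)
        self.pending -= 1

    def process_next(self):
        """Take one packet off the queue, then check whether to request more."""
        packet = self.queue.popleft()
        self.maybe_request_batch()
        return packet
```

Requesting only when no batch is outstanding keeps the successor from flooding its predecessor with overlapping requests; whether the engine applies the same rule is an assumption of this sketch.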
Autoscaling¶
If the criterion for triggering autoscaling of a successor brick (details in Autoscaling.md) is met, the BrickRunner sends a scaling request message to the ControlPeer. The scaling request contains the name of the successor brick. If autoscaling was successful, the new BrickRunner instance sends a Scaling message containing the output server address as a new input target to its successor BrickRunners.
Messaging and MessageTypes¶
On each connection, a four-byte integer (network byte order) is sent first, announcing the size of the following message. The message itself is then sent as a binary representation of a two-item UjoList. The first item always identifies the type of the message; the second item is the actual content, another UjoContainer.
The following MessageTypes exist:
# BrickRunner-BrickRunner:
Scaling = 6
Packet = 20
PacketRequest = 21
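The length-prefixed framing described above can be sketched with Python's `struct` module. The UJO encoding of the message body is left out: the functions below treat the body as opaque bytes. The names `MessageType`, `frame` and `read_frame`, and the choice of an unsigned length prefix, are assumptions for illustration, not the engine's actual code.

```python
import io
import struct
from enum import IntEnum

# Four bytes in network byte order; treating the value as unsigned is an assumption.
LENGTH_PREFIX = struct.Struct("!I")


class MessageType(IntEnum):
    """The BrickRunner-to-BrickRunner message types listed above."""
    Scaling = 6
    Packet = 20
    PacketRequest = 21


def frame(body: bytes) -> bytes:
    """Prefix an already encoded message with its size."""
    return LENGTH_PREFIX.pack(len(body)) + body


def read_frame(stream) -> bytes:
    """Read one length-prefixed message from a binary stream."""
    header = stream.read(LENGTH_PREFIX.size)
    if len(header) < LENGTH_PREFIX.size:
        raise EOFError("connection closed before a full header arrived")
    (size,) = LENGTH_PREFIX.unpack(header)
    body = stream.read(size)
    if len(body) < size:
        raise EOFError("connection closed in the middle of a message")
    return body


wire = frame(b"first") + frame(b"second message")
reader = io.BytesIO(wire)
print(read_frame(reader), read_frame(reader))
```

The size prefix lets the receiver split a continuous byte stream back into individual messages without inspecting their content.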
The Containerized Flow Engine¶
The services composing a titan Flow Engine deployment are containerized into docker images. They can be used for production or testing environments as well as for development.
Available image labels:
stable: Images built from the master branch
latest: Images built from the development branch
The Flow Engine on Docker Hub:
Control Peer Service Image¶
Images provided for running the services are based on Debian Buster (Slim) mainly for reasons of image size.
The Control Peer image is built in a two-step Docker build process. The first step is equipped with a gcc build environment that enables building all necessary Python packages that are required for running the flow engine.
gcc
Furthermore, the ZeroMQ Python site-package is installed.
The second Docker build step then packages the Python titan Flow Engine using the build artifacts from the first step.
The titan Control Peer requires a Python 3.7 runtime and the Python site-package ujotypes-py. It is installed as the titanfe Python site-package.
Configuration files for the process are expected to be at /etc/titanfe/flowengine/config.yaml and should be mounted from a folder on the host to the running container:
docker run -v /titanfe/flowengine:/etc/titanfe/flowengine industrialdevops/flow-engine
When using the Flow Engine with docker-compose, the configuration file should be mounted from a location on the host. Additionally, the file controlling how the Control Peer and BrickRunner log can be mounted to replace the config file that comes with the titanfe Python site-package:
titan-controlpeer:
  image: industrialdevops/flow-engine:stable
  restart: always
  volumes:
    - ./etc/titanfe/flowengine:/etc/titanfe/flowengine
    - ./etc/titanfe/flowengine-log/log_config.yml:/usr/local/lib/python3.7/site-packages/titanfe/log_config.yml
  environment:
    SVC_CONFIG: /etc/titanfe/flowengine/config.yaml
titanfe package¶
Subpackages¶
titanfe.apps package¶
a Consumer represents a connection made to the output server
The BrickAdapter gets passed into the brick’s module on execution
-
class
titanfe.apps.brick_runner.adapter.
AdapterMeta
(brick: titanfe.apps.brick_runner.adapter.MetaData, flow: titanfe.apps.brick_runner.adapter.MetaData)[source]¶ Bases:
object
flow/brick meta data to be made available for access inside a brick
-
class
titanfe.apps.brick_runner.adapter.
BrickAdapter
(meta_data: titanfe.apps.brick_runner.adapter.AdapterMeta, result_put_callback, log, default_port)[source]¶ Bases:
object
The BrickAdapter gets passed into the brick’s module on execution
- Parameters
result_put_callback (Callable) – callback to output a result to the runner
log (logging.Logger) – the logger instance of the parent runner
- Attributes
- log: a logging.logger instance to be used from within the brick’s module
if one wants to have something in the general application log.
-
decrypt_parameter
(parameter)[source]¶ Decrypt a secret parameter using AES GCM
- Parameters
parameter (String) – hex encoded encrypted parameter
-
emit_new_packet
(value, port=None)[source]¶ A new packet will be created from the given value and infused into the flow of data.
Note
This will create and output a new packet into the flow. To modify the payload of an already travelling packet, simply return a value from the brick processing method.
- Parameters
value (Any) – Any value
-
class
titanfe.apps.brick_runner.adapter.
MetaData
(uid, name)¶ Bases:
tuple
-
property
name
¶ Alias for field number 1
-
property
uid
¶ Alias for field number 0
A Brick within the brick runner
-
class
titanfe.apps.brick_runner.brick.
Brick
(instance_definition: titanfe.apps.control_peer.brick.BrickInstanceDefinition, metric_emitter, logger)[source]¶ Bases:
object
Wraps all the Brick-Handling
-
enqueue_result_as_packet
(result, port=None, parent_packet=None)[source]¶ create a packet with an empty buffer if needed and add the brick’s result to the packet’s payload
-
property
execution_time
¶
-
Connection objects and its methods: Buffer, Mapping..
-
class
titanfe.apps.brick_runner.connection.
BasicConstant
(constant)[source]¶ Bases:
titanfe.apps.brick_runner.connection.Constant
A constant of a basic type
-
class
titanfe.apps.brick_runner.connection.
Buffer
(ujoBuffer=None)[source]¶ Bases:
collections.abc.MutableMapping
A connections buffer of memorized upstream values
-
class
titanfe.apps.brick_runner.connection.
BufferDescription
(description_dict)[source]¶ Bases:
collections.abc.Mapping
A connections description of a buffer object
-
class
titanfe.apps.brick_runner.connection.
MappingRules
(rules)[source]¶ Bases:
object
A connections mapping rules
-
class
titanfe.apps.brick_runner.connection.
ObjectConstant
(constant)[source]¶ Bases:
titanfe.apps.brick_runner.connection.Constant
A constant of type object
-
class
titanfe.apps.brick_runner.connection.
RecordConstant
(constant)[source]¶ Bases:
titanfe.apps.brick_runner.connection.Constant
A constant of type record
GridManager communication
The INPUT side of a brick (runner)
-
class
titanfe.apps.brick_runner.input.
Input
(runner)[source]¶ Bases:
object
The Input side of a brick runner requests new packets from the previous BrickRunner’s OutputServer until its QueueLimit is exceeded and again once the “low level” is reached. The Input will also emit queue metrics every 0.1 sec if there are packets in the queue.
- Parameters
runner (BrickRunner) – instance of a parent brick runner
adress (NetworkAddress) – (host, port) of the source-BrickRunners OutputServer
-
async
get_input
(address, port_name, target_port)[source]¶ Connect to and retrieve packets from the given address
-
handle_input_loss
(address, port_name, task)[source]¶ if we lose a connection to some input source, we handle removing the appropriate task here. Any CancelledError will be ignored; all other exceptions are unexpected and will be logged.
-
property
is_empty
¶
Handle creation of metric data and streaming it to Kafka
-
class
titanfe.apps.brick_runner.metrics.
BrickMetrics
(flow: str = 'FlowName?', brick: str = 'BrickName?', brick_type: str = 'BrickType?', brick_family: str = 'BrickFamily?', runner: str = 'RunnerUid?', host: str = 'build-17650203-project-510666-titanfe', timestamp: str = <factory>, content_type: str = 'titan-brick-metrics', execution_time: float = 0.0)[source]¶ Bases:
titanfe.apps.brick_runner.metrics.MetricsBase
Metric data for brick executions
-
content_type
: str = 'titan-brick-metrics'¶
-
execution_time
: float = 0.0¶
-
-
class
titanfe.apps.brick_runner.metrics.
MetricEmitter
(metrics_metadata, logger)[source]¶ Bases:
object
The MetricEmitter encapsulates creation of metric data and sending them to a Kafka instance
- Parameters
metrics_metadata (dict) – base meta data of metrics emitted
logger (logging.logger) – the parent’s logger instance
-
async classmethod
create_from_brick_runner
(runner) → titanfe.apps.brick_runner.metrics.MetricEmitter[source]¶ Creates, starts and returns a MetricEmitter instance
-
class
titanfe.apps.brick_runner.metrics.
MetricsBase
(flow: str = 'FlowName?', brick: str = 'BrickName?', brick_type: str = 'BrickType?', brick_family: str = 'BrickFamily?', runner: str = 'RunnerUid?', host: str = 'build-17650203-project-510666-titanfe', timestamp: str = <factory>)[source]¶ Bases:
titanfe.utils.DictConvertable
,abc.ABC
Information that every “metric” should contain
-
brick
: str = 'BrickName?'¶
-
brick_family
: str = 'BrickFamily?'¶
-
brick_type
: str = 'BrickType?'¶
-
static
extract_from_runner
(runner)[source]¶ extract the basic information from a brick runner instance
-
flow
: str = 'FlowName?'¶
-
host
: str = 'build-17650203-project-510666-titanfe'¶
-
runner
: str = 'RunnerUid?'¶
-
timestamp
: str¶
-
-
class
titanfe.apps.brick_runner.metrics.
PacketMetricsAtBrick
(flow: str = 'FlowName?', brick: str = 'BrickName?', brick_type: str = 'BrickType?', brick_family: str = 'BrickFamily?', runner: str = 'RunnerUid?', host: str = 'build-17650203-project-510666-titanfe', timestamp: str = <factory>, content_type: str = 'titan-packet-metrics', packet: str = 'PacketUid?', execution_time: float = 0.0, traveling_time: float = 0.0, time_in_input: float = 0.0, time_in_output: float = 0.0, time_on_wire: float = 0.0, at_outlet: bool = False)[source]¶ Bases:
titanfe.apps.brick_runner.metrics.MetricsBase
Metric data for a packet being processed at a Brick
-
at_outlet
: bool = False¶
-
content_type
: str = 'titan-packet-metrics'¶
-
execution_time
: float = 0.0¶
-
packet
: str = 'PacketUid?'¶
-
time_in_input
: float = 0.0¶
-
time_in_output
: float = 0.0¶
-
time_on_wire
: float = 0.0¶
-
traveling_time
: float = 0.0¶
-
-
class
titanfe.apps.brick_runner.metrics.
QueueMetrics
(flow: str = 'FlowName?', brick: str = 'BrickName?', brick_type: str = 'BrickType?', brick_family: str = 'BrickFamily?', runner: str = 'RunnerUid?', host: str = 'build-17650203-project-510666-titanfe', timestamp: str = <factory>, content_type: str = 'titan-queue-metrics', queue_name: str = 'QueueName?', queue_length: int = 0)[source]¶ Bases:
titanfe.apps.brick_runner.metrics.MetricsBase
Metric data for Input/Output-queues
-
content_type
: str = 'titan-queue-metrics'¶
-
queue_length
: int = 0¶
-
queue_name
: str = 'QueueName?'¶
-
-
class
titanfe.apps.brick_runner.metrics.
QueueWithMetrics
(emitter, name, interval=0.1, maxsize=0)[source]¶ Bases:
asyncio.queues.Queue
an asyncio.Queue that emits metrics (queue length)
-
async
put
(item)[source]¶ Put an item into the queue.
Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.
-
property
unfinished_tasks
¶
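The idea of a queue that reports its own length can be sketched as below. This is not titanfe's implementation: the class name `ReportingQueue` and the `report` callback are hypothetical, and the sketch reports on every `put` instead of at a fixed interval.

```python
import asyncio


class ReportingQueue(asyncio.Queue):
    """An asyncio.Queue that reports its length after each put (sketch)."""

    def __init__(self, name, report, maxsize=0):
        super().__init__(maxsize=maxsize)
        self.name = name
        self.report = report  # callable taking (queue_name, queue_length)

    async def put(self, item):
        # Waits for a free slot if the queue is full, like asyncio.Queue.put.
        await super().put(item)
        self.report(self.name, self.qsize())


async def demo():
    seen = []
    queue = ReportingQueue("input", lambda name, length: seen.append((name, length)))
    await queue.put("a")
    await queue.put("b")
    await queue.get()
    await queue.put("c")
    return seen


print(asyncio.run(demo()))
```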
An information packet passed between Bricks
-
class
titanfe.apps.brick_runner.packet.
Packet
(uid: str = <factory>, started: float = <factory>, port: str = '', payload: ujotypes.variants.base.UjoBase = UjoNone(None), buffer: titanfe.apps.brick_runner.connection.Buffer = <factory>, input_entry: float = 0.0, input_exit: float = 0.0, output_entry: float = 0.0, output_exit: float = 0.0)[source]¶ Bases:
titanfe.utils.DictConvertable
Represents an information packet (IP) passing through a flow
-
input_entry
: float = 0.0¶
-
input_exit
: float = 0.0¶
-
output_entry
: float = 0.0¶
-
output_exit
: float = 0.0¶
-
payload
: ujotypes.variants.base.UjoBase = UjoNone(None)¶
-
port
: str = ''¶
-
property
queue_times
¶
-
started
: float¶
-
property
traveling_time
¶
-
uid
: str¶
-
RESTlike interface to control the control peer
Provide a RESTlike interface to manage the ControlPeer remotely
-
class
titanfe.apps.control_peer.webapi.app.
HelloWorld
(*, message: str = 'Hello, World!')[source]¶ Bases:
pydantic.main.BaseModel
-
message
: str¶
-
-
class
titanfe.apps.control_peer.webapi.app.
WebApi
(control_peer)[source]¶ Bases:
object
Provide a RESTlike interface to manage the ControlPeer remotely
- Parameters
control_peer (ControlPeer) – an instance of the ControlPeer
- Usage:
create an Instance of the WebAPI (glued together with FastAPI/Starlette and uvicorn) and use run to create an endlessly running asyncio task or use the serve coroutine to run it manually
-
property
address
¶
Routes for Flow management
-
class
titanfe.apps.control_peer.webapi.bricks.
RequestBrickStart
(*, brick: dict)[source]¶ Bases:
pydantic.main.BaseModel
-
brick
: dict¶
-
-
class
titanfe.apps.control_peer.webapi.bricks.
RequestInstallBricks
(*, bricks: list)[source]¶ Bases:
pydantic.main.BaseModel
-
bricks
: list¶
-
-
titanfe.apps.control_peer.webapi.bricks.
create_brick_router
(control_peer)[source]¶ Setup the routing for flow management
- Parameters
control_peer (ControlPeer) – an instance of the ControlPeer
- Returns
router/routes to manage the control peer’s flows
- Return type
APIRouter
Routes for Flow management
-
titanfe.apps.control_peer.webapi.flows.
create_flow_router
(control_peer)[source]¶ Setup the routing for flow management
- Parameters
control_peer (ControlPeer) – an instance of the ControlPeer
- Returns
router/routes to manage the control peer’s flows
- Return type
APIRouter
Routes for Flow management
-
titanfe.apps.control_peer.webapi.state.
create_state_router
(control_peer)[source]¶ Create a router for state
- Parameters
control_peer (ControlPeer) – an instance of the ControlPeer
- Returns
router/routes to manage the control peer’s flows
- Return type
APIRouter
-
titanfe.apps.control_peer.webapi.state.
get_state
(control_peer) → List[Dict][source]¶ Create a router for state
- Parameters
control_peer (ControlPeer) – an instance of the ControlPeer
- Returns
list of brick information
A Brick
-
class
titanfe.apps.control_peer.brick.
BrickBaseDefinition
(uid, name=None, family=None, logger=None, last_modified=None)[source]¶ Bases:
object
The general definition of a brick contains its name and id, as well as the module itself and possibly a set of default parameters for that module read from the annexed config.yaml
-
guess_module_path
()[source]¶ The module is expected to be found in the configured brick_folder extended with the brick-ID and should be either a folder or python file having the same name as the brick.
-
async
install_or_update
(update=True, force_update=False)[source]¶ Get a brick from the package manager and install it
-
property
venv_path
¶
-
-
class
titanfe.apps.control_peer.brick.
BrickInstanceDefinition
(uid, name, ports: titanfe.apps.control_peer.brick.Ports, flow: titanfe.apps.control_peer.brick.Flow, base: titanfe.apps.control_peer.brick.BrickBaseDefinition, processing_parameters: dict, runtime_parameters: titanfe.apps.control_peer.brick.RuntimeParameters, connections: titanfe.apps.control_peer.brick.Connections)[source]¶ Bases:
object
The Brick Instance Definition is a fully configured brick in a flow context. It should have its own name and uid within the flow, precise parameters and possibly connections to other bricks.
-
class
titanfe.apps.control_peer.brick.
Connections
(input, output)¶ Bases:
tuple
-
property
input
¶ Alias for field number 0
-
property
output
¶ Alias for field number 1
-
class
titanfe.apps.control_peer.brick.
EnvBuilder
(logger, *args, **kwargs)[source]¶ Bases:
venv.EnvBuilder
Builder for the virtual environments for each brick
-
class
titanfe.apps.control_peer.brick.
Flow
(uid, name, schema)¶ Bases:
tuple
-
property
name
¶ Alias for field number 1
-
property
schema
¶ Alias for field number 2
-
property
uid
¶ Alias for field number 0
-
class
titanfe.apps.control_peer.brick.
Ports
(input, output)¶ Bases:
tuple
-
property
input
¶ Alias for field number 0
-
property
output
¶ Alias for field number 1
-
class
titanfe.apps.control_peer.brick.
RuntimeParameters
(autoscale_max_instances, autoscale_queue_level, exit_after_idle_seconds)¶ Bases:
tuple
-
property
autoscale_max_instances
¶ Alias for field number 0
-
property
autoscale_queue_level
¶ Alias for field number 1
-
property
exit_after_idle_seconds
¶ Alias for field number 2
the actual control peer
-
class
titanfe.apps.control_peer.control_peer.
ControlPeer
[source]¶ Bases:
object
The control peer application will start runners as required for the flows/bricks as described in the given config file. Once the runners have registered themselves, they will get according assignments.
Encapsulate brick runner related things
-
class
titanfe.apps.control_peer.runner.
BrickRunner
(brick_instance, on_termination_cb=<function BrickRunner.<lambda>>)[source]¶ Bases:
object
The BrickRunner can be used to start brick runner processes and hold corresponding data
- Parameters
controlpeer_address (NetworkAddress) – the address on which the control peer is listening
Install a Brick
-
class
titanfe.apps.control_peer.services.
ControlPeerServiceRegistration
[source]¶ Bases:
abc.ABC
BaseClass to handle control peer registration of various services
-
abstract property
control_peer_endpoint
¶
-
class
titanfe.apps.control_peer.services.
GridManager
[source]¶ Bases:
titanfe.apps.control_peer.services.ControlPeerServiceRegistration
handle all requests to the grid manager
-
property
address
¶
-
property
control_peer_endpoint
¶
titanfe.testing package¶
Submodules¶
titanfe.brick module¶
Abstract base classes for building Bricks
-
class
titanfe.brick.
BrickBase
(adapter: titanfe.apps.brick_runner.adapter.BrickAdapter, parameters: Optional[Dict] = None)[source]¶ Bases:
object
An abstract base class for building Bricks
-
abstract
process
(input: Type[ujotypes.variants.base.UjoBase], port: str)[source]¶ Do the input processing.
To modify the payload of the current packet simply return a new value. Use the adapter’s emit_new_packet to create a data packet and insert it into the flow.
- Parameters
input (Type[UjoBase]) – the input data to be processed
- Returns
the new payload for the current data packet traveling in the flow. When returning None, the current packet gets dropped.
- Return type
Optional[UjoBase]
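A brick built on this interface might look like the sketch below. To keep the example runnable without titanfe installed, `BrickBase` is re-declared here as a minimal stand-in with the documented constructor and `process` signature, and plain Python values take the place of UjoBase payloads; `UppercaseBrick` is a hypothetical example brick.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional


class BrickBase(ABC):
    """Minimal stand-in for titanfe.brick.BrickBase (assumption, see above)."""

    def __init__(self, adapter, parameters: Optional[Dict] = None):
        self.adapter = adapter
        self.parameters = parameters or {}

    @abstractmethod
    def process(self, input: Any, port: str):
        """Return the new payload, or None to drop the current packet."""


class UppercaseBrick(BrickBase):
    """Uppercase string payloads and drop everything else."""

    def process(self, input: Any, port: str):
        if not isinstance(input, str):
            return None  # returning None drops the current packet
        return input.upper()  # the return value replaces the packet's payload


brick = UppercaseBrick(adapter=None)
print(brick.process("hello", port="in"))
```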
-
class
titanfe.brick.
InletBrickBase
(adapter: titanfe.apps.brick_runner.adapter.BrickAdapter, parameters: Optional[Dict] = None)[source]¶ Bases:
titanfe.brick.BrickBase
An abstract base class for building bricks that will run a continuous process
titanfe.config module¶
the global configuration
-
class
titanfe.config.
Configuration
[source]¶ Bases:
object
Current Configuration
-
option_aliases
= {'IP': 'IP', 'brick_folder': 'BrickFolder', 'endpoint_provider': 'EndpointProvider', 'flowmanager_address': 'FlowManager', 'gridmanager_address': 'GridManager', 'kafka_bootstrap_servers': 'Kafka', 'kafka_log_topic': 'KafkaLogTopic', 'packagemanager_address': 'PackageManager', 'reposervice_address': 'RepositoryService', 'secret_key': 'SecretKey'}¶
-
update
(config: Union[Configuration, dict])[source]¶ update config from dict or other config
-
titanfe.connection module¶
Encapsulate asyncio connections by wrapping them into a Connection
-
class
titanfe.connection.
Connection
(reader, writer, log=None, encoding='UJO')[source]¶ Bases:
object
Wrap an asyncio StreamReader/Writer combination into a connection object.
- Parameters
reader (asyncio.StreamReader) – the stream reader
writer (asyncio.StreamWriter) – the stream writer
log (logging.logger) – a parent logger
encoding – “PICKLE” or “UJO”
-
async classmethod
open
(address: titanfe.connection.NetworkAddress, log: Optional[logging.Logger] = None) → titanfe.connection.Connection[source]¶ open an asyncio connection to the given address (host, port)
titanfe.constants module¶
a collection of constants
titanfe.get-pip module¶
titanfe.log module¶
setup the logging with a custom metric-level
-
class
titanfe.log.
FlowContext
(flowuid: str = '', flowname: str = '', brickuid: str = '', brickname: str = '')[source]¶ Bases:
object
The Flow Context
-
brickname
: str = ''¶
-
brickuid
: str = ''¶
-
flowname
: str = ''¶
-
flowuid
: str = ''¶
-
-
class
titanfe.log.
KafkaLogHandler
(bootstrap_server, topic)[source]¶ Bases:
logging.Handler
Stream LogRecords to Kafka
- Parameters
bootstrap_server (str) – ‘Host:Port’ of a kafka bootstrap server
topic (str) – the kafka topic to produce into
-
class
titanfe.log.
TitanLogAdapter
(logger, extra)[source]¶ Bases:
logging.LoggerAdapter
The Log Adapter wraps a logger and adds some context to each log record
-
property
context
¶
-
metric
(message, *args, **kwargs)¶
-
class
titanfe.log.
TitanLogRecord
(name, level, pathname, lineno, msg, args, exc_info, func=None, sinfo=None, **kwargs)[source]¶ Bases:
logging.LogRecord
A log record - Titan style
-
brickname
= ''¶
-
brickuid
= ''¶
-
flowname
= ''¶
-
flowuid
= ''¶
-
hostname
= 'build-17650203-project-510666-titanfe'¶
-
servicename
= ''¶
-
-
class
titanfe.log.
TitanPlatformLogger
(name, context: Optional[titanfe.log.FlowContext] = None)[source]¶ Bases:
logging.Logger
to write contextual logging information use e.g. log.with_context.info
-
getChild
(suffix) → titanfe.log.TitanPlatformLogger[source]¶ Get a logger which is a descendant to this one.
This is a convenience method, such that
logging.getLogger(‘abc’).getChild(‘def.ghi’)
is the same as
logging.getLogger(‘abc.def.ghi’)
It’s useful, for example, when the parent logger is named using __name__ rather than a literal string.
-
property
with_context
¶
-
-
class
titanfe.log.
UjoBinFormatter
(fmt=None, datefmt=None, style='%')[source]¶ Bases:
logging.Formatter
Format log records as an UjoBinary
-
titanfe.log.
add_logging_level
(level, level_name, method_name=None)[source]¶ add a level to the logging module
- Parameters
level (int) – level number
level_name – name of the level
method_name – name of the method that gets attached to logging
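Registering an extra level with the standard library could look like the sketch below. It is not titanfe's implementation: the function name `add_level`, the level number 25 and the name METRIC are illustrative assumptions (the module description only mentions a custom metric level).

```python
import logging


def add_level(level: int, level_name: str, method_name: str = None) -> None:
    """Register a log level and attach a matching method to logging.Logger."""
    method_name = method_name or level_name.lower()
    logging.addLevelName(level, level_name)

    def log_at_level(self, message, *args, **kwargs):
        if self.isEnabledFor(level):
            # Logger._log expects the positional args as one tuple
            self._log(level, message, args, **kwargs)

    setattr(logging.Logger, method_name, log_at_level)


add_level(25, "METRIC")  # 25 sits between INFO (20) and WARNING (30)
```

After this call every logger offers a `metric(...)` method, for example `logging.getLogger("demo").metric("queue length %d", 3)`.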
-
titanfe.log.
flush_kafka_log_handler
()[source]¶ Flush messages sent to the KafkaLogHandler and suppress warnings from Kafka. Called during shutdown of the brick runner.
-
titanfe.log.
getLogger
(name: str, context: Optional[titanfe.log.FlowContext] = None) → logging.Logger[source]¶ Get a Logger
- Parameters
name – the logger name
context – a flow context (if available)
- Returns
a Logger
- Return type
logging.Logger
titanfe.messages module¶
Messages within titanfe
-
class
titanfe.messages.
BrickDescription
(flowuid, flowname, name, brick_type, brick_family, parameters, uid, path_to_module, is_inlet, exit_after_idle_seconds, default_port)¶ Bases:
tuple
-
property
brick_family
¶ Alias for field number 4
-
property
brick_type
¶ Alias for field number 3
-
property
default_port
¶ Alias for field number 10
-
property
exit_after_idle_seconds
¶ Alias for field number 9
-
property
flowname
¶ Alias for field number 1
-
property
flowuid
¶ Alias for field number 0
-
property
is_inlet
¶ Alias for field number 8
-
property
name
¶ Alias for field number 2
-
property
parameters
¶ Alias for field number 5
-
property
path_to_module
¶ Alias for field number 7
-
property
uid
¶ Alias for field number 6
-
class
titanfe.messages.
InputSource
(name, port, address)¶ Bases:
tuple
-
property
address
¶ Alias for field number 2
-
property
name
¶ Alias for field number 0
-
property
port
¶ Alias for field number 1
-
class
titanfe.messages.
Message
(type, content)¶ Bases:
tuple
-
property
content
¶ Alias for field number 1
-
property
type
¶ Alias for field number 0
titanfe.repository module¶
Repository can be used to connect to the titan repository service
-
class
titanfe.repository.
RepositoryService
(logger, reposervice_address=None)[source]¶ Bases:
object
Repository service implements a connection to the titan repository service
- Parameters
brick_name (string) – the name of the brick instance
logger – the logger instance of the parent
reposervice_address (string) – optional, address of the repository service
-
class
titanfe.repository.
Request
(address: str, method: Callable, content: titanfe.repository.RequestData, log: logging.LoggerAdapter, response: Any = <factory>)[source]¶ Bases:
object
Request object
- Parameters
address (str) – target address
method (Callable) – requests method (get, put, delete, ...)
log – the logger instance of the parent
content (RequestData) – request content to be sent
-
address
: str¶
-
content
: titanfe.repository.RequestData¶
-
log
: logging.LoggerAdapter¶
-
method
: Callable¶
-
response
: Any¶
-
class
titanfe.repository.
RequestData
(collection: str, document: str, value: Any, find: Optional[dict] = <factory>, database: str = 'BrickRunner')[source]¶ Bases:
object
Request data object sent to the Repository service
-
collection
: str¶
-
database
: str = 'BrickRunner'¶
-
document
: str¶
-
find
: Optional[dict]¶
-
classmethod
from_dict
(kvs: Optional[Union[dict, list, str, int, float, bool]], *, infer_missing=False) → A¶
-
classmethod
from_json
(s: Union[str, bytes, bytearray], *, parse_float=None, parse_int=None, parse_constant=None, infer_missing=False, **kw) → A¶
-
classmethod
schema
(*, infer_missing: bool = False, only=None, exclude=(), many: bool = False, context=None, load_only=(), dump_only=(), partial: bool = False, unknown=None) → dataclasses_json.mm.SchemaF[A]¶
-
to_dict
(encode_json=False) → Dict[str, Optional[Union[dict, list, str, int, float, bool]]]¶
-
to_json
(*, skipkeys: bool = False, ensure_ascii: bool = True, check_circular: bool = True, allow_nan: bool = True, indent: Optional[Union[int, str]] = None, separators: Tuple[str, str] = None, default: Callable = None, sort_keys: bool = False, **kw) → str¶
-
value
: Any¶
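The <factory> default shown for find indicates a default factory, so every instance gets its own default object. The sketch below mirrors the documented fields with a plain standard-library dataclass. It assumes the factory is dict, uses dataclasses.asdict in place of the to_dict method provided by dataclasses_json, and all values are hypothetical:

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Optional


@dataclass
class RequestDataSketch:
    collection: str
    document: str
    value: Any
    # Assumption: the "<factory>" default creates an empty dict per instance.
    find: Optional[dict] = field(default_factory=dict)
    database: str = "BrickRunner"


request_a = RequestDataSketch(collection="bricks", document="state", value={"count": 1})
request_b = RequestDataSketch(collection="bricks", document="state", value=None)

request_a.find["name"] = "example"  # does not leak into request_b
as_dict = asdict(request_a)
```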
-
titanfe.ujo_helper module¶
simplify ujo conversions (eventually we’d want to use an UjoScheme instead though)
-
titanfe.ujo_helper.
get_ujo_value
(value, type_name)[source]¶ get value in ujo based on given type_name and value
titanfe.utils module¶
a collection of useful helper functions
-
class
titanfe.utils.
DictConvertable
[source]¶ Bases:
abc.ABC
Mixin to make a dataclass convert from/into a dictionary
-
class
titanfe.utils.
Flag
(*, loop=None)[source]¶ Bases:
asyncio.locks.Event
Extends the asyncio.Event to be a tad more convenient
-
class
titanfe.utils.
Timer
[source]¶ Bases:
object
a simple Timer using the performance counters from the “time”-module
>>> with Timer() as t:
>>>     # do_something()
>>>     print(t.elapsed)
>>>     # do_something_else()
>>>     print(t.elapsed)
>>>     print(t.elapsed_total)
-
property
elapsed
¶
-
property
elapsed_total
¶
-
property
perf_counter_since_last
¶
-
property
perf_counter_total
¶
-
property
process_time_since_last
¶
-
property
process_time_total
¶
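The property names suggest a "since last reading" value and a "total" value. A minimal sketch of such a timer on top of time.perf_counter follows. TimerSketch and the lap-reset behaviour are assumptions for illustration and may differ from what titanfe.utils.Timer actually does:

```python
import time


class TimerSketch:
    """Context-manager timer built on time.perf_counter (illustrative only)."""

    def __enter__(self):
        self._start = self._last = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        return False  # never swallow exceptions

    @property
    def perf_counter_since_last(self):
        """Seconds since the previous reading (or since start); resets the lap."""
        now = time.perf_counter()
        elapsed, self._last = now - self._last, now
        return elapsed

    @property
    def perf_counter_total(self):
        """Seconds since the timer was started."""
        return time.perf_counter() - self._start


with TimerSketch() as t:
    time.sleep(0.01)
    lap = t.perf_counter_since_last
    total = t.perf_counter_total
```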
-
async
titanfe.utils.
cancel_tasks
(tasks: Sequence[_asyncio.Task], wait_cancelled=True)[source]¶ Cancel all tasks in sequence
- Parameters
tasks (Sequence[asyncio.Task]) – tasks to cancel
wait_cancelled (bool) – if True (default), wait until all tasks have returned; if False, return right after requesting cancellation.
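The behaviour described by these parameters can be sketched with plain asyncio. cancel_tasks_sketch is a hypothetical stand-in written for this example, not the titanfe function:

```python
import asyncio


async def cancel_tasks_sketch(tasks, wait_cancelled=True):
    """Cancel every task; optionally wait until all of them have finished."""
    for task in tasks:
        task.cancel()
    if wait_cancelled:
        # return_exceptions=True collects each CancelledError instead of raising it.
        await asyncio.gather(*tasks, return_exceptions=True)


async def demo():
    tasks = [asyncio.create_task(asyncio.sleep(60)) for _ in range(3)]
    await asyncio.sleep(0)  # give the tasks a chance to start
    await cancel_tasks_sketch(tasks)
    return [task.cancelled() for task in tasks]


cancelled = asyncio.run(demo())
```

Waiting after cancel() matters because cancellation is only requested at that point; a task is not done until it has processed the CancelledError.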
-
titanfe.utils.
first
(iterable, default=None, key=None)[source]¶ Return first element of iterable that evaluates to True, else return None or optional default. Similar to one().
>>> first([0, False, None, [], (), 42])
42
>>> first([0, False, None, [], ()]) is None
True
>>> first([0, False, None, [], ()], default='ohai')
'ohai'
>>> import re
>>> m = first(re.match(regex, 'abc') for regex in ['b.*', 'a(.*)'])
>>> m.group(1)
'bc'
The optional key argument specifies a one-argument predicate function like that used for filter(). The key argument, if supplied, should be in keyword form. For example, finding the first even number in an iterable:
>>> first([1, 1, 3, 4, 5], key=lambda x: x % 2 == 0)
4
By Hynek Schlawack, author of the original standalone module.
-
titanfe.utils.
flatten
(iterable)[source]¶ flatten yields all the elements from iterable while collapsing any nested iterables.
>>> nested = [[1, 2], [[3], [4, 5]]]
>>> list(flatten(nested))
[1, 2, 3, 4, 5]
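One way such a generator can be written is with recursion and yield from. flatten_sketch is a hypothetical stand-in, and treating strings and bytes as single values is an assumption that the documentation above does not confirm:

```python
from collections.abc import Iterable


def flatten_sketch(iterable):
    """Yield the elements of arbitrarily nested iterables in order."""
    for item in iterable:
        # Assumption: strings and bytes count as single values, not as iterables.
        if isinstance(item, Iterable) and not isinstance(item, (str, bytes)):
            yield from flatten_sketch(item)
        else:
            yield item


flat = list(flatten_sketch([[1, 2], [[3], [4, 5]], "ab", (6,)]))
```

The string check is more than cosmetic: iterating a one-character string yields that same string again, so descending into strings would recurse without end.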