Source code for titanfe.config

#
# Copyright (c) 2019-present, wobe-systems GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# found in the LICENSE file in the root directory of this source tree.
#

""" the global configuration """

import os
from ast import literal_eval

# pylint: disable=invalid-name
from pathlib import Path
from typing import Union

from ruamel import yaml
from ruamel.yaml import YAMLError

# Fallback values assigned by Configuration.__init__; each one can be
# overridden afterwards through Configuration.update / update_from_yaml.
DEFAULT_KAFKA_BOOTSTRAP_SERVER = "10.14.0.23:9092"
DEFAULT_KAFKA_LOG_TOPIC = "titan.logs"
DEFAULT_GRIDMANAGER_ADDRESS = "http://localhost:8080/gridmanager"
DEFAULT_FLOWMANAGER_ADDRESS = "http://localhost:9002/flowmanager"
DEFAULT_PACKAGEMANAGER_ADDRESS = "http://localhost:8087/packagemanager"
DEFAULT_REPOSERVICE_ADDRESS = "http://localhost:8085/object"
DEFAULT_ENDPOINTPROVIDER_ADDRESS = "tcp://127.0.0.1:9021"
# NOTE(review): not referenced in this module; presumably the accepted secret
# key sizes in bytes - confirm against the code that consumes it.
VALID_KEY_LENGTHS = (16, 24, 32)


class NotFound:  # pylint: disable=too-few-public-methods
    """Marker type for "no value available".

    Instances always evaluate as false, which lets a lookup that produced the
    marker be chained with ``or`` to try another lookup.
    """

    def __bool__(self) -> bool:
        """Report the marker as falsy."""
        return False
# Shared sentinel instance: falsy, and compared by identity in
# Configuration.update to tell "option missing" apart from a real value.
NOTFOUND = NotFound()
class Configuration:
    """Current Configuration

    Every option starts out with a module-level default or a value taken from
    the environment and can be overridden afterwards with :meth:`update` or
    :meth:`update_from_yaml`.
    """

    def __init__(self):
        self.kafka_bootstrap_servers = DEFAULT_KAFKA_BOOTSTRAP_SERVER
        self.kafka_log_topic = DEFAULT_KAFKA_LOG_TOPIC
        # TITAN_METRICS_DISABLED wins over TITANFE_WITHOUT_KAFKA when both are
        # set to a non-empty value; with neither set the flag is False.
        self.no_kafka_today = self._parse_flag(
            os.getenv("TITAN_METRICS_DISABLED")
            or os.getenv("TITANFE_WITHOUT_KAFKA")
            or "False"
        )

        self.gridmanager_address = DEFAULT_GRIDMANAGER_ADDRESS
        self.flowmanager_address = DEFAULT_FLOWMANAGER_ADDRESS
        self.packagemanager_address = DEFAULT_PACKAGEMANAGER_ADDRESS
        self.reposervice_address = DEFAULT_REPOSERVICE_ADDRESS
        # an unset or empty TITAN_SECRET_KEY is normalised to None
        self.secret_key = os.getenv("TITAN_SECRET_KEY") or None
        self.endpoint_provider = DEFAULT_ENDPOINTPROVIDER_ADDRESS
        self.IP = None

        self.brick_folder = str(Path.home() / "titanfe/bricks")

    # attribute name -> alternative key accepted by ``update`` for dict input;
    # only the attributes listed here can be changed through ``update``
    option_aliases = {
        "IP": "IP",
        "gridmanager_address": "GridManager",
        "flowmanager_address": "FlowManager",
        "packagemanager_address": "PackageManager",
        "reposervice_address": "RepositoryService",
        "kafka_bootstrap_servers": "Kafka",
        "kafka_log_topic": "KafkaLogTopic",
        "brick_folder": "BrickFolder",
        "secret_key": "SecretKey",
        "endpoint_provider": "EndpointProvider",
    }

    @staticmethod
    def _parse_flag(raw):
        """Turn the text of an environment flag into a value.

        Python literals ("True", "False", "0", "1", ...) are evaluated with
        ``literal_eval`` and returned unchanged.  Text that is not a Python
        literal (e.g. "true", "yes") used to raise at import time; it is now
        compared case-insensitively against "true", "yes" and "on", and
        anything else counts as False.
        """
        try:
            return literal_eval(raw)
        except (ValueError, SyntaxError):
            return raw.strip().lower() in ("true", "yes", "on")

    def update(self, config: Union["Configuration", dict]):
        """update config from dict or other config

        Args:
            config: another ``Configuration`` or a dict keyed by attribute
                name or by the alias from ``option_aliases``.  ``None`` is
                accepted and leaves the configuration untouched.

        For dict input the attribute name is looked up first; if it is missing
        or holds a falsy value the alias is tried.  Options found under
        neither key keep their current value.
        """
        if config is None:
            # e.g. the result of loading an empty YAML document
            return

        from_configuration = isinstance(config, Configuration)
        for attr, alias in self.option_aliases.items():
            if from_configuration:
                value = getattr(config, attr, NOTFOUND)
            else:
                # NOTFOUND is falsy, so ``or`` falls through to the alias
                value = config.get(attr, NOTFOUND) or config.get(alias, NOTFOUND)

            if value is NOTFOUND:
                continue

            setattr(self, attr, value)

    def update_from_yaml(self, file_path):
        """Read and update the configuration from a yaml file

        Problems are reported with ``print`` instead of being raised: an
        unreadable file, invalid YAML, or a document whose top level is not a
        mapping all leave the configuration unchanged.  An empty file is
        treated as "nothing to update".
        """
        try:
            with open(file_path) as f:  # pylint: disable=unspecified-encoding
                # NOTE(review): newer ruamel.yaml releases drop the module-level
                # safe_load in favour of YAML(typ="safe").load - confirm the
                # pinned version still provides it.
                config = yaml.safe_load(f)
        except OSError as error:
            print("Could not read config file", file_path, "-", error)
            return
        except YAMLError as error:
            print("Could not parse config file", file_path, "-", error)
            return

        if config is None:
            # empty document: nothing to apply
            return
        if not isinstance(config, dict):
            print(
                "Could not parse config file",
                file_path,
                "-",
                "expected a mapping of options",
            )
            return

        self.update(config)
# Module-level configuration instance, created at import time from the
# defaults and environment variables read in Configuration.__init__.
configuration = Configuration()