Source code for titanfe.apps.brick_runner.grid_manager

#
# Copyright (c) 2019-present, wobe-systems GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# found in the LICENSE file in the root directory of this source tree.
#
"""GridManager communication"""

import json
from http import HTTPStatus

from aiohttp.client_exceptions import ClientError
from aiohttp_requests import requests

from titanfe import log as logging
from titanfe.config import configuration


[docs]class GridManager: """GridManager """ def __init__(self, runner_uid, brick_uid): self.runner_uid = runner_uid self.brick_uid = brick_uid self.log = logging.TitanPlatformLogger(__name__) @property def address(self): return configuration.gridmanager_address
[docs] async def register_runner(self, runner_address): """register brick runner at grid manager""" payload = { "runnerID": self.runner_uid, "address": "%s:%s" % runner_address, # pylint: disable=consider-using-f-string "brickId": self.brick_uid, } response = await requests.post(f"{self.address}/brickrunners/", data=json.dumps(payload)) if response.status != HTTPStatus.OK: error = ClientError(f"Failed to register at GridManager: {response!r}") self.log.with_context.error(error) raise error return await response.json()
[docs] async def deregister_runner(self): """deregister brick runner at grid manager""" payload = {"runnerId": self.runner_uid, "brickId": self.brick_uid} await requests.post(f"{self.address}/brickrunners/deregister", data=json.dumps(payload)) self.log.debug("Deregister: %r", payload)
[docs] async def request_scaling(self, consumer_uid): """send brick scaling request""" payload = {"brickId": self.brick_uid, "consumerId": consumer_uid} try: await requests.post(f"{self.address}/brickrunners/scaling", data=json.dumps(payload)) self.log.debug("Requested scaling: %r", payload) except ClientError as error: self.log.warning("ScalingRequest failed %s: %s", error, payload)