# Source code for titanfe.apps.control_peer.services
#
# Copyright (c) 2019-present, wobe-systems GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# found in the LICENSE file in the root directory of this source tree.
#
"""Clients for the package manager and grid manager services, including control peer registration"""
import asyncio
import json
from http import HTTPStatus
from abc import ABC, abstractmethod
from aiohttp.client_exceptions import ClientError
from aiohttp_requests import Requests # initiate a new client every time,
# because we don't know how many threads are used and each will have its own asyncio loop
# there must be a better way, but right now I'm short on time...
from titanfe import log as logging
from titanfe.config import configuration
# Module-level logger, requested from titanfe's log module under this module's name.
log = logging.getLogger(__name__)
class ServiceError(Exception):
    """Raised when a service answers a request with an unexpected HTTP status."""
class ControlPeerServiceRegistration(ABC):
    """Base class to handle control peer registration at various services.

    Subclasses provide ``control_peer_endpoint``; ``register`` POSTs and
    ``deregister`` DELETEs the control peer's own API address at that endpoint.
    """

    @property
    @abstractmethod
    def control_peer_endpoint(self):
        """URL the registration requests are sent to (provided by subclasses)."""

    @staticmethod
    def _registration_payload(own_api_address):
        """Build the value passed as ``json=`` payload for `own_api_address`.

        The address is dumped to JSON text and leading/trailing double quotes
        are stripped from the result.
        NOTE(review): presumably this keeps a plain string address from being
        quoted twice once the client JSON-encodes the payload - confirm
        against the services' API.
        """
        return json.dumps(own_api_address).strip('"')

    @staticmethod
    def _describe_status(status):
        """Return `status` as ``HTTPStatus`` member, or unchanged if it is no known code.

        ``HTTPStatus(...)`` raises ``ValueError`` for codes missing from the
        standard library enum; falling back to the raw code keeps error
        reporting from failing on such responses.
        """
        try:
            return HTTPStatus(status)  # pylint: disable=no-value-for-parameter
        except ValueError:
            return status

    async def register(self, own_api_address):
        """Inquire registration of `own_api_address` at the control peer endpoint.

        The request is repeated, pausing one second in between, for as long
        as it fails with a ``ClientError``.

        Raises:
            ServiceError: the response status is none of OK, CREATED, ACCEPTED.
        """
        payload = self._registration_payload(own_api_address)
        while True:
            try:
                # NOTE(review): a fresh client is created per attempt and is not
                # closed here - confirm that aiohttp_requests cleans it up.
                requests = Requests()
                response = await requests.post(self.control_peer_endpoint, json=payload)
            except ClientError:
                log.warning("Failed to register at <%s> - Retry",
                            self.control_peer_endpoint)
                await asyncio.sleep(1)
                continue

            if response.status not in (HTTPStatus.OK, HTTPStatus.CREATED, HTTPStatus.ACCEPTED):
                # An unexpected status is not retried, it is reported to the caller.
                raise ServiceError(
                    f"Failed to register own API {own_api_address}"
                    f" at {self.control_peer_endpoint}: "
                    f"{self._describe_status(response.status)}"
                )
            log.info("Successfully registered own API <%s> at <%s>", own_api_address,
                     self.control_peer_endpoint)
            return

    async def deregister(self, own_api_address):
        """Cancel the registration of `own_api_address` at the control peer endpoint.

        Best effort: a ``ClientError`` or an unexpected response status is
        only logged, nothing is raised for them and nothing is retried.
        """
        try:
            # NOTE(review): the client is not closed here - confirm that
            # aiohttp_requests cleans it up.
            requests = Requests()
            response = await requests.delete(self.control_peer_endpoint,
                                             json=self._registration_payload(own_api_address))
        except ClientError:
            log.warning("Removing registration from <%s> - Failed!", self.control_peer_endpoint)
            return

        if response.status not in (HTTPStatus.OK, HTTPStatus.ACCEPTED):
            log.error(
                "Removing registration from <%s> failed: %s",
                self.control_peer_endpoint,
                self._describe_status(response.status),
            )
        else:
            log.info("Successfully removed registration from <%s>", self.control_peer_endpoint)
class PackageManager(ControlPeerServiceRegistration):
    """Client for all requests sent to the package manager."""

    @property
    def address(self):
        """Package manager address as configured, formatted as a string."""
        return format(configuration.packagemanager_address)

    @property
    def brick_endpoint(self):
        """URL of the bricks resource."""
        return self.address + "/bricks"

    @property
    def brick_code_endpoint(self):
        """URL below which the source archives are requested."""
        return self.address + "/source"

    @property
    def control_peer_endpoint(self):
        """URL of the control peer registration resource."""
        return self.address + "/controlpeers"

    @staticmethod
    async def get(endpoint, context):
        """GET `endpoint` and return the raw response body.

        `context` names the operation in the error message.

        Raises:
            ServiceError: the response status is not OK.
        """
        client = Requests()
        reply = await client.get(endpoint)
        if reply.status == HTTPStatus.OK:
            return await reply.read()
        raise ServiceError(f"{context} failed: {reply!r}")

    async def get_bricks(self):
        """GET the bricks resource and return its decoded JSON content.

        Raises:
            ServiceError: the response status is not OK.
        """
        client = Requests()
        reply = await client.get(self.brick_endpoint)
        if reply.status == HTTPStatus.OK:
            return await reply.json()
        raise ServiceError(f"Getting bricks failed: {reply!r}")

    async def get_source_files(self, brick_id):
        """Download the source files archive (``<brick_id>.zip``) from the package manager."""
        archive_url = self.brick_code_endpoint + "/" + brick_id + ".zip"
        return await self.get(archive_url, "Downloading source files")
class GridManager(ControlPeerServiceRegistration):
    """Client for all requests sent to the grid manager."""

    @property
    def address(self):
        """Grid manager address as configured, formatted as a string."""
        return format(configuration.gridmanager_address)

    @property
    def control_peer_endpoint(self):
        """URL of the control peer registration resource."""
        return self.address + "/controlpeers"
# Module-level instances of the two service clients, created once at import time.
package_manager = PackageManager() # pylint: disable=invalid-name
grid_manager = GridManager() # pylint: disable=invalid-name