Source code for titanfe.apps.control_peer.webapi.bricks
# Copyright (c) 2019-present, wobe-systems GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# found in the LICENSE file in the root directory of this source tree.
#
"""Routes for Flow management"""
import asyncio
from fastapi import APIRouter
from pydantic import BaseModel # pylint: disable=no-name-in-module
# Request Parameter
from titanfe.apps.control_peer.brick import BrickInstanceDefinition, BrickBaseDefinition
from titanfe import log as logging
log = logging.getLogger(__name__)
[docs]class RequestBrickStart(BaseModel): # pylint: disable=too-few-public-methods
brick: dict
[docs]class RequestInstallBricks(BaseModel): # pylint: disable=too-few-public-methods
bricks: list
# Routes
[docs]def create_brick_router(control_peer):
"""Setup the routing for flow management
Arguments:
control_peer (ControlPeer): an instance of the ControlPeer
Returns:
APIRouter: router/routes to manage the control peer's flows
"""
router = APIRouter()
@router.put("/{brick_uid}")
async def change_state( # pylint: disable=unused-variable
brick_uid: str, request: RequestBrickStart # pylint: disable=unused-argument
):
await control_peer.start_new_runner(
brick=BrickInstanceDefinition.from_gridmanager(request.brick)
)
@router.post("/install")
async def install_bricks( # pylint: disable=unused-variable
request: RequestInstallBricks, # pylint: disable=unused-argument
):
for brick in request.bricks:
brick_id = brick.get("id")
task = asyncio.create_task(
BrickBaseDefinition(
uid=brick_id, name=brick.get("name"), logger=log
).install_or_update(force_update=True)
)
task.brick_id = brick_id
control_peer.tasks[brick_id] = task
task.add_done_callback(lambda t: control_peer.tasks.pop(t.brick_id))
return router