Source code for titanfe.apps.control_peer.webapi.state

# Copyright (c) 2019-present, wobe-systems GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# found in the LICENSE file in the root directory of this source tree.
#

"""Routes for Flow management"""

from functools import partial
from typing import List, Dict

from fastapi import APIRouter


[docs]def create_state_router(control_peer): """Create a router for state Arguments: control_peer (ControlPeer): an instance of the ControlPeer Returns: APIRouter: router/routes to manage the control peer's flows """ router = APIRouter() router.add_api_route("/", partial(get_state, control_peer)) return router
[docs]def get_state(control_peer) -> List[Dict]: """Create a router for state Arguments: control_peer (ControlPeer): an instance of the ControlPeer Returns: list of brick information """ def brick_info(runner): brick = runner.brick return { "runner": runner.uid, "flow": brick.flow.uid, "uid": brick.uid, "name": brick.name, } return [brick_info(runner) for runner in control_peer.runners.values()]