Source code for titanfe.apps.brick_runner.output.consumer

#
# Copyright (c) 2019-present, wobe-systems GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# found in the LICENSE file in the root directory of this source tree.
#

"""a Consumer represents a connection made to the output server"""

import asyncio

from titanfe.utils import create_uid


[docs]class Consumer: """wrap incoming connections and handle sending packets""" def __init__(self, port_name, brick_instance_id, connection): self.uid = create_uid(f"C-{brick_instance_id}-") self.port_name = port_name self.brick_instance_id = brick_instance_id self.connection = connection self.listener = asyncio.create_task(self.listen()) self._packets_expected = 0 self._receptive = asyncio.Event() self.disconnected = asyncio.Event() def __repr__(self): return ( f"Consumer(" f"uid={self.uid}, " f"port_name={self.port_name}, " f"brick_instance_id={self.brick_instance_id})" )
[docs] async def is_receptive(self): await self._receptive.wait() return self
[docs] async def listen(self): """wait for packet requests, set disconnected-Event if the connection gets closed""" async for message in self.connection: self._packets_expected += message.content self._receptive.set() self.disconnected.set() self._receptive.clear()
[docs] async def close_connection(self): self.listener.cancel() await self.connection.close()
[docs] async def send(self, packet): """send a packet""" self._packets_expected -= 1 if self._packets_expected == 0: self._receptive.clear() packet.update_output_exit() await self.connection.send(packet.as_message())