Source code for titanfe.repository

#
# Copyright (c) 2019-present, wobe-systems GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# found in the LICENSE file in the root directory of this source tree.
#

"""Repository can be used to connect to the titan repository service"""

from http import HTTPStatus
from typing import Any, Optional, Callable

from dataclasses import dataclass, field
from logging import LoggerAdapter
import requests

from dataclasses_json import dataclass_json

from titanfe.constants import BRICKRUNNER_DATABASE
from titanfe.config import configuration


[docs]@dataclass_json @dataclass class RequestData: """Request data object sent to the Repository service""" collection: str document: str value: Any find: Optional[dict] = field(default_factory=dict) database: str = BRICKRUNNER_DATABASE
[docs]@dataclass class Request: """Request object Args: address : str Target address method : Callable requests method(get, put, delete..) logger : the logger instance of the parent content: RequestData request content to be sent """ address: str method: Callable content: RequestData log: LoggerAdapter response: Any = field(default_factory=dict)
[docs] def send(self): """send request """ if self.content: try: response = self.method(f"{self.address}/", data=self.content) if response.status_code == HTTPStatus.OK: if response.content: self.response = response.json() return except requests.ConnectionError: self.log.error("Sending request to repo service failed", exc_info=True) else: self.log.error("Sending request to repo service failed: %r", response) return
[docs]class RepositoryService: """Repository service implements a connection to the titan repository service Args: brick_name (string) : the name of the brick instance logger : the logger instance of the parent repo_service(string) : optional, address of the repository service """ def __init__(self, logger, reposervice_address=None): self.address = reposervice_address or configuration.reposervice_address self.log = logger.getChild("RepositoryService") self.request = None # for unit testing def __create_request(self, method, collection, document, value=None, find=None): "create and send request object understood by the repo service" request_data = RequestData( # pylint: disable=no-member document=document, collection=collection, value=value, find=find, ).to_json() # pylint: disable=no-member self.log.debug("Created request data %r :", request_data) return Request(address=self.address, method=method, content=request_data, log=self.log)
[docs] def store(self, collection, document, value): "store data using the repository service" request = self.__create_request( method=requests.patch, collection=collection, document=document, value=value ) request.send() return request
[docs] def delete(self, collection, document): "delete data using the repository service" request = self.__create_request( method=requests.delete, collection=collection, document=document ) request.send() return request
[docs] def get(self, collection, document, find): "get data using the repository service" request = self.__create_request( method=requests.get, collection=collection, document=document, find=find ) request.send() return request