Skip to content

Api

ApiServer

The API server.

Source code in nowplaying/api.py
class ApiServer:
    """The API server."""

    def __init__(self, options, event_queue: Queue, realm: str = "nowplaying"):
        self.options = options
        self.event_queue = event_queue
        self.realm = realm

        self.url_map = Map([Rule("/webhook", endpoint="webhook")])

    def run_server(self):
        """Run the API server."""
        if self.options.debug:
            from werkzeug.serving import run_simple

            self._server = run_simple(
                self.options.apiBindAddress,
                self.options.apiPort,
                self,
                use_debugger=True,
                use_reloader=True,
            )
        else:  # pragma: no cover
            cherrypy.tree.graft(self, "/")
            cherrypy.server.unsubscribe()

            self._server = cherrypy._cpserver.Server()

            self._server.socket_host = self.options.apiBindAddress
            self._server.socket_port = self.options.apiPort

            self._server.subscribe()

            cherrypy.engine.start()
            cherrypy.engine.block()

    def stop_server(self):
        """Stop the server."""
        self._server.stop()
        cherrypy.engine.exit()

    def __call__(self, environ, start_response):
        return self.wsgi_app(environ, start_response)

    def wsgi_app(self, environ, start_response):
        request = Request(environ)
        auth = request.authorization
        if auth and self.check_auth(auth.username, auth.password):
            response = self.dispatch_request(request)
        else:
            response = self.auth_required(request)
        return response(environ, start_response)

    def check_auth(self, username, password):
        return (
            username in self.options.apiAuthUsers
            and self.options.apiAuthUsers[username] == password
        )

    def auth_required(self, request):
        return Response(
            "Could not verify your access level for that URL.\n"
            "You have to login with proper credentials",
            401,
            {"WWW-Authenticate": f'Basic realm="{self.realm}"'},
        )

    def dispatch_request(self, request):
        adapter = self.url_map.bind_to_environ(request.environ)
        try:
            endpoint, values = adapter.match()
            return getattr(self, f"on_{endpoint}")(request, **values)
        except HTTPException as e:
            return Response(
                json.dumps(e.description),
                e.code,
                {"Content-Type": "application/json"},
            )

    def on_webhook(self, request):
        """Receive a CloudEvent and put it into the event queue."""
        logger.warning("Received a webhook")
        if (
            request.headers.get("Content-Type")
            not in _RABE_CLOUD_EVENTS_SUPPORTED_MEDIA_TYPES
        ):
            raise UnsupportedMediaType()
        try:
            event = from_http(request.headers, request.data)
        except CloudEventException as error:
            raise BadRequest(description=f"{error}")

        try:
            crid = cridlib.parse(event["id"])
            logger.debug("Detected CRID: %s", crid)
        except cridlib.CRIDError as error:
            raise BadRequest(
                description=f"CRID '{event['id']}' is not a RaBe CRID"
            ) from error

        logger.info("Received event: %s", event)

        if event["type"] in _RABE_CLOUD_EVENTS_SUBS:
            self.event_queue.put(event)

        return Response(status="200 Event Received")

on_webhook(request)

Receive a CloudEvent and put it into the event queue.

Source code in nowplaying/api.py
def on_webhook(self, request):
    """Receive a CloudEvent and put it into the event queue."""
    logger.warning("Received a webhook")
    if (
        request.headers.get("Content-Type")
        not in _RABE_CLOUD_EVENTS_SUPPORTED_MEDIA_TYPES
    ):
        raise UnsupportedMediaType()
    try:
        event = from_http(request.headers, request.data)
    except CloudEventException as error:
        raise BadRequest(description=f"{error}")

    try:
        crid = cridlib.parse(event["id"])
        logger.debug("Detected CRID: %s", crid)
    except cridlib.CRIDError as error:
        raise BadRequest(
            description=f"CRID '{event['id']}' is not a RaBe CRID"
        ) from error

    logger.info("Received event: %s", event)

    if event["type"] in _RABE_CLOUD_EVENTS_SUBS:
        self.event_queue.put(event)

    return Response(status="200 Event Received")

run_server()

Run the API server.

Source code in nowplaying/api.py
def run_server(self):
    """Run the API server."""
    if self.options.debug:
        from werkzeug.serving import run_simple

        self._server = run_simple(
            self.options.apiBindAddress,
            self.options.apiPort,
            self,
            use_debugger=True,
            use_reloader=True,
        )
    else:  # pragma: no cover
        cherrypy.tree.graft(self, "/")
        cherrypy.server.unsubscribe()

        self._server = cherrypy._cpserver.Server()

        self._server.socket_host = self.options.apiBindAddress
        self._server.socket_port = self.options.apiPort

        self._server.subscribe()

        cherrypy.engine.start()
        cherrypy.engine.block()

stop_server()

Stop the server.

Source code in nowplaying/api.py
def stop_server(self):
    """Stop the server."""
    self._server.stop()
    cherrypy.engine.exit()