Skip to content

Misc

Implemets the legacy serial over ip interface for the saemubox.

The other part of the implementation is the virtual-saemubox. Both parts are scheduled to be removed in the future using the built in api client in modern pathfinder versions.

This file has a bunch of untested ignored branches.

SaemuBox

Receive and validate info from Sämu Box for nowplaying.

Source code in nowplaying/misc/saemubox.py
class SaemuBox:
    """Receive and validate info from Sämu Box for nowplaying."""

    output_mapping = {
        1: "Klangbecken",
        2: "Live + Replay",
        3: "Frei",
        4: "Vorproduktion",
        5: "Hörmal",
        6: "Studio Live",
    }

    def __init__(self, saemubox_ip, check_sender=True):
        warnings.warn(
            "Saemubox will be replaced with Pathfinder", PendingDeprecationWarning
        )
        self.output = ""

        # listening ip adress (all)
        self.bind_ip = "0.0.0.0"

        # listening port
        self.port = 4001

        # allowed sender ip addresses
        self.senders = {saemubox_ip}

        # check self.senders for validity
        self.check_sender = check_sender

        # valid saemubox ids
        self.valid_ids = [str(i) for i in self.output_mapping]

    def run(self):  # pragma: no cover
        self._setup_socket()
        # wait for some data to arrive
        time.sleep(0.2)

    def _setup_socket(self):  # pragma: no cover
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.sock.bind((self.bind_ip, self.port))
            logger.info("SaemuBox: listening on %s:%i." % (self.bind_ip, self.port))
        except OSError as e:  # pragma: no cover
            self.sock = None
            logger.error("SaemuBox: cannot bind to %s:%i." % (self.bind_ip, self.port))
            raise SaemuBoxError() from e

    def __update(self):  # pragma: no cover
        if self.sock is None or (hasattr(self.sock, "_closed") and self.sock._closed):
            logger.warn("SaemuBox: socket closed unexpectedly, retrying...")
            self._setup_socket()

        output = None
        seen_senders = set()

        # read from socket while there is something to read (non-blocking)
        while select.select([self.sock], [], [], 0)[0]:
            data, addr = self.sock.recvfrom(1024)
            if self.check_sender and addr[0] not in self.senders:
                logger.warn("SaemuBox: receiving data from invalid host: %s " % addr[0])
                continue

            ids = data.split()  # several saemubox ids might come in one packet
            if ids:
                id = ids[-1].decode("utf-8")  # only take last id
                if id in self.valid_ids:
                    seen_senders.add(addr[0])
                    output = id
                else:
                    logger.warn("SaemuBox: received invalid data: %s" % data)

        if output is None:
            logger.error("SaemuBox: could not read current status.")
            output = 0
            raise SaemuBoxError("Cannot read data from SaemuBox")
        elif seen_senders != self.senders:
            for missing_sender in self.senders - seen_senders:
                logger.warn("SaemuBox: missing sender: %s" % missing_sender)

        self.output = int(output)

    def get_active_output_id(self):  # pragma: no cover
        self.__update()
        return self.output

    def get_active_output_name(self):  # pragma: no cover
        self.__update()
        return self.output_mapping[self.output]

    def get_id_as_name(self, number):  # pragma: no cover
        return self.output_mapping[number]

SaemuBoxError

Bases: Exception

SaemuBox related exception.

Source code in nowplaying/misc/saemubox.py
class SaemuBoxError(Exception):
    """SaemuBox related exception."""

    pass