Skip to content

Suisa sendemeldung

SUISA Sendemeldung bugs SUISA with email once per month.

Fetches data on our playout history and formats them in a CSV file format containing the data (like Track, Title and ISRC) requested by SUISA. Also takes care of sending the report to SUISA via email for hands-off operations.

check_duplicate(entry_a, entry_b)

Check if two entries are duplicates by checking their acrid in all music items.


entry_a: first entry
entry_b: second entry

True if the entries are duplicates, False otherwise
Source code in suisa_sendemeldung/suisa_sendemeldung.py
def check_duplicate(entry_a: Any, entry_b: Any) -> bool:  # noqa: ANN401
    """Check if two entries are duplicates by checking their acrid in all music items.

    Arguments:
    ---------
        entry_a: first entry
        entry_b: second entry

    Returns:
    -------
        True if the entries are duplicates, False otherwise

    """
    try:
        entry_a = entry_a["metadata"]["music"]
    except KeyError:
        entry_a = entry_a["metadata"]["custom_files"]
    try:
        entry_b = entry_b["metadata"]["music"]
    except KeyError:
        entry_b = entry_b["metadata"]["custom_files"]
    for music_a in entry_a:
        for music_b in entry_b:
            if music_a["acrid"] == music_b["acrid"]:
                return True
    return False

create_message(sender, recipient, subject, text, filename, filetype, data, cc=None, bcc=None)

Create email message.


sender: The sender of the email. Login will be made with this user.
recipient: The recipient of the email. Can be a list.
subject: The subject of the email.
text: The body of the email.
filename: The filename of the attachment
filetype: The filetype of the attachment
data: The attachment data.
cc: cc recipient
bcc: bcc recipient
Source code in suisa_sendemeldung/suisa_sendemeldung.py
def create_message(  # noqa: PLR0913
    sender: str,
    recipient: str,
    subject: str,
    text: str,
    filename: str,
    filetype: str,
    data: Any,  # noqa: ANN401
    cc: str | None = None,
    bcc: str | None = None,
) -> MIMEMultipart:
    """Create email message.

    Arguments:
    ---------
        sender: The sender of the email. Login will be made with this user.
        recipient: The recipient of the email. Can be a list.
        subject: The subject of the email.
        text: The body of the email.
        filename: The filename of the attachment
        filetype: The filetype of the attachment
        data: The attachment data.
        cc: cc recipient
        bcc: bcc recipient

    """
    msg = MIMEMultipart()
    msg["From"] = sender
    msg["To"] = recipient
    if cc:
        msg["Cc"] = cc
    if bcc:
        msg["Bcc"] = bcc
    msg["Date"] = formatdate(localtime=True)
    msg["Subject"] = subject
    # set body
    msg.attach(MIMEText(text))
    msg.attach(get_email_attachment(filename, filetype, data))

    return msg

funge_release_date(release_date='')

Make a release_date from ACR conform to what seems to be the spec.

Source code in suisa_sendemeldung/suisa_sendemeldung.py
def funge_release_date(release_date: str = "") -> str:
    """Make a release_date from ACR conform to what seems to be the spec."""
    if len(release_date) == 10:  # noqa: PLR2004
        # we can make it look like what suisa has in their examples if it's the
        # right length
        try:
            return datetime.strptime(release_date, "%Y-%m-%d").strftime("%Y%m%d")  # noqa: DTZ007
        except ValueError:
            return ""
    # we discard other records since there is no way to convert records like a plain
    # year into dd/mm/yyyy properly without further guidance from whomever ingests
    # the data, in some cases this means we discard data that only contain a year
    # since they dont have that amount of precision.
    return ""

get_arguments(parser, sysargs)

Create :class:ArgumentParser with arguments.


parser: the parser to add arguments
sysargs: sys.arg[1:] or something else for testing

args: the parsed args from the parser
Source code in suisa_sendemeldung/suisa_sendemeldung.py
def get_arguments(parser: ArgumentParser, sysargs: list[str]) -> ArgparseNamespace:
    """Create :class:`ArgumentParser` with arguments.

    Arguments:
    ---------
        parser: the parser to add arguments
        sysargs: sys.arg[1:] or something else for testing

    Returns:
    -------
        args: the parsed args from the parser

    """
    parser.add_argument(
        "--bearer-token",
        env_var="BEARER_TOKEN",
        help="the bearer token for ACRCloud (required)",
        required=True,
    )
    parser.add_argument(
        "--project-id",
        env_var="PROJECT_ID",
        help="the id of the project at ACRCloud (required)",
        required=True,
    )
    parser.add_argument(
        "--stream-id",
        env_var="STREAM_ID",
        help="the id of the stream at ACRCloud (required)",
        required=True,
    )
    parser.add_argument(
        "--station-name",
        env_var="STATION_NAME",
        help="Station name, used in Output and Emails",
        default="Radio Bern RaBe",
    )
    parser.add_argument(
        "--station-name-short",
        env_var="STATION_NAME_SHORT",
        help="Shortname for station as used in Filenames (locally and in attachment)",
        default="rabe",
    )
    parser.add_argument(
        "--file",
        env_var="FILE",
        help="create file",
        action="store_true",
    )
    parser.add_argument(
        "--filetype",
        env_var="FILETYPE",
        help="filetype to attach to email or write to file",
        choices=("xlsx", "csv"),
        default="xlsx",
    )
    parser.add_argument(
        "--email",
        env_var="EMAIL",
        help="send an email",
        action="store_true",
    )
    parser.add_argument(
        "--email-from",
        env_var="EMAIL_FROM",
        help="the sender of the email",
    )
    parser.add_argument(
        "--email-to",
        env_var="EMAIL_TO",
        help="the recipients of the email",
    )
    parser.add_argument(
        "--email-cc",
        env_var="EMAIL_CC",
        help="the cc recipients of the email",
    )
    parser.add_argument(
        "--email-bcc",
        env_var="EMAIL_BCC",
        help="the bcc recipients of the email",
    )
    parser.add_argument(
        "--email-server",
        env_var="EMAIL_SERVER",
        help="the smtp server to send the mail with",
    )
    parser.add_argument(
        "--email-login",
        env_var="EMAIL_LOGIN",
        help="the username to logon to the smtp server (default: email_from)",
    )
    parser.add_argument(
        "--email-pass",
        env_var="EMAIL_PASS",
        help="the password for the smtp server",
    )
    parser.add_argument(
        "--email-subject",
        env_var="EMAIL_SUBJECT",
        help="""
        Template for subject of the email.

        Placeholders are $station_name, $year and $month.
        """,
        default="SUISA Sendemeldung von $station_name für $year-$month",
    )
    parser.add_argument(
        "--email-text",
        env_var="EMAIL_TEXT",
        help="""
        Template for email text.

        Placeholders are $station_name, $month, $year, $previous_year,
        $responsible_email, and $email_footer.
        """,
        default=_EMAIL_TEMPLATE,
    )
    parser.add_argument(
        "--email-footer",
        env_var="EMAIL_FOOTER",
        help="Footer for the Email",
        default="Email generated by <https://github.com/radiorabe/suisa_sendemeldung>",
    )
    parser.add_argument(
        "--responsible-email",
        env_var="RESPONSIBLE_EMAIL",
        help="Used to hint whom to contact in the emails text.",
    )
    parser.add_argument(
        "--start-date",
        env_var="START_DATE",
        help="the start date of the interval in format YYYY-MM-DD (default: 30 days\
                              before end_date)",
    )
    parser.add_argument(
        "--end-date",
        env_var="END_DATE",
        help="the end date of the interval in format YYYY-MM-DD (default: today)",
    )
    parser.add_argument(
        "--last-month",
        env_var="LAST_MONTH",
        action="store_true",
        help="download data of whole last month",
    )
    parser.add_argument(
        "--timezone",
        env_var="TIMEZONE",
        help="set the timezone for localization",
        required=True,
        default="UTC",
    )
    parser.add_argument(
        "--locale",
        env_var="LOCALE",
        help="set locale for date and time formatting",
        default="de_CH",
    )
    parser.add_argument(
        "--filename",
        env_var="FILENAME",
        help="""
        Output filename.

        Default:
        - <station_name_short>_<year>_<month>.csv when reporting last month
        - <station_name_short>_<start_date>.csv else
        """,
    )
    parser.add_argument(
        "--stdout",
        env_var="STDOUT",
        help="also print to stdout",
        action="store_true",
    )
    args = parser.parse_args(sysargs)
    validate_arguments(parser, args)  # pragma: no cover
    return args  # pragma: no cover

get_artist(music)

Get artist from a given dict.


music: music dict from API

artist: string representing the artist
Source code in suisa_sendemeldung/suisa_sendemeldung.py
def get_artist(music: Any) -> str:  # noqa: ANN401
    """Get artist from a given dict.

    Arguments:
    ---------
        music: music dict from API

    Returns:
    -------
        artist: string representing the artist

    """
    artist = ""
    if music.get("artists") is not None:
        artists = music.get("artists")
        if isinstance(artists, list):
            artist = ", ".join([a.get("name") for a in artists])
        else:
            # Yet another 'wrong' entry in the database:
            # artists in custom_files was sometimes recorded as single value
            # @TODO also remove once way in the past? (2023-01-31)
            artist = artists
    elif music.get("artist") is not None:
        artist = music.get("artist")
    elif music.get("Artist") is not None:  # pragma: no cover
        # Uppercase is a hack needed for Jun 2021 since there is a 'wrong' entry
        # in the database. Going forward the record will be available as 'artist'
        # in lowercase.
        # @TODO remove once is waaaay in the past
        artist = music.get("Artist")
    return artist

get_csv(data, station_name='')

Create SUISA compatible csv data.


data: To data to create csv from
station_name: Default station name for output

csv: The converted data
Source code in suisa_sendemeldung/suisa_sendemeldung.py
def get_csv(data: dict, station_name: str = "") -> str:
    """Create SUISA compatible csv data.

    Arguments:
    ---------
        data: To data to create csv from
        station_name: Default station name for output

    Returns:
    -------
        csv: The converted data

    """
    header = [
        "Titel",
        "Komponist",
        "Interpret",
        "Interpreten-Info",
        "Sender",
        "Sendedatum",
        "Sendedauer",
        "Sendezeit",
        "Werkverzeichnisangaben",
        "ISRC",
        "Label",
        "CD ID / Katalog-Nummer",
        "Aufnahmedatum",
        "Aufnahmeland",
        "Erstveröffentlichungsdatum",
        "Titel des Tonträgers (Albumtitel)",
        "Autor Text",
        "Track Nummer",
        "Genre",
        "Programm",
        "Bestellnummer",
        "Marke",
        "Label Code",
        "EAN/GTIN",
        "Identifikationsnummer",
    ]
    csv = StringIO()
    csv_writer = writer(csv, dialect="excel")
    csv_writer.writerow(header)

    for entry in tqdm(data, desc="preparing tracks for report"):
        metadata = entry.get("metadata")
        # parse timestamp
        timestamp = datetime.strptime(metadata.get("timestamp_local"), ACRClient.TS_FMT)  # noqa: DTZ007

        ts_date = timestamp.strftime("%Y%m%d")
        ts_time = timestamp.strftime("%H:%M:%S")
        hours, remainder = divmod(metadata.get("played_duration"), 60 * 60)
        minutes, seconds = divmod(remainder, 60)
        # required format of duration field: hh:mm:ss
        duration = f"{hours:02}:{minutes:02}:{seconds:02}"

        try:
            music = metadata.get("music")[0]
        except TypeError:
            music = metadata.get("custom_files")[0]
        title = music.get("title")

        artist = get_artist(music)

        composer = ", ".join(music.get("contributors", {}).get("composers", ""))
        works_composer = ", ".join(
            [
                c["name"]
                for c in [
                    item
                    for sublist in [w["creators"] for w in music.get("works", [])]
                    for item in sublist
                ]
                if c.get("role", "") in ["C", "Composer", "W", "Writer"]
            ],
        )
        if works_composer and (not composer or composer == artist):
            composer = works_composer

        isrc = get_isrc(music)
        label = music.get("label")

        # load some "best-effort" fields
        album = music.get("album", "")
        # it's a dict if it's from the ACRCloud bucket, a string if from a custom bucket
        if isinstance(album, dict):
            album = album.get("name", "")
        upc = music.get("external_ids", {}).get("upc", "")
        release_date = funge_release_date(music.get("release_date", ""))

        # cridlib only supports timezone-aware datetime values, so we convert one
        timestamp_utc = pytz.utc.localize(
            datetime.strptime(metadata.get("timestamp_utc"), ACRClient.TS_FMT),  # noqa: DTZ007
        )
        # we include the acrid in our CRID so we know about the data's provenience
        # in case any questions about the data we delivered are asked
        acrid = music.get("acrid")
        local_id = cridlib.get(timestamp=timestamp_utc, fragment=f"acrid={acrid}")

        csv_writer.writerow(
            [
                title,
                composer,
                artist,
                "",  # Interpreten-Info
                station_name,
                ts_date,
                duration,
                ts_time,
                "",  # Werkverzeichnisangaben
                isrc,
                label,
                "",  # CD ID / Katalog-Nummer
                "",  # Aufnahmedatum
                "",  # Aufnahmeland
                release_date,
                album,
                "",  # Autor Text
                "",  # Track Nummer
                "",  # Genre
                "",  # Programm
                "",  # Bestellnummer
                "",  # Marke
                "",  # Label Code
                upc,
                local_id,
            ],
        )
    return csv.getvalue()

get_email_attachment(filename, filetype, data)

Create attachment based on required filetype and data.


filename: The filename of the attachment
filetype: The filetype of the attachment
data: The attachment data
Source code in suisa_sendemeldung/suisa_sendemeldung.py
def get_email_attachment(filename: str, filetype: str, data: Any) -> MIMEBase:  # noqa: ANN401
    """Create attachment based on required filetype and data.

    Arguments:
    ---------
        filename: The filename of the attachment
        filetype: The filetype of the attachment
        data: The attachment data

    """
    maintype: str
    subtype: str
    payload: str
    if filetype == "xlsx":
        maintype = "application"
        subtype = "vnd.ms-excel"
        payload = data.getvalue()
    elif filetype == "csv":
        maintype = "text"
        subtype = "csv"
        payload = data.encode("utf-8")
        part = MIMEBase("text", "csv")

    part = MIMEBase(maintype, subtype)
    part.set_payload(payload)
    encode_base64(part)
    part.add_header(
        "Content-Disposition", f"attachment; filename={Path(filename).name}"
    )
    return part

get_isrc(music)

Get a valid ISRC from the music record or return an empty string.

Source code in suisa_sendemeldung/suisa_sendemeldung.py
def get_isrc(music: Any) -> str:  # noqa: ANN401
    """Get a valid ISRC from the music record or return an empty string."""
    isrc = ""
    if music.get("external_ids", {}).get("isrc"):
        isrc = music.get("external_ids").get("isrc")
    elif music.get("isrc"):
        isrc = music.get("isrc")
    # was a list with a singular entry for a while back in 2021
    if isinstance(isrc, list):
        isrc = isrc[0]
    # some records contain the "ISRC" prefix that is described as legacy
    # in the ISRC handbook from IFPI.
    if isrc and isrc[:4] == "ISRC":
        isrc = isrc[4:]
    # take care of cases where the isrc is space delimited even though the
    # record is technically wrong but happens often enough to warrant this
    # hack.
    if isrc:
        isrc = isrc.replace(" ", "")

    if not ISRC.validate(isrc):
        isrc = ""
    return isrc

get_xlsx(data, station_name='')

Create SUISA compatible xlsx data.


data: The data to create xlsx from
station_name: Default station name for output

xlsx: The converted data as BytesIO object
Source code in suisa_sendemeldung/suisa_sendemeldung.py
def get_xlsx(data: Any, station_name: str = "") -> BytesIO:  # noqa: ANN401
    """Create SUISA compatible xlsx data.

    Arguments:
    ---------
        data: The data to create xlsx from
        station_name: Default station name for output

    Returns:
    -------
        xlsx: The converted data as BytesIO object

    """
    csv = get_csv(data, station_name=station_name)
    csv_reader = reader(StringIO(csv))

    xlsx = BytesIO()
    workbook: Workbook = Workbook()
    if not workbook.active:  # pragma: no cover
        raise RuntimeError
    worksheet: Worksheet = workbook.active  # type: ignore[assignment]

    for row in csv_reader:
        worksheet.append(row)

    # the columns that should be styled as required (grey background)
    required_columns = [
        "Titel",
        "Komponist",
        "Interpret",
        "Sendedatum",
        "Sendedauer",
        "Sendezeit",
        "ISRC",
        "Label",
        "Label Code",
        "Identifikationsnummer",
    ]
    font = Font(name="Calibri", bold=True, size=12)
    side = Side(border_style="thick", color="000000")
    border = Border(top=side, left=side, right=side, bottom=side)
    fill = PatternFill("solid", bgColor="d9d9d9", fgColor="d9d9d9")
    for cell in worksheet[1]:  # xlsx is 1-indexed
        cell.font = font
        cell.border = border
        if cell.value in required_columns:
            cell.fill = fill

    # Try to approximate the required width by finding the longest values per column
    dims: dict[str, int] = {}
    for row in worksheet.rows:  # type: ignore[assignment]
        for cell in row:  # type: ignore[assignment]
            if cell.value:
                dims[cell.column_letter] = max(
                    (dims.get(cell.column_letter, 0), len(str(cell.value))),
                )
    # apply estimated width to each column
    padding = 3
    for col, value in dims.items():
        worksheet.column_dimensions[col].width = value + padding

    workbook.save(xlsx)
    return xlsx

main()

Entrypoint for SUISA Sendemeldung .

Source code in suisa_sendemeldung/suisa_sendemeldung.py
def main() -> None:  # pragma: no cover
    """Entrypoint for SUISA Sendemeldung ."""
    default_config_file: str = Path(__file__).name.replace(".py", ".conf")
    # config file in /etc gets overriden by the one in $HOME which gets overriden by the
    # one in the current directory
    default_config_files = [
        str(Path("/etc") / default_config_file),
        str(Path("~").expanduser() / default_config_file),
        default_config_file,
    ]
    parser = ArgumentParser(
        default_config_files=default_config_files,
        description="ACRCloud client for SUISA reporting @ RaBe.",
    )
    args = get_arguments(parser, sys.argv[1:])

    start_date, end_date = parse_date(args)
    filename = parse_filename(args, start_date)

    client = ACRClient(bearer_token=args.bearer_token)
    data = client.get_interval_data(
        args.project_id,
        args.stream_id,
        start_date,
        end_date,
        timezone=args.timezone,
    )
    data = merge_duplicates(data)
    if args.filetype == "xlsx":
        data = get_xlsx(data, station_name=args.station_name)
    elif args.filetype == "csv":
        data = get_csv(data, station_name=args.station_name)
    if args.email:
        email_subject = Template(args.email_subject).substitute(
            {
                "station_name": args.station_name,
                "year": format_date(start_date, format="yyyy", locale=args.locale),
                "month": format_date(start_date, format="MM", locale=args.locale),
            },
        )
        # generate body
        text = Template(args.email_text).substitute(
            {
                "station_name": args.station_name,
                "month": format_date(start_date, format="MMMM", locale=args.locale),
                "year": format_date(start_date, format="yyyy", locale=args.locale),
                "previous_year": format_date(
                    start_date - timedelta(days=365),
                    format="yyyy",
                    locale=args.locale,
                ),
                "in_three_months": format_date(
                    datetime.now() + relativedelta(months=+3),  # noqa: DTZ005
                    format="long",
                    locale=args.locale,
                ),
                "responsible_email": args.responsible_email,
                "email_footer": args.email_footer,
            },
        )
        msg = create_message(
            args.email_from,
            args.email_to,
            email_subject,
            text,
            filename,
            args.filetype,
            data,
            cc=args.email_cc,
            bcc=args.email_bcc,
        )
        send_message(
            msg,
            server=args.email_server,
            login=args.email_login,
            password=args.email_pass,
        )
    if args.file and args.filetype == "xlsx":
        write_xlsx(filename, data)
    elif args.file and args.filetype == "csv":
        write_csv(filename, data)
    if args.stdout and args.filetype == "csv":
        print(data)  # noqa: T201

merge_duplicates(data)

Merge consecutive entries into one if they are duplicates.


data: The data provided by ACRClient

data: The processed data
Source code in suisa_sendemeldung/suisa_sendemeldung.py
def merge_duplicates(data: Any) -> Any:  # noqa: ANN401
    """Merge consecutive entries into one if they are duplicates.

    Arguments:
    ---------
        data: The data provided by ACRClient

    Returns:
    -------
        data: The processed data

    """
    prev = data[0]
    mark = []
    for entry in data[1:]:
        if check_duplicate(prev, entry):
            prev["metadata"]["played_duration"] = (
                prev["metadata"]["played_duration"]
                + entry["metadata"]["played_duration"]
            )
            # mark entry for removal
            mark.append(entry)
        else:
            prev = entry
    # remove marked entries
    for entry in mark:
        data.remove(entry)
    return data

parse_date(args)

Parse date from args.


args: the arguments provided to the script

start_date: the start date of the requested interval
end_date: the end date of the requested interval
Source code in suisa_sendemeldung/suisa_sendemeldung.py
def parse_date(args: ArgparseNamespace) -> tuple[date, date]:
    """Parse date from args.

    Arguments:
    ---------
        args: the arguments provided to the script

    Returns:
    -------
        start_date: the start date of the requested interval
        end_date: the end date of the requested interval

    """
    # date parsing logic
    if args.last_month:
        today = date.today()  # noqa: DTZ011
        # get first of this month
        this_month = today.replace(day=1)
        # last day of last month = first day of this month - 1 day
        end_date = this_month - timedelta(days=1)
        start_date = end_date.replace(day=1)
    else:
        if args.end_date:
            end_date = datetime.strptime(args.end_date, "%Y-%m-%d").date()  # noqa: DTZ007
        else:
            # if no end_date was set, default to today
            end_date = date.today()  # noqa: DTZ011
        if args.start_date:
            start_date = datetime.strptime(args.start_date, "%Y-%m-%d").date()  # noqa: DTZ007
        else:
            # if no start_date was set, default to 30 days before end_date
            start_date = end_date - timedelta(days=30)
    return start_date, end_date

parse_filename(args, start_date)

Parse filename from args and start_date.


args: the arguments provided to the script
start_date: start of reporting period

filename: the filename to use for the csv data
Source code in suisa_sendemeldung/suisa_sendemeldung.py
def parse_filename(args: ArgparseNamespace, start_date: date) -> str:
    """Parse filename from args and start_date.

    Arguments:
    ---------
        args: the arguments provided to the script
        start_date: start of reporting period

    Returns:
    -------
        filename: the filename to use for the csv data

    """
    if args.filename:
        filename = args.filename
    # depending on date args either append the month or the start_date
    elif args.last_month:
        date_part = f"{start_date.strftime('%Y')}_{start_date.strftime('%m')}"
        filename = f"{args.station_name_short}_{date_part}.{args.filetype}"
    else:
        filename = (
            f"{args.station_name_short}_"
            f"{start_date.strftime('%Y-%m-%d')}.{args.filetype}"
        )
    return filename

send_message(msg, server='127.0.0.1', login=None, password=None)

Send email.


msg: The message to send (an email.messag.Message object)
server: The SMTP server to use to send the email.
login: The username for `sender`@`server`.
password: The password for `sender`@`server`.
Source code in suisa_sendemeldung/suisa_sendemeldung.py
def send_message(
    msg: MIMEMultipart,
    server: str = "127.0.0.1",
    login: str | None = None,
    password: str | None = None,
) -> None:
    """Send email.

    Arguments:
    ---------
        msg: The message to send (an email.messag.Message object)
        server: The SMTP server to use to send the email.
        login: The username for `sender`@`server`.
        password: The password for `sender`@`server`.

    """
    with SMTP(server) as smtp:
        smtp.starttls()
        if password:
            if login:
                smtp.login(login, password)
            else:
                smtp.login(msg["From"], password)
        smtp.send_message(msg)

validate_arguments(parser, args)

Validate the arguments provided to the script.

After this function we are sure that there are no conflicts in the arguments.


parser: the ArgumentParser to use for throwing errors
args: the arguments to validate
Source code in suisa_sendemeldung/suisa_sendemeldung.py
def validate_arguments(parser: ArgumentParser, args: ArgparseNamespace) -> None:
    """Validate the arguments provided to the script.

    After this function we are sure that there are no conflicts in the arguments.

    Arguments:
    ---------
        parser: the ArgumentParser to use for throwing errors
        args: the arguments to validate

    """
    msgs = []
    # check length of bearer_token
    if not len(args.bearer_token) >= _ACRTOKEN_MAXLEN:
        msgs.append(
            "".join(
                (
                    "wrong format on bearer_token, "
                    "expected larger than 32 characters "
                    f"but got {len(args.bearer_token)}"
                ),
            ),
        )
    # check length of stream_id
    if len(args.stream_id) not in [9, 10]:
        msgs.append(
            (
                "wrong format on stream_id, "
                f"expected 9 or 10 characters but got {len(args.stream_id)}"
            ),
        )
    # one output option has to be set
    if not (args.file or args.email or args.stdout):
        msgs.append(
            "no output option has been set, specify one of --file, --email or --stdout",
        )
    # xlsx cannot be printed to stdout
    if args.stdout and args.filetype == "xlsx":
        msgs.append("xlsx cannot be printed to stdout, please set --filetype to csv")
    # last_month is in conflict with start_date and end_date
    if args.last_month and (args.start_date or args.end_date):
        msgs.append("argument --last_month not allowed with --start_date or --end_date")
    # exit if there are error messages
    if msgs:
        parser.error("\n- " + "\n- ".join(msgs))

write_csv(filename, csv)

Write contents of csv to file.


filename: The file to write to.
csv: The data to write to `filename`.
Source code in suisa_sendemeldung/suisa_sendemeldung.py
def write_csv(filename: str, csv: str) -> None:  # pragma: no cover
    """Write contents of `csv` to file.

    Arguments:
    ---------
        filename: The file to write to.
        csv: The data to write to `filename`.

    """
    with Path(filename).open("w", encoding="utf-8") as csvfile:
        csvfile.write(csv)

write_xlsx(filename, xlsx)

Write contents of xlsx to file.


filename: The file to write to.
xlsx: The data to write to `filename`.
Source code in suisa_sendemeldung/suisa_sendemeldung.py
def write_xlsx(filename: str, xlsx: BytesIO) -> None:  # pragma: no cover
    """Write contents of `xlsx` to file.

    Arguments:
    ---------
        filename: The file to write to.
        xlsx: The data to write to `filename`.

    """
    with Path(filename).open("wb") as xlsxfile:
        xlsxfile.write(xlsx.getvalue())