# SPDX-FileCopyrightText: 2023 Gregor Kleen
#
# SPDX-License-Identifier: GPL-3.0-or-later

import os, sys
from os import environ
from pwd import getpwnam
import kubernetes as k8s
from pathlib import Path
import argparse
import logging
from base64 import b64decode
from minio import Minio
from http.client import IncompleteRead
from time import sleep
from random import uniform
from datetime import datetime, timezone, timedelta
from dateutil.tz import gettz, UTC
import pytimeparse
import re
from dataclasses import dataclass
from collections import OrderedDict, defaultdict, deque
from traceback import format_exc
import toml
import json
import subprocess
import select
import time
import math
from math import floor


# Each rule maps a backup's creation time to a retention-period key; two
# backups fall into the same period iff their keys compare equal.
TIME_PATTERNS = OrderedDict(
    [
        ("secondly", lambda t: t.strftime("%Y-%m-%d %H:%M:%S")),
        ("minutely", lambda t: t.strftime("%Y-%m-%d %H:%M")),
        ("5m", lambda t: (t.strftime("%Y-%m-%d %H"), floor(t.minute / 5) * 5)),
        ("15m", lambda t: (t.strftime("%Y-%m-%d %H"), floor(t.minute / 15) * 15)),
        ("hourly", lambda t: t.strftime("%Y-%m-%d %H")),
        ("4h", lambda t: (t.strftime("%Y-%m-%d"), floor(t.hour / 4) * 4)),
        ("12h", lambda t: (t.strftime("%Y-%m-%d"), floor(t.hour / 12) * 12)),
        ("daily", lambda t: t.strftime("%Y-%m-%d")),
        (
            "halfweekly",
            lambda t: (t.strftime("%G-%V"), floor(int(t.strftime("%u")) / 4) * 4),
        ),
        ("weekly", lambda t: t.strftime("%G-%V")),
        ("monthly", lambda t: t.strftime("%Y-%m")),
        ("yearly", lambda t: t.strftime("%Y")),
    ]
)
BACKUP_PATTERN = re.compile(r"^(?P<ts>[0-9]+)_.*_gitlab_backup\.tar$")
BUFFER_SIZE = os.sysconf("SC_PAGESIZE")


@dataclass(eq=True, order=True, frozen=True)
class GitlabBackup:
    creation: datetime
    filename: str
    version_id: str


@dataclass(eq=True, order=True, frozen=True)
class KeptBecause:
    rule: str
    ix: int
    period: str


def convert_timedelta(secs_str):
    secs = pytimeparse.parse(secs_str)
    if secs is None:
        raise ValueError(f"Could not parse timedelta expression ‘{secs_str}’")
    return timedelta(seconds=secs)


def as_borg():
    global logger, borg_pwd

    # Drop privileges to the ‘borg’ user before exec'ing borg.
    try:
        os.setgid(borg_pwd.pw_gid)
        os.setuid(borg_pwd.pw_uid)
    except Exception:
        logger.error(format_exc())
        raise


def _archive_name(backup, target, archive_prefix):
    return f"{target}::{_archive_basename(backup, archive_prefix)}"


def _archive_basename(backup, archive_prefix):
    creation_time = backup.creation.astimezone(timezone.utc)
    return f'{archive_prefix}-{creation_time.strftime("%Y-%m-%dT%H:%M:%S")}'
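
# Illustrative configuration file. The table and key names below are exactly
# the ones this script looks up; the concrete values are made-up examples,
# not defaults:
#
#   [k8s-gitlab-borg]
#   timezone = "Europe/Berlin"
#
#   [copy]
#   count = 1
#   [copy.daily]
#   count = 7
#
#   [keep]
#   within = "1d"
#   [keep.daily]
#   count = 7
#   [keep.weekly]
#   count = 4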

def main():
    global logger
    logger = logging.getLogger(__name__)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(
        logging.Formatter("[%(levelname)s](%(name)s): %(message)s")
    )
    if sys.stderr.isatty():
        console_handler.setFormatter(
            logging.Formatter("%(asctime)s [%(levelname)s](%(name)s): %(message)s")
        )

    # Token-bucket rate limit on log output: up to ‘burst_max’ records pass
    # immediately, after which the filter blocks until the bucket refills at
    # one token per ‘inv_rate’ nanoseconds.
    burst_max = 10000
    burst = burst_max
    last_use = None
    inv_rate = 1e6

    def consume_filter(record):
        nonlocal burst, burst_max, inv_rate, last_use

        delay = None
        while True:
            now = time.monotonic_ns()
            burst = (
                min(burst_max, burst + math.floor((now - last_use) / inv_rate))
                if last_use
                else burst_max
            )
            last_use = now

            if burst > 0:
                burst -= 1
                if delay:
                    delay = now - delay
                return True

            if delay is None:
                delay = now
            time.sleep(inv_rate / 1e9)

    console_handler.addFilter(consume_filter)
    logger.addHandler(console_handler)

    # Log uncaught exceptions before deferring to the default excepthook.
    def log_exceptions(type, value, tb):
        global logger

        logger.error(value)
        sys.__excepthook__(type, value, tb)

    sys.excepthook = log_exceptions

    parser = argparse.ArgumentParser(
        prog="k8s-gitlab-borg", formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument(
        "--verbosity",
        dest="log_level",
        action="append",
        type=int,
        help="Set verbosity to specific level",
    )
    parser.add_argument(
        "--verbose",
        "-v",
        dest="log_level",
        action="append_const",
        const=1,
        help="Increase verbosity by one step",
    )
    parser.add_argument(
        "--quiet",
        "-q",
        dest="log_level",
        action="append_const",
        const=-1,
        help="Decrease verbosity by one step",
    )
    parser.add_argument(
        "--dry-run", "-n", action="store_true", help="Don't actually cause any changes"
    )
    parser.add_argument(
        "--k8s-config",
        type=Path,
        metavar="FILE",
        default=Path(environ["CREDENTIALS_DIRECTORY"]) / "k8s.yaml"
        if "CREDENTIALS_DIRECTORY" in environ
        else k8s.config.KUBE_CONFIG_DEFAULT_LOCATION,
        help="Path to YAML file encoding k8s credentials",
    )
    parser.add_argument(
        "--namespace",
        type=str,
        metavar="STR",
        default="gitlab",
        help="K8s namespace containing gitlab deployment(s)",
    )
    parser.add_argument("--target", metavar="REPO", help="Borg repository")
    parser.add_argument(
        "--archive-prefix",
        metavar="STR",
        default="gitlab",
        help="Prefix for borg archive names",
    )
    parser.add_argument("config", type=Path, metavar="FILE", help="Configuration file")
    args = parser.parse_args()

    LOG_LEVELS = [
        logging.DEBUG,
        logging.INFO,
        logging.WARNING,
        logging.ERROR,
        logging.CRITICAL,
    ]
    DEFAULT_LOG_LEVEL = logging.ERROR
    log_level = LOG_LEVELS.index(DEFAULT_LOG_LEVEL)
    for adjustment in args.log_level or ():
        log_level = min(len(LOG_LEVELS) - 1, max(log_level - adjustment, 0))
    logger.setLevel(LOG_LEVELS[log_level])

    global borg_pwd
    borg_pwd = getpwnam("borg")

    config = toml.load(args.config)

    # Locate the gitlab MinIO service and its credentials in the cluster.
    logger.debug("K8s config file: ‘%s’", args.k8s_config)
    with k8s.config.new_client_from_config(str(args.k8s_config)) as client:
        core_api = k8s.client.CoreV1Api(client)

        services = core_api.list_namespaced_service(namespace=args.namespace).items

        def is_minio_service(item):
            selector = item.spec.selector
            return (
                selector is not None
                and selector.get("app") == "minio"
                and selector.get("release") == "gitlab"
            )

        [minio_svc] = list(filter(is_minio_service, services))

        secrets = core_api.list_namespaced_secret(namespace=args.namespace).items

        def is_minio_secret(item):
            return item.metadata.name == "gitlab-minio-secret"

        [minio_secret] = list(filter(is_minio_secret, secrets))

    minio_host = minio_svc.spec.cluster_ip
    [minio_port] = minio_svc.spec.ports
    minio_port = minio_port.port
    logger.debug("Determined minio host: %s:%d", minio_host, minio_port)
    minio_accesskey = b64decode(minio_secret.data["accesskey"]).decode("utf-8")
    minio_secretkey = b64decode(minio_secret.data["secretkey"]).decode("utf-8")
    minio = Minio(
        f"{minio_host}:{minio_port:d}",
        secure=False,
        access_key=minio_accesskey,
        secret_key=minio_secretkey,
    )

    logger.info("Loading backup list from MinIO...")
    backups = set()
    for obj in minio.list_objects("gitlab-backups"):
        match = BACKUP_PATTERN.match(obj.object_name)
        if not match:
            logger.warning("Could not match object name ‘%s’", obj.object_name)
            continue

        creation_time = datetime.fromtimestamp(int(match.group("ts")))
        backup = GitlabBackup(
            filename=obj.object_name,
            creation=creation_time,
            version_id=obj.version_id,
        )
        logger.debug(backup)
        backups.add(backup)

    logger.info("Loading archive list from borg...")
    archives = set()
    with subprocess.Popen(
        ["borg", "list", "--info", "--lock-wait=600", "--json", args.target],
        stdout=subprocess.PIPE,
        preexec_fn=as_borg,
    ) as proc:
        for archive in json.load(proc.stdout)["archives"]:
            # logger.debug('Archive: ‘%s’', archive["barchive"])
            archives.add(archive["barchive"])
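
    # Grandfather-father-son retention: for each rule in TIME_PATTERNS the
    # backups are bucketed into periods, only the oldest backup of every
    # period is eligible, and periods are consumed newest-first until the
    # per-rule count from the config is satisfied.  The same scheme decides
    # both which backups get copied into borg and which stay in MinIO.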
    kept_count = defaultdict(lambda: 0)
    kept_because = OrderedDict()

    def keep_because(backup, rule, period=None):
        nonlocal kept_count, kept_because

        kept_count[rule] += 1
        if backup not in kept_because:
            kept_because[backup] = deque()
        kept_because[backup].append(
            KeptBecause(rule=rule, ix=kept_count[rule], period=period)
        )

    pattern_timezone = config.get("k8s-gitlab-borg", {}).get("timezone", None)
    if pattern_timezone:
        pattern_timezone = gettz(pattern_timezone)
    else:
        pattern_timezone = UTC
    logger.debug("Rule timezone: %s", pattern_timezone)

    copy_candidates = set()
    for rule, pattern in TIME_PATTERNS.items():
        desired_count = config.get("copy", {}).get(rule, {}).get("count", 0)

        periods = OrderedDict()
        for backup in sorted(backups):
            period = pattern(backup.creation.astimezone(pattern_timezone))
            if period not in periods:
                periods[period] = deque()
            periods[period].append(backup)

        to_exec = desired_count
        ordered_periods = reversed(periods.items())
        for period, period_backups in ordered_periods:
            if to_exec == 0:
                break

            # Only the oldest backup of each period is a candidate.
            for backup in period_backups:
                copy_candidates.add(backup)
                logger.debug(
                    "‘%s’ (%s) is candidate for copying",
                    backup.filename,
                    backup.creation,
                )
                to_exec -= 1
                break

        if to_exec > 0:
            logger.debug(
                "Missing %d to fulfill copy %s=%d", to_exec, rule, desired_count
            )

    already_copied = set()
    for backup in copy_candidates:
        archive = _archive_basename(backup, args.archive_prefix)
        if archive in archives:
            already_copied.add(backup)
            logger.debug(
                "‘%s’ (%s, ‘%s’) already copied",
                backup.filename,
                backup.creation,
                archive,
            )
        else:
            logger.debug(
                "‘%s’ (%s, ‘%s’) to copy", backup.filename, backup.creation, archive
            )
    copy_candidates -= already_copied

    copy_count = config.get("copy", {}).get("count", 1)
    copied = set()
    for backup in sorted(copy_candidates, reverse=True):
        if copy_count > 0 and len(copied) >= copy_count:
            logger.debug("copy count of %d reached", copy_count)
            break

        logger.info("Copying ‘%s’ (%s)...", backup.filename, backup.creation)

        env = environ.copy()
        import_args = [
            "borg",
            "import-tar",
            "--lock-wait=600",
            "--compression=auto,zstd,10",
            "--chunker-params=10,23,16,4095",
            "--show-rc",
            "--upload-buffer=100",
            "--progress",
            "--list",
            "--filter=AMEi-x?",
            "--stats" if not args.dry_run else "--dry-run",
        ]
        creation_time = backup.creation.astimezone(timezone.utc)
        import_args += [f'--timestamp={creation_time.strftime("%Y-%m-%dT%H:%M:%S")}']
        archive_name = _archive_name(backup, args.target, args.archive_prefix)
        import_args += [archive_name, "-"]
        logger.debug("%s", {"import_args": import_args, "env": env})

        download = None
        try:

            def download_iter():
                # Expose the response object to the enclosing scope so the
                # ‘finally’ below can release the connection.
                nonlocal download

                offset = 0
                retries = 10
                while True:
                    logger.info(
                        "Downloading ‘%s’ (%s, ‘%s’)...",
                        backup.filename,
                        backup.creation,
                        backup.version_id,
                    )
                    download = minio.get_object(
                        bucket_name="gitlab-backups",
                        object_name=backup.filename,
                        version_id=backup.version_id,
                        offset=offset,
                    )
                    download_stream = download.stream(amt=1024 * 1024)
                    try:
                        while chunk := next(download_stream, b""):
                            logger.debug("Read chunk of %d bytes", len(chunk))
                            offset += len(chunk)
                            retries = 10
                            # Re-yield in page-sized pieces.
                            while pagesize_chunk := chunk[:BUFFER_SIZE]:
                                yield pagesize_chunk
                                chunk = chunk[BUFFER_SIZE:]
                        else:
                            # Stream exhausted without error; stop retrying.
                            break
                    except IncompleteRead as e:
                        logger.warning(
                            "IncompleteRead, retries=%d, offset=%d",
                            retries,
                            offset,
                            exc_info=True,
                        )
                        if retries <= 0:
                            logger.error("Max retries exceeded")
                            raise e
                        retries -= 1
                        sleep(uniform(0, 10))

            download_stream = download_iter()
            with subprocess.Popen(
                import_args,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=env,
                preexec_fn=as_borg,
                text=True,
            ) as proc:
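                # Pump the transfer with select.poll: write chunks from the
                # MinIO download to borg's stdin whenever it is writable and
                # drain stdout/stderr into child loggers whenever they are
                # readable, so no pipe fills up and deadlocks the subprocess.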
                proc_logger = logger.getChild("borg")
                stdout_logger = proc_logger.getChild("stdout")
                stderr_logger = proc_logger.getChild("stderr")

                poll = select.poll()
                poll.register(proc.stdin, select.POLLOUT | select.POLLHUP)
                poll.register(proc.stdout, select.POLLIN | select.POLLHUP)
                poll.register(proc.stderr, select.POLLIN | select.POLLHUP)
                pollc = 3
                events = poll.poll()
                while pollc > 0 and len(events) > 0:
                    for rfd, event in events:
                        if event & select.POLLOUT:
                            if rfd == proc.stdin.fileno():
                                if chunk := next(download_stream, b""):
                                    logger.debug(
                                        "Writing chunk of length %d...", len(chunk)
                                    )
                                    proc.stdin.buffer.write(chunk)
                                    logger.debug("Done")
                                else:
                                    proc.stdin.close()
                        if event & select.POLLIN:
                            if rfd == proc.stdout.fileno():
                                if line := proc.stdout.readline():
                                    stdout_logger.info(line[:-1])
                            if rfd == proc.stderr.fileno():
                                if line := proc.stderr.readline():
                                    stderr_logger.info(line[:-1])
                        if event & select.POLLHUP:
                            if rfd == proc.stdin.fileno():
                                logger.debug("STDIN closed")
                            if rfd == proc.stdout.fileno():
                                logger.debug("STDOUT closed")
                            if rfd == proc.stderr.fileno():
                                logger.debug("STDERR closed")
                            poll.unregister(rfd)
                            pollc -= 1

                    if pollc > 0:
                        events = poll.poll()

                for handler in proc_logger.handlers:
                    handler.flush()

                ret = proc.wait()
                if ret != 0:
                    raise Exception(f"borg subprocess exited with returncode {ret}")
        finally:
            # ‘download’ is only set once the generator has fetched the
            # object; guard against failures before that point.
            if download is not None:
                download.close()
                download.release_conn()

        copied.add(backup)

    copy_candidates -= copied
    for candidate in copy_candidates:
        keep_because(candidate, "copy-candidate")

    within = convert_timedelta(config.get("keep", {}).get("within", "0s"))
    logger.debug("Keep within: %s", within)
    if within > timedelta(seconds=0):
        time_ref = max(backups, key=lambda backup: backup.creation, default=None)
        if not time_ref:
            logger.warning("Nothing to keep")
        else:
            logger.info(
                "Using ‘%s’ (%s) as time reference",
                time_ref.filename,
                time_ref.creation,
            )

            within_cutoff = time_ref.creation - within
            for backup in sorted(backups, reverse=True):
                if backup.creation >= within_cutoff:
                    keep_because(backup, "within")
    else:
        logger.warning("Skipping rule ‘within’ since retention period is zero")

    for rule, pattern in TIME_PATTERNS.items():
        desired_count = config.get("keep", {}).get(rule, {}).get("count", 0)

        periods = OrderedDict()
        for backup in sorted(backups):
            period = pattern(backup.creation.astimezone(pattern_timezone))
            if period not in periods:
                periods[period] = deque()
            periods[period].append(backup)

        to_keep = desired_count
        ordered_periods = reversed(periods.items())
        for period, period_backups in ordered_periods:
            if to_keep == 0:
                break

            # Keep only the oldest backup of each period.
            for backup in period_backups:
                keep_because(backup, rule, period=period)
                to_keep -= 1
                break

        if to_keep > 0:
            logger.debug(
                "Missing %d to fulfill keep %s=%d", to_keep, rule, desired_count
            )

    for backup, reasons in kept_because.items():
        logger.info(
            "Keeping ‘%s’ (%s) because: %s",
            backup.filename,
            backup.creation,
            ", ".join(map(str, reasons)),
        )

    to_destroy = backups - {*kept_because}
    if not to_destroy:
        logger.info("Nothing to prune")
    else:
        for backup in sorted(to_destroy):
            if args.dry_run:
                logger.info(
                    "Would have pruned ‘%s’ (%s)", backup.filename, backup.creation
                )
            else:
                minio.remove_object(
                    bucket_name="gitlab-backups",
                    object_name=backup.filename,
                    version_id=backup.version_id,
                )
                logger.info("Pruned ‘%s’ (%s)", backup.filename, backup.creation)


if __name__ == "__main__":
    sys.exit(main())