From 202e2e391f7f0711f915aa9aca8a15afb1b0ddcb Mon Sep 17 00:00:00 2001 From: Gregor Kleen Date: Mon, 29 May 2023 17:10:40 +0200 Subject: [PATCH] extend, not append --- k8s_gitlab_borg/__main__.py | 254 ++++++++++++++++++------------------ 1 file changed, 125 insertions(+), 129 deletions(-) diff --git a/k8s_gitlab_borg/__main__.py b/k8s_gitlab_borg/__main__.py index ef68990..8d2e916 100644 --- a/k8s_gitlab_borg/__main__.py +++ b/k8s_gitlab_borg/__main__.py @@ -404,9 +404,8 @@ def main(): import_args += [archive_name, "-"] logger.debug("%s", {"import_args": import_args, "env": env}) - try: - - def download_iter(): + def download_iter(): + try: offset = 0 retries = 10 while True: @@ -416,6 +415,9 @@ def main(): backup.creation, backup.version_id, ) + if download: + download.close() + download.release_conn() download = minio.get_object( bucket_name="gitlab-backups", object_name=backup.filename, @@ -443,141 +445,135 @@ def main(): raise e retries -= 1 sleep(uniform(0, 10)) + finally: + download.close() + download.release_conn() - download_stream = download_iter() + download_stream = download_iter() - with subprocess.Popen( - import_args, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - env=env, - preexec_fn=lambda: as_borg(), - text=True, - ) as proc: - proc_logger = logger.getChild("borg") - stdout_logger = proc_logger.getChild("stdout") - stderr_logger = proc_logger.getChild("stderr") + with subprocess.Popen( + import_args, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env, + preexec_fn=lambda: as_borg(), + text=True, + ) as proc: + proc_logger = logger.getChild("borg") + stdout_logger = proc_logger.getChild("stdout") + stderr_logger = proc_logger.getChild("stderr") - os.set_blocking(proc.stdin.fileno(), False) - os.set_blocking(proc.stdout.fileno(), False) - os.set_blocking(proc.stderr.fileno(), False) + os.set_blocking(proc.stdin.fileno(), False) + os.set_blocking(proc.stdout.fileno(), False) + os.set_blocking(proc.stderr.fileno(), False) - poll = select.poll() - poll.register( - proc.stdin, select.POLLOUT | select.POLLHUP | select.POLLERR - ) - poll.register( - proc.stdout, - select.POLLIN - | select.POLLPRI - | select.POLLHUP - | select.POLLERR, - ) - poll.register( - proc.stderr, - select.POLLIN - | select.POLLPRI - | select.POLLHUP - | select.POLLERR, - ) - pollc = 3 - logger.debug("First poll...") - events = poll.poll() - logger.debug("Done, %d event(s)", len(events)) - pipe_buffer = b"" - stdout_line_buffer = bytearray(b"") - stderr_line_buffer = bytearray(b"") - while pollc > 0 and len(events) > 0: - for rfd, event in events: - logger.debug("rfd=%d, event=%x", rfd, event) - if event & select.POLLOUT: - if rfd == proc.stdin.fileno(): - if chunk := pipe_buffer[:PIPE_BUF]: - logger.debug( - "Writing chunk of length %d...", len(chunk) - ) - proc.stdin.buffer.write(chunk) - logger.debug("Done") - pipe_buffer = pipe_buffer[PIPE_BUF:] - elif pipe_buffer := next(download_stream, b""): - logger.debug( - "Accepted chunk of length %d", - len(pipe_buffer), - ) + poll = select.poll() + poll.register( + proc.stdin, select.POLLOUT | select.POLLHUP | select.POLLERR + ) + poll.register( + proc.stdout, + select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR, + ) + poll.register( + proc.stderr, + select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR, + ) + pollc = 3 + logger.debug("First poll...") + events = poll.poll() + logger.debug("Done, %d event(s)", len(events)) + pipe_buffer = b"" + stdout_line_buffer = bytearray(b"") + stderr_line_buffer = bytearray(b"") + while pollc > 0 and len(events) > 0: + for rfd, event in events: + logger.debug("rfd=%d, event=%x", rfd, event) + if event & select.POLLOUT: + if rfd == proc.stdin.fileno(): + if chunk := pipe_buffer[:PIPE_BUF]: + logger.debug( + "Writing chunk of length %d...", len(chunk) + ) + proc.stdin.buffer.write(chunk) + logger.debug("Done") + pipe_buffer = pipe_buffer[PIPE_BUF:] + elif pipe_buffer := next(download_stream, b""): + logger.debug( + "Accepted chunk of length %d", + len(pipe_buffer), + ) + else: + proc.stdin.close() + if event & select.POLLIN or event & select.POLLPRI: + if rfd == proc.stdout.fileno(): + logger.debug("Reading from stdout...") + if chunk := proc.stdout.read(PIPE_BUF): + logger.debug("Done, length %d", len(chunk)) + stdout_line_buffer.extend(chunk) + while True: + line, sep, rest = stdout_line_buffer.partition( + b"\n" + ) + if sep: + stdout_logger.info(line) + stdout_line_buffer = rest else: - proc.stdin.close() - if event & select.POLLIN or event & select.POLLPRI: - if rfd == proc.stdout.fileno(): - logger.debug("Reading from stdout...") - if chunk := proc.stdout.read(PIPE_BUF): - logger.debug("Done, length %d", len(chunk)) - stdout_line_buffer.append(chunk) - while True: - line, sep, rest = stdout_line_buffer.partition( - b"\n" - ) - if sep: - stdout_logger.info(line) - stdout_line_buffer = rest - else: - break - if rfd == proc.stderr.fileno(): - logger.debug("Reading from stderr...") - if chunk := proc.stderr.read(PIPE_BUF): - logger.debug("Done, length %d", len(chunk)) - stderr_line_buffer.append(chunk) - while True: - line, sep, rest = stderr_line_buffer.partition( - b"\n" - ) - if sep: - stderr_logger.info(line) - stderr_line_buffer = rest - else: - break - if event & select.POLLERR: - if rfd == proc.stdin.fileno(): - logger.error("STDIN error") - if rfd == proc.stdout.fileno(): - logger.error("STDOUT error") - if rfd == proc.stderr.fileno(): - logger.error("STDERR error") - if event & select.POLLHUP: - if rfd == proc.stdin.fileno(): - logger.debug("STDIN closed") - if rfd == proc.stdout.fileno(): - logger.debug("STDOUT closed") - if rfd == proc.stderr.fileno(): - logger.debug("STDERR closed") - poll.unregister(rfd) - pollc -= 1 + break + if rfd == proc.stderr.fileno(): + logger.debug("Reading from stderr...") + if chunk := proc.stderr.read(PIPE_BUF): + logger.debug("Done, length %d", len(chunk)) + stderr_line_buffer.extend(chunk) + while True: + line, sep, rest = stderr_line_buffer.partition( + b"\n" + ) + if sep: + stderr_logger.info(line) + stderr_line_buffer = rest + else: + break + if event & select.POLLERR: + if rfd == proc.stdin.fileno(): + logger.error("STDIN error") + if rfd == proc.stdout.fileno(): + logger.error("STDOUT error") + if rfd == proc.stderr.fileno(): + logger.error("STDERR error") + if event & select.POLLHUP: + if rfd == proc.stdin.fileno(): + logger.debug("STDIN closed") + if rfd == proc.stdout.fileno(): + logger.debug("STDOUT closed") + if rfd == proc.stderr.fileno(): + logger.debug("STDERR closed") + poll.unregister(rfd) + pollc -= 1 - if pollc > 0: - logger.debug("Poll...") - events = poll.poll() - logger.debug("Done, %d event(s)", len(events)) - else: - logger.debug("Nothing left to poll") + if pollc > 0: + logger.debug("Poll...") + events = poll.poll() + logger.debug("Done, %d event(s)", len(events)) + else: + logger.debug("Nothing left to poll") - if stdout_line_buffer: - for line in stdout_line_buffer.split(b"\n"): - stdout_logger.info(line) - if stderr_line_buffer: - for line in stderr_line_buffer.split(b"\n"): - stderr_logger.info(line) + if stdout_line_buffer: + for line in stdout_line_buffer.split(b"\n"): + stdout_logger.info(line) + if stderr_line_buffer: + for line in stderr_line_buffer.split(b"\n"): + stderr_logger.info(line) - for handler in proc_logger.handlers: - handler.flush() + for handler in proc_logger.handlers: + handler.flush() - logger.debug("Waiting on subprocess...") - ret = proc.wait() - logger.debug("Done") - if ret != 0: - raise Exception(f"borg subprocess exited with returncode {ret}") - finally: - download.close() - download.release_conn() + logger.debug("Waiting on subprocess...") + ret = proc.wait() + logger.debug("Done") + if ret != 0: + raise Exception(f"borg subprocess exited with returncode {ret}") copied.add(backup)