Compare commits

..

No commits in common. "main" and "v2.0.0" have entirely different histories.
main ... v2.0.0

View File

@ -16,9 +16,6 @@ import logging
from base64 import b64decode from base64 import b64decode
from minio import Minio from minio import Minio
from http.client import IncompleteRead
from time import sleep
from random import uniform
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
from dateutil.tz import gettz, UTC from dateutil.tz import gettz, UTC
@ -37,7 +34,6 @@ import json
import subprocess import subprocess
from select import PIPE_BUF
import select import select
import time import time
import math import math
@ -404,202 +400,64 @@ def main():
import_args += [archive_name, "-"] import_args += [archive_name, "-"]
logger.debug("%s", {"import_args": import_args, "env": env}) logger.debug("%s", {"import_args": import_args, "env": env})
def download_iter(): try:
try: download = minio.get_object(
offset = 0 bucket_name="gitlab-backups",
retries = 10 object_name=backup.filename,
download = None version_id=backup.version_id,
while True:
logger.info(
"Downloading %s (%s, %s)...",
backup.filename,
backup.creation,
backup.version_id,
)
if download:
download.close()
download.release_conn()
download = minio.get_object(
bucket_name="gitlab-backups",
object_name=backup.filename,
version_id=backup.version_id,
offset=offset,
)
download_stream = download.stream(amt=1024 * 1024)
try:
while chunk := next(download_stream, b""):
# logger.debug("Read chunk of length %d", len(chunk))
offset += len(chunk)
retries = 10
yield chunk
else:
break
except IncompleteRead as e:
logger.warn(
"IncompleteRead, retries=%d, offset=%d",
retries,
offset,
exc_info=True,
)
if retries <= 0:
logger.error("Max retries exceeded")
raise e
retries -= 1
sleep(uniform(0, 10))
finally:
download.close()
download.release_conn()
download_stream = download_iter()
with subprocess.Popen(
import_args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
preexec_fn=lambda: as_borg(),
text=True,
) as proc:
proc_logger = logger.getChild("borg")
stdout_logger = proc_logger.getChild("stdout")
stderr_logger = proc_logger.getChild("stderr")
stdin_fd = proc.stdin.fileno()
stdout_fd = proc.stdout.fileno()
stderr_fd = proc.stderr.fileno()
os.set_blocking(stdin_fd, False)
os.set_blocking(stdout_fd, False)
os.set_blocking(stderr_fd, False)
poll = select.poll()
poll.register(
proc.stdin, select.POLLOUT | select.POLLHUP | select.POLLERR
) )
poll.register( download_stream = download.stream(decode_content=True)
proc.stdout,
select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR, with subprocess.Popen(
) import_args,
poll.register( stdin=subprocess.PIPE,
proc.stderr, stdout=subprocess.PIPE,
select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR, stderr=subprocess.PIPE,
) env=env,
pollc = 2 preexec_fn=lambda: as_borg(),
# logger.debug("First poll...") text=True,
events = poll.poll() ) as proc:
# logger.debug("Done, %d event(s)", len(events)) proc_logger = logger.getChild("borg")
pipe_buffer = b"" stdout_logger = proc_logger.getChild("stdout")
stdout_line_buffer = bytearray(b"") stderr_logger = proc_logger.getChild("stderr")
stderr_line_buffer = bytearray(b"")
newline_trans = bytearray.maketrans(b"\r", b"\n") poll = select.poll()
while pollc > 0 and len(events) > 0: poll.register(proc.stdin, select.POLLOUT | select.POLLHUP)
for rfd, event in events: poll.register(proc.stdout, select.POLLIN | select.POLLHUP)
# logger.debug("rfd=%d, event=%x", rfd, event) poll.register(proc.stderr, select.POLLIN | select.POLLHUP)
if event & select.POLLOUT: pollc = 2
if rfd == stdin_fd: events = poll.poll()
if chunk := pipe_buffer[:PIPE_BUF]: while pollc > 0 and len(events) > 0:
# logger.debug( for rfd, event in events:
# "Writing chunk of length %d...", len(chunk) if event & select.POLLOUT:
# ) if rfd == proc.stdin.fileno():
proc.stdin.buffer.write(chunk) if chunk := next(download_stream, b""):
# logger.debug("Done") proc.stdin.buffer.write(chunk)
pipe_buffer = pipe_buffer[PIPE_BUF:]
elif pipe_buffer := next(download_stream, b""):
# logger.debug(
# "Accepted chunk of length %d",
# len(pipe_buffer),
# )
pass
else:
proc.stdin.close()
if event & select.POLLIN or event & select.POLLPRI:
if rfd == stdout_fd:
# logger.debug("Reading from stdout...")
if chunk := proc.stdout.buffer.read(PIPE_BUF):
# logger.debug("Done, length %d", len(chunk))
stdout_line_buffer.extend(
chunk.translate(newline_trans)
)
# logger.debug(
# "Buffer at length %d", len(stdout_line_buffer)
# )
while True:
line, sep, rest = stdout_line_buffer.partition(
b"\n"
)
# logger.debug("Split: %r, %r, %r", line, sep, rest)
if sep:
stdout_logger.info(
line.decode(errors="replace").rstrip()
)
stdout_line_buffer = rest
else: else:
break proc.stdin.close()
if rfd == stderr_fd: if event & select.POLLIN:
# logger.debug("Reading from stderr...") if rfd == proc.stdout.fileno():
if chunk := proc.stderr.buffer.read(PIPE_BUF): if line := proc.stdout.readline():
# logger.debug("Done, length %d", len(chunk)) stdout_logger.info(line[:-1])
stderr_line_buffer.extend( if rfd == proc.stderr.fileno():
chunk.translate(newline_trans) if line := proc.stderr.readline():
) stderr_logger.info(line[:-1])
# logger.debug( if event & select.POLLHUP:
# "Buffer at length %d", len(stderr_line_buffer) poll.unregister(rfd)
# )
while True:
line, sep, rest = stderr_line_buffer.partition(
b"\n"
)
# logger.debug("Split: %r, %r, %r", line, sep, rest)
if sep:
stderr_logger.info(
line.decode(errors="replace").rstrip()
)
stderr_line_buffer = rest
else:
break
if event & select.POLLERR:
if rfd == stdin_fd:
logger.error("STDIN error")
if rfd == stdout_fd:
logger.error("STDOUT error")
if rfd == stderr_fd:
logger.error("STDERR error")
if event & select.POLLHUP:
if rfd == stdin_fd:
logger.debug("STDIN closed")
if rfd == stdout_fd:
logger.debug("STDOUT closed")
pollc -= 1 pollc -= 1
if rfd == stderr_fd:
logger.debug("STDERR closed")
pollc -= 1
poll.unregister(rfd)
if pollc > 0: if pollc > 0:
# logger.debug("Poll...") events = poll.poll()
events = poll.poll()
# logger.debug("Done, %d event(s)", len(events))
else:
# logger.debug("Nothing left to poll")
pass
if stdout_line_buffer: for handler in proc_logger.handlers:
for line in stdout_line_buffer.split(b"\n"): handler.flush()
stdout_logger.info(line)
if stderr_line_buffer:
for line in stderr_line_buffer.split(b"\n"):
stderr_logger.info(line)
for handler in proc_logger.handlers: ret = proc.wait()
handler.flush() if ret != 0:
raise Exception(f"borg subprocess exited with returncode {ret}")
# logger.debug("Waiting on subprocess...") finally:
ret = proc.wait() download.close()
# logger.debug("Done") download.release_conn()
if ret != 0:
raise Exception(f"borg subprocess exited with returncode {ret}")
copied.add(backup)
copy_candidates -= copied copy_candidates -= copied
for candidate in copy_candidates: for candidate in copy_candidates: