Compare commits

..

No commits in common. "main" and "v3.0.0" have entirely different histories.
main ... v3.0.0

View File

@ -16,9 +16,6 @@ import logging
from base64 import b64decode
from minio import Minio
from http.client import IncompleteRead
from time import sleep
from random import uniform
from datetime import datetime, timezone, timedelta
from dateutil.tz import gettz, UTC
@ -37,7 +34,6 @@ import json
import subprocess
from select import PIPE_BUF
import select
import time
import math
@ -404,200 +400,64 @@ def main():
import_args += [archive_name, "-"]
logger.debug("%s", {"import_args": import_args, "env": env})
def download_iter():
try:
offset = 0
retries = 10
download = None
while True:
logger.info(
"Downloading %s (%s, %s)...",
backup.filename,
backup.creation,
backup.version_id,
)
if download:
download.close()
download.release_conn()
download = minio.get_object(
bucket_name="gitlab-backups",
object_name=backup.filename,
version_id=backup.version_id,
offset=offset,
)
download_stream = download.stream(amt=1024 * 1024)
try:
while chunk := next(download_stream, b""):
# logger.debug("Read chunk of length %d", len(chunk))
offset += len(chunk)
retries = 10
yield chunk
else:
break
except IncompleteRead as e:
logger.warn(
"IncompleteRead, retries=%d, offset=%d",
retries,
offset,
exc_info=True,
)
if retries <= 0:
logger.error("Max retries exceeded")
raise e
retries -= 1
sleep(uniform(0, 10))
finally:
download.close()
download.release_conn()
download_stream = download_iter()
with subprocess.Popen(
import_args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
preexec_fn=lambda: as_borg(),
text=True,
) as proc:
proc_logger = logger.getChild("borg")
stdout_logger = proc_logger.getChild("stdout")
stderr_logger = proc_logger.getChild("stderr")
stdin_fd = proc.stdin.fileno()
stdout_fd = proc.stdout.fileno()
stderr_fd = proc.stderr.fileno()
os.set_blocking(stdin_fd, False)
os.set_blocking(stdout_fd, False)
os.set_blocking(stderr_fd, False)
poll = select.poll()
poll.register(
proc.stdin, select.POLLOUT | select.POLLHUP | select.POLLERR
try:
download = minio.get_object(
bucket_name="gitlab-backups",
object_name=backup.filename,
version_id=backup.version_id,
)
poll.register(
proc.stdout,
select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR,
)
poll.register(
proc.stderr,
select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR,
)
pollc = 2
# logger.debug("First poll...")
events = poll.poll()
# logger.debug("Done, %d event(s)", len(events))
pipe_buffer = b""
stdout_line_buffer = bytearray(b"")
stderr_line_buffer = bytearray(b"")
newline_trans = bytearray.maketrans(b"\r", b"\n")
while pollc > 0 and len(events) > 0:
for rfd, event in events:
# logger.debug("rfd=%d, event=%x", rfd, event)
if event & select.POLLOUT:
if rfd == stdin_fd:
if chunk := pipe_buffer[:PIPE_BUF]:
# logger.debug(
# "Writing chunk of length %d...", len(chunk)
# )
proc.stdin.buffer.write(chunk)
# logger.debug("Done")
pipe_buffer = pipe_buffer[PIPE_BUF:]
elif pipe_buffer := next(download_stream, b""):
# logger.debug(
# "Accepted chunk of length %d",
# len(pipe_buffer),
# )
pass
else:
proc.stdin.close()
if event & select.POLLIN or event & select.POLLPRI:
if rfd == stdout_fd:
# logger.debug("Reading from stdout...")
if chunk := proc.stdout.buffer.read(PIPE_BUF):
# logger.debug("Done, length %d", len(chunk))
stdout_line_buffer.extend(
chunk.translate(newline_trans)
)
# logger.debug(
# "Buffer at length %d", len(stdout_line_buffer)
# )
while True:
line, sep, rest = stdout_line_buffer.partition(
b"\n"
)
# logger.debug("Split: %r, %r, %r", line, sep, rest)
if sep:
stdout_logger.info(
line.decode(errors="replace").rstrip()
)
stdout_line_buffer = rest
download_stream = download.stream(decode_content=True)
with subprocess.Popen(
import_args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
preexec_fn=lambda: as_borg(),
text=True,
) as proc:
proc_logger = logger.getChild("borg")
stdout_logger = proc_logger.getChild("stdout")
stderr_logger = proc_logger.getChild("stderr")
poll = select.poll()
poll.register(proc.stdin, select.POLLOUT | select.POLLHUP)
poll.register(proc.stdout, select.POLLIN | select.POLLHUP)
poll.register(proc.stderr, select.POLLIN | select.POLLHUP)
pollc = 2
events = poll.poll()
while pollc > 0 and len(events) > 0:
for rfd, event in events:
if event & select.POLLOUT:
if rfd == proc.stdin.fileno():
if chunk := next(download_stream, b""):
proc.stdin.buffer.write(chunk)
else:
break
if rfd == stderr_fd:
# logger.debug("Reading from stderr...")
if chunk := proc.stderr.buffer.read(PIPE_BUF):
# logger.debug("Done, length %d", len(chunk))
stderr_line_buffer.extend(
chunk.translate(newline_trans)
)
# logger.debug(
# "Buffer at length %d", len(stderr_line_buffer)
# )
while True:
line, sep, rest = stderr_line_buffer.partition(
b"\n"
)
# logger.debug("Split: %r, %r, %r", line, sep, rest)
if sep:
stderr_logger.info(
line.decode(errors="replace").rstrip()
)
stderr_line_buffer = rest
else:
break
if event & select.POLLERR:
if rfd == stdin_fd:
logger.error("STDIN error")
if rfd == stdout_fd:
logger.error("STDOUT error")
if rfd == stderr_fd:
logger.error("STDERR error")
if event & select.POLLHUP:
if rfd == stdin_fd:
logger.debug("STDIN closed")
if rfd == stdout_fd:
logger.debug("STDOUT closed")
proc.stdin.close()
if event & select.POLLIN:
if rfd == proc.stdout.fileno():
if line := proc.stdout.readline():
stdout_logger.info(line[:-1])
if rfd == proc.stderr.fileno():
if line := proc.stderr.readline():
stderr_logger.info(line[:-1])
if event & select.POLLHUP:
poll.unregister(rfd)
pollc -= 1
if rfd == stderr_fd:
logger.debug("STDERR closed")
pollc -= 1
poll.unregister(rfd)
if pollc > 0:
# logger.debug("Poll...")
events = poll.poll()
# logger.debug("Done, %d event(s)", len(events))
else:
# logger.debug("Nothing left to poll")
pass
if pollc > 0:
events = poll.poll()
if stdout_line_buffer:
for line in stdout_line_buffer.split(b"\n"):
stdout_logger.info(line)
if stderr_line_buffer:
for line in stderr_line_buffer.split(b"\n"):
stderr_logger.info(line)
for handler in proc_logger.handlers:
handler.flush()
for handler in proc_logger.handlers:
handler.flush()
# logger.debug("Waiting on subprocess...")
ret = proc.wait()
# logger.debug("Done")
if ret != 0:
raise Exception(f"borg subprocess exited with returncode {ret}")
ret = proc.wait()
if ret != 0:
raise Exception(f"borg subprocess exited with returncode {ret}")
finally:
download.close()
download.release_conn()
copied.add(backup)