Compare commits

...

21 Commits
v3.0.0 ... main

Author SHA1 Message Date
22000ec4c0 don't wait for stdin to close 2023-05-29 17:48:30 +02:00
6bb276b509 strip output only right 2023-05-29 17:47:10 +02:00
1045dddb46 memoize stdin/stdout/stderr fds 2023-05-29 17:42:12 +02:00
aa1f9dbd1c reduce logging 2023-05-29 17:34:36 +02:00
8044d32ad8 strip output 2023-05-29 17:27:12 +02:00
43d85f597d decode output 2023-05-29 17:24:36 +02:00
057004d7da translate borg output line endings 2023-05-29 17:21:30 +02:00
95973ae2c8 more logging 2023-05-29 17:18:16 +02:00
1003ca01bd read from buffers directly 2023-05-29 17:14:24 +02:00
b757e4bc23 initialize local var 2023-05-29 17:12:19 +02:00
202e2e391f extend, not append 2023-05-29 17:10:40 +02:00
6be4f57a49 nonblocking operation 2023-05-29 17:06:18 +02:00
9027c83f94 events hex 2023-05-29 16:44:43 +02:00
d7d5e6ad5a more logging 2023-05-29 16:43:20 +02:00
f41a21208e move pipe chunking downstream 2023-05-29 16:30:56 +02:00
f455d9f4d6 better(?) polling 2023-05-29 16:25:32 +02:00
b8076256cb poll with timeout 2023-05-29 16:20:23 +02:00
47eb6ee63b more logging, PAGESIZE chunks to STDIN 2023-05-29 15:50:03 +02:00
86767b2e39 better logging 2023-05-29 15:31:44 +02:00
bab70ee743 Work on retry logic, more logging 2023-05-29 15:14:52 +02:00
ace6dbd49f retry download if minio closes connection 2023-05-29 14:40:35 +02:00

View File

@ -16,6 +16,9 @@ import logging
from base64 import b64decode
from minio import Minio
from http.client import IncompleteRead
from time import sleep
from random import uniform
from datetime import datetime, timezone, timedelta
from dateutil.tz import gettz, UTC
@ -34,6 +37,7 @@ import json
import subprocess
from select import PIPE_BUF
import select
import time
import math
@ -400,64 +404,200 @@ def main():
import_args += [archive_name, "-"]
logger.debug("%s", {"import_args": import_args, "env": env})
try:
download = minio.get_object(
bucket_name="gitlab-backups",
object_name=backup.filename,
version_id=backup.version_id,
def download_iter():
try:
offset = 0
retries = 10
download = None
while True:
logger.info(
"Downloading %s (%s, %s)...",
backup.filename,
backup.creation,
backup.version_id,
)
if download:
download.close()
download.release_conn()
download = minio.get_object(
bucket_name="gitlab-backups",
object_name=backup.filename,
version_id=backup.version_id,
offset=offset,
)
download_stream = download.stream(amt=1024 * 1024)
try:
while chunk := next(download_stream, b""):
# logger.debug("Read chunk of length %d", len(chunk))
offset += len(chunk)
retries = 10
yield chunk
else:
break
except IncompleteRead as e:
logger.warn(
"IncompleteRead, retries=%d, offset=%d",
retries,
offset,
exc_info=True,
)
if retries <= 0:
logger.error("Max retries exceeded")
raise e
retries -= 1
sleep(uniform(0, 10))
finally:
download.close()
download.release_conn()
download_stream = download_iter()
with subprocess.Popen(
import_args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
preexec_fn=lambda: as_borg(),
text=True,
) as proc:
proc_logger = logger.getChild("borg")
stdout_logger = proc_logger.getChild("stdout")
stderr_logger = proc_logger.getChild("stderr")
stdin_fd = proc.stdin.fileno()
stdout_fd = proc.stdout.fileno()
stderr_fd = proc.stderr.fileno()
os.set_blocking(stdin_fd, False)
os.set_blocking(stdout_fd, False)
os.set_blocking(stderr_fd, False)
poll = select.poll()
poll.register(
proc.stdin, select.POLLOUT | select.POLLHUP | select.POLLERR
)
download_stream = download.stream(decode_content=True)
with subprocess.Popen(
import_args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
preexec_fn=lambda: as_borg(),
text=True,
) as proc:
proc_logger = logger.getChild("borg")
stdout_logger = proc_logger.getChild("stdout")
stderr_logger = proc_logger.getChild("stderr")
poll = select.poll()
poll.register(proc.stdin, select.POLLOUT | select.POLLHUP)
poll.register(proc.stdout, select.POLLIN | select.POLLHUP)
poll.register(proc.stderr, select.POLLIN | select.POLLHUP)
pollc = 2
events = poll.poll()
while pollc > 0 and len(events) > 0:
for rfd, event in events:
if event & select.POLLOUT:
if rfd == proc.stdin.fileno():
if chunk := next(download_stream, b""):
proc.stdin.buffer.write(chunk)
poll.register(
proc.stdout,
select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR,
)
poll.register(
proc.stderr,
select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR,
)
pollc = 2
# logger.debug("First poll...")
events = poll.poll()
# logger.debug("Done, %d event(s)", len(events))
pipe_buffer = b""
stdout_line_buffer = bytearray(b"")
stderr_line_buffer = bytearray(b"")
newline_trans = bytearray.maketrans(b"\r", b"\n")
while pollc > 0 and len(events) > 0:
for rfd, event in events:
# logger.debug("rfd=%d, event=%x", rfd, event)
if event & select.POLLOUT:
if rfd == stdin_fd:
if chunk := pipe_buffer[:PIPE_BUF]:
# logger.debug(
# "Writing chunk of length %d...", len(chunk)
# )
proc.stdin.buffer.write(chunk)
# logger.debug("Done")
pipe_buffer = pipe_buffer[PIPE_BUF:]
elif pipe_buffer := next(download_stream, b""):
# logger.debug(
# "Accepted chunk of length %d",
# len(pipe_buffer),
# )
pass
else:
proc.stdin.close()
if event & select.POLLIN or event & select.POLLPRI:
if rfd == stdout_fd:
# logger.debug("Reading from stdout...")
if chunk := proc.stdout.buffer.read(PIPE_BUF):
# logger.debug("Done, length %d", len(chunk))
stdout_line_buffer.extend(
chunk.translate(newline_trans)
)
# logger.debug(
# "Buffer at length %d", len(stdout_line_buffer)
# )
while True:
line, sep, rest = stdout_line_buffer.partition(
b"\n"
)
# logger.debug("Split: %r, %r, %r", line, sep, rest)
if sep:
stdout_logger.info(
line.decode(errors="replace").rstrip()
)
stdout_line_buffer = rest
else:
proc.stdin.close()
if event & select.POLLIN:
if rfd == proc.stdout.fileno():
if line := proc.stdout.readline():
stdout_logger.info(line[:-1])
if rfd == proc.stderr.fileno():
if line := proc.stderr.readline():
stderr_logger.info(line[:-1])
if event & select.POLLHUP:
poll.unregister(rfd)
break
if rfd == stderr_fd:
# logger.debug("Reading from stderr...")
if chunk := proc.stderr.buffer.read(PIPE_BUF):
# logger.debug("Done, length %d", len(chunk))
stderr_line_buffer.extend(
chunk.translate(newline_trans)
)
# logger.debug(
# "Buffer at length %d", len(stderr_line_buffer)
# )
while True:
line, sep, rest = stderr_line_buffer.partition(
b"\n"
)
# logger.debug("Split: %r, %r, %r", line, sep, rest)
if sep:
stderr_logger.info(
line.decode(errors="replace").rstrip()
)
stderr_line_buffer = rest
else:
break
if event & select.POLLERR:
if rfd == stdin_fd:
logger.error("STDIN error")
if rfd == stdout_fd:
logger.error("STDOUT error")
if rfd == stderr_fd:
logger.error("STDERR error")
if event & select.POLLHUP:
if rfd == stdin_fd:
logger.debug("STDIN closed")
if rfd == stdout_fd:
logger.debug("STDOUT closed")
pollc -= 1
if rfd == stderr_fd:
logger.debug("STDERR closed")
pollc -= 1
poll.unregister(rfd)
if pollc > 0:
events = poll.poll()
if pollc > 0:
# logger.debug("Poll...")
events = poll.poll()
# logger.debug("Done, %d event(s)", len(events))
else:
# logger.debug("Nothing left to poll")
pass
for handler in proc_logger.handlers:
handler.flush()
if stdout_line_buffer:
for line in stdout_line_buffer.split(b"\n"):
stdout_logger.info(line)
if stderr_line_buffer:
for line in stderr_line_buffer.split(b"\n"):
stderr_logger.info(line)
ret = proc.wait()
if ret != 0:
raise Exception(f"borg subprocess exited with returncode {ret}")
finally:
download.close()
download.release_conn()
for handler in proc_logger.handlers:
handler.flush()
# logger.debug("Waiting on subprocess...")
ret = proc.wait()
# logger.debug("Done")
if ret != 0:
raise Exception(f"borg subprocess exited with returncode {ret}")
copied.add(backup)