extend, not append

This commit is contained in:
Gregor Kleen 2023-05-29 17:10:40 +02:00
parent 6be4f57a49
commit 202e2e391f

View File

@ -404,9 +404,8 @@ def main():
import_args += [archive_name, "-"] import_args += [archive_name, "-"]
logger.debug("%s", {"import_args": import_args, "env": env}) logger.debug("%s", {"import_args": import_args, "env": env})
try: def download_iter():
try:
def download_iter():
offset = 0 offset = 0
retries = 10 retries = 10
while True: while True:
@ -416,6 +415,9 @@ def main():
backup.creation, backup.creation,
backup.version_id, backup.version_id,
) )
if download:
download.close()
download.release_conn()
download = minio.get_object( download = minio.get_object(
bucket_name="gitlab-backups", bucket_name="gitlab-backups",
object_name=backup.filename, object_name=backup.filename,
@ -443,141 +445,135 @@ def main():
raise e raise e
retries -= 1 retries -= 1
sleep(uniform(0, 10)) sleep(uniform(0, 10))
finally:
download.close()
download.release_conn()
download_stream = download_iter() download_stream = download_iter()
with subprocess.Popen( with subprocess.Popen(
import_args, import_args,
stdin=subprocess.PIPE, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, stderr=subprocess.PIPE,
env=env, env=env,
preexec_fn=lambda: as_borg(), preexec_fn=lambda: as_borg(),
text=True, text=True,
) as proc: ) as proc:
proc_logger = logger.getChild("borg") proc_logger = logger.getChild("borg")
stdout_logger = proc_logger.getChild("stdout") stdout_logger = proc_logger.getChild("stdout")
stderr_logger = proc_logger.getChild("stderr") stderr_logger = proc_logger.getChild("stderr")
os.set_blocking(proc.stdin.fileno(), False) os.set_blocking(proc.stdin.fileno(), False)
os.set_blocking(proc.stdout.fileno(), False) os.set_blocking(proc.stdout.fileno(), False)
os.set_blocking(proc.stderr.fileno(), False) os.set_blocking(proc.stderr.fileno(), False)
poll = select.poll() poll = select.poll()
poll.register( poll.register(
proc.stdin, select.POLLOUT | select.POLLHUP | select.POLLERR proc.stdin, select.POLLOUT | select.POLLHUP | select.POLLERR
) )
poll.register( poll.register(
proc.stdout, proc.stdout,
select.POLLIN select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR,
| select.POLLPRI )
| select.POLLHUP poll.register(
| select.POLLERR, proc.stderr,
) select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR,
poll.register( )
proc.stderr, pollc = 3
select.POLLIN logger.debug("First poll...")
| select.POLLPRI events = poll.poll()
| select.POLLHUP logger.debug("Done, %d event(s)", len(events))
| select.POLLERR, pipe_buffer = b""
) stdout_line_buffer = bytearray(b"")
pollc = 3 stderr_line_buffer = bytearray(b"")
logger.debug("First poll...") while pollc > 0 and len(events) > 0:
events = poll.poll() for rfd, event in events:
logger.debug("Done, %d event(s)", len(events)) logger.debug("rfd=%d, event=%x", rfd, event)
pipe_buffer = b"" if event & select.POLLOUT:
stdout_line_buffer = bytearray(b"") if rfd == proc.stdin.fileno():
stderr_line_buffer = bytearray(b"") if chunk := pipe_buffer[:PIPE_BUF]:
while pollc > 0 and len(events) > 0: logger.debug(
for rfd, event in events: "Writing chunk of length %d...", len(chunk)
logger.debug("rfd=%d, event=%x", rfd, event) )
if event & select.POLLOUT: proc.stdin.buffer.write(chunk)
if rfd == proc.stdin.fileno(): logger.debug("Done")
if chunk := pipe_buffer[:PIPE_BUF]: pipe_buffer = pipe_buffer[PIPE_BUF:]
logger.debug( elif pipe_buffer := next(download_stream, b""):
"Writing chunk of length %d...", len(chunk) logger.debug(
) "Accepted chunk of length %d",
proc.stdin.buffer.write(chunk) len(pipe_buffer),
logger.debug("Done") )
pipe_buffer = pipe_buffer[PIPE_BUF:] else:
elif pipe_buffer := next(download_stream, b""): proc.stdin.close()
logger.debug( if event & select.POLLIN or event & select.POLLPRI:
"Accepted chunk of length %d", if rfd == proc.stdout.fileno():
len(pipe_buffer), logger.debug("Reading from stdout...")
) if chunk := proc.stdout.read(PIPE_BUF):
logger.debug("Done, length %d", len(chunk))
stdout_line_buffer.extend(chunk)
while True:
line, sep, rest = stdout_line_buffer.partition(
b"\n"
)
if sep:
stdout_logger.info(line)
stdout_line_buffer = rest
else: else:
proc.stdin.close() break
if event & select.POLLIN or event & select.POLLPRI: if rfd == proc.stderr.fileno():
if rfd == proc.stdout.fileno(): logger.debug("Reading from stderr...")
logger.debug("Reading from stdout...") if chunk := proc.stderr.read(PIPE_BUF):
if chunk := proc.stdout.read(PIPE_BUF): logger.debug("Done, length %d", len(chunk))
logger.debug("Done, length %d", len(chunk)) stderr_line_buffer.extend(chunk)
stdout_line_buffer.append(chunk) while True:
while True: line, sep, rest = stderr_line_buffer.partition(
line, sep, rest = stdout_line_buffer.partition( b"\n"
b"\n" )
) if sep:
if sep: stderr_logger.info(line)
stdout_logger.info(line) stderr_line_buffer = rest
stdout_line_buffer = rest else:
else: break
break if event & select.POLLERR:
if rfd == proc.stderr.fileno(): if rfd == proc.stdin.fileno():
logger.debug("Reading from stderr...") logger.error("STDIN error")
if chunk := proc.stderr.read(PIPE_BUF): if rfd == proc.stdout.fileno():
logger.debug("Done, length %d", len(chunk)) logger.error("STDOUT error")
stderr_line_buffer.append(chunk) if rfd == proc.stderr.fileno():
while True: logger.error("STDERR error")
line, sep, rest = stderr_line_buffer.partition( if event & select.POLLHUP:
b"\n" if rfd == proc.stdin.fileno():
) logger.debug("STDIN closed")
if sep: if rfd == proc.stdout.fileno():
stderr_logger.info(line) logger.debug("STDOUT closed")
stderr_line_buffer = rest if rfd == proc.stderr.fileno():
else: logger.debug("STDERR closed")
break poll.unregister(rfd)
if event & select.POLLERR: pollc -= 1
if rfd == proc.stdin.fileno():
logger.error("STDIN error")
if rfd == proc.stdout.fileno():
logger.error("STDOUT error")
if rfd == proc.stderr.fileno():
logger.error("STDERR error")
if event & select.POLLHUP:
if rfd == proc.stdin.fileno():
logger.debug("STDIN closed")
if rfd == proc.stdout.fileno():
logger.debug("STDOUT closed")
if rfd == proc.stderr.fileno():
logger.debug("STDERR closed")
poll.unregister(rfd)
pollc -= 1
if pollc > 0: if pollc > 0:
logger.debug("Poll...") logger.debug("Poll...")
events = poll.poll() events = poll.poll()
logger.debug("Done, %d event(s)", len(events)) logger.debug("Done, %d event(s)", len(events))
else: else:
logger.debug("Nothing left to poll") logger.debug("Nothing left to poll")
if stdout_line_buffer: if stdout_line_buffer:
for line in stdout_line_buffer.split(b"\n"): for line in stdout_line_buffer.split(b"\n"):
stdout_logger.info(line) stdout_logger.info(line)
if stderr_line_buffer: if stderr_line_buffer:
for line in stderr_line_buffer.split(b"\n"): for line in stderr_line_buffer.split(b"\n"):
stderr_logger.info(line) stderr_logger.info(line)
for handler in proc_logger.handlers: for handler in proc_logger.handlers:
handler.flush() handler.flush()
logger.debug("Waiting on subprocess...") logger.debug("Waiting on subprocess...")
ret = proc.wait() ret = proc.wait()
logger.debug("Done") logger.debug("Done")
if ret != 0: if ret != 0:
raise Exception(f"borg subprocess exited with returncode {ret}") raise Exception(f"borg subprocess exited with returncode {ret}")
finally:
download.close()
download.release_conn()
copied.add(backup) copied.add(backup)