Compare commits
No commits in common. "main" and "v1.0.0" have entirely different histories.
@ -82,14 +82,15 @@ in {
|
|||||||
};
|
};
|
||||||
|
|
||||||
config = mkIf cfg.enable {
|
config = mkIf cfg.enable {
|
||||||
services.k8s-gitlab-borg.config = mkDefault ({
|
services.k8s-gitlab-borg.config = {
|
||||||
keep.within = "15min";
|
k8s-gitlab-borg.timezone = mkIf (config.time.timeZone != null) config.time.timeZone;
|
||||||
keep."4h".count = 6;
|
|
||||||
keep.daily.count = 7;
|
|
||||||
|
|
||||||
copy.daily.count = 1;
|
keep.within = "15min";
|
||||||
}
|
keep."4h".count = 6;
|
||||||
// optionalAttrs (config.time.timeZone != null) {k8s-gitlab-borg.timezone = config.time.timeZone;});
|
keep.daily.count = 7;
|
||||||
|
|
||||||
|
copy.daily.count = 1;
|
||||||
|
};
|
||||||
|
|
||||||
systemd.services."k8s-gitlab-borg" = {
|
systemd.services."k8s-gitlab-borg" = {
|
||||||
description = "Copy GitLab backups to borg";
|
description = "Copy GitLab backups to borg";
|
||||||
|
|||||||
@ -16,9 +16,6 @@ import logging
|
|||||||
from base64 import b64decode
|
from base64 import b64decode
|
||||||
|
|
||||||
from minio import Minio
|
from minio import Minio
|
||||||
from http.client import IncompleteRead
|
|
||||||
from time import sleep
|
|
||||||
from random import uniform
|
|
||||||
|
|
||||||
from datetime import datetime, timezone, timedelta
|
from datetime import datetime, timezone, timedelta
|
||||||
from dateutil.tz import gettz, UTC
|
from dateutil.tz import gettz, UTC
|
||||||
@ -37,7 +34,6 @@ import json
|
|||||||
|
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
from select import PIPE_BUF
|
|
||||||
import select
|
import select
|
||||||
import time
|
import time
|
||||||
import math
|
import math
|
||||||
@ -330,14 +326,14 @@ def main():
|
|||||||
desired_count = config.get("copy", {}).get(rule, {}).get("count", 0)
|
desired_count = config.get("copy", {}).get(rule, {}).get("count", 0)
|
||||||
|
|
||||||
periods = OrderedDict()
|
periods = OrderedDict()
|
||||||
for backup in sorted(backups):
|
for backup in sorted(backups, reverse=True):
|
||||||
period = pattern(backup.creation.astimezone(pattern_timezone))
|
period = pattern(backup.creation.astimezone(pattern_timezone))
|
||||||
if period not in periods:
|
if period not in periods:
|
||||||
periods[period] = deque()
|
periods[period] = deque()
|
||||||
periods[period].append(backup)
|
periods[period].append(backup)
|
||||||
|
|
||||||
to_exec = desired_count
|
to_exec = desired_count
|
||||||
ordered_periods = reversed(periods.items())
|
ordered_periods = periods.items()
|
||||||
for period, period_backups in ordered_periods:
|
for period, period_backups in ordered_periods:
|
||||||
if to_exec == 0:
|
if to_exec == 0:
|
||||||
break
|
break
|
||||||
@ -404,202 +400,64 @@ def main():
|
|||||||
import_args += [archive_name, "-"]
|
import_args += [archive_name, "-"]
|
||||||
logger.debug("%s", {"import_args": import_args, "env": env})
|
logger.debug("%s", {"import_args": import_args, "env": env})
|
||||||
|
|
||||||
def download_iter():
|
try:
|
||||||
try:
|
download = minio.get_object(
|
||||||
offset = 0
|
bucket_name="gitlab-backups",
|
||||||
retries = 10
|
object_name=backup.filename,
|
||||||
download = None
|
version_id=backup.version_id,
|
||||||
while True:
|
|
||||||
logger.info(
|
|
||||||
"Downloading ‘%s’ (%s, ‘%s’)...",
|
|
||||||
backup.filename,
|
|
||||||
backup.creation,
|
|
||||||
backup.version_id,
|
|
||||||
)
|
|
||||||
if download:
|
|
||||||
download.close()
|
|
||||||
download.release_conn()
|
|
||||||
download = minio.get_object(
|
|
||||||
bucket_name="gitlab-backups",
|
|
||||||
object_name=backup.filename,
|
|
||||||
version_id=backup.version_id,
|
|
||||||
offset=offset,
|
|
||||||
)
|
|
||||||
download_stream = download.stream(amt=1024 * 1024)
|
|
||||||
try:
|
|
||||||
while chunk := next(download_stream, b""):
|
|
||||||
# logger.debug("Read chunk of length %d", len(chunk))
|
|
||||||
offset += len(chunk)
|
|
||||||
retries = 10
|
|
||||||
yield chunk
|
|
||||||
else:
|
|
||||||
break
|
|
||||||
except IncompleteRead as e:
|
|
||||||
logger.warn(
|
|
||||||
"IncompleteRead, retries=%d, offset=%d",
|
|
||||||
retries,
|
|
||||||
offset,
|
|
||||||
exc_info=True,
|
|
||||||
)
|
|
||||||
if retries <= 0:
|
|
||||||
logger.error("Max retries exceeded")
|
|
||||||
raise e
|
|
||||||
retries -= 1
|
|
||||||
sleep(uniform(0, 10))
|
|
||||||
finally:
|
|
||||||
download.close()
|
|
||||||
download.release_conn()
|
|
||||||
|
|
||||||
download_stream = download_iter()
|
|
||||||
|
|
||||||
with subprocess.Popen(
|
|
||||||
import_args,
|
|
||||||
stdin=subprocess.PIPE,
|
|
||||||
stdout=subprocess.PIPE,
|
|
||||||
stderr=subprocess.PIPE,
|
|
||||||
env=env,
|
|
||||||
preexec_fn=lambda: as_borg(),
|
|
||||||
text=True,
|
|
||||||
) as proc:
|
|
||||||
proc_logger = logger.getChild("borg")
|
|
||||||
stdout_logger = proc_logger.getChild("stdout")
|
|
||||||
stderr_logger = proc_logger.getChild("stderr")
|
|
||||||
|
|
||||||
stdin_fd = proc.stdin.fileno()
|
|
||||||
stdout_fd = proc.stdout.fileno()
|
|
||||||
stderr_fd = proc.stderr.fileno()
|
|
||||||
os.set_blocking(stdin_fd, False)
|
|
||||||
os.set_blocking(stdout_fd, False)
|
|
||||||
os.set_blocking(stderr_fd, False)
|
|
||||||
|
|
||||||
poll = select.poll()
|
|
||||||
poll.register(
|
|
||||||
proc.stdin, select.POLLOUT | select.POLLHUP | select.POLLERR
|
|
||||||
)
|
)
|
||||||
poll.register(
|
download_stream = download.stream(decode_content=True)
|
||||||
proc.stdout,
|
|
||||||
select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR,
|
with subprocess.Popen(
|
||||||
)
|
import_args,
|
||||||
poll.register(
|
stdin=subprocess.PIPE,
|
||||||
proc.stderr,
|
stdout=subprocess.PIPE,
|
||||||
select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR,
|
stderr=subprocess.PIPE,
|
||||||
)
|
env=env,
|
||||||
pollc = 2
|
preexec_fn=lambda: as_borg(),
|
||||||
# logger.debug("First poll...")
|
text=True,
|
||||||
events = poll.poll()
|
) as proc:
|
||||||
# logger.debug("Done, %d event(s)", len(events))
|
proc_logger = logger.getChild("borg")
|
||||||
pipe_buffer = b""
|
stdout_logger = proc_logger.getChild("stdout")
|
||||||
stdout_line_buffer = bytearray(b"")
|
stderr_logger = proc_logger.getChild("stderr")
|
||||||
stderr_line_buffer = bytearray(b"")
|
|
||||||
newline_trans = bytearray.maketrans(b"\r", b"\n")
|
poll = select.poll()
|
||||||
while pollc > 0 and len(events) > 0:
|
poll.register(proc.stdin, select.POLLOUT | select.POLLHUP)
|
||||||
for rfd, event in events:
|
poll.register(proc.stdout, select.POLLIN | select.POLLHUP)
|
||||||
# logger.debug("rfd=%d, event=%x", rfd, event)
|
poll.register(proc.stderr, select.POLLIN | select.POLLHUP)
|
||||||
if event & select.POLLOUT:
|
pollc = 2
|
||||||
if rfd == stdin_fd:
|
events = poll.poll()
|
||||||
if chunk := pipe_buffer[:PIPE_BUF]:
|
while pollc > 0 and len(events) > 0:
|
||||||
# logger.debug(
|
for rfd, event in events:
|
||||||
# "Writing chunk of length %d...", len(chunk)
|
if event & select.POLLOUT:
|
||||||
# )
|
if rfd == proc.stdin.fileno():
|
||||||
proc.stdin.buffer.write(chunk)
|
if chunk := next(download_stream, b""):
|
||||||
# logger.debug("Done")
|
proc.stdin.buffer.write(chunk)
|
||||||
pipe_buffer = pipe_buffer[PIPE_BUF:]
|
|
||||||
elif pipe_buffer := next(download_stream, b""):
|
|
||||||
# logger.debug(
|
|
||||||
# "Accepted chunk of length %d",
|
|
||||||
# len(pipe_buffer),
|
|
||||||
# )
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
proc.stdin.close()
|
|
||||||
if event & select.POLLIN or event & select.POLLPRI:
|
|
||||||
if rfd == stdout_fd:
|
|
||||||
# logger.debug("Reading from stdout...")
|
|
||||||
if chunk := proc.stdout.buffer.read(PIPE_BUF):
|
|
||||||
# logger.debug("Done, length %d", len(chunk))
|
|
||||||
stdout_line_buffer.extend(
|
|
||||||
chunk.translate(newline_trans)
|
|
||||||
)
|
|
||||||
# logger.debug(
|
|
||||||
# "Buffer at length %d", len(stdout_line_buffer)
|
|
||||||
# )
|
|
||||||
while True:
|
|
||||||
line, sep, rest = stdout_line_buffer.partition(
|
|
||||||
b"\n"
|
|
||||||
)
|
|
||||||
# logger.debug("Split: %r, %r, %r", line, sep, rest)
|
|
||||||
if sep:
|
|
||||||
stdout_logger.info(
|
|
||||||
line.decode(errors="replace").rstrip()
|
|
||||||
)
|
|
||||||
stdout_line_buffer = rest
|
|
||||||
else:
|
else:
|
||||||
break
|
proc.stdin.close()
|
||||||
if rfd == stderr_fd:
|
if event & select.POLLIN:
|
||||||
# logger.debug("Reading from stderr...")
|
if rfd == proc.stdout.fileno():
|
||||||
if chunk := proc.stderr.buffer.read(PIPE_BUF):
|
if line := proc.stdout.readline():
|
||||||
# logger.debug("Done, length %d", len(chunk))
|
stdout_logger.info(line[:-1])
|
||||||
stderr_line_buffer.extend(
|
if rfd == proc.stderr.fileno():
|
||||||
chunk.translate(newline_trans)
|
if line := proc.stderr.readline():
|
||||||
)
|
stderr_logger.info(line[:-1])
|
||||||
# logger.debug(
|
if event & select.POLLHUP:
|
||||||
# "Buffer at length %d", len(stderr_line_buffer)
|
poll.unregister(rfd)
|
||||||
# )
|
|
||||||
while True:
|
|
||||||
line, sep, rest = stderr_line_buffer.partition(
|
|
||||||
b"\n"
|
|
||||||
)
|
|
||||||
# logger.debug("Split: %r, %r, %r", line, sep, rest)
|
|
||||||
if sep:
|
|
||||||
stderr_logger.info(
|
|
||||||
line.decode(errors="replace").rstrip()
|
|
||||||
)
|
|
||||||
stderr_line_buffer = rest
|
|
||||||
else:
|
|
||||||
break
|
|
||||||
if event & select.POLLERR:
|
|
||||||
if rfd == stdin_fd:
|
|
||||||
logger.error("STDIN error")
|
|
||||||
if rfd == stdout_fd:
|
|
||||||
logger.error("STDOUT error")
|
|
||||||
if rfd == stderr_fd:
|
|
||||||
logger.error("STDERR error")
|
|
||||||
if event & select.POLLHUP:
|
|
||||||
if rfd == stdin_fd:
|
|
||||||
logger.debug("STDIN closed")
|
|
||||||
if rfd == stdout_fd:
|
|
||||||
logger.debug("STDOUT closed")
|
|
||||||
pollc -= 1
|
pollc -= 1
|
||||||
if rfd == stderr_fd:
|
|
||||||
logger.debug("STDERR closed")
|
|
||||||
pollc -= 1
|
|
||||||
poll.unregister(rfd)
|
|
||||||
|
|
||||||
if pollc > 0:
|
if pollc > 0:
|
||||||
# logger.debug("Poll...")
|
events = poll.poll()
|
||||||
events = poll.poll()
|
|
||||||
# logger.debug("Done, %d event(s)", len(events))
|
|
||||||
else:
|
|
||||||
# logger.debug("Nothing left to poll")
|
|
||||||
pass
|
|
||||||
|
|
||||||
if stdout_line_buffer:
|
for handler in proc_logger.handlers:
|
||||||
for line in stdout_line_buffer.split(b"\n"):
|
handler.flush()
|
||||||
stdout_logger.info(line)
|
|
||||||
if stderr_line_buffer:
|
|
||||||
for line in stderr_line_buffer.split(b"\n"):
|
|
||||||
stderr_logger.info(line)
|
|
||||||
|
|
||||||
for handler in proc_logger.handlers:
|
ret = proc.wait()
|
||||||
handler.flush()
|
if ret != 0:
|
||||||
|
raise Exception(f"borg subprocess exited with returncode {ret}")
|
||||||
# logger.debug("Waiting on subprocess...")
|
finally:
|
||||||
ret = proc.wait()
|
download.close()
|
||||||
# logger.debug("Done")
|
download.release_conn()
|
||||||
if ret != 0:
|
|
||||||
raise Exception(f"borg subprocess exited with returncode {ret}")
|
|
||||||
|
|
||||||
copied.add(backup)
|
|
||||||
|
|
||||||
copy_candidates -= copied
|
copy_candidates -= copied
|
||||||
for candidate in copy_candidates:
|
for candidate in copy_candidates:
|
||||||
@ -629,14 +487,14 @@ def main():
|
|||||||
desired_count = config.get("keep", {}).get(rule, {}).get("count", 0)
|
desired_count = config.get("keep", {}).get(rule, {}).get("count", 0)
|
||||||
|
|
||||||
periods = OrderedDict()
|
periods = OrderedDict()
|
||||||
for backup in sorted(backups):
|
for backup in sorted(backups, reverse=True):
|
||||||
period = pattern(backup.creation.astimezone(pattern_timezone))
|
period = pattern(backup.creation.astimezone(pattern_timezone))
|
||||||
if period not in periods:
|
if period not in periods:
|
||||||
periods[period] = deque()
|
periods[period] = deque()
|
||||||
periods[period].append(backup)
|
periods[period].append(backup)
|
||||||
|
|
||||||
to_keep = desired_count
|
to_keep = desired_count
|
||||||
ordered_periods = reversed(periods.items())
|
ordered_periods = periods.items()
|
||||||
for period, period_backups in ordered_periods:
|
for period, period_backups in ordered_periods:
|
||||||
if to_keep == 0:
|
if to_keep == 0:
|
||||||
break
|
break
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user