|
Server : Apache System : Linux ecngx264.inmotionhosting.com 4.18.0-553.77.1.lve.el8.x86_64 #1 SMP Wed Oct 8 14:21:00 UTC 2025 x86_64 User : lonias5 ( 3576) PHP Version : 7.3.33 Disable Function : NONE Directory : /proc/self/root/proc/thread-self/root/opt/saltstack/salt/extras-3.10/restic/ |
Upload File : |
"""Restic dataclasses"""
from typing import Literal, Optional, Union, TypedDict, TYPE_CHECKING
import os
from urllib.parse import unquote as url_unquote
import shlex
from pathlib import PurePath
from collections.abc import Generator
from datetime import datetime
from dataclasses import dataclass
from subprocess import CompletedProcess, run as s_run
import sys
from cproc import Proc
# Before Python 3.11, datetime.fromisoformat() only parsed the output of
# datetime.isoformat(), so older interpreters use dateutil to parse the
# snapshot timestamps instead.
if sys.version_info < (3, 11):
    # Import the submodule explicitly. A bare ``import dateutil`` does not
    # load ``dateutil.parser`` on older python-dateutil releases, which would
    # make the ``dateutil.parser.isoparse`` call in Snapshot.__init__ raise
    # AttributeError.
    import dateutil.parser

    ISOFORMAT_USE_DATEUTIL = True
else:
    ISOFORMAT_USE_DATEUTIL = False

if TYPE_CHECKING:
    # Only needed for type annotations
    from . import Restic
class SQLBackupGroupDict(TypedDict):
    """Return format of SQLBackupGroup.serialize()"""

    time: int  # unix time
    user: str  # value of the snapshot's "user:" tag
    type: Literal["mysql", "pgsql"]
    failover: bool  # True if the snapshot carries the "failover" tag
    fin: str  # snapshot ID of the group's own ("finished") snapshot
    dbs: dict[str, str]  # dbname -> snapshot ID
class BackupDict(TypedDict):
    """Return format of Backup.serialize()"""

    failover: bool  # True if the snapshot carries the "failover" tag
    snap: str  # snapshot ID
    time: int  # unix time
    type: Literal["homedir", "dirs", "pkgacct"]
    user: str  # value of the snapshot's "user:" tag
@dataclass
class ResticRepo:
    """Holds the restic password, S3 key pair and bucket name

    Args:
        bucket (str): bucket name
        restic_pass (str): restic password
        access_key (str): S3 access key
        secret_key (str): S3 secret key
    """

    __module__ = 'restic'

    bucket: str
    restic_pass: str
    access_key: str
    secret_key: str
class Snapshot:
    """One snapshot as reported by restic

    Attributes:
        restic (Restic): restic instance this snapshot was found in
        id (str): short snapshot ID
        tags (list[str]): tags supplied when the snapshot was created
        datetime (datetime.datetime): backup creation time
        timestamp (int): backup creation time as a unix timestamp
        paths (list[str]): top level paths the snapshot contains
    """

    __module__ = 'restic'

    def __init__(self, *, restic: 'Restic', data: dict):
        self.restic = restic
        self.id = str(data['id'])
        # the value half of each "name:value" tag is passed through
        # url_unquote; the "tags" key is optional
        raw_tags = data.get('tags', [])
        self.tags: list[str] = [
            _tag_quote(raw_tag, url_unquote) for raw_tag in raw_tags
        ]
        # time uses RFC3339 / ISO8601 format
        # ex: 2023-01-22T11:07:33.30417099-05:00
        parse_time = (
            dateutil.parser.isoparse
            if ISOFORMAT_USE_DATEUTIL
            else datetime.fromisoformat
        )
        self.datetime: datetime = parse_time(data['time'])
        self.timestamp = int(self.datetime.timestamp())
        self.paths: list[str] = list(data['paths'])

    def __repr__(self):
        return 'Snapshot<{}>'.format(self.id)

    def listdir(
        self, path: Union[str, PurePath]
    ) -> list[Union['SnapFile', 'SnapDir']]:
        """List the files/dirs under a path in this snapshot

        Delegates to ``Restic.listdir`` with this snapshot.

        Args:
            path (str | PurePath): full path inside the snapshot to list the
                contents of

        Raises:
            ValueError: requested path was not a full path
            ResticError: Any error listing snapshot contents from restic

        Returns:
            list[SnapFile | SnapDir]: files or directories
        """
        lister = self.restic.listdir
        return lister(snap=self, path=path)

    def scandir(
        self, path: Union[str, PurePath]
    ) -> Generator[Union['SnapFile', 'SnapDir'], None, None]:
        """Iterate the files/dirs under a path in this snapshot

        Delegates to ``Restic.scandir`` with this snapshot.

        Args:
            path (str | PurePath): full path inside the snapshot to list the
                contents of

        Raises:
            ValueError: requested path was not a full path
            ResticError: Any error listing snapshot contents from restic

        Yields:
            Generator[SnapFile | SnapDir, None, None]: files or directories
        """
        scanner = self.restic.scandir
        return scanner(snap=self, path=path)

    def restore(
        self,
        *,
        includes: Optional[list[Union[str, PurePath]]] = None,
        excludes: Optional[list[Union[str, PurePath]]] = None,
        target: Union[str, PurePath] = '/',
    ) -> 'ResticCmd':
        """Build a ResticCmd which restores from this snapshot

        Args:
            includes (list[str | PurePath], optional): --include paths
            excludes (list[str | PurePath], optional): --exclude paths
            target (str | PurePath): base directory prefix to restore to.
                Defaults to '/', which restores to the original path

        Returns:
            ResticCmd: restic command executor
        """
        restorer = self.restic.restore
        return restorer(
            self, includes=includes, excludes=excludes, target=target
        )

    def dump(self, filename: Union[str, PurePath]) -> 'ResticCmd':
        """Build a ResticCmd which dumps one file from this snapshot

        Args:
            filename (str | PurePath): filename to retrieve

        Returns:
            ResticCmd: restic command executor
        """
        dumper = self.restic.dump
        return dumper(self, filename)

    def forget(self, *, prune: bool = False, no_lim: bool = True):
        """Run restic forget on this snapshot

        Args:
            prune (bool): whether to automatically run prune if at
                least one snapshot was removed. Defaults to False
            no_lim (bool): do not CPU limit the command as it runs regardless
                of the lim arg in Restic. Defaults to True

        Raises:
            ResticError: if the restic forget command fails
        """
        forgetter = self.restic.forget
        return forgetter(self.id, prune=prune, no_lim=no_lim)
def _tag_quote(tag: str, func) -> str:
if ':' in tag[1:-1]:
prefix, suffix = tag.split(':', maxsplit=1)
return f"{prefix}:{func(suffix)}"
return tag
class ResticCmd:
    """Return type of Restic's build() function. Casting it with str() gives
    a shell-escaped representation of the command.

    Attributes:
        cmd (list[str]): Popen arguments
        restic (Restic): restic instance supplying the environment and lim
    """

    __module__ = 'restic'

    def __init__(self, cmd, restic: 'Restic'):
        self.cmd = cmd
        self.restic = restic

    def __str__(self) -> str:
        return shlex.join(self.cmd)

    def run(
        self,
        *,
        stdout: Union[int, None] = Proc.DEVNULL,
        stderr: Union[int, None] = Proc.PIPE,
        stdin: Union[int, None] = None,
        input: Union[str, None] = None,  # pylint: disable=redefined-builtin
        check: bool = False,
        timeout: Union[int, float, None] = None,
        no_lim: bool = False,
        **kwargs,
    ) -> CompletedProcess:
        """Run the restic command to completion

        Args:
            stdout (int | None): stdout redirection. Defaults to DEVNULL
            stderr (int | None): stderr redirection. Defaults to PIPE (because
                ResticError will need this if you raise it using the result)
            stdin (int | None): stdin redirection. Defaults to None. Ignored
                when input= is given
            input (str | None): text to send to stdin. Do not use the
                stdin= kwarg if you use input=
            check (bool): if set True, raise CalledProcessError on non-zero
                exit codes. Defaults to False
            timeout (int | float | None): optional command timeout
            no_lim (bool): do not CPU limit the command as it runs regardless
                of the lim arg in Restic
            **kwargs: extra keyword arguments for the underlying run call.
                The encoding, env, stdout, stderr, check, shell and timeout
                keys are always overwritten

        Raises:
            CalledProcessError: program exited with a non-zero exit code
                and check=True was specified
            TimeoutExpired: program took too long to execute and timeout=
                was specified

        Returns:
            CompletedProcess: process results
        """
        kwargs.update(
            encoding='UTF-8',
            env=self.restic.env,
            stdout=stdout,
            stderr=stderr,
            check=check,
            shell=False,
            timeout=timeout,
        )
        # input= and stdin= are mutually exclusive; input wins
        if input is not None:
            kwargs['input'] = input
        else:
            kwargs['stdin'] = stdin
        unlimited = no_lim or self.restic.lim is None
        if not unlimited:
            return Proc.run(self.cmd, lim=self.restic.lim, **kwargs)
        # pylint: disable=subprocess-run-check
        return s_run(self.cmd, **kwargs)

    def execv(self):
        """Replace the current process with restic via os.execv, after
        merging the restic environment into os.environ"""
        os.environ.update(self.restic.env)
        argv = self.cmd
        os.execv(argv[0], argv)

    def proc(
        self,
        *,
        stdout: Union[int, None] = Proc.DEVNULL,
        stderr: Union[int, None] = Proc.PIPE,
        stdin: Union[int, None] = None,
        no_lim: bool = False,
        **kwargs,
    ) -> Proc:
        """Start the command without waiting for it and return it

        Args:
            stdout (int | None): stdout redirection. Defaults to DEVNULL
            stderr (int | None): stderr redirection. Defaults to PIPE (because
                ResticError will need this if you raise it using the result)
            stdin (int | None): stdin redirection. Defaults to None
            no_lim (bool): do not CPU limit the command as it runs regardless
                of the lim arg in Restic
            **kwargs: extra keyword arguments for ``Proc``. The encoding,
                env, stdout, stderr, stdin and shell keys are always
                overwritten

        Returns:
            cproc.Proc: Running process
        """
        kwargs.update(
            encoding='UTF-8',
            env=self.restic.env,
            stdout=stdout,
            stderr=stderr,
            stdin=stdin,
            shell=False,
        )
        if no_lim or self.restic.lim is None:
            lim = None
        else:
            lim = self.restic.lim
        # pylint: disable=consider-using-with
        return Proc(self.cmd, lim=lim, **kwargs)
@dataclass
class SnapPath:
    """Base class for a remote path in a restic snapshot. Instantiating it
    never yields a plain ``SnapPath``: ``__new__`` hands back a ``SnapDir``
    when ``type`` is "dir" and a ``SnapFile`` for anything else"""

    __module__ = 'restic'

    # SnapPath intentionally does not subclass os.PathLike or os.DirEntry
    # because the path isn't mounted anywhere that normal filesystem ops
    # will work against it
    snapshot: Snapshot
    restic: 'Restic'
    name: str
    type: str
    path: str
    uid: int
    gid: int
    mode: Union[int, None]
    permissions: Union[str, None]

    def __new__(cls, type: str, **_):  # pylint: disable=redefined-builtin
        # choose the concrete subclass from the entry type
        concrete = SnapDir if type == 'dir' else SnapFile
        return object.__new__(concrete)

    def __str__(self):
        return self.path
class SnapFile(SnapPath):
    """A remote file in a restic snapshot

    Attributes:
        snapshot (Snapshot): snapshot instance this file was found in
        restic (Restic): restic instance this file was found in
        name (str): base filename
        type (str): "file"
        path (str): full path
        uid (int): original UID of the file when backed up
        gid (int): original GID of the file when backed up
        mode (int | None): original file mode when backed up
        permissions (str | None): original file permissions when backed up
    """

    __module__ = 'restic'

    def dump(self) -> 'ResticCmd':
        """Build a ResticCmd which dumps the contents of this file

        Returns:
            ResticCmd: restic command executor
        """
        dumper = self.restic.dump
        return dumper(snap=self.snapshot, filename=self.path)
class SnapDir(SnapPath):
    """A remote directory in a restic snapshot

    Attributes:
        snapshot (Snapshot): snapshot instance this directory was found in
        restic (Restic): restic instance this directory was found in
        name (str): base directory name
        type (str): "dir"
        path (str): full path
        uid (int): original UID of the directory when backed up
        gid (int): original GID of the directory when backed up
        mode (int | None): original directory mode when backed up
        permissions (str | None): original directory permissions when backed up
    """

    __module__ = 'restic'

    def listdir(self) -> list[Union['SnapFile', 'SnapDir']]:
        """List the files/dirs directly under this directory

        Raises:
            ResticError: Any error listing snapshot contents from restic

        Returns:
            list[SnapFile | SnapDir]: files or directories
        """
        lister = self.restic.listdir
        return lister(snap=self.snapshot, path=self.path)

    def scandir(self) -> Generator[Union['SnapFile', 'SnapDir'], None, None]:
        """Iterate the files/dirs directly under this directory

        Raises:
            ResticError: Any error listing snapshot contents from restic

        Yields:
            Generator[SnapFile | SnapDir, None, None]: files or directories
        """
        scanner = self.restic.scandir
        return scanner(snap=self.snapshot, path=self.path)
class Backup:
    """Object representing a restic snapshot formatted in a way specific to
    backups3.x's backup-runner.

    Args:
        snap (Snapshot): snapshot from Restic.snapshots()

    Raises:
        KeyError: snapshot did not have required tags (created manually?)
        ValueError: a sql subclass was selected, but the snapshot's start:
            timestamp tag was not a valid integer

    Returns:
        (Backup, SQLBackupGroup, SQLBackupItem): depending on the type of
        backups 3.x snapshot

    Attributes:
        snap (Snapshot): snapshot object
        failover (bool): whether this is a failover copy
        user (str): value of the user: tag (cPanel username or root)
        type (str): value of the type: tag
        time (int): creation timestamp. Taken from the start: tag for
            mysql/pgsql, otherwise from the snapshot itself
    """

    __module__ = 'restic'

    def __new__(cls, snap: Snapshot):
        if Backup.get_label(snap, 'type') not in ('mysql', 'pgsql'):
            return object.__new__(cls)  # normal Backup()
        # SQL snapshots: the one tagged "finished" stands for the whole
        # group, every other one is a single item
        if 'finished' in snap.tags:
            return object.__new__(SQLBackupGroup)
        return object.__new__(SQLBackupItem)

    def __init__(self, snap: Snapshot):
        self.snap = snap
        self.failover = 'failover' in snap.tags
        self.user = self.get_label(snap, 'user')
        self.type = self.get_label(snap, 'type')
        is_sql = self.type in ('mysql', 'pgsql')
        self.time = (
            int(self.get_label(snap, 'start')) if is_sql else snap.timestamp
        )

    @staticmethod
    def get_label(snap: Snapshot, name: str) -> str:
        """Look up the value of the first tag shaped like 'name:value'

        Args:
            snap (Snapshot): snapshot object
            name (str): name to search for

        Raises:
            KeyError: if the label was not found

        Returns:
            str: the value portion of the 'name:value' tag
        """
        wanted = f'{name}:'
        found = next((tag for tag in snap.tags if tag.startswith(wanted)), None)
        if found is None:
            raise KeyError(name)
        # everything after the tag's first colon
        return found.partition(':')[2]

    def serialize(self) -> dict:
        """Used internally by Restic.get_backups() if serialize=True"""
        return {
            'snap': self.snap.id,
            'time': self.time,
            'user': self.user,
            'type': self.type,
            'failover': self.failover,
        }
class SQLBackupGroup(Backup):
    """Holds a group of SQL snapshots created by imh-backup-client, representing
    one sql backup run. Backups 3.x stores each database in its own snapshot,
    then artificially groups them together as one.

    Attributes:
        snap (Snapshot): snapshot object for the snapshot signifying the
            backup's completion. This snapshot contains no SQL data. See the
            ``.dbs`` attribute instead for that
        failover (bool): whether this is a failover copy
        user (str): cPanel username or root
        type (str): mysql or pgsql
        time (int): creation timestamp
        dbs (dict[str, SQLBackupItem]): database names mapped to
            SQLBackupItems. Starts out empty
    """

    __module__ = 'restic'

    def __init__(self, snap: Snapshot):
        super().__init__(snap)
        self.dbs: dict[str, SQLBackupItem] = {}

    def serialize(self) -> SQLBackupGroupDict:
        """Used internally by Restic.get_backups(serialize=True)"""
        data = super().serialize()
        # the group's own snapshot ID is reported as "fin" instead of "snap"
        finished_id = data.pop('snap')
        data['fin'] = finished_id
        data['dbs'] = {
            dbname: item.serialize() for dbname, item in self.dbs.items()
        }
        return data
class SQLBackupItem(Backup):
    """Represents one SQL snapshot in a ``SQLBackupGroup``.

    Attributes:
        snap (Snapshot): snapshot object
        failover (bool): whether this is a failover copy
        user (str): cPanel username or root
        type (str): mysql or pgsql
        time (int): creation timestamp
        dbname (str): database name, read from the dbname: tag
    """

    __module__ = 'restic'

    def __init__(self, snap: Snapshot):
        super().__init__(snap)
        self.dbname = Backup.get_label(snap, 'dbname')

    def dump(self) -> ResticCmd:
        """Crafts a ResticCmd to dump this database's snapshot contents

        The database name is passed to restic as the filename to dump.

        Returns:
            ResticCmd: restic command executor
        """
        return self.snap.restic.dump(snap=self.snap, filename=self.dbname)

    def serialize(self) -> str:
        """Used internally by Restic.get_backups(serialize=True)

        Returns:
            str: this item's snapshot ID (the value stored per database in
            ``SQLBackupGroupDict['dbs']``)
        """
        return self.snap.id
# Every backup type string used as a key in the mappings below
BakTypeStr = Literal["mysql", "pgsql", "homedir", "dirs", "pkgacct"]
# Backup type -> list of serialized backups (plain dicts)
UserBackupDicts = dict[
    BakTypeStr, Union[list[BackupDict], list[SQLBackupGroupDict]]
]
# Backup type -> list of backup objects
UserBackups = dict[BakTypeStr, Union[list[Backup], list[SQLBackupGroup]]]