Prv8 Shell
Server : Apache
System : Linux ecngx264.inmotionhosting.com 4.18.0-553.77.1.lve.el8.x86_64 #1 SMP Wed Oct 8 14:21:00 UTC 2025 x86_64
User : lonias5 ( 3576)
PHP Version : 7.3.33
Disable Function : NONE
Directory :  /proc/self/root/proc/thread-self/root/opt/saltstack/salt/extras-3.10/restic/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //proc/self/root/proc/thread-self/root/opt/saltstack/salt/extras-3.10/restic/data.py
"""Restic dataclasses"""

from typing import Literal, Optional, Union, TypedDict, TYPE_CHECKING
import os
from urllib.parse import unquote as url_unquote
import shlex
from pathlib import PurePath
from collections.abc import Generator
from datetime import datetime
from dataclasses import dataclass
from subprocess import CompletedProcess, run as s_run
import sys
from cproc import Proc

# datetime.fromisoformat() only accepts arbitrary ISO 8601 strings from
# Python 3.11 onward, so older interpreters parse timestamps with dateutil.
if sys.version_info < (3, 11):
    # Import the submodule explicitly: a bare ``import dateutil`` does not
    # guarantee that ``dateutil.parser`` is loaded, which would make
    # ``dateutil.parser.isoparse`` fail with AttributeError. This statement
    # still binds the top-level ``dateutil`` name used elsewhere in the file.
    import dateutil.parser

    ISOFORMAT_USE_DATEUTIL = True
else:
    ISOFORMAT_USE_DATEUTIL = False

if TYPE_CHECKING:
    from . import Restic


class SQLBackupGroupDict(TypedDict):
    """Shape of the dict returned by ``SQLBackupGroup.serialize()``

    Keys:
        time: backup time as a unix timestamp
        user: user the backup belongs to
        type: SQL flavor of the backup, ``mysql`` or ``pgsql``
        failover: whether this is a failover copy
        fin: ID of the snapshot that marks the group as finished
        dbs: database name -> ID of the snapshot holding that database
    """

    time: int
    user: str
    type: Literal['mysql', 'pgsql']
    failover: bool
    fin: str
    dbs: dict[str, str]


class BackupDict(TypedDict):
    """Shape of the dict returned by ``Backup.serialize()``

    Keys:
        failover: whether this is a failover copy
        snap: snapshot ID
        time: backup time as a unix timestamp
        type: kind of backup, one of ``homedir``, ``dirs`` or ``pkgacct``
        user: user the backup belongs to
    """

    failover: bool
    snap: str
    time: int
    type: Literal['homedir', 'dirs', 'pkgacct']
    user: str


@dataclass
class ResticRepo:
    """Credentials and location of one restic repository stored in S3

    Args:
        bucket (str): bucket name
        restic_pass (str): restic password
        access_key (str): S3 access key
        secret_key (str): S3 secret key
    """

    # Unannotated on purpose: this is a plain class attribute, not a
    # dataclass field. It makes the class report 'restic' as its module.
    __module__ = 'restic'

    bucket: str
    restic_pass: str
    access_key: str
    secret_key: str


class Snapshot:
    """A single restic snapshot

    Attributes:
        restic (Restic): restic instance this snapshot was found in
        id (str): snapshot ID, taken from the ``id`` field as a str
        tags (list[str]): tags supplied when the snapshot was created; the
            value part of ``name:value`` tags is URL-unquoted
        datetime (datetime.datetime): backup creation time
        timestamp (int): backup creation time in whole unix seconds
        paths (list[str]): top level paths the snapshot contains
    """

    __module__ = 'restic'

    def __init__(self, *, restic: 'Restic', data: dict):
        """Build a snapshot from one decoded snapshot record

        Args:
            restic (Restic): restic instance the record came from
            data (dict): snapshot record; ``id``, ``time`` and ``paths`` are
                required, ``tags`` is optional
        """
        self.restic = restic
        self.id = str(data['id'])
        raw_tags = data.get('tags', [])
        self.tags: list[str] = [
            _tag_quote(tag, url_unquote) for tag in raw_tags
        ]
        # The time field is RFC3339 / ISO8601,
        # e.g. 2023-01-22T11:07:33.30417099-05:00
        parse_time = (
            dateutil.parser.isoparse
            if ISOFORMAT_USE_DATEUTIL
            else datetime.fromisoformat
        )
        self.datetime: datetime = parse_time(data['time'])
        self.timestamp = int(self.datetime.timestamp())
        self.paths: list[str] = [*data['paths']]

    def __repr__(self):
        return 'Snapshot<{}>'.format(self.id)

    def listdir(
        self, path: Union[str, PurePath]
    ) -> list[Union['SnapFile', 'SnapDir']]:
        """List the files/dirs found at a path inside this snapshot

        Delegates to ``Restic.listdir()`` with this snapshot.

        Args:
            path (str | PurePath): full path inside the snapshot to list the
                contents of

        Raises:
            ValueError: requested path was not a full path
            ResticError: Any error listing snapshot contents from restic

        Returns:
            list[SnapFile | SnapDir]: files or directories
        """
        repo = self.restic
        return repo.listdir(snap=self, path=path)

    def scandir(
        self, path: Union[str, PurePath]
    ) -> Generator[Union['SnapFile', 'SnapDir'], None, None]:
        """Iterate the files/dirs found at a path inside this snapshot

        Delegates to ``Restic.scandir()`` with this snapshot.

        Args:
            path (str | PurePath): full path inside the snapshot to list the
                contents of

        Raises:
            ValueError: requested path was not a full path
            ResticError: Any error listing snapshot contents from restic

        Yields:
            Generator[SnapFile | SnapDir, None, None]: files or directories
        """
        repo = self.restic
        return repo.scandir(snap=self, path=path)

    def restore(
        self,
        *,
        includes: Optional[list[Union[str, PurePath]]] = None,
        excludes: Optional[list[Union[str, PurePath]]] = None,
        target: Union[str, PurePath] = '/',
    ) -> 'ResticCmd':
        """Build the ResticCmd that restores from this snapshot

        Args:
            includes (list[str | PurePath], optional): --include paths
            excludes (list[str | PurePath], optional): --exclude paths
            target (str | PurePath): base directory prefix to restore to.
                Defaults to '/', which restores to the original path

        Returns:
            ResticCmd: restic command executor
        """
        options = {
            'includes': includes,
            'excludes': excludes,
            'target': target,
        }
        return self.restic.restore(self, **options)

    def dump(self, filename: Union[str, PurePath]) -> 'ResticCmd':
        """Build the ResticCmd that dumps one file from this snapshot

        Args:
            filename (str | PurePath): filename to retrieve

        Returns:
            ResticCmd: restic command executor
        """
        repo = self.restic
        return repo.dump(self, filename)

    def forget(self, *, prune: bool = False, no_lim: bool = True):
        """Run restic forget on this snapshot

        Args:
            prune (bool): whether to automatically run prune if at
                least one snapshot was removed. Defaults to False
            no_lim (bool): do not CPU limit the command as it runs regardless
                of the lim arg in Restic. Defaults to True

        Raises:
            ResticError: if the restic forget command fails

        Returns:
            whatever ``Restic.forget()`` returns for this snapshot's ID
        """
        repo = self.restic
        return repo.forget(self.id, prune=prune, no_lim=no_lim)


def _tag_quote(tag: str, func) -> str:
    if ':' in tag[1:-1]:
        prefix, suffix = tag.split(':', maxsplit=1)
        return f"{prefix}:{func(suffix)}"
    return tag


class ResticCmd:
    """A restic command line bound to the Restic instance that built it.
    ``str()`` of this object gives a shell-escaped representation of the
    command.

    Attributes:
        cmd (list[str]): Popen arguments
        restic (Restic): restic instance supplying the environment and the
            CPU limit used when the command runs
    """

    __module__ = 'restic'

    def __init__(
        self,
        cmd,
        restic: 'Restic',
    ):
        self.restic = restic
        self.cmd = cmd

    def __str__(self) -> str:
        return ' '.join(shlex.quote(arg) for arg in self.cmd)

    def run(
        self,
        *,
        stdout: Union[int, None] = Proc.DEVNULL,
        stderr: Union[int, None] = Proc.PIPE,
        stdin: Union[int, None] = None,
        input: Union[str, None] = None,  # pylint: disable=redefined-builtin
        check: bool = False,
        timeout: Union[int, float, None] = None,
        no_lim: bool = False,
        **kwargs,
    ) -> CompletedProcess:
        """Run the restic command to completion

        Args:
            stdout (int | None): stdout redirection. Defaults to DEVNULL
            stderr (int | None): stderr redirection. Defaults to PIPE (because
                ResticError will need this if you raise it using the result)
            stdin (int | None): stdin redirection. Defaults to None. Ignored
                when input= is supplied
            input (str | None): text to send to stdin. Do not use the
                stdin= kwarg if you use input=
            check (bool): if set True, raise CalledProcessError on non-zero
                exit codes. Defaults to False
            timeout (int | float | None): optional command timeout
            no_lim (bool): do not CPU limit the command as it runs regardless
                of the lim arg in Restic
            **kwargs: additional arguments passed through to the runner. The
                encoding, env, stdout, stderr, check, shell and timeout keys
                are always overwritten

        Raises:
            CalledProcessError: program exited with a non-zero exit code
                and check=True was specified
            TimeoutExpired: program took too long to execute and timeout=
                was specified

        Returns:
            CompletedProcess: process results
        """
        kwargs.update(
            encoding='UTF-8',
            env=self.restic.env,
            stdout=stdout,
            stderr=stderr,
            check=check,
            shell=False,
            timeout=timeout,
        )
        # input= and stdin= are mutually exclusive; input wins when given
        if input is not None:
            kwargs['input'] = input
        else:
            kwargs['stdin'] = stdin
        unlimited = no_lim or self.restic.lim is None
        if unlimited:
            # pylint: disable=subprocess-run-check
            return s_run(self.cmd, **kwargs)
        return Proc.run(self.cmd, lim=self.restic.lim, **kwargs)

    def execv(self):
        """Replace the current process with restic

        Merges the restic environment into ``os.environ`` and then calls
        ``os.execv`` with this command's arguments.
        """
        os.environ.update(self.restic.env)
        argv = self.cmd
        os.execv(argv[0], argv)

    def proc(
        self,
        *,
        stdout: Union[int, None] = Proc.DEVNULL,
        stderr: Union[int, None] = Proc.PIPE,
        stdin: Union[int, None] = None,
        no_lim: bool = False,
        **kwargs,
    ) -> Proc:
        """Start the restic command without waiting for it

        Args:
            stdout (int | None): stdout redirection. Defaults to DEVNULL
            stderr (int | None): stderr redirection. Defaults to PIPE (because
                ResticError will need this if you raise it using the result)
            stdin (int | None): stdin redirection. Defaults to None
            no_lim (bool): do not CPU limit the command as it runs regardless
                of the lim arg in Restic
            **kwargs: additional arguments passed through to ``Proc``. The
                encoding, env, stdout, stderr, stdin and shell keys are
                always overwritten

        Returns:
            cproc.Proc: Running process
        """
        kwargs.update(
            encoding='UTF-8',
            env=self.restic.env,
            stdout=stdout,
            stderr=stderr,
            stdin=stdin,
            shell=False,
        )
        unlimited = no_lim or self.restic.lim is None
        limit = None if unlimited else self.restic.lim
        # pylint: disable=subprocess-run-check,consider-using-with
        return Proc(self.cmd, lim=limit, **kwargs)


@dataclass
class SnapPath:
    """Common base for a remote path inside a restic snapshot. Constructing
    it never yields a bare ``SnapPath``: ``__new__`` hands back one of the
    subclasses, ``SnapDir`` or ``SnapFile``"""

    __module__ = 'restic'

    # SnapPath intentionally does not subclass os.PathLike or os.DirEntry
    # because the path isn't mounted anywhere that normal filesystem ops
    # will work against it
    snapshot: Snapshot
    restic: 'Restic'
    name: str
    type: str
    path: str
    uid: int
    gid: int
    mode: Union[int, None]
    permissions: Union[str, None]

    def __new__(cls, type: str, **_):  # pylint: disable=redefined-builtin
        # Only type == 'dir' maps to SnapDir; every other type is a SnapFile
        concrete = SnapDir if type == 'dir' else SnapFile
        return object.__new__(concrete)

    def __str__(self):
        return self.path


class SnapFile(SnapPath):
    """A file stored in a restic snapshot

    Attributes:
        snapshot (Snapshot): snapshot instance this file was found in
        restic (Restic): restic instance this file was found in
        name (str): base filename
        type (str): "file"
        path (str): full path
        uid (int): original UID of the file when backed up
        gid (int): original GID of the file when backed up
        mode (int | None): original file mode when backed up
        permissions (str | None): original file permissions when backed up
    """

    __module__ = 'restic'

    def dump(self) -> 'ResticCmd':
        """Build the ResticCmd that dumps the contents of this file

        Delegates to ``Restic.dump()`` with this file's snapshot and path.

        Returns:
            ResticCmd: restic command executor
        """
        repo = self.restic
        return repo.dump(snap=self.snapshot, filename=self.path)


class SnapDir(SnapPath):
    """A directory stored in a restic snapshot

    Attributes:
        snapshot (Snapshot): snapshot instance this directory was found in
        restic (Restic): restic instance this directory was found in
        name (str): base directory name
        type (str): "dir"
        path (str): full path
        uid (int): original UID of the directory when backed up
        gid (int): original GID of the directory when backed up
        mode (int | None): original directory mode when backed up
        permissions (str | None): original directory permissions when backed up
    """

    __module__ = 'restic'

    def listdir(self) -> list[Union['SnapFile', 'SnapDir']]:
        """List the files/dirs directly under this directory

        Delegates to ``Restic.listdir()`` with this directory's snapshot and
        path.

        Raises:
            ResticError: Any error listing snapshot contents from restic

        Returns:
            list[SnapFile | SnapDir]: files or directories
        """
        repo = self.restic
        return repo.listdir(snap=self.snapshot, path=self.path)

    def scandir(self) -> Generator[Union['SnapFile', 'SnapDir'], None, None]:
        """Iterate the files/dirs directly under this directory

        Delegates to ``Restic.scandir()`` with this directory's snapshot and
        path.

        Raises:
            ResticError: Any error listing snapshot contents from restic

        Yields:
            Generator[SnapFile | SnapDir, None, None]: files or directories
        """
        repo = self.restic
        return repo.scandir(snap=self.snapshot, path=self.path)


class Backup:
    """A restic snapshot interpreted through the tags that backups3.x's
    backup-runner attaches to it.

    Args:
        snap (Snapshot): snapshot from Restic.snapshots()

    Raises:
        KeyError: snapshot did not have required tags (created manually?)
        ValueError: the snapshot is a mysql/pgsql one, but its ``start:``
            timestamp tag was not an integer

    Returns:
        (Backup, SQLBackupGroup, SQLBackupItem): depending on the type of
            backups 3.x snapshot

    Attributes:
        snap (Snapshot): snapshot object
        failover (bool): whether this is a failover copy
        user (str): value of the snapshot's ``user:`` tag
        type (str): value of the snapshot's ``type:`` tag
        time (int): creation timestamp; read from the ``start:`` tag for
            mysql/pgsql snapshots, otherwise the snapshot's own timestamp
    """

    __module__ = 'restic'

    def __new__(cls, snap: Snapshot):
        is_sql = Backup.get_label(snap, 'type') in ('mysql', 'pgsql')
        if not is_sql:
            return object.__new__(cls)  # normal Backup()
        # SQL snapshots tagged 'finished' stand for a whole group; any other
        # SQL snapshot is a single item
        target = SQLBackupGroup if 'finished' in snap.tags else SQLBackupItem
        return object.__new__(target)

    def __init__(self, snap: Snapshot):
        self.snap = snap
        self.failover = 'failover' in snap.tags
        self.user = self.get_label(snap, 'user')
        self.type = self.get_label(snap, 'type')
        self.time = (
            int(self.get_label(snap, 'start'))
            if self.type in ('mysql', 'pgsql')
            else snap.timestamp
        )

    @staticmethod
    def get_label(snap: Snapshot, name: str) -> str:
        """Look up the value of the first tag shaped like 'name:value'

        Args:
            snap (Snapshot): snapshot object
            name (str): name to search for

        Raises:
            KeyError: if the label was not found

        Returns:
            str: the value portion of the 'name:value' tag
        """
        marker = f'{name}:'
        values = (
            tag.partition(':')[2]
            for tag in snap.tags
            if tag.startswith(marker)
        )
        # A found value is always a str, so None can only mean "no match"
        found = next(values, None)
        if found is None:
            raise KeyError(name)
        return found

    def serialize(self) -> dict:
        """Used internally by Restic.get_backups() if serialize=True"""
        return {
            'snap': self.snap.id,
            'time': self.time,
            'user': self.user,
            'type': self.type,
            'failover': self.failover,
        }


class SQLBackupGroup(Backup):
    """One SQL backup run made by imh-backup-client. Backups 3.x stores each
    database in its own snapshot, then artificially groups them together as
    one; this object is that group.

    Attributes:
        snap (Snapshot): snapshot object for the snapshot signifying the
            backup's completion. This snapshot contains no SQL data. See the
            ``.dbs`` attribute instead for that
        failover (bool): whether this is a failover copy
        user (str): cPanel username or root
        type (str): mysql or pgsql
        time (int): creation timestamp
        dbs (dict[str, SQLBackupItem]): database names mapped to
            SQLBackupItems. Starts out empty
    """

    __module__ = 'restic'

    def __init__(self, snap: Snapshot):
        super().__init__(snap)
        # Nothing in this class fills the mapping; it only starts it empty
        self.dbs: dict[str, SQLBackupItem] = dict()

    def serialize(self) -> SQLBackupGroupDict:
        """Used internally by Restic.get_backups(serialize=True)

        Takes the base ``Backup.serialize()`` dict, renames its ``snap`` key
        to ``fin`` and adds ``dbs``, mapping each database name to the
        serialized form of its item.
        """
        payload = super().serialize()
        payload['fin'] = payload.pop('snap')
        serialized_dbs = {}
        for dbname, item in self.dbs.items():
            serialized_dbs[dbname] = item.serialize()
        payload['dbs'] = serialized_dbs
        return payload


class SQLBackupItem(Backup):
    """Represents one SQL snapshot in a ``SQLBackupGroup``.

    Raises:
        KeyError: the snapshot has no ``dbname:`` tag, or is missing one of
            the tags ``Backup`` requires

    Attributes:
        snap (Snapshot): snapshot object
        failover (bool): whether this is a failover copy
        user (str): cPanel username or root
        type (str): mysql or pgsql
        time (int): creation timestamp
        dbname (str): database name, read from the ``dbname:`` tag
    """

    __module__ = 'restic'

    def __init__(self, snap: Snapshot):
        super().__init__(snap)
        self.dbname = Backup.get_label(snap, 'dbname')

    def dump(self) -> ResticCmd:
        """Crafts a ResticCmd to dump this database's contents

        The database name is passed as the filename to dump from the
        snapshot.

        Returns:
            ResticCmd: restic command executor
        """
        return self.snap.restic.dump(snap=self.snap, filename=self.dbname)

    def serialize(self) -> str:
        """Used internally by Restic.get_backups(serialize=True)

        Unlike ``Backup.serialize()``, an item serializes to just its
        snapshot ID rather than a dict.

        Returns:
            str: ID of the snapshot holding this database
        """
        return self.snap.id


# Every backup type name: the two SQL types listed in SQLBackupGroupDict plus
# the three non-SQL types listed in BackupDict
BakTypeStr = Literal["mysql", "pgsql", "homedir", "dirs", "pkgacct"]
# Serialized backups keyed by backup type; each value is a list of either
# BackupDict or SQLBackupGroupDict entries
UserBackupDicts = dict[
    BakTypeStr, Union[list[BackupDict], list[SQLBackupGroupDict]]
]
# Backup objects keyed by backup type; each value is a list of either Backup
# or SQLBackupGroup instances
UserBackups = dict[BakTypeStr, Union[list[Backup], list[SQLBackupGroup]]]

@StableExploit - 2025