Prv8 Shell
Server : Apache
System : Linux ecngx264.inmotionhosting.com 4.18.0-553.77.1.lve.el8.x86_64 #1 SMP Wed Oct 8 14:21:00 UTC 2025 x86_64
User : lonias5 ( 3576)
PHP Version : 7.3.33
Disable Function : NONE
Directory :  /usr/lib/imh-whmapi/venv/lib/python3.13/site-packages/rads/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //usr/lib/imh-whmapi/venv/lib/python3.13/site-packages/rads/vz.py
"""VZ / HA functions"""

import enum
import json
import shlex
import subprocess
from pathlib import Path
from typing import TypedDict
from collections.abc import Iterable
import distro


class CT:
    """Create a CT object representing a VZ Container

    On construction ``vzlist -j`` is run for the container and every
    requested field is stored as an instance attribute of the same name.

    Args:
        ctid (str): container ID to collect information from. All vzctl/prlctl
            actions will interact with this container.

    Raises:
        VZError: if vzlist fails to run or its output cannot be parsed
    """

    # Fields requested from vzlist via -o; each becomes an instance attribute
    _VZLIST_FIELDS = (
        "cpulimit",
        "cpus",
        "cpuunits",
        "ctid",
        "description",
        "device",
        "disabled",
        "diskinodes",
        "diskspace",
        "hostname",
        "ip",
        "laverage",
        "name",
        "netfilter",
        "numiptent",
        "numproc",
        "onboot",
        "ostemplate",
        "physpages",
        "private",
        "root",
        "status",
        "swappages",
        "uptime",
        "uuid",
        "veid",
    )

    def __init__(self, ctid: str):
        requested = str(ctid)
        self.ctid = requested

        # Build and execute the vzlist command
        command = ["vzlist", "-jo", ",".join(self._VZLIST_FIELDS), requested]
        try:
            result = subprocess.run(
                command, capture_output=True, encoding='utf-8', check=True
            )
        except subprocess.CalledProcessError as exc:
            raise VZError(f"{exc.stderr}") from exc
        except OSError as exc:
            raise VZError(str(exc)) from exc

        # Parse the vzlist result. Bad JSON, an empty list, or a non-dict
        # entry are all reported as VZError rather than leaking
        # JSONDecodeError/IndexError to the caller.
        try:
            data = json.loads(result.stdout)[0]
        except (ValueError, LookupError, TypeError) as exc:
            raise VZError(
                f"Could not parse vzlist output for {requested}: {exc}"
            ) from exc
        if not isinstance(data, dict):
            raise VZError(
                f"Unexpected vzlist output for {requested}: {data!r}"
            )

        self.cpulimit: int = data.get("cpulimit", 0)
        self.cpus: int = data.get("cpus", 0)
        self.cpuunits: int = data.get("cpuunits", 0)
        # Fall back to the requested ID so later commands still target the
        # right container if vzlist omits the field
        self.ctid: int | str = data.get("ctid", requested)
        self.description: str = data.get("description", "")
        self.device: str = data.get("device", "")
        self.disabled: bool = data.get("disabled", False)
        self.diskinodes: dict = data.get("diskinodes", {})
        self.diskspace: dict = data.get("diskspace", {})
        self.hostname: str = data.get("hostname", "")
        self.ip: list[str] = data.get("ip", [])
        self.laverage: list[float] = data.get("laverage", [])
        self.name: str = data.get("name", "")
        self.netfilter: str = data.get("netfilter", "")
        self.numiptent: dict = data.get("numiptent", {})
        self.numproc: dict = data.get("numproc", {})
        self.onboot: bool = data.get("onboot", False)
        self.ostemplate: str = data.get("ostemplate", "")
        self.physpages: dict = data.get("physpages", {})
        self.private: str = data.get("private", "")
        self.root: str = data.get("root", "")
        self.status: str = data.get("status", "")
        self.swappages: dict = data.get("swappages", {})
        self.uptime: float = data.get("uptime", 0.0)
        self.uuid: str = data.get("uuid", "")
        self.veid: str = data.get("veid", "")

    def __repr__(self):
        attrs = ", ".join(
            f"{key}={value!r}" for key, value in vars(self).items()
        )
        return f"CT({attrs})"

    @staticmethod
    def _run(*command: object) -> subprocess.CompletedProcess:
        """Run a command without raising on a nonzero exit status.

        Every argument is passed through ``str()`` first: ``self.ctid`` may be
        an int (it is copied from the vzlist JSON), and subprocess rejects
        non-string arguments with a TypeError.
        """
        argv = [str(part) for part in command]
        return subprocess.run(
            argv, capture_output=True, encoding='utf-8', check=False
        )

    def start(self) -> subprocess.CompletedProcess:
        """Starts the container using 'vzctl start'

        Returns:
            subprocess.CompletedProcess: result of the 'vzctl start' command
        """
        return self._run("vzctl", "start", self.ctid)

    def stop(self) -> subprocess.CompletedProcess:
        """Stops the container using 'vzctl stop'

        Returns:
            subprocess.CompletedProcess: result of the 'vzctl stop' command
        """
        return self._run("vzctl", "stop", self.ctid)

    def restart(self) -> subprocess.CompletedProcess:
        """Restarts the container using 'vzctl restart'

        Returns:
            subprocess.CompletedProcess: result of the 'vzctl restart' command
        """
        return self._run("vzctl", "restart", self.ctid)

    def mount(self) -> subprocess.CompletedProcess:
        """Mounts the container's filesystem using the 'vzctl mount' command

        Returns:
            subprocess.CompletedProcess: result of the 'vzctl mount' command
        """
        return self._run("vzctl", "mount", self.ctid)

    def umount(self) -> subprocess.CompletedProcess:
        """Unmounts the container's filesystem using the 'vzctl umount' command

        Returns:
            subprocess.CompletedProcess: result of the 'vzctl umount' command
        """
        return self._run("vzctl", "umount", self.ctid)

    def destroy(self) -> subprocess.CompletedProcess:
        """Destroys the container using the 'vzctl destroy' command

        Returns:
            subprocess.CompletedProcess: result of the 'vzctl destroy' command
        """
        return self._run("vzctl", "destroy", self.ctid)

    def register(self, dst: str) -> subprocess.CompletedProcess:
        """Registers the container at the specified destination using the
        'vzctl register' command

        Args:
            dst (str): The destination path where the container is to
                be registered
        Returns:
            subprocess.CompletedProcess: result of the 'vzctl register' command
        """
        return self._run("vzctl", "register", dst, self.ctid)

    def unregister(self) -> subprocess.CompletedProcess:
        """Unregisters the container using the 'vzctl unregister' command

        Returns:
            subprocess.CompletedProcess: result of the 'vzctl unregister'
                command
        """
        return self._run("vzctl", "unregister", self.ctid)

    def clone(self, name: str) -> subprocess.CompletedProcess:
        """Clones the container with the specified name using the 'prlctl clone'

        Args:
            name (str): The name for the new cloned container
        Returns:
            subprocess.CompletedProcess: result of the 'prlctl clone' command
        """
        return self._run("prlctl", "clone", self.ctid, "--name", name)

    def exec2(self, cmd: str) -> subprocess.CompletedProcess:
        """Executes the specified command within the container using the
        'vzctl exec2' command

        Args:
            cmd (str): The command to execute within the container. It is
                tokenized with shlex.split. If args for the command come from
                user input, you may want to use shlex.join or shlex.quote on
                them
        Returns:
            subprocess.CompletedProcess: result of the 'vzctl exec2' command
        """
        return self._run("vzctl", "exec2", self.ctid, *shlex.split(cmd))

    def set(self, **kwargs: str) -> subprocess.CompletedProcess:
        """Sets various configuration options for the container

        This method constructs a command to modify the container's configuration
        using the `vzctl set` command. It accepts keyword arguments where each
        key-value pair represents an option and its corresponding value to be
        set. The method appends "--save" to the command to save the changes
        permanently. After running the command, the method reinitializes the
        object to reflect the updated configuration.

        Args:
            **kwargs: Arbitrary keyword arguments representing configuration
                options and values. Each ``key=value`` pair is passed as
                ``--key value``.

        Returns:
            subprocess.CompletedProcess: The result of the `subprocess.run`
                call, containing information about the execution of the command

        Raises:
            VZError: if re-reading the container with vzlist fails
        """
        arg_list = []
        for key, value in kwargs.items():
            arg_list.extend([f"--{key}", f"{value}"])
        arg_list.append("--save")

        result = self._run("vzctl", "set", self.ctid, *arg_list)

        # re-initialize attributes
        self.__init__(self.ctid)  # pylint:disable=unnecessary-dunder-call

        return result

    def fix_ctid(self, start: bool = True) -> None:
        """A tool for offline fixing CTID fields of VZ7 containers. Re-registers
        the container while offline.

        This method stops the container, unregisters it, renames the container
        directory from the current CTID to the container name, updates the
        instance attributes accordingly, and re-registers the container with
        the new details. Optionally, it can restart the container if the `start`
        parameter is set to True.

        Args:
            start (bool): If True, the container will be restarted after the
                CTID fix is applied. Defaults to True

        Returns:
            None

        Raises:
            VZError: if the container has no name to rename to. This is checked
                before anything is stopped or unregistered.
            OSError: if renaming the private directory fails
        """
        # Without a name the rename target would be /vz/private/ itself, which
        # can only fail - and by then the container would already be stopped
        # and unregistered. Refuse up front instead.
        if not self.name:
            raise VZError(
                f"Container {self.ctid} has no name; cannot fix its CTID"
            )

        # NOTE(review): the results of stop/unregister are not checked here
        self.stop()
        self.unregister()

        src = Path(f"/vz/private/{self.ctid}")
        dst = Path(f"/vz/private/{self.name}")
        src.rename(dst)

        self.private = str(dst)
        self.root = f"/vz/root/{self.name}"
        self.ctid = self.veid = self.name

        self.register(self.private)

        if start:
            self.start()


class ListCmd(enum.Enum):
    """Base argument lists for the two container listing commands.

    Each member's value is a list, so copy it before appending further
    arguments.
    """

    VZLIST = [
        "/usr/sbin/vzlist",
        "-H",
    ]
    PRLCTL = [
        "/usr/bin/prlctl",
        "list",
        "-H",
    ]


class VZError(Exception):
    """Exception type this module raises when a VZ / OpenVZ operation fails"""


def is_vz() -> bool:
    """Report whether the distro name of this host contains "Virtuozzo"

    Returns:
        bool: True if ``distro.name()`` mentions Virtuozzo
    """
    distro_name = distro.name()
    return "Virtuozzo" in distro_name


def is_vz7() -> bool:
    """Report whether this host is a Virtuozzo node with major version 7

    Returns:
        bool: True only if is_vz() holds and ``distro.major_version()``
            is "7"
    """
    if not is_vz():
        return False
    return distro.major_version() == "7"


def is_vps(path: str = "/proc/vz/veinfo") -> bool:
    """Check if host is a Virtuozzo container

    The decision is based on the contents of the veinfo file:

    * unreadable or empty file -> not a container
    * more than one line -> not a container (a CL or VZ node)
    * a single line whose first field is not an integer (a UUID) -> container
    * a single line whose first field is an integer -> container unless it
      is 0

    Args:
        path: veinfo file to inspect. Defaults to ``/proc/vz/veinfo``
    Returns:
        bool: True if the veinfo contents look like a container
    """
    try:
        with open(path, encoding="ascii") as handle:
            ve_data = handle.read().strip()
    except OSError:
        return False  # if veinfo doesn't exist this can't be a vps
    if "\n" in ve_data:
        return False  # if veinfo contains >1 line, this is a CL or VZ node
    fields = ve_data.split()
    if not fields:
        # empty veinfo: there is no container entry to inspect
        return False
    try:
        veid = int(fields[0])
    except ValueError:
        return True  # veinfo contains a UUID
    return veid != 0


class VeInfoDict(TypedDict):
    """Values used in the return type of rads.vz.veinfo(). procs is the number
    of processes in the container; ips is a list of ip addresses"""

    procs: int
    ips: list[str]


def veinfo(path: str = "/proc/vz/veinfo") -> dict[str, VeInfoDict]:
    """Read running containers from /proc/vz/veinfo. VMs are not listed.

    Blank lines are skipped, as is the entry whose ID is ``0``.

    Args:
        path: veinfo file to read. Defaults to ``/proc/vz/veinfo``
    Returns:
        dict[str, VeInfoDict]: container IDs mapped to dicts. Each dict contains
            "procs" (the number of processes in the container) and "ips"
            (all ip addresses assigned to the container)
    Raises:
        OSError: if the veinfo file cannot be opened
        ValueError: if a non-blank line has fewer than three fields or a
            non-integer process count
    """
    ret: dict[str, VeInfoDict] = {}
    # VZ Docs say the format of each line is
    # CT_ID reserved number_of_processes IP_address [IP_address ...]
    # where reserved isn't used and will always be 0
    with open(path, encoding="ascii") as handle:
        for line in handle:
            fields = line.split()
            if not fields:
                continue  # blank line; nothing to unpack
            ctid, _, procs, *ips = fields
            if ctid == '0':
                continue
            ret[ctid] = {'procs': int(procs), 'ips': ips}
    return ret


def _exec(cmd: Iterable) -> subprocess.CompletedProcess[str]:
    """For executing prlctl or vzlist"""
    try:
        ret = subprocess.run(
            cmd, capture_output=True, encoding="utf-8", check=False
        )
    except FileNotFoundError as exc:
        raise VZError(exc) from exc
    if ret.returncode:  # nonzero
        raise VZError(f"Error running {cmd!r}. stderr={ret.stderr!r}")
    return ret


def is_ct_running(ctid: str | int) -> bool:
    """Checks if a container is running

    prlctl is tried first. If the prlctl binary is not found, vzlist is used
    instead.

    Args:
        ctid: container ID to check
    Returns:
        True if the container is running on this node, False if it
        isn't or if some other error occurs (including a command that
        succeeds but prints no status)
    """
    try:
        ret = subprocess.run(
            ["/usr/bin/prlctl", "list", "-H", "-o", "status", str(ctid)],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            encoding="utf-8",
            check=True,
        )
    except FileNotFoundError:
        pass  # retry with vzlist
    except subprocess.CalledProcessError:
        return False
    else:
        # Compare the first-token slice rather than indexing [0] so that
        # empty output yields False instead of raising IndexError
        return ret.stdout.split()[:1] == ["running"]
    try:
        ret = _exec(["/usr/sbin/vzlist", "-H", "-o", "status", str(ctid)])
    except VZError:  # CTID probably doesn't exist
        return False
    return ret.stdout.split()[:1] == ["running"]


def uuid2ctid(uuid: str) -> str:
    """Look up the legacy CTID of a container by its UUID

    Runs ``prlctl list`` asking for the ``name`` column and returns the first
    whitespace-separated token of its output.

    Args:
        uuid: VZ UUID to find the legacy CTID for
    Raises:
        VZError: if the prlctl command fails
    """
    command = ["/usr/bin/prlctl", "list", "-H", "-o", "name", uuid]
    tokens = _exec(command).stdout.split()
    return tokens[0]


def ctid2uuid(ctid: int | str) -> str:
    """Look up the UUID of a container by its legacy CTID

    Runs ``prlctl list`` asking for the ``uuid`` column and returns the first
    token of its output with any surrounding curly braces removed.

    Warning:
        This does not work on VZ4
    Args:
        ctid: Legacy CTID to get the UUID for
    Raises:
        VZError: if the prlctl command fails
    """
    command = ["/usr/bin/prlctl", "list", "-H", "-o", "uuid", str(ctid)]
    tokens = _exec(command).stdout.split()
    braced = tokens[0]
    return braced.strip("{}")


def get_envid(ctid: int | str) -> str:
    """Look up the EnvID of a container

    Note:
        This determines what the subdirectory of /vz/root and /vz/private will
        be. VZ4 lacks both the envid field and prlctl, so when
        /etc/virtuozzo-release mentions "Virtuozzo release 4" the CTID itself
        is returned as a string.
    Args:
        ctid: legacy CTID to find the envid for
    Raises:
        VZError: if the prlctl command fails or
            /etc/virtuozzo-release is missing
    """
    try:
        with open("/etc/virtuozzo-release", encoding="utf-8") as handle:
            release = handle.read()
    except FileNotFoundError as exc:
        raise VZError(exc) from exc
    if "Virtuozzo release 4" in release:
        return str(ctid)
    command = ["/usr/bin/prlctl", "list", "-H", "-o", "envid", str(ctid)]
    tokens = _exec(command).stdout.split()
    return tokens[0]


def _list_cmd(
    opts: list, args: list | None, list_cmd: ListCmd | None
) -> tuple[ListCmd, list[str]]:
    """Determines the cmd to run based on VZ version for get_cts()

    Args:
        opts: items to send into ``-o/--output``
        args: optional params to send such as ``--all``
        list_cmd (ListCmd): set this to ListCmd.VZLIST to switch to that
            command. None is treated as ListCmd.PRLCTL.
    Returns:
        tuple[ListCmd, list[str]]: the ListCmd actually used, and the full
            argument list to execute
    """
    if list_cmd is None:
        # the signature allows None; without this, None.value would raise
        list_cmd = ListCmd.PRLCTL
    if list_cmd == ListCmd.VZLIST:
        renames = {"envid": "veid"}
    else:
        # prlctl refers to 'ctid' as 'name'
        renames = {"ctid": "name"}
    # copy so the enum member's own list is never mutated
    cmd = list_cmd.value.copy()
    if args is not None:
        cmd.extend(args)
    # columns are emitted in the same order as opts so rows can be mapped back
    cmd_opts = ",".join(renames.get(x, x) for x in opts)
    cmd.extend(["-o", cmd_opts])
    return list_cmd, cmd


def _read_row(
    list_cmd: ListCmd, cmd: list[str], row: list[str], opts: list[str]
) -> dict[str, str]:
    """Map one whitespace-split output row onto the requested option names

    Args:
        list_cmd: which listing command produced the row
        cmd: the full command that was run (only used in the error message)
        row: the columns of one output line
        opts: the option names that were requested, in column order
    Returns:
        dict[str, str]: option name -> column value
    Raises:
        VZError: if the number of columns cannot be reconciled with opts
    """
    expected = len(opts)
    # the column count matches the requested options: pair them up directly
    if len(row) == expected:
        return dict(zip(opts, row))
    # handle an edge case: prlctl can print missing ostemplates as '' while
    # vzlist prints it as '-', making the prlctl one harder to parse
    one_short = len(row) == expected - 1
    if one_short and list_cmd == ListCmd.PRLCTL and "ostemplate" in opts:
        remaining = opts.copy()
        remaining.remove("ostemplate")
        parsed = dict(zip(remaining, row))
        parsed["ostemplate"] = "-"
        return parsed
    raise VZError(
        f"{shlex.join(cmd)} expected {len(opts)} columns,"
        f" but got {len(row)}: {row}"
    )


def get_cts(
    opts: list | None = None,
    args: list | None = None,
    list_cmd: ListCmd = ListCmd.PRLCTL,
) -> list[dict[str, str]]:
    """Returns containers according to platform as a list of dicts

    Args:
        opts: items to send into -o/--output (will default to ['ctid'] if None)
        args: optional params to send such as --all
        list_cmd (ListCmd): set this to ListCmd.VZLIST force using that command.
            Otherwise, we use prlctl.
    Returns:
        list[dict[str, str]]: one dict per container, keyed by the names
            given in opts
    Raises:
        VZError: if the prlctl or vzlist command fails
    """
    if not opts:
        opts = ["ctid"]
    if not args and list_cmd == ListCmd.VZLIST and opts == ['ctid']:
        # if requesting just running ctids with vzlist, we can get that faster
        # from /proc/vz/veinfo
        try:
            return [{'ctid': ctid} for ctid in veinfo()]
        except (OSError, ValueError):
            # veinfo is missing or unparsable: fall through and ask vzlist
            # itself, so that any failure surfaces as the documented VZError
            pass
    # process each line as a dict where keys are the arg and vals are the result
    list_cmd, cmd = _list_cmd(opts, args, list_cmd)
    ret = []
    for row in _exec(cmd).stdout.splitlines():
        row = row.strip()
        if not row:
            continue  # blank line
        ret.append(_read_row(list_cmd, cmd, row.split(), opts))
    return ret

@StableExploit - 2025