Prv8 Shell
Server : Apache
System : Linux ecngx264.inmotionhosting.com 4.18.0-553.77.1.lve.el8.x86_64 #1 SMP Wed Oct 8 14:21:00 UTC 2025 x86_64
User : lonias5 ( 3576)
PHP Version : 7.3.33
Disable Function : NONE
Directory :  /usr/lib/imh-whmapi/venv/lib/python3.13/site-packages/rads/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //usr/lib/imh-whmapi/venv/lib/python3.13/site-packages/rads/_users.py
"""Functions for fetching basic info on user accounts"""

from pathlib import Path
import pwd
import grp
import re
import os
import tarfile
from typing import Literal, overload
import yaml
from ._yaml import DumbYamlLoader
from . import SYS_USERS, STAFF_GROUPS, OUR_RESELLERS


class CpuserError(Exception):
    """Raised when there's something wrong collecting cPanel user info"""

    __module__ = 'rads'


def get_login() -> str:
    """Obtain which user ran this script

    Returns:
        username
    """
    try:
        blame = os.getlogin()
    except OSError:
        blame = pwd.getpwuid(os.geteuid()).pw_name
    return blame


get_login.__module__ = 'rads'

def is_cwpuser(user: str) -> bool:
    """Checks if the user is a valid CWP user.

    Args:
        user (str): CWP username to check

    Returns:
        bool: whether or not the CWP user exists and is valid
    """
    try:
        homedir = pwd.getpwnam(user).pw_dir
    except KeyError:
        return False

    return all(
        (
            os.path.isdir(homedir),
            os.path.isfile(os.path.join(
                '/usr/local/cwpsrv/conf.d/users/', f'{user}.conf'
            ))
        )
    )


is_cwpuser.__module__ = 'rads'


def is_cpuser(user: str) -> bool:
    """Checks if a user is a valid cPanel user.

    Warning:
        This only checks if the user exists and will also be true for restricted
        cPanel users. Use ``cpuser_safe`` instead if you need to check for those
    Args:
        user: cPanel username to check
    Returns:
        Whether the cPanel user exists
    """
    try:
        homedir = pwd.getpwnam(user).pw_dir
    except KeyError:
        return False
    return all(
        (
            os.path.isdir(homedir),
            os.path.isfile(os.path.join('/var/cpanel/users', user)),
            os.path.isdir(os.path.join('/var/cpanel/userdata', user)),
        )
    )


is_cpuser.__module__ = 'rads'


@overload
def all_cpusers(owners: Literal[False] = False) -> list[str]: ...


@overload
def all_cpusers(owners: Literal[True] = True) -> dict[str, str]: ...


def all_cpusers(owners: bool = False) -> dict[str, str] | list[str]:
    """Returns cPanel users from /etc/trueuserowners

    Args:
        owners: whether to return users as a dict with owners as the values
    Raises:
        CpuserError: if /etc/trueuserowners is invalid
    Returns:
        either a list of all users, or a dict of users (keys) to owners (vals)
    """
    with open('/etc/trueuserowners', encoding='utf-8') as userowners:
        userdict = yaml.load(userowners, DumbYamlLoader)
    if not isinstance(userdict, dict):
        raise CpuserError('/etc/trueuserowners is invalid')
    if owners:
        return userdict
    return list(userdict.keys())


all_cpusers.__module__ = 'rads'


def main_cpusers() -> list:
    """Get a all non-child, non-system, "main" cPanel users

    Raises:
        CpuserError: if /etc/trueuserowners is invalid"""
    return [
        user
        for user, owner in all_cpusers(owners=True).items()
        if owner in OUR_RESELLERS or owner == user
    ]


main_cpusers.__module__ = 'rads'


def get_owner(user: str) -> str:
    """Get a user's owner (even if the account has reseller ownership of itself)

    Warning:
        the owner may be root, which is not a cPanel user
    Hint:
        If looking this up for multiple users,
        use ``get_cpusers(owners=True)`` instead
    Args:
        user: cPanel username to find the owner for
    Raises:
        CpuserError: if /etc/trueuserowners is invalid or the
            requested user is not defined in there
    """
    try:
        return all_cpusers(owners=True)[user]
    except KeyError as exc:
        raise CpuserError(f'{user} is not in /etc/trueuserowners') from exc


get_owner.__module__ = 'rads'


def is_child(user: str) -> bool:
    """Check if a cPanel user is not self-owned and not owned by a system user

    Args:
        user: cPanel username to check
    Raises:
        CpuserError: if /etc/trueuserowners is invalid or the
            requested user is not defined in there
    """
    owner = get_owner(user)
    return owner not in OUR_RESELLERS and owner != user


is_child.__module__ = 'rads'


def get_children(owner: str) -> list[str]:
    """Get a list of child accounts for a reseller

    Args:
        owner: cPanel username to lookup
    Returns:
        all child accounts of a reseller, excluding itself
    Raises:
        CpuserError: if /etc/trueuserowners is invalid
    """
    return [
        usr
        for usr, own in all_cpusers(owners=True).items()
        if own == owner and usr != own
    ]


get_children.__module__ = 'rads'


def cpuser_safe(user: str) -> bool:
    """Checks whether the user is safe for support to operate on

    - The user exists and is a valid cPanel user
    - The user is not a reserved account
    - The user is not in a staff group

    Args:
        user: cPanel or CWP username to check
    """
    # SYS_USERS includes SECURE_USER
    if user in SYS_USERS or user in OUR_RESELLERS:
        return False
    if not is_cpuser(user) and not is_cwpuser(user):
        return False

    for group in [x.gr_name for x in grp.getgrall() if user in x.gr_mem]:
        if group in STAFF_GROUPS:
            return False
    return True


cpuser_safe.__module__ = 'rads'


def cpuser_suspended(user: str) -> bool:
    """Check if a user is currently suspended

    Warning:
        This does not check for pending suspensions
    Args:
        user: cPanel username to check
    """
    return os.path.exists(os.path.join('/var/cpanel/suspended', user))


cpuser_suspended.__module__ = 'rads'


def get_homedir(user: str):
    """Get home directory path for a cPanel user

    Args:
        user: cPanel username to check
    Raises:
        CpuserError: if the user does not exist or the home directory path found
            does not match the expected pattern
    """
    try:
        homedir = pwd.getpwnam(user).pw_dir
    except KeyError as exc:
        raise CpuserError(f'{user}: no such user') from exc
    if re.match(r'/home[0-9]*/\w+', homedir) is None:
        # Even though we fetched the homedir successfully from /etc/passwd,
        # treat this as an error due to unexpected output. If the result was
        # '/' for example, some calling programs might misbehave or even
        # rm -rf / depending on what it's being used for
        raise CpuserError(f'{user!r} does not match expected pattern')
    return homedir


get_homedir.__module__ = 'rads'


def get_primary_domain(user: str) -> str:
    """Get primary domain from cpanel userdata

    Args:
        user: cPanel username to check
    Raises:
        CpuserError: if cpanel userdata cannot be read or main_domain is missing
    """
    userdata_path = os.path.join('/var/cpanel/userdata', user, 'main')
    try:
        with open(userdata_path, encoding='utf-8') as userdata_filehandle:
            return yaml.safe_load(userdata_filehandle)['main_domain']
    except (yaml.YAMLError, KeyError, OSError) as exc:
        raise CpuserError(exc) from exc


get_primary_domain.__module__ = 'rads'


def whoowns(domain: str) -> str:
    """
    Get the cPanel username that owns a domain

    Args:
        domain: Domain name to look up
    Returns:
        The name of a cPanel user that owns the domain name, or None on failure
    """
    try:
        with open('/etc/userdomains', encoding='utf-8') as file:
            match = next(x for x in file if x.startswith(f'{domain}: '))
            return match.rstrip().split(': ')[1]
    except (OSError, FileNotFoundError, StopIteration):
        return None


whoowns.__module__ = 'rads'


def get_plan(user: str) -> str | None:
    """
    Retrieves the hosting plan name for a given cPanel user.

    This function reads the user's configuration file from /var/cpanel/users
    and extracts the value assigned to the PLAN variable, if present.

    Parameters:
        user (str): The cPanel username.

    Returns:
        Optional[str]: The plan name if found, otherwise None.
    """
    path = Path(f"/var/cpanel/users/{user}")
    if not path.exists():
        return None

    for line in path.read_text(encoding="utf-8").splitlines():
        if line.startswith("PLAN="):
            return line.split("=", 1)[1].strip()
    return None


get_plan.__module__ = 'rads'


class UserData:
    """Object representing the data parsed from userdata

    Args:
        user: cPanel username to read cPanel userdata for. Required if pkgacct
            is not set.
        data_dir: override this to read /var/cpanel/userdata from some other
            directory, such as from a restored backup. Ignored if pkgacct is set
        all_subs: if True, list all subdomains, even those which have were
            created so an addon domain can be parked on them
        pkgacct: Don't set this manually. See UserData.from_pkgacct instead.
        tar: Don't set this manually. See UserData.from_pkgacct instead.
    Raises:
        CpuserError: if cPanel userdata is invalid
    Attributes:
        user (str): username
        primary (UserDomain): UserDomain object for the main domain
        addons (list): UserDomain objects for addon domains
        parked (list): UserDomain objects for parked domains
        subs (list): UserDomain objects for subdomains
    Hint:
        Use vars() to view this ``UserData`` object as a dict
    """

    user: str
    primary: 'UserDomain'
    addons: list['UserDomain']
    parked: list['UserDomain']
    subs: list['UserDomain']

    __module__ = 'rads'

    def __init__(
        self,
        user: str | None = None,
        data_dir: str = '/var/cpanel/userdata',
        all_subs: bool = False,
        pkgacct: str | None = None,
        tar: tarfile.TarFile | None = None,
    ):
        """Initializes a UserData object given a cPanel username"""
        self.pkgacct = pkgacct
        if user is None and pkgacct is None:
            raise TypeError("either user or pkgacct must be set")
        if user is not None and pkgacct is not None:
            raise TypeError("user cannot be set if pkgacct is set")
        if pkgacct is not None and tar is None:
            raise TypeError(
                "tar must be set if pkgacct is set; "
                "use the UserData.from_pkgacct alias instead"
            )
        if pkgacct:
            filename = Path(pkgacct).name
            file_re = re.compile(r'(?:cpmove|pkgacct)-(.*).tar.gz$')
            if match := file_re.match(filename):
                self.user = match.group(1)
            else:
                raise CpuserError(
                    f"{filename} does not follow the expected cpmove/pkgacct "
                    "filename pattern."
                )
        else:
            self.user = user
        main_data = self._read_userdata(
            user=self.user,
            data_dir=data_dir,
            pkgacct=pkgacct,
            domfile='main',
            required={
                'main_domain': str,
                'addon_domains': dict,
                'parked_domains': list,
                'sub_domains': list,
            },
            tar=tar,
        )
        dom_data = self._read_userdata(
            user=self.user,
            domfile=main_data['main_domain'],
            required={'documentroot': str},
            data_dir=data_dir,
            pkgacct=pkgacct,
            tar=tar,
        )
        # populate primary domain
        self.primary = UserDomain(
            domain=main_data['main_domain'],
            has_ssl=dom_data['has_ssl'],
            docroot=dom_data['documentroot'],
        )
        # populate addon domains
        self.addons = []
        addon_subs = set()
        for addon, addon_file in main_data['addon_domains'].items():
            addon_subs.add(addon_file)
            addon_data = self._read_userdata(
                user=self.user,
                domfile=addon_file,
                required={'documentroot': str},
                data_dir=data_dir,
                pkgacct=pkgacct,
                tar=tar,
            )
            self.addons.append(
                UserDomain(
                    subdom=addon_file,
                    domain=addon,
                    has_ssl=addon_data['has_ssl'],
                    docroot=addon_data['documentroot'],
                )
            )
        # populate parked domains
        self.parked = []
        for parked in main_data['parked_domains']:
            self.parked.append(
                UserDomain(
                    domain=parked, has_ssl=False, docroot=self.primary.docroot
                )
            )
        # populate subdomains
        self.subs = []
        for sub in main_data['sub_domains']:
            if all_subs or sub not in addon_subs:
                sub_data = self._read_userdata(
                    user=self.user,
                    domfile=sub,
                    required={'documentroot': str},
                    data_dir=data_dir,
                    pkgacct=pkgacct,
                    tar=tar,
                )
                self.subs.append(
                    UserDomain(
                        domain=sub,
                        has_ssl=sub_data['has_ssl'],
                        docroot=sub_data['documentroot'],
                    )
                )

    @staticmethod
    def from_pkgacct(path: str) -> 'UserData':
        """Alternate constructor to read userdata from a pkgacct/cpmove file"""
        try:
            with tarfile.open(path, 'r:gz') as tar:
                return UserData(pkgacct=path, tar=tar)
        except FileNotFoundError as exc:
            raise CpuserError(exc) from exc

    @classmethod
    def _read_userdata(
        cls,
        user: str,
        data_dir: str,
        pkgacct: dict | None,
        domfile: str,
        required: dict,
        tar: tarfile.TarFile | None,
    ):
        if pkgacct:
            return cls._read_from_pkgacct(pkgacct, domfile, required, tar)
        return cls._read_userdata_file(user, domfile, required, data_dir)

    @staticmethod
    def _tar_extract(tar: tarfile.TarFile, path: str):
        # docs say non-file members return None and missing files raise KeyError
        # This makes it return None in both error cases
        try:
            return tar.extractfile(path)
        except KeyError:
            return None

    @classmethod
    def _read_from_pkgacct(
        cls, tar_path: str, domfile: str, required: dict, tar: tarfile.TarFile
    ) -> dict:
        prefix = Path(tar_path).name[:-7]
        path = f"{prefix}/userdata/{domfile}"
        contents = cls._tar_extract(tar, path).read()
        has_ssl = cls._tar_extract(tar, f"{path}_SSL") is not None
        if not contents:
            raise CpuserError(
                f"{path} was not a file in the contents of {tar_path}"
            )
        try:
            data = yaml.load(str(contents, 'utf-8'), Loader=yaml.SafeLoader)
            if not isinstance(data, dict):
                raise ValueError
        except ValueError as exc:
            raise CpuserError(
                f'{path} inside {tar_path} could not be parsed'
            ) from exc
        for key, req_type in required.items():
            if key not in data:
                raise CpuserError(f'{path} is missing {key!r}')
            if not isinstance(data[key], req_type):
                raise CpuserError(f'{path} contains invalid data for {key!r}')
        data['has_ssl'] = has_ssl
        return data

    def __repr__(self):
        if self.pkgacct:
            return f'UserData(pkgacct={self.pkgacct!r})'
        return f'UserData({self.user!r})'

    @property
    def __dict__(self):
        return {
            'user': self.user,
            'primary': vars(self.primary),
            'addons': [vars(x) for x in self.addons],
            'parked': [vars(x) for x in self.parked],
            'subs': [vars(x) for x in self.subs],
        }

    @property
    def all_roots(self) -> list[str]:
        """All site document roots (list)"""
        all_dirs = {self.primary.docroot}
        all_dirs.update([x.docroot for x in self.subs])
        all_dirs.update([x.docroot for x in self.addons])
        return list(all_dirs)

    @property
    def merged_roots(self) -> list[str]:
        """Merged, top-level document roots for a user (list)"""
        merged = []
        for test_path in sorted(self.all_roots):
            head, tail = os.path.split(test_path)
            while head and tail:
                if head in merged:
                    break
                head, tail = os.path.split(head)
            else:
                if test_path not in merged:
                    merged.append(test_path)
        return merged

    @staticmethod
    def _read_userdata_file(
        user: str, domfile: str, required: dict, data_dir: str
    ) -> dict:
        """Internal helper function for UserData to strictly parse YAML files"""
        path = os.path.join(data_dir, user, domfile)
        try:
            with open(path, encoding='utf-8') as handle:
                data = yaml.load(handle, Loader=yaml.SafeLoader)
            if not isinstance(data, dict):
                raise ValueError
        except OSError as exc:
            raise CpuserError(f'{path} could not be opened') from exc
        except ValueError as exc:
            raise CpuserError(f'{path} could not be parsed') from exc
        for key, req_type in required.items():
            if key not in data:
                raise CpuserError(f'{path} is missing {key!r}')
            if not isinstance(data[key], req_type):
                raise CpuserError(f'{path} contains invalid data for {key!r}')
        data['has_ssl'] = os.path.isfile(f'{path}_SSL')
        return data


class UserDomain:
    """Object representing a cPanel domain in ``rads.UserData()``

    Attributes:
        domain (str): domain name
        has_ssl (bool): True/False if the domain has ssl
        docroot (str): document root on the disk
        subdom (str|None): if this is an addon domain, this is the subdomain
            it's parked on which is also its config's filename
    Hint:
        vars() can be run on this object to convert it into a dict
    """

    __module__ = 'rads'

    def __init__(
        self,
        domain: str,
        has_ssl: bool,
        docroot: str,
        subdom: str | None = None,
    ):
        self.domain = domain
        self.has_ssl = has_ssl
        self.docroot = docroot
        self.subdom = subdom

    def __repr__(self):
        if self.subdom:
            return (
                f"UserDomain(domain={self.domain!r}, has_ssl={self.has_ssl!r}, "
                f"docroot={self.docroot!r}, subdom={self.subdom!r})"
            )
        return (
            f"UserDomain(domain={self.domain!r}, has_ssl={self.has_ssl!r}, "
            f"docroot={self.docroot!r})"
        )

    @property
    def __dict__(self):
        myvars = {}
        for attr in ('domain', 'has_ssl', 'docroot'):
            myvars[attr] = getattr(self, attr)
        if self.subdom is not None:
            myvars['subdom'] = self.subdom
        return myvars

@StableExploit - 2025