|
Server : Apache System : Linux ecngx264.inmotionhosting.com 4.18.0-553.77.1.lve.el8.x86_64 #1 SMP Wed Oct 8 14:21:00 UTC 2025 x86_64 User : lonias5 ( 3576) PHP Version : 7.3.33 Disable Function : NONE Directory : /usr/lib/imh-whmapi/venv/lib/python3.13/site-packages/rads/ |
Upload File : |
"""Functions for fetching basic info on user accounts"""
from pathlib import Path
import pwd
import grp
import re
import os
import tarfile
from typing import Literal, overload
import yaml
from ._yaml import DumbYamlLoader
from . import SYS_USERS, STAFF_GROUPS, OUR_RESELLERS
class CpuserError(Exception):
"""Raised when there's something wrong collecting cPanel user info"""
__module__ = 'rads'
def get_login() -> str:
"""Obtain which user ran this script
Returns:
username
"""
try:
blame = os.getlogin()
except OSError:
blame = pwd.getpwuid(os.geteuid()).pw_name
return blame
get_login.__module__ = 'rads'
def is_cwpuser(user: str) -> bool:
"""Checks if the user is a valid CWP user.
Args:
user (str): CWP username to check
Returns:
bool: whether or not the CWP user exists and is valid
"""
try:
homedir = pwd.getpwnam(user).pw_dir
except KeyError:
return False
return all(
(
os.path.isdir(homedir),
os.path.isfile(os.path.join(
'/usr/local/cwpsrv/conf.d/users/', f'{user}.conf'
))
)
)
is_cwpuser.__module__ = 'rads'
def is_cpuser(user: str) -> bool:
"""Checks if a user is a valid cPanel user.
Warning:
This only checks if the user exists and will also be true for restricted
cPanel users. Use ``cpuser_safe`` instead if you need to check for those
Args:
user: cPanel username to check
Returns:
Whether the cPanel user exists
"""
try:
homedir = pwd.getpwnam(user).pw_dir
except KeyError:
return False
return all(
(
os.path.isdir(homedir),
os.path.isfile(os.path.join('/var/cpanel/users', user)),
os.path.isdir(os.path.join('/var/cpanel/userdata', user)),
)
)
is_cpuser.__module__ = 'rads'
@overload
def all_cpusers(owners: Literal[False] = False) -> list[str]: ...
@overload
def all_cpusers(owners: Literal[True] = True) -> dict[str, str]: ...
def all_cpusers(owners: bool = False) -> dict[str, str] | list[str]:
"""Returns cPanel users from /etc/trueuserowners
Args:
owners: whether to return users as a dict with owners as the values
Raises:
CpuserError: if /etc/trueuserowners is invalid
Returns:
either a list of all users, or a dict of users (keys) to owners (vals)
"""
with open('/etc/trueuserowners', encoding='utf-8') as userowners:
userdict = yaml.load(userowners, DumbYamlLoader)
if not isinstance(userdict, dict):
raise CpuserError('/etc/trueuserowners is invalid')
if owners:
return userdict
return list(userdict.keys())
all_cpusers.__module__ = 'rads'
def main_cpusers() -> list:
"""Get a all non-child, non-system, "main" cPanel users
Raises:
CpuserError: if /etc/trueuserowners is invalid"""
return [
user
for user, owner in all_cpusers(owners=True).items()
if owner in OUR_RESELLERS or owner == user
]
main_cpusers.__module__ = 'rads'
def get_owner(user: str) -> str:
"""Get a user's owner (even if the account has reseller ownership of itself)
Warning:
the owner may be root, which is not a cPanel user
Hint:
If looking this up for multiple users,
use ``get_cpusers(owners=True)`` instead
Args:
user: cPanel username to find the owner for
Raises:
CpuserError: if /etc/trueuserowners is invalid or the
requested user is not defined in there
"""
try:
return all_cpusers(owners=True)[user]
except KeyError as exc:
raise CpuserError(f'{user} is not in /etc/trueuserowners') from exc
get_owner.__module__ = 'rads'
def is_child(user: str) -> bool:
"""Check if a cPanel user is not self-owned and not owned by a system user
Args:
user: cPanel username to check
Raises:
CpuserError: if /etc/trueuserowners is invalid or the
requested user is not defined in there
"""
owner = get_owner(user)
return owner not in OUR_RESELLERS and owner != user
is_child.__module__ = 'rads'
def get_children(owner: str) -> list[str]:
"""Get a list of child accounts for a reseller
Args:
owner: cPanel username to lookup
Returns:
all child accounts of a reseller, excluding itself
Raises:
CpuserError: if /etc/trueuserowners is invalid
"""
return [
usr
for usr, own in all_cpusers(owners=True).items()
if own == owner and usr != own
]
get_children.__module__ = 'rads'
def cpuser_safe(user: str) -> bool:
"""Checks whether the user is safe for support to operate on
- The user exists and is a valid cPanel user
- The user is not a reserved account
- The user is not in a staff group
Args:
user: cPanel or CWP username to check
"""
# SYS_USERS includes SECURE_USER
if user in SYS_USERS or user in OUR_RESELLERS:
return False
if not is_cpuser(user) and not is_cwpuser(user):
return False
for group in [x.gr_name for x in grp.getgrall() if user in x.gr_mem]:
if group in STAFF_GROUPS:
return False
return True
cpuser_safe.__module__ = 'rads'
def cpuser_suspended(user: str) -> bool:
"""Check if a user is currently suspended
Warning:
This does not check for pending suspensions
Args:
user: cPanel username to check
"""
return os.path.exists(os.path.join('/var/cpanel/suspended', user))
cpuser_suspended.__module__ = 'rads'
def get_homedir(user: str):
"""Get home directory path for a cPanel user
Args:
user: cPanel username to check
Raises:
CpuserError: if the user does not exist or the home directory path found
does not match the expected pattern
"""
try:
homedir = pwd.getpwnam(user).pw_dir
except KeyError as exc:
raise CpuserError(f'{user}: no such user') from exc
if re.match(r'/home[0-9]*/\w+', homedir) is None:
# Even though we fetched the homedir successfully from /etc/passwd,
# treat this as an error due to unexpected output. If the result was
# '/' for example, some calling programs might misbehave or even
# rm -rf / depending on what it's being used for
raise CpuserError(f'{user!r} does not match expected pattern')
return homedir
get_homedir.__module__ = 'rads'
def get_primary_domain(user: str) -> str:
"""Get primary domain from cpanel userdata
Args:
user: cPanel username to check
Raises:
CpuserError: if cpanel userdata cannot be read or main_domain is missing
"""
userdata_path = os.path.join('/var/cpanel/userdata', user, 'main')
try:
with open(userdata_path, encoding='utf-8') as userdata_filehandle:
return yaml.safe_load(userdata_filehandle)['main_domain']
except (yaml.YAMLError, KeyError, OSError) as exc:
raise CpuserError(exc) from exc
get_primary_domain.__module__ = 'rads'
def whoowns(domain: str) -> str:
"""
Get the cPanel username that owns a domain
Args:
domain: Domain name to look up
Returns:
The name of a cPanel user that owns the domain name, or None on failure
"""
try:
with open('/etc/userdomains', encoding='utf-8') as file:
match = next(x for x in file if x.startswith(f'{domain}: '))
return match.rstrip().split(': ')[1]
except (OSError, FileNotFoundError, StopIteration):
return None
whoowns.__module__ = 'rads'
def get_plan(user: str) -> str | None:
"""
Retrieves the hosting plan name for a given cPanel user.
This function reads the user's configuration file from /var/cpanel/users
and extracts the value assigned to the PLAN variable, if present.
Parameters:
user (str): The cPanel username.
Returns:
Optional[str]: The plan name if found, otherwise None.
"""
path = Path(f"/var/cpanel/users/{user}")
if not path.exists():
return None
for line in path.read_text(encoding="utf-8").splitlines():
if line.startswith("PLAN="):
return line.split("=", 1)[1].strip()
return None
get_plan.__module__ = 'rads'
class UserData:
"""Object representing the data parsed from userdata
Args:
user: cPanel username to read cPanel userdata for. Required if pkgacct
is not set.
data_dir: override this to read /var/cpanel/userdata from some other
directory, such as from a restored backup. Ignored if pkgacct is set
all_subs: if True, list all subdomains, even those which have were
created so an addon domain can be parked on them
pkgacct: Don't set this manually. See UserData.from_pkgacct instead.
tar: Don't set this manually. See UserData.from_pkgacct instead.
Raises:
CpuserError: if cPanel userdata is invalid
Attributes:
user (str): username
primary (UserDomain): UserDomain object for the main domain
addons (list): UserDomain objects for addon domains
parked (list): UserDomain objects for parked domains
subs (list): UserDomain objects for subdomains
Hint:
Use vars() to view this ``UserData`` object as a dict
"""
user: str
primary: 'UserDomain'
addons: list['UserDomain']
parked: list['UserDomain']
subs: list['UserDomain']
__module__ = 'rads'
def __init__(
self,
user: str | None = None,
data_dir: str = '/var/cpanel/userdata',
all_subs: bool = False,
pkgacct: str | None = None,
tar: tarfile.TarFile | None = None,
):
"""Initializes a UserData object given a cPanel username"""
self.pkgacct = pkgacct
if user is None and pkgacct is None:
raise TypeError("either user or pkgacct must be set")
if user is not None and pkgacct is not None:
raise TypeError("user cannot be set if pkgacct is set")
if pkgacct is not None and tar is None:
raise TypeError(
"tar must be set if pkgacct is set; "
"use the UserData.from_pkgacct alias instead"
)
if pkgacct:
filename = Path(pkgacct).name
file_re = re.compile(r'(?:cpmove|pkgacct)-(.*).tar.gz$')
if match := file_re.match(filename):
self.user = match.group(1)
else:
raise CpuserError(
f"{filename} does not follow the expected cpmove/pkgacct "
"filename pattern."
)
else:
self.user = user
main_data = self._read_userdata(
user=self.user,
data_dir=data_dir,
pkgacct=pkgacct,
domfile='main',
required={
'main_domain': str,
'addon_domains': dict,
'parked_domains': list,
'sub_domains': list,
},
tar=tar,
)
dom_data = self._read_userdata(
user=self.user,
domfile=main_data['main_domain'],
required={'documentroot': str},
data_dir=data_dir,
pkgacct=pkgacct,
tar=tar,
)
# populate primary domain
self.primary = UserDomain(
domain=main_data['main_domain'],
has_ssl=dom_data['has_ssl'],
docroot=dom_data['documentroot'],
)
# populate addon domains
self.addons = []
addon_subs = set()
for addon, addon_file in main_data['addon_domains'].items():
addon_subs.add(addon_file)
addon_data = self._read_userdata(
user=self.user,
domfile=addon_file,
required={'documentroot': str},
data_dir=data_dir,
pkgacct=pkgacct,
tar=tar,
)
self.addons.append(
UserDomain(
subdom=addon_file,
domain=addon,
has_ssl=addon_data['has_ssl'],
docroot=addon_data['documentroot'],
)
)
# populate parked domains
self.parked = []
for parked in main_data['parked_domains']:
self.parked.append(
UserDomain(
domain=parked, has_ssl=False, docroot=self.primary.docroot
)
)
# populate subdomains
self.subs = []
for sub in main_data['sub_domains']:
if all_subs or sub not in addon_subs:
sub_data = self._read_userdata(
user=self.user,
domfile=sub,
required={'documentroot': str},
data_dir=data_dir,
pkgacct=pkgacct,
tar=tar,
)
self.subs.append(
UserDomain(
domain=sub,
has_ssl=sub_data['has_ssl'],
docroot=sub_data['documentroot'],
)
)
@staticmethod
def from_pkgacct(path: str) -> 'UserData':
"""Alternate constructor to read userdata from a pkgacct/cpmove file"""
try:
with tarfile.open(path, 'r:gz') as tar:
return UserData(pkgacct=path, tar=tar)
except FileNotFoundError as exc:
raise CpuserError(exc) from exc
@classmethod
def _read_userdata(
cls,
user: str,
data_dir: str,
pkgacct: dict | None,
domfile: str,
required: dict,
tar: tarfile.TarFile | None,
):
if pkgacct:
return cls._read_from_pkgacct(pkgacct, domfile, required, tar)
return cls._read_userdata_file(user, domfile, required, data_dir)
@staticmethod
def _tar_extract(tar: tarfile.TarFile, path: str):
# docs say non-file members return None and missing files raise KeyError
# This makes it return None in both error cases
try:
return tar.extractfile(path)
except KeyError:
return None
@classmethod
def _read_from_pkgacct(
cls, tar_path: str, domfile: str, required: dict, tar: tarfile.TarFile
) -> dict:
prefix = Path(tar_path).name[:-7]
path = f"{prefix}/userdata/{domfile}"
contents = cls._tar_extract(tar, path).read()
has_ssl = cls._tar_extract(tar, f"{path}_SSL") is not None
if not contents:
raise CpuserError(
f"{path} was not a file in the contents of {tar_path}"
)
try:
data = yaml.load(str(contents, 'utf-8'), Loader=yaml.SafeLoader)
if not isinstance(data, dict):
raise ValueError
except ValueError as exc:
raise CpuserError(
f'{path} inside {tar_path} could not be parsed'
) from exc
for key, req_type in required.items():
if key not in data:
raise CpuserError(f'{path} is missing {key!r}')
if not isinstance(data[key], req_type):
raise CpuserError(f'{path} contains invalid data for {key!r}')
data['has_ssl'] = has_ssl
return data
def __repr__(self):
if self.pkgacct:
return f'UserData(pkgacct={self.pkgacct!r})'
return f'UserData({self.user!r})'
@property
def __dict__(self):
return {
'user': self.user,
'primary': vars(self.primary),
'addons': [vars(x) for x in self.addons],
'parked': [vars(x) for x in self.parked],
'subs': [vars(x) for x in self.subs],
}
@property
def all_roots(self) -> list[str]:
"""All site document roots (list)"""
all_dirs = {self.primary.docroot}
all_dirs.update([x.docroot for x in self.subs])
all_dirs.update([x.docroot for x in self.addons])
return list(all_dirs)
@property
def merged_roots(self) -> list[str]:
"""Merged, top-level document roots for a user (list)"""
merged = []
for test_path in sorted(self.all_roots):
head, tail = os.path.split(test_path)
while head and tail:
if head in merged:
break
head, tail = os.path.split(head)
else:
if test_path not in merged:
merged.append(test_path)
return merged
@staticmethod
def _read_userdata_file(
user: str, domfile: str, required: dict, data_dir: str
) -> dict:
"""Internal helper function for UserData to strictly parse YAML files"""
path = os.path.join(data_dir, user, domfile)
try:
with open(path, encoding='utf-8') as handle:
data = yaml.load(handle, Loader=yaml.SafeLoader)
if not isinstance(data, dict):
raise ValueError
except OSError as exc:
raise CpuserError(f'{path} could not be opened') from exc
except ValueError as exc:
raise CpuserError(f'{path} could not be parsed') from exc
for key, req_type in required.items():
if key not in data:
raise CpuserError(f'{path} is missing {key!r}')
if not isinstance(data[key], req_type):
raise CpuserError(f'{path} contains invalid data for {key!r}')
data['has_ssl'] = os.path.isfile(f'{path}_SSL')
return data
class UserDomain:
"""Object representing a cPanel domain in ``rads.UserData()``
Attributes:
domain (str): domain name
has_ssl (bool): True/False if the domain has ssl
docroot (str): document root on the disk
subdom (str|None): if this is an addon domain, this is the subdomain
it's parked on which is also its config's filename
Hint:
vars() can be run on this object to convert it into a dict
"""
__module__ = 'rads'
def __init__(
self,
domain: str,
has_ssl: bool,
docroot: str,
subdom: str | None = None,
):
self.domain = domain
self.has_ssl = has_ssl
self.docroot = docroot
self.subdom = subdom
def __repr__(self):
if self.subdom:
return (
f"UserDomain(domain={self.domain!r}, has_ssl={self.has_ssl!r}, "
f"docroot={self.docroot!r}, subdom={self.subdom!r})"
)
return (
f"UserDomain(domain={self.domain!r}, has_ssl={self.has_ssl!r}, "
f"docroot={self.docroot!r})"
)
@property
def __dict__(self):
myvars = {}
for attr in ('domain', 'has_ssl', 'docroot'):
myvars[attr] = getattr(self, attr)
if self.subdom is not None:
myvars['subdom'] = self.subdom
return myvars