Prv8 Shell
Server : Apache
System : Linux ecngx264.inmotionhosting.com 4.18.0-553.77.1.lve.el8.x86_64 #1 SMP Wed Oct 8 14:21:00 UTC 2025 x86_64
User : lonias5 ( 3576)
PHP Version : 7.3.33
Disable Function : NONE
Directory :  /proc/self/root/proc/thread-self/root/opt/sharedrads/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //proc/self/root/proc/thread-self/root/opt/sharedrads/reset_email
#!/usr/lib/rads/venv/bin/python3
"""Resets email passwords and sends an email notification"""
from platform import node
from datetime import datetime
import argparse  # for Input selection
import random
import string  # for random
import logging
import sys
import subprocess
from os import urandom
from pathlib import Path
import json
from cpapis import cpapi2
import pp_api
import rads
from rads.color import red, green


def run(x):
    """Run the command *x* and return its standard output as a list of lines.

    *x* is passed straight to ``subprocess.check_output``; the output is
    decoded as UTF-8 with undecodable bytes replaced. A non-zero exit status
    raises ``subprocess.CalledProcessError``.
    """
    output = subprocess.check_output(x, encoding='UTF-8', errors='replace')
    return output.splitlines()


def domain_owner(domain):
    """Look up the cPanel user who owns *domain* via ``/scripts/whoowns``.

    Returns the command's output as a list of lines. If running the command
    raises, an error is printed in red and the script exits with status 1.
    """
    try:
        return run(['/scripts/whoowns', domain])
    except Exception:
        print(red(f"Error running whoowns on {domain}"))
        sys.exit(1)


def gather_email_accounts(user_to_parse, reset_all):
    """Return the email logins to reset for one cPanel user.

    The user's mailboxes are listed with the ``Email::listpopssingle`` API
    call. When *reset_all* is true every login is returned; otherwise the
    operator picks a subset interactively through ``picker_menu``.
    """
    response = cpapi2(module='Email::listpopssingle', user=user_to_parse)
    logins = [row['login'] for row in response['cpanelresult']['data']]
    if not reset_all:
        # Only the accounts the operator explicitly selects
        return picker_menu(logins)
    return logins


def picker_menu(email_list):
    """Interactively select addresses from *email_list*.

    The available addresses are printed, then lines are read from stdin until
    the operator sends ``q`` (or stdin reaches EOF). Each line may hold one
    address or several separated by whitespace; input is lower-cased before
    matching. Unknown names and duplicates are ignored.

    :param email_list: addresses the operator may choose from.
    :return: the chosen addresses, in the order they were selected.
    """
    opts = []
    print("Select email to reset")
    # Show the choices themselves (the selection list is still empty here)
    print("\n".join(email_list))
    print("send q to finish")
    while True:
        try:
            user_input = input().strip().lower()
        except EOFError:
            # stdin closed: treat it like "q" instead of dying with a traceback
            break
        if user_input == "q":
            break
        # split() with no argument tolerates tabs and repeated spaces
        for name in user_input.split():
            if name in email_list and name not in opts:
                opts.append(name)
    return opts


def parse_args():
    """Read and validate the command-line options.

    ``-u`` (a cPanel user) and ``-e`` (one or more addresses) are mutually
    exclusive and one of them is required. ``-m`` (the reset reason) is
    required as well. Exits the script when validation fails; otherwise
    returns the argparse namespace with attributes ``User``, ``Emails``,
    ``Message``, ``UserAll`` and ``Clear``.
    """
    arg_parser = argparse.ArgumentParser()
    # A run targets either a whole user or explicit addresses, never both
    target = arg_parser.add_mutually_exclusive_group()
    target.add_argument(
        '-u', '-user', action='store', dest='User', help='Define your User'
    )
    target.add_argument(
        '-e', '-email', nargs='+', dest='Emails', help='Set email to reset'
    )
    arg_parser.add_argument(
        '-m',
        '-message',
        action='store',
        dest='Message',
        help='Reason why the password was reset',
    )
    arg_parser.add_argument(
        '-a',
        '-all',
        action='store_true',
        dest='UserAll',
        help="Reset all USER's email accounts",
    )
    arg_parser.add_argument(
        '-c',
        '-clear',
        action='store_true',
        dest='Clear',
        help='Search exim queue and kill from user',
    )
    parsed = arg_parser.parse_args()

    # Nothing to act on without a user or at least one address
    if not (parsed.User or parsed.Emails):
        print(red('No user or email defined'))
        sys.exit(1)
    # A supplied user has to be a real cPanel account
    if parsed.User and not rads.is_cpuser(parsed.User):
        print(red('Not a valid user'))
        sys.exit(1)
    if not parsed.Message:
        sys.exit('Use the -m flag to add a message')
    return parsed


def prelim_checks(user):
    """
    Warn about bulk mail exemptions before resetting passwords.

    Two files under the user's home are inspected:
    /home/$user/.senderlimit (prompts when its value is above 250) and
    /home/$user/.imh/.exim_bulk (prompts when the file merely exists).
    Answering "no" to either prompt exits the script with status 1.
    """
    senderlimit_path = f'/home/{user}/.senderlimit'
    exim_bulk_path = f'/home/{user}/.imh/.exim_bulk'
    warned = False
    print("Checking for bulk mail exemptions...")

    limit_file = Path(senderlimit_path)
    if limit_file.is_file():
        senderlimit = limit_file.read_text(encoding='ascii').strip()
        # A limit above 250 needs an explicit go-ahead from the operator
        if int(senderlimit) > 250:
            warned = True
            proceed = rads.prompt_y_n(
                f"Senderlimit for {user} is set to {senderlimit}, "
                "do you wish to proceed with resetting password?"
            )
            if not proceed:
                sys.exit(1)

    # The file's content is never read; its presence alone triggers the prompt
    if Path(exim_bulk_path).is_file():
        proceed = rads.prompt_y_n(
            "Exim bulk limit file exists, please check PowerPanel for more "
            "information. Do you wish to proceed with resetting password? "
        )
        warned = True
        if not proceed:
            sys.exit(1)

    if warned:
        return
    # Neither warning fired, so report what was checked and carry on
    print(
        "No bulk mail conflicts to consider. ",
        f"Checked {senderlimit_path} and {exim_bulk_path}",
    )


def main():
    """Main program logic.

    Builds a mapping of ``{cpanel_user: [email, ...]}`` from the command-line
    options and hands it to ``reset_email_with_api``.

    With ``-u`` the addresses come from ``gather_email_accounts``. With ``-e``
    each address is mapped to the owner of its domain, every owner goes
    through ``prelim_checks``, and addresses that do not exist in the owner's
    account are reported and dropped.
    """
    rads.setup_logging(
        path='/var/log/suspension.log', loglevel=logging.INFO, print_out=False
    )
    args = parse_args()

    userdict = {}
    if args.User:
        # Get email accounts to reset
        # dict formation: {'userna5': ['email1', 'email2']}
        # NOTE(review): prelim_checks has never been run on this path;
        # confirm whether the bulk mail check should apply to -u as well.
        userdict[args.User] = gather_email_accounts(args.User, args.UserAll)

    if args.Emails:
        for email in args.Emails:
            # An address without a domain part cannot be mapped to an owner
            _, at_sign, domain = email.partition('@')
            if not at_sign or not domain:
                print(red(f"{email} is not a valid email address"))
                sys.exit(1)
            # whoowns may print nothing; avoid indexing an empty list
            whoowns_output = domain_owner(domain)
            owner = whoowns_output[0].strip() if whoowns_output else ''
            if not owner:
                print(f"Failed to find owner for {domain}")
                sys.exit(1)
            userdict.setdefault(owner, []).append(email)

        validated = {}
        for owner, emails in userdict.items():
            # Every owner gets the bulk mail check, not just the last one
            prelim_checks(owner)
            # Mailboxes that really exist in this owner's account
            pops = cpapi2(module='Email::listpopssingle', user=owner)
            valid_emails = {
                entry['login'] for entry in pops['cpanelresult']['data']
            }
            email_to_reset_valid = []
            for email in emails:
                if email in valid_emails:
                    email_to_reset_valid.append(email)
                else:
                    print(f"{email} does not exist in {owner}'s account.")
            validated[owner] = email_to_reset_valid
        # keep only the addresses that actually exist
        userdict = validated

    # send the processed userdict to reset_email_with_api
    reset_email_with_api(userdict, args.Message, args.Clear)


def reset_email_with_api(userdict, reason, clear):
    """Reset a batch of email passwords and notify each account owner.

    For every address a new 18-character password is generated, printed and
    applied through the ``Email::passwdpop`` API call. The reset history from
    ``check_email_records`` drives the repeat-offender warnings. Each owner
    then receives one notification listing all of their reset addresses.

    :param userdict: mapping of cPanel user -> list of email addresses.
    :param reason: text passed to the notification as its first variable.
    :param clear: when true, purge the exim queue for the reset addresses.
    """
    length = 18
    chars = string.ascii_letters + string.digits + '!#$%^&*()_+{}?><'
    hostname = node()
    if 'inmotionhosting' in hostname or 'servconfig' in hostname:
        eid = 617
    elif 'webhostinghub' in hostname:
        eid = 618
    else:
        sys.exit('Cannot find brand to connect to AMP. Check the hostname.')

    # Passwords must come from the OS CSPRNG. Assigning to random.seed only
    # overwrote the function and left the default generator in use.
    secure_rng = random.SystemRandom()

    for owner, emails in userdict.items():
        if not emails:
            # nothing was reset for this owner, so nothing to notify or clear
            continue
        for email in emails:
            # check and update password reset history
            resets = check_email_records(email, owner)
            # generate new password
            newpass = ''.join(secure_rng.choice(chars) for _ in range(length))
            print("---")
            print(f"{email} new password: {newpass}")
            emailuser, _, emaildomain = email.partition('@')
            # reset password with cPanel api
            try:
                result = cpapi2(
                    module='Email::passwdpop',
                    user=owner,
                    args={
                        "domain": emaildomain,
                        "email": emailuser,
                        "password": newpass,
                    },
                )
                succeeded = result['cpanelresult']['data'][0]['result'] == 1
            except Exception as exc:
                print(red(f"Failed to reset password. Debug info: {exc}"))
            else:
                if succeeded:
                    print(green('Success'))
                else:
                    # The API answered but did not report success
                    print(
                        red(
                            "Failed to reset password. "
                            "cPanel did not report success."
                        )
                    )

            # Warnings are per address: more than 3 counted resets is the
            # suspend-review level, more than 2 calls for a final notice.
            msg = ""
            if resets > 3:
                msg = (
                    f"[!!!] {email} has been reset multiple times. "
                    f"Please check {owner}'s account for a final notice "
                    "and suspend if necessary."
                )
                print(red(msg))
            elif resets > 2:
                msg = (
                    f"[!!] {email} has been reset more than twice, "
                    "please send final notice."
                )
                print(red(msg))

            # if msg contains a warning, give T2S a link to review the account
            if msg:
                print(
                    "Please review account: "
                    "http://gm1.imhadmin.net/cgi-bin/ppanel-search.cgi?query="
                    f"{owner}"
                )

        # One notification per owner: the message already lists every address
        print(f"Now emailing {owner}...")
        message = {
            'subject': "Email Password Reset",
            'eid': eid,
            'var1': reason,
            'var2': emails,
        }
        mail_users(message, owner)

        # clear if we opted to clear the queue, warn if not
        if clear:
            clear_queue(emails)
    if not clear:
        print("-c not used. Please clear the queue.")
    print(green("--- Resets complete!"))


def mail_users(message, cpuser):
    """Send the reset notification for *cpuser* through PowerPanel.

    *message* supplies ``eid`` (template id), ``var1``, ``var2`` (an iterable
    of strings, joined with ", ") and ``subject`` (used for logging only).
    The outcome is logged and printed; nothing is returned.
    """
    # On servconfig hosts the notice goes to the account's owner instead,
    # unless that owner is listed in rads.OUR_RESELLERS
    if 'servconfig' in node():
        parent = rads.get_owner(cpuser)
        if parent not in rads.OUR_RESELLERS:
            cpuser = parent

    amp = pp_api.PowerPanel()
    payload = {
        'template': int(message['eid']),
        'cpanelUser': cpuser,
        'variable_1': message['var1'],
        'variable_2': ', '.join(message['var2']),
        'variable_3': None,
    }
    response = amp.call("notification.send", **payload)

    if response.status == 0:
        logging.info(
            f"Success: {message['subject']} sent to {cpuser} "
            f"by {rads.get_login()}"
        )
        print("Message Sent Successfully")
        return

    # Any non-zero status is treated as a failed send
    logging.error(f"Failed to send notification to {cpuser}")
    print(f"{vars(response)['message']} {cpuser}")
    print(red("Failed to get a valid response from AMP."))


def search_exim_queue(kill_email, all_messages):
    """Remove queued messages whose auth id mentions *kill_email*.

    For each message id in *all_messages* the headers are read with
    ``exim -Mvh``. Only the first line starting with ``--auth_id`` is
    examined; if it contains *kill_email* the message is removed. Messages
    whose headers cannot be read are skipped.
    """
    removed = 0
    print(green(f"Killing messages in queue from {kill_email}"))
    for msg_id in all_messages:
        try:
            header_lines = run(['/usr/sbin/exim', '-Mvh', msg_id])
        except subprocess.CalledProcessError:
            continue
        auth_line = next(
            (text for text in header_lines if text.startswith('--auth_id')),
            None,
        )
        if auth_line is not None and kill_email in auth_line:
            removed = kill_from_queue(msg_id, removed)
    if removed > 0:
        print(f'Successfully killed {removed} messages')


def clear_queue(email_list):
    """Purge queued mail related to the addresses in *email_list*.

    For each address, messages authenticated as that address are removed via
    ``search_exim_queue``, followed by bouncebacks (null sender ``<>``)
    addressed to it. The number of bouncebacks removed is reported per
    address.

    :param email_list: email addresses whose queued mail should be removed.
    """
    # No shell is involved, so the pattern must not carry its own quotes:
    # '".*"' would ask exiqgrep for senders containing literal double quotes.
    all_messages = run(['/usr/sbin/exiqgrep', '-if', '.*'])
    for email in email_list:
        search_exim_queue(email, all_messages)
        kill_bounce_list = run(['/usr/sbin/exiqgrep', '-if', '<>', '-r', email])
        print(green(f"Killing bouncebacks in queue from {email}"))
        # Count per address so the report reflects this address only
        email_killed = 0
        for email_id in kill_bounce_list:
            email_killed = kill_from_queue(email_id, email_killed)
        if email_killed > 0:
            print(f"Bouncebacks killed from queue {email_killed}")


def kill_from_queue(email_id, kill_count):
    """Remove one message from the queue with ``exim -Mrm``.

    Returns *kill_count* plus one when the removal command succeeds, and
    *kill_count* unchanged when *email_id* is empty or the command fails.
    """
    if not email_id:
        return kill_count
    try:
        run(['/usr/sbin/exim', '-Mrm', email_id])
    except subprocess.CalledProcessError:
        return kill_count
    return kill_count + 1


def _parse_reset_timestamp(stamp):
    """Parse a stored ``str(datetime)`` value; return None if unparseable."""
    # str(datetime) drops the fractional part when microsecond == 0, so both
    # layouts can legitimately appear in the history file.
    for fmt in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
        try:
            return datetime.strptime(stamp, fmt)
        except (TypeError, ValueError):
            continue
    return None


def check_email_records(email_account, account_owner):
    """
    Record this reset and count how many times an account has had its
    password reset within roughly 6 months (180 days).

    The history is kept as JSON in /home/<owner>/.imh/reset_email.json,
    mapping each address to a list of timestamp strings. The current reset
    is written to the file before counting.

    :param email_account: Email account in question.
    :param account_owner: The owner of the email account.
    :return: 0 if the account had no history yet, otherwise the number of
    recorded resets (including this one) within the last 180 days.
    """
    path = Path('/home', account_owner, '.imh/reset_email.json')
    path.parent.mkdir(exist_ok=True, mode=0o755)

    # Loading the data.
    try:
        with open(path, encoding='ascii') as fname:
            data = json.load(fname)
    except (OSError, ValueError):
        data = {}
        print(green(f"Creating {path}"))
    if not isinstance(data, dict):
        # Valid JSON but not the expected mapping: start over
        data = {}

    now = datetime.now()
    first_reset = not isinstance(data.get(email_account), list)
    if first_reset:
        data[email_account] = []

    # Updating the list now; before we count.
    data[email_account].append(str(now))
    with open(path, "w", encoding='ascii') as fname:
        json.dump(data, fname)

    if first_reset:
        return 0

    # String to expect: '2018-11-12 13:49:47.212776'
    count = 0
    for stamp in data[email_account]:
        when = _parse_reset_timestamp(stamp)
        # Entries that cannot be parsed are skipped rather than aborting
        if when is not None and abs(now - when).days <= 180:
            count += 1

    return count


# Run the reset workflow only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

@StableExploit - 2025