|
Server : Apache System : Linux ecngx264.inmotionhosting.com 4.18.0-553.77.1.lve.el8.x86_64 #1 SMP Wed Oct 8 14:21:00 UTC 2025 x86_64 User : lonias5 ( 3576) PHP Version : 7.3.33 Disable Function : NONE Directory : /proc/self/root/proc/thread-self/root/opt/sharedrads/ |
Upload File : |
#!/usr/lib/rads/venv/bin/python3
"""Resets email passwords and sends an email notification"""
from platform import node
from datetime import datetime
import argparse # for Input selection
import random
import string # for random
import logging
import sys
import subprocess
from os import urandom
from pathlib import Path
import json
from cpapis import cpapi2
import pp_api
import rads
from rads.color import red, green
def run(x):
    """Run a command and return its standard output as a list of lines.

    ``x`` is handed to ``subprocess.check_output`` unchanged. Output is
    decoded as UTF-8 with undecodable bytes replaced. A non-zero exit
    status propagates as ``subprocess.CalledProcessError``.
    """
    output = subprocess.check_output(x, encoding='UTF-8', errors='replace')
    return output.splitlines()
def domain_owner(domain):
    """Look up which cPanel user owns ``domain``.

    Returns the output lines of ``/scripts/whoowns <domain>``. If the
    command cannot be run, an error is printed in red and the script
    exits with status 1.
    """
    try:
        return run(['/scripts/whoowns', domain])
    except Exception:
        print(red(f"Error running whoowns on {domain}"))
        sys.exit(1)
def gather_email_accounts(user_to_parse, reset_all):
    """Return the email logins of a cPanel user that should be reset.

    The logins are read from the ``Email::listpopssingle`` API call. With
    ``reset_all`` set, every login is returned; otherwise the operator
    picks a subset through ``picker_menu``.
    """
    response = cpapi2(module='Email::listpopssingle', user=user_to_parse)
    logins = [pop['login'] for pop in response['cpanelresult']['data']]
    if reset_all:
        # Every account was requested, so no menu is needed.
        return logins
    # Only the accounts the operator selects.
    return picker_menu(logins)
def picker_menu(email_list):
    """Interactively choose which email accounts to reset.

    Prints the available addresses, then reads lines from standard input
    until ``q`` (or end of input) is received. A line may hold a single
    address or several separated by spaces. Input is lower-cased before
    it is compared against ``email_list``; anything not in the list is
    ignored.

    :param email_list: Addresses the operator may choose from.
    :return: The chosen addresses in the order they were first entered,
        without duplicates.
    """
    opts = []
    print("Select email to reset")
    # Show the available accounts. The selection list starts out empty,
    # so printing it (as before) displayed nothing to choose from.
    print("\n".join(email_list))
    print("send q to finish")
    while True:
        try:
            user_input = input().lower()
        except EOFError:
            # stdin was closed: treat it like "q" rather than a traceback
            break
        if user_input == "q":
            break
        # A single address is simply a one-element split.
        for name in user_input.split(" "):
            if name in email_list and name not in opts:
                opts.append(name)
    return opts
def parse_args():
    """Build the command-line parser, parse ``sys.argv`` and validate it.

    Exits with status 1 when neither a user nor an email was given or
    when the given user is not a cPanel user, and exits with a message
    when no reason (-m) was supplied. Returns the parsed namespace.
    """
    parser = argparse.ArgumentParser()

    # A run targets either one cPanel user or explicit addresses, not both.
    target = parser.add_mutually_exclusive_group()
    target.add_argument(
        '-u', '-user', action='store', dest='User', help='Define your User'
    )
    target.add_argument(
        '-e', '-email', nargs='+', dest='Emails', help='Set email to reset'
    )

    parser.add_argument(
        '-m',
        '-message',
        action='store',
        dest='Message',
        help='Reason why the password was reset',
    )
    parser.add_argument(
        '-a',
        '-all',
        action='store_true',
        dest='UserAll',
        help="Reset all USER's email accounts",
    )
    parser.add_argument(
        '-c',
        '-clear',
        action='store_true',
        dest='Clear',
        help='Search exim queue and kill from user',
    )

    parsed = parser.parse_args()

    # At least one target is required.
    if not (parsed.User or parsed.Emails):
        print(red('No user or email defined'))
        sys.exit(1)

    # A named user has to be a real cPanel account.
    if parsed.User and not rads.is_cpuser(parsed.User):
        print(red('Not a valid user'))
        sys.exit(1)

    if not parsed.Message:
        sys.exit('Use the -m flag to add a message')

    return parsed
def prelim_checks(user):
    """
    Check for bulk mail exemptions before resetting passwords.

    Looks at /home/$user/.senderlimit and /home/$user/.imh/.exim_bulk.
    A sender limit above 250, or the mere presence of the exim bulk file,
    asks the operator for confirmation; answering no exits with status 1.
    A sender limit file that does not contain an integer is reported and
    otherwise skipped instead of aborting the script with a traceback.

    :param user: cPanel user name whose home directory is inspected.
    """
    senderlimit_path = f'/home/{user}/.senderlimit'
    exim_bulk_path = f'/home/{user}/.imh/.exim_bulk'
    # Stays True only while nothing noteworthy has been found.
    consider_flag = True
    print("Checking for bulk mail exemptions...")
    if Path(senderlimit_path).is_file():
        senderlimit = ''
        try:
            # Grab text from senderlimit file
            with open(senderlimit_path, encoding='ascii') as senderlimit_file:
                senderlimit = senderlimit_file.read().strip()
            limit = int(senderlimit)
        except ValueError:
            # Empty, non-numeric or non-ASCII content: report it and move
            # on, but do not claim that there is nothing to consider.
            limit = None
            consider_flag = False
            print(
                f"Could not read a number from {senderlimit_path} "
                f"(found {senderlimit!r}); please review it manually."
            )
        # Push warning in case of limit over 250 and prompt
        # whether to continue or not
        if limit is not None and limit > 250:
            consider_flag = False
            answer = rads.prompt_y_n(
                f"Senderlimit for {user} is set to {senderlimit}, "
                "do you wish to proceed with resetting password?"
            )
            if not answer:
                sys.exit(1)
    if Path(exim_bulk_path).is_file():
        # System appears to generate a blank file for
        # /home/$user/.imh/.exim_bulk, we just need to see if it exists
        answer = rads.prompt_y_n(
            "Exim bulk limit file exists, please check PowerPanel for more "
            "information. Do you wish to proceed with resetting password? "
        )
        consider_flag = False
        if not answer:
            sys.exit(1)
    # If neither warning appeared, continue without prompting.
    if consider_flag:
        print(
            "No bulk mail conflicts to consider. ",
            f"Checked {senderlimit_path} and {exim_bulk_path}",
        )
def main():
    """Main program logic.

    Sets up logging, parses the command line, works out which email
    accounts belong to which cPanel user and hands the result to
    ``reset_email_with_api``.

    With -e, every address is mapped to the owner of its domain, each
    distinct owner gets the bulk-mail preliminary check, and addresses
    that do not exist in the owner's account are reported and dropped.
    A malformed address or a domain without an owner ends the script with
    status 1.
    """
    rads.setup_logging(
        path='/var/log/suspension.log', loglevel=logging.INFO, print_out=False
    )
    args = parse_args()
    # dict formation: {'userna5': ['email1', 'email2']}
    userdict = {}
    if args.User:
        # Get email accounts to reset
        userdict[args.User] = gather_email_accounts(args.User, args.UserAll)
    if args.Emails:
        for email in args.Emails:
            # parse domain for each email and find the owner
            parts = email.split('@')
            if len(parts) < 2 or not parts[1]:
                # Previously an IndexError traceback.
                print(f"{email} is not a valid email address")
                sys.exit(1)
            domain = parts[1]
            owner_lines = domain_owner(domain)
            # whoowns may print nothing at all for an unknown domain
            owner = owner_lines[0] if owner_lines else ''
            if len(owner) < 1:
                print(f"Failed to find owner for {domain}")
                sys.exit(1)
            userdict.setdefault(owner, []).append(email)
        # Snapshot the items because values are replaced inside the loop.
        for owner, emails in list(userdict.items()):
            # Check every owner involved, not only the last one found.
            prelim_checks(owner)
            # Get list of emails owned by this owner
            listing = cpapi2(module='Email::listpopssingle', user=owner)
            valid_emails = {
                entry['login'] for entry in listing['cpanelresult']['data']
            }
            email_to_reset_valid = []
            # Ensure each email actually exists in owner's account
            for email in emails:
                if email in valid_emails:
                    email_to_reset_valid.append(email)
                else:
                    print(f"{email} does not exist in {owner}'s account.")
            # reset userdict with only the actual email accounts
            userdict[owner] = email_to_reset_valid
    # send the processed userdict to reset_email_with_api
    reset_email_with_api(userdict, args.Message, args.Clear)
def reset_email_with_api(userdict, reason, clear):
    """Reset a list of email passwords and send a notification.

    For every owner, each listed address gets its reset recorded, a new
    random password set through the cPanel ``Email::passwdpop`` API and
    the password printed for the operator. The owner is then notified
    through ``mail_users`` and, if requested, the exim queue is cleared
    of mail from the reset addresses.

    :param userdict: Mapping of cPanel user to the list of email
        addresses to reset, e.g. ``{'userna5': ['a@example.com']}``.
    :param reason: Text passed to the notification template as the
        reason for the reset.
    :param clear: When true, call ``clear_queue`` for the reset
        addresses; otherwise remind the operator to do so.

    Exits when the brand cannot be derived from the host name.
    """
    length = 18
    chars = string.ascii_letters + string.digits + '!#$%^&*()_+{}?><'
    hostname = node()
    # The notification template id depends on the brand in the host name.
    if 'inmotionhosting' in hostname or 'servconfig' in hostname:
        eid = 617
    elif 'webhostinghub' in hostname:
        eid = 618
    else:
        sys.exit('Cannot find brand to connect to AMP. Check the hostname.')
    # Passwords must come from the OS entropy source. The former
    # "random.seed = urandom(1024)" only overwrote the seed function and
    # left the predictable default generator in use.
    secure_random = random.SystemRandom()
    for owner, emails in userdict.items():
        if not emails:
            # Nothing was reset, so there is nothing to notify about.
            print(f"No email accounts to reset for {owner}, skipping.")
            continue
        msg = ""
        for email in emails:
            emailuser, separator, emaildomain = email.partition('@')
            if not separator or not emaildomain:
                # Cannot be split into user and domain for the API call.
                print(red(f"Skipping {email}: not a full email address."))
                continue
            # The API call below takes the text between the first and
            # second "@" as the domain, as the original split did.
            emaildomain = emaildomain.split('@')[0]
            # check and update password reset history
            resets = check_email_records(email, owner)
            # generate new password
            newpass = ''.join(
                secure_random.choice(chars) for _ in range(length)
            )
            print("---")
            print(f"{email} new password: {newpass}")
            # reset password with cPanel api
            try:
                result = cpapi2(
                    module='Email::passwdpop',
                    user=owner,
                    args={
                        "domain": emaildomain,
                        "email": emailuser,
                        "password": newpass,
                    },
                )
                outcome = result['cpanelresult']['data'][0]
                if outcome['result'] == 1:
                    print(green('Success'))
                else:
                    # The API answered but refused: say so instead of
                    # leaving the operator to assume success.
                    print(
                        red(
                            "Failed to reset password. "
                            f"Debug info: {outcome}"
                        )
                    )
            except Exception as exc:
                print(red(f"Failed to reset password. Debug info: {exc}"))
            # Repeated resets mean the account needs a closer look.
            if resets > 3:
                msg = (
                    f"[!!!] {email} has been reset multiple times. "
                    f"Please check {owner}'s account for a final notice "
                    "and suspend if necessary."
                )
                print(red(msg))
            elif resets > 2:
                msg = (
                    f"[!!] {email} has been reset more than twice, "
                    "please send final notice."
                )
                print(red(msg))
        # if msg contains a warning, give T2S a link to review the account
        if msg:
            print(
                "Please review account: "
                "http://gm1.imhadmin.net/cgi-bin/ppanel-search.cgi?query="
                f"{owner}"
            )
        print(f"Now emailing {owner}...")
        message = {
            'subject': "Email Password Reset",
            'eid': eid,
            'var1': reason,
            'var2': emails,
        }
        mail_users(message, owner)
        # clear if we opted to clear the queue, warn if not
        if clear:
            clear_queue(emails)
        else:
            print("-c not used. Please clear the queue.")
    print(green("--- Resets complete!"))
def mail_users(message, cpuser):
    """Send the reset notification to a user through PowerPanel.

    ``message`` supplies ``eid`` (template id), ``var1``, ``var2`` (an
    iterable of strings, joined with ", ") and ``subject`` (used in the
    success log line). The outcome is logged and printed.
    """
    # On hosts whose name contains "servconfig" the notice goes to the
    # account's owner instead, unless that owner is one of OUR_RESELLERS.
    if 'servconfig' in node():
        reseller = rads.get_owner(cpuser)
        if reseller not in rads.OUR_RESELLERS:
            cpuser = reseller

    amp = pp_api.PowerPanel()
    payload = {
        'template': int(message['eid']),
        'cpanelUser': cpuser,
        'variable_1': message['var1'],
        'variable_2': ', '.join(message['var2']),
        'variable_3': None,
    }
    response = amp.call("notification.send", **payload)

    if response.status == 0:
        logging.info(
            f"Success: {message['subject']} sent to {cpuser} "
            f"by {rads.get_login()}"
        )
        print("Message Sent Successfully")
        return

    logging.error(f"Failed to send notification to {cpuser}")
    print(f"{vars(response)['message']} {cpuser}")
    print(red("Failed to get a valid response from AMP."))
def search_exim_queue(kill_email, all_messages):
    """Remove queued messages whose auth id line mentions ``kill_email``.

    For each message id in ``all_messages`` the headers are read with
    ``exim -Mvh``. Only the first header line starting with ``--auth_id``
    is inspected; when it contains ``kill_email`` the message is removed.
    Messages whose headers cannot be read are skipped.
    """
    print(green(f"Killing messages in queue from {kill_email}"))
    removed = 0
    for message_id in all_messages:
        try:
            headers = run(['/usr/sbin/exim', '-Mvh', message_id])
        except subprocess.CalledProcessError:
            continue
        auth_line = next(
            (text for text in headers if text.startswith('--auth_id')), None
        )
        if auth_line is not None and kill_email in auth_line:
            removed = kill_from_queue(message_id, removed)
    if removed > 0:
        print(f'Successfully killed {removed} messages')
def clear_queue(email_list):
    """Remove queued mail related to the given addresses from exim.

    For every address, messages sent by it are removed through
    ``search_exim_queue``, then bouncebacks (null sender ``<>``)
    addressed to it are removed with ``kill_from_queue``.

    :param email_list: Email addresses whose mail should be purged.
    """
    email_killed = 0
    try:
        # The command runs without a shell, so the pattern must not be
        # wrapped in shell quotes: '".*"' reached exiqgrep with literal
        # double quotes and only matched senders that contain them.
        all_messages = run(['/usr/sbin/exiqgrep', '-if', '.*'])
    except subprocess.CalledProcessError as exc:
        # Same best-effort handling as the other queue helpers.
        print(red(f"Could not list the exim queue: {exc}"))
        all_messages = []
    for email in email_list:
        search_exim_queue(email, all_messages)
        try:
            kill_bounce_list = run(
                ['/usr/sbin/exiqgrep', '-if', '<>', '-r', email]
            )
        except subprocess.CalledProcessError as exc:
            print(red(f"Could not list bouncebacks for {email}: {exc}"))
            kill_bounce_list = []
        print(green(f"Killing bouncebacks in queue from {email}"))
        for email_id in kill_bounce_list:
            email_killed = kill_from_queue(email_id, email_killed)
    if email_killed > 0:
        print(f"Bouncebacks killed from queue {email_killed}")
def kill_from_queue(email_id, kill_count):
    """Remove one message from the queue with ``exim -Mrm <id>``.

    Returns ``kill_count`` plus one when the removal succeeded, and
    ``kill_count`` unchanged when ``email_id`` is empty or exim reported
    a failure.
    """
    if not email_id:
        return kill_count
    try:
        run(['/usr/sbin/exim', '-Mrm', email_id])
    except subprocess.CalledProcessError:
        return kill_count
    return kill_count + 1
def _parse_reset_timestamp(stamp):
    """Parse a stored ``str(datetime)`` value; return None if malformed."""
    # str(datetime) leaves out the fractional part when microsecond is 0,
    # so both layouts have to be accepted.
    for layout in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
        try:
            return datetime.strptime(stamp, layout)
        except (TypeError, ValueError):
            continue
    return None


def check_email_records(email_account, account_owner):
    """
    Record a password reset and report how often the account has been
    reset within roughly six months (180 days).

    The history is kept as JSON in /home/<owner>/.imh/reset_email.json,
    mapping each email address to a list of timestamp strings. The
    current reset is appended and written back before counting.

    :param email_account: Email account in question.
    :param account_owner: The owner of the email account.
    :return: 0 when the account had no history yet, otherwise the number
        of recorded resets (including this one) from the last 180 days.
        Unparseable timestamps are ignored.
    """
    path = Path('/home', account_owner, '.imh/reset_email.json')
    path.parent.mkdir(exist_ok=True, mode=0o755)
    # Loading the data.
    try:
        with open(path, encoding='ascii') as fname:
            data = json.loads(fname.read())
    except (OSError, ValueError):
        data = {}
        print(green(f"Creating {path}"))
    if not isinstance(data, dict):
        # Valid JSON but not the expected mapping: start over.
        data = {}
    now = datetime.now()
    history = data.get(email_account)
    first_reset = not isinstance(history, list)
    if first_reset:
        history = data[email_account] = []
    # Updating the list now; before we count.
    history.append(str(now))
    with open(path, "w", encoding='ascii') as fname:
        fname.write(json.dumps(data))
    if first_reset:
        return 0
    # String to expect: ['2018-11-12 13:49:47.212776']
    count = 0
    for stamp in history:
        when = _parse_reset_timestamp(stamp)
        if when is not None and abs(now - when).days <= 180:
            count += 1
    return count
# Run the reset workflow only when executed as a script, not on import.
if __name__ == "__main__":
    main()