|
Server : Apache System : Linux ecngx264.inmotionhosting.com 4.18.0-553.77.1.lve.el8.x86_64 #1 SMP Wed Oct 8 14:21:00 UTC 2025 x86_64 User : lonias5 ( 3576) PHP Version : 7.3.33 Disable Function : NONE Directory : /proc/self/root/proc/thread-self/root/opt/sharedrads/ |
Upload File : |
#! /usr/lib/rads/venv/bin/python3
'''List sources of email sent by address and directory.'''
import os
import sys
import re
import glob
import gzip
import datetime
from collections import defaultdict
from argparse import ArgumentParser
# Script metadata: author name and contact address.
__author__ = "Daniel K"
__email__ = "danielk@inmotionhosting.com"
def email_lines(all_logs=False, log_file="/var/log/exim_mainlog"):
    '''Return an iterable over email log lines, always as ``str``.

    Args:
        all_logs: When true, every file whose name is ``log_file`` followed
            by at least one more character is read first, as gzip.
        log_file: Path of the current (uncompressed) log, which is always
            read last.

    Yields:
        One log line at a time, including its trailing newline.

    Exits the process with status 1 (after printing a message) when a log
    file is missing or cannot be read.
    '''
    rotated_logs = []
    if all_logs:
        # Sorted so the read order does not depend on directory order.
        rotated_logs = sorted(glob.glob(glob.escape(log_file) + '?*'))

    for rotated in rotated_logs:
        if not os.path.exists(rotated):
            print(f"Could not find log file: {rotated}")
            sys.exit(1)
        try:
            # Text mode: binary mode would yield bytes here while the
            # current log yields str, which breaks str-based consumers.
            with gzip.open(
                rotated, 'rt', encoding='utf-8', errors='replace'
            ) as mail_log:
                yield from mail_log
        except (OSError, EOFError) as error:
            # EOFError is what gzip raises for a truncated archive.
            print(f"Error reading file '{rotated}': {error}")
            sys.exit(1)

    if not os.path.exists(log_file):
        print(f"Could not find log file: {log_file}")
        sys.exit(1)
    try:
        # errors='replace' keeps reading past undecodable bytes instead of
        # ending the iteration at the first bad line.
        with open(log_file, encoding='utf-8', errors='replace') as mail_log:
            yield from mail_log
    except OSError:
        print(f"Error reading file {log_file}")
        sys.exit(1)
def get_domains(username='', users_dir='/var/cpanel/users'):
    '''Get a regex fragment matching the domains of ``username``.

    Args:
        username: cPanel account name. An empty string means "any domain".
        users_dir: Directory holding one account file per user; the file
            named ``username`` is scanned for ``DNS=`` / ``DNS<n>=`` lines.

    Returns:
        ``[^@ ]+`` when no username is given, otherwise the account's
        domains regex-escaped and joined with ``|``.

    Exits the process with status 1 (after printing a message) when the
    account file does not exist or cannot be read.
    '''
    if username == '':
        return r'[^@ ]+'

    user_file = f"{users_dir}/{username}"
    if not os.path.exists(user_file):
        print(
            "Could not find domains for {}. "
            "Invalid cPanel user? Cannot find {}".format(username, user_file)
        )
        sys.exit(1)

    dns_rx = re.compile(r"^DNS[0-9]*=(.*)$")
    domain_list = []
    try:
        with open(user_file, encoding='utf-8') as user_data:
            for line in user_data:
                dns_match = dns_rx.search(line)
                if dns_match is None:
                    continue
                domain = dns_match.group(1).strip()
                # An empty alternative would match everything, so skip it.
                if domain:
                    # Escape so "." matches a literal dot, not any char.
                    domain_list.append(re.escape(domain))
    except OSError as error:
        print(f"Error reading file '{user_file}': {error}")
        sys.exit(1)
    return '|'.join(domain_list)
def get_sources(all_logs=False, username='', time=''):
    '''Return a tuple of dicts counting email sources found in the logs.

    Args:
        all_logs: Passed to email_lines() to select which logs are read.
        username: cPanel account to restrict the search to; '' for all.
        time: '' for no time filter, a string containing '-' that each log
            line must start with (a date), or an int number of seconds
            counted back from now.

    Returns:
        (email_logins, working_directories, spoofing), each a
        defaultdict(int):
          * email_logins: authenticated login -> number of recipients
          * working_directories: ``cwd=`` directory -> number of log lines
          * spoofing: "<login> as <sender>" -> number of recipients, for
            lines where the login does not start with the sender address
    '''
    email_logins = defaultdict(int)
    working_directories = defaultdict(int)
    spoofing = defaultdict(int)
    domains = get_domains(username)

    if time == '':
        date = ''
        duration = 0
    elif isinstance(time, str) and '-' in time:
        # Only strings count as dates; a negative int also has a '-' in
        # its str() form but must not be used as a line prefix.
        date = time
        duration = 0
    else:
        assert isinstance(time, int), "Time is not date or number"
        date = ''
        duration = int(time)

    target = datetime.datetime.now()
    datetime_rx = re.compile(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}')
    login_rx = re.compile(
        r'(courier|dovecot)_(plain|login):(?P<login>[^@ ]+(@(?P<domain>{}))?) '
        r'.*for (?P<for>.*)$'.format(domains)
    )
    spoofing_rx = re.compile(
        r'<= (?P<sender>[^@]*@[^@ ]+)'
        r'.*(courier|dovecot)_(plain|login):'
        r'(?P<login>(?!(?P=sender))[^@ ]+(@(?P<sdom>{}))?)'
        r'.*for (?P<for>.*)$'.format(domains)
    )
    if username:
        # Escape the name and stop it at a path boundary, so that "bob"
        # does not also count /home/bobby/... as bob's directory.
        home_rx = fr'/home/{re.escape(username)}(?:/[^ ]*)?(?=\s|$)'
    else:
        home_rx = r'/home/[^ ]*'
    directory_rx = re.compile(fr'cwd=(?P<directory>{home_rx})')

    for line in email_lines(all_logs):
        # Lines may arrive as bytes when a log was opened in binary mode;
        # every pattern below is a str pattern.
        if isinstance(line, bytes):
            line = line.decode('utf-8', errors='replace')

        if date != '' and not line.startswith(date):
            continue
        if not datetime_rx.match(line):
            continue

        # If duration is set, skip any lines not within that duration
        if duration > 0:
            stamp = datetime.datetime.strptime(line[:19], "%Y-%m-%d %H:%M:%S")
            if (target - stamp).total_seconds() >= duration:
                continue

        lowered = line.lower()

        # A spoof match does not end processing: the same line is still
        # counted as a login below.
        rx_match = spoofing_rx.search(lowered)
        if rx_match:
            logged_in = "{} as {}".format(
                rx_match.group('login'), rx_match.group('sender')
            )
            spoofing[logged_in] += len(rx_match.group('for').split())

        rx_match = login_rx.search(lowered)
        if rx_match:
            address = rx_match.group('login')
            email_logins[address] += len(rx_match.group('for').split())
            continue

        rx_match = directory_rx.search(line)
        if rx_match:
            directory = rx_match.group('directory')
            if '/usr/local/cpanel/' in directory:
                continue
            working_directories[directory] += 1

    return (email_logins, working_directories, spoofing)
def print_sorted_dict(dictionary):
    '''Print each entry as "<count>\\t<key>", lowest count first.'''
    ranked = sorted(dictionary.items(), key=lambda entry: entry[1])
    for name, count in ranked:
        print(f"{count:>7}\t{name}")
def parse_args():
    '''Parse command line arguments.

    Returns:
        A tuple (all_logs, username, time): all_logs is a bool, username
        is '' when no USER was given, and time is '' (no filter), a date
        string, or an int number of seconds. Choosing --recent or a
        positive --seconds forces all_logs to False.

    Exits with status 1 when --date does not start with YYYY-MM-DD.
    '''
    cli = ArgumentParser(description=__doc__)
    cli.add_argument(
        "-a",
        "--all",
        action='store_true',
        help="Search all email logs, rather than only the recent log.",
    )
    cli.add_argument(
        'username',
        metavar='USER',
        type=str,
        nargs='?',
        help="Search for only email from a specific cPanel account",
    )

    # At most one way of narrowing the time range may be given.
    window = cli.add_mutually_exclusive_group()
    window.add_argument(
        "-d",
        "--date",
        action='store',
        type=str,
        default='',
        help=(
            "Search for entries from a certain date. "
            "Must be in the format of YYYY-MM-DD."
        ),
    )
    window.add_argument(
        "-s",
        "--seconds",
        action='store',
        type=int,
        default=0,
        help=(
            "Search entries which were made within the specified "
            "number of seconds. Overrides --all."
        ),
    )
    window.add_argument(
        "-r",
        "--recent",
        action='store_true',
        help=(
            "Search recent entries, from the last hour. "
            "This is the same as -s 3600. Also overrides --all"
        ),
    )

    options = cli.parse_args()
    username = '' if options.username is None else options.username

    if options.recent:
        return False, username, 3600

    if options.date != '':
        if re.match(r"\d{4}-\d{2}-\d{2}", options.date) is None:
            print(f"Date is not in the correct format: {options.date}")
            sys.exit(1)
        return options.all, username, options.date

    if options.seconds > 0:
        return False, username, options.seconds

    return options.all, username, ''
def main():
    '''Run the report: parse options, scan the logs, print the results.'''
    logins, directories, spoofed = get_sources(*parse_args())

    print("Email Logins:")
    print_sorted_dict(logins)

    print("\nSource directories:")
    print_sorted_dict(directories)

    print("\nPossibly spoofed emails:")
    if spoofed:
        print_sorted_dict(spoofed)
    else:
        print("\tNo obvious spoofs found")


if __name__ == "__main__":
    main()