|
Server : Apache System : Linux ecngx264.inmotionhosting.com 4.18.0-553.77.1.lve.el8.x86_64 #1 SMP Wed Oct 8 14:21:00 UTC 2025 x86_64 User : lonias5 ( 3576) PHP Version : 7.3.33 Disable Function : NONE Directory : /proc/self/root/proc/thread-self/root/opt/sharedrads/ |
Upload File : |
#!/usr/bin/imh-python3
import argparse
import re
import subprocess
import sys
from collections import defaultdict
from ipaddress import ip_address, ip_network
from prettytable import PrettyTable
from rads import color
class ServiceRegistry:
    """Two-way lookup between well-known service names and their ports."""

    def __init__(self):
        # Forward table: protocol -> service name -> list of ports.
        self.services = {
            'tcp': {
                'http': [80, 443],
                'imap': [143, 993],
                'pop3': [110, 995],
                'smtp': [25, 465, 587],
                'ftp': [20, 21],
                'ssh': [22, 2222],
                'cpanel': [2082, 2083],
                'whm': [2086, 2087],
                'webmail': [2095, 2096],
            },
            'udp': {'dns': [53], 'ntp': [123]},
        }
        # Reverse table: protocol -> port -> service name, derived from the
        # forward table above.
        self.port_to_service_name_map = {'tcp': {}, 'udp': {}}
        for proto, ports_by_name in self.services.items():
            reverse_table = self.port_to_service_name_map[proto]
            for name, ports in ports_by_name.items():
                reverse_table.update(dict.fromkeys(ports, name))

    def get_service_name(self, port_number: int, protocol: str) -> str:
        """Return the service registered for the port, or 'other' if none."""
        names_by_port = self.port_to_service_name_map.get(protocol)
        if names_by_port is None:
            return 'other'
        return names_by_port.get(port_number, 'other')

    def is_tracked_port(self, port_number: int, protocol: str) -> bool:
        """Tell whether the port is registered under the given protocol."""
        names_by_port = self.port_to_service_name_map.get(protocol, {})
        return port_number in names_by_port

    def get_all_ports(self, protocol: str) -> set:
        """Return a new set holding every port registered for the protocol."""
        names_by_port = self.port_to_service_name_map.get(protocol, {})
        return set(names_by_port)
class ConnectionReporter:
    """Summarize sockets reported by ``ss`` as per-service tables.

    Command-line flags choose which ports are reported. ``--subnet`` switches
    to a summary of client addresses grouped by /24 and /16 networks.
    """

    # Order in which service tables are printed. 'dns' and 'ntp' cover the UDP
    # registry entries; 'other' covers ports requested with -p/--port that the
    # registry does not know, so those results are rendered as well.
    SERVICE_PRINT_ORDER = (
        'http',
        'cpanel',
        'whm',
        'smtp',
        'imap',
        'pop3',
        'webmail',
        'ssh',
        'ftp',
        'mysql',
        'dns',
        'ntp',
        'other',
    )
    # Services rendered together under a single "MAIL SERVICES" heading.
    MAIL_SERVICE_GROUP = ('imap', 'pop3', 'webmail')
    HOT_SERVICE_TOTAL = 2000  # service total treated as high traffic (>=)
    HOT_IP_COUNT = 200  # per-IP / per-subnet count treated as high (>)
    ROWS_PER_PORT_TABLE = 10
    ROWS_PER_SUBNET_TABLE = 20
    MAX_PORT_ARGUMENTS = 10

    def __init__(self):
        self.service_registry = ServiceRegistry()
        self.command_line_args = self.parse_command_line_arguments()
        self.selected_ports = set()
        self.requested_tcp_ports = set()
        self.requested_udp_ports = set()
        self.should_show_all_ports = True
        # port -> client IP -> number of sockets seen
        self.tcp_connections_by_port = defaultdict(lambda: defaultdict(int))
        self.udp_connections_by_port = defaultdict(lambda: defaultdict(int))
        # "proto:port" -> {'total': int, 'ips': {ip: count}}
        self.processed_connection_data = {}
        self.collected_client_ips_for_subnetting = []
        self.process_port_selection_from_args()

    def parse_command_line_arguments(self):
        """Build the argument parser and return the parsed namespace.

        One ``--<service>`` switch is generated for every TCP service in the
        registry.
        """
        parser = argparse.ArgumentParser(
            description='Summarize TCP/UDP connection info.'
        )
        for service_name in self.service_registry.services['tcp']:
            parser.add_argument(
                f'--{service_name}',
                action='store_true',
                help=f'Show only {service_name.upper()} connections',
            )
        parser.add_argument(
            '--tcp', action='store_true', help='Show all TCP connections'
        )
        parser.add_argument(
            '--udp', action='store_true', help='Show all UDP connections'
        )
        parser.add_argument(
            '--top',
            action='store_true',
            help='Only show services ≥2000 connections or IPs >200',
        )
        parser.add_argument(
            '-s',
            '--subnet',
            action='store_true',
            help='Group connections by /24 and /16 subnets',
        )
        parser.add_argument(
            '-p',
            '--port',
            nargs='+',
            help='Specify up to 10 ports (space-separated)',
        )
        parser.add_argument(
            'ports', nargs='*', help='Deprecated: use -p or --port instead'
        )
        return parser.parse_args()

    @staticmethod
    def _parse_port_argument(port_string):
        """Return the port as an int, or None if it is not a valid port."""
        # isascii() guards against digits such as '²' that pass isdigit()
        # but make int() raise.
        if port_string.isascii() and port_string.isdigit():
            port_number = int(port_string)
            if 0 < port_number <= 65535:
                return port_number
        return None

    def _select_tcp_ports(self, ports):
        """Record *ports* as both selected and requested TCP ports."""
        self.selected_ports.update(ports)
        self.requested_tcp_ports.update(ports)

    def process_port_selection_from_args(self):
        """Translate the parsed arguments into the port selection sets.

        Exits with status 1 when more than ``MAX_PORT_ARGUMENTS`` ports are
        given to ``-p/--port``. Afterwards ``should_show_all_ports`` is True
        only when no explicit TCP selection was made.
        """
        args = self.command_line_args
        registry = self.service_registry
        any_port_was_selected = False

        if args.tcp:
            self._select_tcp_ports(registry.get_all_ports('tcp'))
            any_port_was_selected = True
        else:
            for service_name, ports in registry.services['tcp'].items():
                if getattr(args, service_name, False):
                    self._select_tcp_ports(ports)
                    any_port_was_selected = True

        if args.port and len(args.port) > self.MAX_PORT_ARGUMENTS:
            print("Error: max 10 ports", file=sys.stderr)
            sys.exit(1)

        # -p/--port values first, then the deprecated positional form.
        for port_string in list(args.port or []) + list(args.ports or []):
            port_number = self._parse_port_argument(port_string)
            if port_number is None:
                print(
                    f"Ignoring invalid port: {port_string}", file=sys.stderr
                )
                continue
            self._select_tcp_ports([port_number])
            any_port_was_selected = True

        if args.udp:
            # There is no per-service UDP switch, so --udp requests every
            # UDP port the registry knows about.
            self.requested_udp_ports.update(registry.get_all_ports('udp'))

        if not any_port_was_selected:
            self._select_tcp_ports(registry.get_all_ports('tcp'))
        self.should_show_all_ports = not any_port_was_selected

    def _run_ss(self, flags):
        """Run ``/usr/sbin/ss <flags>``; return stdout lines, None on failure.

        A failure message is written to stderr when the command exits
        non-zero or cannot be started.
        """
        try:
            completed = subprocess.run(
                ["/usr/sbin/ss", flags],
                stdout=subprocess.PIPE,
                check=True,
                text=True,
            )
        except (subprocess.CalledProcessError, OSError):
            print(f"Error running: ss {flags}", file=sys.stderr)
            return None
        return completed.stdout.splitlines()

    def collect_tcp_connections(self):
        """Parse ``ss -tan`` into the TCP counters; exit 1 if ``ss`` fails."""
        lines = self._run_ss("-tan")
        if lines is None:
            sys.exit(1)
        for line in lines:
            self._parse_ss_output_line(
                line, 'tcp', self.tcp_connections_by_port
            )

    def collect_udp_connections(self):
        """Parse ``ss -uan`` into the UDP counters; a failure is non-fatal."""
        lines = self._run_ss("-uan")
        if lines is None:
            return
        for line in lines:
            self._parse_ss_output_line(
                line, 'udp', self.udp_connections_by_port
            )

    @staticmethod
    def _strip_ipv4_mapped_prefix(endpoint):
        """Turn an IPv4-mapped ``addr:port`` endpoint into plain IPv4 form.

        Handles both ``::ffff:1.2.3.4:80`` and ``[::ffff:1.2.3.4]:80``; any
        other endpoint is returned unchanged.
        """
        bracketed_prefix = '[::ffff:'
        if endpoint.startswith(bracketed_prefix):
            host, _, port_part = endpoint[len(bracketed_prefix):].partition(
                ']'
            )
            return host + port_part  # port_part still carries its ':'
        return re.sub(r'^::ffff:', '', endpoint)

    def _is_port_wanted(self, port_number, protocol):
        """Decide whether a socket on this local port should be counted."""
        # selected_ports only ever holds TCP selections, so UDP is always
        # matched against the registry's UDP ports.
        if protocol == 'udp' or self.should_show_all_ports:
            return self.service_registry.is_tracked_port(
                port_number, protocol
            )
        return port_number in self.selected_ports

    def _parse_ss_output_line(self, line, protocol, connection_storage_dict):
        """Account for one line of ``ss`` output.

        The 4th and 5th whitespace-separated fields are read as the local and
        peer ``address:port``. Lines that do not parse, wildcard/loopback
        local addresses and sockets without a peer port are ignored. In
        ``--subnet`` mode only the peer address is recorded.
        """
        fields = line.split()
        if len(fields) < 5:
            return
        local_endpoint = self._strip_ipv4_mapped_prefix(fields[3])
        peer_endpoint = self._strip_ipv4_mapped_prefix(fields[4])
        try:
            server_ip, server_port_string = local_endpoint.rsplit(':', 1)
            client_ip, client_port_string = peer_endpoint.rsplit(':', 1)
        except ValueError:
            # No ':' present, e.g. the column header line.
            return
        if (
            server_ip in {'*', '0.0.0.0', '127.0.0.1', '[::]', '::'}
            or server_port_string == '*'
        ):
            return
        if client_port_string == '*':
            # Listening / unconnected socket bound to a specific address:
            # there is no remote client to count.
            return
        try:
            port_number = int(server_port_string)
        except ValueError:
            return
        if self.command_line_args.subnet:
            self.collected_client_ips_for_subnetting.append(client_ip)
            return
        if self._is_port_wanted(port_number, protocol):
            connection_storage_dict[port_number][client_ip] += 1

    def _store_summary(self, protocol, connections_by_port):
        """Copy per-port counters into ``processed_connection_data``."""
        for port_number, ip_to_count_map in connections_by_port.items():
            self.processed_connection_data[f"{protocol}:{port_number}"] = {
                'total': sum(ip_to_count_map.values()),
                'ips': ip_to_count_map,
            }

    def summarize_collected_connections(self):
        """Summarize TCP unless ``--udp``; UDP when ``--udp`` or ``--top``."""
        args = self.command_line_args
        if not args.udp:
            self._store_summary('tcp', self.tcp_connections_by_port)
        if args.udp or args.top:
            self._store_summary('udp', self.udp_connections_by_port)

    def ensure_all_requested_services_in_summary(self):
        """Add an empty summary entry for every requested port without data."""
        if self.command_line_args.udp:
            protocol, requested_ports = 'udp', self.requested_udp_ports
        else:
            protocol, requested_ports = 'tcp', self.requested_tcp_ports
        for port_number in requested_ports:
            self.processed_connection_data.setdefault(
                f"{protocol}:{port_number}", {'total': 0, 'ips': {}}
            )

    def print_service_tables_side_by_side(
        self, service_title, tables_with_row_data, total_connections=None
    ):
        """Print the tables of one service next to each other.

        Args:
            service_title: Heading text; printed upper-cased.
            tables_with_row_data: Iterable of ``(table, rows)`` where ``rows``
                are the ``(ip, count)`` pairs added to ``table``.
            total_connections: Total shown in the heading. When omitted, the
                sum of the displayed rows is used.
        """
        if not tables_with_row_data:
            return

        rendered_tables = []  # (lines, high-traffic IPs) per table
        displayed_total = 0
        for table, rows in tables_with_row_data:
            displayed_total += sum(count for _, count in rows)
            lines = table.get_string().splitlines()
            if not lines:
                continue
            hot_ips = {ip for ip, count in rows if count > self.HOT_IP_COUNT}
            rendered_tables.append((lines, hot_ips))
        if not rendered_tables:
            return
        if total_connections is None:
            total_connections = displayed_total

        # Pad shorter tables with blank lines so rows can be zipped together.
        max_height = max(len(lines) for lines, _ in rendered_tables)
        for lines, _ in rendered_tables:
            lines.extend([' ' * len(lines[0])] * (max_height - len(lines)))

        title_string = f"{service_title.upper()} ({total_connections})"
        if total_connections >= self.HOT_SERVICE_TOTAL:
            print(color.red(title_string.center(100)))
        else:
            print(color.magenta(title_string.center(100)))

        for row_index in range(max_height):
            line_parts = []
            for lines, hot_ips in rendered_tables:
                line = lines[row_index]
                stripped = line.strip()
                if not stripped or stripped.startswith('+'):
                    # Border or padding line.
                    line_parts.append(color.cyan(line))
                elif row_index == 1:
                    # Column header line.
                    line_parts.append(color.magenta(line))
                else:
                    # Compare the whole first cell so that "1.2.3.4" does
                    # not also highlight "11.2.3.45".
                    first_cell = stripped.strip('|').split('|', 1)[0].strip()
                    if first_cell in hot_ips:
                        line_parts.append(color.red(line))
                    else:
                        line_parts.append(line)
            print(" ".join(line_parts))
        print()

    def _build_port_tables(self, service_name, ports_data):
        """Return ``(table, rows)`` with the busiest client IPs per port."""
        tables = []
        for port_number in sorted(ports_data):
            info = ports_data[port_number]
            rows = sorted(info['ips'].items(), key=lambda item: -item[1])[
                : self.ROWS_PER_PORT_TABLE
            ]
            table = PrettyTable(
                [f"{service_name.upper()}:{port_number}", "Count"]
            )
            for row in rows:
                table.add_row(row)
            table.align = "l"
            table.align["Count"] = "r"
            tables.append((table, rows))
        return tables

    def _print_service_group(
        self, protocol, title, service_names, service_map
    ):
        """Print one heading covering *service_names*, honouring ``--top``."""
        tables_to_print = []
        ports_data = {}
        for service_name in service_names:
            service_ports = service_map.get((protocol, service_name), {})
            ports_data.update(service_ports)
            tables_to_print.extend(
                self._build_port_tables(service_name, service_ports)
            )
        if not tables_to_print:
            return
        total_connections = sum(d['total'] for d in ports_data.values())
        if self.command_line_args.top:
            any_hot_ip = any(
                count > self.HOT_IP_COUNT
                for d in ports_data.values()
                for count in d['ips'].values()
            )
            if total_connections < self.HOT_SERVICE_TOTAL and not any_hot_ip:
                return
        self.print_service_tables_side_by_side(
            title, tables_to_print, total_connections
        )

    def generate_connection_report_tables(self):
        """Group the summary by service and print it in the fixed order.

        UDP services are printed when ``--udp`` or ``--top`` is given, which
        matches when UDP data is collected and summarized.
        """
        service_map = defaultdict(dict)  # (protocol, service) -> port -> data
        for data_key, connection_data in self.processed_connection_data.items():
            protocol, port_string = data_key.split(':')
            port_number = int(port_string)
            service_name = self.service_registry.get_service_name(
                port_number, protocol
            )
            service_map[(protocol, service_name)][port_number] = {
                'total': connection_data['total'],
                'ips': connection_data['ips'],
            }

        args = self.command_line_args
        show_udp = args.udp or args.top
        for protocol in ('tcp', 'udp'):
            if protocol == 'udp' and not show_udp:
                continue
            mail_group_printed = False
            for service_name in self.SERVICE_PRINT_ORDER:
                if service_name in self.MAIL_SERVICE_GROUP:
                    # The whole group is printed once, at its first member.
                    if not mail_group_printed:
                        self._print_service_group(
                            protocol,
                            'MAIL SERVICES',
                            self.MAIL_SERVICE_GROUP,
                            service_map,
                        )
                        mail_group_printed = True
                    continue
                self._print_service_group(
                    protocol, service_name, (service_name,), service_map
                )

    def generate_subnet_summary_tables(self):
        """Print the busiest /24 and /16 client networks.

        Loopback, private, unspecified and unparsable addresses are skipped.
        With ``--top`` only networks above ``HOT_IP_COUNT`` are listed.
        """
        slash_24_subnet_counts = defaultdict(int)
        slash_16_subnet_counts = defaultdict(int)
        for ip_string in self.collected_client_ips_for_subnetting:
            try:
                address = ip_address(ip_string)
            except ValueError:
                continue
            if (
                address.is_loopback
                or address.is_private
                or address.is_unspecified
            ):
                continue
            # NOTE(review): an unbracketed IPv6 address would be grouped by
            # /24 and /16 as well - confirm whether that is wanted.
            slash_24_subnet_counts[
                str(ip_network(f"{address}/24", strict=False))
            ] += 1
            slash_16_subnet_counts[
                str(ip_network(f"{address}/16", strict=False))
            ] += 1

        for table_title_label, subnet_counts_data in (
            ("Top /24 Subnets", slash_24_subnet_counts),
            ("Top /16 Subnets", slash_16_subnet_counts),
        ):
            if not subnet_counts_data:
                continue
            all_rows_sorted = sorted(
                subnet_counts_data.items(), key=lambda kv: -kv[1]
            )
            if self.command_line_args.top:
                rows_to_display = [
                    row
                    for row in all_rows_sorted
                    if row[1] > self.HOT_IP_COUNT
                ]
            else:
                rows_to_display = all_rows_sorted[: self.ROWS_PER_SUBNET_TABLE]
            if not rows_to_display:
                continue

            table = PrettyTable(["Subnet", "Count"])
            table.title = table_title_label
            for row in rows_to_display:
                table.add_row(row)
            table.align["Subnet"] = "l"
            table.align["Count"] = "r"
            high_traffic_row_indices = {
                index
                for index, row in enumerate(rows_to_display)
                if row[1] > self.HOT_IP_COUNT
            }

            first_data_line = 5  # after title, header and their borders
            for line_index, line in enumerate(
                table.get_string().splitlines()
            ):
                if line_index in {1, 3}:
                    # Line 1 is the title, line 3 the column header.
                    print(color.magenta(line))
                elif (
                    line_index >= first_data_line
                    and (line_index - first_data_line)
                    in high_traffic_row_indices
                ):
                    print(color.red(line))
                else:
                    print(color.cyan(line))
            print()

    def run(self):
        """Collect the data the chosen mode needs and print the report."""
        args = self.command_line_args
        if args.subnet:
            self.collect_tcp_connections()
            self.collect_udp_connections()
            self.generate_subnet_summary_tables()
            return
        if args.udp or args.top:
            self.collect_udp_connections()
        if not args.udp:
            self.collect_tcp_connections()
        self.summarize_collected_connections()
        self.ensure_all_requested_services_in_summary()
        self.generate_connection_report_tables()
# Script entry point: build the reporter (which parses the command line) and
# print the report.
if __name__ == "__main__":
    reporter = ConnectionReporter()
    reporter.run()