Prv8 Shell
Server : Apache
System : Linux ecngx264.inmotionhosting.com 4.18.0-553.77.1.lve.el8.x86_64 #1 SMP Wed Oct 8 14:21:00 UTC 2025 x86_64
User : lonias5 ( 3576)
PHP Version : 7.3.33
Disable Function : NONE
Directory :  /proc/self/root/proc/thread-self/root/opt/sharedrads/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //proc/self/root/proc/thread-self/root/opt/sharedrads/show-conns
#!/usr/bin/imh-python3

import argparse
import re
import subprocess
import sys
from collections import defaultdict
from ipaddress import ip_address, ip_network
from prettytable import PrettyTable
from rads import color


class ServiceRegistry:
    """Two-way lookup between well-known service names and port numbers."""

    def __init__(self):
        """Declare the known services and derive the port -> name index."""
        tcp_services = {
            'http': [80, 443],
            'imap': [143, 993],
            'pop3': [110, 995],
            'smtp': [25, 465, 587],
            'ftp': [20, 21],
            'ssh': [22, 2222],
            'cpanel': [2082, 2083],
            'whm': [2086, 2087],
            'webmail': [2095, 2096],
        }
        udp_services = {'dns': [53], 'ntp': [123]}
        self.services = {'tcp': tcp_services, 'udp': udp_services}

        # Invert each protocol's name -> ports table into port -> name.
        self.port_to_service_name_map = {
            protocol: {
                port_number: service_name
                for service_name, port_list in ports_by_service.items()
                for port_number in port_list
            }
            for protocol, ports_by_service in self.services.items()
        }

    def get_service_name(self, port_number: int, protocol: str) -> str:
        """Return the service registered for the port, or 'other'."""
        names_by_port = self.port_to_service_name_map.get(protocol, {})
        return names_by_port.get(port_number, 'other')

    def is_tracked_port(self, port_number: int, protocol: str) -> bool:
        """Tell whether the port belongs to a registered service."""
        names_by_port = self.port_to_service_name_map.get(protocol, {})
        return port_number in names_by_port

    def get_all_ports(self, protocol: str) -> set:
        """Return every registered port of the protocol as a new set."""
        names_by_port = self.port_to_service_name_map.get(protocol, {})
        return set(names_by_port)


class ConnectionReporter:
    """Manages connection reporting."""

    def __init__(self):
        """Create empty counters, parse the CLI and resolve the port filter."""
        # Port filter state; filled in by process_port_selection_from_args().
        self.should_show_all_ports = True
        self.selected_ports = set()
        self.requested_tcp_ports, self.requested_udp_ports = set(), set()

        # port -> client IP -> number of sockets, one store per protocol.
        self.tcp_connections_by_port = defaultdict(lambda: defaultdict(int))
        self.udp_connections_by_port = defaultdict(lambda: defaultdict(int))

        # Report inputs derived from the raw counters above.
        self.processed_connection_data = {}
        self.collected_client_ips_for_subnetting = []

        # The registry must exist before parsing: the parser reads it to
        # create one flag per TCP service.
        self.service_registry = ServiceRegistry()
        self.command_line_args = self.parse_command_line_arguments()
        self.process_port_selection_from_args()

    # Parses command-line arguments.
    def parse_command_line_arguments(self):
        parser = argparse.ArgumentParser(
            description='Summarize TCP/UDP connection info.'
        )
        for service_name in self.service_registry.services['tcp']:
            parser.add_argument(
                f'--{service_name}',
                action='store_true',
                help=f'Show only {service_name.upper()} connections',
            )
        parser.add_argument(
            '--tcp', action='store_true', help='Show all TCP connections'
        )
        parser.add_argument(
            '--udp', action='store_true', help='Show all UDP connections'
        )
        parser.add_argument(
            '--top',
            action='store_true',
            help='Only show services ≥2000 connections or IPs >200',
        )
        parser.add_argument(
            '-s',
            '--subnet',
            action='store_true',
            help='Group connections by /24 and /16 subnets',
        )
        parser.add_argument(
            '-p',
            '--port',
            nargs='+',
            help='Specify up to 10 ports (space-separated)',
        )
        parser.add_argument(
            'ports', nargs='*', help='Deprecated: use -p or --port instead'
        )
        return parser.parse_args()

    # Processes selected port arguments.
    def process_port_selection_from_args(self):
        any_port_was_selected = False
        if self.command_line_args.tcp:
            all_tcp_ports = self.service_registry.get_all_ports('tcp')
            self.selected_ports.update(all_tcp_ports)
            self.requested_tcp_ports.update(all_tcp_ports)
            any_port_was_selected = True
        else:
            for service_name in self.service_registry.services['tcp']:
                if getattr(self.command_line_args, service_name, False):
                    ports = self.service_registry.services['tcp'][service_name]
                    self.selected_ports.update(ports)
                    self.requested_tcp_ports.update(ports)
                    any_port_was_selected = True

        if self.command_line_args.port:
            if len(self.command_line_args.port) > 10:
                print("Error: max 10 ports")
                sys.exit(1)
            for port_string in self.command_line_args.port:
                if port_string.isdigit():
                    port_number = int(port_string)
                    self.selected_ports.add(port_number)
                    self.requested_tcp_ports.add(port_number)
                    any_port_was_selected = True

        for port_string in self.command_line_args.ports:
            if port_string.isdigit():
                port_number = int(port_string)
                self.selected_ports.add(port_number)
                self.requested_tcp_ports.add(port_number)
                any_port_was_selected = True

        if not any_port_was_selected:
            all_tcp_ports = self.service_registry.get_all_ports('tcp')
            self.selected_ports.update(all_tcp_ports)
            self.requested_tcp_ports.update(all_tcp_ports)

        self.should_show_all_ports = False

    # Collects TCP connection data.
    def collect_tcp_connections(self):
        try:
            subprocess_output = subprocess.run(
                ["/usr/sbin/ss", "-tan"],
                stdout=subprocess.PIPE,
                check=True,
                text=True,
            )
        except subprocess.CalledProcessError:
            print("Error running: ss -tan", file=sys.stderr)
            sys.exit(1)
        for line in subprocess_output.stdout.splitlines():
            self._parse_ss_output_line(
                line, 'tcp', self.tcp_connections_by_port
            )

    # Collects UDP connection data.
    def collect_udp_connections(self):
        try:
            subprocess_output = subprocess.run(
                ["/usr/sbin/ss", "-uan"],
                stdout=subprocess.PIPE,
                check=True,
                text=True,
            )
        except subprocess.CalledProcessError:
            print("Error running: ss -uan", file=sys.stderr)
            return
        for line in subprocess_output.stdout.splitlines():
            self._parse_ss_output_line(
                line, 'udp', self.udp_connections_by_port
            )

    # Parses a single connection line.
    def _parse_ss_output_line(self, line, protocol, connection_storage_dict):
        line_parts = re.split(r'\s+', line.strip())
        if len(line_parts) < 5:
            return
        try:
            server_address_string = re.sub(r'^::ffff:', '', line_parts[3])
            client_address_string = re.sub(r'^::ffff:', '', line_parts[4])
            server_ip, server_port_string = server_address_string.rsplit(':', 1)
            client_ip, _ = client_address_string.rsplit(':', 1)
        except ValueError:
            return

        if (
            server_ip in {'*', '0.0.0.0', '127.0.0.1', '[::]', '::'}
            or server_port_string == '*'
        ):
            return
        try:
            port_number = int(server_port_string)
        except ValueError:
            return

        if self.command_line_args.subnet:
            self.collected_client_ips_for_subnetting.append(client_ip)
            return

        is_connection_allowed_by_filters = (
            self.service_registry.is_tracked_port(port_number, protocol)
            if self.should_show_all_ports
            else port_number in self.selected_ports
        )
        if is_connection_allowed_by_filters:
            connection_storage_dict[port_number][client_ip] += 1

    # Summarizes all connection data.
    def summarize_collected_connections(self):
        if not self.command_line_args.udp:
            for (
                port_number,
                ip_to_count_map,
            ) in self.tcp_connections_by_port.items():
                self.processed_connection_data[f"tcp:{port_number}"] = {
                    'total': sum(ip_to_count_map.values()),
                    'ips': ip_to_count_map,
                }
        if self.command_line_args.udp or self.command_line_args.top:
            for (
                port_number,
                ip_to_count_map,
            ) in self.udp_connections_by_port.items():
                self.processed_connection_data[f"udp:{port_number}"] = {
                    'total': sum(ip_to_count_map.values()),
                    'ips': ip_to_count_map,
                }

    # Ensures requested services appear.
    def ensure_all_requested_services_in_summary(self):
        protocol = 'udp' if self.command_line_args.udp else 'tcp'
        requested_ports = (
            self.requested_udp_ports
            if self.command_line_args.udp
            else self.requested_tcp_ports
        )
        for port_number in requested_ports:
            data_key = f"{protocol}:{port_number}"
            if data_key not in self.processed_connection_data:
                self.processed_connection_data[data_key] = {
                    'total': 0,
                    'ips': {},
                }

    # Prints tables side-by-side.
    def print_service_tables_side_by_side(
        self, service_title, tables_with_row_data
    ):
        if not tables_with_row_data:
            return

        # Generate table strings and find max height
        list_of_table_line_lists = []
        list_of_high_traffic_ip_sets = []
        total_connections = 0
        for table, rows in tables_with_row_data:
            list_of_table_line_lists.append(table.get_string().splitlines())
            list_of_high_traffic_ip_sets.append(
                {ip for ip, count in rows if count > 200}
            )
            total_connections += sum(count for _, count in rows)

        max_height = max(
            len(lines) for lines in list_of_table_line_lists if lines
        )

        # Pad shorter tables
        for i, lines in enumerate(list_of_table_line_lists):
            if not lines:
                continue
            table_width = len(lines[0])
            while len(lines) < max_height:
                lines.append(' ' * table_width)

        # Determine if service total is high
        service_is_hot = total_connections > 2000

        # Colorize and print header
        title_string = f"{service_title.upper()} ({total_connections})"
        if service_is_hot:
            print(color.red(title_string.center(100)))
        else:
            print(color.magenta(title_string.center(100)))

        # Print table content line by line
        for i in range(max_height):
            line_parts = []
            for current_table_index, lines in enumerate(
                list_of_table_line_lists
            ):
                line = lines[i]
                is_header = i == 1
                is_border = line.strip().startswith('+') or not line.strip()

                # Color logic
                if is_border:
                    line_parts.append(color.cyan(line))
                elif is_header:
                    line_parts.append(color.magenta(line))
                else:
                    is_hot_line = any(
                        hot_ip in line
                        for hot_ip in list_of_high_traffic_ip_sets[
                            current_table_index
                        ]
                    )
                    if is_hot_line:
                        line_parts.append(color.red(line))
                    else:
                        line_parts.append(line)
            print("  ".join(line_parts))
        print()

    # Reports on service connections.
    def generate_connection_report_tables(self):
        service_map_by_protocol = defaultdict(
            lambda: defaultdict(lambda: {'total': 0, 'ips': {}})
        )
        for data_key, connection_data in self.processed_connection_data.items():
            protocol, port_string = data_key.split(':')
            port_number = int(port_string)
            service_name = self.service_registry.get_service_name(
                port_number, protocol
            )
            service_map_by_protocol[(protocol, service_name)][port_number] = {
                'total': connection_data['total'],
                'ips': connection_data['ips'],
            }

        service_print_order = [
            'http',
            'cpanel',
            'whm',
            'smtp',
            'imap',
            'pop3',
            'webmail',
            'ssh',
            'ftp',
            'mysql',
        ]

        for protocol in ['tcp', 'udp']:
            if protocol == 'udp' and not self.command_line_args.udp:
                continue

            i = 0
            while i < len(service_print_order):
                service_name = service_print_order[i]

                # Group mail services
                if service_name == 'imap':
                    combined_mail_services = ['imap', 'pop3', 'webmail']
                    tables_to_print = []
                    ports_data = {}
                    for sub_service_name in combined_mail_services:
                        sub_service_ports_data = service_map_by_protocol.get(
                            (protocol, sub_service_name), {}
                        )
                        ports_data.update(sub_service_ports_data)
                        for port_number, info in sub_service_ports_data.items():
                            rows = sorted(
                                info['ips'].items(), key=lambda x: -x[1]
                            )[:10]
                            table = PrettyTable(
                                [
                                    f"{sub_service_name.upper()}:{port_number}",
                                    "Count",
                                ]
                            )
                            for row in rows:
                                table.add_row(row)
                            table.align = "l"
                            table.align["Count"] = "r"
                            tables_to_print.append((table, rows))

                    # --top filtering logic
                    total_connections = sum(
                        d['total'] for d in ports_data.values()
                    )
                    any_ip_over_200 = any(
                        c > 200
                        for d in ports_data.values()
                        for c in d['ips'].values()
                    )
                    should_print_service = not self.command_line_args.top or (
                        total_connections >= 2000 or any_ip_over_200
                    )

                    if tables_to_print and should_print_service:
                        self.print_service_tables_side_by_side(
                            'MAIL SERVICES', tables_to_print
                        )

                    i += len(combined_mail_services)
                    continue

                # Handle single services
                ports_data = service_map_by_protocol.get(
                    (protocol, service_name), {}
                )
                if ports_data:
                    tables_to_print = []
                    for port_number, info in ports_data.items():
                        rows = sorted(info['ips'].items(), key=lambda x: -x[1])[
                            :10
                        ]
                        table = PrettyTable(
                            [f"{service_name.upper()}:{port_number}", "Count"]
                        )
                        for row in rows:
                            table.add_row(row)
                        table.align = "l"
                        table.align["Count"] = "r"
                        tables_to_print.append((table, rows))

                    # --top filtering logic
                    total_connections = sum(
                        d['total'] for d in ports_data.values()
                    )
                    any_ip_over_200 = any(
                        c > 200
                        for d in ports_data.values()
                        for c in d['ips'].values()
                    )
                    should_print_service = not self.command_line_args.top or (
                        total_connections >= 2000 or any_ip_over_200
                    )

                    if tables_to_print and should_print_service:
                        self.print_service_tables_side_by_side(
                            service_name, tables_to_print
                        )
                i += 1

    # Summarizes subnet connections.
    def generate_subnet_summary_tables(self):
        slash_24_subnet_counts = defaultdict(int)
        slash_16_subnet_counts = defaultdict(int)
        for ip_string in self.collected_client_ips_for_subnetting:
            try:
                ip_address_object = ip_address(ip_string)
                if (
                    ip_address_object.is_loopback
                    or ip_address_object.is_private
                    or ip_address_object.is_unspecified
                ):
                    continue
                slash_24_subnet_counts[
                    str(ip_network(f"{ip_address_object}/24", strict=False))
                ] += 1
                slash_16_subnet_counts[
                    str(ip_network(f"{ip_address_object}/16", strict=False))
                ] += 1
            except ValueError:
                continue

        for table_title_label, subnet_counts_data in (
            ("Top /24 Subnets", slash_24_subnet_counts),
            ("Top /16 Subnets", slash_16_subnet_counts),
        ):
            if not subnet_counts_data:
                continue

            # Filter rows based on the --top flag
            all_rows_sorted = sorted(
                subnet_counts_data.items(), key=lambda kv: -kv[1]
            )
            if self.command_line_args.top:
                rows_to_display = [
                    row for row in all_rows_sorted if row[1] > 200
                ]
            else:
                rows_to_display = all_rows_sorted[:20]

            if not rows_to_display:
                continue

            table = PrettyTable(["Subnet", "Count"])
            table.title = table_title_label

            high_traffic_row_indices = {
                i for i, row in enumerate(rows_to_display) if row[1] > 200
            }
            for row in rows_to_display:
                table.add_row(row)

            table.align["Subnet"] = "l"
            table.align["Count"] = "r"

            lines = table.get_string().splitlines()
            for i, line in enumerate(lines):
                # Line 1 is title, Line 3 is header text
                if i in {1, 3}:
                    print(color.magenta(line))
                # Data rows start at index 5
                elif i >= 5 and (i - 5) in high_traffic_row_indices:
                    print(color.red(line))
                else:
                    print(color.cyan(line))
            print()

    # Main execution function.
    def run(self):
        """Collect connection data and print the report selected on the CLI.

        ``--subnet`` gathers both protocols and prints only the subnet
        summary. Otherwise UDP is collected for ``--udp`` or ``--top``, TCP is
        collected unless ``--udp`` was given, and the per-service tables are
        printed.
        """
        args = self.command_line_args

        if args.subnet:
            for collect in (
                self.collect_tcp_connections,
                self.collect_udp_connections,
            ):
                collect()
            self.generate_subnet_summary_tables()
            return

        udp_only = args.udp
        if udp_only or args.top:
            self.collect_udp_connections()
        if not udp_only:
            self.collect_tcp_connections()

        # Summarise, pad with zero-count entries, then render.
        for report_step in (
            self.summarize_collected_connections,
            self.ensure_all_requested_services_in_summary,
            self.generate_connection_report_tables,
        ):
            report_step()


# Build the reporter (which parses the command line) and print the report,
# but only when this file is executed as a script rather than imported.
if __name__ == "__main__":
    ConnectionReporter().run()

@StableExploit - 2025