Prv8 Shell
Server : Apache
System : Linux ecngx264.inmotionhosting.com 4.18.0-553.77.1.lve.el8.x86_64 #1 SMP Wed Oct 8 14:21:00 UTC 2025 x86_64
User : lonias5 ( 3576)
PHP Version : 7.3.33
Disable Function : NONE
Directory :  /proc/self/root/proc/thread-self/root/opt/sharedrads/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //proc/self/root/proc/thread-self/root/opt/sharedrads/blockip
#!/usr/lib/rads/venv/bin/python3
"""wrapper script for blocking/unblocking IPs
on shared via ipset

Also ensures that the IPs are safe to block by
importing the imh-whcheck module from imh-fail2ban"""

import sys
import subprocess
import syslog
import os
import pwd
import sqlite3
import logging
from socket import AddressFamily  # pylint:disable=no-name-in-module
from argparse import ArgumentParser
from datetime import datetime, timedelta, timezone
import psutil
from netaddr import IPNetwork

# SQLite database recording each blocked IP together with its expiry time;
# opened by the *_db / *_expired_ips helpers below.
DB_PATH = "/etc/rads/blocked_ips.db"
# Plain-text audit log written through the 'blockip' logger.
LOG_PATH = "/var/log/blockip.log"

# Set up logging
# NOTE: this runs at import time. logging.FileHandler is constructed here
# with LOG_PATH, so the log file's directory must already exist and be
# writable before any function in this module is called.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    handlers=[
        logging.FileHandler(LOG_PATH),
    ],
)
# Named logger used by log_action() for the file log.
logger = logging.getLogger('blockip')


def log_action(msg, level='info', console_msg=None):
    """
    Record @msg in syslog and in the log file, prefixed with the acting user

    @level names the logger method used for the file log ('info',
    'error', 'critical', ...); it is lower-cased before lookup.
    @console_msg, when truthy, is additionally printed to stdout.
    """
    # Resolve the invoking user: login name first, then the passwd entry
    # for the current uid, and finally a fixed placeholder.
    user = 'unknown'
    lookups = (os.getlogin, lambda: pwd.getpwuid(os.getuid()).pw_name)
    for lookup in lookups:
        try:
            user = lookup()
        except Exception:
            continue
        break

    full_msg = f"[{user}] {msg}"

    # Syslog copy
    syslog.openlog('blockip')
    syslog.syslog(full_msg)
    syslog.closelog()

    # File copy, at the requested severity
    getattr(logger, level.lower())(full_msg)

    # Console output is opt-in
    if console_msg:
        print(console_msg)


def init_db():
    """Initialize SQLite database and create table if it doesn't exist

    Creates the ``blocked_ips`` table (``ip`` primary key, ``reason``,
    ``blocked_at``, ``expires_at``, all TEXT) in the database at DB_PATH.
    Calling it again on an existing database is a no-op.

    Raises:
        sqlite3.Error: if the database cannot be opened or written.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(
            '''CREATE TABLE IF NOT EXISTS blocked_ips
                     (ip TEXT PRIMARY KEY,
                      reason TEXT,
                      blocked_at TEXT,
                      expires_at TEXT)'''
        )
        conn.commit()
    finally:
        # Always release the handle, even when the statement fails.
        conn.close()


def add_ip_to_db(ip, reason, blocked_at, expires_at):
    """Add IP entry to database

    Inserts a row into ``blocked_ips``; an existing row with the same
    @ip is replaced, so re-blocking an address updates its reason and
    timestamps.

    Raises:
        sqlite3.Error: if the database cannot be opened or written.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute(
            '''INSERT OR REPLACE INTO blocked_ips
                     (ip, reason, blocked_at, expires_at)
                     VALUES (?, ?, ?, ?)''',
            (ip, reason, blocked_at, expires_at),
        )
        conn.commit()
    finally:
        # Always release the handle, even when the statement fails.
        conn.close()


def remove_ip_from_db(ip):
    """Remove IP entry from database

    Deletes the ``blocked_ips`` row whose key equals @ip; does nothing
    when no such row exists.

    Raises:
        sqlite3.Error: if the database cannot be opened or written.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute('DELETE FROM blocked_ips WHERE ip = ?', (ip,))
        conn.commit()
    finally:
        # Always release the handle, even when the statement fails.
        conn.close()


def get_expired_ips():
    """Get list of expired IPs

    Returns the ``ip`` value of every row whose ``expires_at`` is at or
    before the current UTC time. Timestamps are compared as strings,
    which is chronological because both sides use the fixed-width
    ``YYYY-MM-DDTHH:MM:SSZ`` layout.

    Raises:
        sqlite3.Error: if the database cannot be opened or read.
    """
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    conn = sqlite3.connect(DB_PATH)
    try:
        rows = conn.execute(
            'SELECT ip FROM blocked_ips WHERE expires_at <= ?', (now,)
        ).fetchall()
    finally:
        # Always release the handle, even when the query fails.
        conn.close()
    return [row[0] for row in rows]


def remove_expired_ips(expired_ips):
    """Remove expired IPs from database

    Deletes the ``blocked_ips`` row for every address in @expired_ips
    (an iterable of ip keys) in a single transaction. An empty iterable
    leaves the table unchanged.

    Raises:
        sqlite3.Error: if the database cannot be opened or written.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.executemany(
            'DELETE FROM blocked_ips WHERE ip = ?',
            [(ip,) for ip in expired_ips],
        )
        conn.commit()
    finally:
        # Always release the handle, even when a statement fails.
        conn.close()


def parse_cli():
    """
    Build the command-line parser and parse sys.argv

    Exactly one of --block, --unblock or --clean must be supplied;
    --time is an independent optional string. Returns the resulting
    argparse namespace (attributes: block, unblock, time, clean).
    """
    cli = ArgumentParser(description="ipset wrapper for IMH shared firewall")
    actions = cli.add_mutually_exclusive_group(required=True)

    # --block takes two values: the target and a free-form reason.
    actions.add_argument(
        '--block',
        '-d',
        nargs=2,
        metavar=("IP/CIDR", "REASON"),
        help="IP Address or network to block + reason/comment",
    )
    actions.add_argument(
        '--unblock',
        '-u',
        metavar="IP/CIDR",
        help="IP Address or network to unblock",
    )
    # Registered between --unblock and --clean so the option order in the
    # generated help text stays the same.
    cli.add_argument(
        '--time',
        '-t',
        help="Required block duration (e.g., 7d, 4h, 30m)",
    )
    actions.add_argument(
        '--clean',
        action='store_true',
        help="Unblock IPs whose expiration time has passed (used by cron)",
    )

    return cli.parse_args()


def parse_time_duration(duration):
    """Parse time duration string into timedelta

    @duration is a whole number followed by a unit suffix: ``m``
    (minutes), ``h`` (hours) or ``d`` (days), e.g. ``30m``, ``4h``, ``7d``.

    Returns:
        timedelta: the parsed, strictly positive duration.

    Raises:
        ValueError: if the string is empty, lacks a number, uses an
            unknown unit, is not greater than zero, or is too large to
            represent.
    """
    units = {'m': 'minutes', 'h': 'hours', 'd': 'days'}
    text = str(duration).strip()

    # Need at least one digit plus the unit suffix.
    if len(text) < 2:
        raise ValueError(
            "Invalid duration. Use <number><m|h|d>, e.g. 7d, 4h, 30m."
        )

    amount, unit = text[:-1], text[-1]
    if unit not in units:
        raise ValueError("Invalid time unit. Use m/h/d.")

    try:
        value = int(amount)
    except ValueError:
        raise ValueError(
            f"Invalid duration value {amount!r}. Use a whole number."
        ) from None

    # A zero or negative duration would produce a block that is already
    # expired when it is created.
    if value <= 0:
        raise ValueError("Duration must be greater than zero.")

    try:
        return timedelta(**{units[unit]: value})
    except OverflowError as exc:
        raise ValueError("Duration is too large.") from exc


def check_ignore(ip):
    """
    Ask the imh-whcheck script from imh-fail2ban whether @ip must be
    left alone

    Returns True when the script exits with status 0 (ignore the IP),
    otherwise False. If the script cannot be executed the error is
    printed to stderr and False is returned.
    """
    whcheck = '/etc/fail2ban/filter.d/ignorecommands/imh-whcheck'
    try:
        status = subprocess.call([whcheck, '-m', ip])
    except OSError as exc:
        print(exc, file=sys.stderr)
        return False
    return status == 0


def check_local(ip):
    """
    Check if @ip is local

    @ip may be a single address or a CIDR network (string or netaddr
    object). Returns True when any IPv4 address configured on any
    network interface of this host equals the address or falls inside
    the network, otherwise False.

    Every address of every interface is inspected, not only the first
    one, and the comparison is done on parsed addresses rather than by
    testing a string against IPNetwork objects.

    Raises whatever IPNetwork raises when @ip cannot be parsed.
    """
    target = IPNetwork(str(ip))
    for addresses in psutil.net_if_addrs().values():
        for addr in addresses:
            if addr.family != AddressFamily.AF_INET:
                continue
            if IPNetwork(addr.address).ip in target:
                return True
    return False


def write_syslog(msg):
    """
    Send @msg to syslog under the 'blockip' ident

    The syslog connection is opened for this one message and closed
    again afterwards.
    """
    ident = 'blockip'
    syslog.openlog(ident)
    syslog.syslog(msg)
    syslog.closelog()


def ipset(cmd: str, ip, setname='blacklist', comment=''):
    """
    Run ipset with specified @cmd for @ip on @setname

    Executes ``ipset <cmd> <setname> <ip>`` and logs the outcome through
    log_action(). @comment is appended to the success log line only; it
    is not passed to ipset.

    Returns:
        bool: True when ipset exited with status 0; False when it could
        not be started or exited non-zero (the failure is logged).
    """
    try:
        ret = subprocess.run(
            ['ipset', cmd, setname, str(ip)],
            encoding='utf-8',
            stderr=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            check=False,
        )
    except (OSError, ValueError, subprocess.SubprocessError) as e:
        # Binary missing / not executable, bad argument, or undecodable
        # output: report it instead of aborting the caller.
        error_msg = f"ERROR: Failed to execute ipset: {str(e)}"
        log_action(error_msg, 'error')
        return False

    if ret.returncode != 0:
        # Strip the trailing newline so the log entry stays on one line.
        error_msg = f"FAILED: {setname} {cmd} {ip}: {ret.stderr.strip()}"
        log_action(error_msg, 'error')
        return False

    log_msg = f"{setname} {cmd} {ip}"
    if comment:
        log_msg += f" - {comment}"
    log_action(log_msg)
    return True


def unblock_expired_ips():
    """Remove every expired block from ipset and from the database

    For each expired address: delete it from the ipset, log the cleanup
    and print a confirmation line. Afterwards all of them are dropped
    from the database in one call.
    """
    expired = get_expired_ips()

    for addr in expired:
        ipset('del', addr, comment='Cleaned Expired IP')
        log_action(f"Cleaned Expired IP {addr}")
        print(f"Unblocked IP {addr} (expired)")

    # NOTE(review): rows are removed whether or not the ipset call above
    # succeeded -- confirm this is intended.
    remove_expired_ips(expired)


def _main():
    """Entry point: parse the command line, then block, unblock or clean

    Exit codes:
        1 -- unparsable IP/CIDR, ipset reported a failure, or an
             unexpected error occurred
        2 -- target refused: network too large, local IP or whitelisted
        3 -- --time missing or invalid for --block
    """
    ts_format = "%Y-%m-%dT%H:%M:%SZ"

    try:
        # Ensure log directory exists
        os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)

        # Initialize database
        init_db()

        args = parse_cli()

        if args.clean:
            log_action("Starting cleanup of expired IPs")
            unblock_expired_ips()
            log_action("Finished cleanup of expired IPs")
            return  # Exit after cron task finishes

        blocked_at = expiry_iso = None
        if args.block:
            if not args.time:
                error_msg = "ERROR: --time is required when using --block"
                log_action(error_msg, 'error', console_msg=error_msg)
                sys.exit(3)

            # Validate the duration before touching the firewall or the
            # database so a typo yields a clear usage error. A single
            # clock reading feeds both timestamps.
            try:
                duration = parse_time_duration(args.time)
                now = datetime.now(timezone.utc)
                expires_at = now + duration
            except (ValueError, IndexError, OverflowError) as exc:
                error_msg = f"ERROR: Invalid --time value {args.time!r}: {exc}"
                log_action(error_msg, 'error', console_msg=error_msg)
                sys.exit(3)

            if duration <= timedelta(0):
                error_msg = "ERROR: --time must be a positive duration"
                log_action(error_msg, 'error', console_msg=error_msg)
                sys.exit(3)

            blocked_at = now.strftime(ts_format)
            expiry_iso = expires_at.strftime(ts_format)

        target = args.block[0] if args.block else args.unblock

        try:
            tip = IPNetwork(target)
        except Exception as e:
            # IPNetwork's own error type is not imported in this file, so
            # the catch stays broad; tell the user instead of exiting
            # silently.
            error_msg = f"ERROR: {str(e)}"
            log_action(error_msg, 'error', console_msg=error_msg)
            sys.exit(1)

        if tip.size > 1024:
            error_msg = "ERROR: Using networks larger than /22 is not allowed!"
            log_action(error_msg, 'error', console_msg=error_msg)
            sys.exit(2)

        if args.block:
            # The safety checks guard against blocking something that
            # must stay reachable; they must not prevent an unblock.
            if check_local(str(tip.ip)):
                error_msg = f"ERROR: Unable to block local IP {tip}"
                log_action(error_msg, 'error', console_msg=error_msg)
                sys.exit(2)
            if check_ignore(str(tip.ip)):
                error_msg = f"ERROR: Unable to block IP {tip} in whitelist"
                log_action(error_msg, 'error', console_msg=error_msg)
                sys.exit(2)

            comment = args.block[1]
            expiry_msg = f" [Block expires at {expiry_iso}]"

            # Record the expiry first: if the process stops between the
            # two steps, a row without a firewall entry is harmless,
            # whereas a firewall entry without a row would never expire.
            add_ip_to_db(str(tip), comment, blocked_at, expiry_iso)
            log_action(f"Added IP {tip} to database with expiry {expiry_iso}")

            # Only an explicit False counts as failure, so a None result
            # (no status reported) is still treated as success.
            if ipset('add', tip, comment=comment) is False:
                error_msg = (
                    f"ERROR: ipset failed to add {tip}; "
                    f"see {LOG_PATH} for details"
                )
                log_action(error_msg, 'error', console_msg=error_msg)
                sys.exit(1)

            log_action(
                f"Blocked IP {tip} ({comment}){expiry_msg}",
                console_msg=f"SUCCESS: Blocked IP {tip} "
                f"({comment}){expiry_msg}",
            )
        else:
            # Keep the database row when the firewall entry could not be
            # removed, so the scheduled cleanup retries at expiry.
            if ipset('del', tip) is False:
                error_msg = (
                    f"ERROR: ipset failed to remove {tip}; "
                    f"see {LOG_PATH} for details"
                )
                log_action(error_msg, 'error', console_msg=error_msg)
                sys.exit(1)

            remove_ip_from_db(str(tip))
            log_action(
                f"Unblocked IP {tip}",
                console_msg=f"SUCCESS: Unblocked IP {tip}",
            )

    except Exception as e:
        # Last-resort boundary: log and show the error rather than
        # exiting with no output. sys.exit() above raises SystemExit,
        # which is not caught here.
        error_msg = f"CRITICAL ERROR: {str(e)}"
        log_action(error_msg, 'critical', console_msg=error_msg)
        sys.exit(1)


if __name__ == '__main__':
    _main()

@StableExploit - 2025