# NOTE: the lines below are web-page residue captured together with this file;
# they are not part of the Fail2Ban module and are kept only as comments.
# Prv8 Shell
# Server : Apache
# System : Linux ecngx264.inmotionhosting.com 4.18.0-553.77.1.lve.el8.x86_64 #1 SMP Wed Oct 8 14:21:00 UTC 2025 x86_64
# User : lonias5 ( 3576)
# PHP Version : 7.3.33
# Disable Function : NONE
# Directory :  /proc/self/root/lib/imh-fail2ban/lib/python3.13/site-packages/fail2ban/client/
#
# Upload File :
# current_dir [ Writeable ] document_root [ Writeable ]
#
# Current File : //proc/self/root/lib/imh-fail2ban/lib/python3.13/site-packages/fail2ban/client/jailreader.py
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*-
# vi: set ft=python sts=4 ts=4 sw=4 noet :

# This file is part of Fail2Ban.
#
# Fail2Ban is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# Fail2Ban is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Fail2Ban; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

# Author: Cyril Jaquier
# 

# Module metadata: authorship, copyright and license identifiers.
__author__ = "Cyril Jaquier"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"

import glob
import json
import os.path
import re

from .configreader import ConfigReaderUnshared, ConfigReader, NoSectionError
from .filterreader import FilterReader
from .actionreader import ActionReader
from ..version import version
from ..helpers import _merge_dicts, getLogger, extractOptions, splitWithOptions, splitwords

# Module-level logger, obtained through the project's getLogger helper and
# keyed by this module's name.
logSys = getLogger(__name__)


class NoJailError(ValueError):
	"""Raised when the requested jail is not among the configured sections."""

class JailReader(ConfigReader):
	"""Reader for the configuration of a single jail.

	Reads the options of one jail section together with the filter and the
	actions it references (see `getOptions`) and converts the result into a
	list of commands (see `convert`).
	"""

	def __init__(self, name, force_enable=False, **kwargs):
		"""Create a reader for the jail `name`.

		Parameters
		----------
		name : str
		  Name of the jail (the configuration section to read).
		force_enable : bool
		  If true, the jail is treated as enabled regardless of its
		  "enabled" option.
		**kwargs
		  Passed through unchanged to `ConfigReader.__init__`.
		"""
		ConfigReader.__init__(self, **kwargs)
		self.__name = name
		self.__filter = None
		self.__force_enable = force_enable
		self.__actions = []
		self.__opts = None

	@property
	def options(self):
		"""Options read by `getOptions` (None until it has been called)."""
		return self.__opts

	def setName(self, value):
		"""Set the name of the jail."""
		self.__name = value

	def getName(self):
		"""Return the name of the jail."""
		return self.__name

	def read(self):
		"""Read the "jail" configuration and check that this jail exists.

		Returns
		-------
		The result of `ConfigReader.read`.

		Raises
		------
		NoJailError
		  If no section named after this jail is available.
		"""
		out = ConfigReader.read(self, "jail")
		# Before returning -- verify that requested section
		# exists at all
		if self.__name not in self.sections():
			raise NoJailError("Jail %r was not found among available"
							 % self.__name)
		return out

	def isEnabled(self):
		"""Tell whether the jail is enabled.

		Returns
		-------
		bool
		  True if the jail was force-enabled or its "enabled" option is
		  set; False otherwise (also when no options were read yet).
		"""
		# normalize to a real boolean: the bare and/or chain would otherwise
		# leak None or the (empty) options dict to the caller
		return bool(self.__force_enable or (
			self.__opts and self.__opts.get("enabled", False)))

	@staticmethod
	def _glob(path):
		"""Given a path for glob return list of files to be passed to server.

		Dangling symlinks are warned about and not returned
		"""
		pathList = []
		for p in glob.glob(path):
			# glob may match a symlink whose target is gone; exists() follows
			# the link and so filters such entries out
			if os.path.exists(p):
				pathList.append(p)
			else:
				logSys.warning("File %s is a dangling link, thus cannot be monitored" % p)
		return pathList

	# Options read in the first pass of getOptions (before the filter is known):
	_configOpts1st = {
		"enabled": ["bool", False],
		"backend": ["string", "auto"],
		"filter": ["string", ""]
	}
	# All jail options, read in the second pass of getOptions:
	_configOpts = {
		"enabled": ["bool", False],
		"backend": ["string", "auto"],
		"maxretry": ["int", None],
		"maxmatches": ["int", None],
		"findtime": ["string", None],
		"bantime": ["string", None],
		"bantime.increment": ["bool", None],
		"bantime.factor": ["string", None],
		"bantime.formula": ["string", None],
		"bantime.multipliers": ["string", None],
		"bantime.maxtime": ["string", None],
		"bantime.rndtime": ["string", None],
		"bantime.overalljails": ["bool", None],
		"ignorecommand": ["string", None],
		"ignoreself": ["bool", None],
		"ignoreip": ["string", None],
		"ignorecache": ["string", None],
		"filter": ["string", ""],
		"logtimezone": ["string", None],
		"logencoding": ["string", None],
		"logpath": ["string", None],
		"action": ["string", ""]
	}
	_configOpts.update(FilterReader._configOpts)

	# Options that convert() never emits as plain "set <jail> <opt> <value>":
	_ignoreOpts = {'action', 'filter', 'enabled', 'backend'}.union(
		FilterReader._configOpts.keys())

	def getOptions(self, addOpts=None):
		"""Read the options of the jail, its filter and its actions.

		The jail section is read in two passes: first only the options
		needed to locate the filter, whose options are then merged into the
		section under the 'known/' prefix, and afterwards all options.

		Parameters
		----------
		addOpts : dict, optional
		  Additional options, merged with the jail options of the first
		  pass and with the options given in the filter definition.

		Returns
		-------
		bool
		  True on success or if the jail is not enabled; False if the jail
		  definition is wrong, in which case the error message is stored in
		  the options under the key 'config-error'.

		Raises
		------
		ValueError
		  If an unexpected error occurs while reading an action.
		"""

		basedir = self.getBaseDir()

		# Before interpolation (substitution) add static options always available as default:
		self.merge_defaults({
			"fail2ban_version": version,
			"fail2ban_confpath": basedir
		})

		try:

			# Read first options only needed for merge defaults ('known/...' from filter):
			self.__opts = ConfigReader.getOptions(self, self.__name, self._configOpts1st,
				shouldExist=True)
			if not self.__opts: # pragma: no cover
				raise JailDefError("Init jail options failed")
			if addOpts:
				self.__opts = _merge_dicts(self.__opts, addOpts)

			if not self.isEnabled():
				return True

			# Read filter
			flt = self.__opts["filter"]
			if flt:
				try:
					filterName, filterOpt = extractOptions(flt)
				except ValueError as e:
					raise JailDefError("Invalid filter definition %r: %s" % (flt, e)) from e
				if addOpts:
					filterOpt = _merge_dicts(filterOpt, addOpts)
				self.__filter = FilterReader(
					filterName, self.__name, filterOpt,
					share_config=self.share_config, basedir=basedir)
				ret = self.__filter.read()
				if not ret:
					raise JailDefError("Unable to read the filter %r" % filterName)
				# set backend-related options (logtype):
				self.__filter.applyAutoOptions(self.__opts.get('backend', ''))
				# merge options from filter as 'known/...' (all options unfiltered):
				self.__filter.getOptions(self.__opts, all=True)
				ConfigReader.merge_section(self, self.__name, self.__filter.getCombined(), 'known/')
			else:
				self.__filter = None
				logSys.warning("No filter set for jail %s" % self.__name)

			# Read second all options (so variables like %(known/param) can be interpolated):
			self.__opts = ConfigReader.getOptions(self, self.__name, self._configOpts)
			if not self.__opts: # pragma: no cover
				raise JailDefError("Read jail options failed")

			# cumulate filter options again (ignore given in jail):
			if self.__filter:
				self.__filter.getOptions(self.__opts)

			# Read action
			for act in splitWithOptions(self.__opts["action"]):
				try:
					act = act.strip()
					if not act:			  # skip empty actions
						continue
					# separate the action name from its options:
					try:
						actName, actOpt = extractOptions(act)
					except ValueError as e:
						raise JailDefError("Invalid action definition %r: %s" % (act, e)) from e
					if actName.endswith(".py"):
						# python action: stored as a ready "addaction" command
						# (file path below action.d, options JSON-encoded)
						self.__actions.append([
							"set",
							self.__name,
							"addaction",
							actOpt.pop("actname", os.path.splitext(actName)[0]),
							os.path.join(
								basedir, "action.d", actName),
							json.dumps(actOpt),
							])
					else:
						action = ActionReader(
							actName, self.__name, actOpt,
							share_config=self.share_config, basedir=basedir)
						ret = action.read()
						if ret:
							action.getOptions(self.__opts)
							self.__actions.append(action)
						else:
							raise JailDefError("Unable to read action %r" % actName)
				except JailDefError:
					# definition errors are reported by the outer handler
					raise
				except Exception as e:
					logSys.debug("Caught exception: %s", e, exc_info=True)
					raise ValueError("Error in action definition %r: %r" % (act, e)) from e
			if not self.__actions:
				logSys.warning("No actions were defined for %s" % self.__name)

		except JailDefError as e:
			# remember the error, so convert() can report the jail as skipped
			e = str(e)
			logSys.error(e)
			if not self.__opts:
				self.__opts = dict()
			self.__opts['config-error'] = e
			return False
		return True

	@property
	def filter(self):
		"""The FilterReader of the jail, or None if no filter was read."""
		return self.__filter

	def getCombined(self):
		"""Return the jail options, merged with the combined filter options
		if the jail has a filter.
		"""
		if not self.__filter:
			return self.__opts
		return _merge_dicts(self.__opts, self.__filter.getCombined())

	def convert(self, allow_no_files=False):
		"""Convert read before __opts to the commands stream

		Parameters
		----------
		allow_no_files : bool
		  Either to allow log files to be missing entirely.  Primarily is
		  used for testing

		Returns
		-------
		list
		  The commands of the jail, starting with its "add" command; or a
		  single 'config-error' entry if the jail definition was wrong.

		Raises
		------
		ValueError
		  If no log file was found for the "logpath" of the jail and
		  `allow_no_files` is false.
		"""

		stream = []
		stream2 = []
		e = self.__opts.get('config-error')
		if e:
			stream.append(['config-error', "Jail '%s' skipped, because of wrong configuration: %s" % (self.__name, e)])
			return stream
		# fill jail with filter options, using filter (only not overridden in jail):
		if self.__filter:
			stream.extend(self.__filter.convert())
		# and using options from jail:
		FilterReader._fillStream(stream, self.__opts, self.__name)
		backend = self.__opts.get('backend', 'auto')
		for opt, value in self.__opts.items():
			if opt == "logpath":
				if backend.startswith("systemd"): continue
				found_files = 0
				for path in value.split("\n"):
					# an entry is "<glob>" or "<glob> <tail>"; tail defaults to "head"
					path = path.rsplit(" ", 1)
					path, tail = path if len(path) > 1 else (path[0], "head")
					pathList = JailReader._glob(path)
					if not pathList:
						logSys.notice("No file(s) found for glob %s" % path)
					for p in pathList:
						found_files += 1
						# logpath after all log-related data (backend, date-pattern, etc)
						stream2.append(
							["set", self.__name, "addlogpath", p, tail])
				if not found_files:
					msg = "Have not found any log file for %s jail" % self.__name
					if not allow_no_files:
						raise ValueError(msg)
					logSys.warning(msg)
			elif opt == "ignoreip":
				stream.append(["set", self.__name, "addignoreip"] + splitwords(value))
			elif opt not in JailReader._ignoreOpts:
				stream.append(["set", self.__name, opt, value])
		# consider options order (after other options):
		stream.extend(stream2)
		for action in self.__actions:
			# reader objects are converted; python actions are ready commands
			if isinstance(action, (ConfigReaderUnshared, ConfigReader)):
				stream.extend(action.convert())
			else:
				stream.append(action)
		# the jail has to be added before any of its options is set
		stream.insert(0, ["add", self.__name, backend])
		return stream
	
class JailDefError(Exception):
	"""Raised for a wrong jail definition (jail options, filter or action)."""

# @StableExploit - 2025  (web-page residue, not part of the module)