Source code for padrelay.core.config

"""Helpers for reading and validating configuration"""
import argparse
import configparser
import getpass
import os
from ..security.auth import Authenticator
from ..security.password_strength import warn_weak_password
from typing import Optional, Tuple
from .logging_utils import get_logger
from ..server.constants import (
    DEFAULT_MAX_REQUESTS,
    DEFAULT_RATE_LIMIT_WINDOW,
    DEFAULT_UDP_MAX_REQUESTS,
    DEFAULT_BLOCK_DURATION,
)

logger = get_logger(__name__)

[docs] def load_config(config_path: str) -> Optional[configparser.ConfigParser]: config = configparser.ConfigParser() try: # Check file permissions (warn if world-readable) import stat from pathlib import Path config_file = Path(config_path) if config_file.exists(): file_stat = config_file.stat() mode = file_stat.st_mode # Check if file is world-readable (others can read) if mode & stat.S_IROTH: logger.warning( f"Configuration file {config_path} is world-readable! " "This is a security risk if the file contains passwords. " f"Consider setting permissions to 600: chmod 600 {config_path}" ) # Check if file is group-readable elif mode & stat.S_IRGRP: logger.warning( f"Configuration file {config_path} is group-readable. " f"Consider setting permissions to 600 for better security: chmod 600 {config_path}" ) with open(config_path, 'r') as f: config.read_file(f) logger.info(f"Configuration loaded from {config_path}") return config except Exception as e: logger.error(f"Error loading configuration: {e}") return None
[docs] def parse_client_args() -> Tuple[argparse.Namespace, Optional[configparser.ConfigParser]]: parser = argparse.ArgumentParser(description="PadRelay Client") parser.add_argument("--server-ip", type=str, help="IP address of the server") parser.add_argument("--server-port", type=int, help="Server port") parser.add_argument("--protocol", type=str, choices=["tcp", "udp"], help="Transport protocol (tcp or udp)") parser.add_argument("--joystick-index", type=int, help="Index of the joystick to use") parser.add_argument("--update-rate", type=int, help="Update rate in Hz") parser.add_argument("--config", type=str, help="Path to configuration file (INI format)") parser.add_argument("--password", type=str, help="Authentication password") parser.add_argument( "--password-hash", type=str, help="Pre-hashed authentication password (overrides --password)", ) parser.add_argument( "--enable-tls", action="store_true", help="Enable TLS/SSL encryption (recommended for security)", ) parser.add_argument( "--disable-tls", action="store_true", help="Disable TLS/SSL encryption (use only on trusted networks)", ) args = parser.parse_args() env_password = os.getenv("PASSWORD") env_hash = os.getenv("PASSWORD_HASH") config_obj = None if args.config: config_obj = load_config(args.config) if config_obj and config_obj.has_section('network'): if not args.server_ip and 'server_ip' in config_obj['network']: args.server_ip = config_obj['network']['server_ip'] if not args.server_port and 'server_port' in config_obj['network']: args.server_port = config_obj.getint('network', 'server_port') if not args.protocol and 'protocol' in config_obj['network']: args.protocol = config_obj['network']['protocol'] if ( not args.password and not args.password_hash and 'password' in config_obj['network'] ): args.password = config_obj['network']['password'] if not args.password_hash and 'password_hash' in config_obj['network']: args.password_hash = config_obj['network']['password_hash'] if config_obj and config_obj.has_section('joystick'): if not args.joystick_index and 'index' in config_obj['joystick']: args.joystick_index = config_obj.getint('joystick', 'index') if config_obj and config_obj.has_section('client'): if not args.update_rate and 'update_rate' in config_obj['client']: args.update_rate = config_obj.getint('client', 'update_rate') # TLS settings if config_obj and config_obj.has_section('security'): if 'enable_tls' in config_obj['security'] and not args.enable_tls and not args.disable_tls: tls_enabled = config_obj.getboolean('security', 'enable_tls') if tls_enabled: args.enable_tls = True else: args.disable_tls = True # Set defaults if not provided if not args.server_ip: args.server_ip = "127.0.0.1" if not args.server_port: args.server_port = 9999 if not args.protocol: args.protocol = "tcp" if not args.joystick_index: args.joystick_index = 0 if not args.update_rate: args.update_rate = 60 # Apply environment overrides if env_password: args.password = env_password if env_hash: args.password_hash = env_hash # Finalize password selection if args.password_hash: args.password = args.password_hash elif not args.password and args.protocol.lower() == 'tcp': args.password = getpass.getpass("Enter the authentication password: ") # Check password strength and warn if weak if args.password: warn_weak_password(args.password) # TLS default: enabled for TCP unless explicitly disabled if not hasattr(args, 'enable_tls'): args.enable_tls = False if not hasattr(args, 'disable_tls'): args.disable_tls = False if args.protocol.lower() == 'tcp': if not args.disable_tls and not args.enable_tls: # Default to TLS enabled for TCP args.enable_tls = True return args, config_obj
[docs] def parse_server_args() -> Tuple[argparse.Namespace, Optional[configparser.ConfigParser]]: parser = argparse.ArgumentParser(description="PadRelay Server") parser.add_argument('--host', type=str, help='Host to bind to') parser.add_argument('--port', type=int, help='Port to listen on') parser.add_argument('--password', help='Authentication password') parser.add_argument('--gamepad-type', choices=['xbox360', 'ds4'], help='Gamepad type') parser.add_argument('--protocol', type=str, choices=['tcp', 'udp'], help='Transport protocol (tcp or udp)') parser.add_argument('--config', type=str, help='Path to configuration file (INI format)') parser.add_argument('--rate-limit-window', type=int, help='Rate limiting window in seconds') parser.add_argument('--max-requests', type=int, help='Maximum requests per window') parser.add_argument('--block-duration', type=int, help='Block duration in seconds when rate limit is exceeded') parser.add_argument( '--enable-tls', action='store_true', help='Enable TLS/SSL encryption (recommended for security)', ) parser.add_argument( '--disable-tls', action='store_true', help='Disable TLS/SSL encryption (use only on trusted networks)', ) parser.add_argument( '--cert-path', type=str, help='Path to TLS certificate file (optional, auto-generated if not provided)', ) parser.add_argument( '--key-path', type=str, help='Path to TLS private key file (optional, auto-generated if not provided)', ) args = parser.parse_args() env_password = os.getenv('PASSWORD') env_hash = os.getenv('PASSWORD_HASH') config_obj = None if args.config: config_obj = load_config(args.config) if config_obj and config_obj.has_section('server'): if not args.host and 'host' in config_obj['server']: args.host = config_obj['server']['host'] if not args.port and 'port' in config_obj['server']: args.port = config_obj.getint('server', 'port') if not args.password and not env_password and not env_hash and 'password' in config_obj['server']: args.password = config_obj['server']['password'] if not args.protocol and 'protocol' in config_obj['server']: args.protocol = config_obj['server']['protocol'] if not args.rate_limit_window and 'rate_limit_window' in config_obj['server']: args.rate_limit_window = config_obj.getint('server', 'rate_limit_window') if not args.max_requests and 'max_requests' in config_obj['server']: args.max_requests = config_obj.getint('server', 'max_requests') if not args.block_duration and 'block_duration' in config_obj['server']: args.block_duration = config_obj.getint('server', 'block_duration') if config_obj and config_obj.has_section('vgamepad'): if not args.gamepad_type and 'type' in config_obj['vgamepad']: args.gamepad_type = config_obj['vgamepad']['type'] # TLS settings if config_obj and config_obj.has_section('security'): if 'enable_tls' in config_obj['security'] and not args.enable_tls and not args.disable_tls: tls_enabled = config_obj.getboolean('security', 'enable_tls') if tls_enabled: args.enable_tls = True else: args.disable_tls = True if not args.cert_path and 'cert_path' in config_obj['security']: args.cert_path = config_obj['security']['cert_path'] if not args.key_path and 'key_path' in config_obj['security']: args.key_path = config_obj['security']['key_path'] # Convert plaintext password to hash if needed if ( config_obj and 'password' in config_obj['server'] and not env_password and not env_hash ): stored_pw = config_obj['server']['password'] if not stored_pw.startswith('pbkdf2_sha256$'): hashed = Authenticator.hash_password(stored_pw) config_obj['server']['password'] = hashed args.password = hashed try: with open(args.config, 'w') as f: config_obj.write(f) logger.info('Converted plaintext password to hashed password in config') except Exception as e: logger.error(f'Failed to update config with hashed password: {e}') # Set defaults if not args.host: args.host = '127.0.0.1' if not args.port: args.port = 9999 if not args.gamepad_type: args.gamepad_type = 'xbox360' if not args.protocol: args.protocol = 'tcp' if not args.rate_limit_window: args.rate_limit_window = DEFAULT_RATE_LIMIT_WINDOW if not args.max_requests: if args.protocol.lower() == 'udp': args.max_requests = DEFAULT_UDP_MAX_REQUESTS else: args.max_requests = DEFAULT_MAX_REQUESTS if not args.block_duration: args.block_duration = DEFAULT_BLOCK_DURATION # If password not provided and protocol is TCP, prompt for it if env_password: args.password = env_password elif env_hash: args.password = env_hash elif not args.password and args.protocol.lower() == 'tcp': args.password = getpass.getpass("Enter authentication password: ") # Check password strength and warn if weak if args.password: warn_weak_password(args.password) # TLS default: enabled for TCP unless explicitly disabled if not hasattr(args, 'enable_tls'): args.enable_tls = False if not hasattr(args, 'disable_tls'): args.disable_tls = False if not hasattr(args, 'cert_path'): args.cert_path = None if not hasattr(args, 'key_path'): args.key_path = None if args.protocol.lower() == 'tcp': if not args.disable_tls and not args.enable_tls: # Default to TLS enabled for TCP args.enable_tls = True return args, config_obj
[docs] def validate_server_config(args: argparse.Namespace, config_obj: Optional[configparser.ConfigParser]) -> None: """Validate the server configuration arguments""" if not args.host: raise ValueError("Host not specified.") if not (1 <= args.port <= 65535): raise ValueError("Port must be in range 1-65535.") if args.protocol.lower() not in ("tcp", "udp"): raise ValueError("Protocol must be 'tcp' or 'udp'.") if args.gamepad_type.lower() not in ("xbox360", "ds4"): raise ValueError("Gamepad type must be 'xbox360' or 'ds4'.") if args.block_duration <= 0: raise ValueError("Block duration must be positive.") # TCP requires password authentication if args.protocol.lower() == "tcp" and not args.password: raise ValueError("Password is required for TCP mode.") if config_obj: for section in ['server', 'vgamepad']: if not config_obj.has_section(section): logger.warning(f"Configuration file does not contain [{section}] section. Using default values.")