Source code for pawnlib.config.globalconfig

import logging
import os
import configparser
import sys
import json
import re
from uuid import uuid4
from pathlib import Path
from typing import Optional, Callable
from collections import namedtuple, defaultdict
from collections.abc import Mapping
from pawnlib.__version__ import __title__, __version__
from pawnlib.config.__fix_import import Null
from pawnlib.config.console import Console
from collections import OrderedDict
from rich.traceback import install as rich_traceback_install
import copy
from types import SimpleNamespace
from functools import partial
from rich import inspect as rich_inspect
from rich.table import Table
from rich.panel import Panel
from contextlib import contextmanager
from inspect import stack as inspect_stack


[docs]class ConfigManager: def __init__(self): self._config = {}
[docs] @contextmanager def use_config(self, config): old_config = self._config.copy() self._config.update(config) try: yield finally: self._config = old_config
[docs] def set(self, key, value): self._config[key] = value
[docs] def get(self, key, default=None): return self._config.get(key, default)
[docs]class ConfigHandler:
[docs] def __init__(self, config_file='config.ini', args=None, allowed_env_keys=None, env_prefix=None, section_pattern=None, defaults=None): """ Initialize the ConfigHandler with a config file, command-line arguments, and environment variables. Only environment variables specified in allowed_env_keys or with env_prefix are considered. Additionally, environment variables corresponding to keys in args or config.ini are included. Args: config_file (str): Path to the configuration file. args (Namespace, optional): Parsed command-line arguments. allowed_env_keys (list, optional): List of environment variable keys to allow (case-insensitive). env_prefix (str, optional): Prefix for environment variables to include (case-insensitive). section_pattern : Regex for find a section name defaults (dict, optional): Default values for configuration keys. """ self.config = configparser.ConfigParser() self.config_file = config_file self.config.read(config_file) self.args = {k.lower(): v for k, v in vars(args).items()} if args else {} self.allowed_env_keys = [k.lower() for k in allowed_env_keys] if allowed_env_keys else [] self.env_prefix = env_prefix.lower() if env_prefix else None self.section_pattern = section_pattern self.defaults = defaults or {} if self.config.has_section('default'): self.config_keys = set(k.lower() for k in self.config['default']) else: self.config_keys = set() self.args_keys = set(self.args.keys()) self.combined_keys = self.args_keys.union(self.config_keys) self.env = self._filter_env(os.environ) self.original_keys = {} self._populate_original_keys() # Initialize source history self.source_history = defaultdict(list) self._initialize_source_history()
def _populate_original_keys(self): """ Store the original key names for consistent output in visualizations. Priority for original keys: 1. Args 2. Env 3. Config.ini """ # Original keys from args for key in self.args: if key not in self.original_keys: self.original_keys[key] = key # Original keys from environment variables for key in self.env: if key not in self.original_keys: # Retrieve original case from os.environ original_key = next((k for k in os.environ if k.lower() == key), key) self.original_keys[key] = original_key # Original keys from config.ini if self.config.has_section('default'): for key in self.config['default']: key_lower = key.lower() if key_lower not in self.original_keys: self.original_keys[key_lower] = key # """ # Initialize the source history for each key based on the current sources. # """ # for key_lower in self.combined_keys: # if key_lower in self.args and self.args[key_lower] is not None: # self.source_history[key_lower].append('args') # elif key_lower in self.env: # self.source_history[key_lower].append('env') # elif self.config.has_option('default', key_lower): # self.source_history[key_lower].append(f"{self.config_file}") # else: # self.source_history[key_lower].append('default') def _initialize_source_history(self): """ Initialize the source history for each key based on the current sources. """ for key_lower in self.combined_keys: if key_lower in self.args and self.args[key_lower] is not None: self.source_history[key_lower].append('args') elif key_lower in self.env: self.source_history[key_lower].append('env') elif self.config.has_option('default', key_lower): self.source_history[key_lower].append('config.ini') else: # 기본값이 있을 경우 'default' if key_lower in self.defaults: self.source_history[key_lower].append('default') else: self.source_history[key_lower].append('undefined') def _filter_env(self, env): """ Filters environment variables by allowed keys or prefix. Additionally, includes environment variables that correspond to keys in args or config.ini. Args: env (dict): Dictionary of environment variables. Returns: dict: Filtered environment variables with lowercase keys. """ filtered_env = {} for k, v in env.items(): key_lower = k.lower() # Check if the key corresponds to a key in args or config.ini if key_lower in self.combined_keys: filtered_env[key_lower] = v continue # Check if the key is in allowed_env_keys if self.allowed_env_keys and key_lower in self.allowed_env_keys: filtered_env[key_lower] = v continue # Check if the key starts with env_prefix if self.env_prefix and key_lower.startswith(self.env_prefix): stripped_key = key_lower[len(self.env_prefix):] if stripped_key: filtered_env[stripped_key] = v continue # If none of the above, ignore the environment variable return filtered_env @staticmethod def _convert_value(value): """ Attempts to convert a value to int, float, or bool if possible. Args: value (str): The value to convert. Returns: int/float/bool/str: Converted value. """ if isinstance(value, bool) or value is None: return value if isinstance(value, str): value_lower = value.lower() if value_lower in ('true', 'yes', 'on'): return True if value_lower in ('false', 'no', 'off'): return False try: if '.' in value: return float(value) return int(value) except ValueError: return value return value # def get(self, key, default=None): # """ # Get a value with the following priority: # 1. Command-line arguments (args) # 2. Environment variables (env) # 3. Config file (config.ini) # 4. Default value # # Args: # key (str): The configuration key to retrieve. # default: The default value if the key is not found. # # Returns: # The value associated with the key. # """ # key_lower = key.lower() # if key_lower in self.args and self.args[key_lower] is not None: # return self.args[key_lower] # if key_lower in self.env: # return self._convert_value(self.env.get(key_lower)) # if self.config.has_option('default', key_lower): # return self._convert_value(self.config.get('default', key_lower)) # return self._convert_value(default)
[docs] def get(self, key, default=None): """ Get a value with the following priority: 1. Command-line arguments (args) 2. Environment variables (env) 3. Config file (config.ini) 4. Code defaults Args: key (str): The configuration key to retrieve. default: The default value if the key is not found. Returns: The value associated with the key. """ key_lower = key.lower() if key_lower in self.args and self.args[key_lower] is not None: return self.args[key_lower] if key_lower in self.env: return self._convert_value(self.env.get(key_lower)) if self.config.has_option('default', key_lower): return self._convert_value(self.config.get('default', key_lower)) if key_lower in self.defaults: return self.defaults[key_lower] return self._convert_value(default)
[docs] def as_dict(self): """ Returns the final merged configuration as a dictionary with all keys in lowercase. Priority: args > env > config.ini > code defaults Returns: dict: Merged configuration with lowercase keys. """ merged = {} # Add config.ini values if self.config.has_section('default'): for key, value in self.config.items('default'): key_lower = key.lower() merged[key_lower] = self._convert_value(value) # Add environment variables, overwriting config.ini for key, value in self.env.items(): merged[key] = self._convert_value(value) # Add args, overwriting env and config.ini for key, value in self.args.items(): if value is not None: merged[key] = value # Add code defaults, overwriting only if not set by args/env/config.ini for key, value in self.defaults.items(): key_lower = key.lower() if key_lower not in merged: merged[key_lower] = self._convert_value(value) return merged
[docs] def as_namespace(self): """ Returns the final merged configuration as a Namespace object. Returns: Namespace: Merged configuration. """ return NestedNamespace(**self.as_dict())
[docs] def get_source_chain(self, key): """ Returns the source chain of the value (e.g., 'config.ini -> args (updated)'). Args: key (str): The configuration key. Returns: str: Source chain of the value. """ key_lower = key.lower() return self.source_history[key_lower]
[docs] def get_source(self, key): """ Returns the latest source of the value (args, env, config.ini, or default). Args: key (str): The configuration key. Returns: str: Latest source of the value. """ key_lower = key.lower() if key_lower in self.args and self.args[key_lower] is not None: return 'args' if key_lower in self.env: return 'env' if self.config.has_option('default', key_lower): return 'config.ini' if key_lower in self.defaults: return 'default' return 'undefined'
[docs] def update(self, updates: dict): """ Update multiple configuration values. These updates are treated as command-line arguments and have the highest priority. Args: updates (dict): A dictionary of key-value pairs to update. """ for key, value in updates.items(): key_lower = key.lower() self.args[key_lower] = value # Update original_keys with the provided casing self.original_keys[key_lower] = key # Determine if the key is being added or updated if len(self.source_history[key_lower]) == 0: # New key added self.source_history[key_lower].append('args (added)') else: # Existing key updated self.source_history[key_lower].append('args (updated)')
[docs] def set(self, key: str, value): """ Update a single configuration value. This update is treated as a command-line argument and has the highest priority. Args: key (str): The configuration key to update. value: The new value to set. """ self.update({key: value})
[docs] def get_section(self, section_name): """ Returns all key-value pairs for a given section as a dictionary. """ if self.config.has_section(section_name): return {key: self._convert_value(value) for key, value in self.config.items(section_name)} return {}
# def get_all_sections(self): # """ # Returns all sections and their key-value pairs as a dictionary of dictionaries. # # Returns: # dict: A dictionary where keys are section names and values are dictionaries of key-value pairs. # """ # all_sections = {} # for section in self.config.sections(): # all_sections[section] = {key: self._convert_value(value) for key, value in self.config.items(section)} # return all_sections
[docs] def get_all_sections(self, pattern=None): """ Returns sections and their key-value pairs as a dictionary of dictionaries based on a regex pattern. Args: pattern (str, optional): Regex pattern to match section names. If None, returns an empty dictionary. Returns: dict: A dictionary where keys are section names and values are dictionaries of key-value pairs. """ _pattern = pattern or self.section_pattern regex = re.compile(_pattern, re.IGNORECASE) if _pattern else None # Compile regex with case-insensitive flag all_sections = {} for section in self.config.sections(): if regex is None or re.compile(_pattern, re.IGNORECASE).search(section): # Match section names using regex all_sections[section] = { key: self._convert_value(value) for key, value in self.config.items(section) } return all_sections
[docs] def print_config(self): """ Prints a table showing the key, value, and source (args, env, config.ini, default). Each row is colored based on the latest source for easy distinction. The source column displays the history of sources in the format "config.ini -> args (updated)". """ console = Console() table = Table(title="Configuration Overview") table.add_column("Key", justify="left", style="bold") table.add_column("Value", justify="left") table.add_column("Source", justify="left") # Define color mapping based on latest source source_colors = { 'args': 'green', 'args (updated)': 'bright_green', 'env': 'blue', 'env (updated)': 'bright_blue', self.config_file: 'yellow', f'{self.config_file} (updated)': 'bright_yellow', 'default': 'white', 'args (added)': 'bright_green', 'env (added)': 'bright_blue', f'{self.config_file} (added)': 'bright_yellow', 'undefined': 'dim', } # Collect unique keys from config, env, args keys = set() if self.config.has_section('default'): keys.update([k.lower() for k in self.config['default']]) keys.update(self.env.keys()) keys.update(self.args.keys()) keys.update([k.lower() for k in self.defaults.keys()]) # for key_lower in sorted(keys): # original_key = self.original_keys.get(key_lower, key_lower) # value = self.get(key_lower) # source_chain = self.get_source_chain(key_lower) # # latest_source = self.source_history[key_lower][-1] if self.source_history[key_lower] else 'default' # color = source_colors.get(latest_source, 'white') # # source_display = " -> ".join(self.source_history[key_lower]) # # table.add_row( # original_key, # str(value), # source_display, # style=color # Set the entire row's color # ) for key_lower in sorted(keys): original_key = self.original_keys.get(key_lower, key_lower) value = self.get(key_lower) source_chain = self.get_source_chain(key_lower) latest_source = self.source_history[key_lower][-1] if self.source_history[key_lower] else 'undefined' color = source_colors.get(latest_source, 'white') source_display = " -> ".join(self.source_history[key_lower]) table.add_row( original_key, str(value), source_display, style=color # Set the entire row's color ) console.print(table)
[docs] def print_all_sections_tree(self, pattern=None): """ Prints all sections and their key-value pairs in a hierarchical tree format. """ _pattern = pattern or self.section_pattern all_sections = self.get_all_sections(pattern=_pattern) pattern_text = f"([dim]pattern=\"{_pattern}\"[/dim])" if _pattern else "" console = Console() from rich.tree import Tree tree = Tree(f"[bold cyan]All Configuration Sections[/bold cyan] {pattern_text}") for section in sorted(all_sections.keys()): section_node = tree.add(f"[bold green]{section}[/bold green]") settings = all_sections[section] sorted_keys = sorted(settings.keys()) for key in sorted_keys: original_key = self.original_keys.get(key.lower(), key) value = settings[key] source_chain = self.config_file section_node.add(f"[bold]{original_key}[/bold]: {value} \t[dim]{source_chain}[/dim]") console.print(tree)
[docs] def print_all_sections_panels(self, pattern=None): """ Prints each section and their key-value pairs in separate panels. """ _pattern = pattern or self.section_pattern all_sections = self.get_all_sections(pattern=_pattern) pattern_text = f"([dim]pattern=\"{_pattern}\"[/dim])" if _pattern else "" console = Console() from rich import box from rich.panel import Panel console.rule(f"[bold cyan]All Configuration Sections[/bold cyan] {pattern_text}") for section in sorted(all_sections.keys()): settings = all_sections[section] sorted_keys = sorted(settings.keys()) table = Table(show_header=True, header_style="bold magenta", box=box.MINIMAL) table.add_column("Key", style="bold", no_wrap=True, width=20) table.add_column("Value", style="magenta", no_wrap=False, width=50) table.add_column("Source", style="green", no_wrap=False, width=40) for key in sorted_keys: original_key = self.original_keys.get(key.lower(), key) value = settings[key] source_chain = self.config_file table.add_row( str(original_key), str(value), str(source_chain) ) panel = Panel(table, title=f"[bold green]{section}[/bold green]", border_style="white") console.print(panel)
[docs]class NestedNamespace(SimpleNamespace): @staticmethod def _map_entry(entry): """ Helper method to map dictionary entries to NestedNamespace instances. :param entry: Dictionary or other object to map. :return: NestedNamespace instance if the entry is a dictionary, otherwise returns the entry as is. """ if isinstance(entry, dict): return NestedNamespace(**entry) return entry
[docs] def __init__(self, **kwargs): """ Initialize the NestedNamespace with nested dictionaries and lists converted to NestedNamespace instances. :param kwargs: Keyword arguments where values can be dictionaries or lists. """ super().__init__(**kwargs) for key, val in kwargs.items(): if isinstance(val, dict): setattr(self, key, NestedNamespace(**val)) elif isinstance(val, list): setattr(self, key, list(map(self._map_entry, val)))
[docs] def keys(self) -> list: """ Get a list of keys in the current namespace. :return: List of keys. """ return list(self.__dict__.keys())
[docs] def values(self) -> list: """ Get a list of values in the current namespace. :return: List of values. """ return list(self.__dict__.values())
[docs] def as_dict(self) -> dict: """ Convert the NestedNamespace to a dictionary, recursively converting all nested namespaces. :return: Dictionary representation of the NestedNamespace. """ return self._namespace_to_dict(self.__dict__)
def _namespace_to_dict(self, _dict): """ Helper method to recursively convert a NestedNamespace to a dictionary. :param _dict: The dictionary to convert. :return: Converted dictionary. """ result = {} for key, value in _dict.items(): if isinstance(value, (dict, NestedNamespace)): result[key] = self._namespace_to_dict(value._asdict()) else: result[key] = value return result def _asdict(self) -> dict: """ Get the internal dictionary representation of the current namespace. :return: Internal dictionary representation. """ return self.__dict__
[docs] def get_nested(self, keys: list): """ Retrieve a nested value from the namespace using a list of keys. :param keys: List of keys to traverse the nested structure. :return: The nested value if found, otherwise None. Example: >>> ns = NestedNamespace(level1={'level2': {'level3': 'value'}}) >>> ns.get_nested(['level1', 'level2', 'level3']) 'value' >>> ns.get_nested(['level1', 'nonexistent', 'level3']) None """ result = self for key in keys: if isinstance(result, dict): result = result.get(key, None) else: result = getattr(result, key, None) if result is None: return None return result
def __repr__(self, indent=4): result = self.__class__.__name__ + '(' items_len = len(self.__dict__) _first = 0 _indent_space = '' for k, v in self.__dict__.items(): if _first == 0 and items_len > 0: result += "\n" _first = 1 if k.startswith('__'): continue if isinstance(v, NestedNamespace): value_str = v.__repr__(indent + 4) else: value_str = str(v) if k and value_str: if _first: _indent_space = ' ' * indent result += _indent_space + k + '=' + value_str + ",\n" result += ' ' * (len(_indent_space) - 4) + f')' return result
[docs]def nestednamedtuple(dict_items: dict, ignore_keys: list = []) -> namedtuple: """ Converts dictionary to a nested namedtuple recursively. :param: dictionary: Dictionary to convert into a nested namedtuple. :example: .. code-block:: python from pawnlib.config.globalconfig import nestednamedtuple nt = nestednamedtuple({"hello": {"ola": "mundo"}}) print(nt) # >>> namedtupled(hello=namedtupled(ola='mundo')) """ dictionary = copy.copy(dict_items) if isinstance(dictionary, Mapping) and not isinstance(dictionary, fdict): # for ignore_type in ["configparser.SectionProxy", "configparser.ConfigParser"]: for ignore_type in ["configparser.ConfigParser"]: if ignore_type in str(type(dictionary)): for key, value in list(dictionary.items()): dictionary[key] = value return dictionary for key, value in list(dictionary.items()): # if key in ignore_keys: # dictionary[key] = value # else: dictionary[key] = nestednamedtuple(value) return namedtuple("namedtupled", dictionary)(**dictionary) elif isinstance(dictionary, list): return [nestednamedtuple(item) for item in dictionary] return dictionary
[docs]class fdict(dict): """ Forced dictionary. Prevents dictionary from becoming a nested namedtuple. :example: .. code-block:: python from pawnlib.config.globalconfig import nestednamedtuple, fdict d = {"hello": "world"} nt = nestednamedtuple({"forced": fdict(d), "notforced": d}) print(nt.notforced) # >>> namedtupled(hello='world') print(nt.forced) # >>> {'hello': 'world'} """ pass
[docs]def singleton(class_): instances = {} def getinstance(*args, **kwargs): if class_ not in instances: instances[class_] = class_(*args, **kwargs) return instances[class_] return getinstance
[docs]class ConfigSectionMap(configparser.ConfigParser): """ override configparser.ConfigParser Example: .. code-block:: python config = ConfigSectionMap() config.read('config.ini') config_file = config.as_dict() """ def __init__(self): # https://stackoverflow.com/questions/47640354/reading-special-characters-text-from-ini-file-in-python super(configparser.ConfigParser, self).__init__(interpolation=None)
[docs] def as_dict(self, section=None): d = dict(self._sections) if self._defaults: d["DEFAULT"] = self._defaults for k in d: d[k] = dict(self._defaults, **d[k]) d[k].pop('__name__', None) if section: return d.get(section) return d
[docs] def get_default(self): return dict(self._defaults)
# # class Singleton(type): # _instances = {} # # def __call__(cls, *args, **kwargs): # if cls not in cls._instances: # cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) # return cls._instances[cls]
[docs]class Singleton(type): _instances = {} def __call__(cls, *args, **kwargs): if cls in cls._instances: print(f"[WARN] {cls.__name__} instance already exists. Returning the existing instance.") return cls._instances[cls] instance = super(Singleton, cls).__call__(*args, **kwargs) cls._instances[cls] = instance return instance
# @singleton
[docs]class PawnlibConfig(metaclass=Singleton):
[docs] def __init__( self, global_name="pawnlib_global_config", app_logger=Null(), error_logger=Null(), timeout=6000, debug=False, use_global_namespace=True ): """ This class can share variables using globals(). :param global_name: Global Variable Name :param app_logger: global app logger :param error_logger: global error logger :param timeout: global timeout :example .. code :: python # auto attach from pawnlib.config.globalconfig import pawnlib_config as pwn from pawnlib.output.file import get_real_path pwn.set( PAWN_LOGGER=dict( app_name="default_app", log_path=f"{get_script_path(__file__)}/logs", stdout=True, use_hook_exception=False, ), PAWN_DEBUG=True, app_name=APP_NAME, data={} # Global NameSpace ) .. code :: python # attach logger from pawnlib.config.globalconfig import pawnlib_config as pwn from pawnlib.output.file import get_real_path app_logger, error_logger = log.AppLogger( app_name="default_app", log_path=f"{get_real_path(__file__)}/logs", stdout=True, use_hook_exception=False, ).get_logger() pwn.set( PAWN_APP_LOGGER=app_logger, PAWN_ERROR_LOGGER=error_logger, PAWN_DEBUG=True, app_name=APP_NAME, data={} # Global NameSpace ) """ self.global_name = f"{global_name}_{uuid4()}" self.app_logger = app_logger self.error_logger = error_logger self.app_name = "" self.timeout = timeout self.verbose = 0 self.debug = debug self.version = f"{__title__}/{__version__}" self.version_number = __version__ self.env_prefix = "PAWN" self._environments = {} self.data = NestedNamespace() self._current_path: Optional[Path] = None self._config_file = None self._pawn_configs = {} self.console = Null() self.console_options = None self.stdout_log_formatter = None self._none_string = "____NONE____" self._loaded = { "console": False, "on_ready": False } self.use_global_namespace = use_global_namespace if self.use_global_namespace: globals()[self.global_name] = {} else: self.config_manager = ConfigManager() self._do_not_execute_namespace_keys = [f"{self.env_prefix}_LOGGER", f"{self.env_prefix}_CONSOLE"] self.log_time_format = None self._init_console(force_init=True)
[docs] @staticmethod def inspect(*args, **kwargs): """ Inspect function which can produce a report on any Python object, such as class, instance, or builtin. :param args: :param kwargs: :return: """ return rich_inspect(*args, **kwargs)
def _log_formatter(self, dt): if self.log_time_format.endswith('.%f'): return dt.strftime(self.log_time_format)[:-3] else: return dt.strftime(self.log_time_format) def _init_console(self, force_init=True): is_interactive = hasattr(sys, 'ps1') or sys.stdin.isatty() _console_options = dict( pawn_debug=self.debug, redirect=False if is_interactive else self.debug, # 대화식일때는 무조건 redirect가 false여야함 record=True, soft_wrap=False, force_terminal=True, # log_time_format="[%Y-%m-%d %H:%M:%S.%f]" log_time_format=lambda dt: f"[{dt.strftime('%H:%M:%S,%f')[:-3]}]" ) if not self._loaded.get('console'): # There are visible problems with InquirerPy. _console_options['redirect'] = False if self._loaded.get('console') or force_init: if self.console_options: if self.console_options.get('log_time_format') and not isinstance(self.console_options.get('log_time_format'), Callable): _log_time_format = self.console_options['log_time_format'] self.log_time_format = self.console_options['log_time_format'] if ".%f" in _log_time_format: self.console_options['log_time_format'] = lambda dt: f"[{dt.strftime(_log_time_format)[:-3]}]" else: self.console_options['log_time_format'] = lambda dt: f"[{dt.strftime(_log_time_format)}]" self.stdout_log_formatter = self.console_options['log_time_format'] _console_options.update(self.console_options) # self.console_options = copy.deepcopy(_console_options) self.console = Console(**_console_options) def _load_config_file(self, config_path=None): if self._loaded['on_ready']: config = ConfigSectionMap() config.optionxform = str _config_filename = self.get_path(self._config_file) try: if _config_filename.is_file(): config.read(_config_filename) config_dict = config.as_dict() for section, config_item in config_dict.items(): for key, value in config_dict[section].items(): # Try to parse value as JSON try: parsed_value = json.loads(value) config_dict[section][key] = parsed_value except (json.JSONDecodeError, TypeError): # If it's not JSON, just keep the original value config_dict[section][key] = value self.set(PAWN_CONFIG=config_dict) else: self.set(PAWN_CONFIG={}) self.console.debug(f"[bold red] cannot found config_file - {_config_filename}") for config_category, config_value in config.items(): lower_keys = [key.lower() for key in config[config_category].keys()] duplicate_keys = _list_duplicates(lower_keys) for conf_key, conf_value in config[config_category].items(): if conf_key.lower() in duplicate_keys: self.console.log(f"[yellow]\[WARN] Similar keys exist in config.ini - \[{config_category}] {conf_key}={conf_value}") except Exception as e: self.console.log(f"[bold red]Error occurred while loading config.ini - {e}") sys.exit(-1)
[docs] def get_path(self, path: str = "") -> Path: """Get Path from the directory where the configure.json file is. :param path: file_name or path :return: """ if self._current_path: root_path = Path(self._current_path) else: root_path = Path(os.path.join(os.getcwd())) return root_path.joinpath(path)
[docs] @staticmethod def get_app_path(): """Get Path from the directory where the app file. :param path: file_name or path :return: """ caller_frame = inspect_stack()[1] caller_file = caller_frame.filename return os.path.abspath(os.path.dirname(caller_file))
[docs] @staticmethod def pawnlib_path(): _dir = os.path.dirname(__file__) if "/config" in _dir: return _dir.replace("/config", "") return _dir
[docs] @staticmethod def get_python_version(): major, minor, micro = sys.version_info[:3] return f"Python {major}.{minor}.{micro} {sys.platform}"
[docs] def init_with_env(self, **kwargs): """ Initialize with environmental variables. :param kwargs: dict :return: """ self.fill_config_from_environment() self.set(**kwargs) # self._load_config_file() self._config_file = self.get('PAWN_CONFIG_FILE', 'config.ini') # Set _config_file here self._loaded['on_ready'] = True self.console.debug(f"🐍 {self.get_python_version()}, ♙ {__title__.title()}/{__version__}, PATH={self.pawnlib_path()}") self._load_config_file() return self
[docs] @staticmethod def str2bool(v): """ This function is intended to return a boolean value. :param v: :return: """ true_list = ("yes", "true", "t", "1", "True", "TRUE") if isinstance(v, bool): return v if isinstance(v, str): return v.lower() in true_list return eval(f"{v}") in true_list
[docs] def fill_config_from_environment(self): """ Initialize with environmental variables. .. code :: python # default environments PAWN_INI = False PAWN_DEBUG = False PAWN_VERBOSE = 0 PAWN_TIMEOUT = 7000 PAWN_APP_LOGGER = "" PAWN_ERROR_LOGGER = "" PAWN_VERSION = PAWN_GLOBAL_NAME = pawnlib_global_config_UUID :return: """ default_structure = { "VERBOSE": { "type": int, "default": 0, }, "INI": { "type": self.str2bool, "default": False, }, "CONFIG_FILE": { "type": str, "default": "config.ini", }, "DEBUG": { "type": self.str2bool, "default": False, }, "TIMEOUT": { "type": int, "default": 7000, }, "APP_LOGGER": { "type": str, "default": "" }, "ERROR_LOGGER": { "type": str, "default": "" }, "LOGGER": { "type": dict, "default": {} }, "VERSION": { "type": str, "default": self.version.title() }, "GLOBAL_NAME": { "type": str, "default": self.global_name }, "USE_GLOBAL_NS": { "type": str, "default": self.use_global_namespace }, "CONSOLE": { "type": dict, "default": {} }, "LINE": { "type": self.str2bool, "default": True, }, "PATH": { "type": str, "default": Path(os.path.join(os.getcwd())) }, "TIME_FORMAT": { "type": str, "default": "%H:%M:%S.%f" }, "SSL_CHECK": { "type": self.str2bool, "default": True } } if not self.use_global_namespace: del default_structure['GLOBAL_NAME'] mandatory_environments = list(default_structure.keys()) for environment in mandatory_environments: environment_name = f"{self.env_prefix}_{environment}" environment_value = os.getenv(environment_name) filled_environment_value = "" if default_structure.get(environment): required_type = default_structure[environment].get("type", None) if required_type is None: self.console.log(f"[red]{environment_name} type is None. Required type") if environment_value in [None, 0, ""]: filled_environment_value = default_structure[environment].get("default") elif required_type: filled_environment_value = required_type(environment_value) self._environments[environment_name] = { "input": os.getenv(environment_name), "value": filled_environment_value, } self.set(**{environment_name: filled_environment_value}) if isinstance(self.verbose, int) and self.verbose >= 3: self.console.debug(f"{environment_name}={filled_environment_value} (env={environment_value})")
[docs] def make_config(self, dictionary: Optional[dict] = None, **kwargs) -> None: """Creates a global configuration that can be accessed anywhere during runtime. This function is a useful replacement to passing configuration classes between classes. Instead of creating a `Config` object, one may use :func:`make_config` to create a global runtime configuration that can be accessed by any module, function, or object. :param dictionary: Dictionary to create global configuration with. :param kwargs: Arguments to make global configuration with. Example: .. code-block:: python from pawnlib.config.globalconfig import PawnlibConfig PawnlibConfig.make_config(hello="world") """ dictionary = dictionary or {} globals()[self.global_name] = {**dictionary, **kwargs}
[docs] def get(self, key=None, default=None): """ This method is intended to return a key value :param key: :param default: :return: Example: .. code-block:: python from pawnlib.config.globalconfig import pawnlib_config pawnlib_config.set(hello="world") pawnlib_config.get("hello") """ if self.use_global_namespace: if self.global_name in globals(): return globals()[self.global_name].get(key, default) return default else: return self.config_manager.get(key, default)
[docs] def set(self, **kwargs): """ This method is intended to store key values. :param kwargs: :return: Example: .. code-block:: python from pawnlib.config.globalconfig import pawnlib_config pawnlib_config.set(hello="world") pawnlib_config.get("hello") """ priority_keys = [f"{self.env_prefix}_PATH", f"{self.env_prefix}_TIME_FORMAT", f"{self.env_prefix}_DEBUG", f"{self.env_prefix}_VERBOSE"] order_dict = OrderedDict(kwargs) def _enforce_set_value(source_key=None, target_key=None, target_dict=None): if kwargs.get(source_key): if isinstance(target_dict, dict) and not target_dict.get(target_key): target_dict[target_key] = kwargs[source_key] if isinstance(self.verbose, int) and self.verbose >= 3: self.console.debug(f'set => {target_key}={kwargs[source_key]}') for priority_key in priority_keys: if order_dict.get(priority_key): order_dict.move_to_end(key=priority_key, last=False) if self.global_name in globals() or not self.use_global_namespace: for p_key, p_value in order_dict.items(): if self._environments.get(p_key, self._none_string) != self._none_string \ and self._environments[p_key].get("input"): if self._environments[p_key].get('input') and \ self._environments[p_key].get('value') != p_value: self.console.log(f"[yellow][WARN] Environment variables and settings are different. " f"'{p_key}': {self._environments[p_key]['value']}(ENV) != {p_value}(Config)") p_value = self._environments[p_key]['value'] if p_key == f"{self.env_prefix}_LOGGER" and p_value: if isinstance(p_value, dict): from pawnlib.utils.log import AppLogger if p_value.get('app_name') is None and kwargs.get('app_name'): p_value['app_name'] = kwargs['app_name'] self.app_name = kwargs['app_name'] _enforce_set_value(source_key=f'{self.env_prefix}_TIME_FORMAT', target_key='stdout_log_formatter', target_dict=p_value) self.app_logger, self.error_logger = AppLogger(**p_value).get_logger() elif p_key == f"{self.env_prefix}_APP_LOGGER" and p_value: self.app_logger = self._check_logger_not_null(p_key, p_value) elif p_key == f"{self.env_prefix}_ERROR_LOGGER" and p_value: self.error_logger = self._check_logger_not_null(p_key, p_value) elif p_key == f"{self.env_prefix}_DEBUG": self.debug = self.str2bool(p_value) self.console.pawn_debug = self.str2bool(p_value) if self.debug: if not self._loaded.get('rich_traceback_installed'): rich_traceback_install(show_locals=True, width=160) self._loaded['rich_traceback_installed'] = True if self.app_logger: set_debug_logger(self.app_logger) elif p_key == f"{self.env_prefix}_LINE": _hide_log_path = {"log_path": p_value} if isinstance(self.console_options, dict): self.console_options.update(**_hide_log_path) else: self.console_options = _hide_log_path self._init_console() # self._loaded['console'] = True elif p_key == f"{self.env_prefix}_CONSOLE": _enforce_set_value(source_key=f'{self.env_prefix}_TIME_FORMAT', target_key='log_time_format', target_dict=p_value) self.console_options = p_value self._init_console() self._loaded['console'] = True elif p_key == f"{self.env_prefix}_TIMEOUT": self.timeout = p_value elif p_key == f"{self.env_prefix}_VERBOSE": self.verbose = p_value elif p_key == f"{self.env_prefix}_PATH": self._current_path = p_value self._load_config_file() elif p_key == f"{self.env_prefix}_CONFIG_FILE": self._config_file = p_value self._load_config_file() elif p_key == "data" and p_value != self._none_string: if isinstance(p_value, dict): self.data = NestedNamespace(**p_value) else: self.console.log("[bold red] The data namespace value must be a Dict") # else: # self.data = NestedNamespace() # setattr(self, p_key, p_value) # self.console.log(f"fff => {self.data}") p_value = self.data if self.use_global_namespace: globals()[self.global_name][p_key] = p_value else: self.config_manager.set(p_key, p_value)
def _check_logger_not_null(self, key, value): if type(value).__name__ == "Logger": return value else: self.console.debug(f"[red]Invalid Logger [/red] => {key}: {value} ({type(value)})") return Null()
[docs] def increase(self, **kwargs): """ Find the key and increment the number. :param kwargs: :return: Example: .. code-block:: python from pawnlib.config.globalconfig import pawnlib_config pawnlib_config.increase(count=1) print(pawnlib_config.get("count")) # >> 1 pawnlib_config.increase(count=1) print(pawnlib_config.get("count")) # >> 2 pawnlib_config.increase(count=10) print(pawnlib_config.get("count")) # >> 12 """ return self._modify_value(_command="increase", **kwargs) or 0
[docs] def decrease(self, **kwargs): """ Find the key and decrement the number. :param kwargs: :return: Example: .. code-block:: python from pawnlib.config.globalconfig import pawnlib_config pawnlib_config.set(count=100) print(pawnlib_config.get("count")) # >> 100 pawnlib_config.decrease(count=1) print(pawnlib_config.get("count")) # >> 99 pawnlib_config.decrease(count=10) print(pawnlib_config.get("count")) # >> 89 """ return self._modify_value(_command="decrease", **kwargs) or 0
[docs] def append_list(self, **kwargs): """ Find the key and append the value to list. :param kwargs: :return: Example: .. code-block:: python from pawnlib.config.globalconfig import pawnlib_config pawnlib_config.append_list(results="result1") pawnlib_config.append_list(results="result2") print(pawnlib_config.get("results")) # >> ['result1', 'result2'] """ return self._modify_value(_command="append_list", **kwargs) or []
[docs] def remove_list(self, **kwargs): """ Find the key and remove the value to list. :param kwargs: :return: Example: .. code-block:: python from pawnlib.config.globalconfig import pawnlib_config pawnlib_config.set(results=['result1', 'result2']) pawnlib_config.remove_list(results="result2") print(pawnlib_config.get("results")) # >> ['result1'] """ return self._modify_value(_command="remove_list", **kwargs) or []
@staticmethod def _modify_value_initialize(_command=None): init_values = { "increase": 0, "decrease": 0, "append_list": [], "remove_list": [], } return init_values.get(_command) def _modify_value(self, _command=None, **kwargs): """ Find the key and modify the value. :param _command: :param kwargs: :return: """ is_modify = False init_value = self._modify_value_initialize(_command=_command) for key, value in kwargs.items(): tmp_result = self.get(key=key, default="___NONE_VALUE___") if tmp_result == "___NONE_VALUE___": tmp_result = init_value if _command == "increase": if isinstance(tmp_result, int) or isinstance(tmp_result, float): tmp_result += value is_modify = True elif _command == "decrease": if isinstance(tmp_result, int) or isinstance(tmp_result, float): tmp_result -= value is_modify = True elif _command == "append_list" and isinstance(tmp_result, list): tmp_result.append(value) is_modify = True elif _command == "remove_list" and isinstance(tmp_result, list): tmp_result.remove(value) is_modify = True if is_modify: if self.use_global_namespace: globals()[self.global_name][key] = tmp_result return tmp_result else: self.config_manager.set(key, tmp_result) return init_value def __str__(self): return f"<{self.version.title()}>[{self.global_name}]\n{self.to_dict()}"
[docs] def conf(self) -> NestedNamespace: """Access global configuration as a :class:`pawnlib.config.globalconfig.PawnlibConfig`. Example: .. code-block:: python from pawnlib.config.globalconfig import pawnlib_config print(pawnlib_config.conf().hello) # >>> 'world' """ if self.use_global_namespace: g = globals() if self.global_name in g: # return nestednamedtuple(g[self.global_name], ignore_keys=self._do_not_execute_namespace_keys) return NestedNamespace(**g[self.global_name]) else: return NestedNamespace() else: return NestedNamespace(**self.config_manager._config)
[docs] def to_dict(self) -> dict: """Access global configuration as a dict. Example: .. code-block:: python from pawnlib.config.globalconfig import pawnlib_config print(pawnlib_config.to_dict().get("hello")) # >>> 'world' """ if self.use_global_namespace: g = globals() if self.global_name in g: return g[self.global_name] else: return {} else: return self.config_manager._config
def _list_duplicates(seq): seen = set() seen_add = seen.add # adds all elements it doesn't know yet to seen and all other to seen_twice seen_twice = set(x for x in seq if x in seen or seen_add(x)) # turn the set into a list (as requested) return list(seen_twice)
[docs]def set_debug_logger(logger_name=None, propagate=0, get_logger_name='PAWNS', level='DEBUG'): if logger_name: __logger = logging.getLogger(get_logger_name) __logger.propagate = propagate __logger.setLevel(level) __logger.addHandler(logger_name)
[docs]def create_pawn(use_global_namespace=True) -> PawnlibConfig: return PawnlibConfig(global_name="pawnlib_global_config", use_global_namespace=use_global_namespace).init_with_env()
pawnlib_config: PawnlibConfig = create_pawn(use_global_namespace=False) pawn = pawnlib_config pconf = partial(pawn.conf) global_verbose = pawnlib_config.get('PAWN_VERBOSE', 0)