import logging
import os
import sys
import time
import inspect
import errno
import signal
import atexit
import subprocess
import threading
import itertools
import warnings
from io import TextIOWrapper
from typing import Callable, List, Dict, Union, Optional, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from pawnlib.output import dump, debug_print, bcolors
from pawnlib import typing
from functools import wraps
# from pawnlib.config.globalconfig import pawnlib_config as pawn
from pawnlib.config import pawn, get_logger
from pawnlib.typing.converter import shorten_text
# warnings.simplefilter('default', DeprecationWarning)
logger = get_logger()
[docs]class ThreadPoolRunner:
"""
A class that runs a function with multiple arguments in parallel using a thread pool.
:param func: The function to run in parallel.
:type func: function
:param tasks: A list of arguments to pass to the function.
:type tasks: list
:param max_workers: The maximum number of worker threads to use.
:type max_workers: int
:param verbose: Whether to print the results of each task as they complete.
:type verbose: int
:param sleep: The number of seconds to sleep between runs when using `forever_run`.
:type sleep: int
Example:
.. code-block:: python
runner = ThreadPoolRunner(func=my_function, tasks=my_args, max_workers=10, verbose=1, sleep=5)
results = runner.run()
runner.forever_run()
"""
def __init__(self, func=None, tasks=[], max_workers=20, verbose=0, sleep=1):
self.func = func
self.tasks = tasks
self.max_workers = max_workers
self.results = []
self.sleep = sleep
self.verbose = verbose
self.stop_event = threading.Event()
[docs] def initializer_worker(self):
"""
A method that is run once by each worker thread when the thread pool is created.
"""
pass
[docs] def run(self, tasks=None, timeout: int = None) -> List[Any]:
"""
Run the function with the given arguments in parallel using a thread pool.
:param tasks: A list or generator of tasks.
:type tasks: list or generator
:param timeout: Timeout for each task in seconds. If None, no timeout is applied.
:type timeout: int or None
:return: A list of results from each task, in the same order as the input.
:rtype: list
"""
tasks = tasks or self.tasks
with ThreadPoolExecutor(max_workers=self.max_workers, initializer=self.initializer_worker) as pool:
futures = {pool.submit(self.func, _task): idx for idx, _task in enumerate(tasks)}
results = [None] * len(tasks)
try:
for future in as_completed(futures, timeout=timeout):
idx = futures[future]
_task = tasks[idx]
try:
result = future.result(timeout=timeout)
results[idx] = result
if self.verbose > 4:
logger.info(
f"Task {idx} completed. "
f"Function: {self.func.__name__}(), Arguments: {_task}, Result: {result}"
)
except Exception as e:
logger.error(
f"Task {idx} failed. "
f"Function: {self.func.__name__}, Arguments: {_task}, Exception: {e}"
)
except TimeoutError:
logger.error(f"Timeout exceeded while waiting for tasks to complete.")
except Exception as e:
logger.error(f"Critical error in thread pool execution: {e}")
pool.shutdown(wait=False)
for future in futures:
future.cancel()
raise
return results
[docs] @staticmethod
def log_results(results):
"""
Print the results of each task as they complete.
:param results: A list of results from each task.
:type results: list
"""
if results:
for result in results:
if result:
print(result)
# def forever_run(self):
# """
# Run the function with the given arguments in parallel using a thread pool indefinitely.
# """
# while True:
# self.run()
# time.sleep(self.sleep)
[docs] def forever_run(self):
"""
Run the function with the given arguments in parallel using a thread pool indefinitely.
"""
try:
while not self.stop_event.is_set():
self.run()
time.sleep(self.sleep)
except KeyboardInterrupt:
logger.info("Interrupted by user, stopping...")
self.stop()
[docs] def stop(self):
"""
Stop the forever_run loop.
"""
self.stop_event.set()
[docs]class Daemon(object):
"""
A generic daemon class.
Usage 1: subclass the Daemon class and override the run() method
Usage 2: subclass the Daemon class and use func parameter
:param pidfile: pid file location
:param func: function to run as daemon
:param stdin: standard input , The default is sys.stdin, and providing a filename will output to a file.
:param stdout: standard output, The default is sys.stdout, and providing a filename will output to a file.
:param stderr: standard error, The default is sys.stderr, and providing a filename will output to a file.
:param home_dir: home directory
:param umask: umask
:param verbose: verbosity level
:param use_gevent: use gevent
:param use_eventlet: use eventlet
\
Example:
.. code-block:: python
from pawnlib.utils.operate_handler import Daemon
def main():
while True:
print(f"main loop")
print("start daemon")
time.sleep(5)
if __name__ == "__main__":
if len(sys.argv) != 2:
sys.exit()
command = sys.argv[1]
daemon = Daemon(
pidfile="/tmp/jmon_agent.pid",
func=main
)
if command == "start":
daemon.start()
elif command == "stop":
daemon.stop()
else:
print("command not found [start/stop]")
"""
def __init__(self, pidfile, func=None, stdin=None,
stdout=None, stderr=None,
home_dir='.', umask=0o22, verbose=1,
use_gevent: bool = False, use_eventlet: bool = False):
self.stdin = stdin if stdin is not None else sys.stdin
self.stdout = stdout if stdout is not None else sys.stdout
self.stderr = stderr if stderr is not None else sys.stderr
self.pidfile = pidfile
self.func = func
self.home_dir = home_dir
self.verbose = verbose
self.umask = umask
self.daemon_alive = True
self.use_gevent = use_gevent
self.use_eventlet = use_eventlet
[docs] def log(self, *args):
if self.verbose >= 1:
print(*args)
[docs] def daemonize(self):
"""
Do the UNIX double-fork magic, see Stevens' "Advanced
Programming in the UNIX Environment" for details (ISBN 0201563177)
http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
"""
if self.use_eventlet:
import eventlet.tpool
eventlet.tpool.killall()
try:
pid = os.fork()
if pid > 0:
# Exit first parent
sys.exit(0)
except OSError as e:
sys.stderr.write(
"fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.exit(1)
# Decouple from parent environment
os.chdir(self.home_dir)
os.setsid()
os.umask(self.umask)
# Do second fork
try:
pid = os.fork()
if pid > 0:
# Exit from second parent
sys.exit(0)
except OSError as e:
sys.stderr.write(
"fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
sys.exit(1)
if sys.platform != 'darwin': # This block breaks on OS X
# Redirect standard file descriptors
sys.stdout.flush()
sys.stderr.flush()
si = open(self.stdin, 'r') if isinstance(self.stdin, str) else self.stdin
so = open(self.stdout, 'a+') if isinstance(self.stdout, str) else self.stdout
if self.stderr:
try:
se = open(self.stderr, 'a+') if isinstance(self.stderr, str) else self.stderr
except ValueError:
# Python 3 can't have unbuffered text I/O
se = open(self.stderr, 'a+', 1)
else:
se = so
if isinstance(si, TextIOWrapper):
os.dup2(si.fileno(), sys.stdin.fileno())
if isinstance(so, TextIOWrapper):
os.dup2(so.fileno(), sys.stdout.fileno())
if isinstance(se, TextIOWrapper):
os.dup2(se.fileno(), sys.stderr.fileno())
def sigtermhandler(signum, frame):
self.daemon_alive = False
sys.exit()
if self.use_gevent:
import gevent
gevent.reinit()
gevent.signal(signal.SIGTERM, sigtermhandler, signal.SIGTERM, None)
gevent.signal(signal.SIGINT, sigtermhandler, signal.SIGINT, None)
else:
signal.signal(signal.SIGTERM, sigtermhandler)
signal.signal(signal.SIGINT, sigtermhandler)
self.log("Started")
# Write pidfile
atexit.register(
self.delpid) # Make sure pid file is removed if we quit
pid = str(os.getpid())
open(self.pidfile, 'w+').write("%s\n" % pid)
[docs] def delpid(self):
try:
# the process may fork itself again
pid = int(open(self.pidfile, 'r').read().strip())
if pid == os.getpid():
os.remove(self.pidfile)
except OSError as e:
if e.errno == errno.ENOENT:
pass
else:
raise
[docs] def start(self, *args, **kwargs):
"""
Start the daemon
"""
self.log("Starting...")
# Check for a pidfile to see if the daemon already runs
try:
pf = open(self.pidfile, 'r')
pid = int(pf.read().strip())
pf.close()
except IOError:
pid = None
except SystemExit:
pid = None
if pid:
message = "pidfile %s already exists. Is it already running?\n"
sys.stderr.write(message % self.pidfile)
sys.exit(1)
# Start the daemon
self.daemonize()
if self.func:
self.func(*args, **kwargs)
else:
self.run(*args, **kwargs)
[docs] def stop(self):
"""
Stop the daemon
"""
if self.verbose >= 1:
self.log("Stopping...")
# Get the pid from the pidfile
pid = self.get_pid()
if not pid:
message = "pidfile %s does not exist. Not running?\n"
sys.stderr.write(message % self.pidfile)
# Just to be sure. A ValueError might occur if the PID file is
# empty but does actually exist
if os.path.exists(self.pidfile):
os.remove(self.pidfile)
return # Not an error in a restart
# Try killing the daemon process
try:
i = 0
while 1:
os.kill(pid, signal.SIGTERM)
time.sleep(0.1)
i = i + 1
if i % 10 == 0:
os.kill(pid, signal.SIGHUP)
except OSError as err:
if os.path.exists(self.pidfile):
os.remove(self.pidfile)
else:
print(str(err))
sys.exit(0)
self.log("Stopped")
[docs] def restart(self):
"""
Restart the daemon
"""
self.stop()
self.start()
[docs] def get_pid(self):
try:
pf = open(self.pidfile, 'r')
pid = int(pf.read().strip())
pf.close()
except IOError:
pid = None
except SystemExit:
pid = None
return pid
[docs] def is_running(self):
pid = self.get_pid()
if pid is None:
self.log('Process is stopped')
return False
elif os.path.exists('/proc/%d' % pid):
self.log('Process (pid %d) is running...' % pid)
return True
else:
self.log('Process (pid %d) is killed' % pid)
return False
[docs] def run(self):
"""
You should override this method when you subclass Daemon.
It will be called after the process has been
daemonized by start() or restart().
"""
raise NotImplementedError
[docs]def timing(f):
"""
Get the time taken to complete the task.
:param f:
:return:
"""
@wraps(f)
def wrap(*args, **kwargs):
function_name = f"{f.__module__}.{f.__name__}()"
job_start(function_name)
start_time = time.time()
ret = f(*args, **kwargs)
end_time = time.time() - start_time
# cprint('{:s} function took {:.3f} ms'.format(f.__name__, (time2-time1)*1000.0))
job_done(full_module_name=function_name, elapsed=end_time)
return ret
return wrap
[docs]def get_inspect_module(full_module_name=None):
if full_module_name is None:
module_name = ''
stack = inspect.stack()
parent_frame = stack[1][0]
module = inspect.getmodule(parent_frame)
if module:
module_pieces = module.__name__.split('.')
module_name = typing.list_to_oneline_string(module_pieces)
function_name = stack[1][3]
full_module_name = "%s.%s()" % (module_name, function_name)
return full_module_name
[docs]def job_start(full_module_name=None):
full_module_name = get_inspect_module(full_module_name)
if pawn.get('PAWN_VERBOSE', 0) > 0:
debug_print(f"[START] {full_module_name}", "green")
[docs]def job_done(error: str = '', full_module_name: str = '', elapsed: Union[float, int] = 0):
title = get_inspect_module(full_module_name)
if pawn.get('PAWN_VERBOSE', 0) > 0:
if error == '':
debug_print(f"[DONE ] {title} {elapsed:.3f}sec", "green")
else:
debug_print(" NOT DONE %s (%.3fsec)- %s" % (title, elapsed, error), "red")
return title
[docs]def execute_function(module_func):
if "." in module_func:
[module_name, function_name] = module_func.split(".")
dump(globals())
module = __import__(f"{module_name}")
func = getattr(module, function_name)
return func()
return globals()[module_func]()
[docs]def run_execute(*args, **kwargs):
warnings.warn(
"run_execute is deprecated. Use execute_command instead.",
DeprecationWarning,
stacklevel=2
)
return execute_command(*args, **kwargs)
# def execute_command(*args, **kwargs):
# return run_execute(*args, **kwargs)
#
# def execute_command(
# cmd: str,
# text: Optional[str] = None,
# cwd: Optional[str] = None,
# check_output: bool = True,
# capture_output: bool = True,
# hook_function: Optional[Callable[[str, int], None]] = None,
# debug: bool = False,
# **kwargs
# ) -> Dict[str, Any]:
# """
# Executes a shell command and captures its output.
#
# Args:
# cmd (str): Command to be executed.
# text (str, optional): Descriptive text or title for the command.
# cwd (str, optional): Working directory to execute the command in.
# check_output (bool, optional): If True, logs the command execution result.
# capture_output (bool, optional): If True, captures the command's stdout.
# hook_function (Callable[[str, int], None], optional): Function to process each line of stdout.
# debug (bool, optional): If True, prints debug information.
# **kwargs: Additional keyword arguments to pass to the hook_function.
#
# Returns:
# Dict[str, Any]: A dictionary containing the command's execution results.
#
# Raises:
# OSError: If an error occurs while executing the command.
# """
# start_time = time.time()
#
# result = {
# "stdout": [],
# "stderr": None,
# "return_code": 0,
# "line_no": 0,
# "elapsed": 0.0,
# }
#
# if text is None:
# text = cmd
# else:
# text = f"{text} (cmd='{cmd}')"
#
# if debug:
# logger.debug(f"Executing command: {cmd}")
#
# try:
# process = subprocess.Popen(
# cmd,
# cwd=cwd,
# shell=True,
# stdout=subprocess.PIPE,
# stderr=subprocess.PIPE,
# text=True, # Replaces 'universal_newlines=True'
# )
#
# # Process stdout line by line
# if process.stdout:
# for line in process.stdout:
# line_stripped = line.strip()
# if line_stripped:
# if callable(hook_function):
# hook_function(line=line_stripped, line_no=result['line_no'], **kwargs)
#
# if capture_output:
# result["stdout"].append(line_stripped)
#
# result['line_no'] += 1
#
# # Wait for the process to complete and capture stderr
# _, stderr = process.communicate()
#
# result["return_code"] = process.returncode
# if stderr:
# result["stderr"] = stderr.strip()
#
# except Exception as e:
# result['stderr'] = str(e)
# raise OSError(f"Error while running command '{cmd}': {e}") from e
#
# end_time = time.time()
# result['elapsed'] = round(end_time - start_time, 3)
#
# if check_output:
# if result.get("stderr"):
# logger.error(f"[FAIL] {text}, Error: '{result.get('stderr')}'")
# else:
# logger.info(f"[ OK ] {text}, elapsed={result['elapsed']}s")
#
# return result
[docs]def execute_command(
cmd: str,
text: Optional[str] = None,
cwd: Optional[str] = None,
check_output: bool = True,
capture_output: bool = True,
hook_function: Optional[Callable[[str, int], None]] = None,
debug: bool = False,
use_spinner: bool = False,
spinner_type: str = "dots",
spinner_text: Optional[str] = None,
**kwargs
) -> Dict[str, Any]:
"""
Executes a shell command and captures its output, with an optional spinner for visual feedback.
Args:
cmd (str): Command to be executed.
text (str, optional): Descriptive text or title for the command.
cwd (str, optional): Working directory to execute the command in.
check_output (bool, optional): If True, logs the command execution result.
capture_output (bool, optional): If True, captures the command's stdout.
hook_function (Callable[[str, int], None], optional): Function to process each line of stdout.
debug (bool, optional): If True, prints debug information.
use_spinner (bool, optional): If True, shows a spinner while the command executes.
spinner_type (str, optional): The type of spinner to use (from Rich library).
spinner_text (str, optional): Custom text to display alongside the spinner.
**kwargs: Additional keyword arguments to pass to the hook_function.
Returns:
Dict[str, Any]: A dictionary containing the command's execution results.
Raises:
OSError: If an error occurs while executing the command.
"""
start_time = time.time()
result = {
"stdout": [],
"stderr": None,
"return_code": 0,
"line_no": 0,
"elapsed": 0.0,
}
if text is None:
text = cmd
else:
text = f"{text} (cmd='{cmd}')"
if spinner_text is None:
spinner_text = f"Executing: {text}"
if debug:
pawn.console.log(f"Executing command: {text}")
try:
if use_spinner:
with pawn.console.status(spinner_text, spinner="dots"):
process = subprocess.Popen(
cmd,
cwd=cwd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True, # Replaces 'universal_newlines=True'
)
result = _process_command_output(process, capture_output, hook_function, result, **kwargs)
else:
process = subprocess.Popen(
cmd,
cwd=cwd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
result = _process_command_output(process, capture_output, hook_function, result, **kwargs)
except Exception as e:
result['stderr'] = str(e)
raise OSError(f"Error while running command '{cmd}': {e}") from e
end_time = time.time()
result['elapsed'] = round(end_time - start_time, 3)
if check_output:
if result.get("stderr"):
logger.error(f"[bold red][FAIL][/bold red] {text}, Error: '{result.get('stderr')}'")
else:
logger.info(f"[bold green][ OK ][/bold green] {text}, elapsed={result['elapsed']}s")
return result
def _process_command_output(process, capture_output, hook_function, result, **kwargs):
"""
Processes the output from the subprocess.
"""
if process.stdout:
for line in process.stdout:
line_stripped = line.strip()
if line_stripped:
if callable(hook_function):
hook_function(line=line_stripped, line_no=result['line_no'], **kwargs)
if capture_output:
result["stdout"].append(line_stripped)
result['line_no'] += 1
# Wait for the process to complete and capture stderr
_, stderr = process.communicate()
result["return_code"] = process.returncode
if stderr:
result["stderr"] = stderr.strip()
return result
[docs]def execute_command_batch(
tasks: Union[List[str], List[Dict[str, Any]]],
stop_on_error: bool = False,
slack_url: Optional[str] = None,
default_kwargs: Optional[Dict[str, Any]] = None,
function_registry=None,
) -> List[Dict[str, Any]]:
"""
Executes a batch of tasks, where each task can be a string (command) or a dictionary
containing arguments for execute_command. Handles errors and sends optional Slack notifications.
Args:
tasks (Union[List[str], List[Dict[str, Any]]]): List of commands (as strings or dictionaries).
stop_on_error (bool, optional): If True, stops execution upon encountering an error.
slack_url (str, optional): Slack webhook URL for sending notifications.
default_kwargs (Dict[str, Any], optional): Default arguments to apply to each task.
Returns:
List[Dict[str, Any]]: A list of results from execute_command for each task.
"""
from pawnlib.utils.notify import send_slack
function_registry = function_registry or {}
results = []
default_kwargs = default_kwargs or {"debug":False, "check_output": False}
for idx, task_item in enumerate(tasks):
# If task is a string, treat it as a simple command
if isinstance(task_item, str):
task_args = {"cmd": task_item}
elif isinstance(task_item, dict):
task_args = task_item
else:
logger.error(f"Invalid task format at index {idx}. Skipping task.")
continue
# Merge with default kwargs, with task-specific arguments taking precedence
task_args = {**default_kwargs, **task_args}
# Extract 'cmd' for logging purposes
cmd = task_args.get('cmd')
task_type = task_args.get('type', 'shell')
if not cmd:
logger.error(f"Task {idx + 1} is missing the 'cmd' argument.")
continue
text = task_args.get('text', f"Task {idx + 1}/{len(tasks)}: {cmd}")
status_emoji = "🚀" # Default emoji for in-progress status
try:
if task_type == "function":
result = execute_registered_function(cmd, function_registry=function_registry)
cmd += "()"
else:
result = execute_command(**task_args)
# success = result["return_code"] == 0
success = result["return_code"] in [0, 2]
elapsed = result["elapsed"]
error_msg = result.get('stderr')
status_emoji = "✅" if success else "❌"
status = "SUCCESS" if success else "FAILED"
if task_args.get('text'):
text_command = f"{task_args.get('text')} (cmd='{cmd}')"
else:
text_command = f"Command: '{cmd}'"
logger.info(
# f"{status_emoji} Task {idx + 1}/{len(tasks)} | Command: '{cmd}' | Status: {status} | "
f"{status_emoji} Task {idx + 1}/{len(tasks)} | {text_command} | Status: {status} | "
f"Elapsed: {elapsed:.3f}s"
)
if error_msg:
logger.error(f"{status_emoji} Task {idx + 1}/{len(tasks)} | {error_msg}")
results.append(result)
if slack_url:
command_stdout = shorten_text("\n".join(result.get("stdout", [])) if success else "", width=30)
command_stderr = shorten_text(result.get("stderr", "") if not success else "", width=30, truncate_side="left")
send_slack(
url=slack_url,
msg_text={
"Command": cmd,
"Elapsed": f"{elapsed:.3f}s",
# "Output": "\n".join(result.get("stdout", [])) if success else "",
"Output": command_stdout,
# "Error": result.get("stderr", "") if not success else "",
"Error": command_stderr,
"Return Code": result['return_code']
},
title=f"Task {idx + 1}/{len(tasks)} {status}",
send_user_name="TaskRunnerBot",
msg_level="info" if success else "error",
status="success" if success else "failed"
)
if not success and stop_on_error:
logger.error("Execution stopped due to an error.")
break
except Exception as e:
status_emoji = "❌"
logger.error(
f"{status_emoji} Task {idx + 1}/{len(tasks)} | Command: '{cmd}' | Status: EXCEPTION | "
f"Error: {e}"
)
result = {
"cmd": cmd,
"stdout": [],
"stderr": str(e),
"return_code": -1,
"elapsed": None,
}
results.append(result)
# Send Slack notification for exception
if slack_url:
send_slack(
url=slack_url,
msg_text={
"Command": cmd,
"Error": str(e)
},
title=f"Task {idx + 1}/{len(tasks)} Exception",
send_user_name="TaskRunnerBot",
msg_level="error",
status="critical"
)
if stop_on_error:
logger.error("Execution stopped due to an exception.")
break
return results
[docs]def execute_registered_function(function_name: str, args: Optional[Dict[str, Any]] = None, debug: bool = False, function_registry=None) -> Dict[str, Any]:
"""
Executes the specified function by name and returns the result.
Args:
function_name (str): The name of the function to execute.
args (Optional[Dict[str, Any]]): A dictionary of arguments to pass to the function.
debug (bool, optional): If True, enables debug logging.
function_registry (dict, optional): A registry of available functions.
Returns:
Dict[str, Any]: A dictionary containing the execution results, including 'stdout', 'stderr', 'return_code', 'line_no', and 'elapsed'.
"""
try:
func = function_registry.get(function_name)
if not callable(func):
raise ValueError(f"{function_name} is not a callable function.")
if debug:
logger.info(f"Executing function: {function_name} with args: {args}")
start_time = time.time()
result = func(**(args or {}))
end_time = time.time()
elapsed = round(end_time - start_time, 3)
return {
"stdout": [str(result)],
"stderr": None,
"return_code": 0,
"line_no": 0,
"elapsed": elapsed,
}
except AttributeError:
error_msg = f"Function '{function_name}' not found."
logger.error(error_msg)
return {
"stdout": [],
"stderr": error_msg,
"return_code": -1,
"line_no": 0,
"elapsed": 0.0,
}
except Exception as e:
error_msg = str(e)
logger.error(f"Error executing function '{function_name}': {error_msg}")
return {
"stdout": [],
"stderr": error_msg,
"return_code": -1,
"line_no": 0,
"elapsed": 0.0,
}
[docs]def hook_print(*args, **kwargs):
"""
Print to output every 10th line
:param args:
:param kwargs:
:return:
"""
if "amplify" in kwargs.get("line"):
print(f"[output hook - matching keyword] {args} {kwargs}")
if kwargs.get("line_no") % 100 == 0:
print(f"[output hook - matching line_no] {args} {kwargs}")
# print(kwargs.get('line'))
[docs]class Spinner:
"""
Create a spinning cursor
:param text: text
:param delay: sleep time
:Example
.. code-block:: python
from pawnlib.utils.operate_handler import Spinner
with Spinner(text="Wait message"):
time.sleep(10)
"""
def __init__(self, text="", delay=0.1):
self._spinner_items = itertools.cycle(['-', '/', '|', '\\'])
self.delay = delay
self.busy = False
self.spinner_visible = False
self.text = text
self._screen_lock = None
self.thread = None
self.spin_message = ""
self.line_up = '\033[1A'
self.line_up = '\x1b[1A'
self.line_clear = '\x1b[2K'
self.start_time = 0
if type(sys.stdout).__name__ == "FileProxy":
self._sys_stdout = getattr(sys.stdout, "rich_proxied_file", sys.stdout)
else:
self._sys_stdout = sys.stdout
[docs] def title(self, text=None):
# print(end=self.line_up)
self.text = text
[docs] def write_next(self):
with self._screen_lock:
if not self.spinner_visible:
if self.text:
self.spin_message = f"{self.text} ... {next(self._spinner_items)}"
else:
self.spin_message = next(self._spinner_items)
# sys.stdout.write(next(self.spinner))
self._sys_stdout.write(self.spin_message)
self.spinner_visible = True
self._sys_stdout.flush()
[docs] def remove_spinner(self, cleanup=False):
with self._screen_lock:
if self.spinner_visible:
b = len(self.spin_message)
self._sys_stdout.write('\b' * b)
self.spinner_visible = False
if cleanup:
self._sys_stdout.write(' ') # overwrite spinner with blank
self._sys_stdout.write('\r') # move to next line
self._sys_stdout.flush()
[docs] def spinner_task(self):
while self.busy:
self.write_next()
time.sleep(self.delay)
self.remove_spinner()
[docs] def start(self):
self._screen_lock = threading.Lock()
self.busy = True
self.thread = threading.Thread(target=self.spinner_task)
self.thread.start()
self.start_time = time.time() # 시작 시간 기록
[docs] def stop(self):
self.busy = False
self.remove_spinner(cleanup=True)
elapsed_time = time.time() - self.start_time # 경과 시간 계산
print(f"[DONE] {self.text} (took {elapsed_time:.2f} seconds)") # 경과 시간 출력
def __enter__(self):
if self._sys_stdout.isatty():
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_traceback):
if self._sys_stdout.isatty():
self.stop()
print(f"[DONE] {self.text}")
else:
self._sys_stdout.write('\r')
[docs]class WaitStateLoop:
"""
loop_function is continuously executed and values are compared with exit_function.
:param loop_function: function to run continuously
:param exit_function: function to exit the loop
:param timeout:
:param delay: sleep time
:param text: text message
Example:
.. code-block:: python
from pawnlib.utils.operate_handler import WaitStateLoop
from functools import partial
def check_func(param=None):
time.sleep(0.2)
random_int = random.randint(1, 1000)
# print(f"param= {param}, random_int = {random_int}")
return random_int
def loop_exit_func(result):
if result % 10 == 1.5:
return True
return False
WaitStateLoop(
loop_function=partial(check_func, "param_one"),
exit_function=loop_exit_func,
timeout=10
).run()
"""
def __init__(self,
loop_function: Callable,
exit_function: Callable,
timeout=30, delay=0.5, text="WaitStateLoop",
):
self.loop_function = loop_function
self.exit_function = exit_function
self.timeout = timeout
self.delay = delay
self.text = text
[docs] def run(self):
"""
run()
:return:
"""
spin_text = ""
error_text = ""
count = 0
start_time = time.time()
if getattr(self.loop_function, "func"):
func_name = self.loop_function.func.__name__
spin_text = f"[{self.text}] Wait for {func_name}{self.loop_function.args}"
with Spinner(text=spin_text) as spinner:
while True:
result = self.loop_function()
is_success = self.exit_function(result)
elapsed = int(time.time() - start_time)
# spinner.title(f"{error_text}[{count}]{spin_text}: result={result}, is_success={is_success}, {elapsed} {time.time()} < {start_time + self.timeout}")
if is_success is True:
return result
spinner.title(f"{error_text}[{count}]{spin_text}: result={shorten_text(result, 30)}, is_success={is_success}, {elapsed} secs passed")
# spinner.title(f" {elapsed} secs passed")
time.sleep(self.delay)
try:
assert time.time() < start_time + self.timeout
except AssertionError:
# text = f"[{count:.1f}s] [{timeout_limit}s Timeout] Waiting for {exec_function_name} / '{func_args}' :: '{wait_state}' -> {check_state} , {error_msg}"
error_text = f"[TIMEOUT]"
count += 1
[docs]def wait_state_loop(
exec_function=None,
func_args=[],
check_key="status",
wait_state="0x1",
timeout_limit=30,
increase_sec=0.5,
health_status=None,
description="",
force_dict=True,
logger=None
):
start_time = time.time()
count = 0
# arguments 가 한개만 있을 때의 예외
if isinstance(func_args, str):
tmp_args = ()
tmp_args = tmp_args + (func_args,)
func_args = tmp_args
exec_function_name = exec_function.__name__
act_desc = f"desc={description}, function={exec_function_name}, args={func_args}"
spinner = Halo(text=f"[START] Wait for {description} , {exec_function_name}, {func_args}", spinner='dots')
if logger and hasattr(logger, "info"):
logger.info(f"[SR] [START] {act_desc}")
spinner.start()
while True:
if isinstance(func_args, dict):
response = exec_function(**func_args)
else:
response = exec_function(*func_args)
if not isinstance(response, dict):
response = response.__dict__
if force_dict and isinstance(response.get("json"), list):
response['json'] = response['json'][0]
check_state = ""
error_msg = ""
if response.get("json") or health_status:
response_result = response.get("json")
check_state = response_result.get(check_key, "")
response_status = response.get("status_code")
if check_state == wait_state or health_status == response_status:
status_header = bcolors.OKGREEN + "[DONE]" + bcolors.ENDC
text = f"\t[{description}] count={count}, func={exec_function_name}, args={str(func_args)[:30]}, wait_state='{wait_state}', check_state='{check_state}'"
if health_status:
text += f", health_status={health_status}, status={response_status}"
spinner.succeed(f'{status_header} {text}')
spinner.stop()
spinner.clear()
# spinner.stop_and_persist(symbol='🦄'.encode('utf-8'), text="[DONE]")
break
else:
if type(response_result) == dict or type(check_state) == dict:
if response_result.get("failure"):
if response_result.get("failure").get("message"):
print("\n\n\n")
spinner.fail(f'[FAIL] {response_result.get("failure").get("message")}')
spinner.stop()
spinner.clear()
break
text = f"[{count:.1f}s] Waiting for {exec_function_name} / {func_args} :: '{wait_state}' -> '{check_state}' , {error_msg}"
spinner.start(text=text)
if logger and hasattr(logger, "info"):
logger.info(f"[SR] {text}")
try:
assert time.time() < start_time + timeout_limit
except AssertionError:
text = f"[{count:.1f}s] [{timeout_limit}s Timeout] Waiting for {exec_function_name} / '{func_args}' :: '{wait_state}' -> {check_state} , {error_msg}"
spinner.start(text=text)
if logger and hasattr(logger, "error"):
logger.info(f"[SR] {text}")
count = count + increase_sec
time.sleep(increase_sec)
spinner.stop()
if logger and hasattr(logger, "info"):
logger.info(f"[SR] [DONE] {act_desc}")
if health_status:
return response
[docs]def run_with_keyboard_interrupt(command, *args, **kwargs):
"""
run with KeyboardInterrupt
:param command:
:param args:
:param kwargs:
:return:
Example:
.. code-block:: python
from pawnlib.utils.operate_handler import run_with_keyboard_interrupt
run_with_keyboard_interrupt(run_func, args, kwargs)
"""
try:
if callable(command):
command(*args, **kwargs)
else:
pawn.console.print(f"\n[red] {command} not callable ")
except KeyboardInterrupt:
pawn.console.print(f"\n\n[red] ^C KeyboardInterrupt - {command.__name__}{str(args)[:-1]}{kwargs}) \n")
[docs]def handle_keyboard_interrupt_signal():
import signal
def handle_ctrl_c(_signal, _frame):
pawn.console.rule(f"[red] KeyboardInterrupt, Going down! Signal={_signal}")
sys.exit(0)
signal.signal(signal.SIGINT, handle_ctrl_c)