from functools import wraps, partial
import asyncio
import aiometer
from aiometer._impl import utils
from pawnlib.output import debug_print, classdump
from pawnlib.config import pawnlib_config as pawn, setup_logger
# from pawnlib.utils.http import ALLOWS_HTTP_METHOD
from pawnlib.typing.constants import const
import httpx
import sys
class AsyncTasks:
    """
    Run a batch of asynchronous tasks with bounded concurrency using aiometer.

    Tasks are queued with :meth:`generate_tasks` and executed with :meth:`run`.
    """

    def __init__(self,
                 max_at_once: int = 10,
                 max_per_second: int = 10,
                 title: str = "Working on async tasks ...",
                 debug: bool = False,
                 status: bool = False,
                 **kwargs):
        """
        This Class is to run asyncio using aiometer.

        :param max_at_once: Limit maximum number of concurrently running tasks.
        :param max_per_second: Limit request rate to not overload the server.
        :param title: Title shown in the console status display while running.
        :param debug: Whether to print debug information via ``debug_print``.
        :param status: If True, show a console status display with progress while running.
        :param kwargs: Accepted for compatibility; currently unused.

        Example:

            .. code-block:: python

                async_tasks = AsyncTasks(max_at_once=10, max_per_second=10)
                async_tasks.generate_tasks(
                    target_list=range(1, 100),
                    function=run_container,
                    **{"args": args}
                ).run()
        """
        self.tasks = []
        self.max_at_once = max_at_once
        self.max_per_second = max_per_second
        self.debug = debug
        self.get_list_function = None
        self.async_partial_target_func = None
        self.async_partial_task_func = None
        self.status_console = None
        self._title = title
        self._view_status = status
        self._function_name = ""
        # Dump the instance only once every attribute is set, so the debug output is complete.
        self._debug_print(self)

    def get_tasks(self):
        """Return the list of queued task callables."""
        return self.tasks

    def generate_tasks(self, target_list=None, function=None, **kwargs):
        """
        Build one task per target and append it to ``self.tasks``.

        Each task is ``functools.partial(function, target, **task_kwargs)``. Calling it
        (as ``_runner`` does) produces the awaitable that aiometer schedules, so
        ``function`` is expected to be an ``async def`` function.

        :param target_list: Iterable of targets. Each is passed as the first positional
            argument of ``function``. ``None`` is treated as empty.
        :param function: Callable to execute for every target.
        :param kwargs: Keyword arguments forwarded to every call of ``function``.
            For backward compatibility, a dict passed as ``kwargs={...}`` is unpacked
            and merged over the other keyword arguments.
        :return: ``self``, so calls can be chained, e.g. ``.generate_tasks(...).run()``.
        :raises ValueError: if ``function`` is not callable.
        """
        if not callable(function):
            raise ValueError(f"{function} is not function")
        # functools.partial and many callable objects have no __qualname__.
        self._function_name = (
            getattr(function, "__qualname__", None)
            or getattr(function, "__name__", None)
            or type(function).__name__
        )
        self._debug_print(f"{target_list}, {type(target_list)}")

        # Forward direct keyword arguments (previously dropped) and keep supporting
        # the legacy nested form ``kwargs={...}``.
        task_kwargs = dict(kwargs)
        nested_kwargs = task_kwargs.pop("kwargs", None)
        if nested_kwargs:
            task_kwargs.update(nested_kwargs)

        if target_list is None:
            target_list = []
        for target in target_list:
            self._debug_print(f"target={target}, function={self._function_name}(), kwargs={task_kwargs}, task_len={len(self.tasks)}")
            self.tasks.append(partial(function, target, **task_kwargs))
        return self

    def run(self):
        """
        Execute all queued tasks and block until they finish.

        Uses ``asyncio.run``, so it must not be called from inside a running event loop.

        :return: List of task results, in the same order as ``self.tasks``.
        """
        if self._view_status:
            with pawn.console.status(self._title) as status:
                return asyncio.run(self._runner(status))
        return asyncio.run(self._runner())

    async def _runner(self, status=None):
        """
        Run ``self.tasks`` through ``aiometer.amap`` and collect the results by task index.

        :param status: Optional status object whose ``update`` method receives progress text.
        :return: Results ordered by the original task index.
        """
        result = {}
        tasks_length = len(self.tasks)
        if tasks_length > 0:
            async with aiometer.amap(
                async_fn=lambda fn: fn(),
                args=self.tasks,
                max_at_once=self.max_at_once,
                max_per_second=self.max_per_second,
                _include_index=True,
            ) as amap_results:
                async for _index, _result in amap_results:
                    result[_index] = _result
                    if status and self._view_status:
                        # Results arrive in completion order, so report how many tasks
                        # have finished rather than the index of the latest one.
                        status.update(f"{self._title} <{self._function_name}> [{len(result)}/{tasks_length}] {_result}")
        else:
            pawn.console.log(f"ERROR: tasks is null = {self.tasks}")
        return utils.list_from_indexed_dict(result)

    def _debug_print(self, *args, **kwargs):
        """Forward to ``debug_print`` only when debug mode is enabled."""
        if self.debug:
            debug_print(*args, **kwargs)
def async_partial(async_fn, *args, **kwargs):
    """
    Bind arguments to a coroutine function and return a zero-argument async callable.

    Awaiting the returned callable awaits ``async_fn(*args, **kwargs)`` and returns its
    result. If ``async_fn`` is not a coroutine function, a red debug message is printed
    and ``None`` is returned without calling it.
    """
    async def wrapped():
        if not asyncio.iscoroutinefunction(async_fn):
            debug_print(f"{async_fn} is not coroutine", "red")
            return None
        return await async_fn(*args, **kwargs)

    return wrapped
def run_in_async_loop(f):
    """
    Decorate a blocking function so it can be awaited without blocking the event loop.

    The returned coroutine function runs ``f(*args, **kwargs)`` in the running loop's
    default executor (``loop.run_in_executor(None, ...)``) and returns its result.

    :param f: Synchronous callable to offload.
    :return: An ``async`` wrapper with ``f``'s metadata (via ``functools.wraps``).
    """
    @wraps(f)
    async def wrapped(*args, **kwargs):
        loop = asyncio.get_running_loop()
        # run_in_executor only forwards positional arguments, so bind everything with
        # partial. Passing f(*args, **kwargs) here would run f on the loop thread and
        # hand its *result* to the executor as if it were the callable.
        return await loop.run_in_executor(None, partial(f, *args, **kwargs))
    return wrapped
class AsyncHttp(AsyncTasks):
    """
    Fetch many URLs concurrently with ``fetch_httpx_url`` on top of ``AsyncTasks``.

    Args:
        max_at_once (int): Maximum number of requests running concurrently.
        max_per_second (int): Maximum number of requests started per second.
        title (str): Text shown in the console status display.
        debug (bool): If True, print debug information.
        status (bool): If True, show a console status display while running.
        urls (list): Initial tasks. Each is a URL string, or a dict with a ``'url'`` key
            whose remaining keys are passed to ``fetch_httpx_url`` as keyword arguments.
    """

    def __init__(self,
                 max_at_once: int = 10,
                 max_per_second: int = 10,
                 title: str = "Working on async tasks ...",
                 debug: bool = False,
                 status: bool = False,
                 urls: list = None,
                 **kwargs):
        super().__init__(max_at_once, max_per_second, title, debug, status, **kwargs)
        self.urls = urls if urls is not None else []
        self._prepare()

    def append_task(self, task):
        """
        Append a new task to the task list.

        Args:
            task (str or dict): If it's a string, it's treated as the URL. If it's a
                dictionary, it must contain a non-empty 'url' key. The other keys are
                forwarded to ``fetch_httpx_url`` (e.g. ``method``, ``timeout``).
                The caller's dict is not modified.

        Raises:
            ValueError: If ``task`` is neither a string nor a dict with a 'url' key.
        """
        if isinstance(task, str):
            _url = task
            _kwargs = {}
        elif isinstance(task, dict) and task.get('url'):
            # Work on a copy so the caller's dict (e.g. an entry of self.urls) keeps its 'url'.
            _kwargs = dict(task)
            _url = _kwargs.pop("url")
        else:
            raise ValueError(f"Invalid task {task!r}: expected a URL string or a dict with a 'url' key")
        # Pass the options via the ``kwargs=`` form, which generate_tasks has always forwarded.
        self.generate_tasks(
            target_list=[_url],
            function=fetch_httpx_url,
            kwargs=_kwargs,
        )
        pawn.console.log(f"IN] url={_url}, kwargs={_kwargs}, max_at_once={self.max_at_once}")

    def _prepare(self):
        """
        Queue one task for every entry in ``self.urls``.
        """
        for info in self.urls:
            self.append_task(info)
async def fetch_httpx_url(url, method="get", timeout=4, info="", max_keepalive_connections=10, max_connections=20, **kwargs):
    """
    Asynchronously fetch a URL using httpx.

    A new ``httpx.AsyncClient`` is opened for every call, so the connection limits
    apply only to that single request's client.

    Args:
        url (str): The URL to fetch.
        method (str): The HTTP method name, case-insensitive (e.g. "get", "POST"). Default is "get".
        timeout (int): The timeout in seconds, applied to both the client and the request. Default is 4.
        info (str): Extra context included in the error log if the request fails. Default is an empty string.
        max_keepalive_connections (int): The maximum number of keep-alive connections. Default is 10.
        max_connections (int): The maximum number of connections. Default is 20.
        **kwargs: Additional keyword arguments forwarded to the httpx client method.

    Returns:
        httpx.Response or dict: The response from the server (non-2xx responses are logged but
        still returned), or an empty dict if the method is unsupported or an exception occurred.
    """
    response = None
    try:
        # Normalise case so "GET"/"Post" match the lowercase method list and client attribute names.
        http_method = str(method).lower()
        if http_method not in const.get_http_methods(lowercase=True):
            # Log once and bail out. Previously a ValueError was raised and immediately
            # caught below, which logged the same failure a second time.
            message = f"[ERROR] Unsupported HTTP method -> {method}"
            pawn.console.log(message)
            pawn.app_logger.error(message)
            return {}

        limits = httpx.Limits(max_keepalive_connections=max_keepalive_connections, max_connections=max_connections)
        async with httpx.AsyncClient(timeout=timeout, limits=limits) as client:
            response = await getattr(client, http_method)(url, timeout=timeout, **kwargs)
            # Any 2xx status is a success; only log the others as errors.
            if not 200 <= response.status_code < 300:
                pawn.console.log(f"[red][ERROR] fetching {url}, status_code={response.status_code}, response={response.text}")
                pawn.app_logger.error(f"[red][ERROR] fetching {url}, status_code={response.status_code}, response={response.text}")
            return response
    except Exception as e:
        # Deliberate best-effort: one failed request must not abort the whole batch.
        pawn.console.log(f"url={url} e={e}, response={response}, info={info}")
        pawn.app_logger.error(f"url={url} e={e}, response={response}, info={info}")
        return {}
# async def shutdown_async_tasks(tasks, loop, exit_on_shutdown=False):
# """
# Gracefully close the WebSocket connection and cancel all pending async tasks.
#
# :param tasks: List of asyncio tasks to cancel
# :param loop: The current event loop
# :param exit_on_shutdown: Boolean indicating if the system should exit after shutdown
# """
# import logging
# import sys
# logger = logging.getLogger("shutdown_async_tasks")
# logger.info(f"Initiating graceful shutdown. Pending tasks: {len(tasks)}")
#
# try:
# # Cancel all pending tasks
# logger.info(f"Attempting to cancel {len(tasks)} tasks.")
# for task in tasks:
# print(task)
# if not task.done():
# task_name = f"Task {task.get_name()}" if task.get_name() else str(task)
# try:
# task.cancel()
# await task
# logger.info(f"{task_name} cancelled successfully.")
# except asyncio.CancelledError:
# logger.warning(f"{task_name} was already cancelled.")
# except Exception as e:
# logger.error(f"Error while cancelling {task_name}: {e}")
# else:
# logger.info(f"Task {task.get_name()} already completed.")
#
# # Close the event loop gracefully
# logger.info("Closing the event loop.")
# loop.stop()
# loop.close()
# logger.info("Event loop closed successfully.")
#
# if exit_on_shutdown:
# logger.info("Exiting the system after graceful shutdown.")
# sys.exit(0)
#
# except Exception as e:
# logger.error(f"An error occurred during shutdown: {e}")
# finally:
# logger.info("Shutdown process completed.")
# async def shutdown_async_tasks(loop=None, tasks=None, logger=None, exit_on_shutdown=True, verbose=1):
# """
# Shutdown all pending async tasks and close the loop gracefully.
#
# Args:
# loop (asyncio.AbstractEventLoop): The event loop to use. If None, the current running loop will be used.
# tasks (List[asyncio.Task]): List of tasks to cancel. If None, all pending tasks in the loop will be cancelled.
# logger (logging.Logger): Logger instance for logging. If None, print will be used as fallback.
# exit_on_shutdown (bool): If True, the system will exit after shutdown.
# verbose (int): The verbosity level for logging output.
# """
#
# logger = setup_logger(logger, "shutdown_async_tasks", verbose)
#
# if loop is None:
# try:
# loop = asyncio.get_running_loop()
# except RuntimeError:
# logger("No running event loop found.")
# return
#
# if tasks is None:
# tasks = [task for task in asyncio.all_tasks(loop) if isinstance(task, asyncio.Task)]
#
# logger.info(f"Initiating graceful shutdown. Found {len(tasks)} pending task(s) to cancel.")
#
# if tasks:
# for task in tasks:
# if not task.done() and not task.cancelled():
# task_name = task.get_coro().__name__
# logger.info(f"Cancelling task: {task_name}")
# task.cancel()
#
# for task in tasks:
# try:
# await task
# except asyncio.CancelledError:
# task_name = task.get_coro().__name__
# logger.info(f"Task '{task_name}' cancelled successfully.")
# except Exception as e:
# task_name = task.get_coro().__name__
# logger.error(f"Error during task '{task_name}' cancellation: {e}")
#
# logger.info("All tasks cancelled and event loop closed gracefully.")
#
# if exit_on_shutdown:
# logger.info("Exiting the system after graceful shutdown.")
# sys.exit(0)