Source code for pawnlib.output.file

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import json
import glob
import yaml
import time
import re
from typing import Union, Any, List, Callable
from pawnlib.config.globalconfig import pawnlib_config as pawn
from pawnlib.output import color_print
from pawnlib.typing import converter
from rich.prompt import Confirm
import asyncio
import aiofiles
import logging

from pawnlib.config.logging_config import setup_logger


class NullByteRemover:
    """
    Strip (or replace) null bytes in one or more files.

    For every input file that contains at least one ``\\x00`` byte a sibling file
    named ``<file>_recovered`` is written; the input file itself is never modified.
    Files without null bytes are skipped.
    """

    def __init__(self, file_paths, chunk_size=1024 * 1024, replace_with=None):
        """
        A class to remove null bytes from files.

        :param file_paths: A single file path (``str``, ``bytes`` or ``os.PathLike``)
            or an iterable of file paths (list, tuple, set, generator, ...).
        :param chunk_size: Size of the chunks to read from the file (default is 1 MB).
        :param replace_with: Bytes to replace null bytes with (default is empty bytes).

        Example:

            .. code-block:: python

                remover = NullByteRemover(["file1.txt", "file2.txt"])
                remover.remove_null_bytes()

                remover = NullByteRemover("single_file.txt", replace_with=b' ')
                remover.remove_null_bytes()

        """
        if isinstance(file_paths, list):
            # Keep the caller's list object as-is.
            self.file_paths = file_paths
        elif isinstance(file_paths, (str, bytes, os.PathLike)):
            # A single path: str/bytes are iterable, so they must be checked
            # before the generic "iterable of paths" branch below.
            self.file_paths = [file_paths]
        else:
            try:
                # Tuples, sets, generators, ... of paths.
                self.file_paths = list(file_paths)
            except TypeError:
                # Not iterable: treat it as a single path-like value.
                self.file_paths = [file_paths]
        self.chunk_size = chunk_size
        self.replace_with = replace_with if replace_with is not None else b''
        self.report = []

    def run(self):
        """
        Process every configured file and then print the report.
        """
        self.remove_null_bytes()
        self.print_report()

    def check_null_bytes(self, file_path):
        """
        Check if the file contains null bytes.

        :param file_path: Path to the file to check.
        :return: True if null bytes are found, otherwise False.

        Example:

            .. code-block:: python

                has_null_bytes = remover.check_null_bytes("file1.txt")
                print(has_null_bytes)
                # >> True or False

        """
        with open(file_path, 'rb') as f:
            while True:
                chunk = f.read(self.chunk_size)
                if not chunk:
                    return False
                if b'\x00' in chunk:
                    return True

    def remove_null_bytes(self):
        """
        Remove null bytes from the specified files and create new files without null bytes.

        One entry per processed file is appended to ``self.report``.

        Example:

            .. code-block:: python

                remover.remove_null_bytes()

        """
        for file_path in self.file_paths:
            # Files without null bytes are left alone and produce no report entry.
            if not self.check_null_bytes(file_path):
                print(f"No null bytes found: {file_path} - Skipping processing.")
                continue

            start_time = time.time()
            original_size = os.path.getsize(file_path)
            new_file_path = f"{file_path}_recovered"
            null_byte_count = 0
            total_bytes_processed = 0

            with open(file_path, 'rb') as f_in, open(new_file_path, 'wb') as f_out:
                while True:
                    data = f_in.read(self.chunk_size)
                    if not data:
                        break
                    null_byte_count += data.count(b'\x00')
                    f_out.write(data.replace(b'\x00', self.replace_with))
                    total_bytes_processed += len(data)
                    self._show_progress(total_bytes_processed, original_size, file_path)

            # The output file is closed (and therefore flushed) before its size is read.
            self.report.append({
                'File Name': file_path,
                'Original Size': original_size,
                'Recovered Size': os.path.getsize(new_file_path),
                'Null Byte Count': null_byte_count,
                'Execution Time': time.time() - start_time
            })

    @staticmethod
    def _show_progress(processed, total, file_path):
        """
        Display the progress of file processing.

        :param processed: Number of bytes processed so far.
        :param total: Total number of bytes in the original file.
        :param file_path: Path to the current file being processed.
        """
        # A zero-byte total has nothing left to process; report it as complete
        # instead of dividing by zero.
        percent = processed / total * 100 if total else 100.0
        sys.stdout.write(f"\r{file_path}: {percent:.2f}% completed ({processed}/{total} bytes)")
        sys.stdout.flush()
        if processed >= total:
            print()  # New line after completion

    def print_report(self):
        """
        Print a detailed report of the processing results after completion.

        This includes information about each processed file such as size and null byte count.
        """
        for file_report in self.report:
            print(f"File Name: {file_report['File Name']}")
            print(f"Original Size: {file_report['Original Size']} bytes")
            print(f"Recovered Size: {file_report['Recovered Size']} bytes")
            print(f"Null Byte Count: {file_report['Null Byte Count']} occurrences")
            print(f"Execution Time: {file_report['Execution Time']:.2f} seconds")
            print("-" * 40)
class Tail:
    """
    Tail class for monitoring log files with support for both synchronous and asynchronous modes.

    Each monitored file is opened, positioned at its end, and polled for new lines.
    A line is handed to ``callback`` only when at least one of ``filters`` matches it.
    When the inode behind a path changes (log rotation) or the path disappears, the
    remaining lines of the old file are processed and the path is reopened; a reopened
    file is read from its beginning so that nothing written to it in the meantime is lost.

    :param log_file_paths: Paths to the log files to monitor.
    :type log_file_paths: Union[str, List[str]]
    :param filters: List of regex patterns to filter log lines.
    :type filters: List[str]
    :param callback: Function to call with processed log lines. In asynchronous mode
        it may return a coroutine, which is awaited.
    :type callback: Callable[[str], Any]
    :param async_mode: Whether to operate in asynchronous mode.
    :type async_mode: bool
    :param formatter: Function to format log lines before processing.
    :type formatter: Callable[[str], str]
    :param enable_logging: Accepted for backward compatibility; not used by this class.
    :type enable_logging: bool
    :param logger: Logger object forwarded to ``setup_logger``.
    :param verbose: Verbosity level forwarded to ``setup_logger``.
    :type verbose: int

    Example:

        .. code-block:: python

            from pawnlib.output.file import Tail

            # Synchronous mode
            tail = Tail(log_file_paths=["/var/log/app.log"],
                        filters=["ERROR", "CRITICAL"],
                        callback=process_log_line,
                        async_mode=False)
            tail.follow()

            # Asynchronous mode
            async def process_log_line_async(line: str):
                # Your async callback logic
                pass

            tail = Tail(log_file_paths=["/var/log/app.log"],
                        filters=["ERROR", "WARNING"],
                        callback=process_log_line_async,
                        async_mode=True)
            tail.follow()

    """

    def __init__(
            self,
            log_file_paths: Union[str, List[str]],
            filters: List[str],
            callback: Callable[[str], Any],
            async_mode: bool = False,
            formatter: Callable[[str], str] = None,
            enable_logging: bool = True,
            logger=None,
            verbose: int = 0,
    ):
        self.log_file_paths = [log_file_paths] if isinstance(log_file_paths, str) else log_file_paths
        # Compile once; every incoming line is tested against all patterns.
        self.filters = [re.compile(f) for f in filters]
        self.callback = callback
        self.async_mode = async_mode
        self.formatter = formatter
        self.verbose = verbose
        self.logger = setup_logger(logger, "Tail", self.verbose)
        # Last known inode per monitored path, used to detect rotation.
        self.file_inodes = {}
        # Upper bound for consecutive "file not found" retries in async mode.
        self.max_retries = 5

    def _get_inode(self, file_path):
        """Get the inode of the file."""
        return os.stat(file_path).st_ino

    async def _handle_retry(self, file_path, retry_count):
        """
        Handle the retry logic with backoff for when the file is not found.

        :param file_path: The path to the log file.
        :param retry_count: The number of retry attempts so far.
        :return: The incremented retry count.
        """
        retry_count += 1
        backoff_time = min(2 ** retry_count, 60)  # Exponential backoff with a max of 60 seconds
        self.logger.warning(f"File '{file_path}' not found. Retrying in {backoff_time} seconds. Retry count: {retry_count}")
        await asyncio.sleep(backoff_time)
        return retry_count

    async def _drain_async(self, file, file_path: str):
        """Process every line still unread in an already opened (old) file."""
        while True:
            line = await file.readline()
            if not line:
                break
            await self._process_line(line, file_path)

    def _drain_sync(self, file, file_path: str):
        """Process every line still unread in an already opened (old) file."""
        for line in iter(file.readline, ""):
            self._process_line_sync(line, file_path)

    async def _follow_async(self, file_path: str):
        """
        Follow a single file asynchronously until an unexpected error occurs or the
        file stays missing for more than ``max_retries`` consecutive retries.

        :param file_path: The path to the log file.
        """
        retry_count = 0
        # Only the very first open skips existing content; a reopened file is new
        # content as a whole and must be read from its beginning.
        seek_to_end = True
        self.logger.info(f"Starting async follow on {file_path}")

        while retry_count <= self.max_retries:
            try:
                if not os.path.exists(file_path):
                    retry_count = await self._handle_retry(file_path, retry_count)
                    continue

                async with aiofiles.open(file_path, mode='r') as file:
                    if seek_to_end:
                        await file.seek(0, os.SEEK_END)
                        seek_to_end = False
                    self.file_inodes[file_path] = self._get_inode(file_path)
                    self.logger.debug(f"Started async monitoring on file: {file_path}")

                    while True:
                        # Check for file rotation by comparing inode numbers
                        try:
                            current_inode = self._get_inode(file_path)
                        except FileNotFoundError:
                            # Path vanished (e.g. mid-rotation): flush what the old
                            # handle still holds, then let the retry logic take over.
                            await self._drain_async(file, file_path)
                            raise
                        if self.file_inodes[file_path] != current_inode:
                            self.file_inodes[file_path] = current_inode
                            self.logger.info(f"Log file {file_path} rotated. Reopening new file.")
                            await self._drain_async(file, file_path)
                            break  # Exit loop to reopen the new file

                        line = await file.readline()
                        if not line:
                            await asyncio.sleep(0.1)
                            continue
                        await self._process_line(line, file_path)

                retry_count = 0  # Reset retry count after successful file access

            except FileNotFoundError:
                retry_count = await self._handle_retry(file_path, retry_count)
            except Exception as e:
                self.logger.error(f"Error occurred while asynchronously monitoring file {file_path}: {e}")
                break
        else:
            # Reached only when the loop condition failed, i.e. retries were exhausted
            # (the ``break`` above skips this branch).
            self.logger.error(f"Stopped monitoring '{file_path}': file still missing after {retry_count} retries.")

    def _follow_sync(self, file_path: str):
        """
        Follow a single file synchronously. Blocks until an unexpected error occurs;
        a missing file is waited for indefinitely.

        :param file_path: The path to the log file.
        """
        # See _follow_async: skip existing content on the first open only.
        seek_to_end = True
        while True:
            try:
                if not os.path.exists(file_path):
                    self.logger.warning(f"Log file {file_path} not found. Waiting for it to be created...")
                    time.sleep(1)  # Wait for the file to be recreated
                    continue

                with open(file_path, "r") as file:
                    if seek_to_end:
                        file.seek(0, os.SEEK_END)
                        seek_to_end = False
                    self.file_inodes[file_path] = self._get_inode(file_path)
                    self.logger.debug(f"Started synchronous monitoring on file: {file_path}")

                    while True:
                        try:
                            current_inode = self._get_inode(file_path)
                        except FileNotFoundError:
                            # Path vanished: flush the old handle before retrying.
                            self._drain_sync(file, file_path)
                            raise
                        if self.file_inodes[file_path] != current_inode:
                            self.file_inodes[file_path] = current_inode
                            self.logger.info(f"Log file {file_path} rotated. Reopening new file.")
                            self._drain_sync(file, file_path)
                            break  # Exit the loop to reopen the file

                        line = file.readline()
                        if not line:
                            time.sleep(0.1)
                            continue
                        self._process_line_sync(line, file_path)

            except FileNotFoundError:
                self.logger.error(f"Error monitoring file {file_path}: File not found. Retrying...")
                time.sleep(1)  # Retry after a delay if the file is not found
            except Exception as e:
                self.logger.error(f"Error occurred while synchronously monitoring file {file_path}: {e}")
                break

    async def _process_line(self, line: str, file_path: str):
        """
        Filter, format and dispatch one line in asynchronous mode.

        A coroutine returned by the callback is awaited. A ``False`` result is logged
        as a warning. Exceptions raised by the formatter or callback are logged and
        swallowed so that monitoring continues.
        """
        line = line.strip()
        if not self._should_process(line):
            return
        try:
            formatted_line = self.formatter(line) if self.formatter else line
            result = self.callback(formatted_line)

            if asyncio.iscoroutine(result):
                result = await result
                if isinstance(result, bool) and not result:
                    self.logger.warning(f"Callback returned False after awaiting for line from {file_path}: {formatted_line}")
            elif isinstance(result, bool):
                if not result:
                    self.logger.warning(f"Callback returned False for line from {file_path}: {formatted_line}")
            else:
                self.logger.debug(f"Callback returned non-awaitable value for line from {file_path}: {result}")

            self.logger.debug(f"Successfully processed a log line from {file_path}: {formatted_line}")
        except Exception as e:
            self.logger.error(f"Error occurred while executing callback on line from {file_path}: {e}")

    def _process_line_sync(self, line: str, file_path: str):
        """
        Filter, format and dispatch one line in synchronous mode.

        Exceptions raised by the formatter or callback are logged and swallowed so
        that monitoring continues.
        """
        line = line.strip()
        if not self._should_process(line):
            return
        try:
            formatted_line = self.formatter(line) if self.formatter else line
            self.callback(formatted_line)
            self.logger.debug(f"Successfully processed a log line from {file_path}: {formatted_line}")
        except Exception as e:
            self.logger.error(f"Error occurred while executing callback on line from {file_path}: {e}")

    def _should_process(self, line: str) -> bool:
        """Return True when at least one filter pattern is found in ``line``."""
        return any(f.search(line) for f in self.filters)

    async def follow_async(self):
        """
        Asynchronous follow method

        Monitors all configured files concurrently.
        """
        tasks = [self._follow_async(path) for path in self.log_file_paths]
        await asyncio.gather(*tasks)

    def follow(self):
        """
        Unified follow method for both async and sync

        :return: In asynchronous mode, when called from inside a running event loop,
            the scheduled future; otherwise ``None`` after monitoring has ended.

        In synchronous mode the files are followed one after another; because
        following a file only ends on an unexpected error, later paths are not
        monitored while an earlier one is still being followed.
        """
        if not self.async_mode:
            for path in self.log_file_paths:
                self._follow_sync(path)
            return None

        try:
            # Raises RuntimeError when no loop is running in this thread, so a
            # successful return always means "loop is running".
            asyncio.get_running_loop()
        except RuntimeError:
            asyncio.run(self.follow_async())
            return None
        # Inside a running loop: schedule the work and hand the future to the caller.
        return asyncio.ensure_future(self.follow_async())
def check_path(path, detailed=False):
    """
    Classify a path as a regular file, a directory, or neither.

    :param path: The path to check.
    :type path: str
    :param detailed: If True, return a dictionary with the type plus existence,
        readability and writability. If False, return only the type.
    :type detailed: bool
    :return: ``'file'``, ``'directory'`` or ``None`` (missing path, or an existing
        path that is neither a file nor a directory); with ``detailed=True`` a dict
        with the keys ``type``, ``exists``, ``readable`` and ``writable``.
    :rtype: dict | str | None

    Example:

        .. code-block:: python

            check_path("/path/to/file.txt")
            # >> 'file'

            check_path("/path/to/directory", detailed=True)
            # >> {'type': 'directory', 'exists': True, 'readable': True, 'writable': True}

            check_path("/path/to/unknown")
            # >> None

            check_path("/path/to/unknown", detailed=True)
            # >> {'type': None, 'exists': False, 'readable': False, 'writable': False}

    """
    exists = os.path.exists(path)
    readable = os.access(path, os.R_OK)
    writable = os.access(path, os.W_OK)

    kind = None
    if exists:
        if os.path.isfile(path):
            kind = "file"
        elif os.path.isdir(path):
            kind = "directory"

    if not detailed:
        return kind
    return {
        "type": kind,
        "exists": exists,
        "readable": readable,
        "writable": writable,
    }
def check_file_overwrite(filename, answer=None, exit_on_deny: bool = True) -> bool:
    """
    Make sure ``filename`` may be written, removing an existing file on confirmation.

    :param filename: filename to check
    :param answer: pre-supplied answer to the overwrite question; when ``None`` the
        user is asked interactively (default answer: no)
    :param exit_on_deny: exit the program with status 1 if the answer is no
    :return: ``True`` when the file does not exist or was removed; ``False`` when
        ``filename`` is empty or the overwrite was denied with ``exit_on_deny=False``

    Example:

        .. code-block:: python

            # touch sdsd
            from pawnlib.output import file

            file.check_file_overwrite(filename="sdsd")
            # >> File already exists => sdsd
            # >> Overwrite already existing 'sdsd' file? (y/n)

    """
    if not filename:
        return False

    # Nothing in the way: the caller is free to write.
    if not is_file(filename):
        return True

    color_print.cprint(f"File already exists => {filename}", "green")
    if answer is None:
        answer = Confirm.ask(prompt=f"Overwrite already existing '{filename}' file?", default=False)

    if answer:
        color_print.cprint(f"Removing the existing file => {filename}", "green")
        os.remove(filename)
        return True

    if exit_on_deny:
        color_print.cprint("Operation stopped.", "red")
        sys.exit(1)
    return False
def get_file_path(filename) -> dict:
    """
    Break a file name into its path components.

    :param filename: path of the file to describe
    :return: dict with the keys ``dirname``, ``file``, ``extension`` (including the
        leading dot), ``filename`` (the argument as given) and ``full_path``
        (absolute path)

    Example:

        .. code-block:: python

            from pawnlib.output import file

            file.get_file_path("sample_file.txt")
            # >> {
            #      'dirname': '',
            #      'file': 'sample_file.txt',
            #      'extension': '.txt',
            #      'filename': 'sample_file.txt',
            #      'full_path': '/examples/sample_file.txt'
            #    }

    """
    directory, base_name = os.path.split(filename)
    suffix = os.path.splitext(filename)[1]
    absolute = os.path.abspath(filename)
    return dict(
        dirname=directory,
        file=base_name,
        extension=suffix,
        filename=filename,
        full_path=absolute,
    )
def get_file_extension(file_path):
    """
    Return the extension of a path without its leading dot.

    :param file_path: The path of the file.
    :return: The extension of the file, or an empty string when there is none.

    Example:

        .. code-block:: python

            get_file_extension("/home/user/document.txt")
            # >> 'txt'

            get_file_extension("/home/user/.hiddenfile")
            # >> ''

            get_file_extension("/home/user/.hiddenfile.txt")
            # >> 'txt'

    """
    dotted_extension = os.path.splitext(file_path)[1]
    # Slicing is a no-op on the empty string, so "no extension" stays ''.
    return dotted_extension[1:]
def get_file_list(path="./", pattern="*", recursive=False):
    """
    List the entries below ``path`` whose names match a glob pattern.

    :param path: The path of the directory to search. Default is current directory.
    :param pattern: The glob pattern to match. Default is "*", which matches everything.
    :param recursive: Whether to search subdirectories recursively. Default is False.
    :return: A list of matching paths, or ``None`` if an ``OSError`` occurred.

    Example:

        .. code-block:: python

            get_file_list(path="./", pattern="*.py", recursive=True)
            # >> ['./main.py', './utils/helper.py']

            get_file_list(path="./", pattern="*.txt", recursive=False)
            # >> ['./README.txt']

    """
    # '**' only has its "any depth" meaning when glob runs with recursive=True.
    segments = (path, '**', pattern) if recursive else (path, pattern)
    try:
        return glob.glob(os.path.join(*segments), recursive=recursive)
    except OSError:
        print("Error reading directory '{}'.".format(path))
        return None
def get_parent_path(run_path=__file__) -> str:
    """
    Return the directory one level above the directory that contains ``run_path``.

    :param run_path: (str) Path of the file to get the parent path of.
        Defaults to this module's own file.
    :return: (str) Absolute path of the parent directory.

    Example:

        .. code-block:: python

            get_parent_path(__file__)
            # >> '/path/to/parent/directory'

    """
    containing_dir = os.path.dirname(os.path.abspath(run_path))
    one_level_up = os.path.join(containing_dir, "..")
    return os.path.abspath(one_level_up)
# def get_script_path(run_path=__file__): # """ # Returns the script path. # :param run_path: (str) Path of the file to get the script path of. # :return: (str) Script path of the given file path. # Example: # .. code-block:: python # get_script_path(__file__) # # >> '/path/to/script/directory' # """ # return os.path.dirname(run_path)
def get_script_path(run_path=None):
    """
    Return the directory of a script.

    Resolution order when ``run_path`` is not given: the directory of the frozen
    executable (``sys.frozen`` set), then the directory of ``sys.argv[0]``, and
    finally the current working directory.

    :param run_path: (str) Path of the file to get the script directory of.
        When given, its directory part is returned as-is (not made absolute).
    :return: (str) Script directory.

    Example:

        .. code-block:: python

            get_script_path(__file__)
            # >> '/path/to/script/directory'

            get_script_path()
            # >> '/path/to/script/directory'

    """
    if run_path:
        return os.path.dirname(run_path)

    if getattr(sys, 'frozen', False):
        anchor = sys.executable
    elif sys.argv[0]:
        anchor = sys.argv[0]
    else:
        return os.getcwd()
    return os.path.dirname(os.path.abspath(anchor))
def get_real_path(run_path=__file__):
    """
    Return the absolute directory that contains ``run_path``.

    The path is made absolute and normalised; symbolic links are not resolved.

    :param run_path: (str) Path of the file to get the directory of.
        Defaults to this module's own file.
    :return: (str) Absolute directory of the given file path.

    Example:

        .. code-block:: python

            get_real_path(__file__)
            # >> '/path/to/real/directory'

    """
    absolute_file = os.path.abspath(run_path)
    return os.path.dirname(absolute_file)
def get_abs_path(filename) -> str:
    """
    Convert a file name into an absolute, normalised path.

    :param filename: (str) Name of the file to get the absolute path of.
    :return: (str) Absolute path of the given file name.

    Example:

        .. code-block:: python

            get_abs_path('example.txt')
            # >> '/path/to/example.txt'

    """
    absolute = os.path.abspath(filename)
    return absolute
def is_binary_file(filename) -> bool:
    """
    Check if the file is binary.

    Only the first 100 bytes are inspected. The file counts as binary when that
    sample contains any byte outside the "text" set: the control characters
    7, 8, 9, 10, 12, 13, 27 and the range 0x20-0xFF without 0x7F.

    :param filename: (str) Name of the file to check.
    :return: (bool) True if the file is binary, False otherwise
        (an empty file is not binary).

    Example:

        .. code-block:: python

            is_binary_file('example.txt')
            # >> False

    """
    text_chars = bytearray({7, 8, 9, 10, 12, 13, 27} | set(range(0x20, 0x100)) - {0x7f})
    # Context manager so the handle is closed deterministically.
    with open(filename, 'rb') as fh:
        head = fh.read(100)
    # Deleting every "text" byte leaves only the suspicious ones behind.
    return bool(head.translate(None, text_chars))
def is_file(filename: str) -> bool:
    """
    Check if the file exists.

    A leading ``~`` is expanded to the user's home directory. When the name
    contains ``*`` it is treated as a glob pattern and the result tells whether
    at least one path matches. Existence is tested with ``os.path.exists``, so an
    existing directory also yields ``True``.

    :param filename: (str) Name (or glob pattern) of the file to check.
    :return: (bool) True if the file exists, False otherwise
        (also False for an empty or ``None`` name).

    Example:

        .. code-block:: python

            is_file('example.txt')
            # >> True

            is_file('~/logs/*.log')
            # >> True if at least one match exists

    """
    if not filename:
        return False

    # Expand "~" for both branches so plain names and patterns behave alike.
    expanded = os.path.expanduser(filename)
    if "*" in filename:
        return bool(glob.glob(expanded))
    return os.path.exists(expanded)
def is_directory(path):
    """
    Tell whether ``path`` refers to an existing directory.

    :param path: The path to check.
    :return: True if the path is a directory, False otherwise.

    Example:

        .. code-block:: python

            is_directory("/home/user")
            # >> True

            is_directory("/home/user/myfile.txt")
            # >> False

    """
    directory_exists = os.path.isdir(path)
    return directory_exists
def is_json(json_file: str) -> bool:
    """
    Check whether a file contains valid JSON.

    The file is read as UTF-8 with an optional BOM (``utf-8-sig``). Errors while
    opening the file (for example a missing file) are not caught.

    :param json_file: (str) Name of the JSON file to validate.
    :return: (bool) True if the JSON is valid, False otherwise.

    Example:

        .. code-block:: python

            is_json('example.json')
            # >> True

    """
    try:
        with open(json_file, 'r', encoding="utf-8-sig") as handle:
            json.load(handle)
        return True
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
        return False
def is_json_file(json_file: str) -> bool:
    """
    Check whether a file contains valid JSON.

    The file is read as UTF-8 with an optional BOM (``utf-8-sig``). Errors while
    opening the file (for example a missing file) are not caught.

    :param json_file: (str) Name of the JSON file to validate.
    :return: (bool) True if the JSON is valid, False otherwise.

    Example:

        .. code-block:: python

            is_json_file('example.json')
            # >> True

    """
    try:
        with open(json_file, 'r', encoding="utf-8-sig") as source:
            json.load(source)
    except ValueError:
        # Covers malformed JSON as well as undecodable bytes.
        return False
    else:
        return True
def open_json(filename: str, encoding="utf-8-sig"):
    """
    Load and parse a JSON file.

    :param filename: str, the name of the JSON file to be read.
    :param encoding: str, the encoding to use when opening the file.
    :return: the parsed JSON content (a dict for a JSON object).
    :raises ValueError: if the file cannot be opened, read or parsed; the failure
        is also written to ``pawn.error_logger`` when one is configured.

    Example:

        .. code-block:: python

            json_data = open_json("data.json")
            # >> {'name': 'John', 'age': 30, 'city': 'New York'}

    """
    try:
        with open(filename, "r", encoding=encoding) as handle:
            return json.load(handle)
    except Exception as e:
        # The error logger is optional; log only when it is set.
        if pawn.error_logger:
            pawn.error_logger.error(f"[ERROR] Can't open the json -> '{filename}' / {e}")
        raise ValueError(f"Error: Failed to parse JSON in file . <'{filename}'>\n{e}")
def open_file(filename: str, encoding=None):
    """
    Read a whole text file into a string.

    :param filename: str, the name of the file to be read.
    :param encoding: str, the encoding to use when opening the file
        (``None`` uses the platform default).
    :return: str, the contents of the file as a string.
    :raises ValueError: if the file cannot be opened or read; the failure is also
        written to ``pawn.error_logger`` when one is configured.

    Example:

        .. code-block:: python

            file_contents = open_file("example.txt")
            # >> the full text of example.txt

    """
    try:
        with open(filename, "r", encoding=encoding) as source:
            content = source.read()
        return content
    except Exception as e:
        # The error logger is optional; log only when it is set.
        if pawn.error_logger:
            pawn.error_logger.error(f"[ERROR] Can't open the file -> '{filename}' / {e}")
        raise ValueError(f"Error: An error occurred while reading the file. <'{filename}'>\n{e}")
def open_yaml_file(filename: str, encoding=None):
    """
    Load and parse a YAML file.

    :param filename: str, the name of the YAML file to be read.
    :param encoding: str, the encoding to use when opening the file.
    :return: the parsed YAML content (a dict for a YAML mapping).
    :raises FileNotFoundError: if the file does not exist.
    :raises ValueError: if the content is not valid YAML.

    Example:

        .. code-block:: python

            yaml_data = open_yaml_file("data.yaml")
            # >> {'name': 'John', 'age': 30, 'city': 'New York'}

    """
    try:
        with open(filename, 'r', encoding=encoding) as stream:
            # NOTE(review): FullLoader is not PyYAML's restricted loader; if this is
            # ever used on untrusted files, consider yaml.safe_load instead.
            parsed = yaml.load(stream, Loader=yaml.FullLoader)
        return parsed
    except FileNotFoundError:
        raise FileNotFoundError(f"Error: The file '{filename}' was not found.")
    except yaml.YAMLError as e:
        raise ValueError(f"Error: Failed to parse YAML in file '{filename}'. {e}")
def write_file(filename: str, data: Any, option: str = 'w', permit: str = '664'):
    """
    Write data to a file and set its permission bits.

    :param filename: The name of the file to write.
    :param data: The data to write to the file.
    :param option: The mode passed to ``open``. Default is 'w'.
    :param permit: The permission of the file as an octal string. Default is '664'.
    :return: A string indicating the success or failure of the write operation.

    Example:

        .. code-block:: python

            write_file('test.txt', 'Hello, World!')
            # >> 'Write file -> test.txt, 13'

    """
    with open(filename, option) as target:
        target.write(data)
    # permit is an octal string such as '664'.
    os.chmod(filename, int(permit, base=8))

    if not os.path.exists(filename):
        return "write_file() can not write to file"
    return "Write file -> %s, %s" % (filename, converter.get_size(filename))
def write_json(filename: str, data: Union[dict, list], option: str = 'w', permit: str = '664', force_write: bool = True, json_default=None):
    """
    Serialise data as JSON into a file and set its permission bits.

    :param filename: The name of the file to write.
    :param data: The JSON data to write to the file.
    :param option: The mode passed to ``open``. Default is 'w'.
    :param permit: The permission of the file as an octal string. Default is '664'.
    :param force_write: When True (default), objects that JSON cannot serialise are
        converted with their ``to_json()`` method if present, otherwise with ``str()``.
        When False, no fallback is installed.
    :param json_default: A function to convert non-serializable objects to a
        serializable format. Takes precedence over ``force_write``. Default is None.
    :return: A string indicating the success or failure of the write operation.

    Example:

        .. code-block:: python

            data = {'name': 'John', 'age': 30}
            write_json('test.json', data)
            # >> 'Write json file -> test.json, 23'

    """
    def _fallback(obj):
        # Prefer the object's own JSON representation when it offers one.
        return obj.to_json() if hasattr(obj, 'to_json') else str(obj)

    if json_default:
        serializer = json_default
    elif force_write:
        serializer = _fallback
    else:
        serializer = None

    with open(filename, option) as target:
        json.dump(data, target, default=serializer)
    os.chmod(filename, int(permit, base=8))

    if not os.path.exists(filename):
        return "write_json() can not write to json"
    return "Write json file -> %s, %s" % (filename, converter.get_size(filename))
def represent_ordereddict(dumper, data):
    """
    YAML representer that emits a mapping as a plain ``map`` node, keeping the
    iteration order of ``data``.

    :param dumper: A YAML dumper instance.
    :param data: A mapping instance (e.g. an OrderedDict).
    :return: A ``yaml.nodes.MappingNode`` tagged ``tag:yaml.org,2002:map``.

    Example:

        .. code-block:: python

            import yaml
            from collections import OrderedDict

            data = OrderedDict()
            data['key1'] = 'value1'
            data['key2'] = 'value2'

            yaml.add_representer(OrderedDict, represent_ordereddict)
            print(yaml.dump(data))
            # >> key1: value1
            # >> key2: value2

    """
    pairs = []
    mapping_node = yaml.nodes.MappingNode("tag:yaml.org,2002:map", pairs)
    # The node keeps a reference to ``pairs``, so extending it fills the node.
    pairs.extend(
        (dumper.represent_data(key), dumper.represent_data(val))
        for key, val in data.items()
    )
    return mapping_node
def write_yaml(filename: str, data: Union[dict, list], option: str = 'w', permit: str = '664'):
    """
    Write YAML data to a file and set its permission bits.

    Calling this function registers ``represent_ordereddict`` as PyYAML's
    representer for ``dict`` so that mappings are dumped in insertion order.

    :param filename: The name of the file to write.
    :param data: The YAML data to write to the file.
    :param option: The mode passed to ``open``. Default is 'w'.
    :param permit: The permission of the file as an octal string. Default is '664'.
    :return: A string indicating the success or failure of the write operation.

    Example:

        .. code-block:: python

            data = {'name': 'John', 'age': 30}
            write_yaml('test.yaml', data)
            # >> 'Write yaml file -> test.yaml, 23'

    """
    # Process-wide registration on PyYAML's default Dumper.
    yaml.add_representer(dict, represent_ordereddict)
    with open(filename, option) as outfile:
        yaml.dump(data, outfile)
    os.chmod(filename, int(permit, base=8))

    # Status messages name YAML (they previously said "json", copied from write_json).
    if os.path.exists(filename):
        return "Write yaml file -> %s, %s" % (filename, converter.get_size(filename))
    return "write_yaml() can not write to yaml"