Source code for pawnlib.cli.top

#!/usr/bin/env python3
from pawnlib.builder.generator import generate_banner
from pawnlib.__version__ import __version__ as _version
from pawnlib.config import pawn, pconf
from pawnlib.typing import StackList, list_to_oneline_string, str2bool, shorten_text, get_procfs_path
from pawnlib.resource import (
    SystemMonitor, get_cpu_load, get_interface_ips, get_platform_info,
    get_mem_info, get_netstat_count, get_hostname, ProcessMonitor
)
from pawnlib.models.response import CriticalText
from pawnlib.resource.net import ProcNetMonitor
import os

from rich.layout import Layout
from rich.live import Live
from rich.table import Table
from rich.align import Align
from rich.panel import Panel
from rich import box
from pawnlib.typing import todaydate
from pawnlib.input.prompt import CustomArgumentParser, ColoredHelpFormatter
import time

__description__ = "A simple and powerful tool for monitoring server resources in real time."

__epilog__ = (
    "This tool is a comprehensive solution for monitoring your server's resource usage. \n\n"
    "Features include real-time tracking of network traffic, CPU, memory, and disk usage, \n"
    "making it an indispensable tool for system administrators and DevOps professionals.\n\n"
    "Here are some usage examples to get you started:\n\n"
    "  1. **Basic Monitoring:** Monitor system resources with default settings. \n"
    "     Example: `pawns top`\n\n"
    "  2. **Detailed View:** Use `-v` to increase verbosity and get more detailed logs.\n"
    "     Example: `pawns top -v`\n\n"
    "  3. **Minimal Output:** Use `-q` for quiet mode to suppress standard output.\n"
    "     Example: `pawns top -q`\n\n"
    "  4. **Custom Update Interval:** Adjust the refresh rate with `-i` to set the interval in seconds.\n"
    "     Example: `pawns top -i 5`\n\n"
    "  5. **Output Formats:** Choose between 'live' and 'line' output styles with `-t`.\n"
    "     Example: `pawns top -t live`\n\n"
    "  6. **Network-Specific Monitoring:** Focus solely on network traffic and protocols.\n"
    "     Example: `pawns top net`\n\n"
    "  7. **Advanced Filters:** Use advanced options to filter processes by PID, name, or network protocols.\n"
    "     Example: `pawns top proc --pid-filter 1234 --protocols tcp udp`\n\n"
    "Key options:\n"
    "  --top-n              Specify the number of top processes to display.\n"
    "  --refresh-rate       Set the data refresh rate in seconds.\n"
    "  --unit               Choose the unit for network traffic (e.g., Mbps, Gbps).\n"
    "  --group-by           Group processes by PID or name.\n\n"
    "This flexibility allows you to tailor the tool to your specific needs. \n"
    "For more detailed usage, run `--help` or refer to the documentation."
)

PROCFS_PATH = get_procfs_path()

[docs]def get_parser(): parser = CustomArgumentParser( description='monitor', epilog=__epilog__, formatter_class=ColoredHelpFormatter ) parser = get_arguments(parser) return parser
[docs]def get_arguments(parser): parser.add_argument('command', help='command to execute. For example, "net" retrieves network information.', type=str, nargs='?', default="resource") parser.add_argument('-c', '--config-file', type=str, help='config', default="config.ini") parser.add_argument('-v', '--verbose', action='count', help='verbose mode. view level (default: %(default)s)', default=1) parser.add_argument('-q', '--quiet', action='count', help='Quiet mode. Dont show any messages. (default: %(default)s)', default=0) parser.add_argument('-i', '--interval', type=float, help='interval sleep time seconds. (default: %(default)s)', default=1) parser.add_argument('-b', '--base-dir', type=str, help='base dir for httping (default: %(default)s)', default=os.getcwd()) parser.add_argument('-t', '--print-type', type=str, help='printing type %(default)s)', default="line", choices=["live", "layout", "line"]) parser.add_argument( '--top-n', type=int, default=10, help="The number of top processes to display in the table. Default: %(default)s." ) # parser.add_argument( # '--refresh-rate', type=int, default=2, # help="The data refresh rate in seconds. Default: %(default)s." # ) parser.add_argument( '--group-by', type=str, default="pid", choices=["pid", "name"], help="Criteria for grouping processes (e.g., 'pid' or 'name'). Default: %(default)s." ) parser.add_argument( '--unit', type=str, default="Mbps", choices=['bps', 'Kbps', 'Mbps', 'Gbps', 'Tbps', 'Pbps'], help="Unit for displaying network traffic (e.g., 'bps', 'Kbps', 'Mbps', 'Gbps', 'Tbps', 'Pbps'). Default: %(default)s." ) parser.add_argument( '--protocols', nargs='+', default=["tcp", "udp"], help="List of protocols to monitor. Defaults to %(default)s." ) parser.add_argument( '--pid-filter', type=int, nargs='*', help="Filter processes by specific process IDs." ) parser.add_argument( '--proc-filter', type=str, nargs='*', help="Filter processes by name." ) parser.add_argument( '--min-bytes-threshold', type=int, default=0, help="Minimum bytes threshold for displaying processes. Default: %(default)s." ) parser.add_argument( '--callback', type=str, help="Path to a user-defined Python script to execute when data is updated." ) return parser
[docs]def main(): app_name = 'monitor' parser = get_parser() args, unknown = parser.parse_known_args() config_file = args.config_file is_hide_line_number = args.verbose > 2 stdout = not args.quiet pawn.set( PAWN_CONFIG_FILE=config_file, PAWN_PATH=args.base_dir, PAWN_CONSOLE=dict( redirect=True, record=True, log_path=is_hide_line_number, # hide line number on the right side ), app_name=app_name, args=args, try_pass=False, last_execute_point=0, data={ "response_time": StackList(), }, fail_count=0, total_count=0, ) if args.verbose > 2: pawn.set( PAWN_LOGGER=dict( log_level="DEBUG", stdout_level="DEBUG", # log_path=f"{args.base_dir}/logs", stdout=stdout, use_hook_exception=True, show_path=False, #hide line numbers ), ) print_banner() system_info = get_platform_info() hostname = shorten_text(get_hostname(), width=20, placeholder='...') system_monitor = SystemMonitor(interval=args.interval, proc_path=PROCFS_PATH) table_title = f"🐰 {hostname} <{system_info.get('model')}, {system_info.get('cores')} cores, {get_mem_info().get('mem_total')} GB> 🐰" if args.command == "proc_net": net_mon = ProcNetMonitor( top_n=args.top_n, refresh_rate=args.interval, group_by=args.group_by, unit=args.unit, protocols=args.protocols, pid_filter=args.pid_filter, proc_filter=args.proc_filter, min_bytes_threshold=args.min_bytes_threshold, ) net_mon.run_live() elif args.command == "proc": proc_mon = ProcessMonitor( n=args.top_n, ) proc_mon.run_live() elif args.print_type == "live": print_rich_live_type_status(table_title=table_title, system_info=system_info, system_monitor=system_monitor) # elif args.print_type == "tab": # print_tabulate_status(system_monitor=system_monitor) elif args.print_type == "line": print_simple_line_type_status(table_title=table_title, system_info=system_info, system_monitor=system_monitor, args=args) elif args.print_type == "layout": print_rich_layout_type_status(table_title=table_title, system_info=system_info, system_monitor=system_monitor, args=args)
[docs]def get_resources_status(system_monitor: SystemMonitor = None, args=None): if not args: args = pconf().args if args.command == "net": netstat = get_netstat_count(proc_path=PROCFS_PATH) data = { "time": todaydate("time_sec"), } data.update(netstat.get('COUNT')) time.sleep(args.interval) elif args.command == "mem": memory = system_monitor.get_memory_status() memory_unit = memory.get('unit') data = { "time": todaydate("time_sec"), # "total": f"{memory.get('total'):.1f}{memory_unit}", "used": f"{memory.get('used'):.1f}{memory_unit}", "free": f"{memory.get('free'):.1f}{memory_unit}", # "available": f"{memory.get('available'):.1f}{memory_unit}", "cached": f"{memory.get('cached'):.1f}{memory_unit}", "buff": f"{memory.get('buffers'):.1f}{memory_unit}", "use_%": f"{memory.get('percent'):.1f}%", "swp_tot": f"{memory.get('swap_total'):.1f}{memory_unit}", "swp_use": f"{memory.get('swap_used'):.1f}{memory_unit}", "swp_free": f"{memory.get('swap_free'):.1f}{memory_unit}", "swp_%": f"{memory.get('swap_percent'):.1f}%", # "memory_trend": f"{memory.get('memory_trend')}", } pressure = memory.get('pressure', {}) if pressure: data.update({ "pressure_some_10": f"{pressure.get('some_avg10', 0):.2f}", "pressure_some_60": f"{pressure.get('some_avg60', 0):.2f}", "pressure_full_10": f"{pressure.get('full_avg10', 0):.2f}", "pressure_full_60": f"{pressure.get('full_avg60', 0):.2f}", }) # Huge Pages 정보 추가 huge_pages = memory.get('huge_pages', {}) if huge_pages: data.update({ # "huge_total": str(huge_pages.get('HugePages_Total', 0)), # "huge_free": str(huge_pages.get('HugePages_Free', 0)), # "huge_rsvd": str(huge_pages.get('HugePages_Rsvd', 0)), # "huge_surp": str(huge_pages.get('HugePages_Surp', 0)), "huge_size": f"{huge_pages.get('Hugepagesize', 0) / 1024:.0f}MB", }) time.sleep(args.interval) elif args.command == "top_mem": top_processes = system_monitor.mem_status.get_top_memory_processes(n=5) data = { "time": todaydate("time_sec"), } for i, proc in enumerate(top_processes, 1): data[f"proc_mem_{i}"] = f"{proc['name']}({proc['pid']}): {proc['memory_percent']:.2f}%" time.sleep(args.interval) else: memory = system_monitor.get_memory_status() network, cpu, disk = system_monitor.collect_system_status() memory_unit = memory.get('unit') data = { "time": todaydate("time_sec"), "net_in": f"{network['Total'].get('recv'):.2f}M", "net_out": f"{network['Total'].get('sent'):.2f}M", "pk_in": f"{network['Total'].get('packets_recv')}", "pk_out": f"{network['Total'].get('packets_sent')}", "load": f"{get_cpu_load()['1min']}", "usr": f"{cpu.get('usr')}%", "sys": f"{cpu.get('sys')}%", "i/o": f"{cpu.get('io_wait'):.2f}", "disk_rd": f"{disk['Total'].get('read_mb')}M", "disk_wr": f"{disk['Total'].get('write_mb')}M", # "mem_total": f"{memory.get('total'):.1f}{memory_unit}", # "mem_free": f"{memory.get('free'):.1f}{memory_unit}", # "cached": f"{memory.get('cached'):.1f}{memory_unit}", "mem_%": f"{memory.get('percent'):.1f}%", } return data
main.__doc__ = ( f"{__description__} \n" f"{__epilog__}" ) if __name__ == '__main__': try: main() except Exception as e: pawn.console.log(e)