import re
from pawnlib.config.globalconfig import pawnlib_config as pawn
import socket
import time
import asyncio
import requests
from typing import Dict, Optional, Union
from concurrent.futures import ThreadPoolExecutor
from timeit import default_timer
from pawnlib.utils import http, timing
from pawnlib.typing import is_valid_ipv4, todaydate, shorten_text, format_network_traffic, format_size
from pawnlib.output import PrintRichTable
from rich.progress import Progress, BarColumn, TextColumn, TimeRemainingColumn
from rich.table import Table
from rich.live import Live
import signal
import threading
import queue
# try:
# from bcc import BPF
# except ImportError:
# BPF = None
try:
from bcc import BPF
BCC_AVAILABLE = True
except Exception as e:
BPF = None
BCC_AVAILABLE = False
# 필요하면 로그
try:
from typing import Literal, Tuple, List
except ImportError:
from typing_extensions import Literal, Tuple, List
_PUBLIC_IP_CACHE_KEY = "CACHED_PUBLIC_IP"
_DEFAULT_IP_SERVICES: List[str] = [
"http://checkip.amazonaws.com",
"https://api.ipify.org",
"https://ifconfig.me/ip",
]
prev_getaddrinfo = socket.getaddrinfo
[docs]class ProcNetMonitor:
"""
ProcNetMonitor monitors network usage of processes using eBPF.
:param top_n: The top N processes to display in the table.
:param refresh_rate: The data refresh rate (refreshes per second).
:param group_by: How to group processes ("pid" or "name").
:param unit: The unit for displaying rates (e.g., "Mbps").
:param protocols: List of protocols to monitor. Defaults to ["tcp", "udp"].
:param pid_filter: List of PIDs to monitor. Only these PIDs will be tracked.
:param proc_filter: List of process names to monitor. Only these processes will be tracked.
:param min_bytes_threshold: Minimum number of bytes to consider for monitoring.
:param callback: A user-defined function to be called when data is updated.
:param exit_signal: A string used as the termination signal. When this string is detected,
the monitoring process will stop and the program will exit. Default is "EXIT".
Example:
.. code-block:: python
# Example 1: Monitor top 5 processes with TCP and UDP protocols
def handle_update(data):
for group, info in data.items():
print(f"Group: {group}, Sent: {info['bytes_sent']} bytes, Recv: {info['bytes_recv']} bytes")
monitor = ProcNetMonitor(top_n=5, protocols=["tcp", "udp"], callback=handle_update)
monitor.run()
# Example 2: Monitor specific PIDs
def handle_specific_pids(data):
for pid, info in data.items():
print(f"PID: {pid}, Sent: {info['bytes_sent']} bytes, Recv: {info['bytes_recv']} bytes")
monitor = ProcNetMonitor(pid_filter=[1234, 5678], callback=handle_specific_pids)
monitor.run()
# Example 3: Monitor specific process names
def handle_specific_procs(data):
for proc, info in data.items():
print(f"Process: {proc}, Sent: {info['bytes_sent']} bytes, Recv: {info['bytes_recv']} bytes")
monitor = ProcNetMonitor(proc_filter=["python", "nginx"], callback=handle_specific_procs)
monitor.run()
# Example 4: Set a custom refresh rate and exit signal
def handle_exit(data):
print("Received data, checking exit condition...")
if some_condition:
return 'EXIT'
monitor = ProcNetMonitor(refresh_rate=1, exit_signal="EXIT", callback=handle_exit)
monitor.run()
"""
EVENT_TCP_SEND = 1
EVENT_TCP_RECV = 2
EVENT_UDP_SEND = 3
EVENT_UDP_RECV = 4
[docs] def __init__(self, top_n=10, refresh_rate=2, group_by="pid", unit="Mbps",
protocols=None, pid_filter=None, proc_filter=None, min_bytes_threshold=0, callback=None,
exit_signal="EXIT"
):
"""
Initialize the ProcNetMonitor class.
:param top_n: The top N processes to display in the table.
:param refresh_rate: The data refresh rate (refreshes per second).
:param group_by: How to group processes ("pid" or "name").
:param unit: The unit for displaying rates (e.g., "Mbps").
:param protocols: List of protocols to monitor. Defaults to ["tcp", "udp"].
:param pid_filter: List of PIDs to monitor. Only these PIDs will be tracked.
:param proc_filter: List of process names to monitor. Only these processes will be tracked.
:param min_bytes_threshold: Minimum number of bytes to consider for monitoring.
:param callback: A user-defined function to be called when data is updated.
:param exit_signal: A string used as the termination signal. When this string is detected,
the monitoring process will stop and the program will exit. Default is "EXIT".
"""
if not BPF:
raise ImportError("'bcc' module is required but not found.")
self.top_n = top_n
self.refresh_rate = refresh_rate
self.group_by = group_by
self.unit = unit
self.protocols = protocols or ["tcp", "udp"]
self.pid_filter = pid_filter # Monitor specific PIDs only
self.proc_filter = proc_filter # Monitor specific process names only
self.min_bytes_threshold = min_bytes_threshold # Minimum bytes threshold
self.callback = callback # User-defined callback function
self.exit_signal = exit_signal
self.is_running = True
self.process_network = {}
self.previous_counts = {}
self.last_update_time = time.time()
self.bpf = None
self.console = pawn.console
self.exit_event = threading.Event() # Event to signal exit
self.callback_queue = queue.Queue() # Queue for callback functions
# Initialize eBPF
self._initialize_bcc()
self.setup_signal_handlers()
[docs] def setup_signal_handlers(self):
"""
Set up signal handlers to perform cleanup on script termination.
"""
signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGTERM, self._handle_signal)
def _initialize_bcc(self):
"""
Initialize the eBPF program.
"""
try:
self.bpf = BPF(text=self._get_bpf_program())
if "tcp" in self.protocols:
self.bpf.attach_kprobe(event="tcp_sendmsg", fn_name="trace_tcp_send")
self.bpf.attach_kprobe(event="tcp_recvmsg", fn_name="trace_tcp_recv")
if "udp" in self.protocols:
self.bpf.attach_kprobe(event="udp_sendmsg", fn_name="trace_udp_send")
self.bpf.attach_kprobe(event="udp_recvmsg", fn_name="trace_udp_recv")
self.bpf["events"].open_perf_buffer(self._handle_event)
except ImportError as e:
self.console.print(f"[Error] bcc module not found: {e}", style="bold red")
self.bpf = None
except Exception as e:
self.console.print(f"[Error] Failed to initialize BPF: {e}", style="bold red")
self.bpf = None
def _get_bpf_program(self):
"""
Return the eBPF program.
"""
return """
#include <uapi/linux/ptrace.h>
#include <linux/sched.h>
struct sock {};
struct msghdr {};
#define EVENT_TCP_SEND 1
#define EVENT_TCP_RECV 2
#define EVENT_UDP_SEND 3
#define EVENT_UDP_RECV 4
struct net_data_t {
u32 pid;
u64 bytes;
char comm[TASK_COMM_LEN];
u32 event_type; // Event type: 1 (TCP Send), 2 (TCP Recv), etc.
};
BPF_PERF_OUTPUT(events);
int trace_tcp_send(struct pt_regs *ctx, struct sock *sk, struct msghdr *msg, size_t size) {
struct net_data_t data = {};
data.pid = bpf_get_current_pid_tgid() >> 32;
data.bytes = size;
data.event_type = EVENT_TCP_SEND;
bpf_get_current_comm(&data.comm, sizeof(data.comm));
events.perf_submit(ctx, &data, sizeof(data));
return 0;
}
int trace_tcp_recv(struct pt_regs *ctx, struct sock *sk, struct msghdr *msg, size_t size) {
struct net_data_t data = {};
data.pid = bpf_get_current_pid_tgid() >> 32;
data.bytes = size;
data.event_type = EVENT_TCP_RECV;
bpf_get_current_comm(&data.comm, sizeof(data.comm));
events.perf_submit(ctx, &data, sizeof(data));
return 0;
}
int trace_udp_send(struct pt_regs *ctx, struct sock *sk, struct msghdr *msg, size_t size) {
struct net_data_t data = {};
data.pid = bpf_get_current_pid_tgid() >> 32;
data.bytes = size;
data.event_type = EVENT_UDP_SEND;
bpf_get_current_comm(&data.comm, sizeof(data.comm));
events.perf_submit(ctx, &data, sizeof(data));
return 0;
}
int trace_udp_recv(struct pt_regs *ctx, struct sock *sk, struct msghdr *msg, size_t size) {
struct net_data_t data = {};
data.pid = bpf_get_current_pid_tgid() >> 32;
data.bytes = size;
data.event_type = EVENT_UDP_RECV;
bpf_get_current_comm(&data.comm, sizeof(data.comm));
events.perf_submit(ctx, &data, sizeof(data));
return 0;
}
"""
[docs] @staticmethod
def get_process_cmdline(pid):
"""
Get the command-line arguments of a process by PID.
"""
try:
with open(f"/proc/{pid}/cmdline", "r") as f:
cmdline = f.read().replace("\x00", " ").strip()
return cmdline
except Exception as e:
return f"<Error reading cmdline: {e}>"
def _handle_event(self, cpu, data, size):
"""
Handle eBPF events.
"""
event = self.bpf["events"].event(data)
bytes_count = event.bytes
event_type = event.event_type
proc_name = event.comm.decode().strip()
if self.group_by == "name":
group_by = proc_name
elif self.group_by == "pid":
group_by = event.pid
else:
group_by = event.pid
# cmdline = self.get_process_cmdline(pid)
if self.pid_filter and event.pid not in self.pid_filter:
return
if self.proc_filter and proc_name not in self.proc_filter:
return
if bytes_count < self.min_bytes_threshold:
return
if group_by not in self.process_network:
self.process_network[group_by] = {
'pid': event.pid,
'comm': event.comm.decode().strip(),
# 'cmdline': cmdline,
'bytes_sent': 0,
'bytes_recv': 0,
'tcp_sent': 0,
'tcp_recv': 0,
'tcp_sent_rate': 0.0,
'tcp_recv_rate': 0.0,
'udp_sent': 0,
'udp_recv': 0,
'udp_sent_rate': 0.0,
'udp_recv_rate': 0.0,
'send_rate': 0.0,
'recv_rate': 0.0,
}
if event_type == self.EVENT_TCP_SEND:
self.process_network[group_by]['tcp_sent'] += bytes_count
elif event_type == self.EVENT_TCP_RECV:
self.process_network[group_by]['tcp_recv'] += bytes_count
elif event_type == self.EVENT_UDP_SEND:
self.process_network[group_by]['udp_sent'] += bytes_count
elif event_type == self.EVENT_UDP_RECV:
self.process_network[group_by]['udp_recv'] += bytes_count
# Bytes sent/received for rate calculations
if event_type in {self.EVENT_TCP_SEND, self.EVENT_UDP_SEND}:
self.process_network[group_by]['bytes_sent'] += bytes_count
elif event_type in {self.EVENT_TCP_RECV, self.EVENT_UDP_RECV}:
self.process_network[group_by]['bytes_recv'] += bytes_count
if self.callback:
# result = self.callback(self.process_network)
# if result == "EXIT":
# self.exit_event.set()
self.callback_queue.put(self.process_network.copy())
def _update_rates(self):
"""
Update transmission rates for each process and protocol.
Add average rates based on historical data for all protocols.
"""
current_time = time.time()
# Initialize history buffer for rate calculations
if not hasattr(self, 'rate_history'):
self.rate_history = {
pid: {
proto: {'sent_rate': [], 'recv_rate': []} for proto in self.protocols
}
for pid in self.process_network
}
for pid, info in self.process_network.items():
if pid not in self.previous_counts:
# Initialize `previous_counts` and `rate_history` for a new PID
self.previous_counts[pid] = {
'time': current_time,
'bytes_sent': info['bytes_sent'],
'bytes_recv': info['bytes_recv'],
**{f'{proto}_sent': info.get(f'{proto}_sent', 0) for proto in self.protocols},
**{f'{proto}_recv': info.get(f'{proto}_recv', 0) for proto in self.protocols},
}
self.rate_history[pid] = {
proto: {'sent_rate': [], 'recv_rate': []} for proto in self.protocols
}
continue
prev = self.previous_counts[pid]
time_diff = current_time - prev['time']
if time_diff > 0:
info['send_rate'] = (info['bytes_sent'] - prev['bytes_sent']) / time_diff
info['recv_rate'] = (info['bytes_recv'] - prev['bytes_recv']) / time_diff
# Calculate current protocol-specific rates and update history
for proto in self.protocols:
sent_key, recv_key = f'{proto}_sent', f'{proto}_recv'
info[f'{proto}_sent_rate'] = (info.get(sent_key, 0) - prev.get(sent_key, 0)) / time_diff
info[f'{proto}_recv_rate'] = (info.get(recv_key, 0) - prev.get(recv_key, 0)) / time_diff
# Update rate history for the protocol
self.rate_history[pid][proto]['sent_rate'].append(info[f'{proto}_sent_rate'])
self.rate_history[pid][proto]['recv_rate'].append(info[f'{proto}_recv_rate'])
# Limit history size to avoid excessive memory usage
max_history_size = 200
if len(self.rate_history[pid][proto]['sent_rate']) > max_history_size:
self.rate_history[pid][proto]['sent_rate'].pop(0)
if len(self.rate_history[pid][proto]['recv_rate']) > max_history_size:
self.rate_history[pid][proto]['recv_rate'].pop(0)
# Calculate average rates for the protocol
info[f'{proto}_avg_sent_rate'] = (
sum(self.rate_history[pid][proto]['sent_rate']) / len(self.rate_history[pid][proto]['sent_rate'])
)
info[f'{proto}_avg_recv_rate'] = (
sum(self.rate_history[pid][proto]['recv_rate']) / len(self.rate_history[pid][proto]['recv_rate'])
)
self.previous_counts[pid] = {
'time': current_time,
'bytes_sent': info['bytes_sent'],
'bytes_recv': info['bytes_recv'],
**{f'{proto}_sent': info.get(f'{proto}_sent', 0) for proto in self.protocols},
**{f'{proto}_recv': info.get(f'{proto}_recv', 0) for proto in self.protocols},
}
self.last_update_time = current_time
[docs] def generate_title(self):
"""
Dynamically generate the title based on initialized parameters.
Returns:
str: Generated title.
"""
title = f"Process Network Usage ({self.unit} Sent/Recv)"
title += f", TopN: {self.top_n}"
title += f", RefreshRate: {self.refresh_rate}s"
title += f", GroupBy: {self.group_by}"
if self.pid_filter:
title += f", PIDFilter: {', '.join(map(str, self.pid_filter))}"
if self.proc_filter:
title += f", ProcFilter: {', '.join(map(str, self.proc_filter))}"
if self.min_bytes_threshold:
title += f", MinBytesThreshold: {self.min_bytes_threshold} bytes"
title += f", Protocols: {', '.join(self.protocols)}"
return title
def _generate_table(self):
"""
Generate a Rich table displaying process network usage for specified protocols.
:return: Rich Table object.
"""
# table = Table(title=f"Process Network Usage ({self.unit} Sent/Recv), GroupBy: {self.group_by}", expand=True)
table = Table(title=self.generate_title(), expand=True)
if self.group_by == "pid":
table.add_column(f"PID", justify="right", style="cyan")
table.add_column("Name", style="green")
table.add_column("Total Sent", justify="right", style="magenta")
table.add_column("Total Recv", justify="right", style="magenta")
# Add protocol-specific columns dynamically
for proto in self.protocols:
table.add_column(f"{proto.upper()} Sent(AVG)", justify="right", style="yellow")
table.add_column(f"{proto.upper()} Recv(AVG)", justify="right", style="yellow")
# Sort processes by the sum of all protocol rates
sorted_pids = sorted(
self.process_network.keys(),
key=lambda pid: sum(
self.process_network[pid].get(f"{proto}_sent", 0) + self.process_network[pid].get(f"{proto}_recv", 0)
for proto in self.protocols
),
reverse=True
)[:self.top_n]
for pid in sorted_pids:
info = self.process_network[pid]
# Basic process details
row = [
# str(pid),
f"{info['comm']}",
f"{format_size(info['bytes_sent'])}",
f"{format_size(info['bytes_recv'])}",
]
if self.group_by == "pid":
row.insert(0, str(pid))
# Add protocol-specific details
for proto in self.protocols:
sent_rate = format_network_traffic(info.get(f'{proto}_sent_rate', 0), unit=self.unit)
sent_avg_rate = format_network_traffic(info.get(f'{proto}_avg_sent_rate', 0), unit=self.unit, show_unit=False)
recv_rate = format_network_traffic(info.get(f'{proto}_recv_rate', 0), unit=self.unit)
avg_recv_rate = format_network_traffic(info.get(f'{proto}_avg_recv_rate', 0), unit=self.unit, show_unit=False)
row.append(f"{sent_rate} [dim]{sent_avg_rate}[/dim]")
row.append(f"{recv_rate} [dim]{avg_recv_rate}[/dim]")
table.add_row(*row)
return table
def _process_callbacks(self):
"""
Process the callback queue.
"""
while not self.callback_queue.empty():
data = self.callback_queue.get()
result = self.callback(data)
if self.exit_signal and result == self.exit_signal:
self.is_running = False
self.exit_event.set()
break
# def run(self):
# """
# Run the NetworkMonitor (utilize data via callbacks or external logic).
# """
# if not self.bpf:
# self.console.print("[Error] BPF not initialized. Exiting.", style="bold red")
# return
#
# self.console.print("Starting NetworkMonitor...", style="bold green")
# try:
# # while not self.exit_event.is_set():
# while self.is_running:
#
# self.bpf.perf_buffer_poll(timeout=1000)
# self._update_rates()
# self._process_callbacks()
# time.sleep(1 / self.refresh_rate)
#
# except SystemExit:
# self.console.print("[bold yellow]Exiting NetworkMonitor...[/bold yellow]")
# # self.is_running = False
# except KeyboardInterrupt:
# self.console.print("Stopping NetworkMonitor.", style="bold yellow")
#
# finally:
# self.is_running = False
# self.console.print("[bold red]NetworkMonitor stopped.[/bold red]")
[docs] def run(self):
"""
ProcNetMonitor를 실행합니다. `is_running`이 `True`인 동안 루프를 계속합니다.
"""
if not self.bpf:
self.console.print("[Error] BPF not initialized. Exiting.", style="bold red")
return
self.console.print("Starting NetworkMonitor...", style="bold green")
try:
while self.is_running:
self.bpf.perf_buffer_poll(timeout=1000)
self._update_rates()
self._process_callbacks()
time.sleep(1 / self.refresh_rate)
except SystemExit:
self.console.print("[bold yellow]Exiting NetworkMonitor...[/bold yellow]")
except KeyboardInterrupt:
self.console.print("Stopping NetworkMonitor.", style="bold yellow")
finally:
self.is_running = False
self.console.print("[bold red]NetworkMonitor stopped.[/bold red]")
[docs] def run_live(self):
"""
Display real-time tables using Rich Live.
"""
if not self.bpf:
self.console.print("[Error] BPF not initialized. Exiting.", style="bold red")
return
with Live(self._generate_table(), refresh_per_second=self.refresh_rate, console=self.console) as live:
try:
while self.is_running:
self.bpf.perf_buffer_poll(timeout=1000)
self._update_rates()
self._process_callbacks()
live.update(self._generate_table())
time.sleep(1 / self.refresh_rate)
except KeyboardInterrupt:
self.console.print("Stopping NetworkMonitor.", style="bold yellow")
[docs] def update_data(self):
"""
Poll eBPF events and update the process_network data.
"""
if not self.bpf:
raise RuntimeError("BPF is not initialized.")
# eBPF 이벤트 폴링
self.bpf.perf_buffer_poll(timeout=1000)
# 네트워크 전송/수신 속도 갱신
self._update_rates()
[docs] def get_latest_network_data(self, top_n=None):
"""
Return the latest network usage data, optionally limited to the top N processes.
Args:
top_n (int): Number of top processes to return.
Returns:
List[Dict[str, Any]]: List of process network usage dictionaries.
"""
self.update_data()
return self.get_top_n(top_n or self.top_n)
[docs] def get_top_n(self, n: Optional[int] = None) -> List[Dict[str, Union[int, str, float]]]:
"""
Retrieve the top N processes by network usage.
:param n: Number of top processes to retrieve. Defaults to self.top_n.
:return: List of top N processes sorted by (bytes_sent + bytes_recv).
"""
n = n or self.top_n
with threading.Lock():
sorted_pids = sorted(
self.process_network.keys(),
key=lambda pid: self.process_network[pid]['bytes_sent'] + self.process_network[pid]['bytes_recv'],
reverse=True
)
top_pids = sorted_pids[:n]
top_processes = [self.process_network[pid] for pid in top_pids]
return top_processes
def _handle_signal(self, sig, frame):
"""
Handle signals for cleanup.
"""
self.console.print(f"\nExiting... signal={sig}, frame={frame}", style="bold red")
self.is_running = False
self.exit_event.set()
[docs]class OverrideDNS:
"""
Change the Domain Name using socket
Example:
.. code-block:: python
from pawnlib.resource import net
net.OverrideDNS(domain=domain, ipaddr=ipaddr).set()
"""
_dns_cache = {}
def __init__(self, domain="", ipaddr="", port=80):
self._dns_cache[domain] = ipaddr
self.prv_getaddrinfo = prev_getaddrinfo
[docs] def new_getaddrinfo(self, *args):
if args[0] in self._dns_cache:
if pawn.verbose:
print("Forcing FQDN: {} to IP: {}".format(args[0], self._dns_cache[args[0]]))
return self.prv_getaddrinfo(self._dns_cache[args[0]], *args[1:])
else:
return self.prv_getaddrinfo(*args)
[docs] def set(self):
socket.getaddrinfo = self.new_getaddrinfo
[docs] def unset(self):
socket.getaddrinfo = self.prv_getaddrinfo
def _normalize_ip(text: str) -> str:
"""공백/개행을 제거하고 깔끔한 문자열로 변환."""
return text.strip()
def _is_valid_ip(ip: str) -> bool:
"""IPv4/IPv6 모두 허용 (기존 is_valid_ipv4가 있다면 대체/조합)."""
# 기존 is_valid_ipv4 를 재사용하고 싶으면:
# if is_valid_ipv4(ip):
# return True
import ipaddress
try:
ipaddress.ip_address(ip)
return True
except ValueError:
return False
[docs]def get_public_ip(
use_cache: bool = False,
*,
services: Optional[List[str]] = None,
timeout: float = 2.0,
connect_timeout: Optional[float] = None,
raise_on_error: bool = False,
) -> str:
"""
Returns the public IP address of the current machine.
:param use_cache: Whether to use and update the cached public IP
:param services: List of public IP services to query in order
:param timeout: Response timeout (seconds)
:param connect_timeout: Connection timeout (seconds). If None, use `timeout`.
:param raise_on_error: If True, re-raise the last exception instead of returning an empty string
:return: The public IP address, or empty string on failure (if raise_on_error=False)
Example::
from pawnlib.resource import net
net.get_public_ip()
net.get_public_ip(use_cache=True)
"""
services = services or _DEFAULT_IP_SERVICES
connect_timeout = connect_timeout if connect_timeout is not None else timeout
# 캐시 사용
if use_cache:
cached_ip = pawn.get(_PUBLIC_IP_CACHE_KEY)
if cached_ip and _is_valid_ip(cached_ip):
return cached_ip
last_exception: Optional[Exception] = None
for service in services:
try:
response = http.jequest(
service,
timeout=timeout,
connect_timeout=connect_timeout,
)
raw_text = response.get("text", "")
public_ip = _normalize_ip(raw_text)
if _is_valid_ip(public_ip):
if use_cache:
pawn.set(**{_PUBLIC_IP_CACHE_KEY: public_ip})
return public_ip
msg = (
f"Invalid IP address received from '{service}' - '{public_ip}'"
)
pawn.error_logger.error(msg)
pawn.console.debug(msg)
except Exception as e:
last_exception = e
pawn.error_logger.error(
f"An error occurred while fetching Public IP address from '{service}' - {e}"
)
pawn.console.debug(
f"An error occurred while fetching Public IP address from '{service}' - {e}"
)
if raise_on_error and last_exception is not None:
raise last_exception
return ""
[docs]class FindFastestRegion:
def __init__(self, verbose=True, aws_regions=None):
self.results = []
self.verbose = verbose
if aws_regions:
self.aws_regions = aws_regions
else:
self.aws_regions = {
"Seoul": "ap-northeast-2",
"Tokyo": "ap-northeast-1",
"Virginia": "us-east-1",
"Hongkong": "ap-east-1",
"Singapore": "ap-southeast-1",
"Mumbai": "ap-south-1",
"Frankfurt": "eu-central-1",
"Ohio": "us-east-2",
"California": "us-west-1",
"US-West": "us-west-2",
"Ceentral":"ca-central-1",
"Ireland": "eu-west-1",
"London": "eu-west-2",
"Sydney": "ap-southeast-2",
"São Paulo": "sa-east-1",
"Beijing": "cn-north-1",
}
[docs] def run(self):
self.results = []
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(self.find_fastest_region())
loop.run_until_complete(future)
self.sorted_results()
return self.results
[docs] async def find_fastest_region(self):
tasks = []
with ThreadPoolExecutor(max_workers=10) as executor:
loop = asyncio.get_event_loop()
for region_name, region_code in self.aws_regions.items():
url = f'https://s3.{region_code}.amazonaws.com/ping?x=%s' % todaydate("ms")
tasks.append(loop.run_in_executor(executor, self.get_time, *(url, region_name)))
await asyncio.gather(*tasks)
[docs] def get_time(self, url, name="NULL"):
start_time = default_timer()
try:
response = requests.get(f'{url}', timeout=3)
response_text = response.text
response_time = round(response.elapsed.total_seconds(), 3)
status_code = response.status_code
except:
response_time = None
response_text = None
status_code = 999
elapsed = round(default_timer() - start_time, 3)
data = {
"region": name,
"time": response_time,
"run_time": elapsed,
"url": shorten_text(url, 50),
# "text": response_text,
"status_code": status_code
}
if data.get('time') and data.get("run_time") and data.get("status_code") == 200:
self.results.append(data)
if self.verbose:
print(data)
return data
[docs] def sorted_results(self, key="run_time"):
self.results = sorted(self.results, key=(lambda x: x.get(key)), reverse=False)
[docs] def print_results(self):
PrintRichTable(title="fast_region", data=self.results)
pawn.console.log(f"Fastest Region={self.results[0]['region']}, time={self.results[0]['run_time']} sec")
[docs]class AsyncPortScanner:
"""
Asynchronous Port Scanner class.
:param ip_range: Tuple of start and end IP addresses to scan.
:param port_range: Tuple of start and end ports to scan. Default is all ports (0, 65535).
:param max_concurrency: Maximum number of concurrent scans. Default is 30.
Example:
.. code-block:: python
scanner = AsyncPortScanner(("192.168.0.1", "192.168.0.255"), (1, 1024), 50)
asyncio.run(scanner.scan_all())
"""
def __init__(self, ip_range: Tuple[str, str], port_range: Tuple[int, int] = (0, 65535),
max_concurrency: int = 30, timeout=1, ping_timeout=0.05, fast_scan_ports: List[int] = [22, 80, 443], batch_size=50000):
self.start_ip, self.end_ip = ip_range
self.start_port, self.end_port = port_range
self.scan_results = {}
self.semaphore = asyncio.Semaphore(max_concurrency)
self.timeout = timeout
self.ping_timeout = ping_timeout
self.fast_scan_ports = fast_scan_ports
self.batch_size = batch_size
[docs] async def ping_host(self, ip: str) -> bool:
common_ports = [22, 80, 443]
for port in common_ports:
try:
await asyncio.wait_for(asyncio.open_connection(ip, port), timeout=self.ping_timeout)
return ip # 연결 성공, 호스트가 살아 있음
except Exception:
continue # 해당 포트에 대한 연결 실패, 다음 포트 시도
return False # 모든 시도 실패, 호스트가 닫혀 있음
[docs] async def try_ping_host(self, ip: str, progress: Progress, task_id: int):
progress.advance(task_id)
for port in self.fast_scan_ports:
try:
await asyncio.wait_for(asyncio.open_connection(ip, port), timeout=self.timeout)
if progress is not None and task_id is not None:
progress.advance(task_id) # 성공적으로 핑을 완료하면 진행 상황을 업데이트합니다.
return ip
except (asyncio.TimeoutError, Exception):
continue # 해당 포트에서 연결 실패, 다음 포트로 계속 시도합니다.
return False
[docs] async def scan_all(self, fast_scan: bool = False):
if fast_scan:
tasks = [self.check_and_scan_host(ip) for ip in self._generate_ips()]
else:
tasks = [
self.wrap_scan(ip, port)
for ip in self._generate_ips()
for port in range(self.start_port, self.end_port + 1)
]
await asyncio.gather(*tasks)
[docs] async def check_and_scan_host(self, ip):
if await self.ping_host(ip):
print(f"{ip} is up, scanning ports...")
tasks = [self.wrap_scan(ip, port) for port in range(self.start_port, self.end_port + 1)]
await asyncio.gather(*tasks)
else:
print(f"{ip} is down, skipping...")
[docs] async def scan_port(self, ip: str, port: int) ->(str, int, bool):
async with (self.semaphore):
pawn.console.debug(f"Scanning {ip}:{port} - Acquired semaphore, timeout={self.timeout}")
try:
await asyncio.wait_for(asyncio.open_connection(ip, port), timeout=self.timeout)
pawn.console.debug(f"Connection successful: {ip}:{port}")
return ip, port, True
except asyncio.TimeoutError:
pawn.console.debug(f"Timeout: {ip}:{port}")
return ip, port, False
except Exception as e:
if "Too many" in str(e):
pawn.console.log(f"Error scanning -> [red]{e}[/red]")
else:
pawn.console.debug(f"Error scanning {ip}:{port} - {e}")
return ip, port, False
# finally:
# pawn.console.log(f"Releasing semaphore: {ip}:{port}")
[docs] def calculate_scan_range(self):
start_ip_int = self.ip_to_int(self.start_ip)
end_ip_int = self.ip_to_int(self.end_ip)
total_ips = end_ip_int - start_ip_int + 1
total_ports = self.end_port - self.start_port + 1
total_tasks = total_ips * total_ports
return start_ip_int, end_ip_int, total_tasks
[docs] async def scan(self, fast_scan: bool = False, progress: Progress = None):
tasks = []
ips_to_scan = await self.get_ips_to_scan(fast_scan, progress)
if fast_scan:
if ips_to_scan:
pawn.console.log(f"<FAST SCAN> Alive IPs: {ips_to_scan}")
else:
pawn.console.log(f"<FAST SCAN> [red]No open servers found on ports {self.fast_scan_ports}.[/red]")
total_ports = self.end_port - self.start_port + 1
total_tasks = len(ips_to_scan) * total_ports
fast_scan_string = "FastScan" if fast_scan else ""
task_id = progress.add_task(f"[cyan]Scanning {fast_scan_string}...", total=total_tasks)
if fast_scan:
pawn.console.log(f"Alive IP: {ips_to_scan}")
for ip in ips_to_scan:
for port in range(self.start_port, self.end_port + 1):
task = self.wrap_scan(ip, port, progress, task_id)
tasks.append(task)
if len(tasks) >= self.batch_size:
await asyncio.gather(*tasks)
tasks.clear()
if tasks:
await asyncio.gather(*tasks)
[docs] async def get_ips_to_scan(self, fast_scan: bool, progress: Progress) -> List[str]:
ips = self._generate_ips()
if not fast_scan:
return ips
task_id = progress.add_task("Checking IPs...", total=len(ips))
ping_tasks = [self.try_ping_host(ip, progress, task_id) for ip in ips]
results = await asyncio.gather(*ping_tasks)
alive_ips = [result for result in results if result]
return alive_ips
[docs] async def wrap_scan(self, ip, port, progress, task_id):
async with self.semaphore:
result = await self.scan_port(ip, port)
progress.update(task_id, advance=1)
self._process_results(result)
return result
def _generate_ips(self) -> List[str]:
start_int = self.ip_to_int(self.start_ip)
end_int = self.ip_to_int(self.end_ip)
return [self.int_to_ip(ip_int) for ip_int in range(start_int, end_int + 1)]
[docs] @staticmethod
def ip_to_int(ip: str) -> int:
return sum([int(octet) << (8 * i) for i, octet in enumerate(reversed(ip.split('.')))])
[docs] @staticmethod
def int_to_ip(ip_int: int) -> str:
return '.'.join(str((ip_int >> (8 * i)) & 0xFF) for i in reversed(range(4)))
def _process_results(self, results: List[Tuple[str, int, bool]]):
if isinstance(results, tuple):
results = [results]
for ip, port, is_open in results:
if ip not in self.scan_results:
self.scan_results[ip] = {"open": [], "closed": []}
if is_open:
self.scan_results[ip]["open"].append(port)
else:
self.scan_results[ip]["closed"].append(port)
[docs] def get_results(self):
return self.scan_results
[docs] def print_scan_results(self, view="all"):
for ipaddr, result in self.scan_results.items():
parsed_data = ""
for is_open, port in result.items():
if view == "all" or view == is_open and port:
# pawn.console.print(f"\t \[{is_open}] {port}")
parsed_data = f"\t \[{is_open}] {port}"
# is_data = True
if parsed_data:
pawn.console.print(ipaddr)
pawn.console.print(parsed_data)
[docs] def run_scan(self, fast_scan: bool = False):
with Progress(
TextColumn("[bold blue]{task.description}", justify="right"),
BarColumn(bar_width=None),
TextColumn("{task.completed}/{task.total} • [progress.percentage]{task.percentage:>3.0f}%"),
"•",
TimeRemainingColumn(),
transient=True # Hide the progress bar when done
) as progress:
# asyncio.get_event_loop().run_until_complete(self.scan(progress))
asyncio.get_event_loop().run_until_complete(self.scan(fast_scan, progress))
[docs]def get_local_ip():
"""
Get the local IP address
:return:
Example:
.. code-block:: python
from pawnlib.resource import net
net.get_local_ip()
"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(('10.255.255.255', 1))
ipaddr = s.getsockname()[0]
except Exception:
ipaddr = '127.0.0.1'
finally:
s.close()
if is_valid_ipv4(ipaddr):
return ipaddr
else:
pawn.error_logger.error("An error occurred while fetching Local IP address. Invalid IPv4 address")
pawn.console.debug("An error occurred while fetching Local IP address. Invalid IPv4 address")
return ""
[docs]def get_hostname():
"""
Get the local hostname
:return:
Example:
.. code-block:: python
from pawnlib.resource import net
net.get_hostname()
"""
return socket.gethostname()
[docs]def check_port(host: str = "", port: int = 0, timeout: float = 3.0, protocol: Literal["tcp", "udp"] = "tcp") -> bool:
"""
Returns boolean with checks if the port is open
:param host: ipaddress os hostname
:param port: destination port number
:param timeout: timeout sec
:param protocol: type of protocol
:return: boolean
Example:
.. code-block:: python
from pawnlib.resource import net
net.check_port()
"""
if not host:
raise ValueError(f"Host must be specified. inputs: host={host}")
if protocol not in ["tcp", "udp"]:
raise ValueError(f"Invalid protocol specified. Only 'tcp' and 'udp' are supported. inputs: {protocol}")
if not port:
host, port = extract_host_port(host)
pawn.console.debug(f"[red] Parsed from host -> host={host}, port={port}")
port = int(port)
pawn.console.debug(f"host={host}, port={port} ({type(port).__name__}), protocol={protocol}, timeout={timeout}")
socket_protocol = socket.SOCK_STREAM if protocol == "tcp" else socket.SOCK_DGRAM
# if timeout:
# socket.setdefaulttimeout(float(timeout)) # seconds (float)
with socket.socket(socket.AF_INET, socket_protocol) as sock:
host = http.remove_http(host)
sock.settimeout(timeout) # Set timeout directly on the socket
try:
result = sock.connect_ex((host, port))
except Exception as e:
pawn.console.debug(f"[FAIL] {e}")
pawn.error_logger.error(f"[FAIL] {e}")
return False
if result == 0:
pawn.console.debug(f"[OK] Opened port -> {host}:{port}")
return True
else:
pawn.error_logger.error(f"[FAIL] Closed port -> {host}:{port}")
return False
[docs]def listen_socket(host, port):
"""
Create a socket object and bind it to the host and port provided.
Listen for incoming connections on that socket, with a maximum of 5 connections in the queue.
:param host: str - hostname of the machine where the server is running
:param port: int - port number that the server will listen on
:return: socket - a socket object
Example:
.. code-block:: python
# create a socket object and bind it to localhost and port 8080
sock = listen_socket("localhost", 8080)
# listen for incoming connections
conn, addr = sock.accept()
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((host, port))
sock.listen(5)
return sock
[docs]def wait_for_port_open(host: str = "", port: int = 0, timeout: float = 3.0, protocol: Literal["tcp", "udp"] = "tcp", sleep: float =1) -> bool:
"""
Wait for a port to open. Useful when writing scripts which need to wait for a server to be available.
:param host: hostname or ipaddress
:param port: port
:param timeout: timeout seconds (float)
:param protocol: tcp or udp
:param sleep: sleep fime seconds (float)
:return:
Example:
.. code-block:: python
from pawnlib.resource import net
net.wait_for_port_open("127.0.0.1", port)
## ⠏ Wait for port open 127.0.0.1:9900 ... 6
"""
message = f"[bold green] Wait for port open[/bold green] {host}:{port} ........."
count = 0
with pawn.console.status(message) as status:
while True:
if check_port(host, port, timeout, protocol):
status.stop()
pawn.console.debug(f"[OK] Activate port -> {host}:{port}")
pawn.app_logger.info(f"[OK] Activate port -> {host}:{port}")
return True
status.update(f"{message}[cyan] {count}[/cyan]")
count += 1
time.sleep(sleep)
[docs]def get_location(ipaddress=""):
try:
response = requests.get(
f"https://ipinfo.io/widget/demo/{ipaddress}",
headers={
'referer': 'https://ipinfo.io/',
'content-type': 'application/json',
},
timeout=2,
)
return response.json().get('data')
except Exception as e:
pawn.console.debug(f"Error getting location - {e}")
return {}
[docs]def get_location_with_ip_api():
try:
response = requests.get(
f"http://ip-api.com/json",
headers={
'content-type': 'application/json',
},
timeout=2,
)
return response.json()
except Exception as e:
pawn.console.debug(f"Error getting location - {e}")
return {}