from collections import deque
import time
import re
[docs]class TPSCalculator:
"""
A class to calculate Transactions Per Second (TPS) based on block height changes over time.
This class tracks the block height and calculates TPS over a configurable history size.
It supports both fixed interval calculations and variable time-based calculations.
:param history_size: Number of recent TPS values to keep for averaging.
:type history_size: int
:param sleep_time: Fixed interval between API calls in seconds.
:type sleep_time: int
:param variable_time: If True, calculate TPS based on actual elapsed time.
:type variable_time: bool
Example:
.. code-block:: python
# Initialize TPSCalculator with default parameters
tps_calculator = TPSCalculator()
# Simulate block height updates
current_height = 100
current_time = time.time()
# Calculate TPS with a new block height
current_tps, average_tps = tps_calculator.calculate_tps(current_height, current_time)
print(f"Current TPS: {current_tps}, Average TPS: {average_tps}")
# Reset the calculator if needed
tps_calculator.reset()
"""
[docs] def __init__(self, history_size=50, sleep_time=2, variable_time=False):
"""
Initializes the TPSCalculator.
:param history_size: Number of recent TPS values to keep for averaging.
:type history_size: int
:param sleep_time: Fixed interval between API calls in seconds.
:type sleep_time: int
:param variable_time: If True, calculate TPS based on actual elapsed time.
:type variable_time: bool
"""
self.previous_height = None
self.previous_time = None
self.tps_history = deque(maxlen=history_size)
self.call_count = 0 # To track the number of API calls
self.sleep_time = sleep_time
self.variable_time = variable_time
self.total_transactions = 0
[docs] def calculate_tps(self, current_height, current_time=None):
"""
Calculates the current TPS and updates the TPS history.
:param current_height: The current block height.
:type current_height: int
:param current_time: The current timestamp. If None and variable_time is False, uses fixed sleep_time.
:type current_time: float, optional
:return: Tuple of (current_tps, average_tps)
:rtype: tuple(float, float)
"""
if not self.variable_time:
time_diff = self.sleep_time
else:
if current_time is None:
current_time = time.time()
if self.previous_time is not None:
time_diff = current_time - self.previous_time
# Validate time_diff
if time_diff <= 0:
print(f"Warning: Non-positive time_diff ({time_diff}). Skipping TPS calculation.")
return 0, self.get_average_tps()
elif time_diff < 0.1:
print(f"Warning: Very small time_diff ({time_diff}). Adjusting to minimum threshold.")
time_diff = 0.1 # Set to minimum threshold to prevent inflated TPS
else:
time_diff = self.sleep_time
if self.previous_height is not None:
height_diff = current_height - self.previous_height
# Validate height_diff
if height_diff < 0:
print(f"Warning: Negative height_diff ({height_diff}). Resetting TPS calculator.")
self.reset()
return 0, self.get_average_tps()
if height_diff == 0:
current_tps = 0
else:
current_tps = height_diff / time_diff
max_tps = 1000 # Define a maximum reasonable TPS
if current_tps > max_tps:
print(f"Warning: Unusually high TPS ({current_tps}). Capping to {max_tps}.")
current_tps = max_tps
self.tps_history.append(current_tps)
self.total_transactions += height_diff
else:
current_tps = 0
self.previous_height = current_height
if self.variable_time:
self.previous_time = current_time
average_tps = self.get_average_tps()
self.call_count += 1
return current_tps, average_tps
[docs] def get_average_tps(self):
"""
Calculates the average TPS from the history.
:return: The average TPS.
:rtype: float
"""
return sum(self.tps_history) / len(self.tps_history) if self.tps_history else 0
[docs] def processed_tx(self):
"""
Returns the total number of transactions since monitoring started.
:return: Total transactions as integer.
:rtype: int
"""
return self.total_transactions
[docs] def last_n_tx(self):
"""
Returns the number of transactions in the last n seconds (sleep_time).
:return: Transactions count in last n seconds.
:rtype: float
"""
if self.tps_history:
return self.tps_history[-1] * self.sleep_time
else:
return 0
[docs] def reset(self):
"""
Resets the TPSCalculator to its initial state.
This clears all stored data and resets counters.
:return: None
"""
self.previous_height = None
self.previous_time = None
self.tps_history.clear()
self.call_count = 0
[docs]class SyncSpeedTracker:
"""
A class to track the synchronization speed of a blockchain node.
This class calculates the average number of blocks synchronized per second
using a moving average approach based on recent block height and time differences.
:param history_size: The number of records to keep for calculating the moving average.
:type history_size: int
Example:
.. code-block:: python
# Initialize the tracker with a history size of 10
sync_tracker = SyncSpeedTracker(history_size=10)
# Update the tracker with new block height and timestamp
current_height = 150
current_time = time.time()
sync_tracker.update(current_height, current_time)
# Get the average synchronization speed (blocks per second)
avg_sync_speed = sync_tracker.get_average_sync_speed()
print(f"Average Sync Speed: {avg_sync_speed} blocks/second")
"""
[docs] def __init__(self, history_size=10):
"""
Initializes the SyncSpeedTracker.
:param history_size: The number of records to store for calculating the moving average.
:type history_size: int
"""
self.history_size = history_size
self.block_differences = deque(maxlen=history_size)
self.time_differences = deque(maxlen=history_size)
self.previous_height = None
self.previous_time = None
[docs] def update(self, current_height, current_time):
"""
Updates the synchronization speed tracker with new block height and timestamp.
:param current_height: The current block height.
:type current_height: int
:param current_time: The current timestamp.
:type current_time: float
"""
if self.previous_height is not None and self.previous_time is not None:
block_diff = current_height - self.previous_height
time_diff = current_time - self.previous_time
if block_diff > 0 and time_diff > 0:
self.block_differences.append(block_diff)
self.time_differences.append(time_diff)
self.previous_height = current_height
self.previous_time = current_time
[docs] def get_average_sync_speed(self):
"""
Calculates the average number of blocks synchronized per second using a moving average.
:return: The average synchronization speed in blocks per second. Returns None if insufficient data.
:rtype: float or None
"""
if not self.block_differences or not self.time_differences:
return None # Insufficient data
total_blocks = sum(self.block_differences)
total_time = sum(self.time_differences)
if total_time > 0:
return total_blocks / total_time # Blocks per second
else:
return None
[docs]class BlockDifferenceTracker:
"""
A class to track and calculate the average block difference over a configurable history size.
This class is useful for monitoring the difference in block heights between nodes or systems.
:param history_size: The number of recent block differences to track.
:type history_size: int
Example:
.. code-block:: python
# Initialize the tracker with a history size of 50
block_tracker = BlockDifferenceTracker(history_size=50)
# Add block differences
block_tracker.add_difference(5)
block_tracker.add_difference(3)
# Get the average block difference
avg_difference = block_tracker.get_average_difference()
print(f"Average Block Difference: {avg_difference}")
"""
[docs] def __init__(self, history_size=100):
"""
Initializes the BlockDifferenceTracker.
:param history_size: Number of recent block differences to track.
:type history_size: int
"""
self.differences = deque(maxlen=history_size)
[docs] def add_difference(self, block_difference):
"""
Adds a new block difference to the tracker.
:param block_difference: The current block difference.
:type block_difference: int
"""
self.differences.append(block_difference)
[docs] def get_average_difference(self):
"""
Calculates the average block difference.
:return: The average of the tracked block differences.
:rtype: float
"""
if not self.differences:
return 0
return sum(self.differences) / len(self.differences)
[docs]class LatencyTracker:
"""
A class to track and analyze latency measurements.
This class stores latency values and provides methods to calculate average, minimum, and maximum latencies.
:param history_size: The number of recent latency measurements to track.
:type history_size: int
Example:
.. code-block:: python
# Initialize the tracker with a history size of 100
latency_tracker = LatencyTracker(history_size=100)
# Add latency measurements
latency_tracker.add_latency(120)
latency_tracker.add_latency(150)
# Get latency statistics
avg_latency = latency_tracker.get_average_latency()
min_latency = latency_tracker.get_min_latency()
max_latency = latency_tracker.get_max_latency()
print(f"Average Latency: {avg_latency} ms")
print(f"Min Latency: {min_latency} ms")
print(f"Max Latency: {max_latency} ms")
"""
[docs] def __init__(self, history_size=100):
"""
Initializes the LatencyTracker.
:param history_size: The number of recent latency measurements to track.
:type history_size: int
"""
self.latencies = deque(maxlen=history_size)
[docs] def add_latency(self, latency):
"""
Adds a new latency measurement to the tracker.
:param latency: The measured latency in milliseconds.
:type latency: float
"""
self.latencies.append(latency)
[docs] def get_average_latency(self):
"""
Calculates the average latency from the tracked measurements.
:return: The average latency in milliseconds.
:rtype: float
"""
return sum(self.latencies) / len(self.latencies) if self.latencies else 0
[docs] def get_min_latency(self):
"""
Returns the minimum recorded latency.
:return: The minimum latency in milliseconds, or None if no data is available.
:rtype: float or None
"""
return min(self.latencies) if self.latencies else None
[docs] def get_max_latency(self):
"""
Returns the maximum recorded latency.
:return: The maximum latency in milliseconds, or None if no data is available.
:rtype: float or None
"""
return max(self.latencies) if self.latencies else None
[docs]class ErrorRateTracker:
"""
A class to track and calculate error rates for API requests or operations.
This class monitors total requests and failed requests to compute an error rate percentage.
Example:
.. code-block:: python
# Initialize the tracker
error_tracker = ErrorRateTracker()
# Record requests (success=True for successful requests)
error_tracker.record_request(success=True)
error_tracker.record_request(success=False)
# Get the current error rate
error_rate = error_tracker.get_error_rate()
print(f"Error Rate: {error_rate:.2f}%")
"""
[docs] def __init__(self):
"""
Initializes the ErrorRateTracker.
Tracks total and failed requests for calculating error rates.
"""
self.total_requests = 0
self.failed_requests = 0
[docs] def record_request(self, success=True):
"""
Records a request and whether it was successful or not.
:param success: True if the request was successful; False otherwise.
Defaults to True.
:type success: bool
"""
self.total_requests += 1
if not success:
self.failed_requests += 1
[docs] def get_error_rate(self):
"""
Calculates the error rate as a percentage of failed requests over total requests.
:return: The error rate percentage. Returns 0 if no requests have been recorded.
:rtype: float
"""
if self.total_requests == 0:
return 0.0
return (self.failed_requests / self.total_requests) * 100
[docs]class ThroughputTracker:
"""
A class to track and calculate throughput (requests per second).
This class records timestamps of requests and calculates the throughput
over a configurable history size.
:param history_size: The number of recent timestamps to track.
:type history_size: int
Example:
.. code-block:: python
# Initialize the tracker with a history size of 100
throughput_tracker = ThroughputTracker(history_size=100)
# Record requests with timestamps
throughput_tracker.record_request()
time.sleep(1)
throughput_tracker.record_request()
# Get the current throughput
throughput = throughput_tracker.get_throughput()
print(f"Throughput: {throughput} requests/second")
"""
[docs] def __init__(self, history_size=100):
"""
Initializes the ThroughputTracker.
:param history_size: The number of recent timestamps to track.
:type history_size: int
"""
self.timestamps = deque(maxlen=history_size)
[docs] def record_request(self, timestamp=None):
"""
Records the timestamp of a request.
:param timestamp: The timestamp of the request. Defaults to the current time.
:type timestamp: float, optional
"""
self.timestamps.append(timestamp or time.time())
[docs] def get_throughput(self):
"""
Calculates the throughput (requests per second).
:return: The calculated throughput. Returns 0 if insufficient data.
:rtype: float
"""
if len(self.timestamps) < 2:
return 0
duration = self.timestamps[-1] - self.timestamps[0]
return len(self.timestamps) / duration if duration > 0 else 0
[docs]class PeriodicMetricLogger:
"""
A class for logging metrics at regular intervals.
This class logs provided metrics only if a specified interval has passed since the last log.
:param interval: The logging interval in seconds.
:type interval: int
Example:
.. code-block:: python
# Initialize the logger with a 10-second interval
metric_logger = PeriodicMetricLogger(interval=10)
# Log metrics periodically
metrics = {"TPS": 50.5, "Latency": 120}
metric_logger.log_metrics(metrics)
"""
[docs] def __init__(self, interval=10):
"""
Initializes the PeriodicMetricLogger.
:param interval: The logging interval in seconds.
:type interval: int
"""
self.interval = interval
self.last_logged = time.time()
[docs] def log_metrics(self, metrics):
"""
Logs the provided metrics if the logging interval has passed.
:param metrics: A dictionary of metric names and their values.
:type metrics: dict
"""
current_time = time.time()
if current_time - self.last_logged >= self.interval:
for key, value in metrics.items():
print(f"{key}: {value}")
self.last_logged = current_time
[docs]class SpikeDetector:
"""
A class to detect spikes in a series of values.
A spike is detected when the difference between consecutive values exceeds a specified threshold.
:param threshold: The change amount considered as a spike.
:type threshold: float
:param history_size: The maximum number of recent values to store.
:type history_size: int
Example:
.. code-block:: python
# Initialize the spike detector with a threshold of 10
spike_detector = SpikeDetector(threshold=10, history_size=5)
# Add values and detect spikes
spike_detector.add_value(50)
spike_detector.add_value(65)
is_spike = spike_detector.detect_spike()
print(f"Spike Detected: {is_spike}")
"""
[docs] def __init__(self, threshold, history_size=10):
"""
Initializes the SpikeDetector.
:param threshold: The change amount considered as a spike.
:type threshold: float
:param history_size: The maximum number of recent values to store.
:type history_size: int
"""
self.threshold = threshold
self.values = deque(maxlen=history_size)
[docs] def add_value(self, value):
"""
Adds a new value to the tracker.
:param value: The new value to track.
:type value: float
"""
self.values.append(value)
[docs] def detect_spike(self):
"""
Detects whether a spike occurred based on recent values.
:return: True if a spike is detected; False otherwise.
:rtype: bool
"""
if len(self.values) < 2:
return False
last_value = self.values[-1]
prev_value = self.values[-2]
return abs(last_value - prev_value) > self.threshold
[docs]class AnomalyDetector:
"""
A class to detect anomalies in data based on a baseline and tolerance.
An anomaly is detected when a value deviates from the baseline by more than the allowed tolerance.
:param baseline: The reference value for normal data (e.g., average TPS).
:type baseline: float
:param tolerance: The acceptable deviation from the baseline as a fraction (e.g., 0.2 for ±20%).
Defaults to 0.2 (±20%).
:type tolerance: float
Example:
.. code-block:: python
# Initialize the anomaly detector with a baseline of 50 and tolerance of 20%
anomaly_detector = AnomalyDetector(baseline=50, tolerance=0.2)
# Detect anomalies in new values
is_anomaly = anomaly_detector.detect_anomaly(65)
print(f"Anomaly Detected: {is_anomaly}")
"""
[docs] def __init__(self, baseline, tolerance=0.2):
"""
Initializes the AnomalyDetector.
:param baseline: The reference value for normal data (e.g., average TPS).
:type baseline: float
:param tolerance: The acceptable deviation from the baseline as a fraction.
Defaults to 0.2 (±20%).
:type tolerance: float
"""
self.baseline = baseline
self.tolerance = tolerance
[docs] def detect_anomaly(self, value):
"""
Detects whether a given value is an anomaly based on the baseline and tolerance.
:param value: The value to check for anomalies.
:type value: float
:return: True if the value is an anomaly; False otherwise.
:rtype: bool
Example:
.. code-block:: python
# Check if a value is an anomaly
is_anomaly = anomaly_detector.detect_anomaly(75)
print(f"Anomaly Detected: {is_anomaly}")
"""
lower_bound = self.baseline * (1 - self.tolerance)
upper_bound = self.baseline * (1 + self.tolerance)
return not (lower_bound <= value <= upper_bound)
[docs]class TrendAnalyzer:
"""
A class to analyze trends in a series of values.
This class tracks a series of values and determines whether the trend is upward, downward, or stable.
:param history_size: The maximum number of recent values to store for trend analysis.
:type history_size: int
Example:
.. code-block:: python
# Initialize the trend analyzer with a history size of 5
trend_analyzer = TrendAnalyzer(history_size=5)
# Add values to the trend analyzer
trend_analyzer.add_value(10)
trend_analyzer.add_value(15)
trend_analyzer.add_value(20)
# Get the current trend
trend = trend_analyzer.get_trend()
print(f"Current Trend: {trend}")
"""
[docs] def __init__(self, history_size=10):
"""
Initializes the TrendAnalyzer.
:param history_size: The maximum number of recent values to store for trend analysis.
:type history_size: int
"""
self.values = deque(maxlen=history_size)
[docs] def add_value(self, value):
"""
Adds a new value to the tracker.
:param value: The new value to track.
:type value: float or int
"""
self.values.append(value)
[docs] def get_trend(self):
"""
Calculates the trend based on the stored values.
The trend is determined as:
- "upward": If all differences between consecutive values are positive.
- "downward": If all differences between consecutive values are negative.
- "stable": If there is no consistent upward or downward pattern.
:return: The calculated trend ("upward", "downward", or "stable").
:rtype: str
Example:
.. code-block:: python
# Analyze the trend
trend = trend_analyzer.get_trend()
print(f"Current Trend: {trend}")
"""
if len(self.values) < 2:
return "stable"
diffs = [self.values[i + 1] - self.values[i] for i in range(len(self.values) - 1)]
if all(d > 0 for d in diffs):
return "upward"
elif all(d < 0 for d in diffs):
return "downward"
return "stable"
[docs]class RollingAverageCalculator:
"""
A class to calculate the rolling (moving) average of a series of values.
This class maintains a fixed-size window of recent values and calculates the average
of the values within the window.
:param window_size: The size of the rolling window.
:type window_size: int
Example:
.. code-block:: python
# Initialize the rolling average calculator with a window size of 5
avg_calculator = RollingAverageCalculator(window_size=5)
# Add values to the calculator
avg_calculator.add_value(10)
avg_calculator.add_value(20)
avg_calculator.add_value(30)
# Get the current rolling average
average = avg_calculator.get_average()
print(f"Rolling Average: {average}")
"""
[docs] def __init__(self, window_size=5):
"""
Initializes the RollingAverageCalculator.
:param window_size: The size of the rolling window.
:type window_size: int
"""
self.window_size = window_size
self.values = deque(maxlen=window_size)
[docs] def add_value(self, value):
"""
Adds a new value to the rolling window.
:param value: The new value to add.
:type value: float or int
"""
self.values.append(value)
[docs] def get_average(self):
"""
Calculates and returns the rolling average of the stored values.
:return: The rolling average. Returns 0 if no values are stored.
:rtype: float
Example:
.. code-block:: python
# Calculate the rolling average
average = avg_calculator.get_average()
print(f"Rolling Average: {average}")
"""
return sum(self.values) / len(self.values) if self.values else 0
[docs]class ThresholdNotifier:
"""
A class to monitor values and trigger a notification when a threshold is exceeded.
This class checks if a given value exceeds a predefined threshold and executes
a callback function when the threshold is crossed.
:param threshold: The threshold value to monitor.
:type threshold: float or int
:param alert_callback: A callback function to execute when the threshold is exceeded.
:type alert_callback: callable
Example:
.. code-block:: python
# Define an alert callback function
def alert(value):
print(f"Alert! Value exceeded: {value}")
# Initialize the notifier with a threshold of 100
notifier = ThresholdNotifier(threshold=100, alert_callback=alert)
# Check values and trigger alerts if necessary
notifier.check_and_notify(120) # This will trigger the alert
notifier.check_and_notify(80) # This will not trigger the alert
"""
[docs] def __init__(self, threshold, alert_callback):
"""
Initializes the ThresholdNotifier.
:param threshold: The threshold value to monitor.
:type threshold: float or int
:param alert_callback: A callback function to execute when the threshold is exceeded.
:type alert_callback: callable
"""
self.threshold = threshold
self.alert_callback = alert_callback
[docs] def check_and_notify(self, value):
"""
Checks if the given value exceeds the threshold and triggers the callback if it does.
:param value: The value to check against the threshold.
:type value: float or int
Example:
.. code-block:: python
notifier.check_and_notify(150) # Triggers the callback if value > threshold
"""
if value > self.threshold:
self.alert_callback(value)
[docs]class RateLimiter:
"""
A class to enforce rate limits on operations.
This class ensures that a maximum number of calls can be made within a specified time period.
If the limit is exceeded, further calls are not allowed until the time window resets.
:param max_calls: The maximum number of calls allowed within the time period.
:type max_calls: int
:param time_period: The time period (in seconds) for rate limiting.
:type time_period: float or int
Example:
.. code-block:: python
# Initialize a rate limiter allowing 5 calls per 10 seconds
rate_limiter = RateLimiter(max_calls=5, time_period=10)
# Check if calls are allowed
for i in range(10):
if rate_limiter.is_allowed():
print(f"Call {i + 1}: Allowed")
else:
print(f"Call {i + 1}: Rate limit exceeded")
time.sleep(1) # Simulate time delay between calls
"""
[docs] def __init__(self, max_calls, time_period):
"""
Initializes the RateLimiter.
:param max_calls: The maximum number of calls allowed within the time period.
:type max_calls: int
:param time_period: The time period (in seconds) for rate limiting.
:type time_period: float or int
"""
self.max_calls = max_calls
self.time_period = time_period
self.calls = deque()
[docs] def is_allowed(self):
"""
Checks if a new call is allowed under the rate limit.
Removes expired calls from the queue based on the current time and time period,
then determines if another call can be made.
:return: True if the call is allowed; False otherwise.
:rtype: bool
Example:
.. code-block:: python
if rate_limiter.is_allowed():
print("Call allowed")
else:
print("Rate limit exceeded")
"""
current_time = time.time()
# Remove timestamps outside of the current time window
while self.calls and self.calls[0] < current_time - self.time_period:
self.calls.popleft()
# Check if we can allow another call
if len(self.calls) < self.max_calls:
self.calls.append(current_time)
return True
return False
[docs]def calculate_reset_percentage(data):
match = re.search(r'height=(\d+) resolved=(\d+) unresolved=(\d+)', data)
if match:
height = int(match.group(1)) # height
resolved = int(match.group(2)) # resolved
unresolved = int(match.group(3)) # unresolved
reset_percentage = (resolved / height) * 100
return {
"height": height,
"resolved": resolved,
"unresolved": unresolved,
"progress": round(reset_percentage, 2)
}
else:
raise ValueError("Cant parsing data")
[docs]def calculate_pruning_percentage(data):
match = re.search(r'pruning (\d+)/(\d+)\s+resolved=(\d+) unresolved=(\d+)', data)
if match:
current = int(match.group(1))
total = int(match.group(2))
resolved = int(match.group(3))
unresolved = int(match.group(4))
progress_percentage = (current / total) * 100
resolve_progress_percentage = (resolved / total) * 100
# progress_percentage = (resolved / (resolved + unresolved)) * 100
# 전체 처리해야 할 항목 수 추정
# estimated_total = resolved + unresolved
# 진행률 계산
# progress_percentage = (resolved / estimated_total) * 100
return {
"current": current,
"total": total,
"resolved": resolved,
"unresolved": unresolved,
"resolve_progress_percentage": round(resolve_progress_percentage, 2),
"progress": round(progress_percentage, 2),
}
else:
raise ValueError("Cant parsing data")