# MIT License
# Copyright (c) 2025 aeeeeeep
import ast
import json
import inspect
import pkgutil
import importlib
import importlib.util
from pathlib import PosixPath
from types import ModuleType, MethodType, FunctionType
from typing import Optional, Tuple, List, Union, Set
from .constants import Constants
from .utils.logger import log_error, log_warn
ClassType = type
TargetsType = List[Union[str, ModuleType]]
[docs]
def iter_parents(node):
"""Generator for traversing AST node parent hierarchy.
Yields:
ast.AST: Parent nodes in bottom-up order (nearest ancestor first)
Example:
for parent in iter_parents(some_node):
if isinstance(parent, ast.ClassDef):
break
"""
while hasattr(node, 'parent'):
node = node.parent
yield node
[docs]
def set_parents(node, parent):
"""Recursively set parent references in AST nodes.
Enables parent traversal via node.parent attribute
Required for accurate scope determination during analysis
"""
node.parent = parent
for child in ast.iter_child_nodes(node):
set_parents(child, node)
[docs]
def deep_merge(source: dict, update: dict) -> dict:
"""Recursively merge two dictionaries.
Args:
source: Base dictionary to be updated
update: Dictionary with update values
Returns:
Reference to the modified source dictionary
"""
for key, val in update.items():
if isinstance(val, dict) and isinstance(source.get(key), dict):
source[key] = deep_merge(source.get(key, {}), val)
elif isinstance(val, list) and isinstance(source.get(key), list):
source[key] = list(set(source[key] + val))
else:
source[key] = val
return source
[docs]
class Targets:
"""
Target processor for monitoring file changes and module structures.
Supported syntax:
1. Module: 'package.module'
2. Class: 'package.module:ClassName'
3. Class attribute: 'package.module:ClassName.attribute'
4. Class method: 'package.module:ClassName.method()'
5. Function: 'package.module:function()'
6. Global variable: 'package.module::GLOBAL_VAR'
"""
[docs]
def __init__(self, targets: TargetsType, exclude_targets: Optional[TargetsType] = None):
"""
Initialize target processor.
Args:
targets: Monitoring targets in various formats
exclude_targets: Exclusion targets in same formats
"""
targets, exclude_targets = self._check_targets(targets, exclude_targets)
self.filename_targets: Set = set()
self.targets: dict = self._process_targets(targets)
self.exclude_targets: dict = self._process_targets(exclude_targets)
self.processed_targets: dict = self._diff_targets()
[docs]
def _check_targets(
self, targets: TargetsType, exclude_targets: Optional[TargetsType]
) -> Tuple[TargetsType, Optional[TargetsType]]:
"""
Normalize and validate target inputs.
Args:
targets: Raw monitoring targets input
exclude_targets: Raw exclusion targets input
Returns:
Tuple[TargetsType, TargetsType]: Normalized (targets, exclude_targets)
"""
if isinstance(targets, str):
targets = [targets]
if isinstance(exclude_targets, str):
exclude_targets = [exclude_targets]
for exclude_target in exclude_targets or []:
if isinstance(exclude_target, str) and exclude_target.endswith('.py'):
log_error("Unsupported .py files in exclude_target")
return targets, exclude_targets
[docs]
def _process_targets(self, targets: Optional[TargetsType]) -> dict:
"""
Convert heterogeneous targets to structured data model.
Args:
targets: List of targets
Returns:
dict: Hierarchical structure:
"""
processed_targets: dict = {}
for target in targets or []:
if isinstance(target, str) and target.endswith('.py'):
self.filename_targets.add(target)
elif isinstance(target, (str, ModuleType, ClassType, FunctionType, MethodType)):
module_path, target_details = self._parse_target(target)
existing_details = processed_targets.setdefault(module_path, {})
processed_targets[module_path] = deep_merge(existing_details, target_details)
else:
log_warn(f"Unsupported target type: {type(target)}")
return processed_targets
[docs]
def _parse_target(self, target: Union[str, ModuleType, ClassType, FunctionType, MethodType]) -> tuple:
"""
Parse different target formats into module structure.
Args:
target: Target specification
Returns:
tuple: (module_path, parsed_structure)
"""
if isinstance(target, ModuleType):
return self._parse_module(target)
if isinstance(target, ClassType):
return self._parse_class(target)
if isinstance(target, (FunctionType, MethodType)):
return self._parse_function(target)
return self._parse_string(target)
[docs]
def _parse_function(self, func: Union[FunctionType, MethodType]) -> tuple:
"""Parse function object and create module structure containing this function or method
Args:
func: Function object to parse
Returns:
tuple: (module name, module structure containing only this function or method)
"""
# Check if this is a class method (bound to class)
if hasattr(func, '__self__') and isinstance(func.__self__, type):
cls = func.__self__
module = inspect.getmodule(cls)
module_name = module.__name__ if module else ''
return (
module_name,
{
'classes': {cls.__name__: {'methods': [func.__name__], 'attributes': [], 'track_all': False}},
'functions': [],
'globals': [],
},
)
# Check if this is a static/class method using qualname (e.g. 'Class.method')
if hasattr(func, '__qualname__') and '.' in func.__qualname__:
class_name, method_name = func.__qualname__.split('.', 1)
module = inspect.getmodule(func)
if module and hasattr(module, class_name):
cls = getattr(module, class_name)
if isinstance(cls, type):
module_name = module.__name__ if module else ''
return (
module_name,
{
'classes': {class_name: {'methods': [method_name], 'attributes': [], 'track_all': False}},
'functions': [],
'globals': [],
},
)
# Regular function handling
module = inspect.getmodule(func)
module_name = module.__name__ if module else ''
function_name = func.__name__
parsed_structure = {'classes': {}, 'functions': [function_name], 'globals': []}
return (module_name, parsed_structure)
[docs]
def _parse_module(self, module: ModuleType) -> tuple:
"""Parse module structure using AST analysis.
Args:
module: Python module object to analyze
Returns:
tuple: (module_name, parsed_structure) pair
"""
return (module.__name__, self._parse_module_by_name(module.__name__))
[docs]
def _parse_class(self, cls: ClassType) -> tuple:
"""Parse class object and create module structure containing this class
Args:
cls: Class object to parse
Returns:
tuple: (module name, module structure containing only this class)
"""
module = inspect.getmodule(cls)
module_name = module.__name__ if module else ''
class_name = cls.__name__
class_details = {'methods': [], 'attributes': [], 'track_all': True} # Flag to track all methods and attributes
parsed_structure = {'classes': {class_name: class_details}, 'functions': [], 'globals': []}
return (module_name, parsed_structure)
[docs]
def _parse_string(self, target: str) -> tuple:
"""Parse string-formatted target definitions
Args:
target: Target definition string
Returns:
tuple: (module_path, parsed_structure)
"""
# Handle global variable syntax
if '::' in target:
module_part, _, global_var = target.partition('::')
spec = importlib.util.find_spec(module_part)
if spec is None:
log_warn(f"Module {module_part} not found")
return (module_part, {'globals': []})
resolved_module_name = spec.name
return (resolved_module_name, {'globals': [global_var.strip()]})
# Split module path and symbol definition
module_part, _, symbol = target.partition(':')
spec = importlib.util.find_spec(module_part)
if spec is None:
log_warn(f"Module {module_part} not found")
return (module_part, {'classes': {}, 'functions': [], 'globals': []})
resolved_module_name = spec.name
full_module = self._parse_module_by_name(resolved_module_name)
if not symbol:
return (resolved_module_name, full_module)
details: dict = {'classes': {}, 'functions': [], 'globals': []}
current_symbol = symbol
# Parse class members (methods or attributes)
if '.' in symbol:
class_part, _, member = current_symbol.partition('.')
if class_part in full_module['classes']:
if member.endswith('()'):
method_name = member[:-2]
# Directly add method without checking if it exists in class_info
details['classes'][class_part] = {
'methods': [method_name],
'attributes': [],
'track_all': False, # Not tracking all, just specific method
}
else:
# Directly add attribute without checking if it exists in class_info
details['classes'][class_part] = {
'methods': [],
'attributes': [member],
'track_all': False, # Not tracking all, just specific attribute
}
else:
if current_symbol.endswith('()'):
func_name = current_symbol[:-2]
if func_name in full_module['functions']:
details['functions'].append(func_name)
elif current_symbol in full_module['classes']:
details['classes'][current_symbol] = {
'methods': [],
'attributes': [],
'track_all': True, # Track entire class
}
return (resolved_module_name, details)
[docs]
def _parse_module_by_name(self, module_name: str, recursive: bool = True) -> dict:
"""Locate and parse module structure by its import name, supporting recursive parsing.
Args:
module_name: Full dotted import path (e.g. 'package.module')
recursive: Whether to recursively parse submodules
Returns:
dict: Parsed module structure with submodules if recursive=True
"""
spec = importlib.util.find_spec(module_name)
if spec is None:
log_warn(f"Module {module_name} not found")
return {'classes': {}, 'functions': [], 'globals': []}
# Parse the current module
module_structure: dict = {'classes': {}, 'functions': [], 'globals': []}
if spec.origin and spec.origin.endswith('.py'):
module_structure = self._parse_py_file(spec.origin)
# Recursively parse submodules if enabled
if recursive and hasattr(spec, 'submodule_search_locations') and spec.submodule_search_locations:
submodule_locations = []
for submodule_location in spec.submodule_search_locations:
if isinstance(submodule_location, PosixPath):
submodule_locations.append(str(submodule_location))
else:
submodule_locations.append(submodule_location)
for _, submodule_name, is_pkg in pkgutil.iter_modules(submodule_locations):
full_submodule_name = f"{module_name}.{submodule_name}"
try:
submodule_structure = self._parse_module_by_name(full_submodule_name, recursive)
# Add submodule structure to current module
module_structure[submodule_name] = submodule_structure
except Exception as e:
log_warn(f"Failed to parse submodule {full_submodule_name}: {str(e)}")
return module_structure
[docs]
def _parse_py_file(self, file_path: str) -> dict:
"""Analyze Python file structure using Abstract Syntax Tree.
Args:
file_path: Absolute path to Python file
Returns:
dict: Parsed file structure dictionary
Raises:
Logs error on parsing failure
"""
parsed_structure: dict = {'classes': {}, 'functions': [], 'globals': []}
try:
with open(file_path, 'r', encoding='utf-8') as f:
tree = ast.parse(f.read())
set_parents(tree, None)
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
class_info = {
'methods': [],
'attributes': [],
'track_all': True, # Flag to track all methods and attributes
}
parsed_structure['classes'][node.name] = class_info
elif isinstance(node, ast.FunctionDef):
if not any(isinstance(parent, ast.ClassDef) for parent in iter_parents(node)):
parsed_structure['functions'].append(node.name)
elif isinstance(node, ast.Assign):
self._process_assignment(node, parsed_structure)
except Exception as e:
log_error(f"Failed to parse {file_path}: {str(e)}")
return parsed_structure
[docs]
def _flatten_module_structure(self, module_path: str, module_structure: dict, result: dict):
"""Flatten nested module structure into a dict.
Args:
module_path: Full module path
module_structure: Module structure dictionary
result: dict to populate
"""
# Extract standard sections (classes, functions, globals)
standard_sections = {
'classes': module_structure.get('classes', {}),
'functions': module_structure.get('functions', []),
'globals': module_structure.get('globals', []),
}
# Only add to result if there's content
if any([standard_sections['classes'], standard_sections['functions'], standard_sections['globals']]):
result[module_path] = standard_sections
# Process nested submodules
for key, value in module_structure.items():
if key not in ['classes', 'functions', 'globals'] and isinstance(value, dict):
submodule_path = f"{module_path}.{key}"
self._flatten_module_structure(submodule_path, value, result)
[docs]
def _diff_targets(self) -> dict:
"""Calculate effective targets by excluding specified patterns.
Returns:
Filtered targets dictionary after applying exclusion rules
"""
filtered_targets: dict = {}
for module_path, target_details in self.targets.items():
exclude_details = self.exclude_targets.get(module_path, {})
# Handle track_all exclusion at the module level
filtered_classes = self._diff_level(target_details.get('classes', {}), exclude_details.get('classes', {}))
# Calculate filtered target details
filtered_details = {
'classes': filtered_classes,
'functions': list(set(target_details.get('functions', [])) - set(exclude_details.get('functions', []))),
'globals': list(set(target_details.get('globals', [])) - set(exclude_details.get('globals', []))),
}
# Process nested submodules
for key, value in target_details.items():
if key not in ['classes', 'functions', 'globals'] and isinstance(value, dict):
submodule_path = f"{module_path}.{key}"
submodule_exclude = exclude_details.get(key, {})
filtered_sub = self._diff_level(value, submodule_exclude)
if filtered_sub:
filtered_details[key] = filtered_sub
# Flatten the module structure
self._flatten_module_structure(module_path, filtered_details, filtered_targets)
return filtered_targets
[docs]
def _diff_level(self, target: dict, exclude: dict) -> dict:
"""Recursively filter nested structures by exclusion rules.
Args:
target: Original nested structure (dict of dicts)
exclude: Exclusion patterns to apply
Returns:
dict: Filtered structure with empty containers preserved
"""
diff = {}
for key, value in target.items():
filtered = None
if key in exclude:
if isinstance(value, dict) and isinstance(exclude[key], dict):
# Handle class details with track_all flag
if (
'track_all' in value
and value['track_all']
and 'track_all' in exclude[key]
and exclude[key]['track_all']
):
# If both are tracking all, exclude the entire class
filtered = None
else:
filtered = self._diff_level(value, exclude[key])
elif isinstance(value, list) and isinstance(exclude[key], list):
filtered = list(set(value) - set(exclude[key])) # type: ignore
elif value != exclude[key]:
filtered = value
else:
filtered = value
if filtered is None:
# Skip this key entirely (excluded)
continue
elif isinstance(filtered, dict):
diff[key] = filtered
elif isinstance(filtered, list):
diff[key] = filtered if filtered else []
else:
diff[key] = filtered
return diff
[docs]
def _process_assignment(self, node: ast.Assign, result: dict):
"""Extract global variables from assignment AST nodes.
Handles two patterns:
1. Simple assignments: `var = value`
2. Tuple unpacking: `a, b = (1, 2)`
Args:
node: AST assignment node to analyze
result: dict to update with found globals
"""
if any(isinstance(parent, ast.ClassDef) for parent in iter_parents(node)):
return
for assign_target in node.targets:
if isinstance(assign_target, ast.Name):
result['globals'].append(assign_target.id)
elif isinstance(assign_target, ast.Tuple):
for element in assign_target.elts:
if isinstance(element, ast.Name):
result['globals'].append(element.id)
[docs]
def get_processed_targets(self) -> dict:
"""Retrieve final monitoring targets after exclusion processing.
Returns:
dict: Filtered dictionary containing:
- classes: Non-excluded class methods
- functions: Non-excluded functions
- globals: Non-excluded global variables
Example:
{
'module.path': {
'classes': {
'ClassName': {
'methods': [...],
'attributes': [...]
}
},
'functions': [...],
'globals': [...]
}
}
"""
return self.processed_targets
[docs]
def get_exclude_targets(self) -> dict:
"""Retrieve excluded targets.
Returns:
dict: Exclude dictionary containing:
- classes: Excluded class methods
- functions: Excluded functions
- globals: Excluded global variables
Example:
{
'module.path': {
'classes': {
'ClassName': {
'methods': [...],
'attributes': [...]
}
},
'functions': [...],
'globals': [...]
}
}
"""
return self.exclude_targets
[docs]
def serialize_targets(self, indent=Constants.LOG_INDENT_LEVEL):
"""Serialize objects that JSON cannot handle by default.
Converts sets to lists, and other objects to their __dict__ or string representation.
If the input is a dictionary with more than 8 top-level keys, only the keys are retained
with a placeholder value and a warning message is added.
Args:
indent: Number of spaces for JSON indentation
Returns:
str: JSON serialized string
"""
def target_handler(o):
if isinstance(o, set):
return list(o)
if hasattr(o, '__dict__'):
return o.__dict__
return str(o)
if len(self.processed_targets) > Constants.MAX_TARGETS_DISPLAY:
truncated_obj = {key: "..." for key in self.processed_targets.keys()}
truncated_obj["Warning: too many top-level keys, only showing values like"] = "..."
return json.dumps(truncated_obj, indent=indent, default=target_handler)
return json.dumps(self.processed_targets, indent=indent, default=target_handler)
[docs]
def get_filename_targets(self) -> Set:
"""Get monitored filesystem paths.
Returns:
Set[str]: Absolute paths to Python files being monitored,
including both directly specified files and module origins
"""
return self.filename_targets