# MIT License
# Copyright (c) 2025 aeeeeeep
import ast
import json
import inspect
import pkgutil
import importlib
import importlib.util
from pathlib import PosixPath
from types import ModuleType, MethodType, FunctionType
from typing import Optional, Tuple, List, Union, Set
from .constants import Constants
from .utils.util import target_handler
from .utils.logger import log_error, log_warn
ClassType = type
TargetsType = List[Union[str, ModuleType]]
[docs]
def iter_parents(node):
"""Generator for traversing AST node parent hierarchy.
Yields:
ast.AST: Parent nodes in bottom-up order (nearest ancestor first)
Example:
for parent in iter_parents(some_node):
if isinstance(parent, ast.ClassDef):
break
"""
while hasattr(node, 'parent'):
node = node.parent
yield node
[docs]
def set_parents(node, parent):
"""Recursively set parent references in AST nodes.
Enables parent traversal via node.parent attribute
Required for accurate scope determination during analysis
"""
node.parent = parent
for child in ast.iter_child_nodes(node):
set_parents(child, node)
[docs]
def deep_merge(source: dict, update: dict) -> dict:
"""Recursively merge two dictionaries.
Args:
source: Base dictionary to be updated
update: Dictionary with update values
Returns:
Reference to the modified source dictionary
"""
for key, val in update.items():
if isinstance(val, dict) and isinstance(source.get(key), dict):
source[key] = deep_merge(source.get(key, {}), val)
elif isinstance(val, list) and isinstance(source.get(key), list):
source[key] = list(set(source[key] + val))
else:
source[key] = val
return source
[docs]
class Targets:
"""
Target processor for monitoring file changes and module structures.
Supported syntax:
1. Module: 'package.module'
2. Class: 'package.module:ClassName'
3. Class attribute: 'package.module:ClassName.attribute'
4. Class method: 'package.module:ClassName.method()'
5. Function: 'package.module:function()'
6. Global variable: 'package.module::GLOBAL_VAR'
"""
[docs]
def __init__(self, targets: TargetsType, exclude_targets: Optional[TargetsType] = None):
"""
Initialize target processor.
Args:
targets: Monitoring targets in various formats
exclude_targets: Exclusion targets in same formats
"""
targets, exclude_targets = self._check_targets(targets, exclude_targets)
self.targets, self.filename_targets = self._process_targets(targets)
self.exclude_targets, self.exclude_filename_targets = self._process_targets(exclude_targets)
self._validate_filename_targets()
[docs]
def _validate_filename_targets(self):
"""
Validate that no filename target ends with any exclude filename pattern.
Collects all validation errors and raises them at once for better diagnostics.
"""
if not self.exclude_filename_targets:
return
# Sort exclude patterns by length (descending) for more efficient matching
# Longer patterns are checked first to avoid redundant checks
sorted_excludes = sorted(self.exclude_filename_targets, key=len, reverse=True)
# Collect all errors
errors = []
for target in self.filename_targets:
# Only process if we have a string target
for exclude in sorted_excludes:
# Only check if exclude is a string and not empty
if isinstance(exclude, str) and exclude and target.endswith(exclude):
errors.append(f"Target '{target}' ends with excluded pattern '{exclude}'")
# Break to avoid multiple matches for the same target
break
if errors:
raise ValueError("Found multiple filename validation errors:\n" + "\n".join(errors))
[docs]
def _check_targets(
self, targets: TargetsType, exclude_targets: Optional[TargetsType]
) -> Tuple[TargetsType, Optional[TargetsType]]:
"""
Normalize and validate target inputs.
Args:
targets: Raw monitoring targets input
exclude_targets: Raw exclusion targets input
Returns:
Tuple[TargetsType, TargetsType]: Normalized (targets, exclude_targets)
"""
if isinstance(targets, str):
targets = [targets]
if isinstance(exclude_targets, str):
exclude_targets = [exclude_targets]
return targets, exclude_targets
[docs]
def _process_targets(self, targets: Optional[TargetsType]) -> Tuple[dict, Set[str]]:
"""
Convert heterogeneous targets to structured data model.
Args:
targets: List of targets
Returns:
Tuple[dict, Set[str]]: Hierarchical structure and filename targets
"""
processed_targets: dict = {}
filename_targets: Set[str] = set()
for target in targets or []:
if isinstance(target, str) and target.endswith('.py'):
filename_targets.add(target)
elif isinstance(target, (str, ModuleType, ClassType, FunctionType, MethodType)):
module_path, target_details = self._parse_target(target)
existing_details = processed_targets.setdefault(module_path, {})
processed_targets[module_path] = deep_merge(existing_details, target_details)
else:
log_warn(f"Unsupported target type: {type(target)}")
flatten_targets: dict = {}
for module_path, target_details in processed_targets.items():
# Flatten the module structure
self._flatten_module_structure(module_path, target_details, flatten_targets)
return flatten_targets, filename_targets
[docs]
def _parse_target(self, target: Union[str, ModuleType, ClassType, FunctionType, MethodType]) -> tuple:
"""
Parse different target formats into module structure.
Args:
target: Target specification
Returns:
tuple: (module_path, parsed_structure)
"""
if isinstance(target, ModuleType):
return self._parse_module(target)
if isinstance(target, ClassType):
return self._parse_class(target)
if isinstance(target, (FunctionType, MethodType)):
return self._parse_function(target)
return self._parse_string(target)
[docs]
def _parse_function(self, func: Union[FunctionType, MethodType]) -> tuple:
"""Parse function object and create module structure containing this function or method
Args:
func: Function object to parse
Returns:
tuple: (module name, module structure containing only this function or method)
"""
# Check if this is a class method (bound to class)
if hasattr(func, '__self__') and isinstance(func.__self__, type):
cls = func.__self__
module = inspect.getmodule(cls)
module_name = module.__name__ if module else ''
return (
module_name,
{
'classes': {cls.__name__: {'methods': [func.__name__], 'attributes': [], 'track_all': False}},
'functions': [],
'globals': [],
},
)
# Check if this is a static/class method using qualname (e.g. 'Class.method')
if hasattr(func, '__qualname__') and '.' in func.__qualname__:
class_name, method_name = func.__qualname__.split('.', 1)
module = inspect.getmodule(func)
if module and hasattr(module, class_name):
cls = getattr(module, class_name)
if isinstance(cls, type):
module_name = module.__name__ if module else ''
return (
module_name,
{
'classes': {class_name: {'methods': [method_name], 'attributes': [], 'track_all': False}},
'functions': [],
'globals': [],
},
)
# Regular function handling
module = inspect.getmodule(func)
module_name = module.__name__ if module else ''
function_name = func.__name__
parsed_structure = {'classes': {}, 'functions': [function_name], 'globals': []}
return (module_name, parsed_structure)
[docs]
def _parse_module(self, module: ModuleType) -> tuple:
"""Parse module structure using AST analysis.
Args:
module: Python module object to analyze
Returns:
tuple: (module_name, parsed_structure) pair
"""
return (module.__name__, self._parse_module_by_name(module.__name__))
[docs]
def _parse_class(self, cls: ClassType) -> tuple:
"""Parse class object and create module structure containing this class
Args:
cls: Class object to parse
Returns:
tuple: (module name, module structure containing only this class)
"""
module = inspect.getmodule(cls)
module_name = module.__name__ if module else ''
class_name = cls.__name__
class_details = {'methods': [], 'attributes': [], 'track_all': True} # Flag to track all methods and attributes
parsed_structure = {'classes': {class_name: class_details}, 'functions': [], 'globals': []}
return (module_name, parsed_structure)
[docs]
def _parse_string(self, target: str) -> tuple:
"""Parse string-formatted target definitions
Args:
target: Target definition string
Returns:
tuple: (module_path, parsed_structure)
"""
# Handle global variable syntax
if '::' in target:
module_part, _, global_var = target.partition('::')
spec = importlib.util.find_spec(module_part)
if spec is None:
log_warn(f"Module {module_part} not found")
return (module_part, {'globals': []})
resolved_module_name = spec.name
return (resolved_module_name, {'globals': [global_var.strip()]})
# Split module path and symbol definition
module_part, _, symbol = target.partition(':')
spec = importlib.util.find_spec(module_part)
if spec is None:
log_warn(f"Module {module_part} not found")
return (module_part, {'classes': {}, 'functions': [], 'globals': []})
resolved_module_name = spec.name
full_module = self._parse_module_by_name(resolved_module_name)
if not symbol:
return (resolved_module_name, full_module)
details: dict = {'classes': {}, 'functions': [], 'globals': []}
current_symbol = symbol
# Parse class members (methods or attributes)
if '.' in symbol:
class_part, _, member = current_symbol.partition('.')
if class_part in full_module['classes']:
if member.endswith('()'):
method_name = member[:-2]
# Directly add method without checking if it exists in class_info
details['classes'][class_part] = {
'methods': [method_name],
'attributes': [],
'track_all': False, # Not tracking all, just specific method
}
else:
# Directly add attribute without checking if it exists in class_info
details['classes'][class_part] = {
'methods': [],
'attributes': [member],
'track_all': False, # Not tracking all, just specific attribute
}
else:
if current_symbol.endswith('()'):
func_name = current_symbol[:-2]
if func_name in full_module['functions']:
details['functions'].append(func_name)
elif current_symbol in full_module['classes']:
details['classes'][current_symbol] = {
'methods': [],
'attributes': [],
'track_all': True, # Track entire class
}
return (resolved_module_name, details)
[docs]
def _parse_module_by_name(self, module_name: str, recursive: bool = True) -> dict:
"""Locate and parse module structure by its import name, supporting recursive parsing.
Args:
module_name: Full dotted import path (e.g. 'package.module')
recursive: Whether to recursively parse submodules
Returns:
dict: Parsed module structure with submodules if recursive=True
"""
spec = importlib.util.find_spec(module_name)
if spec is None:
log_warn(f"Module {module_name} not found")
return {'classes': {}, 'functions': [], 'globals': []}
# Parse the current module
module_structure: dict = {'classes': {}, 'functions': [], 'globals': []}
if spec.origin and spec.origin.endswith('.py'):
module_structure = self._parse_py_file(spec.origin)
# Recursively parse submodules if enabled
if recursive and hasattr(spec, 'submodule_search_locations') and spec.submodule_search_locations:
submodule_locations = []
for submodule_location in spec.submodule_search_locations:
if isinstance(submodule_location, PosixPath):
submodule_locations.append(str(submodule_location))
else:
submodule_locations.append(submodule_location)
for _, submodule_name, is_pkg in pkgutil.iter_modules(submodule_locations):
full_submodule_name = f"{module_name}.{submodule_name}"
try:
submodule_structure = self._parse_module_by_name(full_submodule_name, recursive)
# Add submodule structure to current module
module_structure[submodule_name] = submodule_structure
except Exception as e:
log_warn(f"Failed to parse submodule '{full_submodule_name}': {str(e)}")
return module_structure
[docs]
def _parse_py_file(self, file_path: str) -> dict:
"""Analyze Python file structure using Abstract Syntax Tree.
Args:
file_path: Absolute path to Python file
Returns:
dict: Parsed file structure dictionary
Raises:
Logs error on parsing failure
"""
parsed_structure: dict = {'classes': {}, 'functions': [], 'globals': []}
try:
with open(file_path, 'r', encoding='utf-8') as f:
tree = ast.parse(f.read())
set_parents(tree, None)
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
class_info = {
'methods': [],
'attributes': [],
'track_all': True, # Flag to track all methods and attributes
}
parsed_structure['classes'][node.name] = class_info
elif isinstance(node, ast.FunctionDef):
if not any(isinstance(parent, ast.ClassDef) for parent in iter_parents(node)):
parsed_structure['functions'].append(node.name)
elif isinstance(node, ast.Assign):
self._process_assignment(node, parsed_structure)
except Exception as e:
log_error(f"Failed to parse {file_path}: {str(e)}")
return parsed_structure
[docs]
def _flatten_module_structure(self, module_path: str, module_structure: dict, result: dict):
"""Flatten nested module structure into a dict.
Args:
module_path: Full module path
module_structure: Module structure dictionary
result: dict to populate
"""
# Extract standard sections (classes, functions, globals)
standard_sections = {
'classes': module_structure.get('classes', {}),
'functions': module_structure.get('functions', []),
'globals': module_structure.get('globals', []),
}
# Only add to result if there's content
if any([standard_sections['classes'], standard_sections['functions'], standard_sections['globals']]):
result[module_path] = standard_sections
# Process nested submodules
for key, value in module_structure.items():
if key not in ['classes', 'functions', 'globals'] and isinstance(value, dict):
submodule_path = f"{module_path}.{key}"
self._flatten_module_structure(submodule_path, value, result)
[docs]
def _process_assignment(self, node: ast.Assign, result: dict):
"""Extract global variables from assignment AST nodes.
Handles two patterns:
1. Simple assignments: `var = value`
2. Tuple unpacking: `a, b = (1, 2)`
Args:
node: AST assignment node to analyze
result: dict to update with found globals
"""
if any(isinstance(parent, (ast.ClassDef, ast.FunctionDef)) for parent in iter_parents(node)):
return
for assign_target in node.targets:
if isinstance(assign_target, ast.Name):
result['globals'].append(assign_target.id)
elif isinstance(assign_target, ast.Tuple):
for element in assign_target.elts:
if isinstance(element, ast.Name):
result['globals'].append(element.id)
[docs]
def get_targets(self) -> dict:
"""Retrieve targets.
Returns:
dict: Target dictionary containing:
- classes: Class methods
- functions: Functions
- globals: Global variables
Example:
{
'module.path': {
'classes': {
'ClassName': {
'methods': [...],
'attributes': [...]
}
},
'functions': [...],
'globals': [...]
}
}
"""
return self.targets
[docs]
def get_exclude_targets(self) -> dict:
"""Retrieve excluded targets.
Returns:
dict: Exclude dictionary containing:
- classes: Excluded class methods
- functions: Excluded functions
- globals: Excluded global variables
Example:
{
'module.path': {
'classes': {
'ClassName': {
'methods': [...],
'attributes': [...]
}
},
'functions': [...],
'globals': [...]
}
}
"""
return self.exclude_targets
[docs]
def get_filename_targets(self) -> Set:
"""Get monitored filesystem paths.
Path matching is determined using string.endswith() method.
Returns:
Set[str]: Paths to Python files being monitored
"""
return self.filename_targets
[docs]
def get_exclude_filename_targets(self) -> Set:
"""Get monitored excluded filesystem paths.
Path matching is determined using string.endswith() method.
Returns:
Set[str]: Paths to Python files being excluded
"""
return self.exclude_filename_targets
[docs]
@staticmethod
def serialize_targets(targets: dict, indent=Constants.LOG_INDENT_LEVEL):
"""Serialize objects that JSON cannot handle by default.
Converts sets to lists, and other objects to their __dict__ or string representation.
If the input is a dictionary with more than 8 top-level keys, only the keys are retained
with a placeholder value and a warning message is added.
Args:
indent: Number of spaces for JSON indentation
Returns:
str: JSON serialized string
"""
if len(targets) > Constants.MAX_TARGETS_DISPLAY:
truncated_obj = {key: "..." for key in targets.keys()}
truncated_obj["Warning: too many top-level keys, only showing values like"] = "..."
return json.dumps(truncated_obj, indent=indent, default=target_handler)
return json.dumps(targets, indent=indent, default=target_handler)