Source code for objwatch.targets

# MIT License
# Copyright (c) 2025 aeeeeeep

import ast
import json
import inspect
import pkgutil
import importlib
import importlib.util
from pathlib import PosixPath
from types import ModuleType, MethodType, FunctionType
from typing import Optional, Tuple, List, Union, Set

from .constants import Constants
from .utils.util import target_handler
from .utils.logger import log_error, log_warn

ClassType = type
TargetsType = List[Union[str, ModuleType]]


[docs] def iter_parents(node): """Generator for traversing AST node parent hierarchy. Yields: ast.AST: Parent nodes in bottom-up order (nearest ancestor first) Example: for parent in iter_parents(some_node): if isinstance(parent, ast.ClassDef): break """ while hasattr(node, 'parent'): node = node.parent yield node
[docs] def set_parents(node, parent): """Recursively set parent references in AST nodes. Enables parent traversal via node.parent attribute Required for accurate scope determination during analysis """ node.parent = parent for child in ast.iter_child_nodes(node): set_parents(child, node)
[docs] def deep_merge(source: dict, update: dict) -> dict: """Recursively merge two dictionaries. Args: source: Base dictionary to be updated update: Dictionary with update values Returns: Reference to the modified source dictionary """ for key, val in update.items(): if isinstance(val, dict) and isinstance(source.get(key), dict): source[key] = deep_merge(source.get(key, {}), val) elif isinstance(val, list) and isinstance(source.get(key), list): source[key] = list(set(source[key] + val)) else: source[key] = val return source
[docs] class Targets: """ Target processor for monitoring file changes and module structures. Supported syntax: 1. Module: 'package.module' 2. Class: 'package.module:ClassName' 3. Class attribute: 'package.module:ClassName.attribute' 4. Class method: 'package.module:ClassName.method()' 5. Function: 'package.module:function()' 6. Global variable: 'package.module::GLOBAL_VAR' """
[docs] def __init__(self, targets: TargetsType, exclude_targets: Optional[TargetsType] = None): """ Initialize target processor. Args: targets: Monitoring targets in various formats exclude_targets: Exclusion targets in same formats """ targets, exclude_targets = self._check_targets(targets, exclude_targets) self.targets, self.filename_targets = self._process_targets(targets) self.exclude_targets, self.exclude_filename_targets = self._process_targets(exclude_targets) self._validate_filename_targets()
[docs] def _validate_filename_targets(self): """ Validate that no filename target ends with any exclude filename pattern. Collects all validation errors and raises them at once for better diagnostics. """ if not self.exclude_filename_targets: return # Sort exclude patterns by length (descending) for more efficient matching # Longer patterns are checked first to avoid redundant checks sorted_excludes = sorted(self.exclude_filename_targets, key=len, reverse=True) # Collect all errors errors = [] for target in self.filename_targets: # Only process if we have a string target for exclude in sorted_excludes: # Only check if exclude is a string and not empty if isinstance(exclude, str) and exclude and target.endswith(exclude): errors.append(f"Target '{target}' ends with excluded pattern '{exclude}'") # Break to avoid multiple matches for the same target break if errors: raise ValueError("Found multiple filename validation errors:\n" + "\n".join(errors))
[docs] def _check_targets( self, targets: TargetsType, exclude_targets: Optional[TargetsType] ) -> Tuple[TargetsType, Optional[TargetsType]]: """ Normalize and validate target inputs. Args: targets: Raw monitoring targets input exclude_targets: Raw exclusion targets input Returns: Tuple[TargetsType, TargetsType]: Normalized (targets, exclude_targets) """ if isinstance(targets, str): targets = [targets] if isinstance(exclude_targets, str): exclude_targets = [exclude_targets] return targets, exclude_targets
[docs] def _process_targets(self, targets: Optional[TargetsType]) -> Tuple[dict, Set[str]]: """ Convert heterogeneous targets to structured data model. Args: targets: List of targets Returns: Tuple[dict, Set[str]]: Hierarchical structure and filename targets """ processed_targets: dict = {} filename_targets: Set[str] = set() for target in targets or []: if isinstance(target, str) and target.endswith('.py'): filename_targets.add(target) elif isinstance(target, (str, ModuleType, ClassType, FunctionType, MethodType)): module_path, target_details = self._parse_target(target) existing_details = processed_targets.setdefault(module_path, {}) processed_targets[module_path] = deep_merge(existing_details, target_details) else: log_warn(f"Unsupported target type: {type(target)}") flatten_targets: dict = {} for module_path, target_details in processed_targets.items(): # Flatten the module structure self._flatten_module_structure(module_path, target_details, flatten_targets) return flatten_targets, filename_targets
[docs] def _parse_target(self, target: Union[str, ModuleType, ClassType, FunctionType, MethodType]) -> tuple: """ Parse different target formats into module structure. Args: target: Target specification Returns: tuple: (module_path, parsed_structure) """ if isinstance(target, ModuleType): return self._parse_module(target) if isinstance(target, ClassType): return self._parse_class(target) if isinstance(target, (FunctionType, MethodType)): return self._parse_function(target) return self._parse_string(target)
[docs] def _parse_function(self, func: Union[FunctionType, MethodType]) -> tuple: """Parse function object and create module structure containing this function or method Args: func: Function object to parse Returns: tuple: (module name, module structure containing only this function or method) """ # Check if this is a class method (bound to class) if hasattr(func, '__self__') and isinstance(func.__self__, type): cls = func.__self__ module = inspect.getmodule(cls) module_name = module.__name__ if module else '' return ( module_name, { 'classes': {cls.__name__: {'methods': [func.__name__], 'attributes': [], 'track_all': False}}, 'functions': [], 'globals': [], }, ) # Check if this is a static/class method using qualname (e.g. 'Class.method') if hasattr(func, '__qualname__') and '.' in func.__qualname__: class_name, method_name = func.__qualname__.split('.', 1) module = inspect.getmodule(func) if module and hasattr(module, class_name): cls = getattr(module, class_name) if isinstance(cls, type): module_name = module.__name__ if module else '' return ( module_name, { 'classes': {class_name: {'methods': [method_name], 'attributes': [], 'track_all': False}}, 'functions': [], 'globals': [], }, ) # Regular function handling module = inspect.getmodule(func) module_name = module.__name__ if module else '' function_name = func.__name__ parsed_structure = {'classes': {}, 'functions': [function_name], 'globals': []} return (module_name, parsed_structure)
[docs] def _parse_module(self, module: ModuleType) -> tuple: """Parse module structure using AST analysis. Args: module: Python module object to analyze Returns: tuple: (module_name, parsed_structure) pair """ return (module.__name__, self._parse_module_by_name(module.__name__))
[docs] def _parse_class(self, cls: ClassType) -> tuple: """Parse class object and create module structure containing this class Args: cls: Class object to parse Returns: tuple: (module name, module structure containing only this class) """ module = inspect.getmodule(cls) module_name = module.__name__ if module else '' class_name = cls.__name__ class_details = {'methods': [], 'attributes': [], 'track_all': True} # Flag to track all methods and attributes parsed_structure = {'classes': {class_name: class_details}, 'functions': [], 'globals': []} return (module_name, parsed_structure)
[docs] def _parse_string(self, target: str) -> tuple: """Parse string-formatted target definitions Args: target: Target definition string Returns: tuple: (module_path, parsed_structure) """ # Handle global variable syntax if '::' in target: module_part, _, global_var = target.partition('::') spec = importlib.util.find_spec(module_part) if spec is None: log_warn(f"Module {module_part} not found") return (module_part, {'globals': []}) resolved_module_name = spec.name return (resolved_module_name, {'globals': [global_var.strip()]}) # Split module path and symbol definition module_part, _, symbol = target.partition(':') spec = importlib.util.find_spec(module_part) if spec is None: log_warn(f"Module {module_part} not found") return (module_part, {'classes': {}, 'functions': [], 'globals': []}) resolved_module_name = spec.name full_module = self._parse_module_by_name(resolved_module_name) if not symbol: return (resolved_module_name, full_module) details: dict = {'classes': {}, 'functions': [], 'globals': []} current_symbol = symbol # Parse class members (methods or attributes) if '.' in symbol: class_part, _, member = current_symbol.partition('.') if class_part in full_module['classes']: if member.endswith('()'): method_name = member[:-2] # Directly add method without checking if it exists in class_info details['classes'][class_part] = { 'methods': [method_name], 'attributes': [], 'track_all': False, # Not tracking all, just specific method } else: # Directly add attribute without checking if it exists in class_info details['classes'][class_part] = { 'methods': [], 'attributes': [member], 'track_all': False, # Not tracking all, just specific attribute } else: if current_symbol.endswith('()'): func_name = current_symbol[:-2] if func_name in full_module['functions']: details['functions'].append(func_name) elif current_symbol in full_module['classes']: details['classes'][current_symbol] = { 'methods': [], 'attributes': [], 'track_all': True, # Track entire class } return (resolved_module_name, details)
[docs] def _parse_module_by_name(self, module_name: str, recursive: bool = True) -> dict: """Locate and parse module structure by its import name, supporting recursive parsing. Args: module_name: Full dotted import path (e.g. 'package.module') recursive: Whether to recursively parse submodules Returns: dict: Parsed module structure with submodules if recursive=True """ spec = importlib.util.find_spec(module_name) if spec is None: log_warn(f"Module {module_name} not found") return {'classes': {}, 'functions': [], 'globals': []} # Parse the current module module_structure: dict = {'classes': {}, 'functions': [], 'globals': []} if spec.origin and spec.origin.endswith('.py'): module_structure = self._parse_py_file(spec.origin) # Recursively parse submodules if enabled if recursive and hasattr(spec, 'submodule_search_locations') and spec.submodule_search_locations: submodule_locations = [] for submodule_location in spec.submodule_search_locations: if isinstance(submodule_location, PosixPath): submodule_locations.append(str(submodule_location)) else: submodule_locations.append(submodule_location) for _, submodule_name, is_pkg in pkgutil.iter_modules(submodule_locations): full_submodule_name = f"{module_name}.{submodule_name}" try: submodule_structure = self._parse_module_by_name(full_submodule_name, recursive) # Add submodule structure to current module module_structure[submodule_name] = submodule_structure except Exception as e: log_warn(f"Failed to parse submodule '{full_submodule_name}': {str(e)}") return module_structure
[docs] def _parse_py_file(self, file_path: str) -> dict: """Analyze Python file structure using Abstract Syntax Tree. Args: file_path: Absolute path to Python file Returns: dict: Parsed file structure dictionary Raises: Logs error on parsing failure """ parsed_structure: dict = {'classes': {}, 'functions': [], 'globals': []} try: with open(file_path, 'r', encoding='utf-8') as f: tree = ast.parse(f.read()) set_parents(tree, None) for node in ast.walk(tree): if isinstance(node, ast.ClassDef): class_info = { 'methods': [], 'attributes': [], 'track_all': True, # Flag to track all methods and attributes } parsed_structure['classes'][node.name] = class_info elif isinstance(node, ast.FunctionDef): if not any(isinstance(parent, ast.ClassDef) for parent in iter_parents(node)): parsed_structure['functions'].append(node.name) elif isinstance(node, ast.Assign): self._process_assignment(node, parsed_structure) except Exception as e: log_error(f"Failed to parse {file_path}: {str(e)}") return parsed_structure
[docs] def _extract_class_attributes(self, class_node: ast.ClassDef) -> List[str]: """Extract class attributes from AST node. Includes: - Assignment statements - Annotated assignments """ attrs = [] for node in class_node.body: if isinstance(node, ast.Assign): for target in node.targets: if isinstance(target, ast.Name): attrs.append(target.id) elif isinstance(node, ast.AnnAssign) and isinstance(node.target, ast.Name): attrs.append(node.target.id) return attrs
[docs] def _flatten_module_structure(self, module_path: str, module_structure: dict, result: dict): """Flatten nested module structure into a dict. Args: module_path: Full module path module_structure: Module structure dictionary result: dict to populate """ # Extract standard sections (classes, functions, globals) standard_sections = { 'classes': module_structure.get('classes', {}), 'functions': module_structure.get('functions', []), 'globals': module_structure.get('globals', []), } # Only add to result if there's content if any([standard_sections['classes'], standard_sections['functions'], standard_sections['globals']]): result[module_path] = standard_sections # Process nested submodules for key, value in module_structure.items(): if key not in ['classes', 'functions', 'globals'] and isinstance(value, dict): submodule_path = f"{module_path}.{key}" self._flatten_module_structure(submodule_path, value, result)
[docs] def _process_assignment(self, node: ast.Assign, result: dict): """Extract global variables from assignment AST nodes. Handles two patterns: 1. Simple assignments: `var = value` 2. Tuple unpacking: `a, b = (1, 2)` Args: node: AST assignment node to analyze result: dict to update with found globals """ if any(isinstance(parent, (ast.ClassDef, ast.FunctionDef)) for parent in iter_parents(node)): return for assign_target in node.targets: if isinstance(assign_target, ast.Name): result['globals'].append(assign_target.id) elif isinstance(assign_target, ast.Tuple): for element in assign_target.elts: if isinstance(element, ast.Name): result['globals'].append(element.id)
[docs] def get_targets(self) -> dict: """Retrieve targets. Returns: dict: Target dictionary containing: - classes: Class methods - functions: Functions - globals: Global variables Example: { 'module.path': { 'classes': { 'ClassName': { 'methods': [...], 'attributes': [...] } }, 'functions': [...], 'globals': [...] } } """ return self.targets
[docs] def get_exclude_targets(self) -> dict: """Retrieve excluded targets. Returns: dict: Exclude dictionary containing: - classes: Excluded class methods - functions: Excluded functions - globals: Excluded global variables Example: { 'module.path': { 'classes': { 'ClassName': { 'methods': [...], 'attributes': [...] } }, 'functions': [...], 'globals': [...] } } """ return self.exclude_targets
[docs] def get_filename_targets(self) -> Set: """Get monitored filesystem paths. Path matching is determined using string.endswith() method. Returns: Set[str]: Paths to Python files being monitored """ return self.filename_targets
[docs] def get_exclude_filename_targets(self) -> Set: """Get monitored excluded filesystem paths. Path matching is determined using string.endswith() method. Returns: Set[str]: Paths to Python files being excluded """ return self.exclude_filename_targets
[docs] @staticmethod def serialize_targets(targets: dict, indent=Constants.LOG_INDENT_LEVEL): """Serialize objects that JSON cannot handle by default. Converts sets to lists, and other objects to their __dict__ or string representation. If the input is a dictionary with more than 8 top-level keys, only the keys are retained with a placeholder value and a warning message is added. Args: indent: Number of spaces for JSON indentation Returns: str: JSON serialized string """ if len(targets) > Constants.MAX_TARGETS_DISPLAY: truncated_obj = {key: "..." for key in targets.keys()} truncated_obj["Warning: too many top-level keys, only showing values like"] = "..." return json.dumps(truncated_obj, indent=indent, default=target_handler) return json.dumps(targets, indent=indent, default=target_handler)