Source code for objwatch.tracer

# MIT License
# Copyright (c) 2025 aeeeeeep

import sys
from functools import lru_cache
from types import FrameType
from typing import Optional, Any, Dict, Set

from .constants import Constants
from .config import ObjWatchConfig
from .targets import Targets
from .wrappers import ABCWrapper
from .events import EventType
from .event_handls import EventHandls
from .mp_handls import MPHandls
from .utils.weak import WeakIdKeyDictionary
from .utils.logger import log_info, log_error
from .runtime_info import runtime_info


[docs] class Tracer: """ Tracer class to monitor and trace function calls, returns, and variable updates within specified target modules. """
[docs] def __init__( self, config: ObjWatchConfig, ) -> None: """ Initialize the Tracer with configuration parameters. Args: config (ObjWatchConfig): Configuration parameters for ObjWatch. """ self.config = config if self.config.with_locals: self.tracked_locals: Dict[FrameType, dict] = {} self.tracked_locals_lens: Dict[FrameType, Dict[str, int]] = {} if self.config.with_globals: self.tracked_globals: Dict[FrameType, dict] = {} self.tracked_globals_lens: Dict[FrameType, Dict[str, int]] = {} # List of Python built-in fields to exclude from tracking self.builtin_fields = set(dir(__builtins__)) | { 'self', '__builtins__', '__name__', '__package__', '__loader__', '__spec__', '__file__', '__cached__', } # Process and determine the set of target files to monitor targets_cls = Targets(self.config.targets, self.config.exclude_targets) self.filename_targets: Set = targets_cls.get_filename_targets() self.exclude_filename_targets: Set = targets_cls.get_exclude_filename_targets() self.targets: dict = targets_cls.get_targets() self.exclude_targets: dict = targets_cls.get_exclude_targets() self._build_target_index() self._build_exclude_target_index() # Load the function wrapper if provided self.abc_wrapper: Optional[ABCWrapper] = self.load_wrapper(self.config.wrapper) # Initialize multi-process handler with the specified framework self.mp_handlers: MPHandls = MPHandls(framework=self.config.framework) self.index_info: str = "" self.current_index: Optional[int] = None self.indexes: Set[int] = set(self.config.indexes if self.config.indexes is not None else [0])
[docs] def _initialize_tracking_state(self) -> None: """ Initialize all tracking state including dictionaries, handlers, and counters. """ # Initialize event handlers with optional JSON output self.event_handlers: EventHandls = EventHandls(config=self.config) # Initialize tracking dictionaries for objects self.tracked_objects: WeakIdKeyDictionary = WeakIdKeyDictionary() self.tracked_objects_lens: WeakIdKeyDictionary = WeakIdKeyDictionary() # Initialize last line numbers dictionary for tracking previous line in line events self.last_linenos: Dict[FrameType, int] = {} # Initialize call depth tracker self._call_depth: int = 0
@property def call_depth(self) -> int: return self._call_depth @call_depth.setter def call_depth(self, value: int) -> None: if value < 0: raise ValueError( "call_depth cannot be negative. " f"Received invalid value: {value}. " "This indicates a potential issue in the call stack tracking logic. " "Please report this issue to the developers with the traceback information." ) self._call_depth = value
[docs] def _build_target_index(self): """Build fast lookup indexes for monitoring targets.""" self.module_index = set(self.targets.keys()) self.class_index = {} self.method_index = {} self.attribute_index = {} self.function_index = {} self.global_index = {} self.class_info = {} # Store class info for track_all checking # Build indexes for track_all scenarios for module, details in self.targets.items(): # Process classes classes = details.get('classes', {}) for cls_name, cls_info in classes.items(): # Add class to class index self.class_index.setdefault(module, set()).add(cls_name) # Store class info for track_all checking self.class_info.setdefault(module, {})[cls_name] = cls_info # Process methods (only if not tracking all) if not cls_info.get('track_all', False): methods = cls_info.get('methods', []) if methods: class_methods = self.method_index.setdefault(module, {}).setdefault(cls_name, set()) class_methods.update(methods) # Process attributes (only if not tracking all) if not cls_info.get('track_all', False): attributes = cls_info.get('attributes', []) if attributes: class_attrs = self.attribute_index.setdefault(module, {}).setdefault(cls_name, set()) class_attrs.update(attributes) # Process functions for func in details.get('functions', []): self.function_index.setdefault(module, set()).add(func) # Process globals for gvar in details.get('globals', []): self.global_index.setdefault(module, set()).add(gvar) self.index_map = { 'class': self.class_index, 'method': self.method_index, 'attribute': self.attribute_index, 'function': self.function_index, 'global': self.global_index, }
[docs] def _build_exclude_target_index(self): """Build fast lookup indexes for exclusion targets.""" self.exclude_module_index = set(self.exclude_targets.keys()) self.exclude_class_index = {} self.exclude_method_index = {} self.exclude_attribute_index = {} self.exclude_function_index = {} self.exclude_global_index = {} self.exclude_class_info = {} # Store exclude class info for track_all checking # Build indexes for exclusion targets for module, details in self.exclude_targets.items(): # Process excluded classes classes = details.get('classes', {}) for cls_name, cls_info in classes.items(): # Add class to exclude class index self.exclude_class_index.setdefault(module, set()).add(cls_name) # Store exclude class info for track_all checking self.exclude_class_info.setdefault(module, {})[cls_name] = cls_info # Process excluded methods methods = cls_info.get('methods', []) if methods: class_methods = self.exclude_method_index.setdefault(module, {}).setdefault(cls_name, set()) class_methods.update(methods) # Process excluded attributes attributes = cls_info.get('attributes', []) if attributes: class_attrs = self.exclude_attribute_index.setdefault(module, {}).setdefault(cls_name, set()) class_attrs.update(attributes) # Process excluded functions for func in details.get('functions', []): self.exclude_function_index.setdefault(module, set()).add(func) # Process excluded globals for gvar in details.get('globals', []): self.exclude_global_index.setdefault(module, set()).add(gvar) self.exclude_index_map = { 'class': self.exclude_class_index, 'method': self.exclude_method_index, 'attribute': self.exclude_attribute_index, 'function': self.exclude_function_index, 'global': self.exclude_global_index, }
[docs] def load_wrapper(self, wrapper): """ Load a custom function wrapper if provided. Args: wrapper: The custom wrapper to load. Returns: The initialized wrapper or None. """ if wrapper: if issubclass(wrapper, ABCWrapper): return wrapper() log_error(f"wrapper '{wrapper.__name__}' is not a subclass of ABCWrapper") raise ValueError(f"wrapper '{wrapper.__name__}' is not a subclass of ABCWrapper") return None
[docs] @lru_cache(maxsize=sys.maxsize) def _should_trace_module(self, module: str) -> bool: """Check if a module is within monitoring scope. Args: module (str): Full module name to check Returns: bool: True if the module is in monitoring targets """ return module in self.module_index and module not in self.exclude_module_index
[docs] @lru_cache(maxsize=sys.maxsize) def _should_trace_class(self, module: str, class_name: str) -> bool: """Check if a specific class should be traced. Args: module (str): Parent module name class_name (str): Class name to check Returns: bool: True if the class should be traced """ return class_name in self.class_index.get(module, set()) and class_name not in self.exclude_class_index.get( module, set() )
[docs] @lru_cache(maxsize=sys.maxsize) def _should_trace_method(self, module: str, class_name: str, method_name: str) -> bool: """Check if a specific method should be traced. Args: module (str): Parent module name class_name (str): Class name containing the method method_name (str): Method name to check Returns: bool: True if the method should be traced """ # Check if tracking all methods for this class class_info = self.class_info.get(module, {}).get(class_name, {}) if class_info.get('track_all', False): # Check if this method is excluded excluded_methods = self.exclude_method_index.get(module, {}).get(class_name, set()) return method_name not in excluded_methods return method_name in self.method_index.get(module, {}).get(class_name, set())
[docs] @lru_cache(maxsize=sys.maxsize) def _should_trace_attribute(self, module: str, class_name: str, attr_name: str) -> bool: """Check if a specific attribute should be traced. Args: module (str): Parent module name class_name (str): Class name containing the attribute attr_name (str): Attribute name to check Returns: bool: True if the attribute should be traced """ # Check if tracking all attributes for this class class_info = self.class_info.get(module, {}).get(class_name, {}) if class_info.get('track_all', False): # Check if this attribute is excluded excluded_attrs = self.exclude_attribute_index.get(module, {}).get(class_name, set()) return attr_name not in excluded_attrs return attr_name in self.attribute_index.get(module, {}).get(class_name, set())
[docs] @lru_cache(maxsize=sys.maxsize) def _should_trace_function(self, module: str, func_name: str) -> bool: """Check if a specific function should be traced. Args: module (str): Parent module name func_name (str): Function name to check Returns: bool: True if the function should be traced """ return func_name in self.function_index.get(module, set()) and func_name not in self.exclude_function_index.get( module, set() )
[docs] @lru_cache(maxsize=sys.maxsize) def _should_trace_global(self, module: str, global_name: str) -> bool: """Check if a specific global variable should be traced. Args: module (str): Parent module name global_name (str): Global variable name to check Returns: bool: True if the global variable should be traced """ if not self.config.with_globals: return False if not self.global_index: return global_name not in self.builtin_fields return global_name in self.global_index.get(module, set()) and global_name not in self.exclude_global_index.get( module, set() )
[docs] @lru_cache(maxsize=sys.maxsize) def _filename_endswith(self, filename: str) -> bool: """ Check if the filename does not end with any of the target extensions. Args: filename (str): The filename to check. Returns: bool: True if the filename does not end with the target extensions, False otherwise. """ return filename.endswith(tuple(self.filename_targets)) and not filename.endswith( tuple(self.exclude_filename_targets) )
[docs] def _should_trace_frame(self, frame: FrameType) -> bool: """Determine if a stack frame should be traced. Args: frame (FrameType): Execution frame to evaluate Returns: bool: True if tracing should occur for this frame """ # Check if file extension matches target patterns if self._filename_endswith(frame.f_code.co_filename): return True module = frame.f_globals.get('__name__', '') # Check if module is in tracing targets if not self._should_trace_module(module): return False # Handle class methods and attributes if 'self' in frame.f_locals: cls_name = frame.f_locals['self'].__class__.__name__ method_name = frame.f_code.co_name # Check if class is traced and method is traced class_is_traced = self._should_trace_class(module, cls_name) method_is_traced = self._should_trace_method(module, cls_name, method_name) # If method is traced, no need to check attributes if class_is_traced and method_is_traced: return True # Check if any attribute is traced if class_is_traced: obj = frame.f_locals['self'] current_attrs = {k: v for k, v in obj.__dict__.items() if not callable(v)} any_attr_traced = any( self._should_trace_attribute(module, cls_name, attr) for attr in current_attrs.keys() ) return any_attr_traced # Check if the method has been monkey-patched if not method_is_traced and not class_is_traced: if self._should_trace_function(module, method_name): return True return False # Handle regular functions func_name = frame.f_code.co_name if self._should_trace_function(module, func_name): return True # Check for global variable changes return self._check_global_changes(frame)
[docs] def _check_global_changes(self, frame: FrameType) -> bool: """Detect monitored global variables in current frame. Args: frame (FrameType): Execution frame containing globals Returns: bool: True if any tracked global variables exist """ module_name = frame.f_globals.get('__name__', '') if not self.global_index and self.config.with_globals: return any(var not in self.builtin_fields for var in frame.f_globals.keys()) if not self.config.with_globals: return False return bool(self.global_index.get(module_name))
[docs] def _update_objects_lens(self, frame: FrameType) -> None: """ Update tracked objects' sequence-type attribute lengths. Args: frame (FrameType): Current stack frame to inspect. """ if 'self' in frame.f_locals: obj = frame.f_locals['self'] if hasattr(obj, '__dict__') and hasattr(obj.__class__, '__weakref__'): attrs: dict = {k: v for k, v in obj.__dict__.items() if not callable(v)} if obj not in self.tracked_objects: self.tracked_objects[obj] = attrs if obj not in self.tracked_objects_lens: self.tracked_objects_lens[obj] = {} for k, v in attrs.items(): if isinstance(v, Constants.LOG_SEQUENCE_TYPES): self.tracked_objects_lens[obj][k] = len(v)
[docs] def _get_function_info(self, frame: FrameType) -> dict: """ Extract information about the currently executing function. Args: frame (FrameType): The current stack frame. Returns: dict: Dictionary containing function information. """ func_info = {} module = frame.f_globals.get('__name__', '') if 'self' in frame.f_locals: cls = frame.f_locals['self'].__class__.__name__ func_name = f"{cls}.{frame.f_code.co_name}" symbol_type = 'method' if self._should_trace_method(module, cls, func_name) else None else: func_name = frame.f_code.co_name symbol_type = 'function' if self._should_trace_function(module, func_name) else None func_info.update( { 'module': module, 'symbol': func_name, 'symbol_type': symbol_type, 'qualified_name': f"{module}.{func_name}" if module else func_name, 'frame': frame, } ) return func_info
[docs] def _handle_change_type( self, lineno: int, class_name: str, key: str, old_value: Optional[Any], current_value: Any, old_value_len: Optional[int], current_value_len: Optional[int], ) -> None: """ Helper function to handle the change type for both object attributes and local variables. Args: lineno (int): Line number where the change occurred. class_name (str): Class name if the change relates to an object attribute. key (str): The key (variable or attribute) being tracked. old_value (Optional[Any]): The old value of the variable or attribute. current_value (Any): The current value of the variable or attribute. old_value_len (Optional[int]): The length of the old value (if applicable). current_value_len (Optional[int]): The length of the current value (if applicable). """ if old_value_len is not None and current_value_len is not None: change_type: Optional[EventType] = ( self.event_handlers.determine_change_type(old_value_len, current_value_len) if old_value_len is not None else EventType.UPD ) else: change_type = EventType.UPD if id(old_value) == id(current_value): if change_type == EventType.APD: self.event_handlers.handle_apd( lineno, class_name, key, type(current_value), old_value_len, current_value_len, self.call_depth, self.index_info, ) elif change_type == EventType.POP: self.event_handlers.handle_pop( lineno, class_name, key, type(current_value), old_value_len, current_value_len, self.call_depth, self.index_info, ) elif change_type == EventType.UPD: self.event_handlers.handle_upd( lineno, class_name, key, old_value, current_value, self.call_depth, self.index_info, self.abc_wrapper, )
[docs] def _track_object_change(self, frame: FrameType, lineno: int): """ Handle changes in object attributes and track updates. Args: frame (FrameType): The current stack frame. lineno (int): The line number where the change occurred. """ if 'self' not in frame.f_locals: return obj = frame.f_locals['self'] class_name = obj.__class__.__name__ should_trace_all_attrs = self._filename_endswith(frame.f_code.co_filename) if obj in self.tracked_objects: old_attrs = self.tracked_objects[obj] old_attrs_lens = self.tracked_objects_lens[obj] module_name = frame.f_globals.get('__name__', '') current_attrs = {k: v for k, v in obj.__dict__.items() if not callable(v)} for key, current_value in current_attrs.items(): if not (should_trace_all_attrs or self._should_trace_attribute(module_name, class_name, key)): continue old_value = old_attrs.get(key, None) old_value_len = old_attrs_lens.get(key, None) is_current_seq = isinstance(current_value, Constants.LOG_SEQUENCE_TYPES) current_value_len = len(current_value) if old_value_len is not None and is_current_seq else None self._handle_change_type( lineno, class_name, key, old_value, current_value, old_value_len, current_value_len, ) old_attrs[key] = current_value if is_current_seq: self.tracked_objects_lens[obj][key] = len(current_value)
[docs] def _track_locals_change(self, frame: FrameType, lineno: int): """ Handle changes in local variables and track updates. Args: frame (FrameType): The current stack frame. lineno (int): The line number where the change occurred. """ if not self.config.with_locals or frame not in self.tracked_locals: return old_locals = self.tracked_locals[frame] current_locals = {k: v for k, v in frame.f_locals.items() if k != 'self' and not callable(v)} old_locals_lens = self.tracked_locals_lens[frame] added_vars = set(current_locals.keys()) - set(old_locals.keys()) for var in added_vars: current_local = current_locals[var] self.event_handlers.handle_upd( lineno, class_name=Constants.HANDLE_LOCALS_SYMBOL, key=var, old_value=None, current_value=current_local, call_depth=self.call_depth, index_info=self.index_info, abc_wrapper=self.abc_wrapper, ) if isinstance(current_local, Constants.LOG_SEQUENCE_TYPES): self.tracked_locals_lens[frame][var] = len(current_local) common_vars = set(old_locals.keys()) & set(current_locals.keys()) for var in common_vars: old_local = old_locals[var] old_local_len = old_locals_lens.get(var, None) current_local = current_locals[var] is_current_seq = isinstance(current_local, Constants.LOG_SEQUENCE_TYPES) current_local_len = len(current_local) if old_local_len is not None and is_current_seq else None self._handle_change_type( lineno, Constants.HANDLE_LOCALS_SYMBOL, var, old_local, current_local, old_local_len, current_local_len ) if is_current_seq: self.tracked_locals_lens[frame][var] = len(current_local) self.tracked_locals[frame] = current_locals
[docs] def _track_globals_change(self, frame: FrameType, lineno: int): """ Handle changes in global variables and track updates. Args: frame (FrameType): The current stack frame. lineno (int): The line number where the change occurred. """ global_vars = frame.f_globals module_name = frame.f_globals.get('__name__', '') if not self.config.with_globals: return if module_name not in self.tracked_globals: self.tracked_globals[module_name] = {} if module_name not in self.tracked_globals_lens: self.tracked_globals_lens[module_name] = {} for key, current_value in list(global_vars.items()): if not self._should_trace_global(module_name, key): continue old_value = self.tracked_globals[module_name].get(key, None) old_value_len = self.tracked_globals_lens[module_name].get(key, None) is_current_seq = isinstance(current_value, Constants.LOG_SEQUENCE_TYPES) current_value_len = len(current_value) if old_value_len is not None and is_current_seq else None self._handle_change_type( lineno, Constants.HANDLE_GLOBALS_SYMBOL, key, old_value, current_value, old_value_len, current_value_len ) self.tracked_globals[module_name][key] = current_value if is_current_seq: self.tracked_globals_lens[module_name][key] = len(current_value)
[docs] def trace_factory(self): # noqa: C901 """ Create the tracing function to be used with sys.settrace. Returns: The trace function. """ def trace_func(frame: FrameType, event: str, arg: Any): """ This function is the actual trace function used by sys.settrace. It is called for every event (e.g., call, return, line) during code execution. Args: frame (FrameType): The current stack frame. event (str): The type of event ('call', 'return', or 'line'). arg (Any): The argument for the event (e.g., return value for 'return'). Returns: Returns the trace function itself to continue tracing. """ # Skip frames that do not match the filename condition if not self._should_trace_frame(frame): return trace_func if self.current_index is None: # Check if multi-process framework is initialized and set the current process index if self.mp_handlers.is_initialized(): self.current_index = self.mp_handlers.get_index() self.index_info = f"[#{self.current_index}] " elif self.current_index not in self.indexes: # Skip tracing for processes that are not part of the tracked indexes return trace_func if event == "call": # Handle function call event lineno = frame.f_back.f_lineno if frame.f_back else frame.f_lineno func_info = self._get_function_info(frame) self._update_objects_lens(frame) self.event_handlers.handle_run(lineno, func_info, self.abc_wrapper, self.call_depth, self.index_info) self.call_depth += 1 # Track local variables if needed if self.config.with_locals: local_vars: dict = {k: v for k, v in frame.f_locals.items() if k != 'self' and not callable(v)} self.tracked_locals[frame] = local_vars self.tracked_locals_lens[frame] = {} for var, value in local_vars.items(): if isinstance(value, Constants.LOG_SEQUENCE_TYPES): self.tracked_locals_lens[frame][var] = len(value) return trace_func elif event == "return": # Handle function return event lineno = frame.f_back.f_lineno if frame.f_back else frame.f_lineno self.call_depth -= 1 func_info = self._get_function_info(frame) self._update_objects_lens(frame) self.event_handlers.handle_end( lineno, func_info, self.abc_wrapper, self.call_depth, self.index_info, arg ) # Clean up local tracking after function return if self.config.with_locals and frame in self.tracked_locals: del self.tracked_locals[frame] del self.tracked_locals_lens[frame] # Clean up last lineno tracking if frame in self.last_linenos: del self.last_linenos[frame] return trace_func elif event == "line": # Handle line event (track changes at each line of code) # Get previous line number instead of current line if frame in self.last_linenos: lineno = self.last_linenos[frame] else: # First line event for this frame, use current line as fallback lineno = frame.f_lineno # Update last lineno for next line event self.last_linenos[frame] = frame.f_lineno self._track_object_change(frame, lineno) self._track_locals_change(frame, lineno) self._track_globals_change(frame, lineno) return trace_func return trace_func return trace_func
[docs] def log_metainfo_with_format(self) -> None: """Log metainfo in formatted view.""" # Table header with version information header = [ "=" * 80, "# ObjWatch Log", f"> Version: {runtime_info.version}", f"> Start Time: {runtime_info.start_time}", f"> System Info: {runtime_info.system_info}", f"> Python Version: {runtime_info.python_version}", ] # Config section config_section = [] if self.config: config_section = ["\n## Config:", str(self.config)] # Targets section targets_section = [ "\n## Targets:", Targets.serialize_targets(self.targets), ] # Filename targets section filename_targets_section = [ "\n## Filename Targets:", ] if self.filename_targets: for target in sorted(self.filename_targets): filename_targets_section.append(f"* {target}") else: filename_targets_section.append("* None") # Exclude filename targets section exclude_filename_targets_section = [ "\n## Exclude Filename Targets:", ] if self.exclude_filename_targets: for target in sorted(self.exclude_filename_targets): exclude_filename_targets_section.append(f"* {target}") else: exclude_filename_targets_section.append("* None") # Footer footer = ["=" * 80] # Combine all sections and log log_content = "\n".join( header + config_section + targets_section + filename_targets_section + exclude_filename_targets_section + footer ) log_info(log_content)
[docs] def start(self) -> None: """ Start the tracing process by setting the trace function. """ # Format and logging all metainfo self.log_metainfo_with_format() # Initialize tracking dictionaries self._initialize_tracking_state() sys.settrace(self.trace_factory()) self.mp_handlers.sync()
[docs] def stop(self) -> None: """ Stop the tracing process by removing the trace function and saving JSON logs. """ sys.settrace(None) self.event_handlers.save_json()