# ---------------------------------------------------------------------------
# This file is based on PyTorch's torch/utils/weak.py with modifications.
# Modifications include removing certain parts of the code and other adjustments.
# Modified by: aeeeeeep
# Modification Date: 2024-12-16
# ---------------------------------------------------------------------------
# mypy: allow-untyped-defs
from __future__ import annotations
from weakref import ref
from _weakrefset import _IterationGuard # type: ignore[attr-defined]
from collections.abc import MutableMapping, Mapping
import collections.abc as _collections_abc
class WeakIdRef(ref):
    """Weak reference that hashes and compares by referent identity.

    A plain ``weakref.ref`` delegates hashing and equality to the object it
    points at. ``WeakIdRef`` instead treats two references as equal only when
    they hold the same ``_ref`` token, so unhashable objects, or objects with
    value-based ``__eq__``, can still be used as identity keys.

    Args:
        key: The object to reference weakly.
        callback: Optional callable forwarded unchanged to ``weakref.ref``.
    """

    def __init__(self, key, callback=None):
        # A second, callback-free reference to ``key``; its identity is the
        # token used by ``__hash__`` and ``__eq__``. Holding it here keeps the
        # token alive (and therefore the hash stable) after ``key`` has died.
        # NOTE(review): equality between two separately created WeakIdRefs
        # relies on ``ref(key)`` returning the same object for the same live
        # referent (CPython reuses callback-free plain refs) -- confirm
        # before targeting other interpreters.
        self._ref = ref(key)
        super().__init__(key, callback)

    def __hash__(self):
        return id(self._ref)

    def __eq__(self, other):
        # Objects without a ``_ref`` token are simply "not comparable" rather
        # than an AttributeError; Python then falls back to its default
        # handling, which yields ``False`` for ``==``.
        try:
            other_token = other._ref
        except AttributeError:
            return NotImplemented
        return self._ref is other_token

    def __ne__(self, other):
        # ``weakref.ref`` provides its own ``__ne__`` which compares the
        # referents by value; without this override ``!=`` would disagree
        # with the identity-based ``__eq__`` above.
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result
# This is directly adapted from cpython/Lib/weakref.py
class WeakIdKeyDictionary(MutableMapping):
    """Mutable mapping whose keys are held weakly and matched via ``ref_type``.

    Every key is stored in ``self.data`` wrapped as ``ref_type(key, callback)``.
    With the default ``WeakIdRef`` wrapper, keys are looked up by object
    identity rather than by ``__hash__``/``__eq__`` of the key itself. The
    callback removes the entry once the key is no longer alive; while any
    iteration over the mapping is in progress, such removals are queued in
    ``_pending_removals`` and applied later by ``_commit_removals``.

    Args:
        dict: Optional initial contents, handed to :meth:`update`.
        ref_type: Callable used to wrap keys. It is called as
            ``ref_type(key)`` for lookups and ``ref_type(key, callback)``
            when an entry is inserted.
    """

    def __init__(self, dict=None, ref_type=WeakIdRef):  # CHANGED
        self.data = {}
        self.ref_type = ref_type  # CHANGED

        # The callback only holds a weak reference to this mapping (via the
        # ``selfref`` default) so that key references do not keep it alive.
        def remove(k, selfref=ref(self)):
            self = selfref()
            if self is not None:
                if self._iterating:
                    # Deleting now would change the size of ``data`` during
                    # iteration; defer until the iteration guards have exited.
                    self._pending_removals.append(k)
                else:
                    try:
                        del self.data[k]
                    except KeyError:
                        pass

        self._remove = remove
        # A list of dead weakrefs (keys to be removed)
        self._pending_removals = []
        # Active iteration guards; non-empty means removals must be deferred.
        self._iterating = set()
        # True once an explicit deletion may have made ``_pending_removals``
        # contain keys that are no longer present in ``data``.
        self._dirty_len = False
        if dict is not None:
            self.update(dict)

    def _new_empty(self):
        """Return an empty mapping of the same class using the same ``ref_type``."""
        new = self.__class__()
        new.ref_type = self.ref_type
        return new

    def _commit_removals(self):
        """Delete every key queued in ``_pending_removals`` from ``data``."""
        # NOTE: We don't need to call this method before mutating the dict,
        # because a dead weakref never compares equal to a live weakref,
        # even if they happened to refer to equal objects.
        # However, it means keys may already have been removed.
        pop = self._pending_removals.pop
        d = self.data
        while True:
            try:
                key = pop()
            except IndexError:
                return
            try:
                del d[key]
            except KeyError:
                pass

    def _scrub_removals(self):
        """Drop queued removals whose keys are already gone from ``data``."""
        d = self.data
        self._pending_removals = [k for k in self._pending_removals if k in d]
        self._dirty_len = False

    def __delitem__(self, key):
        self._dirty_len = True
        del self.data[self.ref_type(key)]  # CHANGED

    def __getitem__(self, key):
        return self.data[self.ref_type(key)]  # CHANGED

    def __len__(self):
        if self._dirty_len and self._pending_removals:
            # self._pending_removals may still contain keys which were
            # explicitly removed, we have to scrub them (see issue #21173).
            self._scrub_removals()
        return len(self.data) - len(self._pending_removals)

    def __repr__(self):
        return f"<{self.__class__.__name__} at {id(self):#x}>"

    def __setitem__(self, key, value):
        self.data[self.ref_type(key, self._remove)] = value  # CHANGED

    def copy(self):
        """Return a shallow copy holding the currently live entries.

        The copy is an instance of ``self.__class__`` and wraps keys with the
        same ``ref_type`` as this mapping.
        """
        new = self._new_empty()
        with _IterationGuard(self):
            for key, value in self.data.items():
                o = key()
                if o is not None:
                    new[o] = value
        return new

    __copy__ = copy

    def __deepcopy__(self, memo):
        """Return a copy with deep-copied values; keys are the same objects."""
        from copy import deepcopy

        new = self._new_empty()
        # Register the result before descending into the values so that a
        # value referring back to this mapping resolves to ``new`` instead of
        # recursing without bound.
        memo[id(self)] = new
        with _IterationGuard(self):
            for key, value in self.data.items():
                o = key()
                if o is not None:
                    new[o] = deepcopy(value, memo)
        return new

    def get(self, key, default=None):
        """Return the value stored for ``key``, or ``default`` if absent."""
        return self.data.get(self.ref_type(key), default)  # CHANGED

    def __contains__(self, key):
        try:
            wr = self.ref_type(key)  # CHANGED
        except TypeError:
            # ``key`` could not be wrapped, so it cannot have been stored.
            return False
        return wr in self.data

    def items(self):
        """Yield ``(key, value)`` pairs for every key that is still alive."""
        with _IterationGuard(self):
            for wr, value in self.data.items():
                key = wr()
                if key is not None:
                    yield key, value

    def keys(self):
        """Yield every key that is still alive."""
        with _IterationGuard(self):
            for wr in self.data:
                obj = wr()
                if obj is not None:
                    yield obj

    __iter__ = keys

    def values(self):
        """Yield the value of every entry whose key is still alive."""
        with _IterationGuard(self):
            for wr, value in self.data.items():
                if wr() is not None:
                    yield value

    def keyrefs(self):
        """Return a list of weak references to the keys.

        The references are not guaranteed to be 'live' at the time
        they are used, so the result of calling the references needs
        to be checked before being used.  This can be used to avoid
        creating references that will cause the garbage collector to
        keep the keys around longer than needed.
        """
        return list(self.data)

    def popitem(self):
        """Remove and return one ``(key, value)`` pair whose key is alive.

        Entries whose key has already died are discarded along the way.
        Raises ``KeyError`` (from ``dict.popitem``) once ``data`` is empty.
        """
        self._dirty_len = True
        while True:
            key, value = self.data.popitem()
            o = key()
            if o is not None:
                return o, value

    def pop(self, key, *args):
        """Remove ``key`` and return its value; ``args`` may supply a default."""
        self._dirty_len = True
        return self.data.pop(self.ref_type(key), *args)  # CHANGED

    def setdefault(self, key, default=None):
        """Return the value for ``key``, inserting ``default`` first if absent."""
        return self.data.setdefault(self.ref_type(key, self._remove), default)  # CHANGED

    def update(self, dict=None, **kwargs):  # type: ignore[override]
        """Insert the entries of ``dict`` (a mapping or iterable of pairs).

        Keyword arguments are inserted afterwards through a recursive call,
        with their names used as keys.
        """
        d = self.data
        if dict is not None:
            if not hasattr(dict, "items"):
                # The parameter shadows the builtin ``dict``; ``type({})`` is
                # the builtin type, used to turn an iterable of pairs into a
                # mapping.
                dict = type({})(dict)
            for key, value in dict.items():
                d[self.ref_type(key, self._remove)] = value  # CHANGED
        if kwargs:
            self.update(kwargs)

    def __ior__(self, other):
        self.update(other)
        return self

    def __or__(self, other):
        if isinstance(other, _collections_abc.Mapping):
            c = self.copy()
            c.update(other)
            return c
        return NotImplemented

    def __ror__(self, other):
        if isinstance(other, _collections_abc.Mapping):
            c = self._new_empty()
            c.update(other)
            c.update(self)
            return c
        return NotImplemented

    # Default Mapping equality tests keys for equality, but
    # we want to test ids for equality
    def __eq__(self, other):
        if not isinstance(other, Mapping):
            return NotImplemented
        return {id(k): v for k, v in self.items()} == {id(k): v for k, v in other.items()}