123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490 |
- import asyncio
- import logging
- import threading
- import weakref
- from asgiref.sync import async_to_sync, iscoroutinefunction, sync_to_async
- from django.utils.inspect import func_accepts_kwargs
- logger = logging.getLogger("django.dispatch")
- def _make_id(target):
- if hasattr(target, "__func__"):
- return (id(target.__self__), id(target.__func__))
- return id(target)
- NONE_ID = _make_id(None)
- # A marker for caching
- NO_RECEIVERS = object()
- class Signal:
- """
- Base class for all signals
- Internal attributes:
- receivers
- { receiverkey (id) : weakref(receiver) }
- """
- def __init__(self, use_caching=False):
- """
- Create a new signal.
- """
- self.receivers = []
- self.lock = threading.Lock()
- self.use_caching = use_caching
- # For convenience we create empty caches even if they are not used.
- # A note about caching: if use_caching is defined, then for each
- # distinct sender we cache the receivers that sender has in
- # 'sender_receivers_cache'. The cache is cleaned when .connect() or
- # .disconnect() is called and populated on send().
- self.sender_receivers_cache = weakref.WeakKeyDictionary() if use_caching else {}
- self._dead_receivers = False
- def connect(self, receiver, sender=None, weak=True, dispatch_uid=None):
- """
- Connect receiver to sender for signal.
- Arguments:
- receiver
- A function or an instance method which is to receive signals.
- Receivers must be hashable objects. Receivers can be
- asynchronous.
- If weak is True, then receiver must be weak referenceable.
- Receivers must be able to accept keyword arguments.
- If a receiver is connected with a dispatch_uid argument, it
- will not be added if another receiver was already connected
- with that dispatch_uid.
- sender
- The sender to which the receiver should respond. Must either be
- a Python object, or None to receive events from any sender.
- weak
- Whether to use weak references to the receiver. By default, the
- module will attempt to use weak references to the receiver
- objects. If this parameter is false, then strong references will
- be used.
- dispatch_uid
- An identifier used to uniquely identify a particular instance of
- a receiver. This will usually be a string, though it may be
- anything hashable.
- """
- from django.conf import settings
- # If DEBUG is on, check that we got a good receiver
- if settings.configured and settings.DEBUG:
- if not callable(receiver):
- raise TypeError("Signal receivers must be callable.")
- # Check for **kwargs
- if not func_accepts_kwargs(receiver):
- raise ValueError(
- "Signal receivers must accept keyword arguments (**kwargs)."
- )
- if dispatch_uid:
- lookup_key = (dispatch_uid, _make_id(sender))
- else:
- lookup_key = (_make_id(receiver), _make_id(sender))
- is_async = iscoroutinefunction(receiver)
- if weak:
- ref = weakref.ref
- receiver_object = receiver
- # Check for bound methods
- if hasattr(receiver, "__self__") and hasattr(receiver, "__func__"):
- ref = weakref.WeakMethod
- receiver_object = receiver.__self__
- receiver = ref(receiver)
- weakref.finalize(receiver_object, self._remove_receiver)
- with self.lock:
- self._clear_dead_receivers()
- if not any(r_key == lookup_key for r_key, _, _ in self.receivers):
- self.receivers.append((lookup_key, receiver, is_async))
- self.sender_receivers_cache.clear()
- def disconnect(self, receiver=None, sender=None, dispatch_uid=None):
- """
- Disconnect receiver from sender for signal.
- If weak references are used, disconnect need not be called. The receiver
- will be removed from dispatch automatically.
- Arguments:
- receiver
- The registered receiver to disconnect. May be none if
- dispatch_uid is specified.
- sender
- The registered sender to disconnect
- dispatch_uid
- the unique identifier of the receiver to disconnect
- """
- if dispatch_uid:
- lookup_key = (dispatch_uid, _make_id(sender))
- else:
- lookup_key = (_make_id(receiver), _make_id(sender))
- disconnected = False
- with self.lock:
- self._clear_dead_receivers()
- for index in range(len(self.receivers)):
- r_key, *_ = self.receivers[index]
- if r_key == lookup_key:
- disconnected = True
- del self.receivers[index]
- break
- self.sender_receivers_cache.clear()
- return disconnected
- def has_listeners(self, sender=None):
- sync_receivers, async_receivers = self._live_receivers(sender)
- return bool(sync_receivers) or bool(async_receivers)
- def send(self, sender, **named):
- """
- Send signal from sender to all connected receivers.
- If any receiver raises an error, the error propagates back through send,
- terminating the dispatch loop. So it's possible that all receivers
- won't be called if an error is raised.
- If any receivers are asynchronous, they are called after all the
- synchronous receivers via a single call to async_to_sync(). They are
- also executed concurrently with asyncio.gather().
- Arguments:
- sender
- The sender of the signal. Either a specific object or None.
- named
- Named arguments which will be passed to receivers.
- Return a list of tuple pairs [(receiver, response), ... ].
- """
- if (
- not self.receivers
- or self.sender_receivers_cache.get(sender) is NO_RECEIVERS
- ):
- return []
- responses = []
- sync_receivers, async_receivers = self._live_receivers(sender)
- for receiver in sync_receivers:
- response = receiver(signal=self, sender=sender, **named)
- responses.append((receiver, response))
- if async_receivers:
- async def asend():
- async_responses = await asyncio.gather(
- *(
- receiver(signal=self, sender=sender, **named)
- for receiver in async_receivers
- )
- )
- return zip(async_receivers, async_responses)
- responses.extend(async_to_sync(asend)())
- return responses
- async def asend(self, sender, **named):
- """
- Send signal from sender to all connected receivers in async mode.
- All sync receivers will be wrapped by sync_to_async()
- If any receiver raises an error, the error propagates back through
- send, terminating the dispatch loop. So it's possible that all
- receivers won't be called if an error is raised.
- If any receivers are synchronous, they are grouped and called behind a
- sync_to_async() adaption before executing any asynchronous receivers.
- If any receivers are asynchronous, they are grouped and executed
- concurrently with asyncio.gather().
- Arguments:
- sender
- The sender of the signal. Either a specific object or None.
- named
- Named arguments which will be passed to receivers.
- Return a list of tuple pairs [(receiver, response), ...].
- """
- if (
- not self.receivers
- or self.sender_receivers_cache.get(sender) is NO_RECEIVERS
- ):
- return []
- sync_receivers, async_receivers = self._live_receivers(sender)
- if sync_receivers:
- @sync_to_async
- def sync_send():
- responses = []
- for receiver in sync_receivers:
- response = receiver(signal=self, sender=sender, **named)
- responses.append((receiver, response))
- return responses
- else:
- sync_send = list
- responses, async_responses = await asyncio.gather(
- sync_send(),
- asyncio.gather(
- *(
- receiver(signal=self, sender=sender, **named)
- for receiver in async_receivers
- )
- ),
- )
- responses.extend(zip(async_receivers, async_responses))
- return responses
- def _log_robust_failure(self, receiver, err):
- logger.error(
- "Error calling %s in Signal.send_robust() (%s)",
- receiver.__qualname__,
- err,
- exc_info=err,
- )
- def send_robust(self, sender, **named):
- """
- Send signal from sender to all connected receivers catching errors.
- If any receivers are asynchronous, they are called after all the
- synchronous receivers via a single call to async_to_sync(). They are
- also executed concurrently with asyncio.gather().
- Arguments:
- sender
- The sender of the signal. Can be any Python object (normally one
- registered with a connect if you actually want something to
- occur).
- named
- Named arguments which will be passed to receivers.
- Return a list of tuple pairs [(receiver, response), ... ].
- If any receiver raises an error (specifically any subclass of
- Exception), return the error instance as the result for that receiver.
- """
- if (
- not self.receivers
- or self.sender_receivers_cache.get(sender) is NO_RECEIVERS
- ):
- return []
- # Call each receiver with whatever arguments it can accept.
- # Return a list of tuple pairs [(receiver, response), ... ].
- responses = []
- sync_receivers, async_receivers = self._live_receivers(sender)
- for receiver in sync_receivers:
- try:
- response = receiver(signal=self, sender=sender, **named)
- except Exception as err:
- self._log_robust_failure(receiver, err)
- responses.append((receiver, err))
- else:
- responses.append((receiver, response))
- if async_receivers:
- async def asend_and_wrap_exception(receiver):
- try:
- response = await receiver(signal=self, sender=sender, **named)
- except Exception as err:
- self._log_robust_failure(receiver, err)
- return err
- return response
- async def asend():
- async_responses = await asyncio.gather(
- *(
- asend_and_wrap_exception(receiver)
- for receiver in async_receivers
- )
- )
- return zip(async_receivers, async_responses)
- responses.extend(async_to_sync(asend)())
- return responses
- async def asend_robust(self, sender, **named):
- """
- Send signal from sender to all connected receivers catching errors.
- If any receivers are synchronous, they are grouped and called behind a
- sync_to_async() adaption before executing any asynchronous receivers.
- If any receivers are asynchronous, they are grouped and executed
- concurrently with asyncio.gather.
- Arguments:
- sender
- The sender of the signal. Can be any Python object (normally one
- registered with a connect if you actually want something to
- occur).
- named
- Named arguments which will be passed to receivers.
- Return a list of tuple pairs [(receiver, response), ... ].
- If any receiver raises an error (specifically any subclass of
- Exception), return the error instance as the result for that receiver.
- """
- if (
- not self.receivers
- or self.sender_receivers_cache.get(sender) is NO_RECEIVERS
- ):
- return []
- # Call each receiver with whatever arguments it can accept.
- # Return a list of tuple pairs [(receiver, response), ... ].
- sync_receivers, async_receivers = self._live_receivers(sender)
- if sync_receivers:
- @sync_to_async
- def sync_send():
- responses = []
- for receiver in sync_receivers:
- try:
- response = receiver(signal=self, sender=sender, **named)
- except Exception as err:
- self._log_robust_failure(receiver, err)
- responses.append((receiver, err))
- else:
- responses.append((receiver, response))
- return responses
- else:
- sync_send = list
- async def asend_and_wrap_exception(receiver):
- try:
- response = await receiver(signal=self, sender=sender, **named)
- except Exception as err:
- self._log_robust_failure(receiver, err)
- return err
- return response
- responses, async_responses = await asyncio.gather(
- sync_send(),
- asyncio.gather(
- *(asend_and_wrap_exception(receiver) for receiver in async_receivers),
- ),
- )
- responses.extend(zip(async_receivers, async_responses))
- return responses
- def _clear_dead_receivers(self):
- # Note: caller is assumed to hold self.lock.
- if self._dead_receivers:
- self._dead_receivers = False
- self.receivers = [
- r
- for r in self.receivers
- if not (isinstance(r[1], weakref.ReferenceType) and r[1]() is None)
- ]
- def _live_receivers(self, sender):
- """
- Filter sequence of receivers to get resolved, live receivers.
- This checks for weak references and resolves them, then returning only
- live receivers.
- """
- receivers = None
- if self.use_caching and not self._dead_receivers:
- receivers = self.sender_receivers_cache.get(sender)
- # We could end up here with NO_RECEIVERS even if we do check this case in
- # .send() prior to calling _live_receivers() due to concurrent .send() call.
- if receivers is NO_RECEIVERS:
- return [], []
- if receivers is None:
- with self.lock:
- self._clear_dead_receivers()
- senderkey = _make_id(sender)
- receivers = []
- for (_receiverkey, r_senderkey), receiver, is_async in self.receivers:
- if r_senderkey == NONE_ID or r_senderkey == senderkey:
- receivers.append((receiver, is_async))
- if self.use_caching:
- if not receivers:
- self.sender_receivers_cache[sender] = NO_RECEIVERS
- else:
- # Note, we must cache the weakref versions.
- self.sender_receivers_cache[sender] = receivers
- non_weak_sync_receivers = []
- non_weak_async_receivers = []
- for receiver, is_async in receivers:
- if isinstance(receiver, weakref.ReferenceType):
- # Dereference the weak reference.
- receiver = receiver()
- if receiver is not None:
- if is_async:
- non_weak_async_receivers.append(receiver)
- else:
- non_weak_sync_receivers.append(receiver)
- else:
- if is_async:
- non_weak_async_receivers.append(receiver)
- else:
- non_weak_sync_receivers.append(receiver)
- return non_weak_sync_receivers, non_weak_async_receivers
- def _remove_receiver(self, receiver=None):
- # Mark that the self.receivers list has dead weakrefs. If so, we will
- # clean those up in connect, disconnect and _live_receivers while
- # holding self.lock. Note that doing the cleanup here isn't a good
- # idea, _remove_receiver() will be called as side effect of garbage
- # collection, and so the call can happen while we are already holding
- # self.lock.
- self._dead_receivers = True
- def receiver(signal, **kwargs):
- """
- A decorator for connecting receivers to signals. Used by passing in the
- signal (or list of signals) and keyword arguments to connect::
- @receiver(post_save, sender=MyModel)
- def signal_receiver(sender, **kwargs):
- ...
- @receiver([post_save, post_delete], sender=MyModel)
- def signals_receiver(sender, **kwargs):
- ...
- """
- def _decorator(func):
- if isinstance(signal, (list, tuple)):
- for s in signal:
- s.connect(func, **kwargs)
- else:
- signal.connect(func, **kwargs)
- return func
- return _decorator
|