# dispatcher.py
import asyncio
import logging
import threading
import weakref

from asgiref.sync import async_to_sync, iscoroutinefunction, sync_to_async

from django.utils.inspect import func_accepts_kwargs
  7. logger = logging.getLogger("django.dispatch")
  8. def _make_id(target):
  9. if hasattr(target, "__func__"):
  10. return (id(target.__self__), id(target.__func__))
  11. return id(target)
  12. NONE_ID = _make_id(None)
  13. # A marker for caching
  14. NO_RECEIVERS = object()
  15. class Signal:
  16. """
  17. Base class for all signals
  18. Internal attributes:
  19. receivers
  20. { receiverkey (id) : weakref(receiver) }
  21. """
  22. def __init__(self, use_caching=False):
  23. """
  24. Create a new signal.
  25. """
  26. self.receivers = []
  27. self.lock = threading.Lock()
  28. self.use_caching = use_caching
  29. # For convenience we create empty caches even if they are not used.
  30. # A note about caching: if use_caching is defined, then for each
  31. # distinct sender we cache the receivers that sender has in
  32. # 'sender_receivers_cache'. The cache is cleaned when .connect() or
  33. # .disconnect() is called and populated on send().
  34. self.sender_receivers_cache = weakref.WeakKeyDictionary() if use_caching else {}
  35. self._dead_receivers = False
  36. def connect(self, receiver, sender=None, weak=True, dispatch_uid=None):
  37. """
  38. Connect receiver to sender for signal.
  39. Arguments:
  40. receiver
  41. A function or an instance method which is to receive signals.
  42. Receivers must be hashable objects. Receivers can be
  43. asynchronous.
  44. If weak is True, then receiver must be weak referenceable.
  45. Receivers must be able to accept keyword arguments.
  46. If a receiver is connected with a dispatch_uid argument, it
  47. will not be added if another receiver was already connected
  48. with that dispatch_uid.
  49. sender
  50. The sender to which the receiver should respond. Must either be
  51. a Python object, or None to receive events from any sender.
  52. weak
  53. Whether to use weak references to the receiver. By default, the
  54. module will attempt to use weak references to the receiver
  55. objects. If this parameter is false, then strong references will
  56. be used.
  57. dispatch_uid
  58. An identifier used to uniquely identify a particular instance of
  59. a receiver. This will usually be a string, though it may be
  60. anything hashable.
  61. """
  62. from django.conf import settings
  63. # If DEBUG is on, check that we got a good receiver
  64. if settings.configured and settings.DEBUG:
  65. if not callable(receiver):
  66. raise TypeError("Signal receivers must be callable.")
  67. # Check for **kwargs
  68. if not func_accepts_kwargs(receiver):
  69. raise ValueError(
  70. "Signal receivers must accept keyword arguments (**kwargs)."
  71. )
  72. if dispatch_uid:
  73. lookup_key = (dispatch_uid, _make_id(sender))
  74. else:
  75. lookup_key = (_make_id(receiver), _make_id(sender))
  76. is_async = iscoroutinefunction(receiver)
  77. if weak:
  78. ref = weakref.ref
  79. receiver_object = receiver
  80. # Check for bound methods
  81. if hasattr(receiver, "__self__") and hasattr(receiver, "__func__"):
  82. ref = weakref.WeakMethod
  83. receiver_object = receiver.__self__
  84. receiver = ref(receiver)
  85. weakref.finalize(receiver_object, self._remove_receiver)
  86. with self.lock:
  87. self._clear_dead_receivers()
  88. if not any(r_key == lookup_key for r_key, _, _ in self.receivers):
  89. self.receivers.append((lookup_key, receiver, is_async))
  90. self.sender_receivers_cache.clear()
  91. def disconnect(self, receiver=None, sender=None, dispatch_uid=None):
  92. """
  93. Disconnect receiver from sender for signal.
  94. If weak references are used, disconnect need not be called. The receiver
  95. will be removed from dispatch automatically.
  96. Arguments:
  97. receiver
  98. The registered receiver to disconnect. May be none if
  99. dispatch_uid is specified.
  100. sender
  101. The registered sender to disconnect
  102. dispatch_uid
  103. the unique identifier of the receiver to disconnect
  104. """
  105. if dispatch_uid:
  106. lookup_key = (dispatch_uid, _make_id(sender))
  107. else:
  108. lookup_key = (_make_id(receiver), _make_id(sender))
  109. disconnected = False
  110. with self.lock:
  111. self._clear_dead_receivers()
  112. for index in range(len(self.receivers)):
  113. r_key, *_ = self.receivers[index]
  114. if r_key == lookup_key:
  115. disconnected = True
  116. del self.receivers[index]
  117. break
  118. self.sender_receivers_cache.clear()
  119. return disconnected
  120. def has_listeners(self, sender=None):
  121. sync_receivers, async_receivers = self._live_receivers(sender)
  122. return bool(sync_receivers) or bool(async_receivers)
  123. def send(self, sender, **named):
  124. """
  125. Send signal from sender to all connected receivers.
  126. If any receiver raises an error, the error propagates back through send,
  127. terminating the dispatch loop. So it's possible that all receivers
  128. won't be called if an error is raised.
  129. If any receivers are asynchronous, they are called after all the
  130. synchronous receivers via a single call to async_to_sync(). They are
  131. also executed concurrently with asyncio.gather().
  132. Arguments:
  133. sender
  134. The sender of the signal. Either a specific object or None.
  135. named
  136. Named arguments which will be passed to receivers.
  137. Return a list of tuple pairs [(receiver, response), ... ].
  138. """
  139. if (
  140. not self.receivers
  141. or self.sender_receivers_cache.get(sender) is NO_RECEIVERS
  142. ):
  143. return []
  144. responses = []
  145. sync_receivers, async_receivers = self._live_receivers(sender)
  146. for receiver in sync_receivers:
  147. response = receiver(signal=self, sender=sender, **named)
  148. responses.append((receiver, response))
  149. if async_receivers:
  150. async def asend():
  151. async_responses = await asyncio.gather(
  152. *(
  153. receiver(signal=self, sender=sender, **named)
  154. for receiver in async_receivers
  155. )
  156. )
  157. return zip(async_receivers, async_responses)
  158. responses.extend(async_to_sync(asend)())
  159. return responses
  160. async def asend(self, sender, **named):
  161. """
  162. Send signal from sender to all connected receivers in async mode.
  163. All sync receivers will be wrapped by sync_to_async()
  164. If any receiver raises an error, the error propagates back through
  165. send, terminating the dispatch loop. So it's possible that all
  166. receivers won't be called if an error is raised.
  167. If any receivers are synchronous, they are grouped and called behind a
  168. sync_to_async() adaption before executing any asynchronous receivers.
  169. If any receivers are asynchronous, they are grouped and executed
  170. concurrently with asyncio.gather().
  171. Arguments:
  172. sender
  173. The sender of the signal. Either a specific object or None.
  174. named
  175. Named arguments which will be passed to receivers.
  176. Return a list of tuple pairs [(receiver, response), ...].
  177. """
  178. if (
  179. not self.receivers
  180. or self.sender_receivers_cache.get(sender) is NO_RECEIVERS
  181. ):
  182. return []
  183. sync_receivers, async_receivers = self._live_receivers(sender)
  184. if sync_receivers:
  185. @sync_to_async
  186. def sync_send():
  187. responses = []
  188. for receiver in sync_receivers:
  189. response = receiver(signal=self, sender=sender, **named)
  190. responses.append((receiver, response))
  191. return responses
  192. else:
  193. sync_send = list
  194. responses, async_responses = await asyncio.gather(
  195. sync_send(),
  196. asyncio.gather(
  197. *(
  198. receiver(signal=self, sender=sender, **named)
  199. for receiver in async_receivers
  200. )
  201. ),
  202. )
  203. responses.extend(zip(async_receivers, async_responses))
  204. return responses
  205. def _log_robust_failure(self, receiver, err):
  206. logger.error(
  207. "Error calling %s in Signal.send_robust() (%s)",
  208. receiver.__qualname__,
  209. err,
  210. exc_info=err,
  211. )
  212. def send_robust(self, sender, **named):
  213. """
  214. Send signal from sender to all connected receivers catching errors.
  215. If any receivers are asynchronous, they are called after all the
  216. synchronous receivers via a single call to async_to_sync(). They are
  217. also executed concurrently with asyncio.gather().
  218. Arguments:
  219. sender
  220. The sender of the signal. Can be any Python object (normally one
  221. registered with a connect if you actually want something to
  222. occur).
  223. named
  224. Named arguments which will be passed to receivers.
  225. Return a list of tuple pairs [(receiver, response), ... ].
  226. If any receiver raises an error (specifically any subclass of
  227. Exception), return the error instance as the result for that receiver.
  228. """
  229. if (
  230. not self.receivers
  231. or self.sender_receivers_cache.get(sender) is NO_RECEIVERS
  232. ):
  233. return []
  234. # Call each receiver with whatever arguments it can accept.
  235. # Return a list of tuple pairs [(receiver, response), ... ].
  236. responses = []
  237. sync_receivers, async_receivers = self._live_receivers(sender)
  238. for receiver in sync_receivers:
  239. try:
  240. response = receiver(signal=self, sender=sender, **named)
  241. except Exception as err:
  242. self._log_robust_failure(receiver, err)
  243. responses.append((receiver, err))
  244. else:
  245. responses.append((receiver, response))
  246. if async_receivers:
  247. async def asend_and_wrap_exception(receiver):
  248. try:
  249. response = await receiver(signal=self, sender=sender, **named)
  250. except Exception as err:
  251. self._log_robust_failure(receiver, err)
  252. return err
  253. return response
  254. async def asend():
  255. async_responses = await asyncio.gather(
  256. *(
  257. asend_and_wrap_exception(receiver)
  258. for receiver in async_receivers
  259. )
  260. )
  261. return zip(async_receivers, async_responses)
  262. responses.extend(async_to_sync(asend)())
  263. return responses
  264. async def asend_robust(self, sender, **named):
  265. """
  266. Send signal from sender to all connected receivers catching errors.
  267. If any receivers are synchronous, they are grouped and called behind a
  268. sync_to_async() adaption before executing any asynchronous receivers.
  269. If any receivers are asynchronous, they are grouped and executed
  270. concurrently with asyncio.gather.
  271. Arguments:
  272. sender
  273. The sender of the signal. Can be any Python object (normally one
  274. registered with a connect if you actually want something to
  275. occur).
  276. named
  277. Named arguments which will be passed to receivers.
  278. Return a list of tuple pairs [(receiver, response), ... ].
  279. If any receiver raises an error (specifically any subclass of
  280. Exception), return the error instance as the result for that receiver.
  281. """
  282. if (
  283. not self.receivers
  284. or self.sender_receivers_cache.get(sender) is NO_RECEIVERS
  285. ):
  286. return []
  287. # Call each receiver with whatever arguments it can accept.
  288. # Return a list of tuple pairs [(receiver, response), ... ].
  289. sync_receivers, async_receivers = self._live_receivers(sender)
  290. if sync_receivers:
  291. @sync_to_async
  292. def sync_send():
  293. responses = []
  294. for receiver in sync_receivers:
  295. try:
  296. response = receiver(signal=self, sender=sender, **named)
  297. except Exception as err:
  298. self._log_robust_failure(receiver, err)
  299. responses.append((receiver, err))
  300. else:
  301. responses.append((receiver, response))
  302. return responses
  303. else:
  304. sync_send = list
  305. async def asend_and_wrap_exception(receiver):
  306. try:
  307. response = await receiver(signal=self, sender=sender, **named)
  308. except Exception as err:
  309. self._log_robust_failure(receiver, err)
  310. return err
  311. return response
  312. responses, async_responses = await asyncio.gather(
  313. sync_send(),
  314. asyncio.gather(
  315. *(asend_and_wrap_exception(receiver) for receiver in async_receivers),
  316. ),
  317. )
  318. responses.extend(zip(async_receivers, async_responses))
  319. return responses
  320. def _clear_dead_receivers(self):
  321. # Note: caller is assumed to hold self.lock.
  322. if self._dead_receivers:
  323. self._dead_receivers = False
  324. self.receivers = [
  325. r
  326. for r in self.receivers
  327. if not (isinstance(r[1], weakref.ReferenceType) and r[1]() is None)
  328. ]
  329. def _live_receivers(self, sender):
  330. """
  331. Filter sequence of receivers to get resolved, live receivers.
  332. This checks for weak references and resolves them, then returning only
  333. live receivers.
  334. """
  335. receivers = None
  336. if self.use_caching and not self._dead_receivers:
  337. receivers = self.sender_receivers_cache.get(sender)
  338. # We could end up here with NO_RECEIVERS even if we do check this case in
  339. # .send() prior to calling _live_receivers() due to concurrent .send() call.
  340. if receivers is NO_RECEIVERS:
  341. return [], []
  342. if receivers is None:
  343. with self.lock:
  344. self._clear_dead_receivers()
  345. senderkey = _make_id(sender)
  346. receivers = []
  347. for (_receiverkey, r_senderkey), receiver, is_async in self.receivers:
  348. if r_senderkey == NONE_ID or r_senderkey == senderkey:
  349. receivers.append((receiver, is_async))
  350. if self.use_caching:
  351. if not receivers:
  352. self.sender_receivers_cache[sender] = NO_RECEIVERS
  353. else:
  354. # Note, we must cache the weakref versions.
  355. self.sender_receivers_cache[sender] = receivers
  356. non_weak_sync_receivers = []
  357. non_weak_async_receivers = []
  358. for receiver, is_async in receivers:
  359. if isinstance(receiver, weakref.ReferenceType):
  360. # Dereference the weak reference.
  361. receiver = receiver()
  362. if receiver is not None:
  363. if is_async:
  364. non_weak_async_receivers.append(receiver)
  365. else:
  366. non_weak_sync_receivers.append(receiver)
  367. else:
  368. if is_async:
  369. non_weak_async_receivers.append(receiver)
  370. else:
  371. non_weak_sync_receivers.append(receiver)
  372. return non_weak_sync_receivers, non_weak_async_receivers
  373. def _remove_receiver(self, receiver=None):
  374. # Mark that the self.receivers list has dead weakrefs. If so, we will
  375. # clean those up in connect, disconnect and _live_receivers while
  376. # holding self.lock. Note that doing the cleanup here isn't a good
  377. # idea, _remove_receiver() will be called as side effect of garbage
  378. # collection, and so the call can happen while we are already holding
  379. # self.lock.
  380. self._dead_receivers = True
  381. def receiver(signal, **kwargs):
  382. """
  383. A decorator for connecting receivers to signals. Used by passing in the
  384. signal (or list of signals) and keyword arguments to connect::
  385. @receiver(post_save, sender=MyModel)
  386. def signal_receiver(sender, **kwargs):
  387. ...
  388. @receiver([post_save, post_delete], sender=MyModel)
  389. def signals_receiver(sender, **kwargs):
  390. ...
  391. """
  392. def _decorator(func):
  393. if isinstance(signal, (list, tuple)):
  394. for s in signal:
  395. s.connect(func, **kwargs)
  396. else:
  397. signal.connect(func, **kwargs)
  398. return func
  399. return _decorator