shared_uwsgi.py 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. # SPDX-License-Identifier: AGPL-3.0-or-later
  2. import time
  3. import uwsgi # pylint: disable=E0401
  4. from . import shared_abstract
  5. _last_signal = 10
  6. class UwsgiCacheSharedDict(shared_abstract.SharedDict):
  7. def get_int(self, key):
  8. value = uwsgi.cache_get(key)
  9. if value is None:
  10. return value
  11. else:
  12. return int.from_bytes(value, 'big')
  13. def set_int(self, key, value):
  14. b = value.to_bytes(4, 'big')
  15. uwsgi.cache_update(key, b)
  16. def get_str(self, key):
  17. value = uwsgi.cache_get(key)
  18. if value is None:
  19. return value
  20. else:
  21. return value.decode('utf-8')
  22. def set_str(self, key, value):
  23. b = value.encode('utf-8')
  24. uwsgi.cache_update(key, b)
  25. def schedule(delay, func, *args):
  26. """
  27. Can be implemented using a spooler.
  28. https://uwsgi-docs.readthedocs.io/en/latest/PythonDecorators.html
  29. To make the uwsgi configuration simple, use the alternative implementation.
  30. """
  31. global _last_signal
  32. def sighandler(signum):
  33. now = int(time.time())
  34. key = 'scheduler_call_time_signal_' + str(signum)
  35. uwsgi.lock()
  36. try:
  37. updating = uwsgi.cache_get(key)
  38. if updating is not None:
  39. updating = int.from_bytes(updating, 'big')
  40. if now - updating < delay:
  41. return
  42. uwsgi.cache_update(key, now.to_bytes(4, 'big'))
  43. finally:
  44. uwsgi.unlock()
  45. func(*args)
  46. signal_num = _last_signal
  47. _last_signal += 1
  48. uwsgi.register_signal(signal_num, 'worker', sighandler)
  49. uwsgi.add_timer(signal_num, delay)
  50. return True