;;;; messagebus.cl (1.2 KB)

  ;;; Package holding the message-bus code.  It uses the standard
  ;;; COMMON-LISP package together with the package nicknamed BT
  ;;; (presumably bordeaux-threads -- confirm against the system definition).
  (defpackage :uhub
    (:use :cl :bt))

  (in-package :uhub)
  (defclass bus ()
    ((buffer
      :initform (make-array 2000 :initial-element (cons 0 nil))
      :documentation
      "Ring of 2000 slots.  Each slot holds a cons (SEQUENCE-NUMBER . MESSAGE).
  Every slot initially refers to one shared (0 . NIL) placeholder cons, so
  slots must be replaced with fresh conses rather than mutated in place.")
     (index
      :initform (list 0)
      :documentation
      "One-element list whose CAR is the next sequence number to hand out.
  The counter lives in a cons so that its CAR can be used as the place of an
  atomic increment."))
    (:documentation
     "Fixed-size ring buffer shared between publishers and polling subscribers."))
  (defun subscribe (bus fun)
    "Poll BUS forever, calling FUN with each message in sequence order.

  BUS must have a BUFFER slot holding a vector of (SEQUENCE-NUMBER . MESSAGE)
  conses.  FUN is called with one argument, the message, in the calling
  thread.  This function never returns normally; callers stop it with a
  non-local exit from FUN or by terminating the thread.

  A slot counts as ready only when its sequence number equals the
  subscriber's cursor AND its message is non-NIL, so a NIL message is never
  delivered."
    (loop with index = 0                ; cursor: next sequence number wanted
          do (with-slots (buffer) bus
               (destructuring-bind (idx . msg)
                   (aref buffer (mod index (length buffer)))
                 (cond
                   ;; The slot holds exactly the message we are waiting for.
                   ((and (= idx index) msg)
                    (funcall fun msg)
                    (incf index))
                   ;; The slot already holds a later sequence number: the ring
                   ;; wrapped past us and the awaited message is gone.
                   ;; Without this branch the cursor could never match again
                   ;; and the subscriber would wait forever.  Jump to the
                   ;; message now in the slot; the ones in between are lost.
                   ((> idx index)
                    (setf index idx))
                   ;; Nothing new yet: back off before polling again.
                   (t
                    (sleep 0.1)))))))
  (defun publish (bus msg)
    "Store MSG in the ring of BUS under a freshly claimed sequence number.

  The sequence number is the value the counter held before it was atomically
  incremented.  Returns the (SEQUENCE-NUMBER . MSG) cons written to the ring."
    (let* ((ring (slot-value bus 'buffer))
           (counter (slot-value bus 'index))
           ;; ATOMIC-INCF yields the previous counter value, so concurrent
           ;; publishers each claim a distinct number.
           (seq (sb-ext:atomic-incf (car counter)))
           (entry (cons seq msg)))
      ;; Install a fresh cons; existing cells are never mutated.
      (setf (aref ring (mod seq (length ring))) entry)))
  (defun test ()
    "Smoke test: start a subscriber thread that prints every message, publish
  three messages with short pauses, then shut the subscriber down.

  Returns NIL.  The subscriber thread is destroyed on exit even if publishing
  signals an error."
    (let ((bus (make-instance 'bus))
          ;; Captured so the worker prints to the caller's stream even if the
          ;; new thread sees a different *STANDARD-OUTPUT* (this depends on
          ;; the threading library's default bindings -- TODO confirm).
          (out *standard-output*)
          (sub nil))
      (unwind-protect
           (progn
             (setf sub (bt:make-thread
                        (lambda ()
                          (subscribe bus (lambda (msg) (print msg out))))))
             ;; Pauses give the polling subscriber time to pick messages up.
             (sleep 0.1)
             (publish bus "Hello world!1")
             (publish bus "Hello world!2")
             (sleep 0.1)
             (publish bus "Hello world!3")
             (sleep 0.1))
        ;; SUBSCRIBE never returns by itself, so the thread has to be killed.
        ;; Use the same BT API that created it, and only if it is still alive.
        (when (and sub (bt:thread-alive-p sub))
          (bt:destroy-thread sub)))))

  ;; Run the smoke test as soon as the file is loaded.
  (test)