automerge.lisp 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. ;;;; UTIL
  2. (defun make-comparison-expr (field value)
  3. `(equal (getf lst ,field) ,value))
  4. (defun make-comparisons-list (fields)
  5. (loop while fields
  6. collecting (make-comparison-expr (pop fields) (pop fields))))
  7. (defmacro where (&rest clauses)
  8. `#'(lambda (lst) (and ,@(make-comparisons-list clauses))))
  9. (defun select (lst selector-fn)
  10. (remove-if-not selector-fn lst))
  11. (defun update (lst selector-fn &rest clauses)
  12. (let ((field (pop clauses))(value (pop clauses)))
  13. (setf lst
  14. (mapcar
  15. #'(lambda (msg)
  16. (when (funcall selector-fn msg)
  17. (setf (getf msg field) value)
  18. msg)) lst))))
  19. (defun max-opid (lst)
  20. (let ((curr-max '(-1 -1)))
  21. (loop for opid in lst do
  22. (let ((lamp (car opid))(actor-id (cadr opid)))
  23. (if (> lamp (car curr-max))
  24. (setf curr-max (list lamp (cadr opid)))
  25. (if (and (= lamp (car curr-max))(> actor-id (cadr curr-max)))
  26. (setf curr-max (list lamp actor-id))))))
  27. curr-max))
  28. (defun remove-if-succ (schema lst)
  29. (remove-if-not #'(lambda (msg) (is-visible (getf schema :succ) (getf msg :opid))) lst))
  30. (defun create-succ (schema opid)
  31. (setf (getf schema :succ) (cons (list :opid opid :succ nil)(getf schema :succ))))
  32. (defun insert-succ (schema opid succ)
  33. (if (select (getf schema :succ) (where :opid opid))
  34. (let ((target-succ (getf (car (select (getf schema :succ) (where :opid opid))) :succ)))
  35. (setf (getf (car (select (getf schema :succ) (where :opid opid))) :succ) (cons succ target-succ)))
  36. (setf (getf schema :succ) (cons (list :opid opid :succ (list succ)) (getf schema :succ)))))
  37. (defun to-opid-list (lst)
  38. (loop for msg in lst
  39. collect (getf msg :opid)))
  40. ;;;; CORE
  41. (defun create-schema (&key (id (random 10000)) (messages nil) (contacts nil))
  42. (list :id id
  43. :cnt 0
  44. :messages messages
  45. :contacts contacts
  46. :succ nil))
  47. (defun clock-tick (schema)
  48. (setf (getf schema :cnt) (1+ (getf schema :cnt))))
  49. (defun clock-set (schema cnt)
  50. (setf (getf schema :cnt) cnt))
  51. (defun create-msg (opid objid prop action value dep)
  52. (list :opid opid
  53. :objid objid
  54. :prop prop
  55. :action action
  56. :value value
  57. :dep dep))
  58. (defun is-visible (lst opid)
  59. (not (select lst (where :opid opid))))
  60. ;;;Implements last-writer-wins conflict resolution, conflicts is a list of messages
  61. (defun lww (conflicts)
  62. (max-opid (to-opid-list conflicts)))
  63. (defun search-msg (schema table prop)
  64. (remove-if-succ schema (select (getf schema :messages) (where :prop prop :objid table))))
  65. ;;;; GET
  66. (defun get-msg (schema table prop)
  67. (getf (car (select (getf schema :messages) (where :opid (lww (search-msg schema table prop))))) :value))
  68. (defun insert-msg (schema msg)
  69. (let ((opid (getf msg :opid)))
  70. (loop for contact in (getf schema :contacts)
  71. do (push (create-msg-state opid) (getf contact :msg-state)))
  72. (push msg (getf schema :messages))))
  73. ;;;; PUT
  74. (defun put-msg (schema table prop value)
  75. (let ((pred (to-opid-list (search-msg schema table prop)))
  76. (new-opid (list (clock-tick schema) (getf schema :id))))
  77. (let ((msg (create-msg new-opid table prop "set value" value pred)))
  78. (insert-msg schema msg)
  79. (loop for p in pred do
  80. (insert-succ schema p new-opid)))))
  81. ;;;; DELETE
  82. (defun delete-msg (schema table prop)
  83. (let ((pred (to-opid-list (search-msg schema table prop)))
  84. (new-opid (list (clock-tick schema) (getf schema :id))))
  85. (loop for p in pred do
  86. (insert-succ schema p new-opid))))
  87. ;;;; PUT-OBJECT
  88. (defun put-obj (schema table prop)
  89. (let ((pred (to-opid-list (search-msg schema table prop)))
  90. (objid (list (clock-tick schema) (getf schema :id)))
  91. (new-opid (list (clock-tick schema) (getf schema :id))))
  92. (let ((msg (create-msg new-opid table prop "create map" objid pred)))
  93. (insert-msg schema msg)
  94. (loop for p in pred do
  95. (insert-succ schema p new-opid)))
  96. objid))
  97. ;;;; Maintaining per-contact, per-messages, notekeeping
  98. (defun create-msg-state (opid &key (seen nil)(ack nil)(req nil)(send-cnt 0)(send-time 0)(max-latency 10000))
  99. (list :opid opid
  100. :seen seen
  101. :ack ack
  102. :req req
  103. :send-cnt send-cnt
  104. :send-time send-time
  105. :max-latency max-latency))
  106. (defun msg-state-from-msg (msg)
  107. (create-msg-state (getf msg :opid)))
  108. (defun add-contact (schema name conid)
  109. (push (append (list :name name :conid conid)
  110. (list :msg-state
  111. (loop for msg in (getf schema :messages)
  112. collect (msg-state-from-msg msg))))
  113. (getf schema :contacts))
  114. schema)
  115. (defun get-msg-by-opid (schema opid)
  116. (car (select (getf schema :messages) (where :opid opid))))
  117. (defun get-msg-state-by-conid (schema conid)
  118. (let ((contact (car (select (getf schema :contacts) (where :conid conid)))))
  119. (getf contact :msg-state)))
  120. (defun send-eager-update (schema conid)
  121. (let ((con-msg-state (get-msg-state-by-conid schema conid)))
  122. (let ((outgoing-msgs (to-opid-list (select con-msg-state (where :seen nil))))
  123. (outgoing-acks (to-opid-list (select con-msg-state (where :ack t)))))
  124. (let ((msg-records
  125. (loop for msg in outgoing-msgs
  126. collect (list 1 (get-msg-by-opid schema msg))))
  127. (ack-record (list 0 outgoing-acks))
  128. (clock-record (list 5 (getf schema :cnt))))
  129. (loop for ackd in outgoing-acks do
  130. (mark-ack schema conid ackd NIL))
  131. (cons clock-record (cons ack-record msg-records))))))
  132. (defun has-all-deps (deps msgs incoming-msgs)
  133. (loop for m in deps do
  134. (if (not (position m (append msgs incoming-msgs) :test #'equal))
  135. (return-from has-all-deps NIL)))
  136. T)
  137. (defun mark-seen (schema conid opid)
  138. (setf (getf (car (select (get-msg-state-by-conid schema conid) (where :opid opid))) :seen) t))
  139. (defun mark-ack (schema conid opid state)
  140. (setf (getf (car (select (get-msg-state-by-conid schema conid) (where :opid opid))) :ack) state))
  141. (defun add-incoming-msg (schema conid msg incoming-msgs)
  142. (if (has-all-deps (getf msg :deps) (getf schema :messages) incoming-msgs)
  143. (mark-seen schema conid (getf msg :opid))))
  144. (defun process-eager-update (schema conid records)
  145. (let ((incoming-msgs
  146. (loop for record in records
  147. collect (if (eq (car record) 1) (car (cdr record))))))
  148. (loop for record in records do
  149. (let ((flag (car record))
  150. (body (car (cdr record))))
  151. (if (equal flag 0)
  152. (loop for opid in body do
  153. (cond ((get-msg-by-opid schema opid)
  154. (mark-ack schema conid opid NIL)
  155. (mark-seen schema conid opid)))))
  156. (if (equal flag 1)
  157. (if (get-msg-by-opid schema (getf body :opid))
  158. (mark-seen schema conid (getf body :opid))
  159. (cond
  160. ((not (getf body :dep))
  161. (insert-msg schema body)
  162. (mark-seen schema conid (getf body :opid))
  163. (mark-ack schema conid (getf body :opid) t))
  164. ((has-all-deps (getf body :dep)
  165. (to-opid-list (getf schema :messages))
  166. (to-opid-list incoming-msgs))
  167. (insert-msg schema body)
  168. (mark-seen schema conid (getf body :opid))
  169. (mark-ack schema conid (getf body :opid) t)
  170. (loop for dep in (getf body :dep) do
  171. (insert-succ schema dep (getf body :opid)))))))
  172. (if (equal flag 2)
  173. (princ "process 2"))
  174. (if (equal flag 3)
  175. (princ "process 3"))
  176. (if (equal flag 4)
  177. (princ "process 4"))
  178. (if (equal flag 5)
  179. (clock-set schema (max body (getf schema :cnt))))))))