123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- ;;;; UTIL
(defun make-comparison-expr (field value)
  "Return the form (EQUAL (GETF LST FIELD) VALUE).
The symbol LST is left free on purpose: the form is meant to be spliced
into a lambda whose parameter is named LST."
  (list 'equal (list 'getf 'lst field) value))
(defun make-comparisons-list (fields)
  "Turn the flat list FIELDS (field value field value ...) into a list of
comparison forms, one per pair, built with MAKE-COMPARISON-EXPR.
A trailing field with no value is compared against NIL."
  (loop for (field value) on fields by #'cddr
        collect (make-comparison-expr field value)))
(defmacro where (&rest clauses)
  "Expand into a one-argument predicate over a property list.
CLAUSES is a flat list of FIELD VALUE pairs; the predicate is true when
every (GETF row FIELD) is EQUAL to the corresponding VALUE form.  With no
clauses the predicate is always true.

The row parameter is a fresh uninterned symbol, so VALUE forms may refer
to any variable of the caller (including one named LST) without being
captured.  The expansion is built inline, so the macro does not need any
helper function to exist at macroexpansion time.  VALUE forms are still
evaluated inside the predicate, i.e. once per tested row."
  (let ((row (gensym "ROW")))
    `#'(lambda (,row)
         ;; ROW is unused when CLAUSES is empty.
         (declare (ignorable ,row))
         (and ,@(loop for (field value) on clauses by #'cddr
                      collect `(equal (getf ,row ,field) ,value))))))
(defun select (lst selector-fn)
  "Return the elements of LST for which SELECTOR-FN returns true,
in their original order."
  (remove-if (complement selector-fn) lst))
(defun update (lst selector-fn &rest clauses)
  "Return a list with the same rows as LST, after setting fields on the
rows accepted by SELECTOR-FN.
CLAUSES is a flat list of FIELD VALUE pairs; every pair is applied to each
matching row.  Rows that do not match are returned unchanged (they are no
longer replaced by NIL)."
  (mapcar #'(lambda (msg)
              (when (funcall selector-fn msg)
                (loop for (field value) on clauses by #'cddr
                      do (setf (getf msg field) value)))
              ;; Always hand the row back, matched or not.
              msg)
          lst))
(defun max-opid (lst)
  "Return the greatest opid in LST.
Each opid is a list whose first element is a counter and whose second is
an actor id; opids are ordered by counter first and actor id second.
Returns (-1 -1) when no opid in LST exceeds that starting value."
  (let ((best '(-1 -1)))
    (dolist (opid lst best)
      (let ((counter (first opid))
            (actor (second opid)))
        (when (or (> counter (first best))
                  (and (= counter (first best))
                       (> actor (second best))))
          (setf best (list counter actor)))))))
(defun remove-if-succ (schema lst)
  "Return the messages of LST whose :opid IS-VISIBLE reports as visible
with respect to SCHEMA's :succ list."
  (let ((succ-entries (getf schema :succ)))
    (flet ((visible-p (msg)
             (is-visible succ-entries (getf msg :opid))))
      (remove-if-not #'visible-p lst))))
(defun create-succ (schema opid)
  "Add a new entry for OPID, with an empty :succ list, to the front of
SCHEMA's :succ list.  Returns the new :succ list."
  (push (list :opid opid :succ nil)
        (getf schema :succ)))
(defun insert-succ (schema opid succ)
  "Record SUCC as a successor of OPID in SCHEMA's :succ list.
If an entry for OPID already exists, SUCC is pushed onto the front of that
entry's :succ list; otherwise a new entry (:opid OPID :succ (SUCC)) is
pushed onto the front of SCHEMA's :succ list.  Returns the list that was
extended."
  ;; Look the entry up once and update it directly, instead of searching
  ;; three times and writing through a temporary list of matches.
  (let ((entry (find opid (getf schema :succ)
                     :key #'(lambda (e) (getf e :opid))
                     :test #'equal)))
    (if entry
        (push succ (getf entry :succ))
        (push (list :opid opid :succ (list succ))
              (getf schema :succ)))))
(defun to-opid-list (lst)
  "Return the :opid of every property list in LST, in order."
  (mapcar #'(lambda (msg) (getf msg :opid))
          lst))
- ;;;; CORE
(defun create-schema (&key (id (random 10000)) (messages nil) (contacts nil))
  "Build a fresh schema property list.
ID defaults to a random integer below 10000; :cnt starts at 0 and :succ
starts empty.  MESSAGES and CONTACTS are stored as given."
  (let ((counter 0)
        (successors nil))
    (list :id id
          :cnt counter
          :messages messages
          :contacts contacts
          :succ successors)))
(defun clock-tick (schema)
  "Increment SCHEMA's :cnt counter by one and return the new value."
  (incf (getf schema :cnt)))
(defun clock-set (schema cnt)
  "Store CNT as SCHEMA's :cnt counter and return CNT."
  (let ((new-count cnt))
    (setf (getf schema :cnt) new-count)))
(defun create-msg (opid objid prop action value dep)
  "Build a fresh message property list with the keys
:opid, :objid, :prop, :action, :value and :dep, in that order."
  (append (list :opid opid :objid objid)
          (list :prop prop :action action)
          (list :value value :dep dep)))
(defun is-visible (lst opid)
  "Return T when no entry of LST has an :opid EQUAL to OPID, else NIL.
LST is a list of property lists carrying an :opid key."
  (let ((matches (select lst (where :opid opid))))
    (null matches)))
;;; Last-writer-wins conflict resolution. CONFLICTS is a list of messages.
(defun lww (conflicts)
  "Return the opid that MAX-OPID selects among the :opid values of the
messages in CONFLICTS."
  (let ((candidate-opids (to-opid-list conflicts)))
    (max-opid candidate-opids)))
(defun search-msg (schema table prop)
  "Return SCHEMA's messages whose :prop is PROP and whose :objid is TABLE,
filtered through REMOVE-IF-SUCC."
  (let* ((all-messages (getf schema :messages))
         (matching (select all-messages (where :prop prop :objid table))))
    (remove-if-succ schema matching)))
- ;;;; GET
(defun get-msg (schema table prop)
  "Return the :value of the winning message for PROP of object TABLE in
SCHEMA, or NIL when no message matches.
The winner is the message whose :opid LWW picks among the results of
SEARCH-MSG."
  ;; WHERE evaluates its value forms inside the predicate, once per tested
  ;; row, so compute the winning opid once up front instead of re-running
  ;; SEARCH-MSG and LWW for every message scanned.
  (let* ((winner-opid (lww (search-msg schema table prop)))
         (winner (car (select (getf schema :messages)
                              (where :opid winner-opid)))))
    (getf winner :value)))
(defun insert-msg (schema msg)
  "Add MSG to the front of SCHEMA's :messages and give every contact in
SCHEMA's :contacts a fresh message-state record for MSG's :opid.
Returns the new :messages list."
  (let ((msg-opid (getf msg :opid)))
    (dolist (contact (getf schema :contacts))
      (push (create-msg-state msg-opid)
            (getf contact :msg-state))))
  (push msg (getf schema :messages)))
- ;;;; PUT
(defun put-msg (schema table prop value)
  "Write VALUE to PROP of object TABLE in SCHEMA.
Creates a \"set value\" message with a new opid (ticked clock, schema id)
that depends on the messages SEARCH-MSG currently returns for the
property, inserts it, and registers the new opid as successor of each of
those messages.  Returns NIL."
  (let* ((superseded (to-opid-list (search-msg schema table prop)))
         (new-opid (list (clock-tick schema) (getf schema :id)))
         (msg (create-msg new-opid table prop "set value" value superseded)))
    (insert-msg schema msg)
    (dolist (old-opid superseded)
      (insert-succ schema old-opid new-opid))))
- ;;;; DELETE
(defun delete-msg (schema table prop)
  "Delete PROP of object TABLE in SCHEMA.
Ticks the clock to obtain a new opid and registers it as successor of
every message SEARCH-MSG currently returns for the property.  Returns NIL.
NOTE(review): no message is inserted for the deletion itself, so only the
local :succ list reflects it -- confirm this is intended."
  (let* ((superseded (to-opid-list (search-msg schema table prop)))
         (deleting-opid (list (clock-tick schema) (getf schema :id))))
    (dolist (old-opid superseded)
      (insert-succ schema old-opid deleting-opid))))
- ;;;; PUT-OBJECT
(defun put-obj (schema table prop)
  "Create a new object under PROP of object TABLE in SCHEMA.
Ticks the clock twice: once for the new object's id and once for the opid
of the \"create map\" message that carries that id as its value.  The
message is inserted and registered as successor of the messages SEARCH-MSG
currently returns for the property.  Returns the new object id."
  (let* ((superseded (to-opid-list (search-msg schema table prop)))
         (objid (list (clock-tick schema) (getf schema :id)))
         (new-opid (list (clock-tick schema) (getf schema :id)))
         (msg (create-msg new-opid table prop "create map" objid superseded)))
    (insert-msg schema msg)
    (dolist (old-opid superseded)
      (insert-succ schema old-opid new-opid))
    objid))
;;;; Per-contact, per-message bookkeeping
(defun create-msg-state (opid &key (seen nil)(ack nil)(req nil)(send-cnt 0)(send-time 0)(max-latency 10000))
  "Build a fresh message-state property list for OPID with the keys
:opid, :seen, :ack, :req, :send-cnt, :send-time and :max-latency."
  (append (list :opid opid)
          (list :seen seen :ack ack :req req)
          (list :send-cnt send-cnt
                :send-time send-time
                :max-latency max-latency)))
(defun msg-state-from-msg (msg)
  "Return a default message-state record for the :opid of MSG."
  (let ((msg-opid (getf msg :opid)))
    (create-msg-state msg-opid)))
(defun add-contact (schema name conid)
  "Add a contact (:name NAME :conid CONID :msg-state ...) to the front of
SCHEMA's :contacts.  The contact starts with one default message-state
record per message currently in SCHEMA.  Returns SCHEMA."
  (let* ((states (mapcar #'msg-state-from-msg (getf schema :messages)))
         (contact (list :name name :conid conid :msg-state states)))
    (push contact (getf schema :contacts))
    schema))
(defun get-msg-by-opid (schema opid)
  "Return the first message in SCHEMA's :messages whose :opid is EQUAL to
OPID, or NIL when there is none."
  (let ((matches (select (getf schema :messages) (where :opid opid))))
    (first matches)))
(defun get-msg-state-by-conid (schema conid)
  "Return the :msg-state list of the first contact in SCHEMA whose :conid
is EQUAL to CONID, or NIL when no such contact exists."
  (let* ((matches (select (getf schema :contacts) (where :conid conid)))
         (contact (first matches)))
    (getf contact :msg-state)))
(defun send-eager-update (schema conid)
  "Build the list of records to send to contact CONID.
The result is (clock-record ack-record . message-records) where
  clock-record    is (5 <schema :cnt>),
  ack-record      is (0 <opids whose state has :ack T>),
  message-records are (1 <message>) for every opid whose state has
                  :seen NIL.
As a side effect, :ack is reset to NIL for every opid in the ack record."
  (let* ((states (get-msg-state-by-conid schema conid))
         (unseen-opids (to-opid-list (select states (where :seen nil))))
         (acked-opids (to-opid-list (select states (where :ack t))))
         (msg-records (mapcar #'(lambda (opid)
                                  (list 1 (get-msg-by-opid schema opid)))
                              unseen-opids))
         (ack-record (list 0 acked-opids))
         (clock-record (list 5 (getf schema :cnt))))
    ;; The acks are about to be sent, so clear the pending flags.
    (dolist (opid acked-opids)
      (mark-ack schema conid opid nil))
    (list* clock-record ack-record msg-records)))
(defun has-all-deps (deps msgs incoming-msgs)
  "Return T when every element of DEPS is EQUAL to some element of MSGS or
of INCOMING-MSGS, otherwise NIL.  An empty DEPS yields T."
  ;; Check the two lists directly instead of re-appending them for every
  ;; dependency.
  (if (every #'(lambda (dep)
                 (or (member dep msgs :test #'equal)
                     (member dep incoming-msgs :test #'equal)))
             deps)
      t
      nil))
(defun mark-seen (schema conid opid)
  "Set :seen to T in contact CONID's message-state record for OPID.
Returns T when a record was found, NIL otherwise.  A missing contact or
record is ignored rather than signalling an error."
  (let ((state (car (select (get-msg-state-by-conid schema conid)
                            (where :opid opid)))))
    ;; Without this guard the SETF would try to write the car of NIL when
    ;; nothing matches.
    (when state
      (setf (getf state :seen) t))))
(defun mark-ack (schema conid opid state)
  "Set :ack to STATE in contact CONID's message-state record for OPID.
Returns STATE when a record was found, NIL otherwise.  A missing contact
or record is ignored rather than signalling an error."
  (let ((record (car (select (get-msg-state-by-conid schema conid)
                             (where :opid opid)))))
    ;; Without this guard the SETF would try to write the car of NIL when
    ;; nothing matches.
    (when record
      (setf (getf record :ack) state))))
(defun add-incoming-msg (schema conid msg incoming-msgs)
  "Mark MSG as seen for contact CONID when all of MSG's dependencies are
available, either among SCHEMA's messages or among INCOMING-MSGS.
Returns the result of MARK-SEEN, or NIL when a dependency is missing.
NOTE(review): INCOMING-MSGS is assumed to be a list of messages (as in
PROCESS-EAGER-UPDATE), not a list of opids -- confirm against callers."
  ;; Messages store their dependencies under :dep (see CREATE-MSG), and
  ;; dependencies are opids, so compare against opids rather than whole
  ;; message plists.
  (when (has-all-deps (getf msg :dep)
                      (to-opid-list (getf schema :messages))
                      (to-opid-list incoming-msgs))
    (mark-seen schema conid (getf msg :opid))))
(defun process-eager-update (schema conid records)
  "Apply RECORDS received from contact CONID to SCHEMA.  Returns NIL.
Each record is a list (FLAG BODY), as produced by SEND-EAGER-UPDATE:
  0  BODY is a list of opids: for each one that is a known message, clear
     :ack and set :seen in CONID's message state.
  1  BODY is a message: if already known, mark it seen; otherwise, when
     all of its :dep opids are known locally or arrive in this batch,
     insert it, mark it seen, flag it for acknowledgement and register it
     as successor of each dependency.  A message with missing
     dependencies is skipped.
  2, 3, 4  only print a placeholder string.
  5  BODY is a clock value: the local clock becomes the larger of BODY
     and the current :cnt."
  ;; Collect only the message bodies (flag 1); EQL is the portable
  ;; comparison for integers.
  (let ((incoming-msgs
          (loop for record in records
                when (eql (first record) 1)
                  collect (second record))))
    (dolist (record records)
      (let ((flag (first record))
            (body (second record)))
        (case flag
          (0
           (dolist (opid body)
             (when (get-msg-by-opid schema opid)
               (mark-ack schema conid opid nil)
               (mark-seen schema conid opid))))
          (1
           (let ((opid (getf body :opid))
                 (deps (getf body :dep)))
             (cond
               ((get-msg-by-opid schema opid)
                (mark-seen schema conid opid))
               ;; An empty dependency list trivially satisfies
               ;; HAS-ALL-DEPS, so this one branch also covers messages
               ;; without dependencies.
               ((has-all-deps deps
                              (to-opid-list (getf schema :messages))
                              (to-opid-list incoming-msgs))
                (insert-msg schema body)
                (mark-seen schema conid opid)
                (mark-ack schema conid opid t)
                (dolist (dep deps)
                  (insert-succ schema dep opid))))))
          (2 (princ "process 2"))
          (3 (princ "process 3"))
          (4 (princ "process 4"))
          (5 (clock-set schema (max body (getf schema :cnt)))))))))
|