- (*
- * _ _ ____ _
- * _| || |_/ ___| ___ _ __ _ __ ___ | |
- * |_ .. _\___ \ / _ \ '_ \| '_ \ / _ \| |
- * |_ _|___) | __/ |_) | |_) | (_) |_|
- * |_||_| |____/ \___| .__/| .__/ \___/(_)
- * |_| |_|
- *
- * Personal Social Web.
- *
- * Copyright (C) The #Seppo contributors. All rights reserved.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *)
(* Infix bind on [Result]: [r >>= f] applies [f] to an [Ok] payload,
   passes [Error] through untouched. *)
let ( >>= ) r f = Result.bind r f
(* Binding operator so result-returning steps chain as [let* x = ... in ...]. *)
let ( let* ) v f = Result.bind v f
(* [let+ x = r in e] maps [e] over an [Ok] payload.
   Bug fix: [Result.map] takes the function FIRST, but the binding-operator
   syntax desugars [let+ x = r in e] to [( let+ ) r (fun x -> e)] — value
   first — so the arguments must be flipped for [let+] to be usable. *)
let ( let+ ) r f = Result.map f r
(* Wrap an error value into an already-resolved Lwt promise. *)
let lwt_err e = Error e |> Lwt.return
(* [a ^/ b] appends the string [b] to the path component of uri [a]. *)
let ( ^/ ) a b = Uri.with_path a (Uri.path a ^ b)
(** HTTP POST with http-signature headers added.
    may go to where PubKeyPem is: As2 *)
let post_signed
    ?(date = Ptime_clock.now ())
    ?(headers = [ Http.H.ct_jlda; Http.H.acc_app_jlda ])
    ~uuid
    ~key_id
    ~pk
    body
    uri =
  Logr.debug (fun m -> m "%s.%s %a key_id: %a" "Main" "post_signed" Uuidm.pp uuid Uri.pp key_id);
  assert (key_id |> Uri.to_string |> St.is_suffix ~affix:"/actor.jsa#main-key");
  (* build the signature from key id, private key, date, body digest and target *)
  let sig_key = Http.Signature.mkey key_id pk date in
  let digest = Ap.PubKeyPem.digest_base64' body in
  let sig_headers = Http.signed_headers sig_key digest uri in
  Http.post ~headers:(Cohttp.Header.add_list sig_headers headers) body uri
(** lift http errors to errors triggering a retry. *)
let http_to_err sta = Error (Cohttp.Code.string_of_status sta)
(** a plain (signed) http post.
    [fkt] post-processes a successful response (default: just return it).
    Non-2xx statuses are turned into [Error] via {!http_to_err} so the
    caller (the job queue) can retry. *)
let send_http_post ?(fkt = Lwt.return) ~uuid ~key_id ~pk (msg_id, uri, body) =
  Logr.debug (fun m -> m "%s.%s %a / %a %a" "Main" "send_http_post" Uri.pp msg_id Uri.pp uri Uuidm.pp uuid);
  let%lwt r = post_signed ~uuid ~pk ~key_id body uri in
  match r with
  | Error _ as e -> Lwt.return e
  | Ok (re,_) as o ->
    (match re.status with
     | #Cohttp.Code.success_status ->
       (* 2xx: hand response to [fkt].
          may leak memory for unconsumed body *)
       fkt o
     | sta ->
       (* any other status becomes a retry-triggering error string *)
       sta
       |> http_to_err
       |> Lwt.return)
(** asynchronous, queueable task.
    ActivityPub delivery https://www.w3.org/TR/activitypub/#delivery *)
module Apjob = struct
  module Notify = struct
    (** Prepare a job to queue. Must correspond to dispatch_job *)
    let encode msg_id (ibox, dst_actor_id) json =
      let s = Uri.to_string in
      Csexp.(List [
          Atom "2";
          Atom (s msg_id);
          Atom "notify";
          List [Atom (s ibox); Atom (s dst_actor_id); Atom (Ezjsonm.value_to_string json)];
        ])

    (** Inverse of {!encode}; [Error ()] on any shape or json mismatch. *)
    let decode = function
      | Csexp.(List [Atom "2"; Atom msg_id; Atom "notify"; List [Atom ibox; Atom dst_actor_id; Atom json]]) ->
        (match json |> Ezjsonm.value_from_string_result with
         | Error _ -> Error ()
         | Ok json ->
           let u = Uri.of_string in
           Ok (u msg_id, (u ibox, u dst_actor_id), json))
      | _ -> Error ()
  end
end
(** process one job, typically doing http post requests or signed ActivityPub delivery.
    [j] is the job file name (used for logging), [payload] its parsed csexp.
    The payload shape must correspond to Apjob.Notify.encode. *)
let dispatch_job ~base ~pk j payload =
  let sndr = Uri.make ~path:Ap.proj () |> Http.reso ~base in
  let key_id = sndr |> Ap.Person.my_key_id in
  let uuid = Uuidm.v `V4 in
  Logr.debug (fun m -> m "%s.%s %s %a" "Main" "dispatch_job" j Uuidm.pp uuid);
  assert (sndr |> Uri.to_string |> St.is_suffix ~affix:"/actor.jsa");
  assert (key_id |> Uri.to_string |> St.is_suffix ~affix:"/actor.jsa#main-key");
  (* log the response (or error) and pass the value through unchanged *)
  let fkt ibox = function
    | Error e as o ->
      Logr.debug (fun m -> m "%s.%s %a %s Error: %s" "Main" "dispatch_job.fkt" Uuidm.pp uuid ibox e);
      Lwt.return o
    | Ok (rsp,bod) as o ->
      let%lwt b = bod |> Cohttp_lwt.Body.to_string in
      Logr.debug (fun m -> m "%s.%s %a %s Response: %a\n\n%s" "Main" "dispatch_job.fkt" Uuidm.pp uuid ibox Cohttp.Response.pp_hum rsp b);
      Lwt.return o
  in
  let open Csexp in
  match payload with
  | List [Atom "2"; Atom msg_id; Atom "http.post"; List [Atom uri; Atom body]] ->
    Logr.warn (fun m -> m "%s.%s legacy (maybe future?)" "Main" "dispatch_job");
    send_http_post ~uuid ~key_id ~pk (msg_id |> Uri.of_string, uri |> Uri.of_string, body)
  | List [Atom "2"; Atom msg_id; Atom "notify"; List [Atom ibox; Atom _dst_actor_id; Atom json]] ->
    (* Apjob.Notify.encode *)
    send_http_post ~fkt:(fkt ibox) ~uuid ~key_id ~pk (msg_id |> Uri.of_string, ibox |> Uri.of_string, json)
  | _ ->
    (* must correspond to Apjob.Notify.encode *)
    Logr.err (fun m -> m "%s %s.%s invalid job format %s" E.e1016 "Main" "dispatch_job" j);
    Lwt.return (Error "invalid job format")
(** Simple, file-based, scheduled job queue.
    Inspired by http://cr.yp.to/proto/maildir.html
*)
module Queue = struct
  (* how long finished jobs stay in [cur] before cleanup: 3 days, in seconds *)
  let keep_cur_s = 3 * 24 * 60 * 60

  (** Move due tasks from wait to new and loop all new.
      [due] cutoff timestamp (default: now); [wait]/[new_]/[run]/[cur] are
      the maildir-style queue sub-directories; [base] and [pk] are passed
      through to {!dispatch_job}. Returns [Ok que] or [Error msg] if an
      exception escapes the whole run. *)
  let process_new_and_due
      ?(due = Ptime_clock.now ())
      ?(wait = Job.wait)
      ?(new_ = Job.new_)
      ?(run = Job.run)
      ?(cur = Job.cur)
      ~base
      ~pk
      que =
    let t0 = Sys.time () in
    (* Logr.debug (fun m -> m "%s.%s" "Main" "process_queue"); *)
    (* unlink files older than [tmin] from queue subdir [dn] *)
    let dir_clean tmin (Job.Queue qn) dn =
      Logr.debug (fun m -> m "%s.%s unlink old from '%s->%s'" "Main.Queue" "process_new_and_due.dir_clean" qn dn);
      let tmin = tmin |> Ptime.to_float_s in
      let open Astring in
      assert (qn |> String.is_suffix ~affix:"/");
      assert (dn |> String.is_suffix ~affix:"/");
      let dn = qn ^ dn in
      File.fold_dir (fun init fn ->
          if fn |> String.length > 12 (* keep README.txt etc. *)
          then
            (* best-effort: stat/unlink failures are deliberately ignored *)
            (try
               let fn = dn ^ fn in
               let st = fn |> Unix.stat in
               if st.st_mtime < tmin
               then
                 (Unix.unlink fn;
                  Logr.debug (fun m -> m "%s.%s unlinked '%s' %f < %f" "Main.Queue" "process_new_and_due.dir_ćlean" fn st.st_mtime tmin))
             with _ -> ());
          init,true) () dn
    in
    (* move those due from wait into new *)
    let rec move_due_wait_new ~wait ~new_ ~due =
      match Job.(any_due ~due ~wait que) with
      | None -> ()
      | Some j ->
        Job.(move que j wait new_);
        move_due_wait_new ~wait ~new_ ~due
    in
    let Queue que' = que in
    (* drain [new_]: move each job to [run], dispatch it, then either move it
       on to [cur] (ok) or back to [wait] / error it out (failure).
       Returns the number of jobs processed. *)
    let rec loop (i : int) : int Lwt.t =
      match Job.any new_ que with
      | None -> Lwt.return i
      | Some j ->
        let%lwt _ =
          Job.move que j new_ run;
          let fn = que' ^ run ^ j
          and error s =
            Job.wait_or_err ~wait que run j;
            Logr.info (fun m -> m "%s.%s job postponed/cancelled: %s reason: %s" "Main.Queue" "process_new_and_due" j s)
          and ok _ =
            Job.move que j run cur;
            Logr.debug (fun m -> m "%s.%s job done: %s" "Main.Queue" "process_new_and_due" j)
          in
          match fn |> File.in_channel Csexp.input with
          | Error s ->
            s
            |> error
            |> Lwt.return
          | Ok p ->
            (* exceptions from a single job are demoted to [Error] so one bad
               job cannot kill the whole queue run *)
            let%lwt r = try%lwt
                dispatch_job ~base ~pk j p
              with
              | Failure s ->
                Lwt.return (Error s)
              | exn ->
                let e = exn |> Printexc.to_string in
                Logr.warn (fun m -> m "%s.%s Uncaught Exception job:%s %s" "Main.Queue" "process_new_and_due" j e);
                Lwt.return (Error e)
            in
            r
            |> Result.fold ~error ~ok
            |> Lwt.return in
        loop (i+1)
    in
    try%lwt
      dir_clean (keep_cur_s |> Ptime.Span.of_int_s |> Ptime.sub_span due |> Option.value ~default:Ptime.epoch) Job.qn cur;
      move_due_wait_new ~wait ~new_ ~due;
      let%lwt i = loop 0 in
      Logr.info (fun m -> m "%s.%s finished, %i jobs processed in dt=%.3fs." "Main.Queue" "process_new_and_due" i (Sys.time() -. t0));
      Lwt.return (Ok que)
    with | exn ->
      let msg = Printexc.to_string exn in
      Logr.err (fun m -> m "%s %s.%s processing failed %s" E.e1017 "Main.Queue" "process_new_and_due" msg);
      Lwt.return (Error msg)

  (** do one http request, fire and forget *)
  let ping_and_forget ~base ~run_delay_s =
    Logr.debug (fun m -> m "%s.%s %is" "Main.Queue" "ping_and_forget" run_delay_s);
    let path = Cfg.seppo_cgi ^ "/ping"
    and query = [("loop",[Printf.sprintf "%is" run_delay_s])] in
    let uri = Uri.make ~path ~query ()
              |> Http.reso ~base in
    let%lwt f = uri
                |> Http.get ~seconds:0.5 (* fire and forget *) in
    (* keep [f] alive without inspecting the response *)
    let _ = Sys.opaque_identity f in
    Ok (`OK, [Http.H.ct_plain], Cgi.Response.body "ok")
    |> Lwt.return

  (* pid/lock file guarding the queue-runner singleton *)
  let run_fn = "app/var/run/queue.pid"

  (** synchronously, sequentially run fkt for all jobs in new.
      Uses an exclusive lock on [lock] so only one runner is active;
      a second invocation sees EAGAIN/EACCES and becomes a noop. *)
  let loop
      ?(lock = run_fn)
      ~base
      ~run_delay_s
      fkt =
    Logr.debug (fun m -> m "%s.%s" "Main.Queue" "loop");
    let _t0 = Unix.time () in
    try%lwt
      let fd = Unix.openfile lock [O_CLOEXEC; O_CREAT; O_TRUNC; O_WRONLY; O_SYNC] 0o644 in
      (* https://git.radicallyopensecurity.com/nlnet/ngie-seppo/-/issues/14#note_129407 *)
      Unix.lockf fd F_TLOCK 0;
      Unix.ftruncate fd 0;
      let oc = fd |> Unix.out_channel_of_descr in
      Printf.fprintf oc "%i" (Unix.getpid ());
      flush oc;
      let%lwt _ = fkt Job.qn
      in
      Logr.debug (fun m -> m "%s.%s sleep %is" "Main.Queue" "loop" run_delay_s);
      (* clamp the delay to [3;1900] seconds *)
      Unix.sleep (run_delay_s |> max 3 |> min 1900);
      close_out oc;
      Unix.unlink lock; (* do not loop if someone unlinked the lock *)
      ping_and_forget ~base ~run_delay_s
    with
    | Unix.(Unix_error(EAGAIN, "lockf", ""))
    | Unix.(Unix_error(EACCES, "open", "app/var/run/queue.pid")) ->
      Logr.debug (fun m -> m "%s.%s don't race, noop" "Main.Queue" "loop");
      Ok (`OK, [Http.H.ct_plain], Cgi.Response.body "noop")
      |> Lwt.return
    | exn ->
      (* @TODO Error number *)
      Logr.warn (fun m -> m "%s.%s Uncaught exception %a" "Main.Queue" "loop" St.pp_exc exn);
      Ok (`OK, [Http.H.ct_plain], Cgi.Response.body "ok")
      |> Lwt.return
end
(** monitor outgoing url and add to <link>? *)
(* Placeholder: currently a no-op that returns the entry unchanged. *)
let sift_urls (e : Rfc4287.Entry.t) =
  Logr.debug (fun m -> m "%s.%s not implemented." "Main" "sift_urls");
  Ok e
(** Extract tags from a post into a list.
    Needs the post and a tag store. Modifies both.
    Categories are turned into ["#"]-prefixed tags, normalised against the
    tag store [cdb], and converted back into categories. *)
let sift_tags cdb (e : Rfc4287.Entry.t) =
  Logr.debug (fun m -> m "%s.%s" "Main" "sift_tags");
  let open Rfc4287 in
  (* category -> "#label" tag *)
  let c2t init ((Label (Single l),_,_) : Rfc4287.Category.t) =
    (Tag.Tag ("#" ^ l)) :: init
  in
  (* tag -> category triple; strips a leading '#' if present *)
  let t2c init (Tag.Tag t) =
    Logr.debug (fun m -> m "%s.%s %s" "Main" "sift_tags" t);
    let le = t |> String.length in
    assert (1 < le);
    (* was ['#' == t.[0]] (physical equality); [Char.equal] expresses the
       intended structural comparison and behaves identically for chars. *)
    let t = if Char.equal '#' t.[0]
      then String.sub t 1 (le-1)
      else t in
    let t = Single t in
    let l = Category.Label t in
    let te = Category.Term t in
    (l, te, Rfc4287.tagu) :: init
  in
  let ti = e.title in
  let co = e.content in
  let tl = e.categories |> List.fold_left c2t [] in
  let ti,co,tl = Tag.cdb_normalise ti co tl cdb in
  Ok {e with
      title = ti;
      content = co;
      categories = tl |> List.fold_left t2c []}
(* Collect fediverse handles found in [s] via the Plain2handle lexer. *)
let find_handles s = Plain2handle.handle [] (Lexing.from_string s)
(** find mentions *)
(* Placeholder: returns the entry unchanged; the commented line sketches the
   intended extraction of handles from title and content. *)
let sift_handles (e : Rfc4287.Entry.t) =
  Logr.debug (fun m -> m "%s.%s not implemented." "Main" "sift_handles");
  (* Ok ((e.title |> find_handles) @ (e.content |> find_handles)) *)
  Ok e
(** queue json for destination reac,ibox into que.
    Fold step: enqueues one notify job and returns the accumulator unchanged. *)
let fldbl_notify ~due ~que msg_id json acc (reac,ibox) =
  Logr.debug (fun m -> m "%s.%s %a -> %a" "Main" "fldbl_notify" Uri.pp reac Uri.pp ibox);
  let job = Apjob.Notify.encode msg_id (ibox, reac) json
            |> Csexp.to_string
            |> Bytes.of_string in
  let _ = Job.enqueue ~due que 0 job in
  acc
(** https://www.rfc-editor.org/rfc/rfc4287#section-4.1.2 *)
module Note = struct
  (* Load base url, profile and author identity from their config files. *)
  let load_basics () =
    let* base = Cfg.Base.(from_file fn) in
    let* prof = Cfg.Profile.(from_file fn) in
    let* Auth.Uid userinfo,_ = Auth.(from_file fn) in
    let host = base |> Uri.host |> Option.value ~default:"-" in
    let auth = {Rfc4287.Person.empty with
                name = prof.title;
                uri = Some (Uri.make ~userinfo ~host ())} in
    Ok (base,prof,auth)

  (** https://www.rfc-editor.org/rfc/rfc4287#section-4.1.2 *)
  module Atom = struct
    (** rebuild a single atom page plus, if applicable, the softlink *)
    let page_to_atom ~base ~title ~updated ~lang ~(author : Rfc4287.Person.t) (a,b as pag) =
      Logr.debug (fun m -> m "%s.%s %s/%d" "Main.Note.Atom" "page_to_atom" a b);
      (* fold ix range into entry: seek to offset [p0] in the store channel
         [sc], decode the csexp there, prepend the entry. Stale ("deleted")
         or undecodable entries are logged and skipped. *)
      let hydrate sc init (p0,_) =
        let* init = init
                    |> Result.map_error
                      (fun e -> Logr.err (fun m -> m "%s.%s ignored %s" "Main.Note.Atom" "page_to_atom.hydrate.a" e);
                        e) in
        seek_in sc p0;
        let* item = Csexp.input sc
                    |> Result.map_error
                      (fun e -> Logr.err (fun m -> m "%s.%s ignored %s" "Main.Note.Atom" "page_to_atom.hydrate.b" e);
                        e) in
        match Rfc4287.Entry.decode item with
        | Ok item ->
          Logr.debug (fun m -> m "%s.%s 0x%x %a" "Main.Note.Atom" "page_to_atom.hydrate.0" p0 Uri.pp item.id);
          Ok (item :: init)
        | Error "deleted" ->
          Logr.warn (fun m -> m "%s.%s found a stale index entry 0x%x" "Main.Note.Atom" "page_to_atom.hydrate.1" p0);
          Ok init
        | Error e ->
          Logr.err (fun m -> m "%s.%s 0x%x ignoring: %s" "Main.Note.Atom" "page_to_atom.hydrate.2" p0 e);
          Ok init in
      let pn = pag |> Storage.Page.to_fn in
      let* es = Storage.fn |> File.in_channel (fun sc ->
          Logr.debug (fun m -> m "%s.%s %s" "Main.Note.Atom" "page_to_atom.hydrate" pn);
          pn |> File.in_channel (fun ic ->
              match Csexp.input_many ic with
              | Error e' as e ->
                Logr.err (fun m -> m "%s.%s %s/%d: %s" "Main.Note.Atom" "page_to_atom.hydrate" a b e');
                e
              | Ok l -> l
                        |> Storage.TwoPad10.decode_many
                        |> List.rev
                        |> List.fold_left (hydrate sc) (Ok [])
            )) |> Result.map_error (fun e ->
          Logr.err (fun m -> m "%s.%s ignored %s" "Main.Note.Atom" "page_to_atom" e);
          e) in
      (* this used to be an assert, but I hit one case of non-understood failure *)
      if not (es |> St.is_monotonous Rfc4287.Entry.compare_published_desc)
      then Logr.warn (fun m -> m "%s soft assertion failed: %s" __LOC__ pn);
      let self,first,last,prev,next = pag |> Rfc4287.Feed.compute_links ~max:7000 ~base:Uri.empty in
      (* atom index.xml *)
      let j_xml = "%-%/index.xml" |> Make.Jig.make in
      let fn = [a;b|> string_of_int] |> Make.Jig.paste j_xml |> Option.get in
      Logr.debug (fun m -> m "%s.%s %s/%d -> %s (%d entries)" "Main.Note.Atom" "page_to_atom" a b fn (es |> List.length));
      let x = es |> Rfc4287.Feed.to_atom
                ~base
                ~self
                ~prev
                ~next
                ~first
                ~last
                ~title
                ~updated
                ~lang
                ~author in
      let _ = fn |> Filename.dirname |> File.mkdir_p File.pDir in
      fn |> File.out_channel_replace (x |> Xml.to_chan ~xsl:(Rfc4287.xsl "posts.xsl" fn));
      let _,_ = Storage.make_feed_syml pag fn in
      Ok fn

    (* Make rule: rebuild "%-%/index.xml" from the page's csexp store plus
       the base and profile config files. *)
    let rule = ({
        target = "%-%/index.xml";
        prerequisites = "app/var/db/%/%.s" :: Cfg.Base.fn :: Cfg.Profile.fn :: [];
        fresh = Make.Outdated;
        command = (fun p _rz r t ->
            let* base,prof,auth = load_basics () in
            assert ("%-%/index.xml" |> String.equal r.target);
            assert ("app/var/db/%/%.s" |> String.equal (r.prerequisites |> List.hd));
            let src,_,v = t |> Make.src_from r in
            Logr.debug (fun m -> m "%s.%s %s %s -> %s" "Main.Note.Atom" "rule" p src t);
            let pag = match v with
              | [a;b] -> (a,b |> int_of_string)
              | _ -> failwith __LOC__ in
            let max = Storage.Page.( pag |> find_max |> to_int )
            and now = Ptime_clock.now () in
            let author = auth
            and lang = prof.language
            and title = prof.title
            and tz = prof.timezone
            and self,first,last,prev,next = pag |> Rfc4287.Feed.compute_links ~max ~base:Uri.empty in
            let updated = now |> Rfc3339.of_ptime ~tz in
            let* pag = src |> File.in_channel Csexp.input_many in
            let r = pag
                    |> List.fold_left Storage.fold_of_twopad10 []
                    |> Rfc4287.Feed.to_atom_
                      ~base
                      ~self
                      ~prev
                      ~next
                      ~first
                      ~last
                      ~title
                      ~updated
                      ~lang
                      ~author
                      t
                    |> Rfc4287.Feed.to_file t in
            (* HOW to (re-)create the symlink in case *)
            (* let _,_ = mk_unnumbered_syml (depth,unn,p) fn in *)
            r
          );
      } : Make.t)

    let jig = rule.target |> Make.Jig.make

    (* Page id -> output filename, e.g. ("o/p",4) -> "o/p-4/index.xml" *)
    let page_to_fn (a,i : Storage.Page.t) =
      assert (a |> St.is_prefix ~affix:"o/");
      [a;i |> string_of_int]
      |> Make.Jig.paste jig
      |> Option.get
  end

  (** Atom, local *)
  let publish ~base ~(profile : Cfg.Profile.t) ~(author : Rfc4287.Person.t) (n : Rfc4287.Entry.t) =
    Logr.debug (fun m -> m "%s.%s '%s'" "Main.Note" "publish" n.title);
    (* determine id and do store app/var/db/o/p.s *)
    (* add to indices (p,d,t) *)
    (* (2,"o/p",4) app/var/db/o/p.s app/var/db/o/p/4.s -> o/p-4/index.xml *)
    (* (3,"o/d/2023-20-13",4) app/var/db/o/d/2023-10-13/4.s -> o/d/2023-10-13-4/index.xml *)
    (* (3,"o/t/tag",4) app/var/db/o/t/tag/4.s -> o/t/tag-4/index.xml *)
    (* add to storage and indices (main,date,tags)) *)
    let items_per_page = profile.posts_per_page in
    let n,(a,_b as ix),pos = n |> Storage.save ~items_per_page in
    assert (a |> String.equal "o/p");
    let append_to_page pos init pa = let _ = (pos |> Storage.Page.apnd pa) in
      pa :: init in
    let ix_other : Storage.Page.t list = n
                                         |> Storage.Page.next_other_pages ~items_per_page
                                         |> List.fold_left (append_to_page pos) [] in
    (* refresh feeds, outbox etc. *)
    let lang = profile.language in
    let title = profile.title in
    let updated = n.updated (* more precisely would be: now *) in
    let mater init ix = (ix |> Atom.page_to_atom ~base ~title ~updated ~lang ~author) :: init in
    let l = ix :: ix_other
            |> List.fold_left mater [] in
    (* primary page + date page + one page per category *)
    assert ( 1 + 1 + (n.categories |> List.length) == (l |> List.length));
    Ok n

  module Create = struct
    (** Enqueue jobs.
        https://www.w3.org/TR/activitypub/#delivery says "Servers MUST de-duplicate
        the final recipient list." which implies each actor profile / inbox lookup
        can lag delivery for all.
        How long could such a consolidated inbox list be cached? In theory not at
        all because each inbox target url may change without further notice.
        In practice, we will use the inbox as long as it works and redo the
        webfinger/actor lookup otherwise.
        1. get all actor profiles (limit redirects) and extract inbox url
        2. de-duplicate
        3. deliver to all
        4. retry temporary failures
        5. handle permanent failures to clean link rot
    *)
    let notify_subscribers
        ?(due = Ptime_clock.now ())
        ?(que = Job.qn)
        ?(cdb = Ap.Followers.cdb)
        ~base
        (n : Rfc4287.Entry.t) =
      let json = n |> Ap.Note.Create.to_json ~base in
      cdb |> Ap.Followers.(fold_left (State.ibox' (fldbl_notify ~due ~que n.id json))) (Ok n)
  end

  (** application logic around delete. *)
  module Delete = struct
    (* find dirty o/t/foo-1/index.xml and o/d/2024-03-12-7/index.xml pages *)
    let dirty (n : Rfc4287.Entry.t) : Storage.Page.t list =
      (* the primary o/p/0.s *)
      match n.id |> Storage.Id.to_page_i with
      | Error _ -> []
      | Ok ((p : Storage.Page.t),_ as id') ->
        p
        :: match id' |> Storage.TwoPad10.from_page_i with
        | Error _ -> []
        | Ok pos ->
          n
          |> Storage.Page.other_feeds
          |> List.fold_left (fun init (bas,_) ->
              match Storage.Page.find pos bas with
              | None -> init
              | Some v ->
                let p,i = v in
                Logr.debug (fun m -> m "%s.%s and %s-%i/index.xml" "Main.Note.Delete" "find" p i);
                v :: init ) []

    (** - remove from storage
        - refresh dirty feeds
        todo? rather keep a tombstone? https://www.w3.org/TR/activitypub/#delete-activity-outbox *)
    let delete (id : Uri.t) =
      Logr.debug (fun m -> m "%s.%s '%a'" "Main.Note.Delete" "delete" Uri.pp id);
      let* n = Storage.delete id in
      let rz = [Atom.rule] in
      let _ = n
              |> dirty
              |> List.fold_left (fun init pag ->
                  let fn = pag |> Atom.page_to_fn in
                  (try fn |> Unix.unlink; (* rather than touch .s *)
                   with Unix.(Unix_error(ENOENT, "unlink", _)) -> ());
                  (fn |> Make.M2.make rz)
                  :: init) [] in
      Ok n

    (** make Ap.Note.Delete.to_json and queue it via fldbl_notify for each in cdb. *)
    let notify_subscribers
        ?(due = Ptime_clock.now ())
        ?(que = Job.qn)
        ?(cdb = Ap.Followers.cdb)
        ~base
        (n : Rfc4287.Entry.t) =
      let json = n |> Ap.Note.Delete.to_json ~base in
      cdb |> Ap.Followers.(fold_left (State.ibox' (fldbl_notify ~due ~que n.id json))) (Ok n)
  end
end