main.ml 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541
  1. (*
  2. * _ _ ____ _
  3. * _| || |_/ ___| ___ _ __ _ __ ___ | |
  4. * |_ .. _\___ \ / _ \ '_ \| '_ \ / _ \| |
  5. * |_ _|___) | __/ |_) | |_) | (_) |_|
  6. * |_||_| |____/ \___| .__/| .__/ \___/(_)
  7. * |_| |_|
  8. *
  9. * Personal Social Web.
  10. *
  11. * Copyright (C) The #Seppo contributors. All rights reserved.
  12. *
  13. * This program is free software: you can redistribute it and/or modify
  14. * it under the terms of the GNU General Public License as published by
  15. * the Free Software Foundation, either version 3 of the License, or
  16. * (at your option) any later version.
  17. *
  18. * This program is distributed in the hope that it will be useful,
  19. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  20. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  21. * GNU General Public License for more details.
  22. *
  23. * You should have received a copy of the GNU General Public License
  24. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  25. *)
  (** Infix monadic bind on [result]: [r >>= f] is [Result.bind r f]. *)
  let ( >>= ) = Result.bind

  (** Binding operator on [result]:
      [let* x = r in e] is [Result.bind r (fun x -> e)]. *)
  let ( let* ) = Result.bind

  (** Mapping operator on [result]:
      [let+ x = r in e] is [Result.map (fun x -> e) r].

      A binding operator receives the bound value first and the continuation
      second, whereas [Result.map] takes the function first, so the arguments
      have to be flipped here. *)
  let ( let+ ) r f = Result.map f r
  (** [lwt_err e] is [Error e] wrapped with [Lwt.return]. *)
  let lwt_err e = Error e |> Lwt.return
  (** [a ^/ b] is the uri [a] with the string [b] appended verbatim to its
      path; no separator is inserted. *)
  let ( ^/ ) a b = Uri.with_path a (Uri.path a ^ b)
  (* may go to where PubKeyPem is: As2 *)

  (** POST [body] to [uri], adding HTTP signature headers.

      The signature headers are computed by [Http.signed_headers] from the
      signing key triple [(key_id, Ap.PubKeyPem.sign pk, date)] and the base64
      digest of [body]; they are then extended with [headers].

      @param date timestamp used for signing, defaults to now.
      @param headers extra headers, default to the json-ld content-type and
             accept headers.
      @param uuid only used for logging.
      @raise Assert_failure if [key_id] does not end in
             ["/actor.jsa#main-key"]. *)
  let post_signed
      ?(date = Ptime_clock.now ())
      ?(headers = [ Http.H.ct_jlda; Http.H.acc_app_jlda ])
      ~uuid
      ~key_id
      ~pk
      body
      uri =
    Logr.debug (fun m -> m "%s.%s %a key_id: %a" "Main" "post_signed" Uuidm.pp uuid Uri.pp key_id);
    assert (St.ends_with ~suffix:"/actor.jsa#main-key" (Uri.to_string key_id));
    let key : Http.t_sign_k = (key_id, Ap.PubKeyPem.sign pk, date) in
    let signed = Http.signed_headers key (Ap.PubKeyPem.digest_base64' body) uri in
    Http.post ~headers:(Cohttp.Header.add_list signed headers) body uri
  (** Send a signed POST and map non-success HTTP statuses to [Error].

      @param fkt continuation applied to the [Ok (response, body)] value when
             the status is a success status; defaults to [Lwt.return].
      @return the transport error unchanged, the result of [fkt] on success,
              or [Error status_text] for any other HTTP status. *)
  let send_http_post ?(fkt = Lwt.return) ~uuid ~key_id ~pk (msg_id, uri, body) =
    Logr.debug (fun m -> m "%s.%s %a / %a %a" "Main" "send_http_post" Uri.pp msg_id Uri.pp uri Uuidm.pp uuid);
    let%lwt r = post_signed ~uuid ~pk ~key_id body uri in
    match r with
    | Error _ as e -> Lwt.return e
    | Ok (re, _) as o ->
      begin match re.status with
        | #Cohttp.Code.success_status -> fkt o
        | st -> Lwt.return (Error (Cohttp.Code.string_of_status st))
      end
  (* https://www.w3.org/TR/activitypub/#delivery *)

  (** Deliver [body] to the inbox [ibox] with a signed POST on behalf of the
      actor [sndr]. [msg_id] and [uuid] are only logged; the third tuple
      component is ignored.

      @raise Assert_failure if [sndr] does not end in ["/actor.jsa"] or its
             key id does not end in ["/actor.jsa#main-key"]. *)
  let notify_remote_actor ~uuid ~pk ~sndr (msg_id, ibox, _, body) =
    Logr.debug (fun m -> m "%s.%s %a %a %a" "Main" "notify_remote_actor" Uri.pp ibox Uri.pp msg_id Uuidm.pp uuid);
    let key_id = Ap.Person.key_id sndr in
    let has_suffix suffix u = St.ends_with ~suffix (Uri.to_string u) in
    assert (has_suffix "/actor.jsa" sndr);
    assert (has_suffix "/actor.jsa#main-key" key_id);
    post_signed ~uuid ~pk ~key_id body ibox
  (* must correspond to dispatch_job *)

  (** Encode a "notify" job as a canonical s-expression of the shape
      [("2" msg_id "notify" (ibox id json))], with uris and the json value
      rendered as strings. *)
  let job_encode_notify msg_id (ibox, id) (json : Ezjsonm.value) : Csexp.t =
    let atom_of_uri u = Csexp.Atom (Uri.to_string u) in
    let args =
      Csexp.List
        [ atom_of_uri ibox
        ; atom_of_uri id
        ; Csexp.Atom (Ezjsonm.value_to_string json)
        ]
    in
    Csexp.List [ Csexp.Atom "2"; atom_of_uri msg_id; Csexp.Atom "notify"; args ]
  (** Decode a job [payload] and execute it.

      Two payload shapes are understood, both tagged with version ["2"]:
      - ["http.post"]: a signed POST of a ready-made body via
        [send_http_post],
      - ["notify"]: delivery to a remote inbox via [notify_remote_actor]
        (must correspond to [job_encode_notify]). The response body is read
        into a string for the debug log before the response is returned.

      Anything else is logged and yields [Error "invalid job format"].

      @param j job name, only used for logging.
      @raise Assert_failure if the sender uri derived from [base] does not
             look like an actor uri. *)
  let dispatch_job ~base ~pk j payload =
    let sndr = Http.reso ~base (Uri.make ~path:Ap.proj ()) in
    let key_id = Ap.Person.key_id sndr in
    let uuid = Uuidm.v `V4 in
    Logr.debug (fun m -> m "%s.%s %s %a" "Main" "dispatch_job" j Uuidm.pp uuid);
    assert (St.ends_with ~suffix:"/actor.jsa" (Uri.to_string sndr));
    assert (St.ends_with ~suffix:"/actor.jsa#main-key" (Uri.to_string key_id));
    match payload with
    | Csexp.(List [Atom "2"; Atom msg_id; Atom "http.post"; List [Atom uri; Atom body]]) ->
      send_http_post ~uuid ~key_id ~pk (Uri.of_string msg_id, Uri.of_string uri, body)
    | Csexp.(List [Atom "2"; Atom msg_id; Atom "notify"; List [Atom ibox; Atom id; Atom json]]) ->
      (* must correspond to job_encode_notify *)
      let%lwt r =
        notify_remote_actor ~uuid ~sndr ~pk
          (Uri.of_string msg_id, Uri.of_string ibox, Uri.of_string id, json)
      in
      begin match r with
        | Error _ as e -> Lwt.return e
        | Ok (rsp, bod) as ok ->
          let%lwt b = Cohttp_lwt.Body.to_string bod in
          Logr.debug (fun m -> m "%s.%s %a %s Response: %a\n\n%s" "Main" "dispatch_job" Uuidm.pp uuid ibox Cohttp.Response.pp_hum rsp b);
          Lwt.return ok
      end
    | _ ->
      Logr.err (fun m -> m "%s %s.%s invalid job format %s" E.e1016 "Main" "dispatch_job" j);
      Lwt.return (Error "invalid job format")
  (** Signature of a job worker as consumed by [run_any_job]: given a job
      payload it answers [None], or [Some] promise of the job's outcome. The
      first worker answering [Some] wins, later workers are not called. *)
  type t_f = Csexp.t -> (unit, string) result Lwt.t option
  (** Worker for ["http.post"] jobs only.

      @param job_name only used for logging.
      @return [Some promise] running [send_http_post] when [payload] is a
              version ["2"] ["http.post"] job, [None] for any other payload.
      @raise Assert_failure if the sender uri derived from [base] does not
             look like an actor uri.

      NOTE(review): the promise carries the HTTP response of
      [send_http_post], not [unit], so this is not directly a [t_f]. *)
  let job_just_post ~base ~pk ~job_name (payload : Csexp.t) =
    let sndr = Uri.make ~path:Ap.proj () |> Http.reso ~base in
    let key_id = sndr |> Ap.Person.key_id in
    let uuid = Uuidm.v `V4 in
    (* log under this function's own name; it used to claim "dispatch_job" *)
    Logr.debug (fun m -> m "%s.%s %s %a" "Main" "job_just_post" job_name Uuidm.pp uuid);
    assert (sndr |> Uri.to_string |> St.ends_with ~suffix:"/actor.jsa");
    assert (key_id |> Uri.to_string |> St.ends_with ~suffix:"/actor.jsa#main-key");
    match payload with
    | Csexp.(List [Atom "2"; Atom msg_id; Atom "http.post"; List [Atom uri; Atom body]]) ->
      Some (send_http_post ~uuid ~key_id ~pk (Uri.of_string msg_id, Uri.of_string uri, body))
    | _ -> None
  (** Placeholder for a worker-list based job runner.

      At present neither [base], [pk] nor [payload] are used: a fixed dummy
      payload is offered to an empty list of workers, so the result is always
      [Ok ()]. *)
  let run_any_job ~base ~pk j payload =
    ignore base;
    ignore pk;
    ignore j;
    ignore payload;
    (* offer [v] to each worker in turn and stop asking once one answered *)
    let run_until_some (fkts : t_f list) (v : Csexp.t) =
      List.fold_left
        (fun acc fkt ->
           match acc with
           | Some _ -> acc (* we have a result, don't call more workers *)
           | None -> fkt v)
        None
        fkts
    in
    let workers = [ (* run_job_no; run_job_no; run_job_yes; run_job_yes; run_job_n *) ] in
    match j, run_until_some workers (Csexp.Atom "xyz") with
    | _, None -> Lwt.return (Ok ())
    | _fn, Some pending ->
      let%lwt res = pending in
      Lwt.return res
  133. module Queue = struct
  (** Run all jobs that are new or have become due.

      First every job in [wait] that [Job.any_due] reports for [due] is moved
      to [new_]. Then jobs are taken from [new_] one at a time: each one is
      moved to [run], its payload is read from the job file and handed to
      [dispatch_job]. On [Ok] the job is moved to [cur]; on [Error] (including
      an unreadable payload or an exception while dispatching) it is handed to
      [Job.wait_or_err].

      @param due   point in time deciding what is due, defaults to now.
      @param wait  slot of postponed jobs.
      @param new_  slot of jobs ready to run.
      @param run   slot of the job currently running.
      @param cur   slot of finished jobs.
      @return [Ok que] after the [new_] slot has been drained, [Error _] if
              processing itself failed with an exception. *)
  let process_new_and_due
      ?(due = Ptime_clock.now ())
      ?(wait = Job.wait)
      ?(new_ = Job.new_)
      ?(run = Job.run)
      ?(cur = Job.cur)
      ~base
      ~pk
      que =
    (* NOTE: Sys.time is processor time, so dt below excludes time spent
       waiting for the network. *)
    let t0 = Sys.time () in
    (* Logr.debug (fun m -> m "%s.%s" "Main" "process_queue"); *)
    (* Job's functions are called qualified on purpose: opening [Job] would
       shadow the [wait], [new_], [run] and [cur] parameters with Job's own
       values and silently ignore what the caller passed. *)
    (* move those due from wait into new *)
    let rec move_due_wait_new ~wait ~new_ ~due =
      match Job.any_due ~due ~wait que with
      | None -> ()
      | Some j ->
        Job.move que j wait new_;
        move_due_wait_new ~wait ~new_ ~due
    in
    let Queue que' = que in
    let rec loop (i : int) : int Lwt.t =
      match Job.any new_ que with
      | None -> Lwt.return i
      | Some j ->
        let%lwt _ =
          Job.move que j new_ run;
          let fn = que' ^ run ^ j
          and error s =
            Job.wait_or_err ~wait que run j;
            Logr.info (fun m -> m "%s.%s job postponed/cancelled: %s reason: %s" "Main.Queue" "process_new_and_due" j s)
          and ok _ =
            Job.move que j run cur;
            Logr.debug (fun m -> m "%s.%s job done: %s" "Main.Queue" "process_new_and_due" j)
          in
          match File.in_channel fn Csexp.input with
          | Error s -> Lwt.return (error s)
          | Ok p ->
            let%lwt r =
              (* try%lwt also catches rejected promises, which a plain
                 [try] around Lwt code would let slip through. *)
              (try%lwt dispatch_job ~base ~pk j p
               with exn ->
                 let e = exn |> Printexc.to_string in
                 Logr.warn (fun m -> m "%s.%s Uncaught Exception job:%s %s" "Main.Queue" "process_new_and_due" j e);
                 Lwt.return (Error e))
            in
            Lwt.return (Result.fold ~error ~ok r)
        in
        loop (i + 1)
    in
    try%lwt
      move_due_wait_new ~wait ~new_ ~due;
      let%lwt i = loop 0 in
      Logr.info (fun m -> m "%s.%s finished, %i jobs processed in dt=%.3fs." "Main.Queue" "process_new_and_due" i (Sys.time() -. t0));
      Lwt.return (Ok que)
    with exn ->
      let msg = Printexc.to_string exn in
      Logr.err (fun m -> m "%s %s.%s processing failed %s" E.e1017 "Main.Queue" "process_new_and_due" msg);
      Lwt.return (Error __LOC__)
  (** Request [<cgi>/ping?loop=<run_delay>s] relative to [base] with a 0.5
      second limit, ignore whatever comes back and answer a plain-text "ok"
      response. *)
  let ping_and_forget ~base ~run_delay =
    Logr.debug (fun m -> m "%s.%s %is" "Main.Queue" "ping_and_forget" run_delay);
    let uri =
      Uri.make
        ~path:(Cfg.cgi ^ "/ping")
        ~query:[ ("loop", [ Printf.sprintf "%is" run_delay ]) ]
        ()
      |> Http.reso ~base
    in
    (* fire and forget *)
    let%lwt f = Http.get ~seconds:0.5 uri in
    ignore (Sys.opaque_identity f);
    Lwt.return (Ok (`OK, [ Http.H.ct_plain ], Cgi.Response.body "ok"))
  (** Default lock (and pid) file of the queue runner. *)
  let run_fn = "app/var/run/queue.pid"

  (** One round of the self-pinging queue loop.

      Writes the pid into [lock] and takes a non-blocking lock on it, runs
      [fkt Job.qn], sleeps [run_delay] seconds (clamped to 3..1900), releases
      and removes the lock file and finally pings itself via
      [ping_and_forget] to trigger the next round.

      If the lock is already taken (or the lock file cannot be opened for
      lack of permission) nothing is done and "noop" is answered. Any other
      exception, including a failing [Unix.unlink] because someone removed
      the lock file to stop the loop, is logged and answered with "ok"
      without pinging again.

      NOTE: [Unix.sleep] blocks the whole process for the duration.

      @param lock lock file, defaults to [run_fn]. *)
  let loop
      ?(lock = run_fn)
      ~base
      ~run_delay
      fkt =
    Logr.debug (fun m -> m "%s.%s" "Main.Queue" "loop");
    let noop () =
      Logr.debug (fun m -> m "%s.%s don't race, noop" "Main.Queue" "loop");
      Lwt.return (Ok (`OK, [Http.H.ct_plain], Cgi.Response.body "noop"))
    in
    (* try%lwt rather than try: everything after the first bind runs inside
       an Lwt callback, where a plain [try] no longer sees exceptions. *)
    try%lwt
      let fd = Unix.openfile lock [O_CLOEXEC; O_CREAT; O_TRUNC; O_WRONLY; O_SYNC] 0o644 in
      let oc = fd |> Unix.out_channel_of_descr in
      Printf.fprintf oc "%i" (Unix.getpid ());
      flush oc;
      (* don't leak the descriptor when somebody else holds the lock *)
      (try Unix.lockf fd F_TLOCK 0
       with exn -> close_out_noerr oc; raise exn);
      let%lwt _ = fkt Job.qn in
      Logr.debug (fun m -> m "%s.%s sleep %is" "Main.Queue" "loop" run_delay);
      Unix.sleep (run_delay |> max 3 |> min 1900);
      (* closing must precede the ping so the next round can take the lock *)
      close_out oc;
      Unix.unlink lock; (* do not loop if someone unlinked the lock *)
      ping_and_forget ~base ~run_delay
    with
    | Unix.Unix_error (Unix.EAGAIN, "lockf", "") -> noop ()
    | Unix.Unix_error (Unix.EACCES, "open", fn) when String.equal fn lock -> noop ()
    | exn ->
      let e = exn |> Printexc.to_string in
      Logr.warn (fun m -> m "%s.%s Uncaught exception %s" "Main.Queue" "loop" e);
      Lwt.return (Ok (`OK, [Http.H.ct_plain], Cgi.Response.body "ok"))
  242. end
  (* monitor outgoing url and add to <link>? *)

  (** Not implemented yet: logs that fact and returns the entry unchanged. *)
  let sift_urls (entry : Rfc4287.Entry.t) =
    let () = Logr.debug (fun m -> m "%s.%s not implemented." "Main" "sift_urls") in
    Ok entry
  (* Extract tags from a post into a list.
   *
   * Needs the post and a tag store. Modifies both.
   *)

  (** Normalise the tags of an entry against the tag store [cdb].

      The entry's categories are turned into ["#label"] tags, passed together
      with title and content through [Tag.cdb_normalise], and the resulting
      tags are turned back into categories (a leading ['#'] is dropped).

      @return [Ok] of the entry with title, content and categories replaced.
      @raise Assert_failure if a normalised tag is shorter than two
             characters. *)
  let sift_tags cdb (e : Rfc4287.Entry.t) =
    Logr.debug (fun m -> m "%s.%s" "Main" "sift_tags");
    let open Rfc4287 in
    let tag_of_category acc ((Label (Single l),_,_) : Rfc4287.Category.t) =
      Tag.Tag ("#" ^ l) :: acc
    in
    let category_of_tag acc (Tag.Tag t) =
      Logr.debug (fun m -> m "%s.%s %s" "Main" "sift_tags" t);
      let le = String.length t in
      assert (1 < le);
      let bare =
        match t.[0] with
        | '#' -> String.sub t 1 (le - 1)
        | _ -> t
      in
      let single = Single bare in
      (Category.Label single, Category.Term single, Rfc4287.tagu) :: acc
    in
    let tags = List.fold_left tag_of_category [] e.categories in
    let title, content, tags = Tag.cdb_normalise e.title e.content tags cdb in
    let categories = List.fold_left category_of_tag [] tags in
    Ok { e with title; content; categories }
  (** Run the [Plain2handle.handle] lexer over the string [s], starting from
      an empty accumulator. *)
  let find_handles s =
    let lexbuf = Lexing.from_string s in
    Plain2handle.handle [] lexbuf
  (* find mentions *)

  (** Not implemented yet: logs that fact and returns the entry unchanged. *)
  let sift_handles (entry : Rfc4287.Entry.t) =
    let () = Logr.debug (fun m -> m "%s.%s not implemented." "Main" "sift_handles") in
    (* Ok ((e.title |> find_handles) @ (e.content |> find_handles)) *)
    Ok entry
  (** Fold step that enqueues one "notify" job for the follower [reac] with
      inbox [ibox] and passes the accumulator [init] through unchanged.
      The outcome of [Job.enqueue] is discarded. *)
  let fldbl_notify ~due ~que msg_id json init (reac,ibox) =
    Logr.debug (fun m -> m "%s.%s %a -> %a" "Main" "fldbl_notify" Uri.pp reac Uri.pp ibox);
    let payload =
      job_encode_notify msg_id (ibox, reac) json
      |> Csexp.to_string
      |> Bytes.of_string
    in
    let _ = Job.enqueue ~due que 0 payload in
    init
  293. module Note = struct
  (** Load base uri, profile and credentials from their default files.

      @return [Ok (base, profile, author)] where [author] is a uri made from
              the user id and the host of [base] (["-"] if it has none), or
              the first error met while loading. *)
  let load_basics () =
    let* base = Cfg.Base.(from_file fn) in
    let* prof = Cfg.Profile.(from_file fn) in
    let* (Auth.Uid userinfo, _) = Auth.(from_file fn) in
    let host = Option.value ~default:"-" (Uri.host base) in
    Ok (base, prof, Uri.make ~userinfo ~host ())
  301. module Atom = struct
  (* rebuild a single atom page plus evtl. the softlink *)

  (** Render the index page [(a,b)] as an Atom feed file.

      The page file lists positions into the storage file; each position is
      read back and decoded into an entry. Stale ("deleted") or undecodable
      items are logged and skipped. The feed is written to
      [<a>-<b>/index.xml] and [Storage.make_feed_syml] is called for it.

      NOTE(review): the paging links are computed with a fixed [~max:7000]
      here, whereas [rule] derives it from the page — confirm this is
      intended.

      @return [Ok] of the file name written, or the error met while reading
              the storage or page file.
      @raise Invalid_argument if the target file name cannot be pasted
             together (via [Option.get]). *)
  let page_to_atom ~base ~title ~updated ~lang ~(author : Uri.t) (a,b as pag) =
    Logr.debug (fun m -> m "%s.%s %s/%d" "Main.Note.Atom" "page_to_atom" a b);
    (* fold ix range into entry. *)
    let hydrate sc init (p0,_) =
      let* init = init
                  |> Result.map_error
                    (fun e -> Logr.err (fun m -> m "%s.%s ignored %s" "Main.Note.Atom" "page_to_atom.hydrate.a" e);
                      e) in
      seek_in sc p0;
      let* item = Csexp.input sc
                  |> Result.map_error
                    (fun e -> Logr.err (fun m -> m "%s.%s ignored %s" "Main.Note.Atom" "page_to_atom.hydrate.b" e);
                      e) in
      match Rfc4287.Entry.decode item with
      | Ok item ->
        Logr.debug (fun m -> m "%s.%s 0x%x %a" "Main.Note.Atom" "page_to_atom.hydrate.0" p0 Uri.pp item.id);
        Ok (item :: init)
      | Error "deleted" ->
        Logr.warn (fun m -> m "%s.%s found a stale index entry 0x%x" "Main.Note.Atom" "page_to_atom.hydrate.1" p0);
        Ok init
      | Error e ->
        Logr.err (fun m -> m "%s.%s 0x%x ignoring: %s" "Main.Note.Atom" "page_to_atom.hydrate.2" p0 e);
        Ok init in
    let pn = pag |> Storage.Page.to_fn in
    let* es = File.in_channel Storage.fn (fun sc ->
        Logr.debug (fun m -> m "%s.%s %s" "Main.Note.Atom" "page_to_atom.hydrate" pn);
        pn |> File.in_channel' (fun ic ->
            match Csexp.input_many ic with
            | Error e' as e ->
              Logr.err (fun m -> m "%s.%s %s/%d: %s" "Main.Note.Atom" "page_to_atom.hydrate" a b e');
              e
            | Ok l -> l
                      |> Storage.TwoPad10.decode_many
                      |> List.rev
                      |> List.fold_left (hydrate sc) (Ok [])
          )) |> Result.map_error (fun e ->
        Logr.err (fun m -> m "%s.%s ignored %s" "Main.Note.Atom" "page_to_atom" e);
        e) in
    (* this used to be an assert, but I hit one case of non-understood failure.
       Being a soft assertion it must complain when the entries are NOT in
       the expected order. *)
    if not (es |> St.is_monotonous Rfc4287.Entry.compare_published_desc)
    then Logr.warn (fun m -> m "%s soft assertion failed: %s" __LOC__ pn);
    let self,first,last,prev,next = pag |> Rfc4287.Feed.compute_links ~max:7000 ~base:Uri.empty in
    (* atom index.xml *)
    let j_xml = "%-%/index.xml" |> Make.Jig.make in
    let fn = [a;b|> string_of_int] |> Make.Jig.paste j_xml |> Option.get in
    Logr.debug (fun m -> m "%s.%s %s/%d -> %s (%d entries)" "Main.Note.Atom" "page_to_atom" a b fn (es |> List.length));
    let x = es |> Rfc4287.Feed.to_atom
              ~base
              ~self
              ~prev
              ~next
              ~first
              ~last
              ~title
              ~updated
              ~lang
              ~author in
    (* the outcome of creating the directory is not checked *)
    let _ = fn |> Filename.dirname |> File.mkdir_p File.pDir in
    fn |> File.out_channel' (x |> Xml.to_chan ~xsl:(Rfc4287.xsl "posts.xsl" fn));
    let _,_ = Storage.make_feed_syml pag fn in
    Ok fn
  (** Make rule building [%-%/index.xml] from [app/var/db/%/%.s] and the base
      and profile configuration files.

      The command loads the basics, derives the page from the target's
      pattern matches, computes the paging links from the page's maximum and
      writes the feed for the page file's items to the target.

      @raise Failure if the target does not yield exactly two pattern
             matches.
      @raise Assert_failure if the rule handed in is not this very rule. *)
  let rule : Make.t = {
    target = "%-%/index.xml";
    prerequisites = "app/var/db/%/%.s" :: Cfg.Base.fn :: Cfg.Profile.fn :: [];
    fresh = Make.Outdated;
    command = (fun p _rz r t ->
        let* base,prof,auth = load_basics () in
        assert (String.equal "%-%/index.xml" r.target);
        assert (String.equal "app/var/db/%/%.s" (List.hd r.prerequisites));
        let src,_,v = Make.src_from r t in
        Logr.debug (fun m -> m "%s.%s %s %s -> %s" "Main.Note.Atom" "rule" p src t);
        let pag =
          match v with
          | [a;b] -> (a, int_of_string b)
          | _ -> failwith __LOC__
        in
        let max = Storage.Page.( pag |> find_max |> to_int ) in
        let now = Ptime_clock.now () in
        let self,first,last,prev,next =
          Rfc4287.Feed.compute_links ~max ~base:Uri.empty pag
        in
        let updated = Rfc3339.of_ptime ~tz:prof.timezone now in
        let* rows = File.in_channel' Csexp.input_many src in
        (* HOW to (re-)create the symlink in case *)
        (* let _,_ = mk_unnumbered_syml (depth,unn,p) fn in *)
        rows
        |> List.fold_left Storage.fold_of_twopad10 []
        |> Rfc4287.Feed.to_atom_
          ~base
          ~self
          ~prev
          ~next
          ~first
          ~last
          ~title:prof.title
          ~updated
          ~lang:prof.language
          ~author:auth
          t
        |> Rfc4287.Feed.to_file t
      );
  }
  (** The target pattern of [rule], compiled once. *)
  let jig = Make.Jig.make rule.target

  (** File name of the feed for page [(a,i)], i.e. [<a>-<i>/index.xml].

      @raise Assert_failure if [a] does not start with ["o/"].
      @raise Invalid_argument if the name cannot be pasted together (via
             [Option.get]). *)
  let page_to_fn (a,i : Storage.Page.t) =
    assert (St.starts_with ~prefix:"o/" a);
    Make.Jig.paste jig [ a; string_of_int i ] |> Option.get
  412. end
  (** Store the entry [n], add it to all its index pages and rebuild the
      feeds of those pages.

      The outcome of the individual feed builds is not inspected, only their
      number is asserted.

      @return [Ok] of the entry as returned by [Storage.save].
      @raise Assert_failure if the primary page is not ["o/p"] or the number
             of rebuilt feeds is not [2 + number of categories]. *)
  let publish ~base ~(profile : Cfg.Profile.t) ~(author : Uri.t) (n : Rfc4287.Entry.t) =
    Logr.debug (fun m -> m "%s.%s '%s'" "Main.Note" "publish" n.title);
    (* determine id and do store app/var/db/o/p.s *)
    (* add to indices (p,d,t) *)
    (* (2,"o/p",4) app/var/db/o/p.s app/var/db/o/p/4.s -> o/p-4/index.xml *)
    (* (3,"o/d/2023-20-13",4) app/var/db/o/d/2023-10-13/4.s -> o/d/2023-10-13-4/index.xml *)
    (* (3,"o/t/tag",4) app/var/db/o/t/tag/4.s -> o/t/tag-4/index.xml *)
    (* add to storage and indices (main,date,tags)) *)
    let items_per_page = profile.posts_per_page in
    let n,((a,_b) as ix),pos = Storage.save ~items_per_page n in
    assert (String.equal a "o/p");
    let ix_other : Storage.Page.t list =
      Storage.Page.next_other_pages ~items_per_page n
      |> List.fold_left
        (fun acc pa ->
           let _ = Storage.Page.apnd pa pos in
           pa :: acc)
        []
    in
    (* refresh feeds, outbox etc. *)
    let lang = profile.language
    and title = profile.title
    and updated = n.updated (* more precisely would be: now *) in
    let l =
      List.fold_left
        (fun acc pg -> Atom.page_to_atom ~base ~title ~updated ~lang ~author pg :: acc)
        []
        (ix :: ix_other)
    in
    assert (Int.equal (1 + 1 + List.length n.categories) (List.length l));
    Ok n
  438. module Create = struct
  (** Enqueue jobs.
   *
   * https://www.w3.org/TR/activitypub/#delivery says "Servers MUST de-duplicate
   * the final recipient list." which implies each actor profile / inbox lookup
   * can lag delivery for all.
   *
   * How long could such a consolidated inbox list be cached? In theory not at
   * all because each inbox target url may change without further notice.
   *
   * In practice, we will use the inbox as long as it works and redo the
   * webfinger/actor lookup otherwise.
   *
   * 1. get all actor profiles (limit redirects) and extract inbox url
   * 2. de-duplicate
   * 3. deliver to all
   * 4. retry temporary failures
   * 5. handle permanent failures to clean link rot
   *
   * Folds over the followers in [cdb] and enqueues one notify job carrying
   * the Create activity of [n] per follower inbox.
  *)
  let notify_subscribers
      ?(due = Ptime_clock.now ())
      ?(que = Job.qn)
      ?(cdb = Ap.Followers.cdb)
      ~base
      (n : Rfc4287.Entry.t) =
    let json = Ap.Note.Create.to_json ~base n in
    let enqueue = Ap.Followers.(State.ibox' (fldbl_notify ~due ~que n.id json)) in
    cdb |> Ap.Followers.(fold_left enqueue) (Ok n)
  465. end
  466. module Delete = struct
  (* find dirty o/t/foo-1/index.xml and o/d/2024-03-12-7/index.xml pages *)

  (** Pages whose feeds refer to the entry [n]: the primary page derived from
      the entry's id, followed by every page of the entry's other feeds in
      which its storage position is found. Empty if the id cannot be mapped
      to a page. *)
  let dirty (n : Rfc4287.Entry.t) : Storage.Page.t list =
    let secondary pos =
      Storage.Page.other_feeds n
      |> List.fold_left
        (fun acc (bas,_) ->
           match Storage.Page.find pos bas with
           | None -> acc
           | Some ((p,i) as v) ->
             Logr.debug (fun m -> m "%s.%s and %s-%i/index.xml" "Main.Note.Delete" "find" p i);
             v :: acc)
        []
    in
    (* the primary o/p/0.s *)
    match Storage.TwoPad10.id_to_page_i n.id with
    | Error _ -> []
    | Ok (((p : Storage.Page.t),_) as id') ->
      let rest =
        match Storage.TwoPad10.from_page_i id' with
        | Error _ -> []
        | Ok pos -> secondary pos
      in
      p :: rest
  (* - remove from storage
   * - refresh dirty feeds *)

  (** Delete the entry [id] from storage and rebuild every feed it appeared
      in: each dirty feed file is unlinked (a missing file is fine) and made
      afresh with [Atom.rule]. The outcome of the rebuilds is discarded.

      @return [Ok] of the deleted entry or the error of [Storage.delete]. *)
  let delete (id : Uri.t) =
    Logr.debug (fun m -> m "%s.%s '%a'" "Main.Note.Delete" "delete" Uri.pp id);
    let* n = Storage.delete id in
    let rz = [Atom.rule] in
    let rebuild pag =
      let fn = Atom.page_to_fn pag in
      (* rather than touch .s *)
      (try Unix.unlink fn
       with Unix.(Unix_error(ENOENT, "unlink", _)) -> ());
      fn |> Make.M2.make rz
    in
    let _ =
      dirty n
      |> List.fold_left (fun acc pag -> rebuild pag :: acc) []
    in
    Ok n
  (** Fold over the followers in [cdb] and enqueue one notify job carrying
      the Delete activity of [n] per follower inbox.

      @param due when the jobs become due, defaults to now.
      @param que queue to enqueue into, defaults to [Job.qn]. *)
  let notify_subscribers
      ?(due = Ptime_clock.now ())
      ?(que = Job.qn)
      ?(cdb = Ap.Followers.cdb)
      ~base
      (n : Rfc4287.Entry.t) =
    let json = Ap.Note.Delete.to_json ~base n in
    let enqueue = Ap.Followers.(State.ibox' (fldbl_notify ~due ~que n.id json)) in
    cdb |> Ap.Followers.(fold_left enqueue) (Ok n)
  509. end
  510. end