main.ml 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575
  1. (*
  2. * _ _ ____ _
  3. * _| || |_/ ___| ___ _ __ _ __ ___ | |
  4. * |_ .. _\___ \ / _ \ '_ \| '_ \ / _ \| |
  5. * |_ _|___) | __/ |_) | |_) | (_) |_|
  6. * |_||_| |____/ \___| .__/| .__/ \___/(_)
  7. * |_| |_|
  8. *
  9. * Personal Social Web.
  10. *
  11. * Copyright (C) The #Seppo contributors. All rights reserved.
  12. *
  13. * This program is free software: you can redistribute it and/or modify
  14. * it under the terms of the GNU General Public License as published by
  15. * the Free Software Foundation, either version 3 of the License, or
  16. * (at your option) any later version.
  17. *
  18. * This program is distributed in the hope that it will be useful,
  19. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  20. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  21. * GNU General Public License for more details.
  22. *
  23. * You should have received a copy of the GNU General Public License
  24. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  25. *)
  (** Infix monadic bind on [result]: [r >>= f] is [f v] when [r] is [Ok v],
      and [r] unchanged when it is an [Error]. *)
  let ( >>= ) = Result.bind

  (** Binding operator: [let* x = r in body] evaluates [body] (which must
      itself yield a [result]) only when [r] is [Ok x]. *)
  let ( let* ) = Result.bind

  (** Binding operator: [let+ x = r in body] maps [body] over the [Ok] payload
      of [r].

      A binding operator is called with the bound value first and the
      continuation second, whereas [Result.map] expects the function first,
      hence the explicit argument swap. *)
  let ( let+ ) r f = Result.map f r
  (** [lwt_err e] is an already resolved promise holding [Error e]. *)
  let lwt_err e = e |> Result.error |> Lwt.return
  (** [a ^/ b] is [a] with [b] appended verbatim to its path component.
      No separator is inserted between the old path and [b]. *)
  let ( ^/ ) a b = Uri.with_path a (Uri.path a ^ b)
  (** HTTP POST [body] to [uri] with signature headers derived from
      [key_id], [pk] and [date] plus a digest of [body].

      [key_id] must end in ["/actor.jsa#main-key"] (asserted).
      [headers] are the base headers the signature headers are merged with.
      [uuid] is only used for logging.

      may go to where PubKeyPem is: As2 *)
  let post_signed
      ?(date = Ptime_clock.now ())
      ?(headers = [ Http.H.ct_jlda; Http.H.acc_app_jlda ])
      ~uuid
      ~key_id
      ~pk
      body
      uri =
    Logr.debug (fun m -> m "%s.%s %a key_id: %a" "Main" "post_signed" Uuidm.pp uuid Uri.pp key_id);
    assert (St.is_suffix ~affix:"/actor.jsa#main-key" (Uri.to_string key_id));
    let signing_key = Http.Signature.mkey key_id pk date in
    let digest = Ap.PubKeyPem.digest_base64' body in
    let signature = Http.signed_headers signing_key digest uri in
    let headers = Cohttp.Header.add_list signature headers in
    Http.post ~headers body uri
  (** lift http errors to errors triggering a retry.
      The error payload is the textual form of the status [sta]. *)
  let http_to_err sta = Error (Cohttp.Code.string_of_status sta)
  (** a plain (signed) http post.

      Posts [body] to [uri] via [post_signed]. A transport error is passed
      through, a success status hands the whole result to [fkt] (identity by
      default), and any other status becomes an [Error] via [http_to_err].
      [msg_id] is only used for logging. *)
  let send_http_post ?(fkt = Lwt.return) ~uuid ~key_id ~pk (msg_id, uri, body) =
    Logr.debug (fun m -> m "%s.%s %a / %a %a" "Main" "send_http_post" Uri.pp msg_id Uri.pp uri Uuidm.pp uuid);
    let%lwt outcome = post_signed ~uuid ~pk ~key_id body uri in
    match outcome with
    | Error _ -> Lwt.return outcome
    | Ok (rsp, _) ->
      (match rsp.status with
       | #Cohttp.Code.success_status ->
         (* may leak memory for unconsumed body *)
         fkt outcome
       | failed -> Lwt.return (http_to_err failed))
  (** asynchronous, queueable task.
      ActivityPub delivery https://www.w3.org/TR/activitypub/#delivery *)
  module Apjob = struct
    module Notify = struct
      (** Prepare a job to queue. Must correspond to dispatch_job.

          Layout: [("2" msg_id "notify" (ibox dst_actor_id json))], with the
          uris and the json value rendered as strings. *)
      let encode msg_id (ibox, dst_actor_id) json =
        let uri_atom u = Csexp.Atom (Uri.to_string u) in
        Csexp.List [
          Csexp.Atom "2";
          uri_atom msg_id;
          Csexp.Atom "notify";
          Csexp.List [
            uri_atom ibox;
            uri_atom dst_actor_id;
            Csexp.Atom (Ezjsonm.value_to_string json);
          ];
        ]

      (** Inverse of [encode]. Yields [Error ()] for any other layout or when
          the json atom does not parse. *)
      let decode = function
        | Csexp.List [
            Csexp.Atom "2";
            Csexp.Atom msg_id;
            Csexp.Atom "notify";
            Csexp.List [ Csexp.Atom ibox; Csexp.Atom dst_actor_id; Csexp.Atom json ];
          ] ->
          json
          |> Ezjsonm.value_from_string_result
          |> Result.fold
            ~error:(fun _ -> Error ())
            ~ok:(fun json ->
                Ok (Uri.of_string msg_id,
                    (Uri.of_string ibox, Uri.of_string dst_actor_id),
                    json))
        | _ -> Error ()
    end
  end
  (** process one job, typically doing http post requests or signed ActivityPub delivery.

      [j] is the job name (used for logging), [payload] its decoded Csexp.
      Two version "2" layouts are understood: ["http.post"] and ["notify"]
      (the latter as written by [Apjob.Notify.encode]). Anything else yields
      [Error "invalid job format"]. *)
  let dispatch_job ~base ~pk j payload =
    let sndr = Uri.make ~path:Ap.proj () |> Http.reso ~base in
    let key_id = Ap.Person.my_key_id sndr in
    let uuid = Uuidm.v `V4 in
    Logr.debug (fun m -> m "%s.%s %s %a" "Main" "dispatch_job" j Uuidm.pp uuid);
    assert (St.is_suffix ~affix:"/actor.jsa" (Uri.to_string sndr));
    assert (St.is_suffix ~affix:"/actor.jsa#main-key" (Uri.to_string key_id));
    (* log the outcome of a delivery to [ibox] and pass it on unchanged *)
    let fkt ibox outcome =
      match outcome with
      | Error e ->
        Logr.debug (fun m -> m "%s.%s %a %s Error: %s" "Main" "dispatch_job.fkt" Uuidm.pp uuid ibox e);
        Lwt.return outcome
      | Ok (rsp, bod) ->
        let%lwt b = Cohttp_lwt.Body.to_string bod in
        Logr.debug (fun m -> m "%s.%s %a %s Response: %a\n\n%s" "Main" "dispatch_job.fkt" Uuidm.pp uuid ibox Cohttp.Response.pp_hum rsp b);
        Lwt.return outcome
    in
    match payload with
    | Csexp.List [
        Csexp.Atom "2";
        Csexp.Atom msg_id;
        Csexp.Atom "http.post";
        Csexp.List [ Csexp.Atom uri; Csexp.Atom body ];
      ] ->
      Logr.warn (fun m -> m "%s.%s legacy (maybe future?)" "Main" "dispatch_job");
      send_http_post ~uuid ~key_id ~pk (Uri.of_string msg_id, Uri.of_string uri, body)
    | Csexp.List [
        Csexp.Atom "2";
        Csexp.Atom msg_id;
        Csexp.Atom "notify";
        Csexp.List [ Csexp.Atom ibox; Csexp.Atom _dst_actor_id; Csexp.Atom json ];
      ] ->
      (* Apjob.Notify.encode *)
      send_http_post ~fkt:(fkt ibox) ~uuid ~key_id ~pk (Uri.of_string msg_id, Uri.of_string ibox, json)
    | _ ->
      (* must correspond to Apjob.Notify.encode *)
      Logr.err (fun m -> m "%s %s.%s invalid job format %s" E.e1016 "Main" "dispatch_job" j);
      Lwt.return (Error "invalid job format")
  126. (** Simple, file-based, scheduled job queue.
  127. Inspired by http://cr.yp.to/proto/maildir.html
  128. *)
  129. module Queue = struct
  (** How long finished jobs are kept in [cur] before being unlinked,
      in seconds (three days). *)
  let keep_cur_s = 3 * (24 * 60 * 60)
  (** Move due tasks from wait to new and loop all new.

      Steps, in order:
      - unlink files in [cur] whose mtime is older than [due - keep_cur_s],
      - move every job due at [due] from [wait] to [new_],
      - for each job in [new_]: move it to [run], read its Csexp payload and
        hand it to [dispatch_job]; on success move it to [cur], otherwise
        hand it to [Job.wait_or_err].

      Returns [Ok que] after logging the number of processed jobs, or
      [Error msg] if an exception escapes. *)
  let process_new_and_due
      ?(due = Ptime_clock.now ())
      ?(wait = Job.wait)
      ?(new_ = Job.new_)
      ?(run = Job.run)
      ?(cur = Job.cur)
      ~base
      ~pk
      que =
    let t0 = Sys.time () in
    (* unlink old from dn *)
    let dir_clean tmin (Job.Queue qn) dn =
      Logr.debug (fun m -> m "%s.%s unlink old from '%s->%s'" "Main.Queue" "process_new_and_due.dir_clean" qn dn);
      let tmin = tmin |> Ptime.to_float_s in
      let open Astring in
      assert (qn |> String.is_suffix ~affix:"/");
      assert (dn |> String.is_suffix ~affix:"/");
      let dn = qn ^ dn in
      File.fold_dir (fun init fn ->
          if fn |> String.length > 12 (* keep README.txt etc. *)
          then
            (try
               let fn = dn ^ fn in
               let st = fn |> Unix.stat in
               if st.st_mtime < tmin
               then
                 (Unix.unlink fn;
                  Logr.debug (fun m -> m "%s.%s unlinked '%s' %f < %f" "Main.Queue" "process_new_and_due.dir_clean" fn st.st_mtime tmin))
             with exn ->
               (* best effort: one stubborn file must not stop the clean-up,
                  but leave a trace instead of swallowing silently. *)
               Logr.debug (fun m -> m "%s.%s skipped '%s%s': %s" "Main.Queue" "process_new_and_due.dir_clean" dn fn (Printexc.to_string exn)));
          init,true) () dn
    in
    (* move those due from wait into new.
       The Job functions are qualified one by one on purpose: a local open
       [Job.( ... )] would make [wait] and [new_] refer to [Job.wait] and
       [Job.new_] and thereby ignore the arguments given by the caller. *)
    let rec move_due_wait_new ~wait ~new_ ~due =
      match Job.any_due ~due ~wait que with
      | None -> ()
      | Some j ->
        Job.move que j wait new_;
        move_due_wait_new ~wait ~new_ ~due
    in
    let Job.Queue que' = que in
    (* process jobs in new_ one after the other, [i] counts those seen so far *)
    let rec loop (i : int) : int Lwt.t =
      match Job.any new_ que with
      | None -> Lwt.return i
      | Some j ->
        let%lwt _ =
          Job.move que j new_ run;
          let fn = que' ^ run ^ j
          and error s =
            Job.wait_or_err ~wait que run j;
            Logr.info (fun m -> m "%s.%s job postponed/cancelled: %s reason: %s" "Main.Queue" "process_new_and_due" j s)
          and ok _ =
            Job.move que j run cur;
            Logr.debug (fun m -> m "%s.%s job done: %s" "Main.Queue" "process_new_and_due" j)
          in
          match fn |> File.in_channel Csexp.input with
          | Error s ->
            s
            |> error
            |> Lwt.return
          | Ok p ->
            let%lwt r = try%lwt
                dispatch_job ~base ~pk j p
              with
              | Failure s ->
                Lwt.return (Error s)
              | exn ->
                let e = exn |> Printexc.to_string in
                Logr.warn (fun m -> m "%s.%s Uncaught Exception job:%s %s" "Main.Queue" "process_new_and_due" j e);
                Lwt.return (Error e)
            in
            r
            |> Result.fold ~error ~ok
            |> Lwt.return in
        loop (i+1)
    in
    try%lwt
      (* NOTE(review): cleans [Job.qn] rather than [que] — confirm this is intended. *)
      dir_clean (keep_cur_s |> Ptime.Span.of_int_s |> Ptime.sub_span due |> Option.value ~default:Ptime.epoch) Job.qn cur;
      move_due_wait_new ~wait ~new_ ~due;
      let%lwt i = loop 0 in
      Logr.info (fun m -> m "%s.%s finished, %i jobs processed in dt=%.3fs." "Main.Queue" "process_new_and_due" i (Sys.time() -. t0));
      Lwt.return (Ok que)
    with | exn ->
      let msg = Printexc.to_string exn in
      Logr.err (fun m -> m "%s %s.%s processing failed %s" E.e1017 "Main.Queue" "process_new_and_due" msg);
      Lwt.return (Error msg)
  (** do one http request, fire and forget.

      GETs [<seppo_cgi>/ping?loop=<run_delay_s>s] resolved against [base]
      with a 0.5 s limit, ignores whatever comes back and answers a plain
      text "ok" response. *)
  let ping_and_forget ~base ~run_delay_s =
    Logr.debug (fun m -> m "%s.%s %is" "Main.Queue" "ping_and_forget" run_delay_s);
    let uri =
      Uri.make
        ~path:(Cfg.seppo_cgi ^ "/ping")
        ~query:[ ("loop", [ Printf.sprintf "%is" run_delay_s ]) ]
        ()
      |> Http.reso ~base in
    (* fire and forget *)
    let%lwt reply = uri |> Http.get ~seconds:0.5 in
    let _ = Sys.opaque_identity reply in
    Lwt.return (Ok (`OK, [Http.H.ct_plain], Cgi.Response.body "ok"))
  (** Default pid/lock file guarding the queue runner. *)
  let run_fn = "app/var/run/queue.pid"

  (** synchronously, sequentially run fkt for all jobs in new.

      Takes an exclusive, non-blocking lock on [lock], writes the own pid
      into it, runs [fkt Job.qn], sleeps [run_delay_s] seconds (clamped to
      3..1900), releases and unlinks the lock and finally re-triggers itself
      via [ping_and_forget].

      If the lock is held by someone else (or the lock file cannot be
      opened for lack of permission) a "noop" response is returned. Any
      other exception is logged and answered with "ok" without re-trigger. *)
  let loop
      ?(lock = run_fn)
      ~base
      ~run_delay_s
      fkt =
    Logr.debug (fun m -> m "%s.%s" "Main.Queue" "loop");
    let noop () =
      Logr.debug (fun m -> m "%s.%s don't race, noop" "Main.Queue" "loop");
      Ok (`OK, [Http.H.ct_plain], Cgi.Response.body "noop")
      |> Lwt.return in
    try%lwt
      (* No O_TRUNC here: a contender that is about to lose the race must not
         wipe the pid the current holder wrote. The file is truncated below,
         once the lock is ours. *)
      let fd = Unix.openfile lock [O_CLOEXEC; O_CREAT; O_WRONLY; O_SYNC] 0o644 in
      (* https://git.radicallyopensecurity.com/nlnet/ngie-seppo/-/issues/14#note_129407 *)
      (try Unix.lockf fd F_TLOCK 0
       with exn ->
         (* don't leak the descriptor if we didn't get the lock *)
         Unix.close fd;
         raise exn);
      Unix.ftruncate fd 0;
      let oc = fd |> Unix.out_channel_of_descr in
      Printf.fprintf oc "%i" (Unix.getpid ());
      flush oc;
      let%lwt _ = fkt Job.qn
      in
      Logr.debug (fun m -> m "%s.%s sleep %is" "Main.Queue" "loop" run_delay_s);
      (* NOTE(review): Unix.sleep blocks the whole process, not just this promise. *)
      Unix.sleep (run_delay_s |> max 3 |> min 1900);
      close_out oc;
      Unix.unlink lock; (* do not loop if someone unlinked the lock *)
      ping_and_forget ~base ~run_delay_s
    with
    (* a held lock may be reported as either EAGAIN or EACCES *)
    | Unix.(Unix_error((EAGAIN | EACCES), "lockf", "")) -> noop ()
    | Unix.(Unix_error(EACCES, "open", fn)) when String.equal fn lock -> noop ()
    | exn ->
      (* @TODO Error number *)
      Logr.warn (fun m -> m "%s.%s Uncaught exception %a" "Main.Queue" "loop" St.pp_exc exn);
      Ok (`OK, [Http.H.ct_plain], Cgi.Response.body "ok")
      |> Lwt.return
  265. end
  (** monitor outgoing url and add to <link>?
      Not implemented yet: logs that fact and returns the entry untouched. *)
  let sift_urls (e : Rfc4287.Entry.t) =
    let () = Logr.debug (fun m -> m "%s.%s not implemented." "Main" "sift_urls") in
    Ok e
  (** Extract tags from a post into a list.
      Needs the post and a tag store. Modifies both.

      The entry's categories are turned into ["#"]-prefixed tags, title,
      content and tags are run through [Tag.cdb_normalise] with [cdb], and
      the resulting tags are turned back into categories (a leading ['#'] is
      stripped again). Returns the updated entry wrapped in [Ok]. *)
  let sift_tags cdb (e : Rfc4287.Entry.t) =
    Logr.debug (fun m -> m "%s.%s" "Main" "sift_tags");
    let open Rfc4287 in
    (* category -> tag *)
    let c2t init ((Label (Single l),_,_) : Rfc4287.Category.t) =
      (Tag.Tag ("#" ^ l)) :: init
    in
    (* tag -> category *)
    let t2c init (Tag.Tag t) =
      Logr.debug (fun m -> m "%s.%s %s" "Main" "sift_tags" t);
      let le = t |> String.length in
      assert (1 < le);
      (* structural, not physical, equality for the char comparison *)
      let t = if '#' = t.[0]
        then String.sub t 1 (le-1)
        else t in
      let t = Single t in
      let l = Category.Label t in
      let te = Category.Term t in
      (l, te, Rfc4287.tagu) :: init
    in
    let ti = e.title in
    let co = e.content in
    let tl = e.categories |> List.fold_left c2t [] in
    let ti,co,tl = Tag.cdb_normalise ti co tl cdb in
    Ok {e with
        title = ti;
        content = co;
        categories = tl |> List.fold_left t2c []}
  (** Run the [Plain2handle.handle] lexer over the string [s], starting with
      an empty accumulator. *)
  let find_handles s =
    let lexbuf = Lexing.from_string s in
    Plain2handle.handle [] lexbuf
  (** find mentions.
      Not implemented yet: logs that fact and returns the entry untouched. *)
  let sift_handles (e : Rfc4287.Entry.t) =
    let () = Logr.debug (fun m -> m "%s.%s not implemented." "Main" "sift_handles") in
    (* Ok ((e.title |> find_handles) @ (e.content |> find_handles)) *)
    Ok e
  (** queue json for destination reac,ibox into que.

      Foldable: the accumulator [init] is returned unchanged, the result of
      [Job.enqueue] is discarded. *)
  let fldbl_notify ~due ~que msg_id json init (reac,ibox) =
    Logr.debug (fun m -> m "%s.%s %a -> %a" "Main" "fldbl_notify" Uri.pp reac Uri.pp ibox);
    let payload =
      Apjob.Notify.encode msg_id (ibox, reac) json
      |> Csexp.to_string
      |> Bytes.of_string in
    let _ = payload |> Job.enqueue ~due que 0 in
    init
  316. (** https://www.rfc-editor.org/rfc/rfc4287#section-4.1.2 *)
  317. module Note = struct
  (** Load base url, profile and credentials from their files and derive
      the feed author: named after the profile title, with a uri built from
      the user id and the host of the base url (["-"] if it has none).
      Stops at the first file that fails to load. *)
  let load_basics () =
    let* base = Cfg.Base.from_file Cfg.Base.fn in
    let* prof = Cfg.Profile.from_file Cfg.Profile.fn in
    let* (Auth.Uid userinfo, _) = Auth.(from_file fn) in
    let host = Option.value ~default:"-" (Uri.host base) in
    let auth = {
      Rfc4287.Person.empty with
      name = prof.title;
      uri = Some (Uri.make ~userinfo ~host ());
    } in
    Ok (base, prof, auth)
  327. (** https://www.rfc-editor.org/rfc/rfc4287#section-4.1.2 *)
  328. module Atom = struct
  (** rebuild a single atom page plus evtl. the softlink.

      Reads the index page [pag], looks each indexed position up in the main
      storage file, decodes the entries (stale or undecodable ones are logged
      and skipped), renders them as atom feed into ["<a>-<b>/index.xml"] and
      refreshes the feed symlink. Returns [Ok] of the written file name. *)
  let page_to_atom ~base ~title ~updated ~lang ~(author : Rfc4287.Person.t) (a,b as pag) =
    Logr.debug (fun m -> m "%s.%s %s/%d" "Main.Note.Atom" "page_to_atom" a b);
    (* log an error on behalf of [where] and hand it back unchanged *)
    let log_ignored where e =
      Logr.err (fun m -> m "%s.%s ignored %s" "Main.Note.Atom" where e);
      e in
    (* fold ix range into entry. *)
    let hydrate sc acc (p0,_) =
      let* acc = acc |> Result.map_error (log_ignored "page_to_atom.hydrate.a") in
      seek_in sc p0;
      let* raw = Csexp.input sc |> Result.map_error (log_ignored "page_to_atom.hydrate.b") in
      match Rfc4287.Entry.decode raw with
      | Ok item ->
        Logr.debug (fun m -> m "%s.%s 0x%x %a" "Main.Note.Atom" "page_to_atom.hydrate.0" p0 Uri.pp item.id);
        Ok (item :: acc)
      | Error "deleted" ->
        Logr.warn (fun m -> m "%s.%s found a stale index entry 0x%x" "Main.Note.Atom" "page_to_atom.hydrate.1" p0);
        Ok acc
      | Error e ->
        Logr.err (fun m -> m "%s.%s 0x%x ignoring: %s" "Main.Note.Atom" "page_to_atom.hydrate.2" p0 e);
        Ok acc in
    let pn = pag |> Storage.Page.to_fn in
    (* read the index page from [ic] and resolve its positions against [sc] *)
    let read_page sc ic =
      match Csexp.input_many ic with
      | Error e' ->
        Logr.err (fun m -> m "%s.%s %s/%d: %s" "Main.Note.Atom" "page_to_atom.hydrate" a b e');
        Error e'
      | Ok l ->
        l
        |> Storage.TwoPad10.decode_many
        |> List.rev
        |> List.fold_left (hydrate sc) (Ok []) in
    let* es =
      Storage.fn
      |> File.in_channel (fun sc ->
          Logr.debug (fun m -> m "%s.%s %s" "Main.Note.Atom" "page_to_atom.hydrate" pn);
          pn |> File.in_channel (read_page sc))
      |> Result.map_error (log_ignored "page_to_atom") in
    (* this used to be an assert, but I hit one case of non-understood failure *)
    if not (es |> St.is_monotonous Rfc4287.Entry.compare_published_desc)
    then Logr.warn (fun m -> m "%s soft assertion failed: %s" __LOC__ pn);
    let self,first,last,prev,next = pag |> Rfc4287.Feed.compute_links ~max:7000 ~base:Uri.empty in
    (* atom index.xml *)
    let j_xml = Make.Jig.make "%-%/index.xml" in
    let fn = Make.Jig.paste j_xml [ a; string_of_int b ] |> Option.get in
    Logr.debug (fun m -> m "%s.%s %s/%d -> %s (%d entries)" "Main.Note.Atom" "page_to_atom" a b fn (es |> List.length));
    let feed = es |> Rfc4287.Feed.to_atom
                 ~base
                 ~self
                 ~prev
                 ~next
                 ~first
                 ~last
                 ~title
                 ~updated
                 ~lang
                 ~author in
    let _ = fn |> Filename.dirname |> File.mkdir_p File.pDir in
    fn |> File.out_channel_replace (feed |> Xml.to_chan ~xsl:(Rfc4287.xsl "posts.xsl" fn));
    let _,_ = Storage.make_feed_syml pag fn in
    Ok fn
  (** Make rule building an atom page ["%-%/index.xml"] from the index page
      ["app/var/db/%/%.s"]; base and profile configuration are further
      prerequisites. The command loads the basics, computes the feed links
      for the page and writes the feed to the target [t]. *)
  let rule = ({
      target = "%-%/index.xml";
      prerequisites = [ "app/var/db/%/%.s"; Cfg.Base.fn; Cfg.Profile.fn ];
      fresh = Make.Outdated;
      command = (fun p _rz r t ->
          let* base,prof,auth = load_basics () in
          assert (String.equal r.target "%-%/index.xml");
          assert (String.equal (List.hd r.prerequisites) "app/var/db/%/%.s");
          let src,_,v = t |> Make.src_from r in
          Logr.debug (fun m -> m "%s.%s %s %s -> %s" "Main.Note.Atom" "rule" p src t);
          (* the two wildcards of the target: page name and page number *)
          let pag = match v with
            | [a;b] -> (a, int_of_string b)
            | _ -> failwith __LOC__ in
          let max = Storage.Page.( pag |> find_max |> to_int ) in
          let now = Ptime_clock.now () in
          let author = auth in
          let lang = prof.language in
          let title = prof.title in
          let tz = prof.timezone in
          let self,first,last,prev,next = pag |> Rfc4287.Feed.compute_links ~max ~base:Uri.empty in
          let updated = now |> Rfc3339.of_ptime ~tz in
          let* items = src |> File.in_channel Csexp.input_many in
          (* HOW to (re-)create the symlink in case *)
          (* let _,_ = mk_unnumbered_syml (depth,unn,p) fn in *)
          items
          |> List.fold_left Storage.fold_of_twopad10 []
          |> Rfc4287.Feed.to_atom_
            ~base
            ~self
            ~prev
            ~next
            ~first
            ~last
            ~title
            ~updated
            ~lang
            ~author
            t
          |> Rfc4287.Feed.to_file t
        );
    } : Make.t)
  (** Jig compiled from the target pattern of [rule]. *)
  let jig = Make.Jig.make rule.target
  (** File name of the atom feed for page [(a,i)], obtained by pasting the
      page name and number into [jig]. [a] must start with ["o/"] (asserted).
      Raises [Invalid_argument] if pasting yields [None]. *)
  let page_to_fn (a,i : Storage.Page.t) =
    assert (St.is_prefix ~affix:"o/" a);
    let parts = [ a; string_of_int i ] in
    Option.get (Make.Jig.paste jig parts)
  439. end
  (** Atom, local.

      Stores the entry [n], appends it to the further index pages reported
      by [Storage.Page.next_other_pages] and rebuilds the atom feed of every
      page touched. Returns [Ok] of the stored entry; the results of the
      individual feed rebuilds are not inspected. *)
  let publish ~base ~(profile : Cfg.Profile.t) ~(author : Rfc4287.Person.t) (n : Rfc4287.Entry.t) =
    Logr.debug (fun m -> m "%s.%s '%s'" "Main.Note" "publish" n.title);
    (* determine id and do store app/var/db/o/p.s *)
    (* add to indices (p,d,t) *)
    (* (2,"o/p",4) app/var/db/o/p.s app/var/db/o/p/4.s -> o/p-4/index.xml *)
    (* (3,"o/d/2023-20-13",4) app/var/db/o/d/2023-10-13/4.s -> o/d/2023-10-13-4/index.xml *)
    (* (3,"o/t/tag",4) app/var/db/o/t/tag/4.s -> o/t/tag-4/index.xml *)
    (* add to storage and indices (main,date,tags)) *)
    let items_per_page = profile.posts_per_page in
    let n,(a,_b as ix),pos = n |> Storage.save ~items_per_page in
    assert (a |> String.equal "o/p");
    let append_to_page pos init pa =
      let _ = (pos |> Storage.Page.apnd pa) in
      pa :: init in
    let ix_other : Storage.Page.t list =
      n
      |> Storage.Page.next_other_pages ~items_per_page
      |> List.fold_left (append_to_page pos) [] in
    (* refresh feeds, outbox etc. *)
    let lang = profile.language in
    let title = profile.title in
    let updated = n.updated (* more precisely would be: now *) in
    let mater init ix = (ix |> Atom.page_to_atom ~base ~title ~updated ~lang ~author) :: init in
    let l = (ix :: ix_other) |> List.fold_left mater [] in
    (* main page + date page + one page per tag; compared by value, not physically *)
    assert (Int.equal (1 + 1 + (n.categories |> List.length)) (l |> List.length));
    Ok n
  module Create = struct
    (** Enqueue jobs.

        https://www.w3.org/TR/activitypub/#delivery says "Servers MUST de-duplicate
        the final recipient list." which implies each actor profile / inbox lookup
        can lag delivery for all.

        How long could such a consolidated inbox list be cached? In theory not at
        all because each inbox target url may change without further notice.
        In practice, we will use the inbox as long as it works and redo the
        webfinger/actor lookup otherwise.

        1. get all actor profiles (limit redirects) and extract inbox url
        2. de-duplicate
        3. deliver to all
        4. retry temporary failures
        5. handle permanent failures to clean link rot

        Here: render [n] as Create activity json and fold [fldbl_notify] over
        the followers in [cdb]; the fold starts from [Ok n]. *)
    let notify_subscribers
        ?(due = Ptime_clock.now ())
        ?(que = Job.qn)
        ?(cdb = Ap.Followers.cdb)
        ~base
        (n : Rfc4287.Entry.t) =
      let json = Ap.Note.Create.to_json ~base n in
      let enqueue = Ap.Followers.(State.ibox' (fldbl_notify ~due ~que n.id json)) in
      Ap.Followers.fold_left enqueue (Ok n) cdb
  end
  490. (** application logic around delete. *)
  491. module Delete = struct
  (** find dirty o/t/foo-1/index.xml and o/d/2024-03-12-7/index.xml pages.

      Yields the primary page derived from the id of [n] followed by every
      other feed page that contains the entry's position. Empty if the id
      cannot be mapped to a page. *)
  let dirty (n : Rfc4287.Entry.t) : Storage.Page.t list =
    (* the primary o/p/0.s *)
    match n.id |> Storage.Id.to_page_i with
    | Error _ -> []
    | Ok ((p : Storage.Page.t),_ as id') ->
      let others =
        match id' |> Storage.TwoPad10.from_page_i with
        | Error _ -> []
        | Ok pos ->
          let collect acc (bas,_) =
            match Storage.Page.find pos bas with
            | None -> acc
            | Some (p,i as v) ->
              Logr.debug (fun m -> m "%s.%s and %s-%i/index.xml" "Main.Note.Delete" "find" p i);
              v :: acc in
          n
          |> Storage.Page.other_feeds
          |> List.fold_left collect []
      in
      p :: others
  (** - remove from storage
      - refresh dirty feeds

      Every dirty feed file is unlinked (a missing file is fine) and rebuilt
      with [Atom.rule]; the rebuild results are discarded. Returns [Ok] of
      the removed entry.

      todo? rather keep a tombstone? https://www.w3.org/TR/activitypub/#delete-activity-outbox *)
  let delete (id : Uri.t) =
    Logr.debug (fun m -> m "%s.%s '%a'" "Main.Note.Delete" "delete" Uri.pp id);
    let* n = Storage.delete id in
    let rz = [Atom.rule] in
    let remake acc pag =
      let fn = Atom.page_to_fn pag in
      (* rather than touch .s *)
      (try Unix.unlink fn
       with Unix.(Unix_error(ENOENT, "unlink", _)) -> ());
      (fn |> Make.M2.make rz) :: acc in
    let _ = n |> dirty |> List.fold_left remake [] in
    Ok n
  (** make Ap.Note.Delete.to_json and queue it via fldbl_notify for each in cdb.
      The fold over the followers starts from [Ok n]. *)
  let notify_subscribers
      ?(due = Ptime_clock.now ())
      ?(que = Job.qn)
      ?(cdb = Ap.Followers.cdb)
      ~base
      (n : Rfc4287.Entry.t) =
    let json = Ap.Note.Delete.to_json ~base n in
    let enqueue = Ap.Followers.(State.ibox' (fldbl_notify ~due ~que n.id json)) in
    Ap.Followers.fold_left enqueue (Ok n) cdb
  536. end
  537. end