123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- (*
- * _ _ ____ _
- * _| || |_/ ___| ___ _ __ _ __ ___ | |
- * |_ .. _\___ \ / _ \ '_ \| '_ \ / _ \| |
- * |_ _|___) | __/ |_) | |_) | (_) |_|
- * |_||_| |____/ \___| .__/| .__/ \___/(_)
- * |_| |_|
- *
- * Personal Social Web.
- *
- * Copyright (C) The #Seppo contributors. All rights reserved.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *)
(* streamline wording with https://v2.ocaml.org/api/Queue.html *)

(** A job queue, identified by the path prefix of its spool directory.
    The prefix is concatenated directly with slot and file names, so it
    is expected to end with a ['/']. *)
type queue = Queue of string

(** The default job queue. *)
let qn : queue = Queue "app/var/spool/job/"
(* Slots: sub-directory names (each with a trailing '/') that get appended
   to a queue's path prefix. *)

(** Slot name ["cur/"]. *)
let cur : string = "cur/"

(** Slot receiving jobs that are given up on or have an invalid name. *)
let err : string = "err/"

(** Slot into which freshly enqueued jobs are moved. *)
let new_ : string = "new/"

(** Slot name ["run/"]. *)
let run : string = "run/"

(** Staging slot where a job file is written before being moved on. *)
let tmp : string = "tmp/"

(** Slot holding jobs that are scheduled for a later retry. *)
let wait : string = "wait/"
(** [do_wait ?now ?jitter i] is the instant at which attempt number [i]
    becomes due.

    The delay grows exponentially: [60 * (2^i - 1)] seconds, scaled by
    [1 + jitter], and is added to [now]. For [i = 0] the delay is zero.
    See https://encore.dev/blog/retries#jitter for the idea behind jitter.

    @param now base instant, defaults to the current clock time.
    @param jitter relative deviation, defaults to a random value in
           [-0.05, 0.05); asserted to lie within [-0.05, 0.05].
    @param i attempt counter, asserted to be non-negative.
    @return [now] plus the delay, or [now] itself if the sum is not
            representable.
    @raise Invalid_argument (from [Option.get]) if the delay cannot be
           converted to a [Ptime.Span.t]. *)
let do_wait ?(now = Ptime_clock.now ()) ?(jitter = (Random.float 0.1) -. 0.05) i =
  assert (jitter >= -0.05);
  assert (jitter <= 0.05);
  assert (i >= 0);
  Logr.debug (fun m -> m "%s.%s jitter %f %%" "Job" "do_wait" jitter);
  (* un-jittered delay in whole seconds: 0, 60, 180, 420, ... *)
  let base_s = 60 * (Int.shift_left 1 i - 1) in
  let delay_s = float_of_int base_s *. (1.0 +. jitter) in
  let span = Option.get (Ptime.Span.of_float_s delay_s) in
  Ptime.add_span now span
  |> Option.value ~default:now
(** [rfc3339 t] renders [t] as a compact UTC timestamp of the shape
    [YYYY-MM-DDThhmmssZ]. Note that the time part carries no [':']
    separators and no fractional seconds; the result contains no ['.']. *)
let rfc3339 t =
  let date, (time, tz_s) = Ptime.to_date_time t in
  (* the trailing 'Z' is only correct for a zero offset *)
  assert (tz_s = 0);
  let y, m, d = date
  and hh, mm, ss = time in
  Printf.sprintf "%04d-%02d-%02dT%02d%02d%02dZ" y m d hh mm ss
(** [move q job src dst] renames the file [job] from slot [src] to slot
    [dst] of queue [q]. Paths are built by plain concatenation of the
    queue prefix, the slot and the file name.

    @raise Unix.Unix_error if the rename fails. *)
let move (Queue que) job src dst =
  Logr.debug (fun m -> m "%s.%s %s -> %s %s" "Job" "move" src dst job);
  let path slot = que ^ slot ^ job in
  Unix.rename (path src) (path dst)
(** [compute_nonce byt] is the textual form of the [Mapcdb.hash32_byt]
    hash of [byt]; it serves as the nonce component of a job file name. *)
let compute_nonce byt =
  Optint.to_string (Mapcdb.hash32_byt byt)
(** [compute_fn due n nonce] is the job file name
    ["<due>.<n>.<nonce>.s"], i.e. the three components joined by ['.']
    and terminated with the suffix ["s"]. *)
let compute_fn due n nonce =
  String.concat "." [ due; string_of_int n; nonce; "s" ]
(* similar Queue.add *)

(** [enqueue ?due q' n byt] stores the payload [byt] as a new job in
    queue [q'].

    The file name is built by [compute_fn] from the formatted [due]
    instant, the counter [n] and a nonce derived from [byt]. The payload
    is first written into the [tmp] slot (permissions [0o444]) and then
    moved into the [new_] slot.

    @param due instant the job becomes due, defaults to the current time.
    @return [Ok path] where [path] is the job's location in [new_]. *)
let enqueue ?(due = Ptime_clock.now ()) q' n byt =
  let (Queue base) = q' in
  let stamp = rfc3339 due in
  Logr.debug (fun m -> m "%s.%s %s" "Job" "enqueue" stamp);
  let fn = compute_fn stamp n (compute_nonce byt) in
  let staged = base ^ tmp ^ fn
  and final = base ^ new_ ^ fn in
  Logr.debug (fun m -> m "%s.%s %s" "Job" "enqueue" final);
  (* write to the staging slot first, then rename into place *)
  File.out_channel ~tmp:None ~perm:0o444 staged (fun oc -> output_bytes oc byt);
  move q' fn tmp new_;
  Ok final
(** Predicate accepting every argument. *)
let p_true = fun _ -> true
(** [any ?pred qn q] asks [File.any] for an entry in slot [qn] of queue
    [q] whose name ends in [".s"] and also satisfies [pred].

    @param pred extra filter on the file name, defaults to accepting all. *)
let any ?(pred = p_true) qn (Queue qb) =
  (* Logr.debug (fun m -> m "%s.%s %s" "Job" "find_first" qn); *)
  let is_job fn =
    (* only consider job files, then apply the caller's filter *)
    St.ends_with ~suffix:".s" fn && pred fn
  in
  File.any is_job (qb ^ qn)
(** [any_due ?due ?wait q] looks, via [any], for a job in slot [wait] of
    queue [q] whose due stamp is not later than [due].

    A file name qualifies when it splits on ['.'] into exactly four parts
    ending in ["s"] and its first part compares (as a string) less than
    or equal to the formatted [due].

    @param due reference instant, defaults to the current time.
    @param wait slot to inspect, defaults to the [wait] slot. *)
let any_due ?(due = Ptime_clock.now ()) ?(wait = wait) q =
  let limit = rfc3339 due in
  let is_due fn =
    match String.split_on_char '.' fn with
    | [ stamp; _; _; "s" ] -> String.compare stamp limit <= 0
    | _ -> false
  in
  (* Logr.debug (fun m -> m "%s.%s %s" "Job" "find_any_due" due); *)
  any ~pred:is_due wait q
(** [wait_or_err ?wait q' s j] reschedules or gives up on the job file
    [j] currently located in slot [s] of queue [q'].

    [j] is expected to be named ["<due>.<tries>.<nonce>.s"]. The tries
    counter is incremented; if it then exceeds 13 the job is moved to the
    [err] slot. Otherwise a new due instant is computed with [do_wait]
    and the file is renamed into the [wait] slot under its updated name.

    A name that does not have this shape, or whose counter is not a
    non-negative integer, is logged as invalid and moved to [err].

    @param wait slot receiving rescheduled jobs, defaults to [wait].
    @param s source slot; asserted to contain exactly one ['/'].
    @param j job file name; asserted to contain no ['/'].
    @raise Unix.Unix_error if a rename fails. *)
let wait_or_err ?(wait = wait) q' s j =
  let maxtries = 13 in
  let (Queue q) = q' in
  Logr.debug (fun m -> m "%s.%s %s %s" "Job" "wait_or_err" s j);
  assert (2 = (s |> String.split_on_char '/' |> List.length));
  assert (1 = (j |> String.split_on_char '/' |> List.length));
  let invalid () =
    Logr.err (fun m -> m "%s invalid job '%s'" E.e1015 j);
    move q' j s err
  in
  match j |> String.split_on_char '.' with
  | [ t0; n; nonce; "s" ] ->
    (* a malformed counter must not raise but take the invalid-job path *)
    (match int_of_string_opt n with
     | Some prev when prev >= 0 ->
       let tries = succ prev in
       if tries > maxtries
       then move q' j s err
       else begin
         (* NOTE(review): [rfc3339] in this file emits the time without
            ':' separators; confirm [Ptime.of_rfc3339] accepts that form,
            otherwise the current time below is what is effectively used. *)
         let base =
           match Ptime.of_rfc3339 t0 with
           | Ok (t, _, _) -> t
           | Error _ -> Ptime_clock.now ()
         in
         let due = do_wait ~now:base tries |> rfc3339 in
         let jn' = compute_fn due tries nonce in
         Unix.rename (q ^ s ^ j) (q ^ wait ^ jn')
       end
     | Some _ | None -> invalid ())
  | _ -> invalid ()
|