job.ml 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. (*
  2. * _ _ ____ _
  3. * _| || |_/ ___| ___ _ __ _ __ ___ | |
  4. * |_ .. _\___ \ / _ \ '_ \| '_ \ / _ \| |
  5. * |_ _|___) | __/ |_) | |_) | (_) |_|
  6. * |_||_| |____/ \___| .__/| .__/ \___/(_)
  7. * |_| |_|
  8. *
  9. * Personal Social Web.
  10. *
  11. * Copyright (C) The #Seppo contributors. All rights reserved.
  12. *
  13. * This program is free software: you can redistribute it and/or modify
  14. * it under the terms of the GNU General Public License as published by
  15. * the Free Software Foundation, either version 3 of the License, or
  16. * (at your option) any later version.
  17. *
  18. * This program is distributed in the hope that it will be useful,
  19. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  20. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  21. * GNU General Public License for more details.
  22. *
  23. * You should have received a copy of the GNU General Public License
  24. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  25. *)
(* TODO: align the naming in this module with the Stdlib Queue API,
   see https://v2.ocaml.org/api/Queue.html *)
(** A job queue, identified by the path prefix of its spool directory.
    The prefix is concatenated as-is with a slot name and a file name, so it
    has to end with ['/']. *)
type queue = Queue of string
(** The default job queue. *)
let qn = Queue "app/var/spool/job/"
(* slots: names of the sub-directories below the queue directory. Each one
   keeps its trailing '/' because paths are built by plain string
   concatenation. *)
(* not referenced in this part of the file *)
let cur = "cur/"
(* jobs with an unparseable name or too many retries end up here,
   see wait_or_err *)
let err = "err/"
(* enqueue moves completely written jobs here; the trailing underscore avoids
   the OCaml keyword new *)
let new_ = "new/"
(* not referenced in this part of the file; presumably jobs being processed
   - TODO confirm *)
let run = "run/"
(* enqueue writes the job file here before moving it to new_ *)
let tmp = "tmp/"
(* jobs rescheduled by wait_or_err; default slot searched by any_due *)
let wait = "wait/"
  36. (* exponentially growing delay. 0 is zero. Has https://encore.dev/blog/retries#jitter ?*)
  37. let do_wait ?(now = Ptime_clock.now ()) ?(jitter = (Random.float 0.1) -. 0.05) i =
  38. assert (jitter >= -0.05);
  39. assert (jitter <= 0.05);
  40. assert (i >= 0);
  41. Logr.debug (fun m -> m "%s.%s jitter %f %%" "Job" "do_wait" jitter);
  42. let f = 60 * ((Int.shift_left 1 i) - 1)
  43. |> float_of_int in
  44. let f = f *. (1.0 +. jitter) in
  45. match f
  46. |> Ptime.Span.of_float_s
  47. |> Option.get
  48. |> Ptime.add_span now with
  49. | None -> now
  50. | Some t -> t
  51. let rfc3339 t =
  52. let (y, m, d), ((hh, mm, ss), tz_s) = Ptime.to_date_time t in
  53. assert (tz_s = 0);
  54. Printf.sprintf "%04d-%02d-%02dT%02d%02d%02dZ" y m d hh mm ss
  55. let move (Queue que) job src dst =
  56. Logr.debug (fun m -> m "%s.%s %s -> %s %s" "Job" "move" src dst job);
  57. Unix.rename (que ^ src ^ job) (que ^ dst ^ job)
  58. let compute_nonce byt =
  59. byt
  60. |> Mapcdb.hash32_byt
  61. |> Optint.to_string
  62. let compute_fn due n nonce =
  63. Printf.sprintf "%s.%d.%s.s" due n nonce
  64. (* similar Queue.add *)
  65. let enqueue ?(due = Ptime_clock.now ()) q' n byt =
  66. Logr.debug (fun m -> m "%s.%s %s" "Job" "enqueue" (due |> rfc3339));
  67. let due = due |> rfc3339 in
  68. let nonce = compute_nonce byt in
  69. let fn = compute_fn due n nonce in
  70. let Queue q = q' in
  71. let tmp' = q ^ tmp ^ fn in
  72. let new' = q ^ new_ ^ fn in
  73. Logr.debug (fun m -> m "%s.%s %s" "Job" "enqueue" new');
  74. let perm = 0o444 in
  75. File.out_channel ~tmp:None ~perm tmp' (fun oc -> byt |> output_bytes oc);
  76. move q' fn tmp new_;
  77. Ok new'
  78. let p_true _ = true
  79. let any ?(pred = p_true) qn (Queue qb) =
  80. (* Logr.debug (fun m -> m "%s.%s %s" "Job" "find_first" qn); *)
  81. let pred fn = St.ends_with ~suffix:".s" fn && pred fn in
  82. File.any pred (qb ^ qn)
  83. let any_due ?(due = Ptime_clock.now ()) ?(wait = wait) q =
  84. let due = rfc3339 due in
  85. let pred fn =
  86. match fn |> String.split_on_char '.' with
  87. | [t; _; _; "s"] -> String.compare t due <= 0
  88. | _ -> false
  89. in
  90. (* Logr.debug (fun m -> m "%s.%s %s" "Job" "find_any_due" due); *)
  91. any ~pred wait q
  92. let wait_or_err ?(wait = wait) q' s j =
  93. let maxtries = 13 in
  94. let (Queue q) = q' in
  95. Logr.debug (fun m -> m "%s.%s %s %s" "Job" "wait_or_err" s j);
  96. assert (2 == (s |> String.split_on_char '/' |> List.length));
  97. assert (1 == (j |> String.split_on_char '/' |> List.length));
  98. match j |> String.split_on_char '.' with
  99. | [t0; n; nonce; "s"] ->
  100. let n = n |> int_of_string |> succ in
  101. if n > maxtries
  102. then move q' j s err
  103. else
  104. let now = match t0 |> Ptime.of_rfc3339 with
  105. | Ok (t,_,_) -> t
  106. | _ -> Ptime_clock.now () in
  107. let t = n |> do_wait ~now |> rfc3339 in
  108. let jn' = compute_fn t n nonce in
  109. Unix.rename (q ^ s ^ j) (q ^ wait ^ jn')
  110. | _ ->
  111. Logr.err (fun m -> m "%s invalid job '%s'" E.e1015 j);
  112. move q' j s err