poussetaches.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. import base64
  2. import json
  3. import os
  4. from typing import Dict
  5. from typing import Any
  6. from typing import List
  7. from dataclasses import dataclass
  8. import flask
  9. import requests
  10. from datetime import datetime
  11. POUSSETACHES_AUTH_KEY = os.getenv("POUSSETACHES_AUTH_KEY")
  12. @dataclass
  13. class Task:
  14. req_id: str
  15. tries: int
  16. payload: Any
  17. @dataclass
  18. class GetTask:
  19. payload: Any
  20. expected: int
  21. schedule: str
  22. task_id: str
  23. next_run: datetime
  24. tries: int
  25. url: str
  26. last_error_status_code: int
  27. last_error_body: str
  28. class PousseTaches:
  29. def __init__(self, api_url: str, base_url: str) -> None:
  30. self.api_url = api_url
  31. self.base_url = base_url
  32. def push(
  33. self,
  34. payload: Any,
  35. path: str,
  36. expected: int = 200,
  37. schedule: str = "",
  38. delay: int = 0,
  39. ) -> str:
  40. # Encode our payload
  41. p = base64.b64encode(json.dumps(payload).encode()).decode()
  42. # Queue/push it
  43. resp = requests.post(
  44. self.api_url,
  45. json={
  46. "url": self.base_url + path,
  47. "payload": p,
  48. "expected": expected,
  49. "schedule": schedule,
  50. "delay": delay,
  51. },
  52. )
  53. resp.raise_for_status()
  54. return resp.headers["Poussetaches-Task-ID"]
  55. def parse(self, req: flask.Request) -> Task:
  56. if req.headers.get("Poussetaches-Auth-Key") != POUSSETACHES_AUTH_KEY:
  57. raise ValueError("Bad auth key")
  58. # Parse the "envelope"
  59. envelope = json.loads(req.data)
  60. print(req)
  61. print(f"envelope={envelope!r}")
  62. payload = json.loads(base64.b64decode(envelope["payload"]))
  63. return Task(
  64. req_id=envelope["req_id"], tries=envelope["tries"], payload=payload
  65. ) # type: ignore
  66. @staticmethod
  67. def _expand_task(t: Dict[str, Any]) -> None:
  68. try:
  69. t["payload"] = json.loads(base64.b64decode(t["payload"]))
  70. except json.JSONDecodeError:
  71. t["payload"] = base64.b64decode(t["payload"]).decode()
  72. if t["last_error_body"]:
  73. t["last_error_body"] = base64.b64decode(t["last_error_body"]).decode()
  74. t["next_run"] = datetime.fromtimestamp(float(t["next_run"] / 1e9))
  75. if t["last_run"]:
  76. t["last_run"] = datetime.fromtimestamp(float(t["last_run"] / 1e9))
  77. else:
  78. del t["last_run"]
  79. def _get(self, where: str) -> List[GetTask]:
  80. out = []
  81. resp = requests.get(self.api_url + f"/{where}")
  82. resp.raise_for_status()
  83. dat = resp.json()
  84. for t in dat["tasks"]:
  85. self._expand_task(t)
  86. out.append(
  87. GetTask(
  88. task_id=t["id"],
  89. payload=t["payload"],
  90. expected=t["expected"],
  91. schedule=t["schedule"],
  92. tries=t["tries"],
  93. url=t["url"],
  94. last_error_status_code=t["last_error_status_code"],
  95. last_error_body=t["last_error_body"],
  96. next_run=t["next_run"],
  97. )
  98. )
  99. return out
  100. def get_cron(self) -> List[GetTask]:
  101. return self._get("cron")
  102. def get_success(self) -> List[GetTask]:
  103. return self._get("success")
  104. def get_waiting(self) -> List[GetTask]:
  105. return self._get("waiting")
  106. def get_dead(self) -> List[GetTask]:
  107. return self._get("dead")