123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- import base64
- import json
- import os
- from typing import Dict
- from typing import Any
- from typing import List
- from dataclasses import dataclass
- import flask
- import requests
- from datetime import datetime
- POUSSETACHES_AUTH_KEY = os.getenv("POUSSETACHES_AUTH_KEY")
- @dataclass
- class Task:
- req_id: str
- tries: int
- payload: Any
- @dataclass
- class GetTask:
- payload: Any
- expected: int
- schedule: str
- task_id: str
- next_run: datetime
- tries: int
- url: str
- last_error_status_code: int
- last_error_body: str
- class PousseTaches:
- def __init__(self, api_url: str, base_url: str) -> None:
- self.api_url = api_url
- self.base_url = base_url
- def push(
- self,
- payload: Any,
- path: str,
- expected: int = 200,
- schedule: str = "",
- delay: int = 0,
- ) -> str:
- # Encode our payload
- p = base64.b64encode(json.dumps(payload).encode()).decode()
- # Queue/push it
- resp = requests.post(
- self.api_url,
- json={
- "url": self.base_url + path,
- "payload": p,
- "expected": expected,
- "schedule": schedule,
- "delay": delay,
- },
- )
- resp.raise_for_status()
- return resp.headers["Poussetaches-Task-ID"]
- def parse(self, req: flask.Request) -> Task:
- if req.headers.get("Poussetaches-Auth-Key") != POUSSETACHES_AUTH_KEY:
- raise ValueError("Bad auth key")
- # Parse the "envelope"
- envelope = json.loads(req.data)
- print(req)
- print(f"envelope={envelope!r}")
- payload = json.loads(base64.b64decode(envelope["payload"]))
- return Task(
- req_id=envelope["req_id"], tries=envelope["tries"], payload=payload
- ) # type: ignore
- @staticmethod
- def _expand_task(t: Dict[str, Any]) -> None:
- try:
- t["payload"] = json.loads(base64.b64decode(t["payload"]))
- except json.JSONDecodeError:
- t["payload"] = base64.b64decode(t["payload"]).decode()
- if t["last_error_body"]:
- t["last_error_body"] = base64.b64decode(t["last_error_body"]).decode()
- t["next_run"] = datetime.fromtimestamp(float(t["next_run"] / 1e9))
- if t["last_run"]:
- t["last_run"] = datetime.fromtimestamp(float(t["last_run"] / 1e9))
- else:
- del t["last_run"]
- def _get(self, where: str) -> List[GetTask]:
- out = []
- resp = requests.get(self.api_url + f"/{where}")
- resp.raise_for_status()
- dat = resp.json()
- for t in dat["tasks"]:
- self._expand_task(t)
- out.append(
- GetTask(
- task_id=t["id"],
- payload=t["payload"],
- expected=t["expected"],
- schedule=t["schedule"],
- tries=t["tries"],
- url=t["url"],
- last_error_status_code=t["last_error_status_code"],
- last_error_body=t["last_error_body"],
- next_run=t["next_run"],
- )
- )
- return out
- def get_cron(self) -> List[GetTask]:
- return self._get("cron")
- def get_success(self) -> List[GetTask]:
- return self._get("success")
- def get_waiting(self) -> List[GetTask]:
- return self._get("waiting")
- def get_dead(self) -> List[GetTask]:
- return self._get("dead")
|