response.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
from __future__ import annotations

import base64
import re
from abc import abstractmethod
from typing import Any, Dict, List, Optional, Union
from urllib.parse import quote, unquote
  7. def is_content(chunk):
  8. if isinstance(chunk, Reasoning):
  9. if chunk.is_thinking is None and chunk.token is None:
  10. return False
  11. return True
  12. return isinstance(chunk, (str, MediaResponse, AudioResponse, ToolCalls))
  13. def quote_url(url: str) -> str:
  14. """
  15. Quote parts of a URL while preserving the domain structure.
  16. Args:
  17. url: The URL to quote
  18. Returns:
  19. str: The properly quoted URL
  20. """
  21. # Only unquote if needed to avoid double-unquoting
  22. if '%' in url:
  23. url = unquote(url)
  24. url_parts = url.split("//", maxsplit=1)
  25. # If there is no "//" in the URL, then it is a relative URL
  26. if len(url_parts) == 1:
  27. return quote(url_parts[0], '/?&=#')
  28. protocol, rest = url_parts
  29. domain_parts = rest.split("/", maxsplit=1)
  30. # If there is no "/" after the domain, then it is a domain URL
  31. if len(domain_parts) == 1:
  32. return f"{protocol}//{domain_parts[0]}"
  33. domain, path = domain_parts
  34. return f"{protocol}//{domain}/{quote(path, '/?&=#')}"
  35. def quote_title(title: str) -> str:
  36. """
  37. Normalize whitespace in a title.
  38. Args:
  39. title: The title to normalize
  40. Returns:
  41. str: The title with normalized whitespace
  42. """
  43. return " ".join(title.split()) if title else ""
  44. def format_link(url: str, title: Optional[str] = None) -> str:
  45. """
  46. Format a URL and title as a markdown link.
  47. Args:
  48. url: The URL to link to
  49. title: The title to display. If None, extracts from URL
  50. Returns:
  51. str: The formatted markdown link
  52. """
  53. if title is None:
  54. try:
  55. title = unquote(url.split("//", maxsplit=1)[1].split("?")[0].replace("www.", ""))
  56. except IndexError:
  57. title = url
  58. return f"[{quote_title(title)}]({quote_url(url)})"
  59. def format_image(image: str, alt: str, preview: Optional[str] = None) -> str:
  60. """
  61. Formats the given image as a markdown string.
  62. Args:
  63. image: The image to format.
  64. alt: The alt text for the image.
  65. preview: The preview URL format. Defaults to the original image.
  66. Returns:
  67. str: The formatted markdown string.
  68. """
  69. preview_url = preview.replace('{image}', image) if preview else image
  70. # if preview_url.startswith("/media/"):
  71. # preview_url = "/thumbnail" + preview_url[6:]
  72. return f"[![{quote_title(alt)}]({quote_url(preview_url)})]({quote_url(image)})"
  73. def format_images_markdown(images: Union[str, List[str]], alt: str,
  74. preview: Union[str, List[str]] = None) -> str:
  75. """
  76. Formats the given images as a markdown string.
  77. Args:
  78. images: The image or list of images to format.
  79. alt: The alt text for the images.
  80. preview: The preview URL format or list of preview URLs.
  81. If not provided, original images are used.
  82. Returns:
  83. str: The formatted markdown string.
  84. """
  85. if isinstance(images, list) and len(images) == 1:
  86. images = images[0]
  87. if isinstance(images, str):
  88. result = format_image(images, alt, preview)
  89. else:
  90. result = "\n".join(
  91. format_image(
  92. image,
  93. f"#{idx+1} {alt}",
  94. preview[idx] if isinstance(preview, list) and idx < len(preview) else preview
  95. )
  96. for idx, image in enumerate(images)
  97. )
  98. return result
  99. class ResponseType:
  100. @abstractmethod
  101. def __str__(self) -> str:
  102. """Convert the response to a string representation."""
  103. raise NotImplementedError
  104. class JsonMixin:
  105. def __init__(self, **kwargs) -> None:
  106. """Initialize with keyword arguments as attributes."""
  107. for key, value in kwargs.items():
  108. setattr(self, key, value)
  109. def get_dict(self) -> Dict:
  110. """Return a dictionary of non-private attributes."""
  111. return {
  112. key: value
  113. for key, value in self.__dict__.items()
  114. if not key.startswith("__")
  115. }
  116. def reset(self) -> None:
  117. """Reset all attributes."""
  118. self.__dict__ = {}
  119. class RawResponse(ResponseType, JsonMixin):
  120. pass
  121. class HiddenResponse(ResponseType):
  122. def __str__(self) -> str:
  123. """Hidden responses return an empty string."""
  124. return ""
  125. class FinishReason(JsonMixin, HiddenResponse):
  126. def __init__(self, reason: str) -> None:
  127. """Initialize with a reason."""
  128. self.reason = reason
  129. class ToolCalls(HiddenResponse):
  130. def __init__(self, list: List) -> None:
  131. """Initialize with a list of tool calls."""
  132. self.list = list
  133. def get_list(self) -> List:
  134. """Return the list of tool calls."""
  135. return self.list
  136. class Usage(JsonMixin, HiddenResponse):
  137. def __init__(
  138. self,
  139. promptTokens: int = None,
  140. completionTokens: int = None,
  141. **kwargs
  142. ):
  143. if promptTokens is not None:
  144. kwargs["prompt_tokens"] = promptTokens
  145. if completionTokens is not None:
  146. kwargs["completion_tokens"] = completionTokens
  147. if "total_tokens" not in kwargs and "prompt_tokens" in kwargs and "completion_tokens" in kwargs:
  148. kwargs["total_tokens"] = kwargs["prompt_tokens"] + kwargs["completion_tokens"]
  149. return super().__init__(**kwargs)
  150. class AuthResult(JsonMixin, HiddenResponse):
  151. pass
  152. class TitleGeneration(HiddenResponse):
  153. def __init__(self, title: str) -> None:
  154. """Initialize with a title."""
  155. self.title = title
  156. class DebugResponse(HiddenResponse):
  157. def __init__(self, log: str) -> None:
  158. """Initialize with a log message."""
  159. self.log = log
  160. class ContinueResponse(HiddenResponse):
  161. def __init__(self, log: str) -> None:
  162. """Initialize with a log message."""
  163. self.log = log
  164. class Reasoning(ResponseType):
  165. def __init__(
  166. self,
  167. token: Optional[str] = None,
  168. label: Optional[str] = None,
  169. status: Optional[str] = None,
  170. is_thinking: Optional[str] = None
  171. ) -> None:
  172. """Initialize with token, status, and thinking state."""
  173. self.token = token
  174. self.label = label
  175. self.status = status
  176. self.is_thinking = is_thinking
  177. def __str__(self) -> str:
  178. """Return string representation based on available attributes."""
  179. if self.is_thinking is not None:
  180. return self.is_thinking
  181. if self.token is not None:
  182. return self.token
  183. if self.status is not None:
  184. if self.label is not None:
  185. return f"{self.label}: {self.status}\n"
  186. return f"{self.status}\n"
  187. return ""
  188. def get_dict(self) -> Dict:
  189. """Return a dictionary representation of the reasoning."""
  190. if self.label is not None:
  191. return {"label": self.label, "status": self.status}
  192. if self.is_thinking is None:
  193. if self.status is None:
  194. return {"token": self.token}
  195. return {"token": self.token, "status": self.status}
  196. return {"token": self.token, "status": self.status, "is_thinking": self.is_thinking}
  197. class Sources(ResponseType):
  198. def __init__(self, sources: List[Dict[str, str]]) -> None:
  199. """Initialize with a list of source dictionaries."""
  200. self.list = []
  201. for source in sources:
  202. self.add_source(source)
  203. def add_source(self, source: Union[Dict[str, str], str]) -> None:
  204. """Add a source to the list, cleaning the URL if necessary."""
  205. source = source if isinstance(source, dict) else {"url": source}
  206. url = source.get("url", source.get("link", None))
  207. if url is not None:
  208. url = re.sub(r"[&?]utm_source=.+", "", url)
  209. source["url"] = url
  210. self.list.append(source)
  211. def __str__(self) -> str:
  212. """Return formatted sources as a string."""
  213. if not self.list:
  214. return ""
  215. return "\n\n\n\n" + ("\n>\n".join([
  216. f"> [{idx}] {format_link(link['url'], link.get('title', None))}"
  217. for idx, link in enumerate(self.list)
  218. ]))
  219. class SourceLink(ResponseType):
  220. def __init__(self, title: str, url: str) -> None:
  221. self.title = title
  222. self.url = url
  223. def __str__(self) -> str:
  224. title = f"[{self.title}]"
  225. return f" {format_link(self.url, title)}"
  226. class YouTube(HiddenResponse):
  227. def __init__(self, ids: List[str]) -> None:
  228. """Initialize with a list of YouTube IDs."""
  229. self.ids = ids
  230. def to_string(self) -> str:
  231. """Return YouTube embeds as a string."""
  232. if not self.ids:
  233. return ""
  234. return "\n\n" + ("\n".join([
  235. f'<iframe type="text/html" src="https://www.youtube.com/embed/{id}"></iframe>'
  236. for id in self.ids
  237. ]))
  238. class AudioResponse(ResponseType):
  239. def __init__(self, data: Union[bytes, str], **kwargs) -> None:
  240. """Initialize with audio data bytes."""
  241. self.data = data
  242. self.options = kwargs
  243. def to_uri(self) -> str:
  244. if isinstance(self.data, str):
  245. return self.data
  246. """Return audio data as a base64-encoded data URI."""
  247. data_base64 = base64.b64encode(self.data).decode()
  248. return f"data:audio/mpeg;base64,{data_base64}"
  249. def __str__(self) -> str:
  250. """Return audio as html element."""
  251. return f'<audio controls src="{self.to_uri()}"></audio>'
  252. class BaseConversation(ResponseType):
  253. def __str__(self) -> str:
  254. """Return an empty string by default."""
  255. return ""
  256. class JsonConversation(BaseConversation, JsonMixin):
  257. pass
  258. class SynthesizeData(HiddenResponse, JsonMixin):
  259. def __init__(self, provider: str, data: Dict) -> None:
  260. """Initialize with provider and data."""
  261. self.provider = provider
  262. self.data = data
  263. class SuggestedFollowups(HiddenResponse):
  264. def __init__(self, suggestions: list[str]):
  265. self.suggestions = suggestions
  266. class RequestLogin(HiddenResponse):
  267. def __init__(self, label: str, login_url: str) -> None:
  268. """Initialize with label and login URL."""
  269. self.label = label
  270. self.login_url = login_url
  271. def to_string(self) -> str:
  272. """Return formatted login link as a string."""
  273. return format_link(self.login_url, f"[Login to {self.label}]") + "\n\n"
  274. class MediaResponse(ResponseType):
  275. def __init__(
  276. self,
  277. urls: Union[str, List[str]],
  278. alt: str,
  279. options: Dict = {},
  280. **kwargs
  281. ) -> None:
  282. """Initialize with images, alt text, and options."""
  283. self.urls = kwargs.get("images", urls)
  284. self.alt = alt
  285. self.options = options
  286. def get(self, key: str, default: any = None) -> any:
  287. """Get an option value by key."""
  288. return self.options.get(key, default)
  289. def get_list(self) -> List[str]:
  290. """Return images as a list."""
  291. return [self.urls] if isinstance(self.urls, str) else self.urls
  292. class ImageResponse(MediaResponse):
  293. def __str__(self) -> str:
  294. """Return images as markdown."""
  295. return format_images_markdown(self.urls, self.alt, self.get("preview"))
  296. class VideoResponse(MediaResponse):
  297. def __str__(self) -> str:
  298. """Return videos as html elements."""
  299. if self.get("preview"):
  300. result = []
  301. for idx, video in enumerate(self.get_list()):
  302. image = self.get("preview")
  303. if isinstance(image, list) and len(image) > idx:
  304. image = image[idx]
  305. result.append(f'<video controls src="{quote_url(video)}" poster="{quote_url(image)}"></video>')
  306. return "\n".join(result)
  307. return "\n".join([f'<video controls src="{quote_url(video)}"></video>' for video in self.get_list()])
  308. class ImagePreview(ImageResponse):
  309. def __str__(self) -> str:
  310. """Return an empty string for preview."""
  311. return ""
  312. def to_string(self) -> str:
  313. """Return images as markdown."""
  314. return super().__str__()
  315. class PreviewResponse(HiddenResponse):
  316. def __init__(self, data: str) -> None:
  317. """Initialize with data."""
  318. self.data = data
  319. def to_string(self) -> str:
  320. """Return data as a string."""
  321. return self.data
  322. class Parameters(ResponseType, JsonMixin):
  323. def __str__(self) -> str:
  324. """Return an empty string."""
  325. return ""
  326. class ProviderInfo(JsonMixin, HiddenResponse):
  327. pass