response.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
from __future__ import annotations

import base64
import re
from abc import abstractmethod
from typing import Any, Dict, List, Optional, Union
from urllib.parse import quote_plus, unquote_plus
  7. def quote_url(url: str) -> str:
  8. """
  9. Quote parts of a URL while preserving the domain structure.
  10. Args:
  11. url: The URL to quote
  12. Returns:
  13. str: The properly quoted URL
  14. """
  15. # Only unquote if needed to avoid double-unquoting
  16. if '%' in url:
  17. url = unquote_plus(url)
  18. url_parts = url.split("//", maxsplit=1)
  19. # If there is no "//" in the URL, then it is a relative URL
  20. if len(url_parts) == 1:
  21. return quote_plus(url_parts[0], '/?&=#')
  22. protocol, rest = url_parts
  23. domain_parts = rest.split("/", maxsplit=1)
  24. # If there is no "/" after the domain, then it is a domain URL
  25. if len(domain_parts) == 1:
  26. return f"{protocol}//{domain_parts[0]}"
  27. domain, path = domain_parts
  28. return f"{protocol}//{domain}/{quote_plus(path, '/?&=#')}"
  29. def quote_title(title: str) -> str:
  30. """
  31. Normalize whitespace in a title.
  32. Args:
  33. title: The title to normalize
  34. Returns:
  35. str: The title with normalized whitespace
  36. """
  37. return " ".join(title.split()) if title else ""
  38. def format_link(url: str, title: Optional[str] = None) -> str:
  39. """
  40. Format a URL and title as a markdown link.
  41. Args:
  42. url: The URL to link to
  43. title: The title to display. If None, extracts from URL
  44. Returns:
  45. str: The formatted markdown link
  46. """
  47. if title is None:
  48. try:
  49. title = unquote_plus(url.split("//", maxsplit=1)[1].split("?")[0].replace("www.", ""))
  50. except IndexError:
  51. title = url
  52. return f"[{quote_title(title)}]({quote_url(url)})"
  53. def format_image(image: str, alt: str, preview: Optional[str] = None) -> str:
  54. """
  55. Formats the given image as a markdown string.
  56. Args:
  57. image: The image to format.
  58. alt: The alt text for the image.
  59. preview: The preview URL format. Defaults to the original image.
  60. Returns:
  61. str: The formatted markdown string.
  62. """
  63. preview_url = preview.replace('{image}', image) if preview else image
  64. return f"[![{quote_title(alt)}]({quote_url(preview_url)})]({quote_url(image)})"
  65. def format_images_markdown(images: Union[str, List[str]], alt: str,
  66. preview: Union[str, List[str]] = None) -> str:
  67. """
  68. Formats the given images as a markdown string.
  69. Args:
  70. images: The image or list of images to format.
  71. alt: The alt text for the images.
  72. preview: The preview URL format or list of preview URLs.
  73. If not provided, original images are used.
  74. Returns:
  75. str: The formatted markdown string.
  76. """
  77. if isinstance(images, list) and len(images) == 1:
  78. images = images[0]
  79. if isinstance(images, str):
  80. result = format_image(images, alt, preview)
  81. else:
  82. result = "\n".join(
  83. format_image(
  84. image,
  85. f"#{idx+1} {alt}",
  86. preview[idx] if isinstance(preview, list) and idx < len(preview) else preview
  87. )
  88. for idx, image in enumerate(images)
  89. )
  90. start_flag = "<!-- generated images start -->\n"
  91. end_flag = "<!-- generated images end -->\n"
  92. return f"\n{start_flag}{result}\n{end_flag}\n"
  93. class ResponseType:
  94. @abstractmethod
  95. def __str__(self) -> str:
  96. """Convert the response to a string representation."""
  97. raise NotImplementedError
  98. class JsonMixin:
  99. def __init__(self, **kwargs) -> None:
  100. """Initialize with keyword arguments as attributes."""
  101. for key, value in kwargs.items():
  102. setattr(self, key, value)
  103. def get_dict(self) -> Dict:
  104. """Return a dictionary of non-private attributes."""
  105. return {
  106. key: value
  107. for key, value in self.__dict__.items()
  108. if not key.startswith("__")
  109. }
  110. def reset(self) -> None:
  111. """Reset all attributes."""
  112. self.__dict__ = {}
  113. class RawResponse(ResponseType, JsonMixin):
  114. pass
  115. class HiddenResponse(ResponseType):
  116. def __str__(self) -> str:
  117. """Hidden responses return an empty string."""
  118. return ""
  119. class FinishReason(JsonMixin, HiddenResponse):
  120. def __init__(self, reason: str) -> None:
  121. """Initialize with a reason."""
  122. self.reason = reason
  123. class ToolCalls(HiddenResponse):
  124. def __init__(self, list: List) -> None:
  125. """Initialize with a list of tool calls."""
  126. self.list = list
  127. def get_list(self) -> List:
  128. """Return the list of tool calls."""
  129. return self.list
  130. class Usage(JsonMixin, HiddenResponse):
  131. pass
  132. class AuthResult(JsonMixin, HiddenResponse):
  133. pass
  134. class TitleGeneration(HiddenResponse):
  135. def __init__(self, title: str) -> None:
  136. """Initialize with a title."""
  137. self.title = title
  138. class DebugResponse(HiddenResponse):
  139. def __init__(self, log: str) -> None:
  140. """Initialize with a log message."""
  141. self.log = log
  142. class Reasoning(ResponseType):
  143. def __init__(
  144. self,
  145. token: Optional[str] = None,
  146. label: Optional[str] = None,
  147. status: Optional[str] = None,
  148. is_thinking: Optional[str] = None
  149. ) -> None:
  150. """Initialize with token, status, and thinking state."""
  151. self.token = token
  152. self.label = label
  153. self.status = status
  154. self.is_thinking = is_thinking
  155. def __str__(self) -> str:
  156. """Return string representation based on available attributes."""
  157. if self.is_thinking is not None:
  158. return self.is_thinking
  159. if self.token is not None:
  160. return self.token
  161. if self.status is not None:
  162. if self.label is not None:
  163. return f"{self.label}: {self.status}\n"
  164. return f"{self.status}\n"
  165. return ""
  166. def __eq__(self, other: Reasoning):
  167. return (self.token == other.token and
  168. self.status == other.status and
  169. self.is_thinking == other.is_thinking)
  170. def get_dict(self) -> Dict:
  171. """Return a dictionary representation of the reasoning."""
  172. if self.label is not None:
  173. return {"label": self.label, "status": self.status}
  174. if self.is_thinking is None:
  175. if self.status is None:
  176. return {"token": self.token}
  177. return {"token": self.token, "status": self.status}
  178. return {"token": self.token, "status": self.status, "is_thinking": self.is_thinking}
  179. class Sources(ResponseType):
  180. def __init__(self, sources: List[Dict[str, str]]) -> None:
  181. """Initialize with a list of source dictionaries."""
  182. self.list = []
  183. for source in sources:
  184. self.add_source(source)
  185. def add_source(self, source: Union[Dict[str, str], str]) -> None:
  186. """Add a source to the list, cleaning the URL if necessary."""
  187. source = source if isinstance(source, dict) else {"url": source}
  188. url = source.get("url", source.get("link", None))
  189. if url is not None:
  190. url = re.sub(r"[&?]utm_source=.+", "", url)
  191. source["url"] = url
  192. self.list.append(source)
  193. def __str__(self) -> str:
  194. """Return formatted sources as a string."""
  195. if not self.list:
  196. return ""
  197. return "\n\n\n\n" + ("\n>\n".join([
  198. f"> [{idx}] {format_link(link['url'], link.get('title', None))}"
  199. for idx, link in enumerate(self.list)
  200. ]))
  201. class YouTube(HiddenResponse):
  202. def __init__(self, ids: List[str]) -> None:
  203. """Initialize with a list of YouTube IDs."""
  204. self.ids = ids
  205. def to_string(self) -> str:
  206. """Return YouTube embeds as a string."""
  207. if not self.ids:
  208. return ""
  209. return "\n\n" + ("\n".join([
  210. f'<iframe type="text/html" src="https://www.youtube.com/embed/{id}"></iframe>'
  211. for id in self.ids
  212. ]))
  213. class AudioResponse(ResponseType):
  214. def __init__(self, data: Union[bytes, str]) -> None:
  215. """Initialize with audio data bytes."""
  216. self.data = data
  217. def to_uri(self) -> str:
  218. if isinstance(self.data, str):
  219. return self.data
  220. """Return audio data as a base64-encoded data URI."""
  221. data_base64 = base64.b64encode(self.data).decode()
  222. return f"data:audio/mpeg;base64,{data_base64}"
  223. def __str__(self) -> str:
  224. """Return audio as html element."""
  225. return f'<audio controls src="{self.to_uri()}"></audio>'
  226. class BaseConversation(ResponseType):
  227. def __str__(self) -> str:
  228. """Return an empty string by default."""
  229. return ""
  230. class JsonConversation(BaseConversation, JsonMixin):
  231. pass
  232. class SynthesizeData(HiddenResponse, JsonMixin):
  233. def __init__(self, provider: str, data: Dict) -> None:
  234. """Initialize with provider and data."""
  235. self.provider = provider
  236. self.data = data
  237. class SuggestedFollowups(HiddenResponse):
  238. def __init__(self, suggestions: list[str]):
  239. self.suggestions = suggestions
  240. class RequestLogin(HiddenResponse):
  241. def __init__(self, label: str, login_url: str) -> None:
  242. """Initialize with label and login URL."""
  243. self.label = label
  244. self.login_url = login_url
  245. def to_string(self) -> str:
  246. """Return formatted login link as a string."""
  247. return format_link(self.login_url, f"[Login to {self.label}]") + "\n\n"
  248. class MediaResponse(ResponseType):
  249. def __init__(
  250. self,
  251. urls: Union[str, List[str]],
  252. alt: str,
  253. options: Dict = {},
  254. **kwargs
  255. ) -> None:
  256. """Initialize with images, alt text, and options."""
  257. self.urls = kwargs.get("images", urls)
  258. self.alt = alt
  259. self.options = options
  260. def get(self, key: str) -> any:
  261. """Get an option value by key."""
  262. return self.options.get(key)
  263. def get_list(self) -> List[str]:
  264. """Return images as a list."""
  265. return [self.urls] if isinstance(self.urls, str) else self.urls
  266. class ImageResponse(MediaResponse):
  267. def __str__(self) -> str:
  268. """Return images as markdown."""
  269. return format_images_markdown(self.urls, self.alt, self.get("preview"))
  270. class VideoResponse(MediaResponse):
  271. def __str__(self) -> str:
  272. """Return videos as html elements."""
  273. return "\n".join([f'<video controls src="{video}"></video>' for video in self.get_list()])
  274. class ImagePreview(ImageResponse):
  275. def __str__(self) -> str:
  276. """Return an empty string for preview."""
  277. return ""
  278. def to_string(self) -> str:
  279. """Return images as markdown."""
  280. return super().__str__()
  281. class PreviewResponse(HiddenResponse):
  282. def __init__(self, data: str) -> None:
  283. """Initialize with data."""
  284. self.data = data
  285. def to_string(self) -> str:
  286. """Return data as a string."""
  287. return self.data
  288. class Parameters(ResponseType, JsonMixin):
  289. def __str__(self) -> str:
  290. """Return an empty string."""
  291. return ""
  292. class ProviderInfo(JsonMixin, HiddenResponse):
  293. pass