retry_provider.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. from __future__ import annotations
  2. import random
  3. from ..typing import Type, List, CreateResult, Messages, AsyncResult
  4. from .types import BaseProvider, BaseRetryProvider, ProviderType
  5. from .response import ProviderInfo, JsonConversation, is_content
  6. from .. import debug
  7. from ..errors import RetryProviderError, RetryNoProviderError, MissingAuthError, NoValidHarFileError
  8. class IterListProvider(BaseRetryProvider):
  9. def __init__(
  10. self,
  11. providers: List[Type[BaseProvider]],
  12. shuffle: bool = True
  13. ) -> None:
  14. """
  15. Initialize the BaseRetryProvider.
  16. Args:
  17. providers (List[Type[BaseProvider]]): List of providers to use.
  18. shuffle (bool): Whether to shuffle the providers list.
  19. single_provider_retry (bool): Whether to retry a single provider if it fails.
  20. max_retries (int): Maximum number of retries for a single provider.
  21. """
  22. self.providers = providers
  23. self.shuffle = shuffle
  24. self.working = True
  25. self.last_provider: Type[BaseProvider] = None
  26. self.add_api_key = False
  27. def create_completion(
  28. self,
  29. model: str,
  30. messages: Messages,
  31. stream: bool = False,
  32. ignore_stream: bool = False,
  33. ignored: list[str] = [],
  34. **kwargs,
  35. ) -> CreateResult:
  36. """
  37. Create a completion using available providers, with an option to stream the response.
  38. Args:
  39. model (str): The model to be used for completion.
  40. messages (Messages): The messages to be used for generating completion.
  41. stream (bool, optional): Flag to indicate if the response should be streamed. Defaults to False.
  42. Yields:
  43. CreateResult: Tokens or results from the completion.
  44. Raises:
  45. Exception: Any exception encountered during the completion process.
  46. """
  47. exceptions = {}
  48. started: bool = False
  49. for provider in self.get_providers(stream and not ignore_stream, ignored):
  50. self.last_provider = provider
  51. debug.log(f"Using {provider.__name__} provider")
  52. yield ProviderInfo(**provider.get_dict(), model=model if model else getattr(provider, "default_model"))
  53. try:
  54. response = provider.get_create_function()(model, messages, stream=stream, **kwargs)
  55. for chunk in response:
  56. if chunk:
  57. yield chunk
  58. if is_content(chunk):
  59. started = True
  60. if started:
  61. return
  62. except Exception as e:
  63. exceptions[provider.__name__] = e
  64. debug.error(f"{provider.__name__} {type(e).__name__}: {e}")
  65. if started:
  66. raise e
  67. yield e
  68. raise_exceptions(exceptions)
  69. async def create_async_generator(
  70. self,
  71. model: str,
  72. messages: Messages,
  73. stream: bool = True,
  74. ignore_stream: bool = False,
  75. ignored: list[str] = [],
  76. api_key: str = None,
  77. conversation: JsonConversation = None,
  78. **kwargs
  79. ) -> AsyncResult:
  80. exceptions = {}
  81. started: bool = False
  82. for provider in self.get_providers(stream and not ignore_stream, ignored):
  83. self.last_provider = provider
  84. debug.log(f"Using {provider.__name__} provider" + (f" and {model} model" if model else ""))
  85. yield ProviderInfo(**provider.get_dict(), model=model if model else getattr(provider, "default_model"))
  86. extra_body = kwargs.copy()
  87. if self.add_api_key or provider.__name__ in ["HuggingFace", "HuggingFaceMedia"]:
  88. extra_body["api_key"] = api_key
  89. if conversation is not None and hasattr(conversation, provider.__name__):
  90. extra_body["conversation"] = JsonConversation(**getattr(conversation, provider.__name__))
  91. try:
  92. response = provider.get_async_create_function()(model, messages, stream=stream, **extra_body)
  93. if hasattr(response, "__aiter__"):
  94. async for chunk in response:
  95. if isinstance(chunk, JsonConversation):
  96. if conversation is None:
  97. conversation = JsonConversation()
  98. setattr(conversation, provider.__name__, chunk.get_dict())
  99. yield conversation
  100. elif chunk:
  101. yield chunk
  102. if is_content(chunk):
  103. started = True
  104. elif response:
  105. response = await response
  106. if response:
  107. yield response
  108. started = True
  109. if started:
  110. return
  111. except Exception as e:
  112. exceptions[provider.__name__] = e
  113. debug.error(f"{provider.__name__} {type(e).__name__}: {e}")
  114. if started:
  115. raise e
  116. yield e
  117. raise_exceptions(exceptions)
  118. def get_create_function(self) -> callable:
  119. return self.create_completion
  120. def get_async_create_function(self) -> callable:
  121. return self.create_async_generator
  122. def get_providers(self, stream: bool, ignored: list[str]) -> list[ProviderType]:
  123. providers = [p for p in self.providers if (p.supports_stream or not stream) and p.__name__ not in ignored]
  124. if self.shuffle:
  125. random.shuffle(providers)
  126. return providers
  127. class RetryProvider(IterListProvider):
  128. def __init__(
  129. self,
  130. providers: List[Type[BaseProvider]],
  131. shuffle: bool = True,
  132. single_provider_retry: bool = False,
  133. max_retries: int = 3,
  134. ) -> None:
  135. """
  136. Initialize the BaseRetryProvider.
  137. Args:
  138. providers (List[Type[BaseProvider]]): List of providers to use.
  139. shuffle (bool): Whether to shuffle the providers list.
  140. single_provider_retry (bool): Whether to retry a single provider if it fails.
  141. max_retries (int): Maximum number of retries for a single provider.
  142. """
  143. super().__init__(providers, shuffle)
  144. self.single_provider_retry = single_provider_retry
  145. self.max_retries = max_retries
  146. self.add_api_key = True
  147. def create_completion(
  148. self,
  149. model: str,
  150. messages: Messages,
  151. stream: bool = False,
  152. **kwargs,
  153. ) -> CreateResult:
  154. """
  155. Create a completion using available providers, with an option to stream the response.
  156. Args:
  157. model (str): The model to be used for completion.
  158. messages (Messages): The messages to be used for generating completion.
  159. stream (bool, optional): Flag to indicate if the response should be streamed. Defaults to False.
  160. Yields:
  161. CreateResult: Tokens or results from the completion.
  162. Raises:
  163. Exception: Any exception encountered during the completion process.
  164. """
  165. if self.single_provider_retry:
  166. exceptions = {}
  167. started: bool = False
  168. provider = self.providers[0]
  169. self.last_provider = provider
  170. for attempt in range(self.max_retries):
  171. try:
  172. if debug.logging:
  173. print(f"Using {provider.__name__} provider (attempt {attempt + 1})")
  174. response = provider.get_create_function()(model, messages, stream=stream, **kwargs)
  175. for chunk in response:
  176. yield chunk
  177. if is_content(chunk):
  178. started = True
  179. if started:
  180. return
  181. except Exception as e:
  182. exceptions[provider.__name__] = e
  183. if debug.logging:
  184. print(f"{provider.__name__}: {e.__class__.__name__}: {e}")
  185. if started:
  186. raise e
  187. raise_exceptions(exceptions)
  188. else:
  189. yield from super().create_completion(model, messages, stream, **kwargs)
  190. async def create_async_generator(
  191. self,
  192. model: str,
  193. messages: Messages,
  194. stream: bool = True,
  195. **kwargs
  196. ) -> AsyncResult:
  197. exceptions = {}
  198. started = False
  199. if self.single_provider_retry:
  200. provider = self.providers[0]
  201. self.last_provider = provider
  202. for attempt in range(self.max_retries):
  203. try:
  204. debug.log(f"Using {provider.__name__} provider (attempt {attempt + 1})")
  205. response = provider.get_async_create_function()(model, messages, stream=stream, **kwargs)
  206. if hasattr(response, "__aiter__"):
  207. async for chunk in response:
  208. yield chunk
  209. if is_content(chunk):
  210. started = True
  211. else:
  212. response = await response
  213. if response:
  214. yield response
  215. started = True
  216. if started:
  217. return
  218. except Exception as e:
  219. exceptions[provider.__name__] = e
  220. if debug.logging:
  221. print(f"{provider.__name__}: {e.__class__.__name__}: {e}")
  222. raise_exceptions(exceptions)
  223. else:
  224. async for chunk in super().create_async_generator(model, messages, stream, **kwargs):
  225. yield chunk
  226. def raise_exceptions(exceptions: dict) -> None:
  227. """
  228. Raise a combined exception if any occurred during retries.
  229. Raises:
  230. RetryProviderError: If any provider encountered an exception.
  231. RetryNoProviderError: If no provider is found.
  232. """
  233. if exceptions:
  234. for provider_name, e in exceptions.items():
  235. if isinstance(e, (MissingAuthError, NoValidHarFileError)):
  236. raise e
  237. if len(exceptions) == 1:
  238. raise list(exceptions.values())[0]
  239. raise RetryProviderError("RetryProvider failed:\n" + "\n".join([
  240. f"{p}: {type(exception).__name__}: {exception}" for p, exception in exceptions.items()
  241. ])) from list(exceptions.values())[0]
  242. raise RetryNoProviderError("No provider found")