# __init__.py (3.1 KB)
from __future__ import annotations

import logging
import os
from typing import Any, Coroutine, Optional, Union

from . import debug, version
from .models import Model
from .client import Client, AsyncClient
from .typing import Messages, CreateResult, AsyncResult, ImageType
from .errors import StreamNotSupportedError
from .cookies import get_cookies, set_cookies
from .providers.types import ProviderType
from .providers.helper import concat_chunks, async_concat_chunks
from .client.service import get_model_and_provider
  14. #Configure "g4f" logger
  15. logger = logging.getLogger(__name__)
  16. log_handler = logging.StreamHandler()
  17. log_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
  18. logger.addHandler(log_handler)
  19. logger.setLevel(logging.ERROR)
  20. class ChatCompletion:
  21. @staticmethod
  22. def create(model : Union[Model, str],
  23. messages : Messages,
  24. provider : Union[ProviderType, str, None] = None,
  25. stream : bool = False,
  26. image : ImageType = None,
  27. image_name: Optional[str] = None,
  28. ignore_working: bool = False,
  29. ignore_stream: bool = False,
  30. **kwargs) -> Union[CreateResult, str]:
  31. if image is not None:
  32. kwargs["media"] = [(image, image_name)]
  33. elif "images" in kwargs:
  34. kwargs["media"] = kwargs.pop("images")
  35. model, provider = get_model_and_provider(
  36. model, provider, stream,
  37. ignore_working,
  38. ignore_stream,
  39. has_images="media" in kwargs,
  40. )
  41. if "proxy" not in kwargs:
  42. proxy = os.environ.get("G4F_PROXY")
  43. if proxy:
  44. kwargs["proxy"] = proxy
  45. if ignore_stream:
  46. kwargs["ignore_stream"] = True
  47. result = provider.get_create_function()(model, messages, stream=stream, **kwargs)
  48. return result if stream or ignore_stream else concat_chunks(result)
  49. @staticmethod
  50. def create_async(model : Union[Model, str],
  51. messages : Messages,
  52. provider : Union[ProviderType, str, None] = None,
  53. stream : bool = False,
  54. image : ImageType = None,
  55. image_name: Optional[str] = None,
  56. ignore_stream: bool = False,
  57. ignore_working: bool = False,
  58. **kwargs) -> Union[AsyncResult, Coroutine[str]]:
  59. if image is not None:
  60. kwargs["media"] = [(image, image_name)]
  61. elif "images" in kwargs:
  62. kwargs["media"] = kwargs.pop("images")
  63. model, provider = get_model_and_provider(model, provider, False, ignore_working, has_images="media" in kwargs)
  64. if "proxy" not in kwargs:
  65. proxy = os.environ.get("G4F_PROXY")
  66. if proxy:
  67. kwargs["proxy"] = proxy
  68. if ignore_stream:
  69. kwargs["ignore_stream"] = True
  70. result = provider.get_async_create_function()(model, messages, stream=stream, **kwargs)
  71. if not stream and not ignore_stream:
  72. if hasattr(result, "__aiter__"):
  73. result = async_concat_chunks(result)
  74. return result