helper.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. from __future__ import annotations
  2. import asyncio
  3. import os
  4. import random
  5. import secrets
  6. import string
  7. from asyncio import AbstractEventLoop, BaseEventLoop
  8. from platformdirs import user_config_dir
  9. from browser_cookie3 import (
  10. chrome, chromium, opera, opera_gx,
  11. brave, edge, vivaldi, firefox,
  12. _LinuxPasswordManager, BrowserCookieError
  13. )
  14. from ..typing import Dict, Messages
  15. from .. import debug
  16. # Global variable to store cookies
  17. _cookies: Dict[str, Dict[str, str]] = {}
  18. def get_event_loop() -> AbstractEventLoop:
  19. """
  20. Get the current asyncio event loop. If the loop is closed or not set, create a new event loop.
  21. If a loop is running, handle nested event loops. Patch the loop if 'nest_asyncio' is installed.
  22. Returns:
  23. AbstractEventLoop: The current or new event loop.
  24. """
  25. try:
  26. loop = asyncio.get_event_loop()
  27. if isinstance(loop, BaseEventLoop):
  28. loop._check_closed()
  29. except RuntimeError:
  30. loop = asyncio.new_event_loop()
  31. asyncio.set_event_loop(loop)
  32. try:
  33. asyncio.get_running_loop()
  34. if not hasattr(loop.__class__, "_nest_patched"):
  35. import nest_asyncio
  36. nest_asyncio.apply(loop)
  37. except RuntimeError:
  38. pass
  39. except ImportError:
  40. raise RuntimeError(
  41. 'Use "create_async" instead of "create" function in a running event loop. Or install "nest_asyncio" package.'
  42. )
  43. return loop
  44. if os.environ.get('DBUS_SESSION_BUS_ADDRESS') == "/dev/null":
  45. _LinuxPasswordManager.get_password = lambda a, b: b"secret"
  46. def get_cookies(domain_name: str = '') -> Dict[str, str]:
  47. """
  48. Load cookies for a given domain from all supported browsers and cache the results.
  49. Args:
  50. domain_name (str): The domain for which to load cookies.
  51. Returns:
  52. Dict[str, str]: A dictionary of cookie names and values.
  53. """
  54. if domain_name in _cookies:
  55. return _cookies[domain_name]
  56. cookies = _load_cookies_from_browsers(domain_name)
  57. _cookies[domain_name] = cookies
  58. return cookies
  59. def _load_cookies_from_browsers(domain_name: str) -> Dict[str, str]:
  60. """
  61. Helper function to load cookies from various browsers.
  62. Args:
  63. domain_name (str): The domain for which to load cookies.
  64. Returns:
  65. Dict[str, str]: A dictionary of cookie names and values.
  66. """
  67. cookies = {}
  68. for cookie_fn in [_g4f, chrome, chromium, opera, opera_gx, brave, edge, vivaldi, firefox]:
  69. try:
  70. cookie_jar = cookie_fn(domain_name=domain_name)
  71. if len(cookie_jar) and debug.logging:
  72. print(f"Read cookies from {cookie_fn.__name__} for {domain_name}")
  73. for cookie in cookie_jar:
  74. if cookie.name not in cookies:
  75. cookies[cookie.name] = cookie.value
  76. except BrowserCookieError:
  77. pass
  78. except Exception as e:
  79. if debug.logging:
  80. print(f"Error reading cookies from {cookie_fn.__name__} for {domain_name}: {e}")
  81. return cookies
  82. def _g4f(domain_name: str) -> list:
  83. """
  84. Load cookies from the 'g4f' browser (if exists).
  85. Args:
  86. domain_name (str): The domain for which to load cookies.
  87. Returns:
  88. list: List of cookies.
  89. """
  90. user_data_dir = user_config_dir("g4f")
  91. cookie_file = os.path.join(user_data_dir, "Default", "Cookies")
  92. return [] if not os.path.exists(cookie_file) else chrome(cookie_file, domain_name)
  93. def format_prompt(messages: Messages, add_special_tokens=False) -> str:
  94. """
  95. Format a series of messages into a single string, optionally adding special tokens.
  96. Args:
  97. messages (Messages): A list of message dictionaries, each containing 'role' and 'content'.
  98. add_special_tokens (bool): Whether to add special formatting tokens.
  99. Returns:
  100. str: A formatted string containing all messages.
  101. """
  102. if not add_special_tokens and len(messages) <= 1:
  103. return messages[0]["content"]
  104. formatted = "\n".join([
  105. f'{message["role"].capitalize()}: {message["content"]}'
  106. for message in messages
  107. ])
  108. return f"{formatted}\nAssistant:"
  109. def get_random_string(length: int = 10) -> str:
  110. """
  111. Generate a random string of specified length, containing lowercase letters and digits.
  112. Args:
  113. length (int, optional): Length of the random string to generate. Defaults to 10.
  114. Returns:
  115. str: A random string of the specified length.
  116. """
  117. return ''.join(
  118. random.choice(string.ascii_lowercase + string.digits)
  119. for _ in range(length)
  120. )
  121. def get_random_hex() -> str:
  122. """
  123. Generate a random hexadecimal string of a fixed length.
  124. Returns:
  125. str: A random hexadecimal string of 32 characters (16 bytes).
  126. """
  127. return secrets.token_hex(16).zfill(32)