Chatgpt4Online.py 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. from __future__ import annotations
  2. import re
  3. import json
  4. from aiohttp import ClientSession
  5. from ..typing import Messages, AsyncResult
  6. from ..requests import get_args_from_browser
  7. from .base_provider import AsyncGeneratorProvider
  8. from .helper import get_random_string
  9. class Chatgpt4Online(AsyncGeneratorProvider):
  10. url = "https://chatgpt4online.org"
  11. supports_message_history = True
  12. supports_gpt_35_turbo = True
  13. working = False
  14. _wpnonce = None
  15. _context_id = None
  16. @classmethod
  17. async def create_async_generator(
  18. cls,
  19. model: str,
  20. messages: Messages,
  21. proxy: str = None,
  22. **kwargs
  23. ) -> AsyncResult:
  24. args = get_args_from_browser(f"{cls.url}/chat/", proxy=proxy)
  25. async with ClientSession(**args) as session:
  26. if not cls._wpnonce:
  27. async with session.get(f"{cls.url}/chat/", proxy=proxy) as response:
  28. response.raise_for_status()
  29. response = await response.text()
  30. result = re.search(r'restNonce":"(.*?)"', response)
  31. if result:
  32. cls._wpnonce = result.group(1)
  33. else:
  34. raise RuntimeError("No nonce found")
  35. result = re.search(r'contextId":(.*?),', response)
  36. if result:
  37. cls._context_id = result.group(1)
  38. else:
  39. raise RuntimeError("No contextId found")
  40. data = {
  41. "botId":"default",
  42. "customId":None,
  43. "session":"N/A",
  44. "chatId":get_random_string(11),
  45. "contextId":cls._context_id,
  46. "messages":messages[:-1],
  47. "newMessage":messages[-1]["content"],
  48. "newImageId":None,
  49. "stream":True
  50. }
  51. async with session.post(
  52. f"{cls.url}/wp-json/mwai-ui/v1/chats/submit",
  53. json=data,
  54. proxy=proxy,
  55. headers={"x-wp-nonce": cls._wpnonce}
  56. ) as response:
  57. response.raise_for_status()
  58. async for line in response.content:
  59. if line.startswith(b"data: "):
  60. line = json.loads(line[6:])
  61. if "type" not in line:
  62. raise RuntimeError(f"Response: {line}")
  63. elif line["type"] == "live":
  64. yield line["data"]
  65. elif line["type"] == "end":
  66. break