# AIUncensored.py (5.0 KB)
from __future__ import annotations

import asyncio
import json
import random
from itertools import cycle

from aiohttp import ClientError, ClientSession, ClientTimeout

from ...image import ImageResponse
from ...typing import AsyncResult, Messages
from ..base_provider import AsyncGeneratorProvider, ProviderModelMixin
  10. class AIUncensored(AsyncGeneratorProvider, ProviderModelMixin):
  11. url = "https://www.aiuncensored.info/ai_uncensored"
  12. api_endpoints_text = [
  13. "https://twitterclone-i0wr.onrender.com/api/chat",
  14. "https://twitterclone-4e8t.onrender.com/api/chat",
  15. "https://twitterclone-8wd1.onrender.com/api/chat",
  16. ]
  17. api_endpoints_image = [
  18. "https://twitterclone-4e8t.onrender.com/api/image",
  19. "https://twitterclone-i0wr.onrender.com/api/image",
  20. "https://twitterclone-8wd1.onrender.com/api/image",
  21. ]
  22. working = False
  23. supports_stream = True
  24. supports_system_message = True
  25. supports_message_history = True
  26. default_model = 'TextGenerations'
  27. text_models = [default_model]
  28. image_models = ['ImageGenerations']
  29. models = [*text_models, *image_models]
  30. model_aliases = {
  31. "flux": "ImageGenerations",
  32. }
  33. @staticmethod
  34. def generate_cipher() -> str:
  35. """Generate a cipher in format like '3221229284179118'"""
  36. return ''.join([str(random.randint(0, 9)) for _ in range(16)])
  37. @classmethod
  38. def get_model(cls, model: str) -> str:
  39. if model in cls.models:
  40. return model
  41. elif model in cls.model_aliases:
  42. return cls.model_aliases[model]
  43. else:
  44. return cls.default_model
  45. @classmethod
  46. async def create_async_generator(
  47. cls,
  48. model: str,
  49. messages: Messages,
  50. proxy: str = None,
  51. **kwargs
  52. ) -> AsyncResult:
  53. model = cls.get_model(model)
  54. headers = {
  55. 'accept': '*/*',
  56. 'accept-language': 'en-US,en;q=0.9',
  57. 'cache-control': 'no-cache',
  58. 'content-type': 'application/json',
  59. 'origin': 'https://www.aiuncensored.info',
  60. 'pragma': 'no-cache',
  61. 'priority': 'u=1, i',
  62. 'referer': 'https://www.aiuncensored.info/',
  63. 'sec-ch-ua': '"Not?A_Brand";v="99", "Chromium";v="130"',
  64. 'sec-ch-ua-mobile': '?0',
  65. 'sec-ch-ua-platform': '"Linux"',
  66. 'sec-fetch-dest': 'empty',
  67. 'sec-fetch-mode': 'cors',
  68. 'sec-fetch-site': 'cross-site',
  69. 'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36'
  70. }
  71. async with ClientSession(headers=headers) as session:
  72. if model in cls.image_models:
  73. prompt = messages[-1]['content']
  74. data = {
  75. "prompt": prompt,
  76. "cipher": cls.generate_cipher()
  77. }
  78. endpoints = cycle(cls.api_endpoints_image)
  79. while True:
  80. endpoint = next(endpoints)
  81. try:
  82. async with session.post(endpoint, json=data, proxy=proxy, timeout=10) as response:
  83. response.raise_for_status()
  84. response_data = await response.json()
  85. image_url = response_data['image_url']
  86. image_response = ImageResponse(images=image_url, alt=prompt)
  87. yield image_response
  88. return
  89. except (ClientError, asyncio.TimeoutError):
  90. continue
  91. elif model in cls.text_models:
  92. data = {
  93. "messages": messages,
  94. "cipher": cls.generate_cipher()
  95. }
  96. endpoints = cycle(cls.api_endpoints_text)
  97. while True:
  98. endpoint = next(endpoints)
  99. try:
  100. async with session.post(endpoint, json=data, proxy=proxy, timeout=10) as response:
  101. response.raise_for_status()
  102. full_response = ""
  103. async for line in response.content:
  104. line = line.decode('utf-8')
  105. if line.startswith("data: "):
  106. try:
  107. json_str = line[6:]
  108. if json_str != "[DONE]":
  109. data = json.loads(json_str)
  110. if "data" in data:
  111. full_response += data["data"]
  112. yield data["data"]
  113. except json.JSONDecodeError:
  114. continue
  115. return
  116. except (ClientError, asyncio.TimeoutError):
  117. continue