MagickPen.py 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. from __future__ import annotations
  2. from aiohttp import ClientSession
  3. import hashlib
  4. import time
  5. import random
  6. import re
  7. import json
  8. from ..typing import AsyncResult, Messages
  9. from .base_provider import AsyncGeneratorProvider, ProviderModelMixin
  10. from .helper import format_prompt
  11. class MagickPen(AsyncGeneratorProvider, ProviderModelMixin):
  12. url = "https://magickpen.com"
  13. api_endpoint = "https://api.magickpen.com/ask"
  14. working = True
  15. supports_stream = True
  16. supports_system_message = True
  17. supports_message_history = True
  18. default_model = 'gpt-4o-mini'
  19. models = ['gpt-4o-mini']
  20. @classmethod
  21. async def fetch_api_credentials(cls) -> tuple:
  22. url = "https://magickpen.com/_nuxt/bf709a9ce19f14e18116.js"
  23. async with ClientSession() as session:
  24. async with session.get(url) as response:
  25. text = await response.text()
  26. pattern = r'"X-API-Secret":"(\w+)"'
  27. match = re.search(pattern, text)
  28. X_API_SECRET = match.group(1) if match else None
  29. timestamp = str(int(time.time() * 1000))
  30. nonce = str(random.random())
  31. s = ["TGDBU9zCgM", timestamp, nonce]
  32. s.sort()
  33. signature_string = ''.join(s)
  34. signature = hashlib.md5(signature_string.encode()).hexdigest()
  35. pattern = r'secret:"(\w+)"'
  36. match = re.search(pattern, text)
  37. secret = match.group(1) if match else None
  38. if X_API_SECRET and timestamp and nonce and secret:
  39. return X_API_SECRET, signature, timestamp, nonce, secret
  40. else:
  41. raise Exception("Unable to extract all the necessary data from the JavaScript file.")
  42. @classmethod
  43. async def create_async_generator(
  44. cls,
  45. model: str,
  46. messages: Messages,
  47. proxy: str = None,
  48. **kwargs
  49. ) -> AsyncResult:
  50. model = cls.get_model(model)
  51. X_API_SECRET, signature, timestamp, nonce, secret = await cls.fetch_api_credentials()
  52. headers = {
  53. 'accept': 'application/json, text/plain, */*',
  54. 'accept-language': 'en-US,en;q=0.9',
  55. 'content-type': 'application/json',
  56. 'nonce': nonce,
  57. 'origin': cls.url,
  58. 'referer': f"{cls.url}/",
  59. 'secret': secret,
  60. 'signature': signature,
  61. 'timestamp': timestamp,
  62. 'x-api-secret': X_API_SECRET,
  63. }
  64. async with ClientSession(headers=headers) as session:
  65. prompt = format_prompt(messages)
  66. payload = {
  67. 'query': prompt,
  68. 'turnstileResponse': '',
  69. 'action': 'verify'
  70. }
  71. async with session.post(cls.api_endpoint, json=payload, proxy=proxy) as response:
  72. response.raise_for_status()
  73. async for chunk in response.content:
  74. if chunk:
  75. yield chunk.decode()