upload_image.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. """
  2. Module to handle image uploading and processing for Bing AI integrations.
  3. """
  4. from __future__ import annotations
  5. import string
  6. import random
  7. import json
  8. import math
  9. from aiohttp import ClientSession
  10. from PIL import Image
  11. from ...typing import ImageType, Tuple
  12. from ...image import to_image, process_image, to_base64, ImageResponse
  13. IMAGE_CONFIG = {
  14. "maxImagePixels": 360000,
  15. "imageCompressionRate": 0.7,
  16. "enableFaceBlurDebug": False,
  17. }
  18. async def upload_image(
  19. session: ClientSession,
  20. image_data: ImageType,
  21. tone: str,
  22. proxy: str = None
  23. ) -> ImageResponse:
  24. """
  25. Uploads an image to Bing's AI service and returns the image response.
  26. Args:
  27. session (ClientSession): The active session.
  28. image_data (bytes): The image data to be uploaded.
  29. tone (str): The tone of the conversation.
  30. proxy (str, optional): Proxy if any. Defaults to None.
  31. Raises:
  32. RuntimeError: If the image upload fails.
  33. Returns:
  34. ImageResponse: The response from the image upload.
  35. """
  36. image = to_image(image_data)
  37. new_width, new_height = calculate_new_dimensions(image)
  38. processed_img = process_image(image, new_width, new_height)
  39. img_binary_data = to_base64(processed_img, IMAGE_CONFIG['imageCompressionRate'])
  40. data, boundary = build_image_upload_payload(img_binary_data, tone)
  41. headers = prepare_headers(session, boundary)
  42. async with session.post("https://www.bing.com/images/kblob", data=data, headers=headers, proxy=proxy) as response:
  43. if response.status != 200:
  44. raise RuntimeError("Failed to upload image.")
  45. return parse_image_response(await response.json())
  46. def calculate_new_dimensions(image: Image.Image) -> Tuple[int, int]:
  47. """
  48. Calculates the new dimensions for the image based on the maximum allowed pixels.
  49. Args:
  50. image (Image): The PIL Image object.
  51. Returns:
  52. Tuple[int, int]: The new width and height for the image.
  53. """
  54. width, height = image.size
  55. max_image_pixels = IMAGE_CONFIG['maxImagePixels']
  56. if max_image_pixels / (width * height) < 1:
  57. scale_factor = math.sqrt(max_image_pixels / (width * height))
  58. return int(width * scale_factor), int(height * scale_factor)
  59. return width, height
  60. def build_image_upload_payload(image_bin: str, tone: str) -> Tuple[str, str]:
  61. """
  62. Builds the payload for image uploading.
  63. Args:
  64. image_bin (str): Base64 encoded image binary data.
  65. tone (str): The tone of the conversation.
  66. Returns:
  67. Tuple[str, str]: The data and boundary for the payload.
  68. """
  69. boundary = "----WebKitFormBoundary" + ''.join(random.choices(string.ascii_letters + string.digits, k=16))
  70. data = f"--{boundary}\r\n" \
  71. f"Content-Disposition: form-data; name=\"knowledgeRequest\"\r\n\r\n" \
  72. f"{json.dumps(build_knowledge_request(tone), ensure_ascii=False)}\r\n" \
  73. f"--{boundary}\r\n" \
  74. f"Content-Disposition: form-data; name=\"imageBase64\"\r\n\r\n" \
  75. f"{image_bin}\r\n" \
  76. f"--{boundary}--\r\n"
  77. return data, boundary
  78. def build_knowledge_request(tone: str) -> dict:
  79. """
  80. Builds the knowledge request payload.
  81. Args:
  82. tone (str): The tone of the conversation.
  83. Returns:
  84. dict: The knowledge request payload.
  85. """
  86. return {
  87. 'invokedSkills': ["ImageById"],
  88. 'subscriptionId': "Bing.Chat.Multimodal",
  89. 'invokedSkillsRequestData': {
  90. 'enableFaceBlur': True
  91. },
  92. 'convoData': {
  93. 'convoid': "",
  94. 'convotone': tone
  95. }
  96. }
  97. def prepare_headers(session: ClientSession, boundary: str) -> dict:
  98. """
  99. Prepares the headers for the image upload request.
  100. Args:
  101. session (ClientSession): The active session.
  102. boundary (str): The boundary string for the multipart/form-data.
  103. Returns:
  104. dict: The headers for the request.
  105. """
  106. headers = session.headers.copy()
  107. headers["Content-Type"] = f'multipart/form-data; boundary={boundary}'
  108. headers["Referer"] = 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx'
  109. headers["Origin"] = 'https://www.bing.com'
  110. return headers
  111. def parse_image_response(response: dict) -> ImageResponse:
  112. """
  113. Parses the response from the image upload.
  114. Args:
  115. response (dict): The response dictionary.
  116. Raises:
  117. RuntimeError: If parsing the image info fails.
  118. Returns:
  119. ImageResponse: The parsed image response.
  120. """
  121. if not response.get('blobId'):
  122. raise RuntimeError("Failed to parse image info.")
  123. result = {'bcid': response.get('blobId', ""), 'blurredBcid': response.get('processedBlobId', "")}
  124. result["imageUrl"] = f"https://www.bing.com/images/blob?bcid={result['blurredBcid'] or result['bcid']}"
  125. result['originalImageUrl'] = (
  126. f"https://www.bing.com/images/blob?bcid={result['blurredBcid']}"
  127. if IMAGE_CONFIG["enableFaceBlurDebug"] else
  128. f"https://www.bing.com/images/blob?bcid={result['bcid']}"
  129. )
  130. return ImageResponse(result["imageUrl"], "", result)