copy_images.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. from __future__ import annotations
  2. import os
  3. import time
  4. import uuid
  5. import asyncio
  6. import hashlib
  7. import re
  8. from urllib.parse import quote, unquote
  9. from aiohttp import ClientSession, ClientError
  10. from ..typing import Optional, Cookies
  11. from ..requests.aiohttp import get_connector
  12. from ..Provider.template import BackendApi
  13. from . import is_accepted_format, extract_data_uri
  14. from .. import debug
# Directory (relative to the process working directory) where copied images
# are written. ensure_images_dir() creates it on demand, and copy_images()
# joins generated filenames onto it.
images_dir = "./generated_images"
  17. def get_image_extension(image: str) -> str:
  18. """Extract image extension from URL or filename, default to .jpg"""
  19. match = re.search(r"\.(jpe?g|png|webp)$", image, re.IGNORECASE)
  20. return f".{match.group(1).lower()}" if match else ".jpg"
  21. def ensure_images_dir():
  22. """Create images directory if it doesn't exist"""
  23. os.makedirs(images_dir, exist_ok=True)
  24. def get_source_url(image: str, default: str = None) -> str:
  25. """Extract original URL from image parameter if present"""
  26. if "url=" in image:
  27. decoded_url = unquote(image.split("url=", 1)[1])
  28. if decoded_url.startswith(("http://", "https://")):
  29. return decoded_url
  30. return default
  31. async def copy_images(
  32. images: list[str],
  33. cookies: Optional[Cookies] = None,
  34. headers: Optional[dict] = None,
  35. proxy: Optional[str] = None,
  36. alt: str = None,
  37. add_url: bool = True,
  38. target: str = None,
  39. ssl: bool = None
  40. ) -> list[str]:
  41. """
  42. Download and store images locally with Unicode-safe filenames
  43. Returns list of relative image URLs
  44. """
  45. if add_url:
  46. add_url = not cookies
  47. ensure_images_dir()
  48. async with ClientSession(
  49. connector=get_connector(proxy=proxy),
  50. cookies=cookies,
  51. headers=headers,
  52. ) as session:
  53. async def copy_image(image: str, target: str = None) -> str:
  54. """Process individual image and return its local URL"""
  55. target_path = target
  56. if target_path is None:
  57. # Generate filename components
  58. file_hash = hashlib.sha256(image.encode()).hexdigest()[:16]
  59. timestamp = int(time.time())
  60. # Sanitize alt text for filename (Unicode-safe)
  61. if alt:
  62. # Keep letters, numbers, basic punctuation and all Unicode chars
  63. clean_alt = re.sub(
  64. r'[^\w\s.-]', # Allow all Unicode word chars
  65. '_',
  66. unquote(alt).strip(),
  67. flags=re.UNICODE
  68. )
  69. clean_alt = re.sub(r'[\s_]+', '_', clean_alt)[:100]
  70. else:
  71. clean_alt = "image"
  72. # Build safe filename with full Unicode support
  73. extension = get_image_extension(image)
  74. filename = (
  75. f"{timestamp}_"
  76. f"{clean_alt}_"
  77. f"{file_hash}"
  78. f"{extension}"
  79. )
  80. target_path = os.path.join(images_dir, filename)
  81. try:
  82. # Handle different image types
  83. if image.startswith("data:"):
  84. with open(target_path, "wb") as f:
  85. f.write(extract_data_uri(image))
  86. else:
  87. # Apply BackendApi settings if needed
  88. if BackendApi.working and image.startswith(BackendApi.url):
  89. request_headers = BackendApi.headers if headers is None else headers
  90. request_ssl = BackendApi.ssl
  91. else:
  92. request_headers = headers
  93. request_ssl = ssl
  94. async with session.get(image, ssl=request_ssl, headers=request_headers) as response:
  95. response.raise_for_status()
  96. with open(target_path, "wb") as f:
  97. async for chunk in response.content.iter_chunked(4096):
  98. f.write(chunk)
  99. # Verify file format
  100. if target is None and not os.path.splitext(target_path)[1]:
  101. with open(target_path, "rb") as f:
  102. file_header = f.read(12)
  103. detected_type = is_accepted_format(file_header)
  104. if detected_type:
  105. new_ext = f".{detected_type.split('/')[-1]}"
  106. os.rename(target_path, f"{target_path}{new_ext}")
  107. target_path = f"{target_path}{new_ext}"
  108. # Build URL with safe encoding
  109. url_filename = quote(os.path.basename(target_path))
  110. return f"/images/{url_filename}" + (('?url=' + quote(image)) if add_url and not image.startswith('data:') else '')
  111. except (ClientError, IOError, OSError) as e:
  112. debug.error(f"Image copying failed: {type(e).__name__}: {e}")
  113. if target_path and os.path.exists(target_path):
  114. os.unlink(target_path)
  115. return get_source_url(image, image)
  116. return await asyncio.gather(*[copy_image(img, target) for img in images])