from __future__ import annotations

import os
import time
import asyncio
import hashlib
import re
from urllib.parse import quote, unquote

from aiohttp import ClientSession, ClientError

from ..typing import Optional, Cookies
from ..requests.aiohttp import get_connector
from ..Provider.template import BackendApi
from . import is_accepted_format, extract_data_uri
from .. import debug

# Directory for storing generated images
images_dir = "./generated_images"


def get_image_extension(image: str) -> str:
    """Extract the image extension from a URL or filename; return "" when none is recognized."""
    match = re.search(r"\.(jpe?g|png|webp)$", image, re.IGNORECASE)
    return f".{match.group(1).lower()}" if match else ""
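
# Illustration (hypothetical inputs):
#   get_image_extension("https://cdn.example.com/photo.PNG")  -> ".png"
#   get_image_extension("avatar.webp")                        -> ".webp"
#   get_image_extension("https://example.com/render?id=42")   -> ""  (format is sniffed after download)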


def ensure_images_dir():
    """Create the images directory if it doesn't exist"""
    os.makedirs(images_dir, exist_ok=True)


def get_source_url(image: str, default: Optional[str] = None) -> Optional[str]:
    """Extract the original URL from an image parameter if present"""
    if "url=" in image:
        decoded_url = unquote(image.split("url=", 1)[1])
        if decoded_url.startswith(("http://", "https://")):
            return decoded_url
    return default
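
# Example (hypothetical values), round-tripping the "?url=" parameter that
# copy_image() below appends to its results:
#   get_source_url("/images/1712_image_ab12.jpg?url=https%3A//example.com/a.jpg")
#   -> "https://example.com/a.jpg"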


async def copy_images(
    images: list[str],
    cookies: Optional[Cookies] = None,
    headers: Optional[dict] = None,
    proxy: Optional[str] = None,
    alt: Optional[str] = None,
    add_url: bool = True,
    target: Optional[str] = None,
    ssl: Optional[bool] = None
) -> list[str]:
    """
    Download and store images locally with Unicode-safe filenames.
    Returns a list of relative image URLs.
    """
    # Don't append the source URL when cookies were needed to fetch the image
    if add_url:
        add_url = not cookies
    ensure_images_dir()

    async with ClientSession(
        connector=get_connector(proxy=proxy),
        cookies=cookies,
        headers=headers,
    ) as session:
        async def copy_image(image: str, target: Optional[str] = None) -> str:
            """Process a single image and return its local URL"""
            target_path = target
            if target_path is None:
                # Generate the filename components
                file_hash = hashlib.sha256(image.encode()).hexdigest()[:16]
                timestamp = int(time.time())

                # Sanitize the alt text for use in the filename (Unicode-safe):
                # keep Unicode word characters, whitespace, dots and dashes,
                # replace everything else with underscores
                if alt:
                    clean_alt = re.sub(
                        r'[^\w\s.-]',
                        '_',
                        unquote(alt).strip(),
                        flags=re.UNICODE
                    )
                    clean_alt = re.sub(r'[\s_]+', '_', clean_alt)[:100]
                else:
                    clean_alt = "image"
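                # e.g. (hypothetical): alt "A cat & a dog!" -> "A_cat_a_dog_"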

                # Build a safe filename with full Unicode support
                extension = get_image_extension(image)
                filename = (
                    f"{timestamp}_"
                    f"{clean_alt}_"
                    f"{file_hash}"
                    f"{extension}"
                )
                target_path = os.path.join(images_dir, filename)
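                # e.g. (hypothetical): 1712345678_cat_photo_ab12cd34ef56ab78.jpg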

            try:
                # Handle the different image sources
                if image.startswith("data:"):
                    # Embedded data URI: decode and write it directly
                    with open(target_path, "wb") as f:
                        f.write(extract_data_uri(image))
                else:
                    # Apply BackendApi settings if needed
                    if BackendApi.working and image.startswith(BackendApi.url):
                        request_headers = BackendApi.headers if headers is None else headers
                        request_ssl = BackendApi.ssl
                    else:
                        request_headers = headers
                        request_ssl = ssl

                    # Stream the download to disk in chunks
                    async with session.get(image, ssl=request_ssl, headers=request_headers) as response:
                        response.raise_for_status()
                        with open(target_path, "wb") as f:
                            async for chunk in response.content.iter_chunked(4096):
                                f.write(chunk)

                # If the stored file has no extension, sniff the header and
                # rename it to the detected format
                if target is None and not os.path.splitext(target_path)[1]:
                    with open(target_path, "rb") as f:
                        file_header = f.read(12)
                    detected_type = is_accepted_format(file_header)
                    if detected_type:
                        new_ext = f".{detected_type.split('/')[-1]}"
                        os.rename(target_path, f"{target_path}{new_ext}")
                        target_path = f"{target_path}{new_ext}"

                # Build the URL with safe encoding
                url_filename = quote(os.path.basename(target_path))
                if add_url and not image.startswith("data:"):
                    return f"/images/{url_filename}?url={quote(image)}"
                return f"/images/{url_filename}"
            except (ClientError, IOError, OSError) as e:
                debug.error(f"Image copying failed: {type(e).__name__}: {e}")
                # Remove any partially written file and fall back to the source URL
                if target_path and os.path.exists(target_path):
                    os.unlink(target_path)
                return get_source_url(image, image)

        return await asyncio.gather(*[copy_image(img, target) for img in images])
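

# Minimal usage sketch (comments only, since this module uses relative imports
# and is not meant to run standalone); the URL is a placeholder, not a real endpoint:
#
#     local_urls = await copy_images(
#         ["https://example.com/cat.jpg"],
#         alt="cat photo",
#     )
#     # -> ["/images/<timestamp>_cat_photo_<hash>.jpg?url=https%3A//example.com/cat.jpg"]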