123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- from __future__ import annotations
- import uuid
- import aiohttp
- import random
- import asyncio
- import json
- from ...image import ImageResponse
- from ...errors import MissingRequirementsError, NoValidHarFileError
- from ...typing import AsyncResult, Messages
- from ...requests.raise_for_status import raise_for_status
- from ...requests.aiohttp import get_connector
- from ...requests import get_nodriver
- from ..Copilot import get_headers, get_har_files
- from ..base_provider import AsyncGeneratorProvider, ProviderModelMixin
- from ..helper import get_random_hex
- from ... import debug
- class MicrosoftDesigner(AsyncGeneratorProvider, ProviderModelMixin):
- label = "Microsoft Designer"
- url = "https://designer.microsoft.com"
- working = True
- needs_auth = True
- default_image_model = "dall-e-3"
- image_models = [default_image_model, "1024x1024", "1024x1792", "1792x1024"]
- models = image_models
- @classmethod
- async def create_async_generator(
- cls,
- model: str,
- messages: Messages,
- prompt: str = None,
- proxy: str = None,
- **kwargs
- ) -> AsyncResult:
- image_size = "1024x1024"
- if model != cls.default_image_model and model in cls.image_models:
- image_size = model
- yield await cls.generate(messages[-1]["content"] if prompt is None else prompt, image_size, proxy)
- @classmethod
- async def generate(cls, prompt: str, image_size: str, proxy: str = None) -> ImageResponse:
- try:
- access_token, user_agent = readHAR("https://designerapp.officeapps.live.com")
- except NoValidHarFileError as h:
- debug.log(f"{cls.__name__}: {h}")
- try:
- access_token, user_agent = await get_access_token_and_user_agent(cls.url, proxy)
- except MissingRequirementsError:
- raise h
- images = await create_images(prompt, access_token, user_agent, image_size, proxy)
- return ImageResponse(images, prompt)
- async def create_images(prompt: str, access_token: str, user_agent: str, image_size: str, proxy: str = None, seed: int = None):
- url = 'https://designerapp.officeapps.live.com/designerapp/DallE.ashx?action=GetDallEImagesCogSci'
- if seed is None:
- seed = random.randint(0, 10000)
- headers = {
- "User-Agent": user_agent,
- "Accept": "application/json, text/plain, */*",
- "Accept-Language": "en-US",
- 'Authorization': f'Bearer {access_token}',
- "AudienceGroup": "Production",
- "Caller": "DesignerApp",
- "ClientId": "b5c2664a-7e9b-4a7a-8c9a-cd2c52dcf621",
- "SessionId": str(uuid.uuid4()),
- "UserId": get_random_hex(16),
- "ContainerId": "1e2843a7-2a98-4a6c-93f2-42002de5c478",
- "FileToken": "9f1a4cb7-37e7-4c90-b44d-cb61cfda4bb8",
- "x-upload-to-storage-das": "1",
- "traceparent": "",
- "X-DC-Hint": "FranceCentral",
- "Platform": "Web",
- "HostApp": "DesignerApp",
- "ReleaseChannel": "",
- "IsSignedInUser": "true",
- "Locale": "de-DE",
- "UserType": "MSA",
- "x-req-start": "2615401",
- "ClientBuild": "1.0.20241120.9",
- "ClientName": "DesignerApp",
- "Sec-Fetch-Dest": "empty",
- "Sec-Fetch-Mode": "cors",
- "Sec-Fetch-Site": "cross-site",
- "Pragma": "no-cache",
- "Cache-Control": "no-cache",
- "Referer": "https://designer.microsoft.com/"
- }
- form_data = aiohttp.FormData()
- form_data.add_field('dalle-caption', prompt)
- form_data.add_field('dalle-scenario-name', 'TextToImage')
- form_data.add_field('dalle-batch-size', '4')
- form_data.add_field('dalle-image-response-format', 'UrlWithBase64Thumbnail')
- form_data.add_field('dalle-seed', seed)
- form_data.add_field('ClientFlights', 'EnableBICForDALLEFlight')
- form_data.add_field('dalle-hear-back-in-ms', 1000)
- form_data.add_field('dalle-include-b64-thumbnails', 'true')
- form_data.add_field('dalle-aspect-ratio-scaling-factor-b64-thumbnails', 0.3)
- form_data.add_field('dalle-image-size', image_size)
- async with aiohttp.ClientSession(connector=get_connector(proxy=proxy)) as session:
- async with session.post(url, headers=headers, data=form_data) as response:
- await raise_for_status(response)
- response_data = await response.json()
- form_data.add_field('dalle-boost-count', response_data.get('dalle-boost-count', 0))
- polling_meta_data = response_data.get('polling_response', {}).get('polling_meta_data', {})
- form_data.add_field('dalle-poll-url', polling_meta_data.get('poll_url', ''))
- while True:
- await asyncio.sleep(polling_meta_data.get('poll_interval', 1000) / 1000)
- async with session.post(url, headers=headers, data=form_data) as response:
- await raise_for_status(response)
- response_data = await response.json()
- images = [image["ImageUrl"] for image in response_data.get('image_urls_thumbnail', [])]
- if images:
- return images
- def readHAR(url: str) -> tuple[str, str]:
- api_key = None
- user_agent = None
- for path in get_har_files():
- with open(path, 'rb') as file:
- try:
- harFile = json.loads(file.read())
- except json.JSONDecodeError:
- # Error: not a HAR file!
- continue
- for v in harFile['log']['entries']:
- if v['request']['url'].startswith(url):
- v_headers = get_headers(v)
- if "authorization" in v_headers:
- api_key = v_headers["authorization"].split(maxsplit=1).pop()
- if "user-agent" in v_headers:
- user_agent = v_headers["user-agent"]
- if api_key is None:
- raise NoValidHarFileError("No access token found in .har files")
- return api_key, user_agent
- async def get_access_token_and_user_agent(url: str, proxy: str = None):
- browser = await get_nodriver(proxy=proxy, user_data_dir="designer")
- page = await browser.get(url)
- user_agent = await page.evaluate("navigator.userAgent")
- access_token = None
- while access_token is None:
- access_token = await page.evaluate("""
- (() => {
- for (var i = 0; i < localStorage.length; i++) {
- try {
- item = JSON.parse(localStorage.getItem(localStorage.key(i)));
- if (item.credentialType == "AccessToken"
- && item.expiresOn > Math.floor(Date.now() / 1000)
- && item.target.includes("designerappservice")) {
- return item.secret;
- }
- } catch(e) {}
- }
- })()
- """)
- if access_token is None:
- await asyncio.sleep(1)
- await page.close()
- return access_token, user_agent
|