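"""Provider for Pollinations AI (https://pollinations.ai).

Exposes the public Pollinations text, image, and audio endpoints as an
async generator provider.
"""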
from __future__ import annotations

import time
import json
import random
import requests
import asyncio
from urllib.parse import quote_plus
from typing import Optional

from aiohttp import ClientSession

from .helper import filter_none, format_image_prompt
from .base_provider import AsyncGeneratorProvider, ProviderModelMixin
from ..typing import AsyncResult, Messages, MediaListType
from ..image import is_data_an_audio, use_aspect_ratio
from ..errors import ModelNotFoundError, ResponseError
from ..requests import see_stream
from ..requests.raise_for_status import raise_for_status
from ..requests.aiohttp import get_connector
from ..image.copy_images import save_response_media
from ..providers.response import FinishReason, Usage, ToolCalls, ImageResponse, Reasoning, TitleGeneration, SuggestedFollowups
from ..tools.media import render_messages
from ..constants import STATIC_URL
from .. import debug
DEFAULT_HEADERS = {
    "accept": "*/*",
    "accept-language": "en-US,en;q=0.9",
    "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
    "referer": "https://pollinations.ai/",
    "origin": "https://pollinations.ai",
}
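# Tool schema used to ask the model for a conversation title and suggested
# followup questions via a forced function call (see _generate_text below).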
FOLLOWUPS_TOOLS = [{
    "type": "function",
    "function": {
        "name": "options",
        "description": "Provides options for the conversation",
        "parameters": {
            "properties": {
                "title": {
                    "title": "Conversation Title",
                    "type": "string"
                },
                "followups": {
                    "items": {
                        "type": "string"
                    },
                    "title": "Suggested Followups",
                    "type": "array"
                }
            },
            "title": "Conversation",
            "type": "object"
        }
    }
}]
FOLLOWUPS_DEVELOPER_MESSAGE = [{
    "role": "developer",
    "content": "Prefix the conversation title with one or more emojis. Suggest 4 followups."
}]
class PollinationsAI(AsyncGeneratorProvider, ProviderModelMixin):
    label = "Pollinations AI"
    url = "https://pollinations.ai"

    working = True
    supports_system_message = True
    supports_message_history = True

    # API endpoints
    text_api_endpoint = "https://text.pollinations.ai"
    openai_endpoint = "https://text.pollinations.ai/openai"
    image_api_endpoint = "https://image.pollinations.ai/"

    # Models configuration
    default_model = "openai"
    default_image_model = "flux"
    default_vision_model = default_model
    default_audio_model = "openai-audio"
    text_models = [default_model, "evil"]
    image_models = [default_image_model]
    audio_models = {default_audio_model: []}
    extra_image_models = ["flux-pro", "flux-dev", "flux-schnell", "midjourney", "dall-e-3", "turbo"]
    vision_models = [default_vision_model, "gpt-4o-mini", "openai", "openai-large", "openai-reasoning", "searchgpt"]
    _models_loaded = False
    # https://github.com/pollinations/pollinations/blob/master/text.pollinations.ai/generateTextPortkey.js#L15
    model_aliases = {
        ### Text Models ###
        "gpt-4o-mini": "openai",
        "gpt-4.1-nano": "openai-fast",
        "gpt-4": "openai-large",
        "gpt-4o": "openai-large",
        "gpt-4.1": "openai-large",
        "o4-mini": "openai-reasoning",
        "gpt-4.1-mini": "openai",
        "command-r-plus-08-2024": "command-r",
        "gemini-2.5-flash": "gemini",
        "gemini-2.0-flash-thinking": "gemini-thinking",
        "qwen-2.5-coder-32b": "qwen-coder",
        "llama-3.3-70b": "llama",
        "llama-4-scout": "llamascout",
        "llama-4-scout-17b": "llamascout",
        "mistral-small-3.1-24b": "mistral",
        "deepseek-r1": "deepseek-reasoning-large",
        "deepseek-r1-distill-llama-70b": "deepseek-reasoning-large",
        #"deepseek-r1-distill-llama-70b": "deepseek-r1-llama",
        #"mistral-small-3.1-24b": "unity", # Personas
        #"mirexa": "mirexa", # Personas
        #"midijourney": "midijourney", # Personas
        #"rtist": "rtist", # Personas
        #"searchgpt": "searchgpt",
        #"evil": "evil", # Personas
        "deepseek-r1-distill-qwen-32b": "deepseek-reasoning",
        "phi-4": "phi",
        #"pixtral-12b": "pixtral",
        #"hormoz-8b": "hormoz",
        "qwq-32b": "qwen-qwq",
        #"hypnosis-tracy-7b": "hypnosis-tracy", # Personas
        #"mistral-?": "sur", # Personas
        "deepseek-v3": "deepseek",
        "deepseek-v3-0324": "deepseek",
        #"bidara": "bidara", # Personas
        ### Audio Models ###
        "gpt-4o-mini-audio": "openai-audio",
        ### Image Models ###
        "sdxl-turbo": "turbo",
    }
    @classmethod
    def get_model(cls, model: str) -> str:
        """Get the internal model name from the user-provided model name."""
        if not model:
            return cls.default_model

        # Check if the model exists directly in our model lists
        if model in cls.text_models or model in cls.image_models or model in cls.audio_models:
            return model

        # Check if there's an alias for this model
        if model in cls.model_aliases:
            return cls.model_aliases[model]

        # If no match is found, raise an error
        raise ModelNotFoundError(f"Model {model} not found")
    @classmethod
    def get_models(cls, **kwargs):
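        """Fetch the available text, image, and audio models from the
        Pollinations API and cache them on the class."""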
        if not cls._models_loaded:
            try:
                # Fetch the latest image models
                image_response = requests.get("https://image.pollinations.ai/models")
                if image_response.ok:
                    new_image_models = image_response.json()
                else:
                    new_image_models = []

                # Combine image models without duplicates, starting with the default model
                all_image_models = [cls.default_image_model]
                for model in cls.extra_image_models + new_image_models:
                    if model not in all_image_models:
                        all_image_models.append(model)
                cls.image_models = all_image_models

                text_response = requests.get("https://text.pollinations.ai/models")
                text_response.raise_for_status()
                models = text_response.json()

                # Collect audio models and their available voices
                cls.audio_models = {
                    model.get("name"): model.get("voices")
                    for model in models
                    if "output_modalities" in model and "audio" in model["output_modalities"] and model.get("name") != "gemini"
                }
                if cls.default_audio_model in cls.audio_models:
                    cls.audio_models = {**cls.audio_models, **{voice: {} for voice in cls.audio_models[cls.default_audio_model]}}

                cls.vision_models.extend([
                    model.get("name")
                    for model in models
                    if model.get("vision") and model.get("name") not in cls.vision_models
                ])
                for alias, model in cls.model_aliases.items():
                    if model in cls.vision_models and alias not in cls.vision_models:
                        cls.vision_models.append(alias)

                # Build the list of text models, starting with the defaults
                unique_text_models = cls.text_models.copy()
                # Add models from vision_models
                unique_text_models.extend(cls.vision_models)
                # Add models from the API response
                for model in models:
                    model_name = model.get("name")
                    if model_name and "input_modalities" in model and "text" in model["input_modalities"]:
                        unique_text_models.append(model_name)
                if cls.default_audio_model in cls.audio_models:
                    unique_text_models.extend([voice for voice in cls.audio_models[cls.default_audio_model]])
                # Deduplicate while preserving order
                cls.text_models = list(dict.fromkeys(unique_text_models))

                cls._models_loaded = True
            except Exception as e:
                # Fall back to the default models on error
                if not cls.text_models:
                    cls.text_models = [cls.default_model]
                if not cls.image_models:
                    cls.image_models = [cls.default_image_model]
                debug.error(f"Failed to fetch models: {e}")

        # Return unique models across all categories
        all_models = cls.text_models.copy()
        all_models.extend(cls.image_models)
        all_models.extend(cls.audio_models.keys())
        return list(dict.fromkeys(all_models))
    @classmethod
    async def create_async_generator(
        cls,
        model: str,
        messages: Messages,
        stream: bool = True,
        proxy: str = None,
        cache: bool = False,
        referrer: str = STATIC_URL,
        extra_body: Optional[dict] = None,
        # Image generation parameters
        prompt: str = None,
        aspect_ratio: str = "1:1",
        width: int = None,
        height: int = None,
        seed: Optional[int] = None,
        nologo: bool = True,
        private: bool = False,
        enhance: bool = False,
        safe: bool = False,
        n: int = 1,
        # Text generation parameters
        media: MediaListType = None,
        temperature: float = None,
        presence_penalty: float = None,
        top_p: float = None,
        frequency_penalty: float = None,
        response_format: Optional[dict] = None,
        extra_parameters: list[str] = ["tools", "parallel_tool_calls", "tool_choice", "reasoning_effort", "logit_bias", "voice", "modalities", "audio"],
        **kwargs
    ) -> AsyncResult:
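        """Dispatch to image or text generation depending on the resolved model."""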
        # extra_body previously defaulted to a mutable {} and was updated in
        # place, which leaked state between calls; normalize it here instead
        if extra_body is None:
            extra_body = {}
        # Load the model list
        cls.get_models()
        if not model:
            has_audio = "audio" in kwargs or "audio" in kwargs.get("modalities", [])
            if not has_audio and media is not None:
                for media_data, filename in media:
                    if is_data_an_audio(media_data, filename):
                        has_audio = True
                        break
            model = cls.default_audio_model if has_audio else model
        try:
            model = cls.get_model(model)
        except ModelNotFoundError:
            pass
        if model in cls.image_models:
            async for chunk in cls._generate_image(
                model=model,
                prompt=format_image_prompt(messages, prompt),
                proxy=proxy,
                aspect_ratio=aspect_ratio,
                width=width,
                height=height,
                seed=seed,
                cache=cache,
                nologo=nologo,
                private=private,
                enhance=enhance,
                safe=safe,
                n=n,
                referrer=referrer
            ):
                yield chunk
        else:
            if prompt is not None and len(messages) == 1:
                messages = [{
                    "role": "user",
                    "content": prompt
                }]
            # A voice name may be passed as the model; map it back to the default audio model
            if model and model in cls.audio_models.get(cls.default_audio_model, []):
                kwargs["audio"] = {
                    "voice": model,
                }
                model = cls.default_audio_model
            async for result in cls._generate_text(
                model=model,
                messages=messages,
                media=media,
                proxy=proxy,
                temperature=temperature,
                presence_penalty=presence_penalty,
                top_p=top_p,
                frequency_penalty=frequency_penalty,
                response_format=response_format,
                seed=seed,
                cache=cache,
                stream=stream,
                extra_parameters=extra_parameters,
                referrer=referrer,
                extra_body=extra_body,
                **kwargs
            ):
                yield result
    @classmethod
    async def _generate_image(
        cls,
        model: str,
        prompt: str,
        proxy: str,
        aspect_ratio: str,
        width: int,
        height: int,
        seed: Optional[int],
        cache: bool,
        nologo: bool,
        private: bool,
        enhance: bool,
        safe: bool,
        n: int,
        referrer: str
    ) -> AsyncResult:
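        """Request n images from the image endpoint concurrently, yielding
        progress updates and image URLs as they arrive."""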
        params = use_aspect_ratio({
            "width": width,
            "height": height,
            "model": model,
            "nologo": str(nologo).lower(),
            "private": str(private).lower(),
            "enhance": str(enhance).lower(),
            "safe": str(safe).lower(),
        }, aspect_ratio)
        query = "&".join(f"{k}={quote_plus(str(v))}" for k, v in params.items() if v is not None)
        # Truncate the quoted prompt so the final URL stays under 2048 characters
        prompt = quote_plus(prompt)[:2048 - len(cls.image_api_endpoint) - len(query) - 8]
        url = f"{cls.image_api_endpoint}prompt/{prompt}?{query}"

        def get_image_url(i: int, seed: Optional[int] = None):
            # The first image may reuse the caller's seed; additional images always get a fresh one
            if i == 0:
                if not cache and seed is None:
                    seed = random.randint(0, 2**32)
            else:
                seed = random.randint(0, 2**32)
            return f"{url}&seed={seed}" if seed is not None else url

        async with ClientSession(headers=DEFAULT_HEADERS, connector=get_connector(proxy=proxy)) as session:
            responses = set()
            responses.add(Reasoning(status=f"Generating {n} {'image' if n == 1 else 'images'}"))
            finished = 0
            start = time.time()

            async def get_image(responses: set, i: int, seed: Optional[int] = None):
                nonlocal finished
                async with session.get(get_image_url(i, seed), allow_redirects=False, headers={"referer": referrer}) as response:
                    try:
                        await raise_for_status(response)
                    except Exception as e:
                        debug.error(f"Error fetching image: {e}")
                    responses.add(ImageResponse(str(response.url), prompt))
                    finished += 1
                    responses.add(Reasoning(status=f"Image {finished}/{n} generated in {time.time() - start:.2f}s"))

            tasks = []
            for i in range(int(n)):
                tasks.append(asyncio.create_task(get_image(responses, i, seed)))
            # Drain completed responses while the download tasks run
            while finished < n or len(responses) > 0:
                while len(responses) > 0:
                    yield responses.pop()
                await asyncio.sleep(0.1)
            await asyncio.gather(*tasks)
    @classmethod
    async def _generate_text(
        cls,
        model: str,
        messages: Messages,
        media: MediaListType,
        proxy: str,
        temperature: float,
        presence_penalty: float,
        top_p: float,
        frequency_penalty: float,
        response_format: Optional[dict],
        seed: Optional[int],
        cache: bool,
        stream: bool,
        extra_parameters: list[str],
        referrer: str,
        extra_body: dict,
        **kwargs
    ) -> AsyncResult:
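        """Send a chat completion request and yield text, tool calls,
        reasoning, usage, and finish-reason events."""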
        if not cache and seed is None:
            seed = random.randint(0, 2**32)
        async with ClientSession(headers=DEFAULT_HEADERS, connector=get_connector(proxy=proxy)) as session:
            if model in cls.audio_models:
                # Audio models do not support streaming and use the plain text endpoint
                if "audio" in kwargs and kwargs.get("audio", {}).get("voice") is None:
                    kwargs["audio"]["voice"] = cls.audio_models[model][0]
                url = cls.text_api_endpoint
                stream = False
            else:
                url = cls.openai_endpoint
            extra_body.update({param: kwargs[param] for param in extra_parameters if param in kwargs})
            data = filter_none(
                messages=list(render_messages(messages, media)),
                model=model,
                temperature=temperature,
                presence_penalty=presence_penalty,
                top_p=top_p,
                frequency_penalty=frequency_penalty,
                response_format=response_format,
                stream=stream,
                seed=seed,
                cache=cache,
                **extra_body
            )
            async with session.post(url, json=data, headers={"referer": referrer}) as response:
                if response.status == 400:
                    debug.error(f"Error: 400 - Bad Request: {data}")
                await raise_for_status(response)
                content_type = response.headers.get("content-type", "")
                if content_type.startswith("text/plain"):
                    yield await response.text()
                    return
                elif content_type.startswith("text/event-stream"):
                    reasoning = False
                    async for result in see_stream(response.content):
                        if "error" in result:
                            raise ResponseError(result["error"].get("message", result["error"]))
                        if result.get("usage") is not None:
                            yield Usage(**result["usage"])
                        choices = result.get("choices", [{}])
                        choice = choices.pop() if choices else {}
                        content = choice.get("delta", {}).get("content")
                        if content:
                            yield content
                        tool_calls = choice.get("delta", {}).get("tool_calls")
                        if tool_calls:
                            yield ToolCalls(choice["delta"]["tool_calls"])
                        reasoning_content = choice.get("delta", {}).get("reasoning_content")
                        if reasoning_content:
                            reasoning = True
                            yield Reasoning(reasoning_content)
                        finish_reason = choice.get("finish_reason")
                        if finish_reason:
                            yield FinishReason(finish_reason)
                    if reasoning:
                        yield Reasoning(status="Done")
                    if "action" in kwargs and "tools" not in data and "response_format" not in data:
                        # Ask the model for a title and suggested followups via a forced tool call
                        data = {
                            "model": model,
                            "messages": messages + FOLLOWUPS_DEVELOPER_MESSAGE,
                            "tool_choice": "required",
                            "tools": FOLLOWUPS_TOOLS
                        }
                        async with session.post(url, json=data, headers={"referer": referrer}) as response:
                            try:
                                await raise_for_status(response)
                                tool_calls = (await response.json()).get("choices", [{}])[0].get("message", {}).get("tool_calls", [])
                                if tool_calls:
                                    arguments = json.loads(tool_calls.pop().get("function", {}).get("arguments"))
                                    if arguments.get("title"):
                                        yield TitleGeneration(arguments.get("title"))
                                    if arguments.get("followups"):
                                        yield SuggestedFollowups(arguments.get("followups"))
                            except Exception as e:
                                debug.error(f"Error generating title and followups: {e}")
                elif content_type.startswith("application/json"):
                    result = await response.json()
                    if "choices" in result:
                        choice = result["choices"][0]
                        message = choice.get("message", {})
                        content = message.get("content", "")
                        if content:
                            yield content
                        if "tool_calls" in message:
                            yield ToolCalls(message["tool_calls"])
                    else:
                        raise ResponseError(result)
                    if result.get("usage") is not None:
                        yield Usage(**result["usage"])
                    finish_reason = choice.get("finish_reason")
                    if finish_reason:
                        yield FinishReason(finish_reason)
                else:
                    # Binary responses (e.g. generated audio) are saved as media files
                    async for chunk in save_response_media(response, format_image_prompt(messages), [model, extra_body.get("audio", {}).get("voice")]):
                        yield chunk
                    return
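
# Example usage (a minimal sketch; assumes this module is imported as part of
# its parent package and that the public Pollinations endpoints are reachable):
#
#     import asyncio
#
#     async def main():
#         async for chunk in PollinationsAI.create_async_generator(
#             model="openai",
#             messages=[{"role": "user", "content": "Hello!"}],
#         ):
#             print(chunk, end="")
#
#     asyncio.run(main())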