- from __future__ import annotations
- from aiohttp import ClientSession, ClientTimeout, ClientError
- import json
- import hashlib
- from pathlib import Path
- from urllib.parse import urlparse, quote_plus
- import datetime
- import asyncio
- try:
- from duckduckgo_search import DDGS
- from duckduckgo_search.exceptions import DuckDuckGoSearchException
- from bs4 import BeautifulSoup
- has_requirements = True
- except ImportError:
- has_requirements = False
- # Fallback definition so the except clause in get_search_message still resolves
- # when the optional dependency is missing
- class DuckDuckGoSearchException(Exception): ...
- try:
- import spacy
- has_spacy = True
- except ImportError:
- has_spacy = False
- from typing import Iterator
- from ..cookies import get_cookies_dir
- from ..providers.response import format_link
- from ..errors import MissingRequirementsError
- from .. import debug
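- # Default instruction appended after the search results when building the augmented prompt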
- DEFAULT_INSTRUCTIONS = """
- Using the provided web search results, write a comprehensive reply to the user request.
- Make sure to cite your sources using [[Number]](Url) notation after each reference. Example: [[0]](http://google.com)
- """
- class SearchResults:
- def __init__(self, results: list, used_words: int):
- self.results = results
- self.used_words = used_words
- def __iter__(self):
- yield from self.results
- def __str__(self):
- search = ""
- for idx, result in enumerate(self.results):
- if search:
- search += "\n\n\n"
- search += f"Title: {result.title}\n\n"
- if result.text:
- search += result.text
- else:
- search += result.snippet
- search += f"\n\nSource: [[{idx}]]({result.url})"
- return search
- def __len__(self) -> int:
- return len(self.results)
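- # A single search hit: title, URL, the DuckDuckGo snippet, and optionally the scraped page text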
- class SearchResultEntry:
- def __init__(self, title: str, url: str, snippet: str, text: str = None):
- self.title = title
- self.url = url
- self.snippet = snippet
- self.text = text
- def set_text(self, text: str):
- self.text = text
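- # Extract readable content (headings, paragraphs, tables, lists) and up to count_images
- # linked images from an HTML page, trimming the output to roughly max_words words and
- # optionally appending the page's canonical link as a source line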
- def scrape_text(html: str, max_words: int = None, add_source=True, count_images: int = 2) -> Iterator[str]:
- source = BeautifulSoup(html, "html.parser")
- soup = source
- for selector in [
- "main",
- ".main-content-wrapper",
- ".main-content",
- ".emt-container-inner",
- ".content-wrapper",
- "#content",
- "#mainContent",
- ]:
- select = soup.select_one(selector)
- if select:
- soup = select
- break
- # Site-specific cleanup, e.g. the ZDNet global disclosure banner
- for remove in [".c-globalDisclosure"]:
- select = soup.select_one(remove)
- if select:
- select.extract()
- image_select = "img[alt][src^=http]:not([alt=''])"
- image_link_select = f"a:has({image_select})"
- yield_words = []
- for paragraph in soup.select(f"h1, h2, h3, h4, h5, h6, p, table:not(:has(p)), ul:not(:has(p)), {image_link_select}"):
- if count_images > 0:
- image = paragraph.select_one(image_select)
- if image:
- title = paragraph.get("title") or paragraph.text
- if title:
- yield f"!{format_link(image['src'], title)}\n"
- if max_words is not None:
- max_words -= 10
- count_images -= 1
- continue
- for line in paragraph.get_text(" ").splitlines():
- words = [word for word in line.split() if word]
- count = len(words)
- if not count:
- continue
- words = " ".join(words)
- if words in yield_words:
- continue
- if max_words:
- max_words -= count
- if max_words <= 0:
- break
- yield words + "\n"
- yield_words.append(words)
- if add_source:
- canonical_link = source.find("link", rel="canonical")
- if canonical_link and "href" in canonical_link.attrs:
- link = canonical_link["href"]
- domain = urlparse(link).netloc
- yield f"\nSource: [{domain}]({link})"
- async def fetch_and_scrape(session: ClientSession, url: str, max_words: int = None, add_source: bool = False) -> str:
- try:
- bucket_dir: Path = Path(get_cookies_dir()) / ".scrape_cache" / "fetch_and_scrape"
- bucket_dir.mkdir(parents=True, exist_ok=True)
- md5_hash = hashlib.md5(url.encode()).hexdigest()
- cache_file = bucket_dir / f"{quote_plus(url.split('?')[0].split('//')[1].replace('/', ' ')[:48])}.{datetime.date.today()}.{md5_hash[:16]}.cache"
- if cache_file.exists():
- return cache_file.read_text()
- async with session.get(url) as response:
- if response.status == 200:
- html = await response.text(errors="replace")
- text = "".join(scrape_text(html, max_words, add_source))
- with open(cache_file, "w") as f:
- f.write(text)
- return text
- except (ClientError, asyncio.TimeoutError):
- return
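- # Query DuckDuckGo text search, optionally fetch and scrape each result page, and trim
- # the combined content to at most max_words words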
- async def search(query: str, max_results: int = 5, max_words: int = 2500, backend: str = "auto", add_text: bool = True, timeout: int = 5, region: str = "wt-wt") -> SearchResults:
- if not has_requirements:
- raise MissingRequirementsError('Install "duckduckgo-search" and "beautifulsoup4" packages | pip install -U g4f[search]')
- with DDGS() as ddgs:
- results = []
- for result in ddgs.text(
- query,
- region=region,
- safesearch="moderate",
- timelimit="y",
- max_results=max_results,
- backend=backend,
- ):
- if ".google." in result["href"]:
- continue
- results.append(SearchResultEntry(
- result["title"],
- result["href"],
- result["body"]
- ))
- if add_text:
- requests = []
- async with ClientSession(timeout=ClientTimeout(timeout)) as session:
- for entry in results:
- # Guard against division by zero when only one result is requested
- requests.append(fetch_and_scrape(session, entry.url, int(max_words / max(max_results - 1, 1)), False))
- texts = await asyncio.gather(*requests)
- formatted_results = []
- used_words = 0
- left_words = max_words
- for i, entry in enumerate(results):
- if add_text:
- entry.text = texts[i]
- if max_words:
- left_words -= entry.title.count(" ") + 5
- if entry.text:
- left_words -= entry.text.count(" ")
- else:
- left_words -= entry.snippet.count(" ")
- if left_words < 0:
- break
- used_words = max_words - left_words
- formatted_results.append(entry)
- return SearchResults(formatted_results, used_words)
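- # Build an augmented prompt: derive a query (via spaCy keywords if none is given), run the
- # web search with per-day on-disk caching, and prepend the results and instructions to the prompt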
- async def do_search(prompt: str, query: str = None, instructions: str = DEFAULT_INSTRUCTIONS, **kwargs) -> str:
- if query is None:
- query = spacy_get_keywords(prompt)
- json_bytes = json.dumps({"query": query, **kwargs}, sort_keys=True).encode()
- md5_hash = hashlib.md5(json_bytes).hexdigest()
- bucket_dir: Path = Path(get_cookies_dir()) / ".scrape_cache" / "web_search" / f"{datetime.date.today()}"
- bucket_dir.mkdir(parents=True, exist_ok=True)
- cache_file = bucket_dir / f"{quote_plus(query[:20])}.{md5_hash}.cache"
- if cache_file.exists():
- with cache_file.open("r") as f:
- search_results = f.read()
- else:
- search_results = await search(query, **kwargs)
- if search_results.results:
- with cache_file.open("w") as f:
- f.write(str(search_results))
- if instructions:
- new_prompt = f"""
- {search_results}
- Instruction: {instructions}
- User request:
- {prompt}
- """
- else:
- new_prompt = f"""
- {search_results}
- {prompt}
- """
- debug.log(f"Web search: '{query.strip()[:50]}...'")
- if isinstance(search_results, SearchResults):
- debug.log(f"with {len(search_results.results)} Results {search_results.used_words} Words")
- return new_prompt
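- # Synchronous wrapper around do_search; falls back to the original prompt when the search
- # dependencies are missing or DuckDuckGo raises an error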
- def get_search_message(prompt: str, raise_search_exceptions=False, **kwargs) -> str:
- try:
- return asyncio.run(do_search(prompt, **kwargs))
- except (DuckDuckGoSearchException, MissingRequirementsError) as e:
- if raise_search_exceptions:
- raise e
- debug.log(f"Couldn't do web search: {e.__class__.__name__}: {e}")
- return prompt
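- # Reduce a free-form prompt to search keywords using spaCy's en_core_web_sm model;
- # returns the text unchanged when spaCy is not installed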
- def spacy_get_keywords(text: str) -> str:
- if not has_spacy:
- return text
- # Load the spaCy language model
- nlp = spacy.load("en_core_web_sm")
- # Process the query
- doc = nlp(text)
- # Extract keywords based on POS tags and named entities
- keywords = []
- for token in doc:
- # Keep nouns, proper nouns, and adjectives that are not stop words
- if token.pos_ in {"NOUN", "PROPN", "ADJ"} and not token.is_stop:
- keywords.append(token.lemma_)
- # Add named entities as keywords
- for ent in doc.ents:
- keywords.append(ent.text)
- # Add noun phrases whose root is not a stop word
- for chunk in doc.noun_chunks:
- if not chunk.root.is_stop:
- keywords.append(chunk.text)
- # Remove duplicates while preserving order and return a single query string,
- # since callers slice and strip the query as text
- keywords = list(dict.fromkeys(keywords))
- return " ".join(keywords)
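- # Example usage, a minimal sketch assuming the optional "duckduckgo-search" and
- # "beautifulsoup4" extras are installed (the prompt value is illustrative only):
- #
- #     prompt = "What changed in the latest Python release?"
- #     enriched = get_search_message(prompt, max_results=3, max_words=1500)
- #     print(enriched)  # the prompt prefixed with cited web search results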