helper.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. from __future__ import annotations
  2. import re
  3. import logging
  4. from typing import AsyncIterator, Iterator, AsyncGenerator, Optional
  5. def filter_markdown(text: str, allowed_types=None, default=None) -> str:
  6. """
  7. Parses code block from a string.
  8. Args:
  9. text (str): A string containing a code block.
  10. Returns:
  11. dict: A dictionary parsed from the code block.
  12. """
  13. match = re.search(r"```(.+)\n(?P<code>[\S\s]+?)(\n```|$)", text)
  14. if match:
  15. if allowed_types is None or match.group(1) in allowed_types:
  16. return match.group("code")
  17. return default
  18. def filter_json(text: str) -> str:
  19. """
  20. Parses JSON code block from a string.
  21. Args:
  22. text (str): A string containing a JSON code block.
  23. Returns:
  24. dict: A dictionary parsed from the JSON code block.
  25. """
  26. return filter_markdown(text, ["", "json"], text.strip("^\n "))
  27. def find_stop(stop: Optional[list[str]], content: str, chunk: str = None):
  28. first = -1
  29. word = None
  30. if stop is not None:
  31. content = str(content)
  32. for word in list(stop):
  33. first = content.find(word)
  34. if first != -1:
  35. content = content[:first]
  36. break
  37. if chunk is not None and first != -1:
  38. first = chunk.find(word)
  39. if first != -1:
  40. chunk = chunk[:first]
  41. else:
  42. first = 0
  43. return first, content, chunk
  44. def filter_none(**kwargs) -> dict:
  45. return {
  46. key: value
  47. for key, value in kwargs.items()
  48. if value is not None
  49. }
  50. async def safe_aclose(generator: AsyncGenerator) -> None:
  51. try:
  52. if generator and hasattr(generator, 'aclose'):
  53. await generator.aclose()
  54. except Exception as e:
  55. logging.warning(f"Error while closing generator: {e}")