helper.py 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. from __future__ import annotations
  2. import re
  3. import logging
  4. from typing import AsyncIterator, Iterator, AsyncGenerator, Optional
  5. def filter_markdown(text: str, allowd_types=None, default=None) -> str:
  6. """
  7. Parses code block from a string.
  8. Args:
  9. text (str): A string containing a code block.
  10. Returns:
  11. dict: A dictionary parsed from the code block.
  12. """
  13. match = re.search(r"```(.+)\n(?P<code>[\S\s]+?)(\n```|$)", text)
  14. if match:
  15. if allowd_types is None or match.group(1) in allowd_types:
  16. return match.group("code")
  17. return default
  18. def filter_json(text: str) -> str:
  19. """
  20. Parses JSON code block from a string.
  21. Args:
  22. text (str): A string containing a JSON code block.
  23. Returns:
  24. dict: A dictionary parsed from the JSON code block.
  25. """
  26. return filter_markdown(text, ["", "json"], text)
  27. def find_stop(stop: Optional[list[str]], content: str, chunk: str = None):
  28. first = -1
  29. word = None
  30. if stop is not None:
  31. for word in list(stop):
  32. first = content.find(word)
  33. if first != -1:
  34. content = content[:first]
  35. break
  36. if chunk is not None and first != -1:
  37. first = chunk.find(word)
  38. if first != -1:
  39. chunk = chunk[:first]
  40. else:
  41. first = 0
  42. return first, content, chunk
  43. def filter_none(**kwargs) -> dict:
  44. return {
  45. key: value
  46. for key, value in kwargs.items()
  47. if value is not None
  48. }
  49. async def safe_aclose(generator: AsyncGenerator) -> None:
  50. try:
  51. if generator and hasattr(generator, 'aclose'):
  52. await generator.aclose()
  53. except Exception as e:
  54. logging.warning(f"Error while closing generator: {e}")