os.py 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. import asyncio
  2. import aiofiles
  3. import os
  4. from os.path import join
  5. def listdir(path):
  6. return os.listdir(path)
  7. def path_join(*args):
  8. return join(*args)
  9. def path_exists(path):
  10. return os.path.exists(path)
  11. def remove(path):
  12. return os.remove(path)
  13. async def system(command: str):
  14. """ async os.system(command) """
  15. proc = await asyncio.create_subprocess_shell(command)
  16. await proc.communicate()
  17. def mkdirs(dirname: str):
  18. try:
  19. os.makedirs(dirname)
  20. except Exception as e:
  21. print(e)
  22. pass
  23. return dirname
  24. async def file_read(filename: str, mode="r"):
  25. async with aiofiles.open(filename, mode) as f:
  26. return await f.read()
  27. async def copy_file(src, dist):
  28. """ async os.copy(src, dist) """
  29. content = await file_read(src, "rb")
  30. async with aiofiles.open(dist, "wb") as f:
  31. await f.write(content)
  32. async def rename(src, dist):
  33. try:
  34. await copy_file(src, dist)
  35. remove(src)
  36. except:
  37. print(src, dist)
  38. async def copy_folder(src, dist, blacklist):
  39. tasks = []
  40. for filename in os.listdir(src):
  41. if filename in blacklist:
  42. continue
  43. if filename.endswith(".json"):
  44. continue
  45. task = asyncio.create_task(
  46. copy_file(f"{src}/{filename}", f"{dist}/{filename}")
  47. )
  48. tasks.append(task)
  49. await asyncio.gather(*tasks)