# downloader.py
import asyncio
import hashlib
import shlex
from pathlib import Path
from urllib.parse import parse_qs, urlparse

import config
import genius
from genius.type import Album as gAlbum

from .mp3 import set_track
from .slice import slice_audio
from .type import Album, Track
from .utils import system
  11. class Downloader:
  12. @staticmethod
  13. async def download_audio(url: str):
  14. if "youtu.be" in url:
  15. filename = url.split("/")[-1] + ".mp3"
  16. else:
  17. filename = url.split("?")[-1] + ".mp3"
  18. path = config.tmp_folder / filename
  19. if path.exists():
  20. return path
  21. command = f'youtube-dl -o "tmp/%(id)s.%(ext)s" -x --audio-format mp3 {url}'
  22. await system(command)
  23. return path
  24. @staticmethod
  25. async def download_album(album: Album):
  26. tasks = []
  27. if "watch" in album.audio_url or "youtu.be" in album.audio_url:
  28. _, genius_album = await asyncio.gather(
  29. asyncio.Task(Downloader.download_audio(album.audio_url)),
  30. genius.get_album(album.meta_url)
  31. )
  32. for track in album.tracks:
  33. tasks.append(
  34. asyncio.Task(Downloader.download_track(track, genius_album))
  35. )
  36. return genius_album, await asyncio.gather(*tasks)
  37. @staticmethod
  38. async def download_track(track: Track, album: gAlbum = None):
  39. hash = hashlib.md5(track.meta_url.split("/")[-1].encode()).hexdigest()
  40. async def get_audio():
  41. src = await Downloader.download_audio(track.audio_url)
  42. return await slice_audio(src, config.tmp_folder / f"{hash}.mp3", track.timecode)
  43. dst, meta = await asyncio.gather(
  44. asyncio.Task(get_audio()),
  45. asyncio.Task(genius.get_track(track.meta_url))
  46. )
  47. meta.album = album
  48. meta.number = album.track_urls[meta.url]
  49. await set_track(dst, meta)
  50. return dst, meta
  51. @staticmethod
  52. async def download(object):
  53. res: dict[Path, Track] = {}
  54. album = None
  55. if object.type == "album":
  56. album, tracks = await Downloader.download_album(object)
  57. for key, value in tracks:
  58. res[key] = value
  59. else:
  60. key, value = await Downloader.download_track(object)
  61. res[key] = value
  62. return album, res