track.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. import os
  2. import re
  3. import time
  4. import uuid
  5. from typing import Any, Tuple, List
  6. from librespot.audio.decoders import AudioQuality
  7. from librespot.metadata import TrackId
  8. from ffmpy import FFmpeg
  9. from const import TRACKS, ALBUM, GENRES, NAME, ITEMS, DISC_NUMBER, TRACK_NUMBER, IS_PLAYABLE, ARTISTS, IMAGES, URL, \
  10. RELEASE_DATE, ID, TRACKS_URL, SAVED_TRACKS_URL, TRACK_STATS_URL, CODEC_MAP, EXT_MAP, DURATION_MS, HREF
  11. from termoutput import Printer, PrintChannel
  12. from utils import fix_filename, set_audio_tags, set_music_thumbnail, create_download_directory, \
  13. get_directory_song_ids, add_to_directory_song_ids, get_previously_downloaded, add_to_archive, fmt_seconds
  14. from zspotify import ZSpotify
  15. import traceback
  16. from loader import Loader
  17. def get_saved_tracks() -> list:
  18. """ Returns user's saved tracks """
  19. songs = []
  20. offset = 0
  21. limit = 50
  22. while True:
  23. resp = ZSpotify.invoke_url_with_params(
  24. SAVED_TRACKS_URL, limit=limit, offset=offset)
  25. offset += limit
  26. songs.extend(resp[ITEMS])
  27. if len(resp[ITEMS]) < limit:
  28. break
  29. return songs
  30. def get_song_info(song_id) -> Tuple[List[str], List[Any], str, str, Any, Any, Any, Any, Any, Any, int]:
  31. """ Retrieves metadata for downloaded songs """
  32. with Loader(PrintChannel.PROGRESS_INFO, "Fetching track information..."):
  33. (raw, info) = ZSpotify.invoke_url(f'{TRACKS_URL}?ids={song_id}&market=from_token')
  34. if not TRACKS in info:
  35. raise ValueError(f'Invalid response from TRACKS_URL:\n{raw}')
  36. try:
  37. artists = []
  38. for data in info[TRACKS][0][ARTISTS]:
  39. artists.append(data[NAME])
  40. album_name = info[TRACKS][0][ALBUM][NAME]
  41. name = info[TRACKS][0][NAME]
  42. image_url = info[TRACKS][0][ALBUM][IMAGES][0][URL]
  43. release_year = info[TRACKS][0][ALBUM][RELEASE_DATE].split('-')[0]
  44. disc_number = info[TRACKS][0][DISC_NUMBER]
  45. track_number = info[TRACKS][0][TRACK_NUMBER]
  46. scraped_song_id = info[TRACKS][0][ID]
  47. is_playable = info[TRACKS][0][IS_PLAYABLE]
  48. duration_ms = info[TRACKS][0][DURATION_MS]
  49. return artists, info[TRACKS][0][ARTISTS], album_name, name, image_url, release_year, disc_number, track_number, scraped_song_id, is_playable, duration_ms
  50. except Exception as e:
  51. raise ValueError(f'Failed to parse TRACKS_URL response: {str(e)}\n{raw}')
  52. def get_song_genres(rawartists: List[str], track_name: str) -> List[str]:
  53. try:
  54. genres = []
  55. for data in rawartists:
  56. # query artist genres via href, which will be the api url
  57. with Loader(PrintChannel.PROGRESS_INFO, "Fetching artist information..."):
  58. (raw, artistInfo) = ZSpotify.invoke_url(f'{data[HREF]}')
  59. if ZSpotify.CONFIG.get_all_genres() and len(artistInfo[GENRES]) > 0:
  60. for genre in artistInfo[GENRES]:
  61. genres.append(genre)
  62. elif len(artistInfo[GENRES]) > 0:
  63. genres.append(artistInfo[GENRES][0])
  64. if len(genres) == 0:
  65. Printer.print(PrintChannel.WARNINGS, '### No Genres found for song ' + track_name)
  66. genres.append('')
  67. return genres
  68. except Exception as e:
  69. raise ValueError(f'Failed to parse GENRES response: {str(e)}\n{raw}')
  70. def get_song_duration(song_id: str) -> float:
  71. """ Retrieves duration of song in second as is on spotify """
  72. (raw, resp) = ZSpotify.invoke_url(f'{TRACK_STATS_URL}{song_id}')
  73. # get duration in miliseconds
  74. ms_duration = resp['duration_ms']
  75. # convert to seconds
  76. duration = float(ms_duration)/1000
  77. # debug
  78. # print(duration)
  79. # print(type(duration))
  80. return duration
  81. # noinspection PyBroadException
  82. def download_track(mode: str, track_id: str, extra_keys=None, disable_progressbar=False) -> str:
  83. """ Downloads raw song audio from Spotify """
  84. if extra_keys is None:
  85. extra_keys = {}
  86. prepare_download_loader = Loader(PrintChannel.PROGRESS_INFO, "Preparing download...")
  87. prepare_download_loader.start()
  88. filename = None
  89. try:
  90. output_template = ZSpotify.CONFIG.get_output(mode)
  91. (artists, raw_artists, album_name, name, image_url, release_year, disc_number,
  92. track_number, scraped_song_id, is_playable, duration_ms) = get_song_info(track_id)
  93. song_name = fix_filename(artists[0]) + ' - ' + fix_filename(name)
  94. for k in extra_keys:
  95. output_template = output_template.replace("{"+k+"}", fix_filename(extra_keys[k]))
  96. ext = EXT_MAP.get(ZSpotify.CONFIG.get_download_format().lower())
  97. output_template = output_template.replace("{artist}", fix_filename(artists[0]))
  98. output_template = output_template.replace("{album}", fix_filename(album_name))
  99. output_template = output_template.replace("{song_name}", fix_filename(name))
  100. output_template = output_template.replace("{release_year}", fix_filename(release_year))
  101. output_template = output_template.replace("{disc_number}", fix_filename(disc_number))
  102. output_template = output_template.replace("{track_number}", fix_filename(track_number))
  103. output_template = output_template.replace("{id}", fix_filename(scraped_song_id))
  104. output_template = output_template.replace("{track_id}", fix_filename(track_id))
  105. output_template = output_template.replace("{ext}", ext)
  106. filename = os.path.join(ZSpotify.CONFIG.get_root_path(), output_template)
  107. filedir = os.path.dirname(filename)
  108. filename_temp = filename
  109. if ZSpotify.CONFIG.get_temp_download_dir() != '':
  110. filename_temp = os.path.join(ZSpotify.CONFIG.get_temp_download_dir(), f'zspotify_{str(uuid.uuid4())}_{track_id}.{ext}')
  111. check_name = os.path.isfile(filename) and os.path.getsize(filename)
  112. check_id = scraped_song_id in get_directory_song_ids(filedir)
  113. check_all_time = scraped_song_id in get_previously_downloaded()
  114. # a file with the same name exists in the directory, but song not in the downloaded songs list
  115. if not check_id and check_name and not ZSpotify.CONFIG.get_skip_existing_any_origin():
  116. c = len([file for file in os.listdir(filedir) if re.search(f'^{filename}_', str(file))]) + 1
  117. fname = os.path.splitext(os.path.basename(filename))[0]
  118. ext = os.path.splitext(os.path.basename(filename))[1]
  119. filename = os.path.join(filedir, f'{fname}_{c}{ext}')
  120. except Exception as e:
  121. Printer.print(PrintChannel.ERRORS, '### SKIPPING SONG - FAILED TO QUERY METADATA ###')
  122. Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id))
  123. for k in extra_keys:
  124. Printer.print(PrintChannel.ERRORS, k + ': ' + str(extra_keys[k]))
  125. Printer.print(PrintChannel.ERRORS, "\n")
  126. Printer.print(PrintChannel.ERRORS, str(e) + "\n")
  127. Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n")
  128. else:
  129. try:
  130. if not is_playable:
  131. prepare_download_loader.stop()
  132. Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG IS UNAVAILABLE) ###' + "\n")
  133. else:
  134. if check_id and check_name and ZSpotify.CONFIG.get_skip_existing_files():
  135. prepare_download_loader.stop()
  136. Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY EXISTS) ###' + "\n")
  137. elif check_name and ZSpotify.CONFIG.get_skip_existing_any_origin():
  138. prepare_download_loader.stop()
  139. Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY EXISTS BUT WAS NOT DOWNLOADED WITH CLSPOTIFY) ###' + "\n")
  140. elif check_all_time and ZSpotify.CONFIG.get_skip_previously_downloaded():
  141. prepare_download_loader.stop()
  142. Printer.print(PrintChannel.SKIPS, '\n### SKIPPING: ' + song_name + ' (SONG ALREADY DOWNLOADED ONCE) ###' + "\n")
  143. else:
  144. if track_id != scraped_song_id:
  145. track_id = scraped_song_id
  146. track_id = TrackId.from_base62(track_id)
  147. stream = ZSpotify.get_content_stream(track_id, ZSpotify.DOWNLOAD_QUALITY)
  148. create_download_directory(filedir)
  149. total_size = stream.input_stream.size
  150. prepare_download_loader.stop()
  151. time_start = time.time()
  152. downloaded = 0
  153. with open(filename_temp, 'wb') as file, Printer.progress(
  154. desc=song_name,
  155. total=total_size,
  156. unit='B',
  157. unit_scale=True,
  158. unit_divisor=1024,
  159. disable=disable_progressbar
  160. ) as p_bar:
  161. while total_size > downloaded:
  162. data = stream.input_stream.stream().read(ZSpotify.CONFIG.get_chunk_size())
  163. p_bar.update(file.write(data))
  164. downloaded += len(data)
  165. if len(data) == 0:
  166. break
  167. if ZSpotify.CONFIG.get_download_real_time():
  168. delta_real = time.time() - time_start
  169. delta_want = (downloaded / total_size) * (duration_ms/1000)
  170. if delta_want > delta_real:
  171. time.sleep(delta_want - delta_real)
  172. time_downloaded = time.time()
  173. genres = get_song_genres(raw_artists, name)
  174. convert_audio_format(filename_temp)
  175. set_audio_tags(filename_temp, artists, genres, name, album_name, release_year, disc_number, track_number)
  176. set_music_thumbnail(filename_temp, image_url)
  177. if filename_temp != filename:
  178. os.rename(filename_temp, filename)
  179. time_finished = time.time()
  180. Printer.print(PrintChannel.DOWNLOADS, f'### Downloaded "{song_name}" to "{os.path.relpath(filename, ZSpotify.CONFIG.get_root_path())}" in {fmt_seconds(time_downloaded - time_start)} (plus {fmt_seconds(time_finished - time_downloaded)} converting) ###' + "\n")
  181. # add song id to archive file
  182. if ZSpotify.CONFIG.get_skip_previously_downloaded():
  183. add_to_archive(scraped_song_id, os.path.basename(filename), artists[0], name)
  184. # add song id to download directory's .song_ids file
  185. if not check_id:
  186. add_to_directory_song_ids(filedir, scraped_song_id, os.path.basename(filename), artists[0], name)
  187. if not ZSpotify.CONFIG.get_anti_ban_wait_time():
  188. time.sleep(ZSpotify.CONFIG.get_anti_ban_wait_time())
  189. except Exception as e:
  190. Printer.print(PrintChannel.ERRORS, '### SKIPPING: ' + song_name + ' (GENERAL DOWNLOAD ERROR) ###')
  191. Printer.print(PrintChannel.ERRORS, 'Track_ID: ' + str(track_id))
  192. for k in extra_keys:
  193. Printer.print(PrintChannel.ERRORS, k + ': ' + str(extra_keys[k]))
  194. Printer.print(PrintChannel.ERRORS, "\n")
  195. Printer.print(PrintChannel.ERRORS, str(e) + "\n")
  196. Printer.print(PrintChannel.ERRORS, "".join(traceback.TracebackException.from_exception(e).format()) + "\n")
  197. if os.path.exists(filename_temp):
  198. os.remove(filename_temp)
  199. prepare_download_loader.stop()
  200. return filename
  201. def convert_audio_format(filename) -> None:
  202. """ Converts raw audio into playable file """
  203. temp_filename = f'{os.path.splitext(filename)[0]}.tmp'
  204. os.replace(filename, temp_filename)
  205. download_format = ZSpotify.CONFIG.get_download_format().lower()
  206. file_codec = CODEC_MAP.get(download_format, 'copy')
  207. if file_codec != 'copy':
  208. bitrate = ZSpotify.CONFIG.get_bitrate()
  209. if not bitrate:
  210. if ZSpotify.DOWNLOAD_QUALITY == AudioQuality.VERY_HIGH:
  211. bitrate = '320k'
  212. else:
  213. bitrate = '160k'
  214. else:
  215. bitrate = None
  216. output_params = ['-c:a', file_codec]
  217. if bitrate:
  218. output_params += ['-b:a', bitrate]
  219. ff_m = FFmpeg(
  220. global_options=['-y', '-hide_banner', '-loglevel error'],
  221. inputs={temp_filename: None},
  222. outputs={filename: output_params}
  223. )
  224. with Loader(PrintChannel.PROGRESS_INFO, "Converting file..."):
  225. ff_m.run()
  226. if os.path.exists(temp_filename):
  227. os.remove(temp_filename)