app.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. from librespot.audio.decoders import AudioQuality
  2. from tabulate import tabulate
  3. import os
  4. from album import download_album, download_artist_albums
  5. from const import TRACK, NAME, ID, ARTIST, ARTISTS, ITEMS, TRACKS, EXPLICIT, ALBUM, ALBUMS, \
  6. OWNER, PLAYLIST, PLAYLISTS, DISPLAY_NAME
  7. from playlist import get_playlist_songs, get_playlist_info, download_from_user_playlist, download_playlist
  8. from podcast import download_episode, get_show_episodes
  9. from termoutput import Printer, PrintChannel
  10. from track import download_track, get_saved_tracks
  11. from utils import splash, split_input, regex_input_for_urls
  12. from zspotify import ZSpotify
  13. SEARCH_URL = 'https://api.spotify.com/v1/search'
  14. def client(args) -> None:
  15. """ Connects to spotify to perform query's and get songs to download """
  16. ZSpotify(args)
  17. Printer.print(PrintChannel.SPLASH, splash())
  18. if ZSpotify.check_premium():
  19. Printer.print(PrintChannel.SPLASH, '[ DETECTED PREMIUM ACCOUNT - USING VERY_HIGH QUALITY ]\n\n')
  20. ZSpotify.DOWNLOAD_QUALITY = AudioQuality.VERY_HIGH
  21. else:
  22. Printer.print(PrintChannel.SPLASH, '[ DETECTED FREE ACCOUNT - USING HIGH QUALITY ]\n\n')
  23. ZSpotify.DOWNLOAD_QUALITY = AudioQuality.HIGH
  24. if args.download:
  25. urls = []
  26. filename = args.download
  27. if os.path.exists(filename):
  28. with open(filename, 'r', encoding='utf-8') as file:
  29. urls.extend([line.strip() for line in file.readlines()])
  30. download_from_urls(urls)
  31. else:
  32. Printer.print(PrintChannel.ERRORS, f'File {filename} not found.\n')
  33. if args.urls:
  34. download_from_urls(args.urls)
  35. if args.playlist:
  36. download_from_user_playlist()
  37. if args.liked_songs:
  38. liked_songs_list = []
  39. resp_json = ZSpotify.invoke_url('https://api.spotify.com/v1/me')[1]
  40. name_id = '_'.join([resp_json[DISPLAY_NAME], resp_json[ID]])
  41. for song in get_saved_tracks():
  42. if not song[TRACK][NAME] or not song[TRACK][ID]:
  43. Printer.print(PrintChannel.SKIPS, '### SKIPPING: SONG DOES NOT EXIST ON SPOTIFY ANYMORE ###' + "\n")
  44. else:
  45. filename = download_track('liked', song[TRACK][ID])
  46. # Use relative path for m3u file
  47. liked_songs_list.append('./' + filename[len(ZSpotify.CONFIG.get_root_path()):].lstrip('../'))
  48. try:
  49. with open(f'{ZSpotify.CONFIG.get_root_path()}/{name_id}_liked_songs.m3u', 'w', encoding='utf-8') as file:
  50. file.write('\n'.join(liked_songs_list))
  51. except OSError:
  52. Printer.print(PrintChannel.ERRORS, '### ERROR: COULD NOT WRITE LIKED SONGS M3U FILE ###' + "\n")
  53. if args.search_spotify:
  54. search_text = ''
  55. while len(search_text) == 0:
  56. search_text = input('Enter search or URL: ')
  57. if not download_from_urls([search_text]):
  58. search(search_text)
  59. def download_from_urls(urls: list[str]) -> bool:
  60. """ Downloads from a list of spotify urls """
  61. download = False
  62. for spotify_url in urls:
  63. track_id, album_id, playlist_id, episode_id, show_id, artist_id = regex_input_for_urls(
  64. spotify_url)
  65. if track_id is not None:
  66. download = True
  67. download_track('single', track_id)
  68. elif artist_id is not None:
  69. download = True
  70. download_artist_albums(artist_id)
  71. elif album_id is not None:
  72. download = True
  73. download_album(album_id)
  74. elif playlist_id is not None:
  75. download = True
  76. playlist_songs = get_playlist_songs(playlist_id)
  77. name, owner = get_playlist_info(playlist_id)
  78. enum = 1
  79. char_num = len(str(len(playlist_songs)))
  80. playlist_songlist = []
  81. for song in playlist_songs:
  82. if not song[TRACK][NAME] or not song[TRACK][ID]:
  83. Printer.print(PrintChannel.SKIPS, '### SKIPPING: SONG DOES NOT EXIST ON SPOTIFY ANYMORE ###' + "\n")
  84. else:
  85. filename = download_track('playlist', song[TRACK][ID], extra_keys=
  86. {
  87. 'playlist_song_name': song[TRACK][NAME],
  88. 'playlist': name,
  89. 'playlist_num': str(enum).zfill(char_num),
  90. 'playlist_id': playlist_id,
  91. 'playlist_track_id': song[TRACK][ID]
  92. })
  93. playlist_songlist.append('./' + filename[len(ZSpotify.CONFIG.get_root_path()):].lstrip('../'))
  94. enum += 1
  95. try:
  96. with open(f'{ZSpotify.CONFIG.get_root_path()}/{owner}_{name}.m3u', 'w', encoding='utf-8') as file:
  97. file.write('\n'.join(playlist_songlist))
  98. except OSError:
  99. Printer.print(PrintChannel.ERRORS, '### ERROR: COULD NOT WRITE M3U FILE ###' + "\n")
  100. elif episode_id is not None:
  101. download = True
  102. download_episode(episode_id)
  103. elif show_id is not None:
  104. download = True
  105. for episode in get_show_episodes(show_id):
  106. download_episode(episode)
  107. return download
  108. def search(search_term):
  109. """ Searches Spotify's API for relevant data """
  110. params = {'limit': '10',
  111. 'offset': '0',
  112. 'q': search_term,
  113. 'type': 'track,album,artist,playlist'}
  114. # Parse args
  115. splits = search_term.split()
  116. for split in splits:
  117. index = splits.index(split)
  118. if split[0] == '-' and len(split) > 1:
  119. if len(splits)-1 == index:
  120. raise IndexError('No parameters passed after option: {}\n'.
  121. format(split))
  122. if split == '-l' or split == '-limit':
  123. try:
  124. int(splits[index+1])
  125. except ValueError:
  126. raise ValueError('Paramater passed after {} option must be an integer.\n'.
  127. format(split))
  128. if int(splits[index+1]) > 50:
  129. raise ValueError('Invalid limit passed. Max is 50.\n')
  130. params['limit'] = splits[index+1]
  131. if split == '-t' or split == '-type':
  132. allowed_types = ['track', 'playlist', 'album', 'artist']
  133. passed_types = []
  134. for i in range(index+1, len(splits)):
  135. if splits[i][0] == '-':
  136. break
  137. if splits[i] not in allowed_types:
  138. raise ValueError('Parameters passed after {} option must be from this list:\n{}'.
  139. format(split, '\n'.join(allowed_types)))
  140. passed_types.append(splits[i])
  141. params['type'] = ','.join(passed_types)
  142. if len(params['type']) == 0:
  143. params['type'] = 'track,album,artist,playlist'
  144. # Clean search term
  145. search_term_list = []
  146. for split in splits:
  147. if split[0] == "-":
  148. break
  149. search_term_list.append(split)
  150. if not search_term_list:
  151. raise ValueError("Invalid query.")
  152. params["q"] = ' '.join(search_term_list)
  153. resp = ZSpotify.invoke_url_with_params(SEARCH_URL, **params)
  154. counter = 1
  155. dics = []
  156. total_tracks = 0
  157. if TRACK in params['type'].split(','):
  158. tracks = resp[TRACKS][ITEMS]
  159. if len(tracks) > 0:
  160. print('### TRACKS ###')
  161. track_data = []
  162. for track in tracks:
  163. if track[EXPLICIT]:
  164. explicit = '[E]'
  165. else:
  166. explicit = ''
  167. track_data.append([counter, f'{track[NAME]} {explicit}',
  168. ','.join([artist[NAME] for artist in track[ARTISTS]])])
  169. dics.append({
  170. ID: track[ID],
  171. NAME: track[NAME],
  172. 'type': TRACK,
  173. })
  174. counter += 1
  175. total_tracks = counter - 1
  176. print(tabulate(track_data, headers=[
  177. 'S.NO', 'Name', 'Artists'], tablefmt='pretty'))
  178. print('\n')
  179. del tracks
  180. del track_data
  181. total_albums = 0
  182. if ALBUM in params['type'].split(','):
  183. albums = resp[ALBUMS][ITEMS]
  184. if len(albums) > 0:
  185. print('### ALBUMS ###')
  186. album_data = []
  187. for album in albums:
  188. album_data.append([counter, album[NAME],
  189. ','.join([artist[NAME] for artist in album[ARTISTS]])])
  190. dics.append({
  191. ID: album[ID],
  192. NAME: album[NAME],
  193. 'type': ALBUM,
  194. })
  195. counter += 1
  196. total_albums = counter - total_tracks - 1
  197. print(tabulate(album_data, headers=[
  198. 'S.NO', 'Album', 'Artists'], tablefmt='pretty'))
  199. print('\n')
  200. del albums
  201. del album_data
  202. total_artists = 0
  203. if ARTIST in params['type'].split(','):
  204. artists = resp[ARTISTS][ITEMS]
  205. if len(artists) > 0:
  206. print('### ARTISTS ###')
  207. artist_data = []
  208. for artist in artists:
  209. artist_data.append([counter, artist[NAME]])
  210. dics.append({
  211. ID: artist[ID],
  212. NAME: artist[NAME],
  213. 'type': ARTIST,
  214. })
  215. counter += 1
  216. total_artists = counter - total_tracks - total_albums - 1
  217. print(tabulate(artist_data, headers=[
  218. 'S.NO', 'Name'], tablefmt='pretty'))
  219. print('\n')
  220. del artists
  221. del artist_data
  222. total_playlists = 0
  223. if PLAYLIST in params['type'].split(','):
  224. playlists = resp[PLAYLISTS][ITEMS]
  225. if len(playlists) > 0:
  226. print('### PLAYLISTS ###')
  227. playlist_data = []
  228. for playlist in playlists:
  229. playlist_data.append(
  230. [counter, playlist[NAME], playlist[OWNER][DISPLAY_NAME]])
  231. dics.append({
  232. ID: playlist[ID],
  233. NAME: playlist[NAME],
  234. 'type': PLAYLIST,
  235. })
  236. counter += 1
  237. total_playlists = counter - total_artists - total_tracks - total_albums - 1
  238. print(tabulate(playlist_data, headers=[
  239. 'S.NO', 'Name', 'Owner'], tablefmt='pretty'))
  240. print('\n')
  241. del playlists
  242. del playlist_data
  243. if total_tracks + total_albums + total_artists + total_playlists == 0:
  244. print('NO RESULTS FOUND - EXITING...')
  245. else:
  246. selection = ''
  247. print('> SELECT A DOWNLOAD OPTION BY ID')
  248. print('> SELECT A RANGE BY ADDING A DASH BETWEEN BOTH ID\'s')
  249. print('> OR PARTICULAR OPTIONS BY ADDING A COMMA BETWEEN ID\'s\n')
  250. while len(selection) == 0:
  251. selection = str(input('ID(s): '))
  252. inputs = split_input(selection)
  253. for pos in inputs:
  254. position = int(pos)
  255. for dic in dics:
  256. print_pos = dics.index(dic) + 1
  257. if print_pos == position:
  258. if dic['type'] == TRACK:
  259. download_track('single', dic[ID])
  260. elif dic['type'] == ALBUM:
  261. download_album(dic[ID])
  262. elif dic['type'] == ARTIST:
  263. download_artist_albums(dic[ID])
  264. else:
  265. download_playlist(dic)