yt2tg-mp3.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. #!/home/sasha/Coding/youtube2tg-mp3/env/bin/python
  2. import logging
  3. import readline
  4. import time
  5. from typing import List
  6. import validators
  7. import os
  8. from sys import exit
  9. from feed.feed import CheckFeed
  10. from cli.args import ArgParser
  11. from cli.cli import Bcolors
  12. from youtube.download import get_info_yt_video, get_mp3_from_yt, Video
  13. from youtube.mp3tag import is_there_cover, modified_id3
  14. from telegram.agent import send_audio
  15. from config import AGENT_ID, CUSTOM_DESCRIPTION, YT_CHANNEL_ID, VIDEO_ITEMS_QUANTITY
  16. #TODO write readme
  17. arg_parser = ArgParser()
  18. logging.basicConfig(
  19. format='%(asctime)s %(message)s',
  20. datefmt='%Y-%m-%d %H:%M:%S',
  21. level=logging.INFO)
  22. yt_feed = None
  23. def print_video_list(shift: int = 0, reread: bool = False):
  24. #TODO make class. incapsulation
  25. global yt_feed
  26. yt_feed = CheckFeed(YT_CHANNEL_ID, VIDEO_ITEMS_QUANTITY, shift=shift)
  27. if reread: yt_feed.read()
  28. videos = yt_feed.videos
  29. os.system('cls' if os.name == 'nt' else 'clear')
  30. if videos:
  31. print(f'The list of titles of new items in youtube channel "{yt_feed.title}":')
  32. for i, v in enumerate(videos):
  33. is_live = "[live]" if v.is_live else ""
  34. print(f"=== {i + 1}.{is_live} {v.title} ({v.id})")
  35. else:
  36. print(f"Something went wrong. The channel {yt_feed.title} has no videos")
  37. def get_menu_choose_video():
  38. while True:
  39. select_text = f"\n{Bcolors.BOLD}Insert num of video to convert or type \
  40. command exit/link/list:{Bcolors.ENDC} "
  41. select = input(select_text)
  42. if select == "exit":
  43. logging.info("Terminate the script")
  44. exit()
  45. elif select == "list":
  46. print_video_list(reread=True)
  47. elif select == "link":
  48. while True:
  49. link = input("Insert the valid link to youtube video: ")
  50. if validators.url(link):
  51. print('Not valid')
  52. break
  53. # TODO check link and video
  54. video = get_info_yt_video(link)
  55. print(f"Video title is: {video.title}")
  56. return video
  57. else:
  58. num_of_items = len(yt_feed.videos)
  59. try:
  60. video_index = int(select) - 1
  61. if 0 <= video_index < num_of_items:
  62. video = yt_feed.videos[video_index]
  63. return video
  64. else:
  65. print(f"Number is not in range (1, {num_of_items}). Try again")
  66. except ValueError:
  67. print("Error. Not valid command")
  68. else:
  69. pass
  70. # TODO refactor
  71. def get_tags_for_video() -> List[str]:
  72. while True:
  73. answer = input("Type the tags for mp3 file. Format: artist|title: ")
  74. tags = answer.split("|")
  75. if len(tags) == 2:
  76. check_cover = "artist cover was found" if is_there_cover(tags[0]) \
  77. else f"{Bcolors.WARNING}artist cover was not found{Bcolors.ENDC}"
  78. print(f"\n----Artist: {tags[0]} ({check_cover})\n----Title: {tags[1]}")
  79. if input("Is it right? y/n: ").lower() != "n":
  80. return tags
  81. else:
  82. logging.warning("This is not right string")
  83. def check_is_stream(video: Video) -> None:
  84. while True:
  85. if get_info_yt_video(video.link).is_live:
  86. logging.warning('This stream is on. Waiting for end')
  87. time.sleep(30 * 60)
  88. else:
  89. break
  90. def download_and_send(video: Video, tags: List[str]):
  91. logging.info("Start Downloading")
  92. try:
  93. get_mp3_from_yt(video)
  94. except Exception:
  95. return logging.exception("!!!!")
  96. else:
  97. files = [f for f in os.listdir() if f.endswith(".mp3") and f.startswith(video.title[:9])]
  98. if len(files) == 1:
  99. file_name = files[0]
  100. else:
  101. raise RuntimeError(f"Can't find file to write tags.\n Files that \
  102. was founded: \n {str(files)}")
  103. print(f"Video title is: {video.title}")
  104. modified_id3(file_name, tags[0], tags[1])
  105. logging.info(f"\n----DONE!\n----File name: {file_name}")
  106. #TODO check if upploading works
  107. if arg_parser.args.download or AGENT_ID == '':
  108. print(f"""{tags[0]} #{tags[0].replace(" ", "_").replace("-", "_")}
  109. {tags[1]}
  110. {CUSTOM_DESCRIPTION}
  111. """)
  112. os.replace(file_name, f"media/tracks/{file_name}")
  113. print(f"\nFile was saved in media/track/{file_name}")
  114. return
  115. send_audio(file_name, tags[0], tags[1])
  116. logging.info("Uploading completed")
  117. if os.path.isfile(file_name):
  118. os.remove(file_name)
  119. logging.info(f"file {file_name} was removed")
  120. else:
  121. logging.warning("File doesn't exists! Remove all mp3 files in folder")
  122. dir_name = os.getcwd()
  123. files = os.listdir(dir_name)
  124. for file in files:
  125. if file.endswith(".mp3"):
  126. os.remove(os.path.join(dir_name, file))
  127. def main():
  128. link = arg_parser.args.link
  129. if not YT_CHANNEL_ID and not link:
  130. print("You have no provided CHANNEL_ID in config.py \n\
  131. You have to provide it or run program with -l argument (-l LINK)\n \
  132. Run with -h for help")
  133. exit(1)
  134. if not link:
  135. print_video_list(shift=arg_parser.args.shift)
  136. video = get_menu_choose_video()
  137. else:
  138. video = get_info_yt_video(link)
  139. print(f"Video title is: {video.title}")
  140. arg_parser.args.link = None
  141. tags = get_tags_for_video()
  142. check_is_stream(video)
  143. download_and_send(video, tags)
  144. if __name__ == "__main__":
  145. try:
  146. main()
  147. except KeyboardInterrupt:
  148. print('\n=========== Exit ===============')
  149. except Exception:
  150. logging.exception("Error:")