123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- #!/home/sasha/Coding/youtube2tg-mp3/env/bin/python
- import logging
- import readline
- import time
- from typing import List
- import validators
- import os
- from sys import exit
- from feed import CheckFeed
- from args import ArgParser
- from download import get_info_yt_video, get_mp3_from_yt, Video
- from mp3tag import modified_id3
- from agent import send_audio
- from config import YT_CHANNEL_ID, VIDEO_ITEMS_QUANTITY
- arg_parser = ArgParser()
- logging.basicConfig(
- format='%(asctime)s %(message)s',
- datefmt='%Y-%m-%d %H:%M:%S',
- level=logging.INFO)
- yt_feed = None
- def print_video_list(reread: bool = False):
- global yt_feed
- yt_feed = CheckFeed(YT_CHANNEL_ID, VIDEO_ITEMS_QUANTITY)
- if reread: yt_feed.read()
- videos = yt_feed.videos
- os.system('cls' if os.name == 'nt' else 'clear')
- if videos:
- print(f'The list of titles of new items in youtube channel "{yt_feed.title}":')
- for i, v in enumerate(videos):
- is_live = "[live]" if v.is_live else ""
- print(f"=== {i + 1}.{is_live} {v.title} ({v.id})")
- else:
- print(f"Something went wrong. The channel {yt_feed.title} has no videos")
- def get_menu_choose_video():
- while True:
- select_text = "Insert num of video to convert or type command exit/link/list: "
- select = input(select_text)
- if select == "exit":
- logging.info("Terminate the script")
- exit()
- elif select == "list":
- print_video_list(reread=True)
- elif select == "link":
- while True:
- link = input("Insert the valid link to youtube video: ")
- print('Not valid')
- if validators.url(link):
- print('Not valid')
- break
- # TODO check link and video
- video = get_info_yt_video(link)
- print(f"Video title is: {video.title}")
- return video
- else:
- num_of_items = len(yt_feed.videos)
- try:
- video_index = int(select) - 1
- if 0 <= video_index < num_of_items:
- video = yt_feed.videos[video_index]
- return video
- else:
- print(f"Number is not in range (1, {num_of_items}). Try again")
- except ValueError:
- print("Error. Not valid command")
- # TODO refactor
- def get_tags_for_video() -> List[str]:
- while True:
- answer = input("Type the tags for mp3 file. Format: artist|title: ")
- tags = answer.split("|")
- if len(tags) == 2:
- print(f"----Artist: {tags[0]}\n----Title: {tags[1]}")
- if input("Is it right? y/n: ").lower() != "n":
- return tags
- else:
- logging.warning("This is not right string")
- def check_is_stream(video: Video) -> None:
- while True:
- if get_info_yt_video(video.link).is_live:
- logging.warning('This stream is on. Waiting for end')
- time.sleep(30 * 60)
- else:
- break
- def download_and_send(video: Video, tags: List[str]):
- logging.info("Start Downloading")
-
- try:
- get_mp3_from_yt(video)
- except Exception:
- raise RuntimeError("Something bad happened")
- else:
- files = [f for f in os.listdir() if f.endswith(".mp3") and f.startswith(video.title[:9])]
- # files = [f for f in os.listdir() if f.endswith(".mp3")]
- if len(files) == 1:
- file_name = files[0]
- else:
- raise RuntimeError("Something bad happened")
- print(f"Video title is: {video.title}")
- modified_id3(file_name, tags[0], tags[1])
- logging.info(f"----DONE!\n----File name: {file_name}")
- time.sleep(5)
- if arg_parser.args.download:
- return
- send_audio(file_name, tags[0], tags[1])
- logging.info("Uploading completed")
- if os.path.isfile(file_name):
- os.remove(file_name)
- logging.info(f"file {file_name} was removed")
- else:
- logging.warning("File doesn't exists! Remove all mp3 files in folder")
- dir_name = os.getcwd()
- files = os.listdir(dir_name)
- for file in files:
- if file.endswith(".mp3"):
- os.remove(os.path.join(dir_name, file))
- def main():
- link = arg_parser.args.link
- if not link:
- print_video_list()
- video = get_menu_choose_video()
- else:
- video = get_info_yt_video(link)
- print(f"Video title is: {video.title}")
- arg_parser.args.link = None
- tags = get_tags_for_video()
- check_is_stream(video)
- download_and_send(video, tags)
- if __name__ == "__main__":
- try:
- main()
- except KeyboardInterrupt:
- print('\n=========== Exit ===============')
|