123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- #!/home/sasha/Coding/youtube2tg-mp3/env/bin/python
- import logging
- import readline
- import time
- from typing import List
- import validators
- import os
- from sys import exit
- from feed.feed import CheckFeed
- from cli.args import ArgParser
- from cli.cli import Bcolors
- from youtube.download import get_info_yt_video, get_mp3_from_yt, Video
- from youtube.mp3tag import is_there_cover, modified_id3
- from telegram.agent import send_audio
- from config import AGENT_ID, CUSTOM_DESCRIPTION, YT_CHANNEL_ID, VIDEO_ITEMS_QUANTITY
- #TODO write readme
- arg_parser = ArgParser()
- logging.basicConfig(
- format='%(asctime)s %(message)s',
- datefmt='%Y-%m-%d %H:%M:%S',
- level=logging.INFO)
- yt_feed = None
- def print_video_list(shift: int = 0, reread: bool = False):
- #TODO make class. incapsulation
- global yt_feed
- yt_feed = CheckFeed(YT_CHANNEL_ID, VIDEO_ITEMS_QUANTITY, shift=shift)
- if reread: yt_feed.read()
- videos = yt_feed.videos
- os.system('cls' if os.name == 'nt' else 'clear')
- if videos:
- print(f'The list of titles of new items in youtube channel "{yt_feed.title}":')
- for i, v in enumerate(videos):
- is_live = "[live]" if v.is_live else ""
- print(f"=== {i + 1}.{is_live} {v.title} ({v.id})")
- else:
- print(f"Something went wrong. The channel {yt_feed.title} has no videos")
- def get_menu_choose_video():
- while True:
- select_text = f"\n{Bcolors.BOLD}Insert num of video to convert or type \
- command exit/link/list:{Bcolors.ENDC} "
- select = input(select_text)
- if select == "exit":
- logging.info("Terminate the script")
- exit()
- elif select == "list":
- print_video_list(reread=True)
- elif select == "link":
- while True:
- link = input("Insert the valid link to youtube video: ")
- if validators.url(link):
- print('Not valid')
- break
- # TODO check link and video
- video = get_info_yt_video(link)
- print(f"Video title is: {video.title}")
- return video
- else:
- num_of_items = len(yt_feed.videos)
- try:
- video_index = int(select) - 1
- if 0 <= video_index < num_of_items:
- video = yt_feed.videos[video_index]
- return video
- else:
- print(f"Number is not in range (1, {num_of_items}). Try again")
- except ValueError:
- print("Error. Not valid command")
- else:
- pass
- # TODO refactor
- def get_tags_for_video() -> List[str]:
- while True:
- answer = input("Type the tags for mp3 file. Format: artist|title: ")
- tags = answer.split("|")
- if len(tags) == 2:
- check_cover = "artist cover was found" if is_there_cover(tags[0]) \
- else f"{Bcolors.WARNING}artist cover was not found{Bcolors.ENDC}"
- print(f"\n----Artist: {tags[0]} ({check_cover})\n----Title: {tags[1]}")
- if input("Is it right? y/n: ").lower() != "n":
- return tags
- else:
- logging.warning("This is not right string")
- def check_is_stream(video: Video) -> None:
- while True:
- if get_info_yt_video(video.link).is_live:
- logging.warning('This stream is on. Waiting for end')
- time.sleep(30 * 60)
- else:
- break
- def download_and_send(video: Video, tags: List[str]):
- logging.info("Start Downloading")
-
- try:
- get_mp3_from_yt(video)
- except Exception:
- return logging.exception("!!!!")
- else:
- files = [f for f in os.listdir() if f.endswith(".mp3") and f.startswith(video.title[:9])]
- if len(files) == 1:
- file_name = files[0]
- else:
- raise RuntimeError(f"Can't find file to write tags.\n Files that \
- was founded: \n {str(files)}")
- print(f"Video title is: {video.title}")
- modified_id3(file_name, tags[0], tags[1])
- logging.info(f"\n----DONE!\n----File name: {file_name}")
-
- #TODO check if upploading works
- if arg_parser.args.download or AGENT_ID == '':
- print(f"""{tags[0]} #{tags[0].replace(" ", "_").replace("-", "_")}
- {tags[1]}
- {CUSTOM_DESCRIPTION}
- """)
- os.replace(file_name, f"media/tracks/{file_name}")
- print(f"\nFile was saved in media/track/{file_name}")
- return
- send_audio(file_name, tags[0], tags[1])
- logging.info("Uploading completed")
- if os.path.isfile(file_name):
- os.remove(file_name)
- logging.info(f"file {file_name} was removed")
- else:
- logging.warning("File doesn't exists! Remove all mp3 files in folder")
- dir_name = os.getcwd()
- files = os.listdir(dir_name)
- for file in files:
- if file.endswith(".mp3"):
- os.remove(os.path.join(dir_name, file))
- def main():
- link = arg_parser.args.link
- if not YT_CHANNEL_ID and not link:
- print("You have no provided CHANNEL_ID in config.py \n\
- You have to provide it or run program with -l argument (-l LINK)\n \
- Run with -h for help")
- exit(1)
- if not link:
- print_video_list(shift=arg_parser.args.shift)
- video = get_menu_choose_video()
- else:
- video = get_info_yt_video(link)
- print(f"Video title is: {video.title}")
- arg_parser.args.link = None
- tags = get_tags_for_video()
- check_is_stream(video)
- download_and_send(video, tags)
- if __name__ == "__main__":
- try:
- main()
- except KeyboardInterrupt:
- print('\n=========== Exit ===============')
- except Exception:
- logging.exception("Error:")
|