123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- """
- Version: 3.0.0
- Author: Salty
- Usage: See help `python flaccare.py --help`
- Requirements:
- 1) Make sure to install mutagen: `pip install mutagen`
- 2) Make sure to have the latest versions of the following added to PATH:
- * flac (https://xiph.org/flac/download.html)
- * metaflac (https://xiph.org/flac/download.html)
- """
- import argparse
- import math
- import mimetypes
- import os
- import platform
- import shutil
- import signal
- import subprocess
- import time
- import traceback
- from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, ArgumentTypeError, RawTextHelpFormatter
- from multiprocessing import Pool
- from pathlib import Path, PurePath
- from threading import Event, Semaphore
- from typing import Iterator
- from mutagen.flac import FLAC
- from mutagen.id3 import PictureType
class HelpFormatter(RawTextHelpFormatter, ArgumentDefaultsHelpFormatter):
    """Help formatter combining raw-text help with automatic default values.

    Inherits everything from both argparse formatters and adds nothing of
    its own.
    """
class SuccessResult:
    """Result record for a file that was processed without error.

    Attributes:
        path: Path of the processed file.
        savings: Size difference in bytes as supplied by the caller.
    """

    def __init__(self, path: str, savings: int):
        self.path = path
        self.savings = savings

    def __str__(self) -> str:
        """Return the tab-separated log line ``<path>\\t<savings>\\n``."""
        # Read from the instance: the previous version looked up module
        # globals, which only existed inside the main loop and made the log
        # show a running total instead of this file's own value.
        return f"{self.path}\t{self.savings}\n"
class FailureResult:
    """Result record for a file whose processing failed.

    Attributes:
        path: Path of the file that failed.
        msg: Error text with CRLF line endings converted to LF and
            surrounding whitespace stripped.
    """

    def __init__(self, path: str, msg: str):
        self.path = path
        self.msg = msg.replace("\r\n", "\n").strip()

    def __str__(self) -> str:
        """Return the log entry: path, blank line, message, blank line."""
        # Read from the instance rather than a module-level variable so the
        # record can be formatted anywhere, not just inside the main loop.
        return f"{self.path}\n\n{self.msg}\n\n"
- """Utils"""
def is_arg_folder(value: str):
    """Validate that *value* names an existing directory.

    Returns:
        The unchanged *value* when it is a directory.

    Raises:
        ArgumentTypeError: If *value* is not an existing directory.
    """
    if Path(value).is_dir():
        return value
    raise ArgumentTypeError(f"invalid folder: {value}")
def format_size(value):
    """Render a byte count as a human-readable string using binary units.

    Plain bytes are shown without decimals, KiB through TiB with two
    decimals, and anything larger falls through to PiB with one decimal.
    """
    units = ("B", "KiB", "MiB", "GiB", "TiB")
    for unit in units:
        if abs(value) < 1024.0:
            digits = 0 if unit == "B" else 2
            return f"{value:.{digits}f} {unit}"
        value /= 1024
    return f"{value:.1f} PiB"
def walk(path: str, limiter: Semaphore, cancel: Event) -> Iterator[str]:
    """Yield the POSIX-style path of every ``.flac`` file below *path*.

    Args:
        path: Root directory to traverse with ``os.walk``.
        limiter: Semaphore acquired once before each yielded path; the
            consumer is expected to release it to allow further paths.
        cancel: Event that stops the traversal once it is set.

    Yields:
        File paths joined with their directory, using forward slashes.
    """
    for folder, _, files in os.walk(path):
        if cancel.is_set():
            return
        for file in files:
            # Re-check per file so an abort takes effect inside a large
            # directory instead of only at the next directory boundary.
            if cancel.is_set():
                return
            if not file.endswith(".flac"):
                continue
            limiter.acquire()
            # acquire() may have blocked for a while; honour a cancel that
            # arrived in the meantime and hand the permit back.
            if cancel.is_set():
                limiter.release()
                return
            yield PurePath(folder).joinpath(file).as_posix()
def run(args: list[str]) -> bool:
    """Run an external command and report whether it exited with status 0.

    The child is started in its own process group (via a creation flag on
    Windows, ``os.setpgrp`` elsewhere), its output is captured, and it is
    given at most 300 seconds to finish.

    Args:
        args: Command and arguments passed to ``subprocess.run``.

    Returns:
        True when the return code is 0, False otherwise.
    """
    on_windows = platform.system() == "Windows"
    # The conditional expression keeps the platform-specific attribute
    # lookups lazy; each one only exists on its own platform.
    group_options = (
        {"creationflags": subprocess.CREATE_NEW_PROCESS_GROUP}
        if on_windows
        else {"preexec_fn": os.setpgrp}
    )
    completed = subprocess.run(args, capture_output=True, timeout=300, **group_options)
    return completed.returncode == 0
- """Worker"""
def work_init():
    """Make the calling process ignore SIGINT."""
    ignore_handler = signal.SIG_IGN
    signal.signal(signal.SIGINT, ignore_handler)
def work_handler(path: str):
    """Process *path* via ``work`` and convert any exception into a result.

    Returns:
        Whatever ``work`` returns, or a ``FailureResult`` carrying the
        formatted traceback when ``work`` raises an ``Exception``.
    """
    try:
        outcome = work(path)
    except Exception as ex:
        trace_lines = traceback.format_exception(type(ex), ex, ex.__traceback__)
        outcome = FailureResult(path, "".join(trace_lines))
    return outcome
def work(path: str):
    """Re-encode one FLAC file in place and report the size difference.

    Steps, in order:
      1. If ``args.extract`` is set, write the first front-cover picture to
         ``cover.<ext>`` next to the file unless that file already exists.
      2. Re-encode the file with ``flac``.
      3. Remove PICTURE and PADDING metadata blocks with ``metaflac``.
      4. Add ``args.padding`` bytes of padding with ``metaflac``.

    Args:
        path: Path of the FLAC file to process.

    Returns:
        SuccessResult holding *path* and ``old_size - new_size`` in bytes.

    Raises:
        Exception: If any of the external commands exits with a non-zero
            status. Other errors (I/O, tag parsing, timeouts) propagate.
    """
    old_size = os.stat(path).st_size

    if args.extract:
        flac = FLAC(path)
        for picture in flac.pictures:
            if picture.type == PictureType.COVER_FRONT:
                # Path.with_suffix() rejects a suffix without a leading dot,
                # so the fallback for unknown MIME types must be ".unk".
                extension = mimetypes.guess_extension(picture.mime) or ".unk"
                cover_path = Path(path).parent.joinpath("cover").with_suffix(extension)
                if not cover_path.exists():
                    cover_path.write_bytes(picture.data)
                # Only the first front cover is considered.
                break

    if not run(["flac", "-8", "-V", "-f", path]):
        raise Exception("Failed to reencode.")
    if not run(["metaflac", "--dont-use-padding", "--remove", "--block-type=PICTURE,PADDING", path]):
        raise Exception("Failed to remove embedded images.")
    if not run(["metaflac", f"--add-padding={args.padding}", path]):
        raise Exception("Failed to add padding.")

    new_size = os.stat(path).st_size
    return SuccessResult(path, old_size - new_size)
- """Main"""
# Command-line interface. The built-in -h/--help is disabled and an explicit
# --help option is registered instead.
parser = ArgumentParser(
    formatter_class=HelpFormatter,
    add_help=False,
    description="Reencodes with libFLAC, removes embedded images and excess padding.",
)
parser.add_argument("path", type=is_arg_folder, help="path to music folder")
parser.add_argument("--help", action="help", default=argparse.SUPPRESS, help="show help and exit")
parser.add_argument("--padding", metavar="N", type=int, default=8192, help="bytes of padding to keep")
parser.add_argument("--extract", action="store_true", help="extract embedded cover to file")
# Default to roughly 80% of the cores, but never fewer than one worker:
# os.cpu_count() may return None, and 80% of a single core floors to 0,
# which multiprocessing.Pool rejects.
parser.add_argument(
    "--workers",
    metavar="N",
    type=int,
    default=max(1, math.floor((os.cpu_count() or 1) * .8)),
    help="parallel encoder count",
)
# Parsed at module level because work() reads `args` as a global.
args = parser.parse_args()
if __name__ == "__main__":
    # Both external tools must be on PATH; bail out before touching any file.
    if shutil.which("flac") is None:
        print("Could not find flac executable!")
        raise SystemExit(1)
    if shutil.which("metaflac") is None:
        print("Could not find metaflac executable!")
        raise SystemExit(1)

    print(f"Using {args.workers} workers...")
    count = 0
    savings = 0
    timestamp = time.time()

    # The context manager closes both logs even if the loop below raises.
    # Append mode with line buffering is kept from the original open() calls.
    with open("log_success.txt", "a+", encoding="utf-8", buffering=1) as log_success, \
            open("log_failure.txt", "a+", encoding="utf-8", buffering=1) as log_failure:
        pool = Pool(args.workers, work_init)
        # One permit per worker: walk() acquires before yielding a path and
        # the loop below releases one permit per received result.
        limiter = Semaphore(args.workers)
        cancel = Event()

        def sigint_handler(signum, frame):
            """Ask the directory walk to stop when SIGINT arrives."""
            print("Aborting...")
            cancel.set()

        signal.signal(signal.SIGINT, sigint_handler)

        for res in pool.imap(work_handler, walk(args.path, limiter, cancel)):
            limiter.release()
            count += 1
            if isinstance(res, SuccessResult):
                savings += res.savings
                log_success.write(str(res))
            elif isinstance(res, FailureResult):
                log_failure.write(str(res))
            # Print a progress line every max(workers, 10) processed files.
            if count % max(args.workers, 10) == 0:
                print(f" {count:>10} | {format_size(savings):>10} | {PurePath(res.path).relative_to(args.path).as_posix()}")

        # All results have been consumed; shut the workers down cleanly.
        pool.close()
        pool.join()

    print(f"Saved {format_size(savings)} by processing {count} files in {(time.time() - timestamp) / 60:.2f} minutes")
|