123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- # Version: 1.0
- import os
- import math
- import time
- import signal
- import struct
- import traceback
- from mmap import mmap
- from pathlib import Path, PurePath
- from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter, ArgumentTypeError
- from subprocess import Popen, PIPE
- from multiprocessing import Pool
class Result:
    """Base outcome for one processed file; stores the file's path."""

    def __init__(self, path: str):
        # Path of the file this outcome refers to.
        self.path = path
class SuccessResult(Result, Exception):
    """Outcome for a file that was processed successfully.

    Also derives from ``Exception`` so it can be raised as well as returned.

    Args:
        path: Path of the processed file.
        savings: Size difference supplied by the caller (``work`` passes the
            old file size minus the new file size).
    """

    def __init__(self, path: str, savings: int):
        self.savings = savings
        # Resolves to Result.__init__, which records the path.
        super().__init__(path)
class ErrorResult(Result, Exception):
    """Outcome for a file that could not be processed.

    Also derives from ``Exception`` so it can be raised as well as returned.

    Args:
        path: Path of the failing file.
        err: Error text; CRLF line endings are normalized to LF and
            surrounding whitespace is stripped before it is stored.
    """

    def __init__(self, path: str, err: str):
        super().__init__(path)
        unix_newlines = err.replace("\r\n", "\n")
        self.err = unix_newlines.strip()
- """Utils"""
def type_folder(value: str):
    """Validate that *value* names an existing directory.

    Used as an argparse ``type=`` callback.

    Returns:
        *value* unchanged when it is a directory.

    Raises:
        ArgumentTypeError: If *value* is not an existing directory.
    """
    if Path(value).is_dir():
        return value
    raise ArgumentTypeError(f"invalid folder path: {value}")
def walk(path: str):
    """Yield the POSIX-style path of every ``.flac`` file below *path*.

    The tree is traversed recursively with ``os.walk``; only file names that
    end with the exact, lowercase suffix ``.flac`` are yielded.
    """
    for folder, _subdirs, names in os.walk(path):
        parent = PurePath(folder)
        for name in names:
            if not name.endswith(".flac"):
                continue
            yield parent.joinpath(name).as_posix()
def process(args: list[str]):
    """Run an external command and return its standard output as text.

    Args:
        args: Program name followed by its arguments; passed to ``Popen``
            as a list, so no shell is involved.

    Returns:
        The command's stdout, decoded, with CRLF normalized to LF.

    Raises:
        ErrorResult: If the command exits with a non-zero status. The error
            carries the decoded stderr and a ``None`` path (``work_handler``
            fills the path in when it catches the error).
    """
    # communicate() drains stdout and stderr while waiting for the child.
    # Calling wait() with both streams piped deadlocks as soon as the child
    # writes more than the OS pipe buffer holds. The with-block also closes
    # both pipes.
    with Popen(args, stdout=PIPE, stderr=PIPE) as proc:
        out, err = proc.communicate()

    # errors="replace": tool output that is not valid UTF-8 must not turn
    # into an unrelated UnicodeDecodeError.
    if proc.returncode == 0:
        return out.decode(errors="replace").replace("\r\n", "\n")
    raise ErrorResult(None, err.decode(errors="replace").replace("\r\n", "\n"))
- """Worker"""
def get_flac_info(path: str):
    """Read the sample rate from the start of a FLAC file.

    Only the first 22 bytes are read, and the file is opened read-only, so
    small files and read-only files are handled and the file is never
    modified.

    Args:
        path: Path of the file to inspect.

    Returns:
        A dict with one key, ``"sr"``: the 20-bit sample-rate field of the
        leading STREAMINFO block.

    Raises:
        ErrorResult: If the file is shorter than 22 bytes or does not start
            with the ``fLaC`` marker.
    """
    # Offset of the sample-rate field: "fLaC" marker (4) + metadata block
    # header (4) + min/max block size (2 + 2) + min/max frame size (3 + 3).
    sr_offset = 18
    needed = sr_offset + 4

    with open(path, "rb") as f:
        head = f.read(needed)

    if len(head) < needed or head[:4] != b"fLaC":
        raise ErrorResult(path, "Invalid FLAC file header")

    # The sample rate is the top 20 bits of the big-endian 32-bit word; the
    # low 12 bits hold the channel count, bit depth and part of the sample
    # count.
    return {
        "sr": struct.unpack_from(">I", head, sr_offset)[0] >> 12
    }
def get_flac_size(path: str):
    """Return the size in bytes of the file at *path*."""
    return os.path.getsize(path)
def get_flac_md5(path: str):
    """Return the first line printed by ``metaflac --show-md5sum`` for *path*.

    Raises whatever ``process`` raises when the command fails.
    """
    output = process(["metaflac", "--show-md5sum", path])
    first_line, _sep, _rest = output.partition("\n")
    return first_line
def work(path: str):
    """Re-encode one FLAC file in place if it meets the sample-rate threshold.

    The file is re-encoded with ffmpeg into ``<path>.enchires``, PICTURE and
    PADDING blocks are removed from that copy and 8192 bytes of padding are
    added, and the copy replaces the original only if its MD5 (as reported by
    metaflac) matches the original's.

    Args:
        path: Path of the FLAC file to process.

    Returns:
        ``None`` if the file's sample rate is below ``args.samplerate``;
        a ``SuccessResult`` with the number of bytes saved on success;
        an ``ErrorResult`` if the MD5 of the re-encoded file differs.

    Raises:
        ErrorResult: Propagated from ``get_flac_info`` / ``process`` when the
            header is invalid or an external tool fails.
    """
    if get_flac_info(path)["sr"] < args.samplerate:
        return None

    old_size = get_flac_size(path)
    old_md5 = get_flac_md5(path)
    tmp_path = path + ".enchires"

    try:
        process(["ffmpeg", "-hide_banner", "-i", path, "-f", "flac", "-compression_level", str(args.level), "-y", tmp_path])
        process(["metaflac", "--dont-use-padding", "--remove", "--block-type=PICTURE,PADDING", tmp_path])
        process(["metaflac", "--add-padding=8192", tmp_path])

        # Verify the NEW file. Checking `path` here would compare the
        # original with itself and could never detect a bad encode.
        if old_md5 != get_flac_md5(tmp_path):
            return ErrorResult(path, "MD5 mismatched after processing")

        os.replace(tmp_path, path)
    finally:
        # Best-effort cleanup so a failed or rejected encode does not leave a
        # stray temp file behind. After a successful os.replace the temp
        # file no longer exists, so the removal is a no-op.
        try:
            os.remove(tmp_path)
        except OSError:
            pass

    return SuccessResult(path, old_size - get_flac_size(path))
def work_handler(path: str):
    """Run ``work`` on *path* and turn every failure into a returned result.

    Returns:
        Whatever ``work`` returns; a raised ``SuccessResult`` as-is; a raised
        ``ErrorResult`` with its ``path`` set to *path*; or, for any other
        exception, a new ``ErrorResult`` whose text is the formatted
        traceback.
    """
    try:
        outcome = work(path)
    except SuccessResult as success:
        return success
    except ErrorResult as failure:
        # Errors raised by helpers may not know which file they belong to.
        failure.path = path
        return failure
    except Exception:
        return ErrorResult(path, traceback.format_exc())
    else:
        return outcome
def work_init():
    """Make the current process ignore SIGINT.

    Passed to ``Pool`` as the worker initializer.
    """
    ignore_handler = signal.SIG_IGN
    signal.signal(signal.SIGINT, ignore_handler)
- """Main"""
# Command-line interface. `args` stays a module-level global because `work`
# reads `args.samplerate` and `args.level` from it.
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter, description="Encode hires (>=88.2kHz) flac with ffmpeg to save space.")
parser.add_argument("path", type=type_folder, help="path to music")
parser.add_argument("-l", dest="level", type=int, help="ffmpeg compression level", default=11)
parser.add_argument("-sr", dest="samplerate", type=int, help="minimum sample rate to process", default=88200)
# Default to ~80% of the CPUs, but never fewer than one worker:
# os.cpu_count() may return None, and floor(1 * .8) is 0, which Pool rejects.
parser.add_argument("-w", dest="workers", type=int, help="parallel encoder count", default=max(1, math.floor((os.cpu_count() or 1) * .8)))
args = parser.parse_args()
if __name__ == "__main__":
    # Script entry point: fan the files out to the worker pool, log each
    # outcome, and print running and final statistics.
    timestamp = time.time()
    count = 0
    savings = 0

    # The context managers close both logs and tear the pool down even if
    # the loop is interrupted or raises.
    with open("log_success.txt", "a+", encoding="utf-8") as log_success, \
            open("log_error.txt", "a+", encoding="utf-8") as log_error, \
            Pool(args.workers, work_init) as pool:
        for res in pool.imap_unordered(work_handler, walk(args.path)):
            # None means the file was skipped (below the sample-rate limit).
            if res is None:
                continue

            count += 1

            if isinstance(res, SuccessResult):
                savings += res.savings
                log_success.write(f"{res.path}\n")
            elif isinstance(res, ErrorResult):
                log_error.write(f"------------\n{res.path}\n------------\n\n{res.err}\n\n\n")

            # Periodic progress line and log flush.
            if count % max(args.workers, 10) == 0:
                print(f" {count:>7} | {savings / 1024 / 1024: >7.0f} MiB | {PurePath(res.path).relative_to(args.path).parent.as_posix()}")
                log_success.flush()
                log_error.flush()

        print(f" {count:>7} | {savings / 1024 / 1024: >7.0f} MiB")

        # All results are consumed; shut the workers down cleanly before the
        # with-block terminates the pool.
        pool.close()
        pool.join()

    print(f"Finished in {(time.time() - timestamp) / 60:.0f} minutes")
|