flaccare.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. """
  2. Version: 3.0.0
  3. Author: Salty
  4. Usage: See help `python flaccare.py -h`
  5. Requirements:
  6. 1) Make sure to install mutagen: `pip install mutagen`
  7. 2) Make sure to have the latest versions of the following added to PATH:
  8. * flac (https://xiph.org/flac/download.html)
  9. * metaflac (https://xiph.org/flac/download.html)
  10. """
  11. import argparse
  12. import math
  13. import mimetypes
  14. import os
  15. import platform
  16. import shutil
  17. import signal
  18. import subprocess
  19. import time
  20. import traceback
  21. from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, ArgumentTypeError, RawTextHelpFormatter
  22. from multiprocessing import Pool
  23. from pathlib import Path, PurePath
  24. from threading import Event, Semaphore
  25. from typing import Iterator
  26. from mutagen.flac import FLAC
  27. from mutagen.id3 import PictureType
  28. class HelpFormatter(RawTextHelpFormatter, ArgumentDefaultsHelpFormatter):
  29. pass
  30. class SuccessResult:
  31. def __init__(self, path: str, savings: int):
  32. self.path = path
  33. self.savings = savings
  34. def __str__(self) -> str:
  35. return f"{res.path}\t{savings}\n"
  36. class FailureResult:
  37. def __init__(self, path: str, msg: str):
  38. self.path = path
  39. self.msg = msg.replace("\r\n", "\n").strip()
  40. def __str__(self) -> str:
  41. return f"{res.path}\n\n{res.msg}\n\n"
  42. """Utils"""
  43. def is_arg_folder(value: str):
  44. if not Path(value).is_dir():
  45. raise ArgumentTypeError(f"invalid folder: {value}")
  46. return value
  47. def format_size(value):
  48. for mag, unit in enumerate([ "B", "KiB", "MiB", "GiB", "TiB" ]):
  49. if abs(value) < 1024.0:
  50. return f"{value:.0f} {unit}" if mag == 0 else f"{value:.2f} {unit}"
  51. value /= 1024
  52. return f"{value:.1f} PiB"
  53. def walk(path: str, limiter: Semaphore, cancel: Event) -> Iterator[str]:
  54. for path, _, files in os.walk(path):
  55. if cancel.is_set():
  56. return
  57. for file in files:
  58. if file.endswith(".flac"):
  59. limiter.acquire()
  60. yield PurePath(path).joinpath(file).as_posix()
  61. def run(args: list[str]) -> bool:
  62. if platform.system() == "Windows":
  63. kwargs = dict(creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
  64. else:
  65. kwargs = dict(preexec_fn=os.setpgrp)
  66. res = subprocess.run(args, capture_output=True, timeout=300, **kwargs)
  67. return res.returncode == 0
  68. """Worker"""
  69. def work_init():
  70. signal.signal(signal.SIGINT, signal.SIG_IGN)
  71. def work_handler(path: str):
  72. try:
  73. return work(path)
  74. except Exception as ex:
  75. return FailureResult(path, "".join(traceback.format_exception(type(ex), ex, ex.__traceback__)))
  76. def work(path: str):
  77. old_size = os.stat(path).st_size
  78. if args.extract:
  79. flac = FLAC(path)
  80. for picture in flac.pictures:
  81. if picture.type == PictureType.COVER_FRONT:
  82. cover_path = Path(path).parent.joinpath("cover").with_suffix(mimetypes.guess_extension(picture.mime) or "unk")
  83. if not cover_path.exists():
  84. cover_path.write_bytes(picture.data)
  85. break
  86. if not run([ "flac", "-8", "-V", "-f", path ]):
  87. raise Exception("Failed to reencode.")
  88. if not run([ "metaflac", "--dont-use-padding", "--remove", "--block-type=PICTURE,PADDING", path ]):
  89. raise Exception("Failed to remove embedded images.")
  90. if not run([ "metaflac", f"--add-padding={args.padding}", path ]):
  91. raise Exception("Failed to add padding.")
  92. new_size = os.stat(path).st_size
  93. return SuccessResult(path, old_size - new_size)
  94. """Main"""
  95. parser = ArgumentParser(formatter_class=HelpFormatter, add_help=False, description="Reencodes with libFLAC, removes embedded images and excess padding.")
  96. parser.add_argument("path", type=is_arg_folder, help="path to music folder")
  97. parser.add_argument("--help", action="help", default=argparse.SUPPRESS, help="show help and exit")
  98. parser.add_argument("--padding", metavar="N", type=int, default=8192, help="bytes of padding to keep")
  99. parser.add_argument("--extract", action="store_true", help="extract embedded cover to file")
  100. parser.add_argument("--workers", metavar="N", type=int, default=math.floor(os.cpu_count() * .8), help="parallel encoder count")
  101. args = parser.parse_args()
  102. if __name__ == "__main__":
  103. if shutil.which("flac") is None:
  104. print("Could not find flac executable!")
  105. exit(1)
  106. if shutil.which("metaflac") is None:
  107. print("Could not find metaflac executable!")
  108. exit(1)
  109. print(f"Using {args.workers} workers...")
  110. count = 0
  111. savings = 0
  112. timestamp = time.time()
  113. log_success = open("log_success.txt", "a+", encoding="utf-8", buffering=1)
  114. log_failure = open("log_failure.txt", "a+", encoding="utf-8", buffering=1)
  115. pool = Pool(args.workers, work_init)
  116. limiter = Semaphore(args.workers)
  117. cancel = Event()
  118. def sigint_handler(signum, frame):
  119. print("Aborting...")
  120. cancel.set()
  121. signal.signal(signal.SIGINT, sigint_handler)
  122. for res in pool.imap(work_handler, walk(args.path, limiter, cancel)):
  123. limiter.release()
  124. count += 1
  125. if isinstance(res, SuccessResult):
  126. savings += res.savings
  127. log_success.write(str(res))
  128. elif isinstance(res, FailureResult):
  129. log_failure.write(str(res))
  130. if count % max(args.workers, 10) == 0:
  131. print(f" {count:>10} | {format_size(savings):>10} | {PurePath(res.path).relative_to(args.path).as_posix()}")
  132. pool.close()
  133. log_success.close()
  134. log_failure.close()
  135. print(f"Saved {format_size(savings)} by processing {count} files in {(time.time() - timestamp) / 60:.2f} minutes")