123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- #!/usr/bin/env python3
- import os
- import sys
- import datetime
- import json
- import subprocess
- import ff
def analyze_file(p):
    """Collect size and stream properties for a single media file.

    Args:
        p: Path of the file to inspect.

    Returns:
        A dict containing 'path', 'basename', 'size' and 'title'. When the
        file is empty or probing fails, the dict also carries an 'error'
        message and no stream properties; otherwise it is merged with the
        dict returned by ``ff.extract_stream_props``.

    Raises:
        OSError: If ``os.stat`` cannot read the file.
    """
    basename = os.path.basename(p)
    out = {
        'path': p,
        'basename': basename,
    }

    # Get the actual file size.
    out['size'] = os.stat(p).st_size

    # An empty file makes ffprobe fail anyway, so report it early.
    if out['size'] == 0:
        out['title'] = basename
        out['error'] = 'no data'
        return out

    # Pick out the video stream data.
    try:
        probe = ff.probe_file(p)
    except Exception as e:
        out['title'] = basename
        out['error'] = str(e)
        # Return the result dict (not the bare path) so callers can look up
        # 'error', 'path' and 'size' on it.
        return out

    props = ff.extract_stream_props(probe)
    result = {**out, **props}
    # Fall back to the file name when the stream carries no title.
    if result.get('title') is None:
        result['title'] = basename
    return result
def display_analysis(a, pfx=''):
    """Print a short human-readable summary of one analysis result.

    Args:
        a: Result dict. With an 'error' key, only 'path', 'size' and
            'error' are read; otherwise 'basename', 'title', 'res_w',
            'res_h', 'codec', 'duration_sec', 'bitrate_bps' and 'size'.
        pfx: String prepended to every printed line.
    """
    size_mib = a['size'] // 1024 // 1024

    if 'error' in a:
        print(pfx + a['path'])
        # Include the computed size; it was previously left out of the line.
        print(pfx + 'sz', size_mib, 'MiB')
        print(pfx + 'err', a['error'])
        return

    # Only show the title when it adds something beyond the file name.
    if a['title'] == a['basename']:
        print(pfx + a['basename'])
    else:
        print(pfx + a['basename'], '(', a['title'], ')')

    print(pfx + '* res', '%sx%s' % (a['res_w'], a['res_h']), 'in', a['codec'])
    td = datetime.timedelta(seconds=a['duration_sec'])
    kbps = a['bitrate_bps'] // 1000
    print(pfx + '* dur', td, 'at', kbps, 'kb/s', '(', size_mib, 'MiB )')
# Lower-case file extensions that are treated as media files.
MEDIA_EXT = ['.mkv', '.mp4', '.ogv', '.avi', '.wmv']


def analyze_dir(dp):
    """Analyze every media file located directly inside a directory.

    Subdirectories are skipped (no recursion), as are files whose extension
    is not listed in ``MEDIA_EXT``.

    Args:
        dp: Path of the directory to scan.

    Returns:
        A list with one ``analyze_file`` result per media file, ordered by
        file name.
    """
    anas = []
    # os.listdir returns entries in arbitrary order; sort for stable output.
    for f in sorted(os.listdir(dp)):
        p = os.path.join(dp, f)
        if os.path.isdir(p):
            continue
        _, ext = os.path.splitext(f)
        # Compare case-insensitively so that e.g. 'CLIP.MP4' is not skipped.
        if ext.lower() not in MEDIA_EXT:
            continue
        anas.append(analyze_file(p))
    return anas
def main_analyze(argv):
    """Run the ``analyze`` subcommand: analyze one file and print the result.

    Args:
        argv: Subcommand arguments; ``argv[0]`` is the file path.

    Returns:
        0 on success, 1 when the file path argument is missing.
    """
    if not argv:
        # Without this guard a missing argument surfaces as an IndexError.
        print('usage: ./mediacleaner.py [analyze|a] <filepath>')
        return 1
    rp = os.path.expanduser(argv[0])
    ana = analyze_file(rp)
    display_analysis(ana, pfx='\t')
    return 0
def main_comparedir(argv):
    """Run the ``comparedir`` subcommand: print every media file in a directory.

    Args:
        argv: Subcommand arguments; ``argv[0]`` is the directory path.

    Returns:
        0 on success, 1 when the directory path argument is missing.
    """
    if not argv:
        # Without this guard a missing argument surfaces as an IndexError.
        print('usage: ./mediacleaner.py [comparedir|c] <dirpath>')
        return 1
    dirpath = os.path.expanduser(argv[0])
    for a in analyze_dir(dirpath):
        display_analysis(a)
    # Explicit status, matching main_analyze.
    return 0
def main_finddups(argv):
    """Run the ``finddups`` subcommand over a directory of media folders.

    Every subdirectory of the given path is scanned with ``analyze_dir``.
    Folders holding more than one media file are reported as multiples,
    folders whose scan raised are reported as errors, and folders with
    exactly one media file are counted as OK.

    Args:
        argv: Subcommand arguments; ``argv[0]`` is the media root path.

    Returns:
        0 on success, 1 when the media path argument is missing.
    """
    if not argv:
        # Without this guard a missing argument surfaces as an IndexError.
        print('usage: ./mediacleaner.py [finddups|d] <mediapath>')
        return 1

    mpath = os.path.expanduser(argv[0])
    dups = []
    weirds = []
    n_ok = 0
    for f in sorted(os.listdir(mpath)):
        p = os.path.join(mpath, f)
        if not os.path.isdir(p):
            continue
        try:
            anas = analyze_dir(p)
        except Exception as e:
            # Best effort: remember the failure and keep scanning.
            print('error: skipping dir with problem:', f)
            weirds.append((f, e))
            continue
        if len(anas) == 0:
            print('warn: skipped dir with no media:', f)
        elif len(anas) == 1:
            print('ok: single media:', f)
            # Count the folders reported as "ok", not the empty ones.
            n_ok += 1
        else:
            print('match: found', len(anas), 'in', f)
            dups.append((f, anas))

    print('OK:', n_ok)
    print('Mulitples:')
    # Use a distinct loop name so the list being iterated is not rebound.
    for f, anas in dups:
        print('name:', f)
        for a in anas:
            display_analysis(a, pfx='\t')
    print('Errors:')
    for f, e in weirds:
        print('name:', f, str(e))
    return 0
def print_usage():
    """Print one usage line per supported subcommand to stdout."""
    usage_lines = (
        'usage: ./mediacleaner.py [analyze|a] <filepath>',
        'usage: ./mediacleaner.py [comparedir|c] <dirpath>',
        'usage: ./mediacleaner.py [finddups|d] <mediapath>',
    )
    for line in usage_lines:
        print(line)
def main(argv):
    """Dispatch to the subcommand named in ``argv[1]``.

    Args:
        argv: Full argument vector, program name first.

    Returns:
        1 when no subcommand was given, 2 when the subcommand is unknown,
        otherwise whatever the selected subcommand handler returns.
    """
    if len(argv) == 1:
        print_usage()
        return 1

    subc = argv[1]
    # Each entry pairs the accepted spellings of a subcommand with its handler.
    commands = (
        (('analyze', 'a'), main_analyze),
        (('comparedir', 'c'), main_comparedir),
        (('finddups', 'd'), main_finddups),
    )
    for aliases, handler in commands:
        if subc in aliases:
            return handler(argv[2:])

    print('unknown subcommand:', subc)
    print_usage()
    return 2
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == '__main__':
    raise SystemExit(main(sys.argv))
|