123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- #! /usr/bin/env python3
- #
- # Copyright (C) 2019, Ansgar Burchardt <ansgar@debian.org>
- # License: GPL-2+
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation; either version 2 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program. If not, see <https://www.gnu.org/licenses/>.
- import argparse
- import gzip
- import hashlib
- import os
- import signal
- import sys
- from contextlib import contextmanager
- from typing import BinaryIO, Set
# A set of digests, each an ASCII-encoded hex string held as `bytes`
# (the same form that `hash_fh()` returns).
Hashes = Set[bytes]
def hash_fh(fh: BinaryIO) -> bytes:
    """
    Return the SHA-1 digest of the data readable from `fh`, as an
    ASCII-encoded hexadecimal byte string.

    The stream is read in 32 KiB chunks, starting at its current
    position, until `read()` returns an empty result.
    """
    digest = hashlib.sha1()
    for chunk in iter(lambda: fh.read(32768), b""):
        digest.update(chunk)
    return digest.hexdigest().encode('ascii')
def hash_file(filename: bytes) -> bytes:
    """
    Open `filename` in binary mode and return the digest that
    `hash_fh` computes over its contents.
    """
    with open(filename, 'rb') as stream:
        digest = hash_fh(stream)
    return digest
def load_hashes(path) -> Hashes:
    """
    Read the gzip-compressed file at `path` and return the set of its
    lines, each stripped of leading and trailing whitespace.

    Lines are kept as `bytes`; no decoding is performed.
    """
    with gzip.open(path, 'rb') as stream:
        return {line.strip() for line in stream}
@contextmanager
def IgnoreSignals():
    """
    Context manager that ignores SIGHUP, SIGINT and SIGTERM while the
    `with` body runs and reinstates the previous handlers on exit.

    The previous handlers are reinstated even when the body raises, so
    a failure inside the block does not leave the process ignoring
    these signals afterwards.
    """
    previous = []
    try:
        for sig in (signal.SIGHUP, signal.SIGINT, signal.SIGTERM):
            previous.append((sig, signal.signal(sig, signal.SIG_IGN)))
        yield
    finally:
        for sig, handler in previous:
            # `signal.signal()` returns None when the old handler was not
            # installed from Python; None cannot be passed back in, so
            # fall back to the default action.
            if handler is None:
                handler = signal.SIG_DFL
            signal.signal(sig, handler)
def replace_file(path: bytes, hash: bytes, base: bytes) -> None:
    """
    Replace the file `path` with a symlink pointing to
    `{base}/{hash[0:2]}/{hash[2:4]}/{hash}`.

    The symlink is first created under a temporary name next to `path`
    and then renamed over it, so `path` always refers to either the
    original file or the finished symlink; a failure while creating the
    link can no longer leave `path` missing.

    Raises `OSError` if the symlink cannot be created or renamed.
    """
    target = os.path.join(base, hash[0:2], hash[2:4], hash)
    tmp = path + b".symlink-tmp"
    with IgnoreSignals():
        try:
            os.symlink(target, tmp)
        except FileExistsError:
            # Only reclaim the temporary name if it holds a symlink, i.e.
            # most likely a leftover from an interrupted earlier run;
            # never delete anything else that happens to use the name.
            if not os.path.islink(tmp):
                raise
            os.unlink(tmp)
            os.symlink(target, tmp)
        try:
            # On POSIX, renaming over an existing file is atomic.
            os.replace(tmp, path)
        except OSError:
            os.unlink(tmp)
            raise
def keep_file(path: bytes) -> None:
    """
    Create the empty marker file `{path}.nosnapshot`.

    The marker is opened in exclusive-creation mode, so
    `FileExistsError` is raised if it already exists.
    """
    marker = path + b".nosnapshot"
    open(marker, 'x').close()
def process_file(path: bytes, known_hashes: Hashes, base: bytes) -> None:
    """
    Hash the file `path` and act on the result.

    If the digest is in `known_hashes`, the file is replaced by a
    symlink below `base`; otherwise `{path}.nosnapshot` is created so
    that the file is not checked again later.
    """
    digest = hash_file(path)
    if digest not in known_hashes:
        keep_file(path)
        return
    replace_file(path, digest, base)
def scan_directory(path: bytes):
    """
    Yield paths to regular files in `path` and its subdirectories.

    Skipped are:
    - `*.nosnapshot` marker files,
    - files `fn` for which a regular file `{fn}.nosnapshot` exists in
      the same directory,
    - files named `.nobackup`,
    - symlinks (neither yielded nor descended into).

    The files of a directory are yielded before its subdirectories are
    visited. `path` must be `bytes`; the yielded paths are `bytes` too.
    """
    directories = []
    filenames = []
    # We do not use `os.walk` as `os.scandir` allows us to skip
    # symlinks without an extra `stat()` call.
    for entry in os.scandir(path):
        if entry.is_dir(follow_symlinks=False):
            directories.append(entry.path)
        elif entry.is_file(follow_symlinks=False):
            filenames.append(entry.path)

    # Set for O(1) marker lookups; the list keeps the scan order for output.
    present = set(filenames)
    for fn in filenames:
        if fn.endswith((b".nosnapshot", b"/.nobackup")):
            continue
        if fn + b".nosnapshot" in present:
            continue
        yield fn

    for subdir in directories:
        yield from scan_directory(subdir)
def process_directory(path: bytes, known_hashes: Hashes, base: bytes) -> None:
    """
    Change the working directory to `path` and run `process_file` on
    every file that `scan_directory` yields for it.

    The scan starts at `b"."`, so the paths passed on are relative to
    `path`. The working directory is not changed back afterwards.
    """
    os.chdir(path)
    candidates = scan_directory(b".")
    for candidate in candidates:
        process_file(candidate, known_hashes, base)
def run(config):
    """
    Load the known hashes and process the morgue directory.

    `config` must provide the attributes `known_hashes` (path of the
    gzip-compressed hash list), `morguedir` and `farmdir` (both `str`).

    The directory names are converted to `bytes` with `os.fsencode`,
    which uses the filesystem encoding and error handler. Unlike a
    plain `str.encode()`, this also works for command-line arguments
    that are not valid UTF-8.
    """
    known_hashes = load_hashes(config.known_hashes)
    morguedir = os.fsencode(config.morguedir)
    farmdir = os.fsencode(config.farmdir)
    process_directory(morguedir, known_hashes, farmdir)
def main(argv=None):
    """
    Parse the command line and run the replacement.

    `argv` is the list of arguments without the program name. When it
    is None, `sys.argv[1:]` is used, looked up at call time rather than
    once when the function is defined.

    Exits via `SystemExit` (raised by argparse) on invalid arguments.
    """
    parser = argparse.ArgumentParser(
        description="replace files in morgue with symlinks to snapshot.d.o"
    )
    parser.add_argument("--known-hashes", type=str, required=True)
    parser.add_argument("--farmdir", type=str, required=True)
    parser.add_argument("--morguedir", type=str, required=True)
    # argparse itself falls back to `sys.argv[1:]` when given None.
    config = parser.parse_args(argv)
    run(config)
# Run `main()` only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|