"""Read from and write to 7zip format archives.
"""
- import pylzma
- from struct import pack, unpack
- from zlib import crc32
- from cStringIO import StringIO
# Signature bytes expected at the very start of an archive.
MAGIC_7Z = '7z\xbc\xaf\x27\x1c'

# One-byte property ids.  The parsers below read a single byte from the
# header and compare it against these markers.

# -- header level ----------------------------------------------------------
PROPERTY_END                     = '\x00'
PROPERTY_HEADER                  = '\x01'
PROPERTY_ARCHIVE_PROPERTIES      = '\x02'
PROPERTY_ADDITIONAL_STREAMS_INFO = '\x03'
PROPERTY_MAIN_STREAMS_INFO       = '\x04'
PROPERTY_FILES_INFO              = '\x05'

# -- stream descriptions ---------------------------------------------------
PROPERTY_PACK_INFO               = '\x06'
PROPERTY_UNPACK_INFO             = '\x07'
PROPERTY_SUBSTREAMS_INFO         = '\x08'
PROPERTY_SIZE                    = '\x09'
PROPERTY_CRC                     = '\x0a'
PROPERTY_FOLDER                  = '\x0b'
PROPERTY_CODERS_UNPACK_SIZE      = '\x0c'
PROPERTY_NUM_UNPACK_STREAM       = '\x0d'

# -- per-file properties ---------------------------------------------------
PROPERTY_EMPTY_STREAM            = '\x0e'
PROPERTY_EMPTY_FILE              = '\x0f'
PROPERTY_ANTI                    = '\x10'
PROPERTY_NAME                    = '\x11'
PROPERTY_CREATION_TIME           = '\x12'
PROPERTY_LAST_ACCESS_TIME        = '\x13'
PROPERTY_LAST_WRITE_TIME         = '\x14'
PROPERTY_ATTRIBUTES              = '\x15'
PROPERTY_COMMENT                 = '\x16'

# Marks a header that is itself stored as a packed stream.
PROPERTY_ENCODED_HEADER          = '\x17'
class FormatError(Exception):
    """Raised by the parsers in this module when archive data is malformed."""
class Base:
    """Base class with support for various basic read functions."""

    def _readReal64Bit(self, file):
        """Read a fixed-width little-endian unsigned 64-bit integer.

        Returns a ``(value, raw)`` tuple; ``raw`` is the eight bytes that
        were consumed, so the caller can feed them into a checksum.
        """
        res = file.read(8)
        low, high = unpack('<LL', res)
        return high << 32 | low, res

    def _read64Bit(self, file):
        """Read a variable-length unsigned integer of up to 64 bits.

        The number of leading 1-bits in the first byte gives the number of
        extra bytes that follow (little-endian).  The remaining low bits of
        the first byte are the most significant part of the value.
        """
        first = ord(file.read(1))
        mask = 0x80
        for extra in range(8):
            if first & mask == 0:
                # unpack() keeps raising struct.error on truncated input.
                value = 0
                payload = unpack('%dB' % extra, file.read(extra))
                for shift, byte in enumerate(payload):
                    value |= byte << (8 * shift)
                highpart = first & (mask - 1)
                return value + (highpart << (extra * 8))
            mask >>= 1

        # First byte was 0xFF: all eight value bytes follow verbatim.
        # (Previously this case fell off the loop and returned None.)
        return unpack('<Q', file.read(8))[0]

    def _readBoolean(self, file, count, checkall=0):
        """Read ``count`` booleans stored as a bit vector, MSB first.

        With ``checkall`` set, a leading byte is read first; any value other
        than zero means "all true" and no bit vector follows.
        """
        if checkall:
            alldefined = file.read(1)
            # bytes literal: identical to '\x00' on Python 2.
            if alldefined != b'\x00':
                return [True] * count

        result = []
        current = 0
        mask = 0
        for _ in range(count):
            if mask == 0:
                # Fetch the next byte only when the previous one is used up.
                current = ord(file.read(1))
                mask = 0x80
            result.append(current & mask != 0)
            mask >>= 1
        return result
-
class PackInfo(Base):
    """Informations about packed streams."""

    def __init__(self, file):
        """Parse the pack info section from ``file``.

        Always sets ``packpos`` and ``numstreams``; ``packsizes`` and
        ``crcs`` are only set when the matching section is present.
        Raises FormatError if the section is not closed by the end marker.
        """
        self.packpos = self._read64Bit(file)
        self.numstreams = self._read64Bit(file)

        marker = file.read(1)
        if marker == PROPERTY_SIZE:
            self.packsizes = [self._read64Bit(file)
                              for _ in range(self.numstreams)]
            marker = file.read(1)

        if marker == PROPERTY_CRC:
            # NOTE(review): these values are read with the variable-length
            # number decoder, while Digests reads fixed 4-byte values --
            # confirm against the format description.
            self.crcs = [self._read64Bit(file)
                         for _ in range(self.numstreams)]
            marker = file.read(1)

        if marker != PROPERTY_END:
            raise FormatError('end id expected but %s found' % repr(marker))
class Folder(Base):
    """A "Folder" represents a stream of compressed data."""

    def __init__(self, file):
        """Parse one folder definition (coders, bind pairs, packed streams).

        Sets ``coders`` (list of dicts with ``method``, ``numinstreams``,
        ``numoutstreams`` and optionally ``properties``), ``totalout``,
        ``bindpairs`` (``(inindex, outindex)`` tuples) and
        ``packed_indexes``.  ``unpacksizes`` starts empty and is filled in
        later by UnpackInfo.
        """
        numcoders = self._read64Bit(file)
        self.coders = []
        self.digestdefined = False
        self.unpacksizes = []
        totalin = 0
        self.totalout = 0
        for _ in range(numcoders):
            # A coder may be followed by alternative definitions; all of
            # them are recorded until the "last alternative" flag is seen.
            while True:
                flags = ord(file.read(1))
                methodsize = flags & 0xf
                issimple = flags & 0x10 == 0
                noattributes = flags & 0x20 == 0
                last_alternative = flags & 0x80 == 0

                coder = {'method': file.read(methodsize)}
                if issimple:
                    coder['numinstreams'] = 1
                    coder['numoutstreams'] = 1
                else:
                    coder['numinstreams'] = self._read64Bit(file)
                    coder['numoutstreams'] = self._read64Bit(file)
                totalin += coder['numinstreams']
                self.totalout += coder['numoutstreams']

                # Properties are present exactly when the attributes flag is
                # set.  (The previous test on the first method byte misread
                # coders without properties and failed on an empty method.)
                if not noattributes:
                    coder['properties'] = file.read(self._read64Bit(file))

                self.coders.append(coder)
                if last_alternative:
                    break

        numbindpairs = self.totalout - 1
        self.bindpairs = []
        for _ in range(numbindpairs):
            inindex = self._read64Bit(file)
            outindex = self._read64Bit(file)
            self.bindpairs.append((inindex, outindex))

        numpackedstreams = totalin - numbindpairs
        self.packed_indexes = []
        if numpackedstreams == 1:
            # The single packed stream is not stored; it is the input
            # stream that no bind pair refers to.
            for idx in range(totalin):
                if self.findInBindPair(idx) < 0:
                    self.packed_indexes.append(idx)
        elif numpackedstreams > 1:
            for _ in range(numpackedstreams):
                self.packed_indexes.append(self._read64Bit(file))

    def getUnpackSize(self):
        """Return the unpacked size of the folder's final output stream.

        That is the size of the last output stream which is not consumed by
        a bind pair.  Returns 0 when no sizes are known; raises FormatError
        when every output stream is bound.
        """
        if not self.unpacksizes:
            return 0

        for idx in reversed(range(len(self.unpacksizes))):
            # findOutBindPair() signals "not bound" with -1, so test for
            # that explicitly instead of relying on truthiness.
            if self.findOutBindPair(idx) < 0:
                return self.unpacksizes[idx]

        raise FormatError('not found')

    def findInBindPair(self, index):
        """Return the position of the bind pair whose input index is
        ``index``, or -1 if there is none."""
        for idx, (inindex, _outindex) in enumerate(self.bindpairs):
            if inindex == index:
                return idx
        return -1

    def findOutBindPair(self, index):
        """Return the position of the bind pair whose output index is
        ``index``, or -1 if there is none."""
        for idx, (_inindex, outindex) in enumerate(self.bindpairs):
            if outindex == index:
                return idx
        return -1
-
class Digests(Base):
    """Holds a list of checksums."""

    def __init__(self, file, count):
        """Read ``count`` digest slots from ``file``.

        ``defined`` is a list of ``count`` booleans.  ``crcs`` has the same
        length; entries whose flag is false are ``None``.  Only defined
        checksums are present in the stream, so nothing is read for the
        undefined ones (reading ``count`` values unconditionally consumed
        bytes belonging to the following section).
        """
        self.defined = self._readBoolean(file, count, checkall=1)
        self.crcs = []
        for isdefined in self.defined:
            if isdefined:
                # Signed 32-bit, little-endian.
                self.crcs.append(unpack('<l', file.read(4))[0])
            else:
                self.crcs.append(None)


UnpackDigests = Digests
class UnpackInfo(Base):
    """Combines multiple folders."""

    def __init__(self, file):
        """Parse the unpack info section from ``file``.

        Sets ``numfolders`` and ``folders``.  When the folder definitions
        are stored externally, ``folders`` stays empty and
        ``datastreamidx`` is set instead.  Each parsed folder receives its
        ``unpacksizes`` and, if a CRC section is present, ``digestdefined``
        and ``crc``.  Raises FormatError on unexpected markers.
        """
        marker = file.read(1)
        if marker != PROPERTY_FOLDER:
            raise FormatError('folder id expected but %s found' % repr(marker))

        self.numfolders = self._read64Bit(file)
        self.folders = []
        external = file.read(1)
        if external == b'\x00':
            self.folders = [Folder(file) for _ in range(self.numfolders)]
        elif external == b'\x01':
            self.datastreamidx = self._read64Bit(file)
        else:
            raise FormatError('0x00 or 0x01 expected but %s found' % repr(external))

        marker = file.read(1)
        if marker != PROPERTY_CODERS_UNPACK_SIZE:
            raise FormatError('coders unpack size id expected but %s found' % repr(marker))

        for folder in self.folders:
            folder.unpacksizes = [self._read64Bit(file)
                                  for _ in range(folder.totalout)]

        marker = file.read(1)
        if marker == PROPERTY_CRC:
            digests = UnpackDigests(file, self.numfolders)
            # Iterate over the folders actually parsed: with external folder
            # definitions the list is empty although numfolders is not.
            for idx, folder in enumerate(self.folders):
                folder.digestdefined = digests.defined[idx]
                folder.crc = digests.crcs[idx]
            marker = file.read(1)

        if marker != PROPERTY_END:
            raise FormatError('end id expected but %s found' % repr(marker))
-
class SubstreamsInfo(Base):
    """Defines the substreams of a folder."""

    def __init__(self, file, numfolders, folders):
        """Parse the substreams info section from ``file``.

        ``numfolders`` is the number of folders and ``folders`` the list of
        Folder objects they describe.  Sets ``numunpackstreams`` (one count
        per folder, defaulting to 1), ``unpacksizes`` (only when a size
        section is present), and ``digestsdefined`` / ``digests`` (one entry
        per substream).  Raises FormatError if the end marker is missing.
        """
        self.digests = []
        self.digestsdefined = []

        marker = file.read(1)
        if marker == PROPERTY_NUM_UNPACK_STREAM:
            self.numunpackstreams = [self._read64Bit(file)
                                     for _ in range(numfolders)]
            marker = file.read(1)
        else:
            self.numunpackstreams = [1] * numfolders

        if marker == PROPERTY_SIZE:
            self.unpacksizes = []
            for idx, numsubstreams in enumerate(self.numunpackstreams):
                if numsubstreams == 0:
                    # A folder without substreams contributes no size.
                    continue
                # Sizes of all but the last substream are stored; the last
                # one is what remains of this folder's unpacked size, so the
                # running total must restart for every folder.
                consumed = 0
                for _ in range(1, numsubstreams):
                    size = self._read64Bit(file)
                    self.unpacksizes.append(size)
                    consumed += size
                self.unpacksizes.append(folders[idx].getUnpackSize() - consumed)

            marker = file.read(1)

        # Counted up front because the totals are also needed for the
        # defaults below when no CRC section is present.
        numdigests = 0
        numdigeststotal = 0
        for idx in range(numfolders):
            numsubstreams = self.numunpackstreams[idx]
            if numsubstreams != 1 or not folders[idx].digestdefined:
                numdigests += numsubstreams
            numdigeststotal += numsubstreams

        if marker == PROPERTY_CRC:
            digests = Digests(file, numdigests)
            didx = 0
            for idx in range(numfolders):
                folder = folders[idx]
                numsubstreams = self.numunpackstreams[idx]
                if numsubstreams == 1 and folder.digestdefined:
                    # A single substream reuses the folder's own checksum.
                    self.digestsdefined.append(True)
                    self.digests.append(folder.crc)
                else:
                    for _ in range(numsubstreams):
                        self.digestsdefined.append(digests.defined[didx])
                        self.digests.append(digests.crcs[didx])
                        didx += 1

            marker = file.read(1)

        if marker != PROPERTY_END:
            raise FormatError('end id expected but %s found' % repr(marker))

        if not self.digestsdefined:
            self.digestsdefined = [False] * numdigeststotal
            self.digests = [0] * numdigeststotal
class StreamsInfo(Base):
    """Informations about compressed streams."""

    def __init__(self, file):
        """Parse the optional pack, unpack and substreams sections in order.

        Each section that is present is stored under ``packinfo``,
        ``unpackinfo`` or ``substreamsinfo``; absent sections leave the
        attribute unset.  Raises FormatError if the end marker is missing.
        """
        # (marker, attribute name, parser) in the order they may appear.
        # The parsers are wrapped so that they only run when their marker
        # has actually been read.
        sections = (
            (PROPERTY_PACK_INFO, 'packinfo',
             lambda: PackInfo(file)),
            (PROPERTY_UNPACK_INFO, 'unpackinfo',
             lambda: UnpackInfo(file)),
            (PROPERTY_SUBSTREAMS_INFO, 'substreamsinfo',
             lambda: SubstreamsInfo(file, self.unpackinfo.numfolders,
                                    self.unpackinfo.folders)),
        )

        marker = file.read(1)
        for section_id, attrname, parse in sections:
            if marker == section_id:
                setattr(self, attrname, parse())
                marker = file.read(1)

        if marker != PROPERTY_END:
            raise FormatError('end id expected but %s found' % repr(marker))
class FilesInfo(Base):
    """Holds file properties."""

    def _readTimes(self, file, files, name):
        """Store a timestamp under key ``name`` in every dict of ``files``.

        The raw 64-bit value is stored as read; entries flagged as
        undefined get ``None``.
        """
        defined = self._readBoolean(file, len(files), checkall=1)
        for info, isdefined in zip(files, defined):
            if isdefined:
                info[name] = self._readReal64Bit(file)[0]
            else:
                info[name] = None

    def __init__(self, file):
        """Parse the files info section from ``file``.

        Sets ``numfiles`` and ``files``, a list with one dict per file.
        Every dict has ``emptystream``; ``filename``, the three time keys
        and ``attributes`` are added when the matching property is present.
        ``emptyfiles`` and ``antifiles`` hold the flag vectors of the
        corresponding properties (one flag per empty stream).

        Raises FormatError on unknown or malformed properties and
        NotImplementedError when file names are stored externally.
        """
        self.numfiles = self._read64Bit(file)
        self.files = [{'emptystream': False} for _ in range(self.numfiles)]
        self.emptyfiles = []
        self.antifiles = []
        numemptystreams = 0
        timefields = {
            PROPERTY_CREATION_TIME: 'creationtime',
            PROPERTY_LAST_ACCESS_TIME: 'lastaccesstime',
            PROPERTY_LAST_WRITE_TIME: 'lastwritetime',
        }

        while True:
            typ = self._read64Bit(file)
            if typ > 255:
                raise FormatError('invalid type, must be below 256, is %d' % typ)

            typ = chr(typ)
            if typ == PROPERTY_END:
                break

            # Every property carries its own size, so it is parsed from a
            # private buffer and cannot run into the next property.
            size = self._read64Bit(file)
            propdata = StringIO(file.read(size))

            if typ == PROPERTY_EMPTY_STREAM:
                isempty = self._readBoolean(propdata, self.numfiles)
                for info, empty in zip(self.files, isempty):
                    info['emptystream'] = empty
                numemptystreams += isempty.count(True)
                self.emptyfiles = [False] * numemptystreams
                self.antifiles = [False] * numemptystreams
            elif typ == PROPERTY_EMPTY_FILE:
                self.emptyfiles = self._readBoolean(propdata, numemptystreams)
            elif typ == PROPERTY_ANTI:
                self.antifiles = self._readBoolean(propdata, numemptystreams)
            elif typ == PROPERTY_NAME:
                external = propdata.read(1)
                if external != b'\x00':
                    self.dataindex = self._read64Bit(propdata)
                    raise NotImplementedError('external file names are not supported')

                for info in self.files:
                    # Names are sequences of 2-byte units ended by a zero
                    # unit.
                    units = []
                    while True:
                        unit = propdata.read(2)
                        if len(unit) < 2:
                            # Without this check a truncated name section
                            # would loop forever on empty reads.
                            raise FormatError('unexpected end of file names')
                        if unit == b'\0\0':
                            break
                        units.append(unit)
                    # Explicit byte order: the plain 'utf-16' codec depends
                    # on a BOM or the platform's native order.
                    info['filename'] = b''.join(units).decode('utf-16-le')
            elif typ in timefields:
                self._readTimes(propdata, self.files, timefields[typ])
            elif typ == PROPERTY_ATTRIBUTES:
                defined = self._readBoolean(propdata, self.numfiles, checkall=1)
                for info, isdefined in zip(self.files, defined):
                    if isdefined:
                        info['attributes'] = unpack('<L', propdata.read(4))[0]
                    else:
                        info['attributes'] = None
            else:
                raise FormatError('invalid type %s' % repr(typ))
-
class Header(Base):
    """The archive header."""

    def __init__(self, file):
        """Parse the optional header sections in their fixed order.

        Sections that are present are stored as ``properties``,
        ``additional_streams``, ``main_streams`` and ``files``; absent
        sections leave the attribute unset.  Raises FormatError if the end
        marker is missing.
        """
        # (marker, attribute name, parser); parsers are wrapped so they only
        # run when their marker has been read.
        # NOTE(review): ArchiveProperties is not defined in the part of the
        # file visible here -- confirm it exists before relying on it.
        sections = (
            (PROPERTY_ARCHIVE_PROPERTIES, 'properties',
             lambda: ArchiveProperties(file)),
            (PROPERTY_ADDITIONAL_STREAMS_INFO, 'additional_streams',
             lambda: StreamsInfo(file)),
            (PROPERTY_MAIN_STREAMS_INFO, 'main_streams',
             lambda: StreamsInfo(file)),
            (PROPERTY_FILES_INFO, 'files',
             lambda: FilesInfo(file)),
        )

        marker = file.read(1)
        for section_id, attrname, parse in sections:
            if marker == section_id:
                setattr(self, attrname, parse())
                marker = file.read(1)

        if marker != PROPERTY_END:
            raise FormatError('end id expected but %s found' % (repr(marker)))
class ArchiveFile:
    """Wrapper around a file in the archive."""

    def __init__(self, info, start, src_start, size, folder, archive, maxsize=None):
        """Create a file wrapper.

        ``info`` is a dict whose items are copied onto the instance as
        attributes.  ``start`` is the offset of the file inside the
        decompressed stream, ``src_start`` the offset of the packed data in
        the archive file and ``size`` the uncompressed size.  ``folder``
        supplies the coder properties and ``archive`` the underlying file
        object.  ``maxsize`` is only stored.
        """
        self.digest = None
        self._archive = archive
        self._file = archive._file
        self._start = start
        self._src_start = src_start
        self._folder = folder
        self.size = size
        self._maxsize = maxsize
        for key, value in info.items():
            setattr(self, key, value)
        self.reset()

    def reset(self):
        """Reset the read position to the start of the file."""
        self.pos = 0

    def read(self):
        """Decompress and return the complete contents of the file.

        Decompression always starts at ``src_start``; everything before
        ``start`` in the decompressed stream is decoded and then discarded.
        Raises FormatError if the packed data ends before enough output has
        been produced.
        """
        needed = self._start + self.size
        dec = pylzma.decompressobj(maxlength=needed)
        self._file.seek(self._src_start)
        # The coder properties act as the stream header for the decoder.
        dec.decompress(self._folder.coders[0]['properties'])

        total = self.compressed
        if total is None:
            # Packed size unknown: feed the decoder in small chunks until
            # enough output has been produced.
            remaining = needed
            out = StringIO()
            while remaining > 0:
                chunk = self._file.read(1024)
                tmp = dec.decompress(chunk, remaining)
                if not chunk and not tmp:
                    # Input exhausted and nothing left to flush; without
                    # this check the loop would never terminate.
                    raise FormatError('unexpected end of compressed data')
                out.write(tmp)
                remaining -= len(tmp)
            data = out.getvalue()
        else:
            data = dec.decompress(self._file.read(total), needed)

        return data[self._start:self._start + self.size]

    def checkcrc(self):
        """Return True if the contents match the stored checksum.

        Files without a stored checksum are reported as valid.
        """
        if self.digest is None:
            return True

        self.reset()
        data = self.read()
        # crc32() is signed or unsigned depending on the Python version and
        # the stored digest is signed; compare the low 32 bits of both.
        return (crc32(data) & 0xffffffff) == (self.digest & 0xffffffff)
class Archive7z(Base):
    """The archive itself."""

    def _sameCrc(self, first, second):
        """Compare two CRC-32 values by their low 32 bits.

        crc32() returns signed or unsigned values depending on the Python
        version, while the values stored in the archive are unpacked as
        signed; masking makes the comparison independent of that.
        """
        return (first & 0xffffffff) == (second & 0xffffffff)

    def __init__(self, file):
        """Open the archive in ``file`` (a seekable binary file object).

        Reads and verifies the signature header, decodes the (possibly
        packed) header and builds ``files``, ``filenames`` and ``numfiles``.
        Raises FormatError if the data is not a valid archive.
        """
        self._file = file
        self.header = file.read(len(MAGIC_7Z))
        if self.header != MAGIC_7Z:
            raise FormatError('not a 7z file')

        self.version = unpack('BB', file.read(2))
        self.startheadercrc = unpack('<l', file.read(4))[0]
        # The start header checksum covers offset, size and CRC of the
        # next header.
        self.nextheaderofs, data = self._readReal64Bit(file)
        crc = crc32(data)
        self.nextheadersize, data = self._readReal64Bit(file)
        crc = crc32(data, crc)
        data = file.read(4)
        self.nextheadercrc = unpack('<l', data)[0]
        crc = crc32(data, crc)
        if not self._sameCrc(crc, self.startheadercrc):
            raise FormatError('invalid header data')
        self.afterheader = file.tell()

        file.seek(self.nextheaderofs, 1)
        buffer = StringIO(file.read(self.nextheadersize))
        if not self._sameCrc(crc32(buffer.getvalue()), self.nextheadercrc):
            raise FormatError('invalid header data')

        # The header may itself be stored packed (possibly repeatedly);
        # unpack until the plain header marker is reached.
        while True:
            marker = buffer.read(1)
            if marker == PROPERTY_HEADER:
                break

            if marker != PROPERTY_ENCODED_HEADER:
                raise FormatError('Unknown field: %s' % repr(marker))

            streams = StreamsInfo(buffer)
            file.seek(self.afterheader + 0)
            data = ''
            for folder in streams.unpackinfo.folders:
                file.seek(streams.packinfo.packpos, 1)
                props = folder.coders[0]['properties']
                for idx in range(len(streams.packinfo.packsizes)):
                    tmp = file.read(streams.packinfo.packsizes[idx])
                    data += pylzma.decompress(props + tmp, maxlength=folder.unpacksizes[idx])

                if folder.digestdefined:
                    if not self._sameCrc(folder.crc, crc32(data)):
                        raise FormatError('invalid block data')

            buffer = StringIO(data)

        self.header = Header(buffer)
        self.files = []

        files = self.header.files
        folders = self.header.main_streams.unpackinfo.folders
        packinfo = self.header.main_streams.packinfo
        subinfo = self.header.main_streams.substreamsinfo
        packsizes = packinfo.packsizes
        # A single packed stream is treated as a solid archive.
        self.solid = packinfo.numstreams == 1
        if self.solid and hasattr(subinfo, 'unpacksizes'):
            unpacksizes = subinfo.unpacksizes
        else:
            unpacksizes = [x.unpacksizes[0] for x in folders]

        fidx = 0
        obidx = 0
        # NOTE(review): packinfo.packpos is not added here, unlike in the
        # packed-header path above -- confirm it is always 0 for main streams.
        src_pos = self.afterheader
        pos = 0
        if self.solid:
            maxsize = packinfo.packsizes[0] or None
        else:
            maxsize = None

        for info in files.files:
            if info['emptystream']:
                # Entries without data have no folder; looking one up first
                # raised IndexError for trailing empty entries.
                continue

            folder = folders[fidx]
            if self.solid:
                info['compressed'] = None
            else:
                info['compressed'] = packsizes[obidx] or None
            info['uncompressed'] = unpacksizes[obidx]
            member = ArchiveFile(info, pos, src_pos, unpacksizes[obidx], folder, self, maxsize=maxsize)
            if subinfo.digestsdefined[obidx]:
                member.digest = subinfo.digests[obidx]
            self.files.append(member)
            if self.solid:
                # All files share one stream; advance inside it.
                pos += unpacksizes[obidx]
            else:
                # One packed stream and folder per file.
                src_pos += packsizes[obidx]
                fidx += 1
            obidx += 1

        self.numfiles = len(self.files)
        self.filenames = [member.filename for member in self.files]

    def getmember(self, name):
        """Return the ArchiveFile named ``name`` or None if there is none."""
        for member in self.files:
            if member.filename == name:
                return member
        return None

    def getmembers(self):
        """Return the list of ArchiveFile objects."""
        return self.files

    def getnames(self):
        """Return the list of file names."""
        return self.filenames

    def list(self, verbose=True):
        """Print a listing of the archive to standard output.

        With ``verbose`` false only the file names are printed; otherwise
        one line per file with size, packed size (if known), checksum (if
        defined) and name.
        """
        print('total %d files in %sarchive' % (self.numfiles, (self.solid and 'solid ') or ''))
        if not verbose:
            print('\n'.join(self.filenames))
            return

        for member in self.files:
            extra = (member.compressed and '%10d ' % (member.compressed)) or ' '
            if member.digest is None:
                digest = ' ' * 8
            else:
                # Render as unsigned hex; hex()[2:-1] only worked for
                # positive long values.
                digest = '%08x' % (member.digest & 0xffffffff)
            print('%10d%s%s %s' % (member.size, extra, digest, member.filename))
-
if __name__ == '__main__':
    # Demo: list the contents of ``test.7z`` in the current directory.
    archive_file = open('test.7z', 'rb')
    try:
        f = Archive7z(archive_file)
        f.list()
    finally:
        # Close the handle even if the archive turns out to be invalid.
        archive_file.close()
|