Source code for py7zr.compressor

#!/usr/bin/python -u
#
# py7zr library
#
# Copyright (c) 2019-2021 Hiroshi Miura <miurahr@linux.com>
# Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de
# 7-Zip Copyright (C) 1999-2010 Igor Pavlov
# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
#
import bz2
import lzma
import struct
import zlib
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple, Union

import ppmd as Ppmd  # type: ignore
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes

from py7zr.exceptions import PasswordRequired, UnsupportedCompressionMethodError
from py7zr.helpers import Buffer, BufferedRW, calculate_crc32, calculate_key
from py7zr.properties import (FILTER_ARM, FILTER_ARMTHUMB, FILTER_BZIP2, FILTER_COPY, FILTER_CRYPTO_AES256_SHA256,
                              FILTER_DEFLATE, FILTER_DELTA, FILTER_IA64, FILTER_LZMA, FILTER_LZMA2, FILTER_POWERPC,
                              FILTER_PPMD, FILTER_SPARC, FILTER_X86, FILTER_ZSTD, MAGIC_7Z, CompressionMethod,
                              RuntimeConstant)

try:
    import bcj as BCJFilter  # type: ignore  # noqa
except ImportError:
    import py7zr.bcjfilter as BCJFilter  # type: ignore  # noqa
try:
    import pyzstd  # noqa

    import py7zr.pyzstdfilter as Zstd  # type: ignore  # noqa
except ImportError:
    import py7zr.zstdfilter as Zstd  # type: ignore  # noqa


class ISevenZipCompressor(ABC):
    """Abstract interface for one compression stage of a 7zip coder chain."""

    @abstractmethod
    def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes:
        """Compress data (interface).

        :param data: input data
        :return: output data
        """
        ...

    @abstractmethod
    def flush(self) -> bytes:
        """Flush output buffer (interface).

        :return: output data
        """
        ...
class ISevenZipDecompressor(ABC):
    """Abstract interface for one decompression stage of a 7zip coder chain."""

    @abstractmethod
    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Decompress data (interface).

        :param data: input data
        :param max_length: maximum length of output data when it can respect, otherwise ignore.
        :return: output data
        """
        ...
class AESCompressor(ISevenZipCompressor):
    """AES compression (encryption) stage.

    Data is encrypted with AES in CBC mode. Input is buffered so that only
    whole 16-byte blocks are handed to the cipher; the final partial block is
    zero padded by :meth:`flush`.
    """

    AES_CBC_BLOCKSIZE = 16

    def __init__(self, password: str) -> None:
        self.cycles = 19  # as same as p7zip
        self.iv = get_random_bytes(16)
        self.salt = b''
        self.method = CompressionMethod.CRYPT_AES256_SHA256
        encoded_password = password.encode('utf-16LE')
        key = calculate_key(encoded_password, self.cycles, self.salt, 'sha256')
        # zero padding if iv < AES_CBC_BLOCKSIZE
        self.iv += bytes(self.AES_CBC_BLOCKSIZE - len(self.iv))
        self.cipher = AES.new(key, AES.MODE_CBC, self.iv)
        self.flushed = False
        self.buf = Buffer(size=RuntimeConstant().READ_BLOCKSIZE + self.AES_CBC_BLOCKSIZE * 2)

    def encode_filter_properties(self):
        """Serialize cycles, salt and IV into the coder property bytes.

        :return: two header bytes followed by the salt and the IV
        """
        salt_len = len(self.salt)
        iv_len = len(self.iv)
        iv_flag = 1  # it should always 1
        salt_flag = 1 if salt_len > 0 else 0
        head = self.cycles + (iv_flag << 6) + (salt_flag << 7)
        sizes = ((iv_len - 1) & 0x0f) + (((salt_len - salt_flag) << 4) & 0xf0)
        return head.to_bytes(1, 'little') + sizes.to_bytes(1, 'little') + self.salt + self.iv

    def compress(self, data):
        """Compression + AES encryption with 16byte alignment.

        :param data: input data
        :return: the ciphertext of all complete blocks available so far,
                 or ``b''`` when less than one block is buffered
        """
        # The size is < 16 which should be only last chunk.
        # From p7zip/CPP/7zip/common/FilterCoder.cpp
        # /*
        # AES filters need 16-bytes alignment for HARDWARE-AES instructions.
        # So we call IFilter::Filter(, size), where (size != 16 * N) only for last data block.
        # AES-CBC filters need data size aligned for 16-bytes.
        # So the encoder can add zeros to the end of original stream.
        # Some filters (BCJ and others) don't process data at the end of stream in some cases.
        # So the encoder and decoder write such last bytes without change.
        # */
        buffered = len(self.buf)
        total = buffered + len(data)
        if total < self.AES_CBC_BLOCKSIZE:  # pragma: no-cover
            # smaller than block size, it will processed when flush()
            self.buf.add(data)
            return b''
        if total & 0x0f == 0:
            # aligned: everything buffered so far can be encrypted
            self.buf.add(data)
            encrypted = self.cipher.encrypt(self.buf.view)
            self.buf.reset()
            return encrypted
        # not aligned: encrypt the whole blocks and keep the tail for later
        take = (total & ~0x0f) - buffered
        self.buf.add(data[:take])
        encrypted = self.cipher.encrypt(self.buf.view)
        self.buf.set(data[take:])
        return encrypted

    def flush(self):
        """Encrypt the buffered remainder, zero padded to the block size.

        :return: the final ciphertext, or ``b''`` when nothing is buffered
        """
        if len(self.buf) == 0:
            return b''
        # padlen = 16 - currentlen % 16 if currentlen % 16 > 0 else 0
        self.buf.add(bytes(-len(self.buf) & 15))
        encrypted = self.cipher.encrypt(self.buf.view)
        self.buf.reset()
        return encrypted
class AESDecompressor(ISevenZipDecompressor):
    """AES decryption stage (AES in CBC mode, key derived with SHA-256).

    The coder properties are parsed as follows: the low 6 bits of the first
    byte give the number of key-derivation cycles (as a power), bit 7 and
    bit 6 flag the presence of a salt and an IV, and the second byte carries
    the remaining salt size (high nibble) and IV size (low nibble). The salt
    and then the IV follow these two bytes.
    """

    def __init__(self, aes_properties: bytes, password: str) -> None:
        """Parse the coder properties and set up the cipher.

        :param aes_properties: property bytes stored with the coder
        :param password: password used to derive the key
        :raises UnsupportedCompressionMethodError: when the properties are
            truncated, inconsistent, carry neither salt nor IV flag, or ask
            for more than 2**24 key-derivation cycles
        """
        # The properties come from the archive, so validate them with real
        # exceptions; ``assert`` would be stripped when running with -O.
        if len(aes_properties) < 2:
            raise UnsupportedCompressionMethodError('AES properties are too short.')
        firstbyte = aes_properties[0]
        numcyclespower = firstbyte & 0x3f
        if firstbyte & 0xc0 == 0:
            raise UnsupportedCompressionMethodError
        saltsize = (firstbyte >> 7) & 1
        ivsize = (firstbyte >> 6) & 1
        secondbyte = aes_properties[1]
        saltsize += (secondbyte >> 4)
        ivsize += (secondbyte & 0x0f)
        if len(aes_properties) != 2 + saltsize + ivsize:
            raise UnsupportedCompressionMethodError('AES properties have an unexpected length.')
        if numcyclespower > 24:
            raise UnsupportedCompressionMethodError('AES key derivation cycles are out of range.')
        salt = aes_properties[2:2 + saltsize]
        iv = aes_properties[2 + saltsize:2 + saltsize + ivsize]
        if ivsize < 16:
            iv += bytes(16 - ivsize)  # zero padding up to the AES block size
        key = calculate_key(password.encode('utf-16LE'), numcyclespower, salt, 'sha256')
        self.cipher = AES.new(key, AES.MODE_CBC, iv)
        self.buf = Buffer(size=RuntimeConstant().READ_BLOCKSIZE + 16)

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Decrypt data, handing only whole 16-byte blocks to the cipher.

        :param data: encrypted input; an empty value flushes the buffer
        :param max_length: accepted for interface compatibility and ignored
        :return: decrypted data, or ``b''`` while less than one block is buffered
        """
        buflen = len(self.buf)
        currentlen = buflen + len(data)
        if len(data) > 0 and currentlen < 16:
            # Not even one block yet: keep buffering. Without this guard the
            # split position below becomes negative when the buffer already
            # holds data, which mis-slices the input and drops buffered bytes.
            self.buf.add(data)
            return b''
        if len(data) > 0 and (currentlen & 0x0f) == 0:
            # when aligned to 16 bytes(expected)
            self.buf.add(data)
            temp = self.cipher.decrypt(self.buf.view)
            self.buf.reset()
            return temp
        elif len(data) > 0:  # pragma: no-cover
            # nextpos = (currentlen // 16) * 16
            nextpos = currentlen & ~0x0f
            temp2 = data[nextpos - buflen:]
            self.buf.add(data[:nextpos - buflen])
            temp = self.cipher.decrypt(self.buf.view)
            self.buf.set(temp2)
            return temp
        elif buflen == 0:  # pragma: no-cover  # action flush
            return b''
        else:  # pragma: no-cover  # action padding
            # align = 16
            # padlen = (align - offset % align) % align
            #        = (align - (offset & (align - 1))) & (align - 1)
            #        = -offset & (align -1)
            #        = -offset & (16 - 1) = -offset & 15
            padlen = -buflen & 15
            self.buf.add(bytes(padlen))
            temp3 = self.cipher.decrypt(self.buf.view)  # type: bytes
            self.buf.reset()
            return temp3
class DeflateCompressor(ISevenZipCompressor):
    """Raw deflate compressor built on ``zlib.compressobj(wbits=-15)``."""

    def __init__(self):
        self._compressor = zlib.compressobj(wbits=-15)

    def compress(self, data):
        """Feed data to the deflate stream and return the output produced so far."""
        compressed = self._compressor.compress(data)
        return compressed

    def flush(self):
        """Finish the deflate stream and return the remaining output."""
        remainder = self._compressor.flush()
        return remainder
class DeflateDecompressor(ISevenZipDecompressor):
    """Raw deflate decompressor built on ``zlib.decompressobj(wbits=-15)``."""

    def __init__(self):
        self.flushed = False
        self._decompressor = zlib.decompressobj(wbits=-15)

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Decompress data; an empty input flushes the stream once.

        :param data: compressed input; empty input triggers the flush
        :param max_length: accepted for interface compatibility and ignored
        :return: decompressed data
        """
        if len(data) > 0:
            return self._decompressor.decompress(data)
        if self.flushed:
            # the stream has already been flushed, nothing is left
            return b''
        self.flushed = True
        return self._decompressor.flush()
class CopyCompressor(ISevenZipCompressor):
    """Pass-through "compressor" for the COPY method: data is stored unchanged."""

    def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes:
        """Return the input as an immutable ``bytes`` copy."""
        copied = bytes(data)
        return copied

    def flush(self):
        """Nothing is buffered, so there is never anything to flush."""
        return b''
class CopyDecompressor(ISevenZipDecompressor):
    """Pass-through "decompressor" for the COPY method."""

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Return the input as ``bytes``; ``max_length`` is ignored."""
        copied = bytes(data)
        return copied
class PpmdDecompressor(ISevenZipDecompressor):
    """PPMd decompressor driving ``Ppmd.Ppmd7Decoder`` over an internal buffer."""

    def __init__(self, properties: bytes):
        """Read the level and memory size from the coder properties.

        :param properties: 5 or 7 property bytes; a little-endian unsigned
            byte (level) followed by a 32-bit unsigned integer (memory size).
            The two extra bytes of the 7-byte form are not used.
        :raises UnsupportedCompressionMethodError: when properties is not
            ``bytes`` or has any other length
        """
        if not isinstance(properties, bytes):
            raise UnsupportedCompressionMethodError
        if len(properties) not in (5, 7):
            raise UnsupportedCompressionMethodError
        # both layouts start with the same "<BL" prefix
        level, mem = struct.unpack_from("<BL", properties)
        self.block_size = RuntimeConstant().READ_BLOCKSIZE
        self._buf = BufferedRW()
        self.decoder = None
        self.level = level
        self.mem = mem
        self.initialized = False

    def _init2(self):
        """Create the decoder; deferred until input has been buffered."""
        self.decoder = Ppmd.Ppmd7Decoder(self._buf, self.level, self.mem)  # type: ignore
        self.initialized = True

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length=-1) -> bytes:
        """Buffer the input and decode from it.

        :param data: compressed input, appended to the internal buffer
        :param max_length: requested amount of output; a value <= 0 decodes a
            single unit, and empty ``data`` decodes ``max_length`` at once
        :return: decoded data, or ``b''`` while the decoder is still waiting
            for more than 4 buffered bytes
        """
        self._buf.write(data)
        if not self.initialized:
            if len(self._buf) <= 4:
                return b''
            self._init2()
        assert self.decoder is not None
        if max_length <= 0:
            return self.decoder.decode(1)
        if len(data) == 0:
            return self.decoder.decode(max_length)
        #
        limit = min(self.block_size, max_length)
        decoded = bytearray()
        while len(self._buf) > 0 and len(decoded) < limit:
            decoded.extend(self.decoder.decode(1))
        return bytes(decoded)
class PpmdCompressor(ISevenZipCompressor):
    """PPMd compressor driving ``Ppmd.Ppmd7Encoder`` over an internal buffer."""

    def __init__(self, level: int, mem: int):
        """Create the encoder writing into a fresh ``BufferedRW``.

        :param level: passed to ``Ppmd7Encoder`` as its second argument
        :param mem: passed to ``Ppmd7Encoder`` as its third argument
        """
        self._buf = BufferedRW()
        self.encoder = Ppmd.Ppmd7Encoder(self._buf, level, mem)  # type: ignore

    def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes:
        """Encode data and return whatever the encoder wrote to the buffer."""
        self.encoder.encode(data)
        produced = self._buf.read()
        return produced

    def flush(self):
        """Flush the encoder and return the remaining buffered output."""
        self.encoder.flush()
        produced = self._buf.read()
        return produced
class BcjSparcDecoder(ISevenZipDecompressor):
    """Adapter exposing ``BCJFilter.SparcDecoder`` as an ISevenZipDecompressor."""

    def __init__(self, size: int):
        """:param size: passed through to ``BCJFilter.SparcDecoder``"""
        self.decoder = BCJFilter.SparcDecoder(size)

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Decode data with the wrapped filter; ``max_length`` is ignored."""
        decoded = self.decoder.decode(data)
        return decoded
class BcjSparcEncoder(ISevenZipCompressor):
    """Adapter exposing ``BCJFilter.SparcEncoder`` as an ISevenZipCompressor."""

    def __init__(self):
        self.encoder = BCJFilter.SparcEncoder()

    def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes:
        """Encode data with the wrapped filter."""
        encoded = self.encoder.encode(data)
        return encoded

    def flush(self):
        """Return whatever the wrapped filter still holds."""
        remainder = self.encoder.flush()
        return remainder
class BcjPpcDecoder(ISevenZipDecompressor):
    """Adapter exposing ``BCJFilter.PpcDecoder`` as an ISevenZipDecompressor."""

    def __init__(self, size: int):
        """:param size: passed through to ``BCJFilter.PpcDecoder``"""
        self.decoder = BCJFilter.PpcDecoder(size)

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Decode data with the wrapped filter; ``max_length`` is ignored."""
        decoded = self.decoder.decode(data)
        return decoded
class BcjPpcEncoder(ISevenZipCompressor):
    """Adapter exposing ``BCJFilter.PpcEncoder`` as an ISevenZipCompressor."""

    def __init__(self):
        self.encoder = BCJFilter.PpcEncoder()

    def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes:
        """Encode data with the wrapped filter."""
        encoded = self.encoder.encode(data)
        return encoded

    def flush(self):
        """Return whatever the wrapped filter still holds."""
        remainder = self.encoder.flush()
        return remainder
class BcjArmtDecoder(ISevenZipDecompressor):
    """Adapter exposing ``BCJFilter.ArmtDecoder`` as an ISevenZipDecompressor."""

    def __init__(self, size: int):
        """:param size: passed through to ``BCJFilter.ArmtDecoder``"""
        self.decoder = BCJFilter.ArmtDecoder(size)

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Decode data with the wrapped filter; ``max_length`` is ignored."""
        decoded = self.decoder.decode(data)
        return decoded
class BcjArmtEncoder(ISevenZipCompressor):
    """Adapter exposing ``BCJFilter.ArmtEncoder`` as an ISevenZipCompressor."""

    def __init__(self):
        self.encoder = BCJFilter.ArmtEncoder()

    def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes:
        """Encode data with the wrapped filter."""
        encoded = self.encoder.encode(data)
        return encoded

    def flush(self):
        """Return whatever the wrapped filter still holds."""
        remainder = self.encoder.flush()
        return remainder
class BcjArmDecoder(ISevenZipDecompressor):
    """Adapter exposing ``BCJFilter.ArmDecoder`` as an ISevenZipDecompressor."""

    def __init__(self, size: int):
        """:param size: passed through to ``BCJFilter.ArmDecoder``"""
        self.decoder = BCJFilter.ArmDecoder(size)

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Decode data with the wrapped filter; ``max_length`` is ignored."""
        decoded = self.decoder.decode(data)
        return decoded
class BcjArmEncoder(ISevenZipCompressor):
    """Adapter exposing ``BCJFilter.ArmEncoder`` as an ISevenZipCompressor."""

    def __init__(self):
        self.encoder = BCJFilter.ArmEncoder()

    def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes:
        """Encode data with the wrapped filter."""
        encoded = self.encoder.encode(data)
        return encoded

    def flush(self):
        """Return whatever the wrapped filter still holds."""
        remainder = self.encoder.flush()
        return remainder
class BCJDecoder(ISevenZipDecompressor):
    """Adapter exposing ``BCJFilter.BCJDecoder`` as an ISevenZipDecompressor."""

    def __init__(self, size: int):
        """:param size: passed through to ``BCJFilter.BCJDecoder``"""
        self.decoder = BCJFilter.BCJDecoder(size)

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Decode data with the wrapped filter; ``max_length`` is ignored."""
        decoded = self.decoder.decode(data)
        return decoded
class BCJEncoder(ISevenZipCompressor):
    """Adapter exposing ``BCJFilter.BCJEncoder`` as an ISevenZipCompressor."""

    def __init__(self):
        self.encoder = BCJFilter.BCJEncoder()

    def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes:
        """Encode data with the wrapped filter."""
        encoded = self.encoder.encode(data)
        return encoded

    def flush(self):
        """Return whatever the wrapped filter still holds."""
        remainder = self.encoder.flush()
        return remainder
# Maps a filter id to its (compressor class, decompressor class) pair for the
# methods that are not delegated to the lzma module's raw filter chain.
algorithm_class_map: Dict[int, Tuple[Any, Any]] = {
    FILTER_ZSTD: (Zstd.ZstdCompressor, Zstd.ZstdDecompressor),
    FILTER_PPMD: (PpmdCompressor, PpmdDecompressor),
    FILTER_BZIP2: (bz2.BZ2Compressor, bz2.BZ2Decompressor),
    FILTER_COPY: (CopyCompressor, CopyDecompressor),
    FILTER_DEFLATE: (DeflateCompressor, DeflateDecompressor),
    FILTER_CRYPTO_AES256_SHA256: (AESCompressor, AESDecompressor),
    FILTER_X86: (BCJEncoder, BCJDecoder),
    FILTER_ARM: (BcjArmEncoder, BcjArmDecoder),
    FILTER_ARMTHUMB: (BcjArmtEncoder, BcjArmtDecoder),
    FILTER_POWERPC: (BcjPpcEncoder, BcjPpcDecoder),
    FILTER_SPARC: (BcjSparcEncoder, BcjSparcDecoder),
}
class LZMA1Decompressor(ISevenZipDecompressor):
    """Raw-format ``lzma.LZMADecompressor`` wrapper used for LZMA1 chains.

    Having its own type lets callers tell LZMA1 chains apart with
    ``isinstance`` and pass ``max_length`` through to them.
    """

    def __init__(self, filters, unpacksize):
        """
        :param filters: filter chain specification given to ``lzma.LZMADecompressor``
        :param unpacksize: stored on the instance as ``unpacksize``
        """
        self._decompressor = lzma.LZMADecompressor(format=lzma.FORMAT_RAW, filters=filters)
        self.unpacksize = unpacksize

    def decompress(self, data, max_length=-1):
        """Decompress data, returning at most ``max_length`` bytes.

        ``max_length`` defaults to -1 (no limit), matching the
        ``ISevenZipDecompressor.decompress`` signature this class implements.

        :param data: compressed input
        :param max_length: output limit handed to the lzma decompressor
        :return: decompressed data
        """
        return self._decompressor.decompress(data, max_length)
class SevenZipDecompressor:
    """Main decompressor object, configured for and bound to one 7zip folder.

    Each 7zip folder can use its own compression method, so one instance is
    built per folder from that folder's coder list. The coders are turned
    into a chain of decompressor objects; :meth:`decompress` reads packed
    data from a file object, runs it through the chain and keeps a running
    CRC32 of the produced output."""

    def __init__(self, coders: List[Dict[str, Any]], packsize: int, unpacksizes: List[int], crc: Optional[int],
                 password: Optional[str] = None) -> None:
        """Validate the coders and build the decompressor chain.

        :param coders: coder descriptions (dicts read through the 'method', 'properties',
                       'numinstreams' and 'numoutstreams' keys)
        :param packsize: number of packed bytes to read in total
        :param unpacksizes: unpacked sizes, indexed in step with ``coders``
        :param crc: expected CRC32 of the output, compared by :meth:`check_crc`
        :param password: password, required when a coder is a crypto method
        :raises UnsupportedCompressionMethodError: more than 4 coders, or an unknown method
        :raises PasswordRequired: a crypto coder is present but no password was given
        """
        self.input_size = packsize
        self.unpacksizes = unpacksizes
        self.consumed: int = 0
        self.crc = crc
        self.digest: int = 0
        self.block_size: int = RuntimeConstant().READ_BLOCKSIZE
        if len(coders) > 4:
            raise UnsupportedCompressionMethodError('Maximum cascade of filters is 4 but got {}.'.format(len(coders)))
        # True for coders flagged 'native' in SupportedMethods (handled by the lzma module)
        self.methods_map = [SupportedMethods.is_native_coder(coder) for coder in coders]  # type: List[bool]
        # Check if password given for encrypted archive
        if SupportedMethods.needs_password(coders) and password is None:
            raise PasswordRequired("Password is required for extracting given archive.")
        # Check filters combination and required parameters
        if len(coders) >= 2:
            target_compressor = False
            has_bcj = False
            bcj_index = -1
            for i, coder in enumerate(coders):
                filter_id = SupportedMethods.get_filter_id(coder)
                if SupportedMethods.is_compressor_id(filter_id) and filter_id != FILTER_LZMA2:
                    target_compressor = True
                if filter_id in [FILTER_X86, FILTER_ARM, FILTER_ARMTHUMB, FILTER_POWERPC, FILTER_SPARC]:
                    has_bcj = True
                    bcj_index = i
                # hack for LZMA1+BCJ which should be native+alternative
                if target_compressor and has_bcj:
                    self.methods_map[bcj_index] = False
                    break
        self.chain = []  # type: List[Union[bz2.BZ2Decompressor, lzma.LZMADecompressor, ISevenZipDecompressor]]
        self._unpacksizes = []  # type: List[int]
        self.input_size = self.input_size
        # Consecutive native coders reuse the same unpack size: 'shift' counts
        # how many native coders directly followed another native one.
        shift = 0
        prev = False
        for i, r in enumerate(self.methods_map):
            shift += 1 if r and prev else 0
            prev = r
            self._unpacksizes.append(unpacksizes[i - shift])
        self._unpacked = [0 for _ in range(len(self._unpacksizes))]  # bytes produced so far, per entry
        self.consumed = 0
        self._unused = bytearray()  # packed input read from disk but not yet fed to the chain
        self._buf = bytearray()  # decompressed output beyond what the caller asked for
        self._pos = 0  # read position inside self._buf
        # ---
        if all(self.methods_map):
            # every coder is native: a single lzma decompressor handles the whole chain
            decompressor = self._get_lzma_decompressor(coders, unpacksizes[-1])
            self.chain.append(decompressor)
        elif not any(self.methods_map):
            # no native coder: one alternative decompressor per coder
            for i in range(len(coders)):
                self.chain.append(self._get_alternative_decompressor(coders[i], unpacksizes[i], password))
        elif any(self.methods_map):
            for i in range(len(coders)):
                # alternative coders first, then only native ones: the native tail shares one lzma decompressor
                if (not any(self.methods_map[:i])) and all(self.methods_map[i:]):
                    for j in range(i):
                        self.chain.append(self._get_alternative_decompressor(coders[j], unpacksizes[j], password))
                    self.chain.append(self._get_lzma_decompressor(coders[i:], unpacksizes[i]))
                    break
            else:
                # for-else: no such split exists, so build one decompressor per coder
                for i in range(len(coders)):
                    if self.methods_map[i]:
                        self.chain.append(self._get_lzma_decompressor([coders[i]], unpacksizes[i]))
                    else:
                        self.chain.append(self._get_alternative_decompressor(coders[i], unpacksizes[i], password))
        else:
            raise UnsupportedCompressionMethodError

    def _decompress(self, data, max_length: int):
        """Run data through every decompressor of the chain, in order.

        :param data: packed input for the first decompressor of the chain
        :param max_length: output limit, only handed to LZMA1Decompressor stages
        :return: output of the last stage
        :raises EOFError: a stage already produced its expected size but still gets input
        """
        for i, decompressor in enumerate(self.chain):
            if self._unpacked[i] < self._unpacksizes[i]:
                if isinstance(decompressor, LZMA1Decompressor):
                    data = decompressor.decompress(data, max_length)  # always give max_length for lzma1
                else:
                    data = decompressor.decompress(data)
                self._unpacked[i] += len(data)
            elif len(data) == 0:
                data = b''
            else:
                raise EOFError
        return data

    def decompress(self, fp, max_length: int = -1) -> bytes:
        """Read the next block from ``fp`` and return decompressed data.

        :param fp: file object providing ``read(size)`` for the packed stream
        :param max_length: maximum number of bytes to return; a negative value
                           returns everything that is available
        :return: decompressed data; the digest is updated with it
        """
        # NOTE(review): every branch builds ``res`` from bytearray slices, so the
        # object returned here looks like a bytearray rather than bytes.
        # read data from disk
        rest_size = self.input_size - self.consumed
        read_size = min(rest_size, self.block_size)
        data = fp.read(read_size)
        self.consumed += len(data)
        #
        if max_length < 0:
            # no limit: leftover output plus everything the chain yields now
            res = self._buf[self._pos:] + self._decompress(self._unused + data, max_length)
            self._buf = bytearray()
            self._unused = bytearray()
            self._pos = 0
        else:
            current_buf_len = len(self._buf) - self._pos
            if current_buf_len >= max_length:
                # enough leftover output: serve it and park the fresh input for later
                self._unused.extend(data)
                res = self._buf[self._pos:self._pos + max_length]
                self._pos += max_length
            else:
                if len(self._unused) > 0:
                    tmp = self._decompress(self._unused + data, max_length)
                    self._unused = bytearray()
                else:
                    tmp = self._decompress(data, max_length)
                if current_buf_len + len(tmp) <= max_length:
                    res = self._buf[self._pos:] + tmp
                    self._buf = bytearray()
                    self._pos = 0
                else:
                    # more than requested: return max_length bytes and keep the rest
                    res = self._buf[self._pos:] + tmp[:max_length - current_buf_len]
                    self._buf = bytearray(tmp[max_length - current_buf_len:])
                    self._pos = 0
        self.digest = calculate_crc32(res, self.digest)
        return res

    def check_crc(self):
        """Return True when the running digest equals the expected CRC."""
        return self.crc == self.digest

    @property
    def unused_size(self):
        """Number of packed bytes read from disk but not yet decompressed."""
        return len(self._unused)

    def _get_lzma_decompressor(self, coders: List[Dict[str, Any]], unpacksize: int):
        """Build one raw lzma decompressor for a run of native coders.

        :param coders: native coders to combine into one filter chain
        :param unpacksize: given to LZMA1Decompressor when the chain has an LZMA1 filter
        :return: LZMA1Decompressor if an LZMA1 filter is present, else lzma.LZMADecompressor
        :raises UnsupportedCompressionMethodError: a coder has several in/out streams or is not native
        """
        filters = []  # type: List[Dict[str, Any]]
        lzma1 = False
        for coder in coders:
            if coder['numinstreams'] != 1 or coder['numoutstreams'] != 1:
                raise UnsupportedCompressionMethodError('Only a simple compression method is currently supported.')
            if not SupportedMethods.is_native_coder(coder):
                raise UnsupportedCompressionMethodError
            properties = coder.get('properties', None)
            filter_id = SupportedMethods.get_filter_id(coder)
            if filter_id == FILTER_LZMA:
                lzma1 = True
            # each filter is inserted at the front, so the filter list ends up in reverse coder order
            if properties is not None:
                filters[:0] = [lzma._decode_filter_properties(filter_id, properties)]  # type: ignore
            else:
                filters[:0] = [{'id': filter_id}]
        if lzma1:
            return LZMA1Decompressor(filters, unpacksize)
        else:
            return lzma.LZMADecompressor(format=lzma.FORMAT_RAW, filters=filters)

    def _get_alternative_decompressor(self, coder: Dict[str, Any], unpacksize=None, password=None) -> Union[bz2.BZ2Decompressor, lzma.LZMADecompressor, ISevenZipDecompressor]:  # noqa
        """Instantiate the decompressor class registered in algorithm_class_map.

        :param coder: coder description
        :param unpacksize: passed as ``size`` to the BCJ decoders
        :param password: passed to crypto decompressors
        :raises UnsupportedCompressionMethodError: native coder, unknown filter id,
                or a method without a decompressor class
        """
        filter_id = SupportedMethods.get_filter_id(coder)
        # Special treatment for BCJ filters
        if filter_id in [FILTER_X86, FILTER_ARM, FILTER_ARMTHUMB, FILTER_POWERPC, FILTER_SPARC]:
            return algorithm_class_map[filter_id][1](size=unpacksize)
        # Check supported?
        if SupportedMethods.is_native_coder(coder):
            raise UnsupportedCompressionMethodError('Unknown method code:{}'.format(coder['method']))
        if filter_id not in algorithm_class_map:
            raise UnsupportedCompressionMethodError('Unknown method filter_id:{}'.format(filter_id))
        if algorithm_class_map[filter_id][1] is None:
            raise UnsupportedCompressionMethodError(
                'Decompression is not supported by {}.'.format(SupportedMethods.get_method_name_id(filter_id)))
        # constructor arguments depend on the kind of method
        if SupportedMethods.is_crypto_id(filter_id):
            return algorithm_class_map[filter_id][1](coder['properties'], password)
        elif SupportedMethods.need_property(filter_id):
            return algorithm_class_map[filter_id][1](coder['properties'])
        else:
            return algorithm_class_map[filter_id][1]()
class SevenZipCompressor:
    """Main compressor object, configured for one 7zip folder.

    The filter specification is turned into a chain of compressor objects
    plus the matching coder descriptions. :meth:`compress` and :meth:`flush`
    push data through the chain and track the packed size and its CRC32."""

    __slots__ = ['filters', 'chain', 'compressor', 'coders', 'methods_map', 'digest', 'packsize', '_block_size',
                 '_unpacksizes']

    def __init__(self, filters=None, password=None):
        """Build the compressor chain and the coder list.

        :param filters: list of filter dicts (each read through its 'id' key);
                        defaults to LZMA2 with preset ``7 | lzma.PRESET_EXTREME``
        :param password: password handed to a crypto filter
        :raises UnsupportedCompressionMethodError: more than 4 filters, or a
                combination that is neither all native, all alternative, nor
                native filters followed by one crypto filter
        """
        self.filters: List[Dict[str, Any]] = []
        self.chain = []
        self.digest = 0
        self.packsize = 0
        self._unpacksizes = []
        self._block_size = RuntimeConstant().READ_BLOCKSIZE
        if filters is None:
            self.filters = [{"id": lzma.FILTER_LZMA2, "preset": 7 | lzma.PRESET_EXTREME}]
        else:
            self.filters = filters
        if len(self.filters) > 4:
            raise UnsupportedCompressionMethodError('Maximum cascade of filters is 4 but got {}.'.format(len(self.filters)))
        self.methods_map = [SupportedMethods.is_native_filter(filter) for filter in self.filters]
        self.coders = []
        if all(self.methods_map) and SupportedMethods.is_compressor(self.filters[-1]):  # all native
            self._set_native_compressors_coders(self.filters)
            return
        # from here on the x86 BCJ filter is treated as an alternative (non-native) filter
        for i, f in enumerate(self.filters):
            if f['id'] == FILTER_X86:
                self.methods_map[i] = False
        #
        if not any(self.methods_map):  # all alternative
            # NOTE(review): iterates the ``filters`` argument rather than self.filters;
            # they are the same object here because the default filter set returns above.
            for f in filters:
                self._set_alternate_compressors_coders(f, password)
        elif SupportedMethods.is_crypto_id(self.filters[-1]['id']) and all(self.methods_map[:-1]):
            # native filters followed by a single crypto filter
            self._set_native_compressors_coders(self.filters[:-1])
            self._set_alternate_compressors_coders(self.filters[-1], password)
        else:
            raise UnsupportedCompressionMethodError

    def _set_native_compressors_coders(self, filters):
        """Add one raw lzma compressor for ``filters`` and register their coders."""
        self.chain.append(lzma.LZMACompressor(format=lzma.FORMAT_RAW, filters=filters))
        self._unpacksizes.append(0)
        # coders are inserted at the front, i.e. stored in reverse filter order
        for filter in filters:
            self.coders.insert(0, SupportedMethods.get_coder(filter))

    def _set_alternate_compressors_coders(self, alt_filter, password=None):
        """Add the compressor from algorithm_class_map for one filter and register its coder.

        :param alt_filter: filter dict; 'id' selects the method, 'level' and 'mem' are optional
        :param password: password for a crypto filter
        :raises UnsupportedCompressionMethodError: filter id not in algorithm_class_map
        """
        filter_id = alt_filter['id']
        properties = None
        if filter_id not in algorithm_class_map:
            raise UnsupportedCompressionMethodError
        elif SupportedMethods.is_crypto_id(filter_id):
            compressor = algorithm_class_map[filter_id][0](password)
        elif SupportedMethods.need_property(filter_id):
            if filter_id == FILTER_ZSTD:
                level = alt_filter.get('level', 3)
                properties = struct.pack("BBBBB", Zstd.ZSTD_VERSION[0], Zstd.ZSTD_VERSION[1], level, 0, 0)
                compressor = algorithm_class_map[filter_id][0](level=level)
            elif filter_id == FILTER_PPMD:
                order = alt_filter.get('level', 6)
                mem_size = alt_filter.get('mem', 16) << 20  # 'mem' is shifted left by 20 bits, i.e. multiplied by 2**20
                properties = struct.pack("<BLBB", order, mem_size, 0, 0)
                compressor = algorithm_class_map[filter_id][0](order, mem_size)
        else:
            compressor = algorithm_class_map[filter_id][0]()
        if SupportedMethods.is_crypto_id(filter_id):
            # the crypto compressor serializes its own properties
            properties = compressor.encode_filter_properties()
        self.chain.append(compressor)
        self._unpacksizes.append(0)
        self.coders.insert(0, {'method': SupportedMethods.get_method_id(filter_id),
                               'properties': properties, 'numinstreams': 1, 'numoutstreams': 1})

    def compress(self, fd, fp, crc=0):
        """Compress everything readable from ``fd`` and write it to ``fp``.

        :param fd: input file object, read in blocks of ``_block_size``
        :param fp: output file object
        :param crc: initial CRC32 value for the uncompressed data
        :return: tuple of (bytes read, bytes written, CRC32 of the uncompressed data)
        """
        data = fd.read(self._block_size)
        insize = len(data)
        foutsize = 0
        while data:
            crc = calculate_crc32(data, crc)
            for i, compressor in enumerate(self.chain):
                # the input of each stage counts as that stage's unpacked size
                self._unpacksizes[i] += len(data)
                data = compressor.compress(data)
            self.packsize += len(data)
            self.digest = calculate_crc32(data, self.digest)
            foutsize += len(data)
            fp.write(data)
            data = fd.read(self._block_size)
            insize += len(data)
        return insize, foutsize, crc

    def flush(self, fp):
        """Flush every stage of the chain and write the final data to ``fp``.

        :param fp: output file object
        :return: number of bytes written
        """
        data = None
        for i, compressor in enumerate(self.chain):
            if data:
                # output flushed by the previous stage still has to pass through this one
                self._unpacksizes[i] += len(data)
                data = compressor.compress(data)
                data += compressor.flush()
            else:
                data = compressor.flush()
        self.packsize += len(data)
        self.digest = calculate_crc32(data, self.digest)
        fp.write(data)
        return len(data)

    @property
    def unpacksizes(self):
        """Unpack sizes with one entry per filter, in reverse filter order.

        A native filter that directly follows another native filter reuses
        the size recorded for the preceding chain entry.
        """
        result = []
        shift = 0
        prev = False
        for i, r in enumerate(self.methods_map):
            shift += 1 if r and prev else 0
            prev = r
            result.insert(0, self._unpacksizes[i - shift])
        return result
class MethodsType(Enum):
    """Kind of a supported method: compressor, filter or crypto."""

    compressor = 0
    filter = 1
    crypto = 2
class SupportedMethods:
    """Hold list of methods.

    ``methods`` is the registry of known compression methods. Each entry maps
    the 7zip method id ('id') to the filter id used by this module
    ('filter_id') and records whether the method is handled by the lzma
    module ('native'), whether it needs coder properties ('need_prop') and
    its kind ('type'). The class methods are lookups on that registry.
    """

    formats = [{'name': "7z", 'magic': MAGIC_7Z}]
    methods = [{'id': CompressionMethod.COPY, 'name': 'COPY', 'native': False, 'need_prop': False,
                'filter_id': FILTER_COPY, 'type': MethodsType.compressor},
               {'id': CompressionMethod.LZMA2, 'name': "LZMA2", 'native': True, 'need_prop': True,
                'filter_id': FILTER_LZMA2, 'type': MethodsType.compressor},
               {'id': CompressionMethod.DELTA, 'name': "DELTA", 'native': True, 'need_prop': True,
                'filter_id': FILTER_DELTA, 'type': MethodsType.filter},
               {'id': CompressionMethod.LZMA, 'name': "LZMA", 'native': True, 'need_prop': True,
                'filter_id': FILTER_LZMA, 'type': MethodsType.compressor},
               {'id': CompressionMethod.P7Z_BCJ, 'name': "BCJ", 'native': True, 'need_prop': False,
                'filter_id': FILTER_X86, 'type': MethodsType.filter},
               {'id': CompressionMethod.BCJ_PPC, 'name': 'PPC', 'native': True, 'need_prop': False,
                'filter_id': FILTER_POWERPC, 'type': MethodsType.filter},
               {'id': CompressionMethod.BCJ_IA64, 'name': 'IA64', 'native': True, 'need_prop': False,
                'filter_id': FILTER_IA64, 'type': MethodsType.filter},
               {'id': CompressionMethod.BCJ_ARM, 'name': "ARM", 'native': True, 'need_prop': False,
                'filter_id': FILTER_ARM, 'type': MethodsType.filter},
               {'id': CompressionMethod.BCJ_ARMT, 'name': "ARMT", 'native': True, 'need_prop': False,
                'filter_id': FILTER_ARMTHUMB, 'type': MethodsType.filter},
               {'id': CompressionMethod.BCJ_SPARC, 'name': 'SPARC', 'native': True, 'need_prop': False,
                'filter_id': FILTER_SPARC, 'type': MethodsType.filter},
               {'id': CompressionMethod.MISC_DEFLATE, 'name': 'DEFLATE', 'native': False, 'need_prop': False,
                'filter_id': FILTER_DEFLATE, 'type': MethodsType.compressor},
               {'id': CompressionMethod.MISC_BZIP2, 'name': 'BZip2', 'native': False, 'need_prop': False,
                'filter_id': FILTER_BZIP2, 'type': MethodsType.compressor},
               {'id': CompressionMethod.MISC_ZSTD, 'name': 'ZStandard', 'native': False, 'need_prop': True,
                'filter_id': FILTER_ZSTD, 'type': MethodsType.compressor},
               {'id': CompressionMethod.PPMD, 'name': 'PPMd', 'native': False, 'need_prop': True,
                'filter_id': FILTER_PPMD, 'type': MethodsType.compressor},
               {'id': CompressionMethod.CRYPT_AES256_SHA256, 'name': '7zAES', 'native': False, 'need_prop': True,
                'filter_id': FILTER_CRYPTO_AES256_SHA256, 'type': MethodsType.crypto},
               ]

    @classmethod
    def _find_method(cls, key_id, key_value):
        """Return the first registry entry whose ``key_id`` equals ``key_value``, or None."""
        return next((item for item in cls.methods if item[key_id] == key_value), None)

    @classmethod
    def _require_method(cls, key_id, key_value):
        """Like ``_find_method`` but raise UnsupportedCompressionMethodError when nothing matches."""
        method = cls._find_method(key_id, key_value)
        if method is None:
            raise UnsupportedCompressionMethodError
        return method

    @classmethod
    def get_method_name_id(cls, filter_id):
        """Return the display name registered for ``filter_id``.

        :raises UnsupportedCompressionMethodError: unknown filter id
        """
        return cls._require_method('filter_id', filter_id)['name']

    @classmethod
    def get_filter_id(cls, coder):
        """Return the filter id for ``coder['method']``, or None when the method is unknown."""
        method = cls._find_method('id', coder['method'])
        if method is None:
            return None
        return method['filter_id']

    @classmethod
    def is_native_filter(cls, filter) -> bool:
        """Tell whether the filter dict's 'id' names a native method.

        :raises UnsupportedCompressionMethodError: unknown filter id
        """
        return cls._require_method('filter_id', filter['id'])['native']

    @classmethod
    def is_compressor(cls, filter):
        """Tell whether the filter dict's 'id' names a compressor-type method.

        :raises UnsupportedCompressionMethodError: unknown filter id
        """
        return cls._require_method('filter_id', filter['id'])['type'] == MethodsType.compressor

    @classmethod
    def is_compressor_id(cls, filter_id):
        """Tell whether ``filter_id`` names a compressor-type method.

        :raises UnsupportedCompressionMethodError: unknown filter id
        """
        return cls._require_method('filter_id', filter_id)['type'] == MethodsType.compressor

    @classmethod
    def is_native_coder(cls, coder) -> bool:
        """Tell whether ``coder['method']`` names a native method.

        :raises UnsupportedCompressionMethodError: unknown method id
        """
        return cls._require_method('id', coder['method'])['native']

    @classmethod
    def need_property(cls, filter_id):
        """Tell whether the method for ``filter_id`` needs coder properties.

        :raises UnsupportedCompressionMethodError: unknown filter id
        """
        return cls._require_method('filter_id', filter_id)['need_prop']

    @classmethod
    def is_crypto_id(cls, filter_id) -> bool:
        """Tell whether ``filter_id`` names a crypto-type method.

        :raises UnsupportedCompressionMethodError: unknown filter id
        """
        return cls._require_method('filter_id', filter_id)['type'] == MethodsType.crypto

    @classmethod
    def get_method_id(cls, filter_id) -> bytes:
        """Return the 7zip method id registered for ``filter_id``.

        :raises UnsupportedCompressionMethodError: unknown filter id
        """
        return cls._require_method('filter_id', filter_id)['id']

    @classmethod
    def get_coder(cls, filter) -> Dict[str, Any]:
        """Build the coder description for a filter dict.

        Properties are encoded only for the LZMA1, LZMA2 and DELTA filters;
        every other filter gets ``properties`` set to None.

        :raises UnsupportedCompressionMethodError: unknown filter id
        """
        method = cls.get_method_id(filter['id'])
        if filter['id'] in [lzma.FILTER_LZMA1, lzma.FILTER_LZMA2, lzma.FILTER_DELTA]:
            properties = lzma._encode_filter_properties(filter)  # type: Optional[bytes] # type: ignore  # noqa
        else:
            properties = None
        return {'method': method, 'properties': properties, 'numinstreams': 1, 'numoutstreams': 1}

    @classmethod
    def needs_password(cls, coders) -> bool:
        """Return True when any coder uses a crypto method; unknown methods are skipped."""
        for coder in coders:
            filter_id = cls.get_filter_id(coder)
            if filter_id is None:
                continue
            if cls.is_crypto_id(filter_id):
                return True
        return False
def get_methods_names_string(coders_lists: List[List[dict]]) -> str:
    """Return the names of the methods used by the given coders, comma separated.

    Names are listed once each, in the fixed display order below, regardless
    of the order or number of coders. Methods that are recognized but not
    supported are reported with a trailing ``*``.

    :param coders_lists: one list of coder dicts (read through 'method') per folder
    :return: display names joined with ', '
    """
    # list of known method names with a display priority order
    methods_namelist = ['LZMA2', 'LZMA', 'BZip2', 'DEFLATE', 'DEFLATE64*', 'delta', 'COPY', 'PPMd', 'ZStandard',
                        'LZ4*', 'BCJ2*', 'BCJ', 'ARM', 'ARMT', 'IA64', 'PPC', 'SPARC', '7zAES']
    unsupported_methods = {CompressionMethod.P7Z_BCJ2: 'BCJ2*', CompressionMethod.MISC_LZ4: 'LZ4*',
                           CompressionMethod.MISC_DEFLATE64: 'DEFLATE64*'}
    # Names are compared case-insensitively: the registry spells the Delta
    # filter "DELTA" while the display list uses 'delta', and an exact match
    # would silently drop it from the result.
    found = set()
    for coders in coders_lists:
        for coder in coders:
            method_id = coder['method']
            found.update(m['name'].casefold() for m in SupportedMethods.methods if m['id'] == method_id)
            if method_id in unsupported_methods:
                found.add(unsupported_methods[method_id].casefold())
    return ', '.join(name for name in methods_namelist if name.casefold() in found)