# Source code for py7zr.compressor

#!/usr/bin/python -u
# p7zr library
# Copyright (c) 2019-2021 Hiroshi Miura <>
# Copyright (c) 2004-2015 by Joachim Bauch,
# 7-Zip Copyright (C) 1999-2010 Igor Pavlov
# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
import bz2
import lzma
import struct
import zlib
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple, Union

import bcj  # type: ignore  # noqa
import pyppmd
import pyzstd
from zipfile_deflate64 import deflate64  # type: ignore
from Cryptodome.Cipher import AES
from Cryptodome.Random import get_random_bytes

from py7zr.exceptions import PasswordRequired, UnsupportedCompressionMethodError
from py7zr.helpers import Buffer, calculate_crc32, calculate_key
from import (

    import brotli  # type: ignore  # noqa
except ImportError:
    import brotlicffi as brotli  # type: ignore  # noqa
brotli_major = 1
brotli_minor = 0

class ISevenZipCompressor(ABC):
    """Interface every compressor placed in a compression chain must implement."""

    @abstractmethod
    def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes:
        """
        Compress data (interface)

        :param data: input data
        :return: output data
        """

    @abstractmethod
    def flush(self) -> bytes:
        """
        Flush output buffer (interface)

        :return: output data
        """
class ISevenZipDecompressor(ABC):
    """Interface every decompressor placed in a decompression chain must implement."""

    @abstractmethod
    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """
        Decompress data (interface)

        :param data: input data
        :param max_length: maximum length of output data when it can respect, otherwise ignore.
        :return: output data
        """
class AESCompressor(ISevenZipCompressor):
    """AES Compression(Encryption) class.

    It accept pre-processing filter which may be a LZMA compression.
    Input is buffered so that the cipher only ever sees whole 16-byte blocks;
    the trailing partial block is zero padded in :meth:`flush`.
    """

    AES_CBC_BLOCKSIZE = 16

    def __init__(self, password: str, blocksize: Optional[int] = None) -> None:
        """
        :param password: pass phrase; it is encoded as UTF-16LE before key derivation.
        :param blocksize: expected chunk size, used to size the internal buffer;
            the library default block size is used when omitted or zero.
        """
        self.cycles = 19  # as same as p7zip
        self.iv = get_random_bytes(16)
        self.salt = b""
        self.method = CompressionMethod.CRYPT_AES256_SHA256
        key = calculate_key(password.encode("utf-16LE"), self.cycles, self.salt, "sha256")
        self.iv += bytes(self.AES_CBC_BLOCKSIZE - len(self.iv))  # zero padding if iv < AES_CBC_BLOCKSIZE
        self.cipher = AES.new(key, AES.MODE_CBC, self.iv)
        self.flushed = False
        # Keep room for two extra cipher blocks on top of one chunk of input.
        self.buf = Buffer(size=(blocksize or get_default_blocksize()) + self.AES_CBC_BLOCKSIZE * 2)

    def encode_filter_properties(self):
        """Serialize cycles, salt and IV into the coder properties byte string.

        Layout: one byte holding the cycle count plus the iv/salt presence flags
        (bits 6 and 7), one byte holding the encoded iv and salt sizes, then the
        salt followed by the iv.
        """
        saltsize = len(self.salt)
        ivsize = len(self.iv)
        ivfirst = 1  # it should always 1
        saltfirst = 1 if len(self.salt) > 0 else 0
        firstbyte = (self.cycles + (ivfirst << 6) + (saltfirst << 7)).to_bytes(1, "little")
        secondbyte = (((ivsize - 1) & 0x0F) + (((saltsize - saltfirst) << 4) & 0xF0)).to_bytes(1, "little")
        properties = firstbyte + secondbyte + self.salt + self.iv
        return properties

    def compress(self, data):
        """Compression + AES encryption with 16byte alignment.

        :param data: plain data to encrypt.
        :return: encrypted bytes for every complete 16-byte block available so
            far; ``b""`` when less than one block has been accumulated.
        """
        # The size is < 16 which should be only last chunk.
        # From p7zip/CPP/7zip/common/FilterCoder.cpp
        # /*
        # AES filters need 16-bytes alignment for HARDWARE-AES instructions.
        # So we call IFilter::Filter(, size), where (size != 16 * N) only for last data block.
        # AES-CBC filters need data size aligned for 16-bytes.
        # So the encoder can add zeros to the end of original stream.
        # Some filters (BCJ and others) don't process data at the end of stream in some cases.
        # So the encoder and decoder write such last bytes without change.
        # */
        currentlen = len(self.buf) + len(data)
        # hopefully aligned and larger than block size.
        if currentlen >= 16 and (currentlen & 0x0F) == 0:
            self.buf.add(data)
            res = self.cipher.encrypt(self.buf.view)
            self.buf.reset()
        elif currentlen > 16:  # when not aligned
            # nextpos = (currentlen // self.AES_CBC_BLOCKSIZE) * self.AES_CBC_BLOCKSIZE
            nextpos = currentlen & ~0x0F
            buflen = len(self.buf)
            self.buf.add(data[: nextpos - buflen])
            res = self.cipher.encrypt(self.buf.view)
            # Keep the unaligned tail for the next call (or flush).
            self.buf.set(data[nextpos - buflen :])
        else:  # pragma: no-cover # smaller than block size, it will processed when flush()
            self.buf.add(data)
            res = b""
        return res

    def flush(self):
        """Encrypt whatever is still buffered, zero padded to a 16-byte boundary.

        :return: the final encrypted bytes, or ``b""`` when the buffer is empty.
        """
        if len(self.buf) > 0:
            # padlen = 16 - currentlen % 16 if currentlen % 16 > 0 else 0
            padlen = -len(self.buf) & 15
            self.buf.add(bytes(padlen))
            res = self.cipher.encrypt(self.buf.view)
            self.buf.reset()
        else:
            res = b""
        return res
class AESDecompressor(ISevenZipDecompressor):
    """Decrypt data"""

    def __init__(self, aes_properties: bytes, password: str, blocksize: Optional[int] = None) -> None:
        """
        :param aes_properties: coder properties: flag/cycle byte, size byte, salt, iv.
        :param password: pass phrase; it is encoded as UTF-16LE before key derivation.
        :param blocksize: expected chunk size, used to size the internal buffer;
            the library default block size is used when omitted or zero.
        :raises UnsupportedCompressionMethodError: when the properties are malformed.
        """
        firstbyte = aes_properties[0]
        numcyclespower = firstbyte & 0x3F
        if firstbyte & 0xC0 == 0:
            raise UnsupportedCompressionMethodError(firstbyte, "Wrong 7zAES properties")
        saltsize = (firstbyte >> 7) & 1
        ivsize = (firstbyte >> 6) & 1
        secondbyte = aes_properties[1]
        saltsize += secondbyte >> 4
        ivsize += secondbyte & 0x0F
        # The properties come from the archive, i.e. untrusted input: validate with
        # real exceptions because ``assert`` statements vanish under ``python -O``.
        if len(aes_properties) != 2 + saltsize + ivsize:
            raise UnsupportedCompressionMethodError(aes_properties, "Wrong 7zAES properties: size mismatch")
        if numcyclespower > 24:
            raise UnsupportedCompressionMethodError(aes_properties, "Wrong 7zAES properties: too many cycles")
        salt = aes_properties[2 : 2 + saltsize]
        iv = aes_properties[2 + saltsize : 2 + saltsize + ivsize]
        if ivsize < 16:
            iv += bytes(16 - ivsize)  # zero padding up to the cipher block size
        key = calculate_key(password.encode("utf-16LE"), numcyclespower, salt, "sha256")
        self.cipher = AES.new(key, AES.MODE_CBC, iv)
        self.buf = Buffer(size=(blocksize or get_default_blocksize()) + 16)

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Decrypt ``data``, buffering any tail that is not 16-byte aligned.

        Calling with empty ``data`` flushes: the buffered remainder is zero
        padded to a block boundary and decrypted. ``max_length`` is ignored.

        :param data: encrypted input, or empty to flush.
        :return: decrypted bytes (possibly empty).
        """
        currentlen = len(self.buf) + len(data)
        # when aligned to 16 bytes(expected)
        if len(data) > 0 and (currentlen & 0x0F) == 0:
            self.buf.add(data)
            temp = self.cipher.decrypt(self.buf.view)
            self.buf.reset()
            return temp
        elif len(data) > 0:  # pragma: no-cover
            # nextpos = (currentlen // 16) * 16
            nextpos = currentlen & ~0x0F
            buflen = len(self.buf)
            temp2 = data[nextpos - buflen :]
            self.buf.add(data[: nextpos - buflen])
            temp = self.cipher.decrypt(self.buf.view)
            self.buf.set(temp2)
            return temp
        elif len(self.buf) == 0:  # pragma: no-cover  # action flush
            return b""
        else:  # pragma: no-cover  # action padding
            # align = 16
            # padlen = (align - offset % align) % align
            #        = (align - (offset & (align - 1))) & (align - 1)
            #        = -offset & (align -1)
            #        = -offset & (16 - 1) = -offset & 15
            padlen = -len(self.buf) & 15
            self.buf.add(bytes(padlen))
            temp3 = self.cipher.decrypt(self.buf.view)  # type: bytes
            self.buf.reset()
            return temp3
class DeflateCompressor(ISevenZipCompressor):
    """Raw Deflate compressor built on ``zlib.compressobj(wbits=-15)``."""

    def __init__(self):
        # A negative wbits value makes zlib emit a raw stream (no header/trailer).
        self._compressor = zlib.compressobj(wbits=-15)

    def compress(self, data):
        """Feed ``data`` to zlib and return whatever output it produces."""
        return self._compressor.compress(data)

    def flush(self):
        """Finish the stream and return the remaining output."""
        return self._compressor.flush()
class DeflateDecompressor(ISevenZipDecompressor):
    """Raw Deflate decompressor built on ``zlib.decompressobj(wbits=-15)``."""

    def __init__(self):
        self.flushed = False
        self._decompressor = zlib.decompressobj(wbits=-15)

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Decompress ``data``; empty input flushes the decompressor exactly once.

        ``max_length`` is accepted for interface compatibility and ignored.
        """
        if len(data) != 0:
            return self._decompressor.decompress(data)
        if self.flushed:
            # Already flushed: nothing more can come out.
            return b""
        self.flushed = True
        return self._decompressor.flush()
class Deflate64Compressor(ISevenZipCompressor):
    """Placeholder for Deflate64 compression, which is not available."""

    def __init__(self):
        raise RuntimeError("Deflate64 compression not implemented yet")
class Deflate64Decompressor(ISevenZipDecompressor):
    """Deflate64 decompressor built on ``deflate64.Deflate64``."""

    def __init__(self):
        self.flushed = False
        self._decompressor = deflate64.Deflate64()

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Decompress ``data``; empty input flushes the decompressor exactly once.

        ``max_length`` is accepted for interface compatibility and ignored.
        """
        if len(data) != 0:
            return self._decompressor.decompress(data)
        if self.flushed:
            # Already flushed: nothing more can come out.
            return b""
        self.flushed = True
        return self._decompressor.flush()
class CopyCompressor(ISevenZipCompressor):
    """Pass-through "compressor" for the COPY method: output equals input."""

    def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes:
        """Return an immutable ``bytes`` copy of ``data``."""
        return bytes(data)

    def flush(self):
        """Nothing is buffered, so there is never anything to flush."""
        return b""
class CopyDecompressor(ISevenZipDecompressor):
    """Pass-through "decompressor" for the COPY method: output equals input."""

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Return an immutable ``bytes`` copy of ``data``; ``max_length`` is ignored."""
        return bytes(data)
class PpmdDecompressor(ISevenZipDecompressor):
    """Decompress PPMd compressed data"""

    def __init__(self, properties: bytes, blocksize: Optional[int] = None):
        """
        :param properties: 5 bytes (``<BL``: order, memory size) or 7 bytes
            (``<BLBB``: order, memory size and two trailing bytes that are ignored).
        :param blocksize: accepted for a uniform constructor signature; unused.
        :raises UnsupportedCompressionMethodError: on a wrong type or length.
        """
        if not isinstance(properties, bytes):
            raise UnsupportedCompressionMethodError(properties, "Unknown type of properties is passed")
        prop_len = len(properties)
        if prop_len == 5:
            order, mem = struct.unpack("<BL", properties)
        elif prop_len == 7:
            order, mem = struct.unpack("<BLBB", properties)[:2]
        else:
            raise UnsupportedCompressionMethodError(properties, "Unknown size of properties is passed")
        self.decoder = pyppmd.Ppmd7Decoder(order, mem)

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length=-1) -> bytes:
        """Decode ``data``, producing at most ``max_length`` bytes.

        When called without input while the decoder still reports that it needs
        some, a single NUL byte is fed instead.
        """
        payload = b"\0" if len(data) == 0 and self.decoder.needs_input else data
        return self.decoder.decode(payload, max_length)
class PpmdCompressor(ISevenZipCompressor):
    """Compress with PPMd compression algorithm"""

    def __init__(self, properties: bytes):
        """
        :param properties: 7-byte property block as produced by
            :meth:`encode_filter_properties`.
        """
        order, mem = self._decode_property(properties)
        self.encoder = pyppmd.Ppmd7Encoder(order, mem)

    def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes:
        """Encode ``data`` and return the output produced so far."""
        return self.encoder.encode(data)

    def flush(self):
        """Finish encoding and return the remaining output."""
        return self.encoder.flush()

    def _decode_property(self, properties):
        """Unpack ``(order, memory size)`` from a ``<BLBB`` property block."""
        unpacked = struct.unpack("<BLBB", properties)
        return unpacked[0], unpacked[1]

    @classmethod
    def encode_filter_properties(cls, filter: Dict[str, Union[str, int]]):
        """Build the 7-byte PPMd property block from a filter specification.

        ``filter["order"]`` defaults to 8 and ``filter["mem"]`` to 24.
        ``mem`` may be an ``int`` or a decimal string (both taken as a power of
        two exponent), or a decimal string with a ``m``/``k``/``b`` suffix
        (mebibytes, kibibytes, bytes; case-insensitive).

        :raises ValueError: when ``mem`` has an unsupported type or format.
        """
        order = filter.get("order", 8)
        mem = filter.get("mem", 24)
        if isinstance(mem, str):
            if mem.isdecimal():
                size = 1 << int(mem)
            else:
                digits, unit = mem[:-1], mem[-1:].lower()
                # Left shift that converts the unit into bytes.
                unit_shift = {"m": 20, "k": 10, "b": 0}.get(unit)
                if unit_shift is None or not digits.isdecimal():
                    raise ValueError("Ppmd:Unsupported memory size is specified: {0}".format(mem))
                size = int(digits) << unit_shift
        elif isinstance(mem, int):
            size = 1 << mem
        else:
            raise ValueError("Ppmd:Unsupported memory size is specified: {0}".format(mem))
        return struct.pack("<BLBB", order, size, 0, 0)
class BcjSparcDecoder(ISevenZipDecompressor):
    """Decompressor adapter around ``bcj.SparcDecoder``."""

    def __init__(self, size: int):
        """:param size: value handed to ``bcj.SparcDecoder`` (the unpack size at the call site in this module)."""
        self.decoder = bcj.SparcDecoder(size)

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Decode ``data``; ``max_length`` is accepted but not forwarded."""
        return self.decoder.decode(data)
class BcjSparcEncoder(ISevenZipCompressor):
    """Compressor adapter around ``bcj.SparcEncoder``."""

    def __init__(self):
        self.encoder = bcj.SparcEncoder()

    def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes:
        """Encode ``data`` and return the output produced so far."""
        return self.encoder.encode(data)

    def flush(self):
        """Return whatever the encoder still holds back."""
        return self.encoder.flush()
class BcjPpcDecoder(ISevenZipDecompressor):
    """Decompressor adapter around ``bcj.PPCDecoder``."""

    def __init__(self, size: int):
        """:param size: value handed to ``bcj.PPCDecoder`` (the unpack size at the call site in this module)."""
        self.decoder = bcj.PPCDecoder(size)

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Decode ``data``; ``max_length`` is accepted but not forwarded."""
        return self.decoder.decode(data)
class BcjPpcEncoder(ISevenZipCompressor):
    """Compressor adapter around ``bcj.PPCEncoder``."""

    def __init__(self):
        self.encoder = bcj.PPCEncoder()

    def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes:
        """Encode ``data`` and return the output produced so far."""
        return self.encoder.encode(data)

    def flush(self):
        """Return whatever the encoder still holds back."""
        return self.encoder.flush()
class BcjArmtDecoder(ISevenZipDecompressor):
    """Decompressor adapter around ``bcj.ARMTDecoder``."""

    def __init__(self, size: int):
        """:param size: value handed to ``bcj.ARMTDecoder`` (the unpack size at the call site in this module)."""
        self.decoder = bcj.ARMTDecoder(size)

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Decode ``data``; ``max_length`` is accepted but not forwarded."""
        return self.decoder.decode(data)
class BcjArmtEncoder(ISevenZipCompressor):
    """Compressor adapter around ``bcj.ARMTEncoder``."""

    def __init__(self):
        self.encoder = bcj.ARMTEncoder()

    def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes:
        """Encode ``data`` and return the output produced so far."""
        return self.encoder.encode(data)

    def flush(self):
        """Return whatever the encoder still holds back."""
        return self.encoder.flush()
class BcjArmDecoder(ISevenZipDecompressor):
    """Decompressor adapter around ``bcj.ARMDecoder``."""

    def __init__(self, size: int):
        """:param size: value handed to ``bcj.ARMDecoder`` (the unpack size at the call site in this module)."""
        self.decoder = bcj.ARMDecoder(size)

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Decode ``data``; ``max_length`` is accepted but not forwarded."""
        return self.decoder.decode(data)
class BcjArmEncoder(ISevenZipCompressor):
    """Compressor adapter around ``bcj.ARMEncoder``."""

    def __init__(self):
        self.encoder = bcj.ARMEncoder()

    def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes:
        """Encode ``data`` and return the output produced so far."""
        return self.encoder.encode(data)

    def flush(self):
        """Return whatever the encoder still holds back."""
        return self.encoder.flush()
class BCJDecoder(ISevenZipDecompressor):
    """Decompressor adapter around ``bcj.BCJDecoder`` (mapped to ``FILTER_X86`` in this module)."""

    def __init__(self, size: int):
        """:param size: value handed to ``bcj.BCJDecoder`` (the unpack size at the call site in this module)."""
        self.decoder = bcj.BCJDecoder(size)

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Decode ``data``; ``max_length`` is accepted but not forwarded."""
        return self.decoder.decode(data)
class BCJEncoder(ISevenZipCompressor):
    """Compressor adapter around ``bcj.BCJEncoder`` (mapped to ``FILTER_X86`` in this module)."""

    def __init__(self):
        self.encoder = bcj.BCJEncoder()

    def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes:
        """Encode ``data`` and return the output produced so far."""
        return self.encoder.encode(data)

    def flush(self):
        """Return whatever the encoder still holds back."""
        return self.encoder.flush()
class BrotliCompressor(ISevenZipCompressor):
    """Compressor adapter around ``brotli.Compressor``."""

    def __init__(self, level):
        """:param level: passed to ``brotli.Compressor`` as its ``quality``."""
        self._compressor = brotli.Compressor(quality=level)

    def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes:
        """Process ``data`` and return the output produced so far."""
        return self._compressor.process(data)

    def flush(self) -> bytes:
        """Return whatever the compressor still holds back."""
        return self._compressor.flush()
class BrotliDecompressor(ISevenZipDecompressor):
    """Decompressor adapter around ``brotli.Decompressor`` with property validation."""

    def __init__(self, properties: bytes, block_size: int):
        """
        :param properties: exactly 3 bytes; the first two are compared as a
            (major, minor) version against ``brotli_major``/``brotli_minor``.
        :param block_size: accepted for a uniform constructor signature; unused.
        :raises UnsupportedCompressionMethodError: on a wrong length or a newer version.
        """
        if len(properties) != 3:
            raise UnsupportedCompressionMethodError(properties, "Unknown size of properties are passed")
        archive_version = (properties[0], properties[1])
        if archive_version > (brotli_major, brotli_minor):
            message = "Unsupported brotli version: {}.{} our {}.{}".format(
                properties[0], properties[1], brotli_major, brotli_minor
            )
            raise UnsupportedCompressionMethodError(properties, message)
        self._prefix_checked = False
        self._decompressor = brotli.Decompressor()

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1):
        """Decompress ``data``; ``max_length`` is accepted but not forwarded.

        The first chunk is rejected when it starts with the 4-byte skippable
        frame marker ``50 2a 4d 18``.
        """
        if not self._prefix_checked:
            # check first 4bytes
            head = data[:4]
            if head == b"\x50\x2a\x4d\x18":
                raise UnsupportedCompressionMethodError(
                    head, "Unauthorized and modified Brotli data (skipable frame) found."
                )
            self._prefix_checked = True
        return self._decompressor.process(data)
class ZstdCompressor(ISevenZipCompressor):
    """Compressor adapter around ``pyzstd.ZstdCompressor``."""

    def __init__(self, level: int):
        """:param level: passed positionally to ``pyzstd.ZstdCompressor``."""
        self.compressor = pyzstd.ZstdCompressor(level)

    def compress(self, data: Union[bytes, bytearray, memoryview]) -> bytes:
        """Compress ``data`` and return the output produced so far."""
        return self.compressor.compress(data)

    def flush(self) -> bytes:
        """Return whatever the compressor still holds back."""
        return self.compressor.flush()
class ZstdDecompressor(ISevenZipDecompressor):
    """Decompressor adapter around ``pyzstd.ZstdDecompressor`` with property validation."""

    def __init__(self, properties: bytes, blocksize: int):
        """
        :param properties: 3 or 5 bytes; the first two are compared as a
            (major, minor, 0) version against ``pyzstd.zstd_version_info``.
        :param blocksize: accepted for a uniform constructor signature; unused.
        :raises UnsupportedCompressionMethodError: on a wrong length or a newer version.
        """
        if len(properties) not in [3, 5]:
            raise UnsupportedCompressionMethodError(properties, "Zstd takes 3 or 5 bytes properties.")
        archive_version = (properties[0], properties[1], 0)
        if archive_version > pyzstd.zstd_version_info:
            raise UnsupportedCompressionMethodError(properties, "Zstd version of archive is higher than us.")
        self.decompressor = pyzstd.ZstdDecompressor()

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Decompress ``data``; ``max_length`` is accepted but not forwarded."""
        return self.decompressor.decompress(data)
# Maps a filter id to its (compressor class, decompressor class) pair for the
# methods that are handled by this module instead of a native lzma filter chain.
algorithm_class_map: Dict[int, Tuple[Any, Any]] = {
    # general purpose compressors
    FILTER_ZSTD: (ZstdCompressor, ZstdDecompressor),
    FILTER_BROTLI: (BrotliCompressor, BrotliDecompressor),
    FILTER_PPMD: (PpmdCompressor, PpmdDecompressor),
    FILTER_BZIP2: (bz2.BZ2Compressor, bz2.BZ2Decompressor),
    FILTER_COPY: (CopyCompressor, CopyDecompressor),
    FILTER_DEFLATE: (DeflateCompressor, DeflateDecompressor),
    FILTER_DEFLATE64: (Deflate64Compressor, Deflate64Decompressor),
    # encryption
    FILTER_CRYPTO_AES256_SHA256: (AESCompressor, AESDecompressor),
    # BCJ filters
    FILTER_X86: (BCJEncoder, BCJDecoder),
    FILTER_ARM: (BcjArmEncoder, BcjArmDecoder),
    FILTER_ARMTHUMB: (BcjArmtEncoder, BcjArmtDecoder),
    FILTER_POWERPC: (BcjPpcEncoder, BcjPpcDecoder),
    FILTER_SPARC: (BcjSparcEncoder, BcjSparcDecoder),
}
class LZMA1Decompressor(ISevenZipDecompressor):
    """Raw-format ``lzma.LZMADecompressor`` wrapper that also remembers the unpack size."""

    def __init__(self, filters, unpacksize):
        """
        :param filters: filter chain passed to ``lzma.LZMADecompressor`` with ``FORMAT_RAW``.
        :param unpacksize: stored on the instance as ``unpacksize``.
        """
        self.unpacksize = unpacksize
        self._decompressor = lzma.LZMADecompressor(format=lzma.FORMAT_RAW, filters=filters)

    def decompress(self, data: Union[bytes, bytearray, memoryview], max_length: int = -1) -> bytes:
        """Decompress ``data``, forwarding ``max_length`` to the lzma decompressor."""
        return self._decompressor.decompress(data, max_length)
class SevenZipDecompressor:
    """Main decompressor object which is properly configured and bind to each 7zip folder.
    because 7zip folder can have a custom compression method"""

    def __init__(
        self,
        coders: List[Dict[str, Any]],
        packsize: int,
        unpacksizes: List[int],
        crc: Optional[int],
        password: Optional[str] = None,
        blocksize: Optional[int] = None,
    ) -> None:
        """Build the decompressor chain for one folder.

        :param coders: coder descriptions of the folder (dicts with at least ``method``).
        :param packsize: number of packed bytes that belong to the folder.
        :param unpacksizes: unpack size per coder output.
        :param crc: expected CRC32 of the decompressed data, checked by :meth:`check_crc`.
        :param password: pass phrase, required when a crypto coder is present.
        :param blocksize: read size; the library default is used when omitted or zero.
        :raises UnsupportedCompressionMethodError: more than 4 coders or an unsupported method.
        :raises PasswordRequired: a crypto coder is present but no password was given.
        """
        self.input_size = packsize
        self.unpacksizes = unpacksizes
        self.consumed: int = 0
        self.crc = crc
        self.digest: int = 0
        if blocksize:
            self.block_size: int = blocksize
        else:
            self.block_size = get_default_blocksize()
        if len(coders) > 4:
            raise UnsupportedCompressionMethodError(
                coders, "Maximum cascade of filters is 4 but got {}.".format(len(coders))
            )
        # True where the coder can be handled by the native lzma filter chain.
        self.methods_map: List[bool] = [SupportedMethods.is_native_coder(coder) for coder in coders]
        # Check if password given for encrypted archive
        if SupportedMethods.needs_password(coders) and password is None:
            raise PasswordRequired(coders, "Password is required for extracting given archive.")
        # Check filters combination and required parameters
        if len(coders) >= 2:
            target_compressor = False
            has_bcj = False
            bcj_index = -1
            for i, coder in enumerate(coders):
                filter_id = SupportedMethods.get_filter_id(coder)
                if SupportedMethods.is_compressor_id(filter_id) and filter_id != FILTER_LZMA2:
                    target_compressor = True
                if filter_id in [
                    FILTER_X86,
                    FILTER_ARM,
                    FILTER_ARMTHUMB,
                    FILTER_POWERPC,
                    FILTER_SPARC,
                ]:
                    has_bcj = True
                    bcj_index = i
                # hack for LZMA1+BCJ which should be native+alternative
                if target_compressor and has_bcj:
                    self.methods_map[bcj_index] = False
                    break
        self.chain: List[Union[bz2.BZ2Decompressor, lzma.LZMADecompressor, ISevenZipDecompressor]] = []
        self._unpacksizes: List[int] = []
        # Consecutive native coders collapse into one chain element, so they
        # share a single unpack size entry.
        shift = 0
        prev = False
        for i, r in enumerate(self.methods_map):
            shift += 1 if r and prev else 0
            prev = r
            self._unpacksizes.append(unpacksizes[i - shift])
        self._unpacked = [0 for _ in range(len(self._unpacksizes))]
        self._unused = bytearray()
        self._buf = bytearray()
        self._pos = 0
        # ---
        if all(self.methods_map):
            decompressor = self._get_lzma_decompressor(coders, unpacksizes[-1])
            self.chain.append(decompressor)
        elif not any(self.methods_map):
            for i in range(len(coders)):
                self.chain.append(self._get_alternative_decompressor(coders[i], unpacksizes[i], password))
        else:
            # Mixed: prefer "alternative coders first, then one native chain".
            for i in range(len(coders)):
                if (not any(self.methods_map[:i])) and all(self.methods_map[i:]):
                    for j in range(i):
                        self.chain.append(self._get_alternative_decompressor(coders[j], unpacksizes[j], password))
                    self.chain.append(self._get_lzma_decompressor(coders[i:], unpacksizes[i]))
                    break
            else:
                # No such split exists: one decompressor per coder.
                for i in range(len(coders)):
                    if self.methods_map[i]:
                        self.chain.append(self._get_lzma_decompressor([coders[i]], unpacksizes[i]))
                    else:
                        self.chain.append(self._get_alternative_decompressor(coders[i], unpacksizes[i], password))

    def _decompress(self, data, max_length: int):
        """Run ``data`` through every decompressor of the chain in order.

        A stage that already produced its full unpack size is skipped; reaching
        such a stage with non-empty data raises ``EOFError``.
        """
        for i, decompressor in enumerate(self.chain):
            if self._unpacked[i] < self._unpacksizes[i]:
                data = decompressor.decompress(data, max_length)
                self._unpacked[i] += len(data)
            elif len(data) == 0:
                data = b""
            else:
                raise EOFError
        return data

    def _read_data(self, fp):
        """Read the next chunk of packed data from ``fp``.

        The chunk is limited by both the packed bytes still unread for this
        folder and the block size, each reduced by the bytes held in ``_unused``.

        :return: the bytes read, or ``b""`` when nothing should be read.
        """
        # rest_size: rest size of packed data
        # unused_s: unused packed data size
        rest_size = self.input_size - self.consumed
        unused_s = len(self._unused)
        read_size = min(rest_size - unused_s, self.block_size - unused_s)
        if read_size > 0:
            data = fp.read(read_size)
            self.consumed += len(data)
        else:
            data = b""
        return data

    def decompress(self, fp, max_length: int = -1) -> bytes:
        """Read from ``fp`` and return decompressed data, updating the running CRC.

        :param fp: binary file object positioned at the packed data.
        :param max_length: maximum bytes to return; negative means no limit.
            Surplus output is kept internally and served by later calls.
        """
        if max_length < 0:
            data = self._read_data(fp)
            res = self._buf[self._pos :] + self._decompress(self._unused + data, max_length)
            self._buf = bytearray()
            self._unused = bytearray()
            self._pos = 0
        else:
            current_buf_len = len(self._buf) - self._pos
            if current_buf_len >= max_length:  # we already have enough data
                res = self._buf[self._pos : self._pos + max_length]
                self._pos += max_length
            else:
                data = self._read_data(fp)
                if len(self._unused) > 0:
                    tmp = self._decompress(self._unused + data, max_length)
                    self._unused = bytearray()
                else:
                    tmp = self._decompress(data, max_length)
                if current_buf_len + len(tmp) <= max_length:
                    res = self._buf[self._pos :] + tmp
                    self._buf = bytearray()
                    self._pos = 0
                else:
                    res = self._buf[self._pos :] + tmp[: max_length - current_buf_len]
                    self._buf = bytearray(tmp[max_length - current_buf_len :])
                    self._pos = 0
        self.digest = calculate_crc32(res, self.digest)
        return res

    def check_crc(self):
        """Return True when the running CRC equals the expected one."""
        return self.crc == self.digest

    @property
    def unused_size(self):
        """Number of packed bytes currently held in the unused buffer."""
        return len(self._unused)

    def _get_lzma_decompressor(self, coders: List[Dict[str, Any]], unpacksize: int):
        """Create one raw lzma decompressor for a run of native coders.

        Filters are prepended, so the resulting chain is in reverse coder order.

        :raises UnsupportedCompressionMethodError: on a multi-stream or non-native coder.
        """
        filters: List[Dict[str, Any]] = []
        lzma1 = False
        for coder in coders:
            if coder["numinstreams"] != 1 or coder["numoutstreams"] != 1:
                raise UnsupportedCompressionMethodError(coders, "Only a simple compression method is currently supported.")
            if not SupportedMethods.is_native_coder(coder):
                raise UnsupportedCompressionMethodError(coders, "Non python native method is requested.")
            properties = coder.get("properties", None)
            filter_id = SupportedMethods.get_filter_id(coder)
            if filter_id == FILTER_LZMA:
                lzma1 = True
            if properties is not None:
                filters[:0] = [lzma._decode_filter_properties(filter_id, properties)]  # type: ignore
            else:
                filters[:0] = [{"id": filter_id}]
        if lzma1:
            return LZMA1Decompressor(filters, unpacksize)
        else:
            return lzma.LZMADecompressor(format=lzma.FORMAT_RAW, filters=filters)

    def _get_alternative_decompressor(
        self, coder: Dict[str, Any], unpacksize=None, password=None
    ) -> "Union[bz2.BZ2Decompressor, lzma.LZMADecompressor, ISevenZipDecompressor]":  # noqa
        """Create the decompressor for a single coder from ``algorithm_class_map``.

        :raises UnsupportedCompressionMethodError: when the coder has no usable decompressor.
        """
        filter_id = SupportedMethods.get_filter_id(coder)
        # Special treatment for BCJ filters
        if filter_id in [
            FILTER_X86,
            FILTER_ARM,
            FILTER_ARMTHUMB,
            FILTER_POWERPC,
            FILTER_SPARC,
        ]:
            return algorithm_class_map[filter_id][1](size=unpacksize)
        # Check supported?
        if SupportedMethods.is_native_coder(coder):
            raise UnsupportedCompressionMethodError(coder, "Unknown method code:{}".format(coder["method"]))
        if filter_id not in algorithm_class_map:
            raise UnsupportedCompressionMethodError(coder, "Unknown method filter_id:{}".format(filter_id))
        if algorithm_class_map[filter_id][1] is None:
            raise UnsupportedCompressionMethodError(
                coder, "Decompression is not supported by {}.".format(SupportedMethods.get_method_name_id(filter_id))
            )
        #
        if SupportedMethods.is_crypto_id(filter_id):
            return algorithm_class_map[filter_id][1](coder["properties"], password, self.block_size)
        elif SupportedMethods.need_property(filter_id):
            return algorithm_class_map[filter_id][1](coder["properties"], self.block_size)
        else:
            return algorithm_class_map[filter_id][1]()
class SevenZipCompressor:
    """Main compressor object to configured for each 7zip folder."""

    __slots__ = [
        "filters",
        "chain",
        "compressor",
        "coders",
        "methods_map",
        "digest",
        "packsize",
        "_block_size",
        "_unpacksizes",
    ]

    def __init__(self, filters=None, password=None, blocksize: Optional[int] = None):
        """Build the compressor chain and the matching coder descriptions.

        :param filters: list of filter dicts (each with an ``id``); defaults to a
            single LZMA2 filter with preset ``7 | lzma.PRESET_EXTREME``.
        :param password: pass phrase handed to a crypto filter.
        :param blocksize: read size used by :meth:`compress`; the library default
            is used when omitted or zero.
        :raises UnsupportedCompressionMethodError: more than 4 filters or an
            unsupported combination of methods.
        """
        self.filters: List[Dict[str, Any]] = []
        self.chain: List[ISevenZipCompressor] = []
        self.digest = 0
        self.packsize = 0
        self._unpacksizes: List[int] = []
        if blocksize:
            self._block_size = blocksize
        else:
            self._block_size = get_default_blocksize()
        if filters is None:
            self.filters = [{"id": lzma.FILTER_LZMA2, "preset": 7 | lzma.PRESET_EXTREME}]
        else:
            self.filters = filters
        if len(self.filters) > 4:
            raise UnsupportedCompressionMethodError(
                filters, "Maximum cascade of filters is 4 but got {}.".format(len(self.filters))
            )
        # True where the filter can be handled by the native lzma filter chain.
        self.methods_map = [SupportedMethods.is_native_filter(filter) for filter in self.filters]
        self.coders: List[Dict[str, Any]] = []
        if all(self.methods_map) and SupportedMethods.is_compressor(self.filters[-1]):  # all native
            self._set_native_compressors_coders(self.filters)
            return
        #
        has_lzma2 = any(f["id"] == FILTER_LZMA2 for f in self.filters)
        if not has_lzma2:
            # when specified other than lzma2, BCJ filters should be alternative
            for i, f in enumerate(self.filters):
                if f["id"] in (FILTER_X86, FILTER_ARM, FILTER_ARMTHUMB, FILTER_SPARC, FILTER_POWERPC):
                    self.methods_map[i] = False
        #
        if not any(self.methods_map):  # all alternative
            for f in filters:
                self._set_alternate_compressors_coders(f, password)
        elif SupportedMethods.is_crypto_id(self.filters[-1]["id"]) and all(self.methods_map[:-1]):
            # native chain followed by encryption
            self._set_native_compressors_coders(self.filters[:-1])
            self._set_alternate_compressors_coders(self.filters[-1], password)
        else:
            raise UnsupportedCompressionMethodError(filters, "Unknown combination of methods.")

    def _set_native_compressors_coders(self, filters):
        """Append one raw lzma compressor for ``filters`` and record their coders."""
        self.chain.append(lzma.LZMACompressor(format=lzma.FORMAT_RAW, filters=filters))
        self._unpacksizes.append(0)
        for filter in filters:
            # Coders are stored in reverse filter order.
            self.coders.insert(0, SupportedMethods.get_coder(filter))

    def _set_alternate_compressors_coders(self, alt_filter, password=None):
        """Append the compressor for one non-native filter and record its coder.

        :raises UnsupportedCompressionMethodError: when the filter id is unknown.
        """
        filter_id = alt_filter["id"]
        properties = None
        if filter_id not in algorithm_class_map:
            raise UnsupportedCompressionMethodError(filter_id, "Unknown filter_id is given.")
        elif SupportedMethods.is_crypto_id(filter_id):
            compressor = algorithm_class_map[filter_id][0](password)
        elif SupportedMethods.need_property(filter_id):
            if filter_id == FILTER_ZSTD:
                level = alt_filter.get("level", 3)
                properties = struct.pack("BBBBB", pyzstd.zstd_version_info[0], pyzstd.zstd_version_info[1], level, 0, 0)
                compressor = algorithm_class_map[filter_id][0](level=level)
            elif filter_id == FILTER_PPMD:
                properties = PpmdCompressor.encode_filter_properties(alt_filter)
                compressor = algorithm_class_map[filter_id][0](properties)
            elif filter_id == FILTER_BROTLI:
                level = alt_filter.get("level", 11)
                properties = struct.pack("BBB", brotli_major, brotli_minor, level)
                compressor = algorithm_class_map[filter_id][0](level)
        else:
            compressor = algorithm_class_map[filter_id][0]()
        if SupportedMethods.is_crypto_id(filter_id):
            properties = compressor.encode_filter_properties()
        self.chain.append(compressor)
        self._unpacksizes.append(0)
        self.coders.insert(
            0,
            {
                "method": SupportedMethods.get_method_id(filter_id),
                "properties": properties,
                "numinstreams": 1,
                "numoutstreams": 1,
            },
        )

    def compress(self, fd, fp, crc=0):
        """Compress everything readable from ``fd`` into ``fp``.

        Data is read in blocks of ``_block_size`` bytes, passed through the whole
        chain and written out. :meth:`flush` must be called afterwards.

        :param fd: readable binary file object (source).
        :param fp: writable binary file object (destination).
        :param crc: initial CRC32 of the uncompressed data.
        :return: ``(bytes read, bytes written, CRC32 of the bytes read)``.
        """
        data = fd.read(self._block_size)
        insize = len(data)
        foutsize = 0
        while data:
            crc = calculate_crc32(data, crc)
            for i, compressor in enumerate(self.chain):
                self._unpacksizes[i] += len(data)
                data = compressor.compress(data)
            self.packsize += len(data)
            self.digest = calculate_crc32(data, self.digest)
            foutsize += len(data)
            fp.write(data)
            data = fd.read(self._block_size)
            insize += len(data)
        return insize, foutsize, crc

    def flush(self, fp):
        """Flush every compressor of the chain and write the final output to ``fp``.

        Output flushed from one stage is compressed and flushed by the next.

        :return: number of bytes written.
        """
        data = None
        for i, compressor in enumerate(self.chain):
            if data:
                self._unpacksizes[i] += len(data)
                data = compressor.compress(data)
                data += compressor.flush()
            else:
                data = compressor.flush()
        self.packsize += len(data)
        self.digest = calculate_crc32(data, self.digest)
        fp.write(data)
        return len(data)

    @property
    def unpacksizes(self):
        """Unpack sizes per filter, in coder (reversed filter) order.

        Consecutive native filters share one chain element and therefore the
        same size entry.
        """
        result = []
        shift = 0
        prev = False
        for i, r in enumerate(self.methods_map):
            shift += 1 if r and prev else 0
            prev = r
            result.insert(0, self._unpacksizes[i - shift])
        return result
class MethodsType(Enum):
    """Role a method plays in a coder chain, as used by the ``type`` field of ``SupportedMethods.methods``."""

    compressor = 0
    filter = 1
    crypto = 2
def _method_entry(method_id, name, native, need_prop, filter_id, method_type):
    """Build one row of ``SupportedMethods.methods``."""
    return {
        "id": method_id,
        "name": name,
        "native": native,
        "need_prop": need_prop,
        "filter_id": filter_id,
        "type": method_type,
    }


class SupportedMethods:
    """Hold list of methods."""

    formats = [{"name": "7z", "magic": MAGIC_7Z}]
    # Columns: method id, display name, native (lzma filter chain), needs
    # properties, filter id, role.
    methods = [
        _method_entry(COMPRESSION_METHOD.COPY, "COPY", False, False, FILTER_COPY, MethodsType.compressor),
        _method_entry(COMPRESSION_METHOD.LZMA2, "LZMA2", True, True, FILTER_LZMA2, MethodsType.compressor),
        _method_entry(COMPRESSION_METHOD.DELTA, "DELTA", True, True, FILTER_DELTA, MethodsType.filter),
        _method_entry(COMPRESSION_METHOD.LZMA, "LZMA", True, True, FILTER_LZMA, MethodsType.compressor),
        _method_entry(COMPRESSION_METHOD.P7Z_BCJ, "BCJ", True, False, FILTER_X86, MethodsType.filter),
        _method_entry(COMPRESSION_METHOD.BCJ_PPC, "PPC", True, False, FILTER_POWERPC, MethodsType.filter),
        _method_entry(COMPRESSION_METHOD.BCJ_IA64, "IA64", True, False, FILTER_IA64, MethodsType.filter),
        _method_entry(COMPRESSION_METHOD.BCJ_ARM, "ARM", True, False, FILTER_ARM, MethodsType.filter),
        _method_entry(COMPRESSION_METHOD.BCJ_ARMT, "ARMT", True, False, FILTER_ARMTHUMB, MethodsType.filter),
        _method_entry(COMPRESSION_METHOD.BCJ_SPARC, "SPARC", True, False, FILTER_SPARC, MethodsType.filter),
        _method_entry(COMPRESSION_METHOD.MISC_DEFLATE, "DEFLATE", False, False, FILTER_DEFLATE, MethodsType.compressor),
        _method_entry(COMPRESSION_METHOD.MISC_BZIP2, "BZip2", False, False, FILTER_BZIP2, MethodsType.compressor),
        _method_entry(COMPRESSION_METHOD.MISC_ZSTD, "ZStandard", False, True, FILTER_ZSTD, MethodsType.compressor),
        _method_entry(COMPRESSION_METHOD.PPMD, "PPMd", False, True, FILTER_PPMD, MethodsType.compressor),
        _method_entry(COMPRESSION_METHOD.MISC_BROTLI, "Brotli", False, True, FILTER_BROTLI, MethodsType.compressor),
        _method_entry(
            COMPRESSION_METHOD.MISC_DEFLATE64, "DEFLATE64", False, False, FILTER_DEFLATE64, MethodsType.compressor
        ),
        _method_entry(
            COMPRESSION_METHOD.CRYPT_AES256_SHA256,
            "7zAES",
            False,
            True,
            FILTER_CRYPTO_AES256_SHA256,
            MethodsType.crypto,
        ),
    ]

    @classmethod
    def _find_method(cls, key_id, key_value):
        """Return the first table row whose ``key_id`` field equals ``key_value``, else None."""
        for entry in cls.methods:
            if entry[key_id] == key_value:
                return entry
        return None

    @classmethod
    def get_method_name_id(cls, filter_id):
        """Return the display name registered for ``filter_id``."""
        return cls._find_method("filter_id", filter_id)["name"]

    @classmethod
    def get_filter_id(cls, coder):
        """Return the filter id for ``coder["method"]``, or None when it is unknown."""
        entry = cls._find_method("id", coder["method"])
        return None if entry is None else entry["filter_id"]

    @classmethod
    def is_native_filter(cls, filter) -> bool:
        """Tell whether ``filter["id"]`` is handled by the native lzma filter chain.

        :raises UnsupportedCompressionMethodError: when the filter id is unknown.
        """
        entry = cls._find_method("filter_id", filter["id"])
        if entry is None:
            raise UnsupportedCompressionMethodError(filter["id"], "Unknown method id is given.")
        return entry["native"]

    @classmethod
    def is_compressor(cls, filter):
        """Tell whether ``filter["id"]`` is registered with the compressor role."""
        entry = cls._find_method("filter_id", filter["id"])
        return entry["type"] == MethodsType.compressor

    @classmethod
    def is_compressor_id(cls, filter_id):
        """Tell whether ``filter_id`` is registered with the compressor role."""
        entry = cls._find_method("filter_id", filter_id)
        return entry["type"] == MethodsType.compressor

    @classmethod
    def is_native_coder(cls, coder) -> bool:
        """Tell whether ``coder["method"]`` is handled by the native lzma filter chain.

        :raises UnsupportedCompressionMethodError: when the method id is unknown.
        """
        entry = cls._find_method("id", coder["method"])
        if entry is None:
            cls.raise_unsupported_method_id(coder)
        return entry["native"]

    @classmethod
    def need_property(cls, filter_id):
        """Tell whether the method registered for ``filter_id`` carries properties.

        :raises UnsupportedCompressionMethodError: when the filter id is unknown.
        """
        entry = cls._find_method("filter_id", filter_id)
        if entry is None:
            raise UnsupportedCompressionMethodError(filter_id, "Found an sunpported filter id.")
        return entry["need_prop"]

    @classmethod
    def is_crypto_id(cls, filter_id) -> bool:
        """Tell whether ``filter_id`` is registered with the crypto role.

        :raises UnsupportedCompressionMethodError: when the filter id is unknown.
        """
        entry = cls._find_method("filter_id", filter_id)
        if entry is None:
            cls.raise_unsupported_filter_id(filter_id)
        return entry["type"] == MethodsType.crypto

    @classmethod
    def get_method_id(cls, filter_id) -> bytes:
        """Return the method id registered for ``filter_id``.

        :raises UnsupportedCompressionMethodError: when the filter id is unknown.
        """
        entry = cls._find_method("filter_id", filter_id)
        if entry is None:
            cls.raise_unsupported_filter_id(filter_id)
        return entry["id"]

    @classmethod
    def get_coder(cls, filter) -> Dict[str, Any]:
        """Build the coder description dict for a filter.

        Properties are encoded with ``lzma._encode_filter_properties`` for the
        LZMA1, LZMA2 and DELTA filters and are None otherwise.
        """
        method = cls.get_method_id(filter["id"])
        properties: Optional[bytes] = None
        if filter["id"] in [lzma.FILTER_LZMA1, lzma.FILTER_LZMA2, lzma.FILTER_DELTA]:
            properties = lzma._encode_filter_properties(filter)  # type: ignore  # noqa
        return {
            "method": method,
            "properties": properties,
            "numinstreams": 1,
            "numoutstreams": 1,
        }

    @classmethod
    def needs_password(cls, coders) -> bool:
        """Tell whether any coder with a known method is a crypto method."""
        filter_ids = (SupportedMethods.get_filter_id(coder) for coder in coders)
        return any(fid is not None and SupportedMethods.is_crypto_id(fid) for fid in filter_ids)

    @classmethod
    def raise_unsupported_method_id(cls, coder):
        """Always raise UnsupportedCompressionMethodError, with a message specific to the method."""
        method_id = coder["method"]
        if method_id == COMPRESSION_METHOD.P7Z_BCJ2:
            raise UnsupportedCompressionMethodError(
                method_id,
                "BCJ2 filter is not supported by py7zr."
                " Please consider to contribute to XZ/liblzma project"
                " and help Python core team implementing it."
                " Or please use another tool to extract it.",
            )
        if method_id == COMPRESSION_METHOD.MISC_DEFLATE64:
            raise UnsupportedCompressionMethodError(
                method_id,
                "DEFLATE64 compression is not supported by py7zr yet."
                " Please check the progress in py7zr project home page.",
            )
        if method_id == COMPRESSION_METHOD.MISC_LZ4:
            raise UnsupportedCompressionMethodError(
                method_id, "Archive is compressed by an unsupported algorythm LZ4."
            )
        raise UnsupportedCompressionMethodError(
            method_id, "Archive is compressed by an unsupported compression algorythm."
        )

    @classmethod
    def raise_unsupported_filter_id(cls, filter_id):
        """Always raise UnsupportedCompressionMethodError for ``filter_id``."""
        raise UnsupportedCompressionMethodError(
            filter_id, "Found an unsupported filter id is specified." "Please use another compression method."
        )
def get_methods_names(coders_lists: List[List[dict]]) -> List[str]:
    """Return the display names of all methods used by the given coders.

    :param coders_lists: one list of coder dicts (each with a ``method`` key) per folder.
    :return: unique method names in display priority order. Methods that are
        recognised but not supported are reported with a trailing ``*``.
    """
    # list of known method names with a display priority order
    # Every entry must match a "name" in SupportedMethods.methods (or a value of
    # unsupported_methods below), otherwise that method can never be reported.
    methods_namelist = [
        "LZMA2",
        "LZMA",
        "BZip2",
        "DEFLATE",
        "DEFLATE64",
        "DELTA",
        "COPY",
        "PPMd",
        "ZStandard",
        "Brotli",
        "LZ4*",
        "BCJ2*",
        "BCJ",
        "ARM",
        "ARMT",
        "IA64",
        "PPC",
        "SPARC",
        "7zAES",
    ]
    unsupported_methods = {
        COMPRESSION_METHOD.P7Z_BCJ2: "BCJ2*",
        COMPRESSION_METHOD.MISC_LZ4: "LZ4*",
    }
    found = set()
    for coders in coders_lists:
        for coder in coders:
            method_id = coder["method"]
            found.update(m["name"] for m in SupportedMethods.methods if m["id"] == method_id)
            if method_id in unsupported_methods:
                found.add(unsupported_methods[method_id])
    return [name for name in methods_namelist if name in found]