1 Star 1 Fork 3

张小农 / 某扫描器核心反编译

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
_compression.py 4.73 KB
一键复制 编辑 原始数据 按行查看 历史
张小农 提交于 2019-03-21 13:47 . first code
# uncompyle6 version 3.2.3
# Python bytecode 3.6 (3379)
# Decompiled from: Python 3.6.8 |Anaconda custom (64-bit)| (default, Feb 21 2019, 18:30:04) [MSC v.1916 64 bit (AMD64)]
# Embedded file name: _compression.py
"""Internal classes used by the gzip, lzma and bz2 modules"""
import io
BUFFER_SIZE = io.DEFAULT_BUFFER_SIZE
class BaseStream(io.BufferedIOBase):
"""Mode-checking helper functions."""
def _check_not_closed(self):
if self.closed:
raise ValueError("I/O operation on closed file")
def _check_can_read(self):
if not self.readable():
raise io.UnsupportedOperation("File not open for reading")
def _check_can_write(self):
if not self.writable():
raise io.UnsupportedOperation("File not open for writing")
def _check_can_seek(self):
if not self.readable():
raise io.UnsupportedOperation(
"Seeking is only supported on files open for reading"
)
if not self.seekable():
raise io.UnsupportedOperation(
"The underlying file object does not support seeking"
)
class DecompressReader(io.RawIOBase):
"""Adapts the decompressor API to a RawIOBase reader API"""
def readable(self):
return True
def __init__(self, fp, decomp_factory, trailing_error=(), **decomp_args):
self._fp = fp
self._eof = False
self._pos = 0
self._size = -1
self._decomp_factory = decomp_factory
self._decomp_args = decomp_args
self._decompressor = self._decomp_factory(**self._decomp_args)
self._trailing_error = trailing_error
def close(self):
self._decompressor = None
return super().close()
def seekable(self):
return self._fp.seekable()
def readinto(self, b):
with memoryview(b) as (view):
with view.cast("B") as (byte_view):
data = self.read(len(byte_view))
byte_view[: len(data)] = data
return len(data)
def read(self, size=-1):
if size < 0:
return self.readall()
elif not size or self._eof:
return b""
else:
data = None
while 1:
if self._decompressor.eof:
rawblock = self._decompressor.unused_data or self._fp.read(
BUFFER_SIZE
)
if not rawblock:
break
self._decompressor = self._decomp_factory(**self._decomp_args)
try:
data = self._decompressor.decompress(rawblock, size)
except self._trailing_error:
break
else:
if self._decompressor.needs_input:
rawblock = self._fp.read(BUFFER_SIZE)
if not rawblock:
raise EOFError(
"Compressed file ended before the end-of-stream marker was reached"
)
else:
rawblock = b""
data = self._decompressor.decompress(rawblock, size)
if data:
break
if not data:
self._eof = True
self._size = self._pos
return b""
self._pos += len(data)
return data
def _rewind(self):
self._fp.seek(0)
self._eof = False
self._pos = 0
self._decompressor = self._decomp_factory(**self._decomp_args)
def seek(self, offset, whence=io.SEEK_SET):
if whence == io.SEEK_SET:
pass
else:
if whence == io.SEEK_CUR:
offset = self._pos + offset
else:
if whence == io.SEEK_END:
if self._size < 0:
while self.read(io.DEFAULT_BUFFER_SIZE):
pass
offset = self._size + offset
else:
raise ValueError(("Invalid value for whence: {}").format(whence))
if offset < self._pos:
self._rewind()
else:
offset -= self._pos
while offset > 0:
data = self.read(min(io.DEFAULT_BUFFER_SIZE, offset))
if not data:
break
offset -= len(data)
return self._pos
def tell(self):
"""Return the current file position."""
return self._pos
Python
1
https://gitee.com/zhanghk668/opsrv_extracted.git
git@gitee.com:zhanghk668/opsrv_extracted.git
zhanghk668
opsrv_extracted
某扫描器核心反编译
master

搜索帮助