1 Star 1 Fork 3

张小农 / 某扫描器核心反编译

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
tracemalloc.py 16.54 KB
一键复制 编辑 原始数据 按行查看 历史
张小农 提交于 2019-03-21 13:47 . first code
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558
# uncompyle6 version 3.2.3
# Python bytecode 3.6 (3379)
# Decompiled from: Python 3.6.8 |Anaconda custom (64-bit)| (default, Feb 21 2019, 18:30:04) [MSC v.1916 64 bit (AMD64)]
# Embedded file name: tracemalloc.py
from collections import Sequence, Iterable
from functools import total_ordering
import fnmatch, linecache, os.path, pickle
from _tracemalloc import *
from _tracemalloc import _get_object_traceback, _get_traces
def _format_size(size, sign):
for unit in ("B", "KiB", "MiB", "GiB", "TiB"):
if abs(size) < 100:
if unit != "B":
if sign:
return "%+.1f %s" % (size, unit)
return "%.1f %s" % (size, unit)
if abs(size) < 10240 or unit == "TiB":
if sign:
return "%+.0f %s" % (size, unit)
return "%.0f %s" % (size, unit)
size /= 1024
class Statistic:
"""
Statistic difference on memory allocations between two Snapshot instance.
"""
__slots__ = ("traceback", "size", "count")
def __init__(self, traceback, size, count):
self.traceback = traceback
self.size = size
self.count = count
def __hash__(self):
return hash((self.traceback, self.size, self.count))
def __eq__(self, other):
return (
self.traceback == other.traceback
and self.size == other.size
and self.count == other.count
)
def __str__(self):
text = "%s: size=%s, count=%i" % (
self.traceback,
_format_size(self.size, False),
self.count,
)
if self.count:
average = self.size / self.count
text += ", average=%s" % _format_size(average, False)
return text
def __repr__(self):
return "<Statistic traceback=%r size=%i count=%i>" % (
self.traceback,
self.size,
self.count,
)
def _sort_key(self):
return (self.size, self.count, self.traceback)
class StatisticDiff:
"""
Statistic difference on memory allocations between an old and a new
Snapshot instance.
"""
__slots__ = ("traceback", "size", "size_diff", "count", "count_diff")
def __init__(self, traceback, size, size_diff, count, count_diff):
self.traceback = traceback
self.size = size
self.size_diff = size_diff
self.count = count
self.count_diff = count_diff
def __hash__(self):
return hash(
(self.traceback, self.size, self.size_diff, self.count, self.count_diff)
)
def __eq__(self, other):
return (
self.traceback == other.traceback
and self.size == other.size
and self.size_diff == other.size_diff
and self.count == other.count
and self.count_diff == other.count_diff
)
def __str__(self):
text = "%s: size=%s (%s), count=%i (%+i)" % (
self.traceback,
_format_size(self.size, False),
_format_size(self.size_diff, True),
self.count,
self.count_diff,
)
if self.count:
average = self.size / self.count
text += ", average=%s" % _format_size(average, False)
return text
def __repr__(self):
return "<StatisticDiff traceback=%r size=%i (%+i) count=%i (%+i)>" % (
self.traceback,
self.size,
self.size_diff,
self.count,
self.count_diff,
)
def _sort_key(self):
return (
abs(self.size_diff),
self.size,
abs(self.count_diff),
self.count,
self.traceback,
)
def _compare_grouped_stats(old_group, new_group):
statistics = []
for traceback, stat in new_group.items():
previous = old_group.pop(traceback, None)
if previous is not None:
stat = StatisticDiff(
traceback,
stat.size,
stat.size - previous.size,
stat.count,
stat.count - previous.count,
)
else:
stat = StatisticDiff(
traceback, stat.size, stat.size, stat.count, stat.count
)
statistics.append(stat)
for traceback, stat in old_group.items():
stat = StatisticDiff(traceback, 0, -stat.size, 0, -stat.count)
statistics.append(stat)
return statistics
@total_ordering
class Frame:
"""
Frame of a traceback.
"""
__slots__ = ("_frame",)
def __init__(self, frame):
self._frame = frame
@property
def filename(self):
return self._frame[0]
@property
def lineno(self):
return self._frame[1]
def __eq__(self, other):
return self._frame == other._frame
def __lt__(self, other):
return self._frame < other._frame
def __hash__(self):
return hash(self._frame)
def __str__(self):
return "%s:%s" % (self.filename, self.lineno)
def __repr__(self):
return "<Frame filename=%r lineno=%r>" % (self.filename, self.lineno)
@total_ordering
class Traceback(Sequence):
"""
Sequence of Frame instances sorted from the most recent frame
to the oldest frame.
"""
__slots__ = ("_frames",)
def __init__(self, frames):
Sequence.__init__(self)
self._frames = frames
def __len__(self):
return len(self._frames)
def __getitem__(self, index):
if isinstance(index, slice):
return tuple((Frame(trace) for trace in self._frames[index]))
else:
return Frame(self._frames[index])
def __contains__(self, frame):
return frame._frame in self._frames
def __hash__(self):
return hash(self._frames)
def __eq__(self, other):
return self._frames == other._frames
def __lt__(self, other):
return self._frames < other._frames
def __str__(self):
return str(self[0])
def __repr__(self):
return "<Traceback %r>" % (tuple(self),)
def format(self, limit=None):
lines = []
if limit is not None:
if limit < 0:
return lines
for frame in self[:limit]:
lines.append(' File "%s", line %s' % (frame.filename, frame.lineno))
line = linecache.getline(frame.filename, frame.lineno).strip()
if line:
lines.append(" %s" % line)
return lines
def get_object_traceback(obj):
"""
Get the traceback where the Python object *obj* was allocated.
Return a Traceback instance.
Return None if the tracemalloc module is not tracing memory allocations or
did not trace the allocation of the object.
"""
frames = _get_object_traceback(obj)
if frames is not None:
return Traceback(frames)
else:
return
class Trace:
"""
Trace of a memory block.
"""
__slots__ = ("_trace",)
def __init__(self, trace):
self._trace = trace
@property
def domain(self):
return self._trace[0]
@property
def size(self):
return self._trace[1]
@property
def traceback(self):
return Traceback(self._trace[2])
def __eq__(self, other):
return self._trace == other._trace
def __hash__(self):
return hash(self._trace)
def __str__(self):
return "%s: %s" % (self.traceback, _format_size(self.size, False))
def __repr__(self):
return "<Trace domain=%s size=%s, traceback=%r>" % (
self.domain,
_format_size(self.size, False),
self.traceback,
)
class _Traces(Sequence):
def __init__(self, traces):
Sequence.__init__(self)
self._traces = traces
def __len__(self):
return len(self._traces)
def __getitem__(self, index):
if isinstance(index, slice):
return tuple((Trace(trace) for trace in self._traces[index]))
else:
return Trace(self._traces[index])
def __contains__(self, trace):
return trace._trace in self._traces
def __eq__(self, other):
return self._traces == other._traces
def __repr__(self):
return "<Traces len=%s>" % len(self)
def _normalize_filename(filename):
filename = os.path.normcase(filename)
if filename.endswith(".pyc"):
filename = filename[:-1]
return filename
class BaseFilter:
def __init__(self, inclusive):
self.inclusive = inclusive
def _match(self, trace):
raise NotImplementedError
class Filter(BaseFilter):
def __init__(
self, inclusive, filename_pattern, lineno=None, all_frames=False, domain=None
):
super().__init__(inclusive)
self.inclusive = inclusive
self._filename_pattern = _normalize_filename(filename_pattern)
self.lineno = lineno
self.all_frames = all_frames
self.domain = domain
@property
def filename_pattern(self):
return self._filename_pattern
def _match_frame_impl(self, filename, lineno):
filename = _normalize_filename(filename)
if not fnmatch.fnmatch(filename, self._filename_pattern):
return False
elif self.lineno is None:
return True
else:
return lineno == self.lineno
def _match_frame(self, filename, lineno):
return self._match_frame_impl(filename, lineno) ^ (not self.inclusive)
def _match_traceback(self, traceback):
if self.all_frames:
if any(
(
self._match_frame_impl(filename, lineno)
for filename, lineno in traceback
)
):
return self.inclusive
return not self.inclusive
else:
filename, lineno = traceback[0]
return self._match_frame(filename, lineno)
def _match(self, trace):
domain, size, traceback = trace
res = self._match_traceback(traceback)
if self.domain is not None:
if self.inclusive:
return res and domain == self.domain
return res or domain != self.domain
else:
return res
class DomainFilter(BaseFilter):
def __init__(self, inclusive, domain):
super().__init__(inclusive)
self._domain = domain
@property
def domain(self):
return self._domain
def _match(self, trace):
domain, size, traceback = trace
return (domain == self.domain) ^ (not self.inclusive)
class Snapshot:
"""
Snapshot of traces of memory blocks allocated by Python.
"""
def __init__(self, traces, traceback_limit):
self.traces = _Traces(traces)
self.traceback_limit = traceback_limit
def dump(self, filename):
"""
Write the snapshot into a file.
"""
with open(filename, "wb") as (fp):
pickle.dump(self, fp, pickle.HIGHEST_PROTOCOL)
@staticmethod
def load(filename):
"""
Load a snapshot from a file.
"""
with open(filename, "rb") as (fp):
return pickle.load(fp)
def _filter_trace(self, include_filters, exclude_filters, trace):
if include_filters:
if not any(
(trace_filter._match(trace) for trace_filter in include_filters)
):
return False
if exclude_filters:
if any(
(not trace_filter._match(trace) for trace_filter in exclude_filters)
):
return False
return True
def filter_traces(self, filters):
"""
Create a new Snapshot instance with a filtered traces sequence, filters
is a list of Filter or DomainFilter instances. If filters is an empty
list, return a new Snapshot instance with a copy of the traces.
"""
if not isinstance(filters, Iterable):
raise TypeError(
"filters must be a list of filters, not %s" % type(filters).__name__
)
if filters:
include_filters = []
exclude_filters = []
for trace_filter in filters:
if trace_filter.inclusive:
include_filters.append(trace_filter)
else:
exclude_filters.append(trace_filter)
new_traces = [
trace
for trace in self.traces._traces
if self._filter_trace(include_filters, exclude_filters, trace)
]
else:
new_traces = self.traces._traces.copy()
return Snapshot(new_traces, self.traceback_limit)
def _group_by(self, key_type, cumulative):
if key_type not in ("traceback", "filename", "lineno"):
raise ValueError("unknown key_type: %r" % (key_type,))
if cumulative:
if key_type not in ("lineno", "filename"):
raise ValueError(
"cumulative mode cannot by used with key type %r" % key_type
)
stats = {}
tracebacks = {}
if not cumulative:
for trace in self.traces._traces:
domain, size, trace_traceback = trace
try:
traceback = tracebacks[trace_traceback]
except KeyError:
if key_type == "traceback":
frames = trace_traceback
else:
if key_type == "lineno":
frames = trace_traceback[:1]
else:
frames = ((trace_traceback[0][0], 0),)
traceback = Traceback(frames)
tracebacks[trace_traceback] = traceback
try:
stat = stats[traceback]
stat.size += size
stat.count += 1
except KeyError:
stats[traceback] = Statistic(traceback, size, 1)
else:
for trace in self.traces._traces:
domain, size, trace_traceback = trace
for frame in trace_traceback:
try:
traceback = tracebacks[frame]
except KeyError:
if key_type == "lineno":
frames = (frame,)
else:
frames = ((frame[0], 0),)
traceback = Traceback(frames)
tracebacks[frame] = traceback
try:
stat = stats[traceback]
stat.size += size
stat.count += 1
except KeyError:
stats[traceback] = Statistic(traceback, size, 1)
return stats
def statistics(self, key_type, cumulative=False):
"""
Group statistics by key_type. Return a sorted list of Statistic
instances.
"""
grouped = self._group_by(key_type, cumulative)
statistics = list(grouped.values())
statistics.sort(reverse=True, key=Statistic._sort_key)
return statistics
def compare_to(self, old_snapshot, key_type, cumulative=False):
"""
Compute the differences with an old snapshot old_snapshot. Get
statistics as a sorted list of StatisticDiff instances, grouped by
group_by.
"""
new_group = self._group_by(key_type, cumulative)
old_group = old_snapshot._group_by(key_type, cumulative)
statistics = _compare_grouped_stats(old_group, new_group)
statistics.sort(reverse=True, key=StatisticDiff._sort_key)
return statistics
def take_snapshot():
"""
Take a snapshot of traces of memory blocks allocated by Python.
"""
if not is_tracing():
raise RuntimeError(
"the tracemalloc module must be tracing memory allocations to take a snapshot"
)
traces = _get_traces()
traceback_limit = get_traceback_limit()
return Snapshot(traces, traceback_limit)
Python
1
https://gitee.com/zhanghk668/opsrv_extracted.git
git@gitee.com:zhanghk668/opsrv_extracted.git
zhanghk668
opsrv_extracted
某扫描器核心反编译
master

搜索帮助