1 Star 1 Fork 3

张小农 / 某扫描器核心反编译

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
swagger.py 11.13 KB
一键复制 编辑 原始数据 按行查看 历史
张小农 提交于 2019-03-21 13:47 . first code
# uncompyle6 version 3.2.3
# Python bytecode 3.6 (3379)
# Decompiled from: Python 3.6.8 |Anaconda custom (64-bit)| (default, Feb 21 2019, 18:30:04) [MSC v.1916 64 bit (AMD64)]
# Embedded file name: swagger\__init__.py
__author__ = "sanyi"
import logging, re
from copy import deepcopy
from collections import defaultdict
RE_GENERIC_PATH_PARAM = re.compile("\\{[a-z0-9_]+\\}", re.IGNORECASE)
SUPPORTED_METHODS = {"get", "put", "post", "delete", "patch", "options"}
SUPPORTED_PARAMETER_SOURCES = {"headers", "query", "body", "path"}
SUPPORTED_TYPES = {"string", "integer", "boolean", "object", "array"}
logger = logging.getLogger("swagger")
class SchemaException(Exception):
"""
This Exception is raised only during the schema parsing, so usually during server boot,
will not be raised on API calls, etc.
"""
pass
def __swagger_resolve_reference(swagger_document, reference):
"""
Returns a copy of the document at the reference
:param reference:
:return:
"""
path = reference.split("/")
if path[0] != "#":
raise SchemaException("unsupported reference: %s" % reference)
try:
node = swagger_document
for path_part in path[1:]:
node = node[path_part]
except KeyError as e:
raise SchemaException("parameter reference not found for %s" % reference) from e
node = deepcopy(node)
return node
def __flatten_descriptor(swagger_document, descriptor):
"""
Takes a descriptor and attempts to flatten it
:param swagger_document:
:param descriptor:
:return:
"""
reference = descriptor.get("$ref", None)
if reference:
descriptor_from_reference = __swagger_resolve_reference(
swagger_document, reference
)
for key, value in descriptor_from_reference.items():
descriptor[key] = value
logger.debug("__flatten_descriptor: %s", descriptor)
object_type = descriptor.get("type", None)
if object_type is None:
if "properties" in descriptor:
object_type = "object"
logger.warning("object descriptor without type")
else:
raise SchemaException("object type is not defined")
if object_type not in SUPPORTED_TYPES:
raise SchemaException("unsupported type %s" % (object_type,))
if object_type == "object":
if "additionalProperties" in descriptor:
raise SchemaException("additionalProperties not supported yet")
properties = descriptor.get("properties", {})
required = set(descriptor.get("required", []))
if "allOf" in descriptor:
inherited_properties = {}
for obj in descriptor["allOf"]:
if obj.get("$ref"):
obj = __swagger_resolve_reference(swagger_document, obj.get("$ref"))
for property_name, property_value in obj.get("properties", {}).items():
inherited_properties[property_name] = property_value
for require_item in obj.get("required", {}):
required.add(require_item)
for property_name, property_value in inherited_properties.items():
if property_name not in properties:
properties[property_name] = property_value
required = list(required)
flatten_properties = {}
for name in properties.keys():
flatten_properties[name] = __flatten_descriptor(
swagger_document, properties[name]
)
if name in required:
flatten_properties[name]["required"] = True
else:
flatten_properties[name]["required"] = False
result = dict(properties=flatten_properties, type="object")
for key in descriptor.keys():
if key in frozenset({"name", "in"}):
result[key] = descriptor[key]
return result
elif object_type == "array":
items = descriptor.get("items", None)
if isinstance(items, (list, tuple)):
flattened_items = []
for item in items:
flattened_items.append(__flatten_descriptor(swagger_document, item))
else:
if isinstance(items, dict):
flattened_items = __flatten_descriptor(swagger_document, items)
else:
if items is None:
flattened_items = None
else:
raise SchemaException(
"invalid array items type %s" % (type(items),)
)
descriptor = deepcopy(descriptor)
descriptor["items"] = flattened_items
return descriptor
else:
return deepcopy(descriptor)
def __process_request_parameter(swagger_document, parameter):
"""
Inflates and populates parameter descriptor according to the API router's need
:param swagger_document:
:param parameter: parameter descriptor dict
:return:
"""
reference = parameter.get("$ref", None)
if reference:
if len(parameter) > 1:
raise SchemaException("$ref extra values")
parameter = __swagger_resolve_reference(swagger_document, reference)
if "name" not in parameter:
raise SchemaException('parameter contains no "name"')
if "in" not in parameter:
raise SchemaException('parameter contains no source (key="in")')
if parameter["in"] not in SUPPORTED_PARAMETER_SOURCES:
raise SchemaException(
'source (key="in"): %s not supported' % (parameter["in"],)
)
if parameter["in"] == "body":
schema = parameter.get("schema", None)
if schema:
body_parameter_descriptor = __flatten_descriptor(swagger_document, schema)
del parameter["schema"]
for key in body_parameter_descriptor.keys():
if key in parameter:
raise SchemaException("problems de indenting body schema")
parameter[key] = body_parameter_descriptor[key]
else:
if parameter["in"] == "path":
parameter["required"] = True
parameter["required"] = parameter.get("required", False)
return parameter
def extract_routes(swagger_document):
"""
:param swagger_document:
:return:
"""
route_descriptor = defaultdict(dict)
paths = swagger_document["paths"]
for path in paths.keys():
common_parameters = {}
for parameter in paths[path].get("parameters", []):
logger.debug("parsing common %s\t%s", path, parameter)
parameter = __process_request_parameter(swagger_document, parameter)
common_parameters[parameter["name"]] = parameter
for method in paths[path].keys():
if method in ("parameters", "x-private"):
continue
if method not in SUPPORTED_METHODS:
raise SchemaException("method not supported %s" % method)
parameters = {}
for parameter in paths[path][method].get("parameters", []):
logger.debug("parsing %s:%s\t%s", method, path, parameter)
parameter = __process_request_parameter(swagger_document, parameter)
parameters[parameter["name"]] = parameter
for key in common_parameters.keys():
if key not in parameters:
parameters[key] = common_parameters[key]
handler_method = paths[path][method]["operationId"]
handler_class = paths[path][method]["tags"][0]
path_parameters = dict()
for parameter_name, parameter in parameters.items():
if parameter["in"] != "path":
continue
parameter_type = parameter["type"]
if parameter_type == "string":
parameter_format = parameter.get("format", None)
if parameter_format == "uuid":
regex_string = "(([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})|([0-9a-f]{32}))"
else:
if parameter_format == "md5":
regex_string = "[a-f0-9]{32}"
else:
regex_string = "[a-z0-9_\\.\\-]+"
else:
if parameter_type in ("integer", "long"):
regex_string = "[0-9]+"
else:
if parameter_type in ("float", "double"):
regex_string = "[0-9\\.]+"
else:
raise SchemaException(
"unsupported path param type=%s (%s:%s:%s)"
% (parameter_type, method, path, parameter["name"])
)
path_parameters[parameter["name"]] = regex_string
matches = RE_GENERIC_PATH_PARAM.finditer(path)
api_entry_regex_parts = []
api_entry_regex_start = 0
for match in matches:
path_param_name = path[match.regs[0][0] : match.regs[0][1]][1:-1]
api_entry_regex_part = path[api_entry_regex_start : match.regs[0][0]]
api_entry_regex_start = match.regs[0][1]
if api_entry_regex_part:
api_entry_regex_parts.append(api_entry_regex_part)
try:
api_entry_regex_parts.append(
"(?P<%s>%s)"
% (path_param_name, path_parameters[path_param_name])
)
except KeyError:
raise SchemaException(
"path parameter definition not found for: $%s:%s:%s"
% (method, path, path_param_name)
)
api_entry_regex_part = path[api_entry_regex_start:]
if api_entry_regex_part:
api_entry_regex_parts.append(api_entry_regex_part)
regex_str = "^" + ("").join(api_entry_regex_parts) + "$"
regex = re.compile(regex_str)
_security_entries = paths[path][method].get("security", [])
security = []
for s in _security_entries:
if len(s) != 1:
raise SchemaException(
"security entry key count error: " + paths[path][method]
)
model_name = None
for key in s.keys():
model_name = key
break
security.append(
deepcopy(swagger_document["securityDefinitions"][model_name])
)
route_descriptor[method][path] = dict(
handler_class=handler_class,
handler_method=handler_method,
regex=regex,
request=parameters,
response=None,
security=security,
)
return (swagger_document["basePath"], route_descriptor)
Python
1
https://gitee.com/zhanghk668/opsrv_extracted.git
git@gitee.com:zhanghk668/opsrv_extracted.git
zhanghk668
opsrv_extracted
某扫描器核心反编译
master

搜索帮助