张小农 / Decompiled core of a certain scanner

This repository declares no open-source license file (LICENSE); check the project description and the code's upstream dependencies before using it.
broker.py 136.06 KB
Committed by 张小农 on 2019-03-21 13:47: first code
# uncompyle6 version 3.2.3
# Python bytecode 3.6 (3379)
# Decompiled from: Python 3.6.8 |Anaconda custom (64-bit)| (default, Feb 21 2019, 18:30:04) [MSC v.1916 64 bit (AMD64)]
# Embedded file name: broker\__init__.py
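#
# Overview (reconstructed from the decompiled code below): the Broker polls one
# database shard for scan work, hands scan-session jobs to workers through a
# WorkerManager, and enforces licensing, excluded-hours and duration limits.
# Two uncompyle6 artifacts to keep in mind while reading:
#   * calls such as `self._Broker__foo(...)` are the decompiler's rendering of
#     the name-mangled private methods `__foo`; inside the class they behave
#     exactly like `self.__foo(...)`.
#   * `elif` chains come out as nested `else:`/`if:` blocks, and stray `pass`
#     statements usually mark places where the original nesting was lost.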
import time, uuid, logging, datetime, dateutil.rrule, copy
from dateutil.tz import tzutc
from dateutil.tz import tzlocal
import iso8601
from collections import defaultdict
from sqlalchemy import select as sql_select
from sqlalchemy import func as sql_func
from sqlalchemy import or_ as sql_or
from sqlalchemy import and_ as sql_and
from sqlalchemy import not_ as sql_not
from sqlalchemy import tuple_ as sql_tuple
from db import Connection
from db.tables.scan_sessions import ScanSessionRow, ScanSessionsTable
from db.tables.scans import ScanRow, ScansTable
from db.tables.targets import (
TargetRow,
TargetsTable,
TargetTypes,
TargetVerificationTypes,
)
from db.tables.profiles import ProfileRow, ProfilesTable
from db.tables.scan_session_jobs import ScanSessionJobRow, ScanSessionJobsTable
from db.tables.reports import ReportsTable
from db.tables.targets_allowed import TargetsAllowedRow, TargetsAllowedTable
from db.tables.targets_configurations import (
TargetConfigurationRow,
TargetConfigurationTable,
)
import broker.scan_session_job_helpers as job_helpers
from broker.scan_session_job_helpers import JobConstants, AbortReason
from broker.worker_manager.base import WorkerManager
from db.tables.events import create_event, EventResources
from db.views.targets_excluded_hours import (
TargetsWithExcludedHoursViewRow,
TargetsWithExcludedHoursViewTable,
)
from helpers.address import extract_domain_from_address
from helpers.licensing.features import (
Features,
is_report_id_compliance_id,
is_report_id_waf_export_id,
)
from helpers.updater import Updater
from db.tables.scan_session_vulns_stats import (
ScanSessionVulnsStatsRow,
ScanSessionVulnsStatsTable,
)
from helpers.excluded_hours import (
is_allowed_to_scan_now,
get_system_excluded_hours,
get_per_account_excluded_hours,
)
from scanners.scan_app.errors import *
from helpers.future_events.base import FutureEvents
from helpers.setup_types import *
from db.data.users import prepare_user_data, prepare_user_data_ext, get_owner_id
from db.tables.users import UserConfirmation
from helpers.target.tasks import do_check_target
from settings_provider import settings
from db.tables.events import EventRow
from helpers.constants.scans import *
from helpers.constants.jobs import ScanAppTypes
from helpers.licensing.features import BaseLicense
logger = logging.getLogger("service.broker")
lock_to_owner_id = None
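# Presumably a development/staging switch: when set, process_next_run and
# start_queued_scan_session_jobs only consider scans belonging to this owner.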
def should_suppress_scan_starting():
if is_setup_type_on_premise():
user_license = BaseLicense.get_system_license()
if not user_license.is_active_license():
return True
system_update_object = settings.get("system_update_object", None)
if system_update_object:
if system_update_object.update_status in (
"waiting_for_scans",
"updating",
"restarting",
):
return True
return False
class Broker:
def __init__(
self,
shard_name,
worker_manager,
continuous_profile1=None,
continuous_profile2=None,
manual_browsing_storage=None,
scan_session_events=None,
max_scan_time=None,
max_job_preparation_time=datetime.timedelta(minutes=30),
max_job_abort_time=datetime.timedelta(minutes=30),
):
"""
:param shard_name:
"""
self.worker_manager = worker_manager
self.shard_name = shard_name
self.should_stop = False
self.profiles_cache = dict()
self.continuous_profile2 = continuous_profile2
self.continuous_profile1 = continuous_profile1
self.first_run = True
self.manual_browsing_storage = manual_browsing_storage
self.session_events = scan_session_events
self.max_scan_time = max_scan_time
self.max_job_preparation_time = max_job_preparation_time
self.max_job_abort_time = max_job_abort_time
def shutdown(self):
logger.debug("broker is shutting down")
self.should_stop = True
def loop(self):
"""
The broker does different task in a loop, each task should have different frequency
:return:
"""
self.should_stop = False
while not self.should_stop:
try:
self.abort_old_paused_jobs()
except Exception as e:
logger.exception("abort_old_paused_jobs:%s", e)
try:
self.restart_auto_paused_jobs()
except Exception as e:
logger.exception("restart_auto_paused_jobs:%s", e)
try:
self.worker_manager.process_queued_jobs()
except Exception as e:
logger.exception("worker_collection.process_queued_jobs:%s", e)
try:
if not should_suppress_scan_starting():
self.process_next_run()
except Exception as e:
logger.exception("process_next_run:%s", e)
try:
if not should_suppress_scan_starting():
self.start_queued_scan_session_jobs()
except Exception as e:
logger.exception("process_starting_jobs:%s", e)
try:
self.process_scan_session_jobs_in_progress()
self.first_run = False
except Exception as e:
logger.exception("process_about_to_abort_jobs:%s", e)
try:
self.process_finished_jobs()
except Exception as e:
logger.exception("process_finished_jobs:%s", e)
try:
self.worker_manager.workers_maintenance()
except Exception as e:
logger.exception("workers_maintenance:%s", e)
time.sleep(1)
try:
self.worker_manager.shutdown()
except Exception as e:
logger.exception("worker_manager.shut_down failed with %s", e)
scan_status_cache = settings.get("scan_status_cache")
if scan_status_cache:
try:
scan_status_cache.save_all(self.shard_name)
except Exception as e:
logger.exception("scan_status_cache.save_all failed with %s", e)
logger.info("Stop asked, broker is shutting down, bye.")
def __schedule_job_recheck(self, scan_session_job, time_delta):
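        # The recheck event key is "<scan_session_id>:<scanning_app>", e.g.
        # "9f1b5c3a-...:wvs" (illustrative value), stored under
        # namespace=self.shard_name.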
if self.session_events:
self.session_events.add(
"%(scan_session_id)s:%(scanning_app)s" % scan_session_job,
time_delta,
namespace=self.shard_name,
)
def __remove_job_recheck_schedules(self, scheduled_job_check_events):
if self.session_events:
if scheduled_job_check_events:
self.session_events.delete_events(scheduled_job_check_events)
def __get_job_recheck_schedules(self, max_scheduled_events=20):
return self.session_events.get_many(
max_scheduled_events, namespace=self.shard_name
)
@property
def has_scheduled_job_checks_support(self):
return self.session_events is not None
def __create_scan_job_event(
self,
scan_session_job,
event_name,
job_status,
extended_job_status=None,
worker_id=None,
):
event_data = dict(
name=event_name,
owner_id=scan_session_job["owner_id"],
user_id=scan_session_job["creator_id"],
resource_type=EventResources.scan_session,
resource_id=scan_session_job["scan_session_id"],
data=dict(
scanning_app=scan_session_job["scanning_app"],
status=job_status,
extended_status=extended_job_status,
),
shard=self.shard_name,
)
if worker_id:
event_data["data"]["worker_id"] = worker_id
event = create_event(**event_data)
with Connection(self.shard_name) as (con):
con.execute(event)
def __cleanup_job(self, scan_session_job):
try:
self.worker_manager.delete_job(
scan_session_job["scan_session_id"], scan_session_job["scanning_app"]
)
except Exception:
logger.exception(
"delete job failed [%s:%s]",
scan_session_job["scan_session_id"],
scan_session_job["scanning_app"],
)
scan_status_cache = settings.get("scan_status_cache")
if scan_status_cache:
scan_status_cache.save_entry(
self.shard_name,
scan_session_job["scanning_app"],
scan_session_job["scan_session_id"],
)
scan_status_cache.del_entry(
self.shard_name,
scan_session_job["scanning_app"],
scan_session_job["scan_session_id"],
)
manual_browsing_storage = settings.get("manual_browsing_storage")
if manual_browsing_storage:
manual_browsing_storage.remove_item(
scan_session_job["scan_session_id"], scan_session_job["scanning_app"]
)
def __mark_job_failed(
self, scan_session_job, reason, scan_result=None, start_date=None
):
logger.debug(
"mark_job_failed %s:%s %s",
scan_session_job["scanning_app"],
scan_session_job["scan_session_id"],
reason,
)
job_update_data = dict(
status=ScanStatusTypes.FAILED,
end_date=sql_func.current_timestamp(),
last_update=sql_func.current_timestamp(),
status_data=reason,
)
if start_date is not None:
job_update_data["start_date"] = start_date
with Connection(self.shard_name) as (db):
q = (
(ScanSessionJobsTable.update(values=job_update_data))
.where(
ScanSessionJobRow.scan_session_id
== scan_session_job["scan_session_id"]
)
.where(
ScanSessionJobRow.scanning_app == scan_session_job["scanning_app"]
)
.where(ScanSessionJobRow.status.in_(list(ActiveScanTypesExt)))
.where(ScanSessionJobRow.result_processed.is_(False))
)
db.execute(q)
if scan_result:
extended_status = scan_result.get("extended_status")
if extended_status:
extended_status = extended_status.to_dict()
self._Broker__create_scan_job_event(
scan_session_job, "scan_job_failed", scan_result.status, extended_status
)
else:
self._Broker__create_scan_job_event(
scan_session_job,
"scan_job_failed",
"failed",
dict(fail_reason=reason["reason"], attachments=[]),
)
self._Broker__cleanup_job(scan_session_job)
def __mark_job_completed(self, scan_session_job, scan_result):
logger.debug(
"mark_job_completed %s:%s",
scan_session_job["scanning_app"],
scan_session_job["scan_session_id"],
)
job_update_data = dict(
status="completed",
end_date=sql_func.current_timestamp(),
last_update=sql_func.current_timestamp(),
status_data=None,
)
with Connection(self.shard_name) as (db):
q = (
(ScanSessionJobsTable.update(values=job_update_data))
.where(
ScanSessionJobRow.scan_session_id
== scan_session_job["scan_session_id"]
)
.where(
ScanSessionJobRow.scanning_app == scan_session_job["scanning_app"]
)
.where(ScanSessionJobRow.status.in_(list(ActiveScanTypes)))
.where(ScanSessionJobRow.result_processed.is_(False))
)
db.execute(q)
scan_session_job_stats = None
scan_status_cache = settings.get("scan_status_cache")
if scan_status_cache:
scan_session_job_stats = scan_status_cache.get_entry(
self.shard_name,
scan_session_job["scanning_app"],
scan_session_job["scan_session_id"],
)
if scan_session_job_stats:
scan_session_job_stats = copy.deepcopy(scan_session_job_stats)
scan_status_cache.update_main_keys(
self.shard_name,
scan_session_job["scanning_app"],
scan_session_job["scan_session_id"],
dict(progress=100),
)
targets_touched_by_scan_job = [scan_session_job["target_id"]]
if scan_session_job_stats:
for key in scan_session_job_stats.get("hosts", {}):
if key not in targets_touched_by_scan_job:
targets_touched_by_scan_job.append(key)
with Connection(self.shard_name) as (db):
q = (
(
TargetsTable.update(
values=dict(
last_scan_session_id=scan_session_job["scan_session_id"],
last_scan_id=scan_session_job["scan_id"],
)
)
)
.where(TargetRow.target_id.in_(targets_touched_by_scan_job))
.where(
sql_or(
TargetRow.last_scan_session_id
!= scan_session_job["scan_session_id"],
TargetRow.last_scan_session_id.is_(None),
)
)
)
db.execute(q)
extended_status = scan_result.get("extended_status")
if extended_status:
extended_status = extended_status.to_dict()
try:
self.worker_manager.download_scan_state_db_from_worker(scan_session_job)
except Exception:
logger.exception(
"download_scan_state_db_from_worker failed for %s",
scan_session_job["scan_session_id"],
)
self._Broker__create_scan_job_event(
scan_session_job, "scan_job_done", scan_result.status, extended_status
)
self._Broker__cleanup_job(scan_session_job)
def __mark_job_aborted(
self, scan_session_job, abort_reason=None, extended_status=None
):
if abort_reason is None:
abort_reason = scan_session_job["status_data"]
logger.debug(
"marking job aborted %s:%s %s",
scan_session_job["scanning_app"],
scan_session_job["scan_session_id"],
abort_reason,
)
job_update_data = dict(
status=ScanStatusTypes.ABORTED,
end_date=sql_func.current_timestamp(),
last_update=sql_func.current_timestamp(),
)
if abort_reason:
job_update_data["status_data"] = abort_reason
with Connection(self.shard_name) as (db):
q = (
(ScanSessionJobsTable.update(values=job_update_data))
.where(
ScanSessionJobRow.scan_session_id
== scan_session_job["scan_session_id"]
)
.where(
ScanSessionJobRow.scanning_app == scan_session_job["scanning_app"]
)
.where(ScanSessionJobRow.status.in_(list(ActiveScanTypesExt)))
.where(ScanSessionJobRow.result_processed.is_(False))
)
db.execute(q)
self._Broker__cleanup_job(scan_session_job)
self._Broker__create_scan_job_event(
scan_session_job,
"scan_job_aborted",
abort_reason,
extended_job_status=extended_status,
)
def __mark_job_paused(self, scan_session_job, scan_result):
logger.debug(
"marking job paused %s:%s",
scan_session_job["scanning_app"],
scan_session_job["scan_session_id"],
)
job_update_data = dict(
status=ScanStatusTypes.PAUSED,
end_date=sql_func.current_timestamp(),
last_update=sql_func.current_timestamp(),
)
with Connection(self.shard_name) as (db):
q = (
(ScanSessionJobsTable.update(values=job_update_data))
.where(
ScanSessionJobRow.scan_session_id
== scan_session_job["scan_session_id"]
)
.where(
ScanSessionJobRow.scanning_app == scan_session_job["scanning_app"]
)
.where(ScanSessionJobRow.status.in_(list(ActiveScanTypesExt)))
.where(ScanSessionJobRow.result_processed.is_(False))
)
db.execute(q)
extended_status = scan_result.get("extended_status")
if extended_status:
extended_status = extended_status.to_dict()
try:
self.worker_manager.download_scan_state_db_from_worker(scan_session_job)
except Exception:
logger.exception(
"download_scan_state_db_from_worker failed for %s",
scan_session_job["scan_session_id"],
)
self._Broker__cleanup_job(scan_session_job)
self._Broker__create_scan_job_event(
scan_session_job, "scan_job_paused", scan_result.status, extended_status
)
def __mark_job_executing(self, scan_session_job):
with Connection(self.shard_name) as (db):
job_update_data = dict(
last_update=sql_func.current_timestamp(),
status=ScanStatusTypes.PROCESSING,
)
q = (
(ScanSessionJobsTable.update(values=job_update_data))
.where(
ScanSessionJobRow.scan_session_id
== scan_session_job["scan_session_id"]
)
.where(
ScanSessionJobRow.scanning_app == scan_session_job["scanning_app"]
)
.where(
ScanSessionJobRow.status.in_(
(
ScanStatusTypes.QUEUED,
ScanStatusTypes.STARTING,
ScanStatusTypes.RESUMING,
)
)
)
.where(ScanSessionJobRow.result_processed.is_(False))
)
result = db.execute(q)
if result.rowcount != 0:
q = (
(
ScanSessionsTable.update(
values=dict(status=ScanStatusTypes.PROCESSING)
)
)
.where(
ScanSessionRow.scan_session_id
== scan_session_job["scan_session_id"]
)
.where(
ScanSessionRow.status.in_(
(
ScanStatusTypes.QUEUED,
ScanStatusTypes.STARTING,
ScanStatusTypes.RESUMING,
)
)
)
)
db.execute(q)
self._Broker__schedule_job_recheck(
scan_session_job, job_helpers.CheckDeltaT.processing
)
def __abort_job(self, scan_session_job, abort_reason, update_status=True):
"""
Scan can be aborted in
:param scan_session_job:
:param abort_reason:
:param update_status: means we don't change scan status => aborting/aborted, the caller should handle this
:return:
"""
if scan_session_job["status"] not in (
ScanStatusTypes.STARTING,
ScanStatusTypes.PROCESSING,
ScanStatusTypes.PAUSING,
ScanStatusTypes.RESUMING,
):
if update_status:
return
logger.debug("should abort because %s", abort_reason)
try:
abort_success = self.worker_manager.abort_job(
scan_session_job["scan_session_id"],
scan_session_job["scanning_app"],
abort_reason,
)
if abort_success:
print("instant abort")
except Exception as e:
logger.exception("self.worker_manager.abort_job failed with %s", e)
if not update_status:
return
with Connection(self.shard_name) as (db):
job_update_data = dict(
status=ScanStatusTypes.ABORTING, status_data=abort_reason
)
q = (
(ScanSessionJobsTable.update(values=job_update_data))
.where(
ScanSessionJobRow.scan_session_id
== scan_session_job["scan_session_id"]
)
.where(
ScanSessionJobRow.scanning_app
== scan_session_job["scanning_app"]
)
.where(
ScanSessionJobRow.status.in_(
(
ScanStatusTypes.STARTING,
ScanStatusTypes.PROCESSING,
ScanStatusTypes.ABORTING,
)
)
)
.where(ScanSessionJobRow.result_processed.is_(False))
)
db.execute(q)
self._Broker__schedule_job_recheck(
scan_session_job, job_helpers.CheckDeltaT.aborting
)
def __pause_job(self, scan_session_job, pause_reason):
"""
Scan can be aborted in
:param scan_session_job:
:param pause_reason:
:return:
"""
if scan_session_job["status"] not in (
ScanStatusTypes.STARTING,
ScanStatusTypes.PROCESSING,
ScanStatusTypes.PAUSING,
ScanStatusTypes.RESUMING,
):
return
logger.debug("should pause because %s", pause_reason)
try:
self.worker_manager.pause_job(
scan_session_job["scan_session_id"],
scan_session_job["scanning_app"],
pause_reason,
)
except Exception as e:
logger.exception("self.worker_manager.abort_job failed with %s", e)
with Connection(self.shard_name) as (db):
job_update_data = dict(
status=ScanStatusTypes.PAUSING, status_data=pause_reason
)
q = (
(ScanSessionJobsTable.update(values=job_update_data))
.where(
ScanSessionJobRow.scan_session_id
== scan_session_job["scan_session_id"]
)
.where(
ScanSessionJobRow.scanning_app == scan_session_job["scanning_app"]
)
.where(
ScanSessionJobRow.status.in_(
(
ScanStatusTypes.STARTING,
ScanStatusTypes.PROCESSING,
ScanStatusTypes.ABORTING,
)
)
)
.where(ScanSessionJobRow.result_processed.is_(False))
)
db.execute(q)
self._Broker__schedule_job_recheck(
scan_session_job, job_helpers.CheckDeltaT.aborting
)
def restart_auto_paused_jobs(self):
"""
Searches any paused jobs with reason => ScanPauseReasonTypes.EXCLUDED_HOURS
:return:
"""
sf = ScanSessionJobsTable.join(
ScanSessionsTable,
ScanSessionJobRow.scan_session_id == ScanSessionRow.scan_session_id,
)
sf = sf.join(ScansTable, ScanSessionRow.scan_id == ScanRow.scan_id)
sf = sf.join(
TargetsWithExcludedHoursViewTable,
TargetsWithExcludedHoursViewRow.target_id == ScanRow.target_id,
)
base_query = (
sql_select(
(
ScanSessionJobRow.pause_requested,
ScanSessionJobRow.scan_session_id,
ScanSessionJobRow.scanning_app,
ScanRow.owner_id,
ScanRow.creator_id,
TargetsWithExcludedHoursViewRow.address,
TargetsWithExcludedHoursViewRow.description,
TargetsWithExcludedHoursViewRow.xh_data,
TargetsWithExcludedHoursViewRow.xh_time_offset,
)
)
.select_from(sf)
.where(ScanSessionJobRow.status == ScanStatusTypes.PAUSED)
.where(
ScanSessionJobRow.pause_requested["reason"].astext
== ScanPauseReasonTypes.EXCLUDED_HOURS
)
.where(ScanRow.deleted_at.is_(None))
.where(TargetsWithExcludedHoursViewRow.deleted_at.is_(None))
)
cursor = 0
limit = 20
with Connection(self.shard_name) as (db):
system_excluded_hours = None
if not has_feature(SetupFeatures.PER_ACCOUNT_EXCLUDED_HOURS):
system_excluded_hours = get_system_excluded_hours(db)
per_account_excluded_hours_cache = dict()
while True:
if self.should_stop:
break
q = (
base_query.limit(limit)
.offset(cursor)
.order_by(
ScanSessionJobRow.scan_session_id,
ScanSessionJobRow.scanning_app,
)
)
c = 0
for job_row in db.execute(q).fetchall():
if has_feature(SetupFeatures.PER_ACCOUNT_EXCLUDED_HOURS):
system_excluded_hours = per_account_excluded_hours_cache.get(
job_row.owner_id, -1
)
if system_excluded_hours == -1:
system_excluded_hours = get_per_account_excluded_hours(
db, job_row.owner_id
)
per_account_excluded_hours_cache[
job_row.owner_id
] = system_excluded_hours
resume = False
if job_row.xh_data:
if is_allowed_to_scan_now(
job_row.xh_data, job_row.xh_time_offset
):
resume = True
else:
if system_excluded_hours:
if is_allowed_to_scan_now(**system_excluded_hours):
resume = True
else:
resume = True
if resume:
query = (
(
ScanSessionJobsTable.update(
values=dict(
status=ScanStatusTypes.RESUMING,
result_processed=False,
pause_requested=False,
start_deadline=datetime.datetime.now(
tzlocal()
)
+ (datetime.timedelta(minutes=15)),
)
)
)
.where(
ScanSessionJobRow.scan_session_id
== job_row.scan_session_id
)
.where(
ScanSessionJobRow.scanning_app
== job_row.scanning_app
)
)
db.execute(query)
query = (
ScanSessionsTable.update(
values=dict(status=ScanStatusTypes.RESUMING)
)
).where(
ScanSessionRow.scan_session_id
== job_row.scan_session_id
)
db.execute(query)
c += 1
if c < limit:
break
cursor += limit
def abort_old_paused_jobs(self):
"""
Searches for paused jobs older than X hours. Marks any such job as aborted and propagates the abort if
necessary, meaning that the scan still has this session as current session
:return:
"""
sf = ScanSessionJobsTable.join(
ScanSessionsTable,
ScanSessionJobRow.scan_session_id == ScanSessionRow.scan_session_id,
)
sf = sf.join(ScansTable, ScanSessionRow.scan_id == ScanRow.scan_id)
q = (
sql_select(
(
ScanSessionJobRow.pause_requested,
ScanSessionJobRow.scan_session_id,
ScanSessionJobRow.scanning_app,
ScanRow.owner_id,
ScanRow.creator_id,
)
)
.select_from(sf)
.where(ScanSessionJobRow.status == ScanStatusTypes.PAUSED)
)
with Connection(self.shard_name) as (db):
for scan_session_job in db.execute(q).fetchall():
if self.should_stop:
break
                pause_expiry = datetime.timedelta(
                    hours=settings.get("max_paused_state_hours", 168)
                )
                if scan_session_job.pause_requested and (
                    iso8601.parse_date(scan_session_job.pause_requested["date"])
                    + pause_expiry
                    < datetime.datetime.now(tzutc())
                ):
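                    # e.g. with the default max_paused_state_hours=168 (7 days), a
                    # job whose pause_requested["date"] is 2019-03-01T00:00:00Z
                    # expires, and is aborted below, any time after
                    # 2019-03-08T00:00:00Z.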
logger.warning(
"aborting paused scan %s", scan_session_job.scan_session_id
)
job_update_data = dict(
status=ScanStatusTypes.ABORTED,
end_date=sql_func.current_timestamp(),
last_update=sql_func.current_timestamp(),
status_data=AbortReason.pause_expired,
result_processed=False,
)
q = (
(ScanSessionJobsTable.update(values=job_update_data))
.where(
ScanSessionJobRow.scan_session_id
== scan_session_job.scan_session_id
)
.where(
ScanSessionJobRow.scanning_app
== scan_session_job.scanning_app
)
.where(ScanSessionJobRow.status == ScanStatusTypes.PAUSED)
)
db.execute(q)
self._Broker__create_scan_job_event(
dict(scan_session_job),
"scan_job_aborted",
AbortReason.pause_expired,
)
def process_scan_session_job_in_progress(self, scan_session_job):
"""
Checks the status of a specific scan_session_job_id / scanning_app
:param scan_session_job: job descriptor
:return:
"""
logger.debug("processing %s", scan_session_job)
if scan_session_job["status"] not in list(ActiveScanTypes):
return
abort_reason = None
if scan_session_job["status"] in ActiveScanTypes.difference(
{ScanStatusTypes.ABORTING}
):
pass
if scan_session_job["abort_requested"]:
abort_reason = AbortReason.abort_requested
else:
if scan_session_job["deleted_at"]:
abort_reason = AbortReason.scan_deleted
else:
if scan_session_job["end_deadline_exceeded"]:
abort_reason = AbortReason.scan_timeout
else:
if (
scan_session_job["xh_data"] is None
and scan_session_job["system_excluded_hours"]
and not (
is_allowed_to_scan_now(
**scan_session_job["system_excluded_hours"]
)
)
):
if scan_session_job["status"] in (
ScanStatusTypes.ABORTING,
ScanStatusTypes.PAUSING,
):
pass
elif not settings.get("pause_scan_on_excluded_hours"):
                            logger.debug(
"abort scan_session:%s:%s because system excluded hours [%s]",
scan_session_job["scan_session_id"],
scan_session_job["scanning_app"],
scan_session_job["system_excluded_hours"]["name"],
)
abort_reason = AbortReason.excluded_hours
else:
if settings.get("pause_scan_on_excluded_hours"):
                                logger.debug(
"pause scan_session:%s:%s because system excluded hours [%s]",
scan_session_job["scan_session_id"],
scan_session_job["scanning_app"],
scan_session_job["system_excluded_hours"]["name"],
)
with Connection(self.shard_name) as (db):
query = (
ScanSessionJobsTable.update(
values=dict(
pause_requested=dict(
date=datetime.datetime.now(tzutc()),
reason=ScanPauseReasonTypes.EXCLUDED_HOURS,
)
)
)
).where(
ScanSessionJobRow.scan_session_id
== scan_session_job["scan_session_id"]
)
db.execute(query)
query = (
ScanSessionsTable.update(
values=dict(status="pausing")
)
).where(
ScanSessionRow.scan_session_id
== scan_session_job["scan_session_id"]
)
db.execute(query)
try:
self._Broker__pause_job(
scan_session_job,
ScanPauseReasonTypes.EXCLUDED_HOURS,
)
except (JobNotFound, WorkerNotFound) as e:
logger.warning(
"__pause_job, job lost %s", e.str_message
)
if (
scan_session_job["status"]
== ScanStatusTypes.ABORTING
):
self._Broker__mark_job_aborted(scan_session_job)
else:
self._Broker__mark_job_failed(
scan_session_job,
dict(reason=e.str_message, extra=None),
)
return
else:
if not is_allowed_to_scan_now(
scan_session_job["xh_data"],
scan_session_job["xh_time_offset"],
):
if scan_session_job["status"] in (
ScanStatusTypes.ABORTING,
ScanStatusTypes.PAUSING,
):
pass
elif not settings.get("pause_scan_on_excluded_hours"):
                                logger.debug(
"abort scan_session:%s:%s because target excluded hours",
scan_session_job["scan_session_id"],
scan_session_job["scanning_app"],
)
abort_reason = AbortReason.excluded_hours
else:
if settings.get("pause_scan_on_excluded_hours"):
                                    logger.debug(
"pause scan_session:%s:%s because target excluded hours",
scan_session_job["scan_session_id"],
scan_session_job["scanning_app"],
)
with Connection(self.shard_name) as (db):
query = (
ScanSessionJobsTable.update(
values=dict(
pause_requested=dict(
date=datetime.datetime.now(
tzutc()
),
reason=ScanPauseReasonTypes.EXCLUDED_HOURS,
)
)
)
).where(
ScanSessionJobRow.scan_session_id
== scan_session_job["scan_session_id"]
)
db.execute(query)
query = (
ScanSessionsTable.update(
values=dict(status="pausing")
)
).where(
ScanSessionRow.scan_session_id
== scan_session_job["scan_session_id"]
)
db.execute(query)
try:
self._Broker__pause_job(
scan_session_job,
ScanPauseReasonTypes.EXCLUDED_HOURS,
)
except (JobNotFound, WorkerNotFound) as e:
logger.warning(
"__pause_job, job lost %s", e.str_message
)
if (
scan_session_job["status"]
== ScanStatusTypes.ABORTING
):
self._Broker__mark_job_aborted(
scan_session_job
)
else:
self._Broker__mark_job_failed(
scan_session_job,
dict(reason=e.str_message, extra=None),
)
return
else:
abort_reason = None
if abort_reason is not None:
self._Broker__abort_job(scan_session_job, abort_reason)
return
if scan_session_job["status"] in ActiveScanTypes.difference(
{ScanStatusTypes.PAUSING, ScanStatusTypes.ABORTING}
):
if scan_session_job["pause_requested"]:
self._Broker__pause_job(
scan_session_job, scan_session_job["pause_requested"]["reason"]
)
return
try:
result = self.worker_manager.check_single_job_status(
scan_session_job["scan_session_id"], scan_session_job["scanning_app"]
)
except (JobNotFound, WorkerNotFound) as e:
logger.warning("check_single_job_status, job lost %s", e.str_message)
if scan_session_job["status"] == ScanStatusTypes.ABORTING:
self._Broker__mark_job_aborted(scan_session_job)
else:
self._Broker__mark_job_failed(
scan_session_job, dict(reason=e.str_message, extra=None)
)
return
else:
logger.debug("check_single_job_status %s", result.to_dict())
if result.status == ScanApiStatusTypes.FAILED:
self._Broker__mark_job_failed(
scan_session_job,
dict(reason=JobConstants.status_crashed, extra=result.to_dict()),
result,
)
return
if result.status == ScanApiStatusTypes.FINISHED:
self._Broker__mark_job_completed(scan_session_job, result)
return
if result.status == ScanApiStatusTypes.ABORTED:
self._Broker__mark_job_aborted(
scan_session_job, extended_status=result.get("extended_status")
)
return
if result.status == ScanApiStatusTypes.PAUSED:
self._Broker__mark_job_paused(scan_session_job, result)
return
if result.status == ScanApiStatusTypes.EXECUTING:
logger.debug(
"mark_job_processing %s:%s",
scan_session_job["scanning_app"],
scan_session_job["scan_session_id"],
)
self._Broker__mark_job_executing(scan_session_job)
else:
if result.status == ScanApiStatusTypes.PREPARING:
if self.max_job_preparation_time is not None:
if (
scan_session_job["last_update_delta"]
> self.max_job_preparation_time
):
logger.error(
"job %s failed to start within %s, aborting",
scan_session_job,
self.max_job_preparation_time,
)
self._Broker__abort_job(
scan_session_job,
AbortReason.start_timeout,
update_status=False,
)
self._Broker__mark_job_failed(
scan_session_job, dict(reason=AbortReason.start_timeout)
)
self._Broker__schedule_job_recheck(
scan_session_job, job_helpers.CheckDeltaT.starting
)
else:
if result.status == ScanApiStatusTypes.ABORTING:
if self.max_job_abort_time is not None:
if (
scan_session_job["last_update_delta"]
> self.max_job_abort_time
):
logger.error(
"aborting job %s takes more than %s!!",
scan_session_job,
self.max_job_abort_time,
)
self._Broker__schedule_job_recheck(
scan_session_job, job_helpers.CheckDeltaT.aborting
)
else:
if result.status == ScanApiStatusTypes.PAUSING:
if self.max_job_abort_time is not None:
if (
scan_session_job["last_update_delta"]
> self.max_job_abort_time
):
logger.error(
"pausing job %s takes more than %s!!",
scan_session_job,
self.max_job_abort_time,
)
self._Broker__schedule_job_recheck(
scan_session_job, job_helpers.CheckDeltaT.pausing
)
else:
logger.debug("unknown status %s", result.status)
self._Broker__schedule_job_recheck(
scan_session_job, job_helpers.CheckDeltaT.default
)
def process_scan_session_jobs_in_progress(self):
select_from = ScanSessionJobsTable.join(
ScanSessionsTable,
ScanSessionJobRow.scan_session_id == ScanSessionRow.scan_session_id,
)
select_from = select_from.join(
ScansTable, ScanRow.scan_id == ScanSessionRow.scan_id
)
select_from = select_from.join(
TargetsWithExcludedHoursViewTable,
TargetsWithExcludedHoursViewRow.target_id == ScanRow.target_id,
)
sql_fetch_processing_jobs = (
sql_select(
(
ScanSessionJobRow,
ScanSessionRow.deleted_at,
ScanRow.owner_id,
ScanRow.creator_id,
ScanRow.scan_id,
ScanRow.target_id,
TargetsWithExcludedHoursViewRow.address,
TargetsWithExcludedHoursViewRow.description,
TargetsWithExcludedHoursViewRow.xh_data,
TargetsWithExcludedHoursViewRow.xh_time_offset,
(
sql_func.current_timestamp() > ScanSessionJobRow.end_deadline
).label("end_deadline_exceeded"),
(
sql_func.current_timestamp() - ScanSessionJobRow.last_update
).label("last_update_delta"),
)
)
.select_from(select_from)
.where(ScanSessionJobRow.result_processed.isnot(True))
.where(
ScanSessionJobRow.status.in_(
(
ScanStatusTypes.STARTING,
ScanStatusTypes.PROCESSING,
ScanStatusTypes.ABORTING,
ScanApiStatusTypes.PAUSING,
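                        # note: ScanApiStatusTypes.PAUSING rather than
                        # ScanStatusTypes.PAUSING; possibly a bug carried over
                        # from the original source (the same pattern recurs below).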
)
)
)
)
if self.first_run or not self.has_scheduled_job_checks_support:
scheduled_job_check_events = None
else:
scheduled_job_check_events = self._Broker__get_job_recheck_schedules()
items_in_work = [
x["event_id"].split(":") for x in scheduled_job_check_events
]
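            # The conditions below (through the .where(sql_or(*conditions)) call)
            # presumably belong to this else branch too: items_in_work would be
            # undefined on the first run otherwise.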
conditions = [
sql_and(
ScanSessionJobRow.status.in_(
(
ScanStatusTypes.STARTING,
ScanStatusTypes.PROCESSING,
ScanStatusTypes.RESUMING,
ScanApiStatusTypes.PAUSING,
)
),
ScanSessionJobRow.abort_requested.is_(True),
),
sql_and(
ScanSessionJobRow.status.in_(
(
ScanStatusTypes.STARTING,
ScanStatusTypes.PROCESSING,
ScanStatusTypes.RESUMING,
ScanApiStatusTypes.PAUSING,
)
),
sql_not(ScanSessionRow.deleted_at.is_(None)),
),
sql_and(
ScanSessionJobRow.last_update
< sql_func.current_timestamp() - (datetime.timedelta(hours=1))
),
]
if items_in_work:
conditions.append(
sql_tuple(
ScanSessionJobRow.scan_session_id,
ScanSessionJobRow.scanning_app,
).in_(items_in_work)
)
sql_fetch_processing_jobs = sql_fetch_processing_jobs.where(
sql_or(*conditions)
)
scan_session_jobs = []
try:
with Connection(self.shard_name) as (db):
system_excluded_hours = None
if not has_feature(SetupFeatures.PER_ACCOUNT_EXCLUDED_HOURS):
system_excluded_hours = get_system_excluded_hours(db)
jobs = db.execute(sql_fetch_processing_jobs).fetchall()
for scan_session_job in jobs:
scan_session_job = dict(scan_session_job)
if has_feature(SetupFeatures.PER_ACCOUNT_EXCLUDED_HOURS):
system_excluded_hours = get_per_account_excluded_hours(
db, scan_session_job["owner_id"]
)
scan_session_job["system_excluded_hours"] = system_excluded_hours
scan_session_jobs.append(scan_session_job)
except Exception as e:
logger.exception("failed to pull jobs %s", e)
else:
self._Broker__remove_job_recheck_schedules(scheduled_job_check_events)
for scan_session_job in scan_session_jobs:
try:
self.process_scan_session_job_in_progress(scan_session_job)
except Exception as e:
logger.exception(
"process_scan_session_job_in_progress failed with %s", e
)
self._Broker__schedule_job_recheck(
scan_session_job, job_helpers.CheckDeltaT.on_error_recheck
)
def process_next_run(self):
"""
next_run < sql_func.now() means the scan has reached a state where a session should be created
Sessions can have different jobs (for now ovs and/or wvs)
:return:
"""
select_from = ScansTable.join(
ProfilesTable, ProfileRow.profile_id == ScanRow.profile_id
).join(
TargetsWithExcludedHoursViewTable,
TargetsWithExcludedHoursViewRow.target_id == ScanRow.target_id,
)
sql_get_scans = (
sql_select(
(
ScanRow,
ProfileRow.jobs,
TargetsWithExcludedHoursViewRow.manual_intervention,
TargetsWithExcludedHoursViewRow.xh_data,
TargetsWithExcludedHoursViewRow.xh_time_offset,
TargetsWithExcludedHoursViewRow.address,
TargetsWithExcludedHoursViewRow.description,
TargetsWithExcludedHoursViewRow.cah,
)
)
.select_from(select_from)
.where(ScanRow.next_run < sql_func.now())
.where(ScanRow.deleted_at.is_(None))
.where(ScanRow.schedule_disabled.isnot(True))
)
if lock_to_owner_id:
sql_get_scans = sql_get_scans.where(ScanRow.owner_id == lock_to_owner_id)
manual_browsing_storage_items = []
addresses_currently_scanned = []
with Connection(self.shard_name) as (db):
system_excluded_hours = None
if not has_feature(SetupFeatures.PER_ACCOUNT_EXCLUDED_HOURS):
system_excluded_hours = get_system_excluded_hours(db)
max_scans_per_license = 1
if is_setup_type_on_premise():
system_license = BaseLicense.get_system_license()
max_scans_per_license = system_license.max_scans_per_license
if not max_scans_per_license:
max_scans_per_license = 1
users_license_cache = dict()
for scan in db.execute(sql_get_scans).fetchall():
if should_suppress_scan_starting():
break
if is_setup_type_aws():
max_scans_per_license = users_license_cache.get(scan.owner_id)
if max_scans_per_license is None:
user_data = prepare_user_data(db, user_id=scan.owner_id)
if user_data is not None:
user_license = BaseLicense.get_license_from_license_data(
user_data["license_data"]
)
max_scans_per_license = user_license.max_scans_per_license
if not max_scans_per_license:
max_scans_per_license = 1
else:
users_license_cache[
scan.owner_id
] = max_scans_per_license
else:
max_scans_per_license = 1
sf = ScanSessionsTable.join(
ScansTable, ScanRow.scan_id == ScanSessionRow.scan_id
)
q = (
sql_select(
(sql_func.count(ScanSessionRow.scan_session_id),)
)
.select_from(sf)
.where(ScanRow.owner_id == scan.owner_id)
.where(ScanSessionRow.deleted_at.is_(None))
.where(
ScanSessionRow.status.in_(list(TargetsInWorkScanStatus))
)
)
concurrent_scans = db.execute(q).scalar()
if concurrent_scans > max_scans_per_license:
pass
else:
if has_feature(SetupFeatures.PER_ACCOUNT_EXCLUDED_HOURS):
system_excluded_hours = get_per_account_excluded_hours(
db, scan.owner_id
)
                    # uncompyle6 left a stray `pass` here; restoring the likely
                    # intent: targets without their own excluded hours fall back
                    # to the system/per-account window, otherwise the target's
                    # own window is used.
                    if not scan.xh_data:
                        if system_excluded_hours:
                            if not is_allowed_to_scan_now(**system_excluded_hours):
                                logger.debug(
                                    "postponing scan:[%s] by system excluded hours",
                                    scan.scan_id,
                                )
                                continue
                    elif not is_allowed_to_scan_now(
                        scan.xh_data, scan.xh_time_offset
                    ):
                        logger.debug(
                            "postponing scan:[%s] by target:[%s] excluded hours",
                            scan.scan_id,
                            scan.target_id,
                        )
                        continue
query = (
sql_select((TargetRow.address, TargetRow.target_id))
.select_from(
TargetsAllowedTable.join(
TargetsTable,
TargetRow.target_id
== TargetsAllowedRow.allowed_target_id,
)
)
.where(TargetsAllowedRow.target_id == scan.target_id)
)
allowed_targets = dict()
for at in db.execute(query):
allowed_targets[
extract_domain_from_address(at.address)
] = at.target_id
if not allowed_targets:
allowed_targets = None
if scan.continuous:
start_deadline = (
scan.next_run
+ self.worker_manager.START_DEADLINE_CONTINUOUS
)
r = dateutil.rrule.rrulestr(scan.recurrence)
use_anniversary_profile = (
r._dtstart.isocalendar()[2] % 7
== datetime.datetime.now(tzlocal()).isocalendar()[2]
)
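                    # Continuous scans appear to alternate profiles:
                    # continuous_profile1 on the recurrence start's weekly
                    # anniversary weekday, continuous_profile2 otherwise. Note
                    # that `% 7` maps ISO weekday 7 (Sunday) to 0, which can
                    # never equal now().isocalendar()[2]; this looks like a bug
                    # in the original code.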
used_profile_id = scan.profile_id
if use_anniversary_profile:
if self.continuous_profile1:
used_profile_id = self.continuous_profile1
else:
if self.continuous_profile2:
used_profile_id = self.continuous_profile2
else:
start_deadline = (
scan.next_run
+ self.worker_manager.get_start_deadline(
scan.schedule_start_date,
scan.schedule_time_sensitive,
scan.recurrence,
)
)
used_profile_id = scan.profile_id
scan_session_id = str(uuid.uuid4())
new_scan_session = dict(
scan_session_id=scan_session_id,
scan_id=scan.scan_id,
target_id=scan.target_id,
status=ScanStatusTypes.QUEUED,
scan_key=str(uuid.uuid4()),
start_expected=scan.next_run,
allowed_targets=allowed_targets,
used_profile_id=used_profile_id,
)
scan_update = dict(
current_session_id=scan_session_id,
previous_session_id=scan.previous_session_id,
next_run=None,
)
new_scan_session_query = ScanSessionsTable.insert().values(
**new_scan_session
)
db.execute(new_scan_session_query)
update_scan_session_id = (
ScansTable.update(values=scan_update)
).where(ScanRow.scan_id == scan.scan_id)
db.execute(update_scan_session_id)
for scanning_app in scan.jobs:
scan_session_job = dict(
scan_session_id=scan_session_id,
scanning_app=scanning_app,
status=ScanStatusTypes.QUEUED,
start_deadline=start_deadline,
end_deadline=None,
result_processed=False,
abort_requested=False,
)
new_job_query = ScanSessionJobsTable.insert().values(
**scan_session_job
)
db.execute(new_job_query)
self._Broker__handle_manual_browsing(
db,
scan,
scan_session_id,
manual_browsing_storage_items,
scanning_app,
)
addresses_currently_scanned.append(scan.cah)
if has_feature(SetupFeatures.MANUAL_BROWSING):
for manual_browsing_storage_item in manual_browsing_storage_items:
self.manual_browsing_storage.init_item(**manual_browsing_storage_item)
def __handle_manual_browsing(
self, db, scan, scan_session_id, manual_browsing_storage_items, scanning_app
):
"""
Helper method for process_next_run & manual browsing
:param db:
:param scan:
:param scan_session_id:
:param manual_browsing_storage_items:
:param scanning_app:
:return:
"""
if not has_feature(SetupFeatures.MANUAL_BROWSING):
return
        if not (
            scanning_app == ScanAppTypes.WVS
            and scan.manual_intervention
            and self.manual_browsing_storage
        ):
            return
ui_session_id = self.manual_browsing_storage.get_ui_session_id(scan.scan_id)
if not ui_session_id:
            logger.warning(
"manual intervention scan_session w/o ui session %s", scan_session_id
)
event_query = create_event(
"scan_mi_required_no_ui_session",
owner_id=scan.owner_id,
user_id=scan.creator_id,
resource_type=EventResources.scan,
resource_id=scan.scan_id,
)
db.execute(event_query)
return
if scan.schedule_start_date is not None or scan.recurrence is not None:
            logger.warning(
"manual intervention flag for scheduled scan_session %s",
scan_session_id,
)
event_query = create_event(
"scan_mi_required_scheduled_scan",
owner_id=scan.owner_id,
user_id=scan.creator_id,
resource_type=EventResources.scan,
resource_id=scan.scan_id,
)
db.execute(event_query)
return
manual_browsing_storage_items.append(
dict(
target_id=scan.target_id,
scan_id=scan.scan_id,
scan_session_id=scan_session_id,
user_id=scan.creator_id,
scanning_app=scanning_app,
ui_session_id=ui_session_id,
address=scan.address,
description=scan.description,
)
)
def start_queued_scan_session_jobs(self):
"""
start any jobs with status=queued or status=resuming
:return:
"""
def update_scanning_job(job_update_data):
with Connection(self.shard_name) as (dbs):
dbs.execute(
(ScanSessionJobsTable.update(values=job_update_data))
.where(
ScanSessionJobRow.scan_session_id
== scan_session_job["scan_session_id"]
)
.where(
ScanSessionJobRow.scanning_app
== scan_session_job["scanning_app"]
)
.where(ScanSessionJobRow.result_processed.isnot(True))
.where(
ScanSessionJobRow.status.in_(
(ScanStatusTypes.QUEUED, ScanStatusTypes.RESUMING)
)
)
)
select_from = ScanSessionJobsTable.join(
ScanSessionsTable,
ScanSessionJobRow.scan_session_id == ScanSessionRow.scan_session_id,
)
select_from = select_from.join(
ScansTable, ScanRow.scan_id == ScanSessionRow.scan_id
)
select_from = select_from.join(
TargetsTable, TargetRow.target_id == ScanSessionRow.target_id
)
select_from = select_from.join(
ProfilesTable, ProfileRow.profile_id == ScanSessionRow.used_profile_id
)
sql_fetch_starting_jobs = (
sql_select(
(
ScanSessionJobRow,
ScanSessionRow.scan_id,
ScanSessionRow.deleted_at,
ScanSessionRow.allowed_targets,
ScanSessionRow.scan_key,
ProfileRow.jobs,
TargetRow.address,
TargetRow.target_id,
TargetRow.type.label("target_type"),
TargetRow.verification,
ScanRow.owner_id,
ScanRow.creator_id,
ScanRow.continuous,
ScanRow.schedule_start_date.label("scan_schedule_start_date"),
ScanRow.recurrence.label("scan_recurrence"),
)
)
.select_from(select_from)
.where(
ScanSessionJobRow.status.in_(
(ScanStatusTypes.QUEUED, ScanStatusTypes.RESUMING)
)
)
.where(ScanSessionJobRow.result_processed.isnot(True))
.order_by(ScanSessionJobRow.start_deadline)
)
if lock_to_owner_id:
sql_fetch_starting_jobs = sql_fetch_starting_jobs.where(
ScanRow.owner_id == lock_to_owner_id
)
sql_abort_jobs = (
(
ScanSessionJobsTable.update(
values=dict(
status=ScanStatusTypes.ABORTED,
start_date=sql_func.current_timestamp(),
end_date=sql_func.current_timestamp(),
status_data=AbortReason.abort_requested,
)
)
)
.where(
ScanSessionJobRow.status.in_(
(ScanStatusTypes.QUEUED, ScanStatusTypes.RESUMING)
)
)
.where(ScanSessionJobRow.abort_requested.is_(True))
)
jobs_to_process = []
with Connection(self.shard_name) as (db):
db.execute(sql_abort_jobs)
for scan_session_job in db.execute(sql_fetch_starting_jobs).fetchall():
jobs_to_process.append(dict(scan_session_job))
jobs_count = len(jobs_to_process)
if jobs_count:
logger.debug("%s jobs to start", jobs_count)
has_slots = dict()
for scan_session_job in jobs_to_process:
if should_suppress_scan_starting():
break
if scan_session_job["deleted_at"]:
update_scanning_job(
dict(
status=ScanStatusTypes.ABORTED,
start_date=sql_func.current_timestamp(),
end_date=sql_func.current_timestamp(),
status_data=AbortReason.scan_deleted,
)
)
continue
if is_setup_type_aws():
user = prepare_user_data_ext(
self.shard_name,
scan_session_job["owner_id"],
datetime.timedelta(hours=12),
)
if not user:
logger.error("user not found %s", scan_session_job["owner_id"])
update_scanning_job(
dict(
status=ScanStatusTypes.ABORTED,
start_date=sql_func.current_timestamp(),
end_date=sql_func.current_timestamp(),
status_data=AbortReason.user_deleted,
)
)
continue
if user["license_data"]["product_code"] == "OVSTRIAL":
if scan_session_job["scanning_app"] == ScanAppTypes.WVS:
scan_session_job["scanning_mode"] = "demo"
if scan_session_job["target_type"] == TargetTypes.NETWORK_ONLY:
if scan_session_job["scanning_app"] != ScanAppTypes.OVAS:
                        logger.warning(
                            "aborted web scan:%s on network-only target for user:%s",
                            scan_session_job["scan_id"],
                            scan_session_job["owner_id"],
                        )
self._Broker__mark_job_aborted(
scan_session_job, AbortReason.network_scan_only
)
continue
if not user["enabled"]:
logger.warning(
"aborted scan:%s for disabled user:%s",
scan_session_job["scan_id"],
scan_session_job["owner_id"],
)
self._Broker__mark_job_aborted(
scan_session_job, AbortReason.user_disabled
)
continue
if user["license_data"]["product_code"] == "OVS_FREE_MODE":
pass
if scan_session_job["scanning_app"] == ScanAppTypes.WVS:
logger.warning(
"aborted wvs scan:%s for OVS_FREE_MODE user:%s",
scan_session_job["scan_id"],
scan_session_job["owner_id"],
)
self._Broker__mark_job_aborted(
scan_session_job, AbortReason.license_error
)
continue
if scan_session_job["target_type"] == TargetTypes.DEMO:
logger.warning(
"aborted demo network scan:%s for OVS_FREE_MODE user:%s",
scan_session_job["scan_id"],
scan_session_job["owner_id"],
)
self._Broker__mark_job_aborted(
scan_session_job, AbortReason.license_error
)
continue
else:
if user["license_data"]["product_code"] == "OVS_EXPIRED":
logger.warning(
"aborted scan:%s for OVS_EXPIRED user:%s",
scan_session_job["scan_id"],
scan_session_job["owner_id"],
)
self._Broker__mark_job_aborted(
scan_session_job, AbortReason.license_error
)
continue
else:
if user["license_data"]["product_code"] == "OVSTRIAL":
pass
if scan_session_job["scanning_app"] == ScanAppTypes.OVAS:
if scan_session_job["target_type"] == TargetTypes.DEMO:
pass
else:
if user["confirmation"] not in (
UserConfirmation.AUTO_VALIDATED,
UserConfirmation.ADMIN_VALIDATED,
):
logger.warning(
"aborted scan:%s for OVSTRIAL not confirmed user:%s",
scan_session_job["scan_id"],
scan_session_job["owner_id"],
)
self._Broker__mark_job_aborted(
scan_session_job, AbortReason.user_not_validated
)
continue
else:
if scan_session_job["scanning_app"] == ScanAppTypes.WVS:
wvs_scan_count = 0
with Connection(self.shard_name) as (db):
sf = ScanSessionsTable.join(
ProfilesTable,
ProfileRow.profile_id
== ScanSessionRow.used_profile_id,
)
q = (
sql_select((ProfileRow.jobs,))
.select_from(sf)
.where(
ScanSessionRow.scan_id.in_(
sql_select((ScanRow.scan_id,)).where(
ScanRow.owner_id == get_owner_id(user)
)
)
)
)
for r in db.execute(q).fetchall():
if ScanAppTypes.WVS in r.jobs:
wvs_scan_count += 1
if wvs_scan_count > 5:
self._Broker__mark_job_aborted(
scan_session_job, AbortReason.too_many_web_scans
)
continue
else:
if user["license_type"] == "customer":
user_license = BaseLicense.get_license_from_license_data(
user["license_data"]
)
if not user_license.is_active_license():
logger.warning(
"aborted scan:%s for expired license user:%s",
scan_session_job["scan_id"],
scan_session_job["owner_id"],
)
self._Broker__mark_job_aborted(
scan_session_job, AbortReason.license_error
)
continue
hs = has_slots.get(scan_session_job["scanning_app"])
if hs is None:
hs = self.worker_manager.has_slots(
scan_session_job["scanning_app"]
)
has_slots[scan_session_job["scanning_app"]] = hs
if not hs:
                logger.warning(
"no more %s workers", scan_session_job["scanning_app"]
)
continue
if has_feature(SetupFeatures.TARGET_CHECKING_MANDATORY):
if scan_session_job["target_type"] == TargetTypes.DEFAULT:
if scan_session_job["scanning_app"] == ScanAppTypes.WVS:
if (
scan_session_job["verification"]
!= TargetVerificationTypes.ADMIN
):
r = do_check_target(
self.shard_name,
scan_session_job["target_id"],
generate_notification=True,
)
if r["status"] != "success":
logger.warning(
"aborted scan:%s for target verification failed user:%s",
scan_session_job["scan_id"],
scan_session_job["owner_id"],
)
self._Broker__mark_job_aborted(
scan_session_job,
AbortReason.target_verification_failed,
)
continue
try:
worker_id = self.worker_manager.add_job(**scan_session_job)
except NoMoreWorkerSlots:
logger.debug(
"NoMoreWorkerSlots, aborting attempt_to_start_queued_jobs process"
)
has_slots[scan_session_job["scanning_app"]] = False
continue
except NoMoreWorkers:
logger.debug(
"NoMoreWorkers, aborting attempt_to_start_queued_jobs process"
)
has_slots[scan_session_job["scanning_app"]] = False
continue
except StartFailed as e:
self._Broker__mark_job_failed(
scan_session_job,
dict(reason=JobConstants.start_failed, extra=e.str_message),
start_date=sql_func.current_timestamp(),
)
continue
end_deadline = None
if self.max_scan_time:
t = "continuous" if scan_session_job["continuous"] else "generic"
d = self.max_scan_time.get(
scan_session_job["scanning_app"], {}
).get(t)
if d:
end_deadline = (datetime.datetime.now(tz=tzlocal())) + d
update_scanning_job(
dict(
status=ScanStatusTypes.STARTING,
start_date=sql_func.current_timestamp(),
last_update=sql_func.current_timestamp(),
end_deadline=end_deadline,
worker_id=worker_id,
)
)
self._Broker__create_scan_job_event(
scan_session_job,
"scan_job_starting",
"starting",
worker_id=worker_id,
)
self._Broker__schedule_job_recheck(
scan_session_job, job_helpers.CheckDeltaT.starting
)
with Connection(self.shard_name) as (db):
q = (
(
ScanSessionsTable.update(
values=dict(start_date=sql_func.current_timestamp())
)
)
.where(
sql_and(
ScanSessionRow.scan_session_id
== scan_session_job["scan_session_id"],
ScanSessionRow.start_date.is_(None),
)
)
.returning(ScanSessionRow.scan_session_id)
)
sid = db.execute(q).scalar()
if sid:
if not scan_session_job["continuous"]:
event_query = create_event(
"scan_started",
owner_id=scan_session_job["owner_id"],
user_id=scan_session_job["creator_id"],
resource_type=EventResources.scan,
resource_id=scan_session_job["scan_id"],
data=dict(
scan_session_id=scan_session_job["scan_session_id"],
worker_id=worker_id,
),
shard=self.shard_name,
)
db.execute(event_query)

    def process_finished_jobs(self):
        """
        Collect all finished, not-yet-processed scan session jobs and finalize
        their scan sessions: set the final status, handle recurrence and
        continuous mode, emit events, and trigger report generation.
        """
select_from = (
ScanSessionJobsTable.join(
ScanSessionsTable,
ScanSessionRow.scan_session_id == ScanSessionJobRow.scan_session_id,
)
.join(ScansTable, ScanSessionRow.scan_id == ScanRow.scan_id)
.join(
ProfilesTable, ProfileRow.profile_id == ScanSessionRow.used_profile_id
)
.join(TargetsTable, TargetRow.target_id == ScanSessionRow.target_id)
.outerjoin(
TargetConfigurationTable,
sql_and(
TargetConfigurationRow.name == "continuous_mode",
TargetConfigurationRow.target_id == ScanRow.target_id,
),
)
.outerjoin(
ScanSessionVulnsStatsTable,
ScanSessionVulnsStatsRow.scan_session_id
== ScanSessionJobRow.scan_session_id,
)
)
sql_fetch_finished_unprocessed_jobs = (
sql_select(
(
ScanSessionJobRow,
ScanSessionRow.used_profile_id,
ScanSessionRow.event_level,
ScanRow.scan_id,
ScanRow.creator_id,
ScanRow.owner_id,
ScanRow.schedule_disabled,
ScanRow.continuous,
ScanRow.report_template_id,
ScanRow.recurrence,
ScanRow.target_id,
ScanSessionVulnsStatsRow.vuln_stats,
ProfileRow.jobs,
ProfileRow.name.label("profile_name"),
TargetConfigurationRow.value.label("continuous_mode_enabled"),
TargetRow.address.label("target_address"),
TargetRow.description.label("target_description"),
)
)
.select_from(select_from)
.where(ScanSessionJobRow.status.in_(list(FinishedScanJobsStatusTypes)))
.where(
sql_not(
ScanSessionRow.status.in_(
(ScanStatusTypes.ABORTED, ScanStatusTypes.FAILED)
)
)
)
.where(ScanRow.deleted_at.is_(None))
.where(ScanSessionRow.deleted_at.is_(None))
.where(ScanSessionJobRow.result_processed.is_(False))
)
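        # Group finished jobs by scan session; a session is only finalized
        # once every job type required by its profile has finished.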
scan_sessions = defaultdict(dict)
required_jobs = dict()
        with Connection(self.shard_name) as db:
for job in db.execute(sql_fetch_finished_unprocessed_jobs).fetchall():
scan_sessions[job.scan_session_id][job.scanning_app] = dict(job)
if job.scan_session_id not in required_jobs:
required_jobs[job.scan_session_id] = set(job.jobs.keys())
for scan_session_id, jobs in scan_sessions.items():
logger.debug("check %s end status", scan_session_id)
logger.debug("jobs: ", jobs)
if set(jobs.keys()) != required_jobs[scan_session_id]:
logger.debug("%s is not finished yet", scan_session_id)
continue
            scan_descriptor = next(iter(jobs.values()))
now = datetime.datetime.now(tz=tzlocal())
            start_date = min(job["start_date"] or now for job in jobs.values())
            end_date = max(job["end_date"] or now for job in jobs.values())
event_level = scan_descriptor["event_level"]
if event_level is None:
event_level = 0
target_id = scan_descriptor["target_id"]
recurrence = scan_descriptor["recurrence"]
target_in_continuous_mode = scan_descriptor["continuous_mode_enabled"]
this_is_continuous_mode_scan = scan_descriptor["continuous"]
scan_id = scan_descriptor["scan_id"]
paused = ScanStatusTypes.PAUSED in [job["status"] for job in jobs.values()]
aborted = (
ScanStatusTypes.ABORTED in [job["status"] for job in jobs.values()]
and not paused
)
logger.debug("Paused: %s, Aborted: %s", paused, aborted)
            with Connection(self.shard_name) as db:
user_data = prepare_user_data(db, user_id=scan_descriptor["owner_id"])
if user_data is None:
logger.warning(
"user %s not found or disabled", scan_descriptor["owner_id"]
)
q = (
ScanSessionJobsTable.update(values=dict(result_processed=True))
).where(ScanSessionJobRow.scan_session_id == scan_session_id)
db.execute(q)
if this_is_continuous_mode_scan:
q = (
(TargetConfigurationTable.update(values=dict(value=None)))
.where(TargetConfigurationRow.target_id == target_id)
.where(TargetConfigurationRow.name == "continuous_mode")
)
db.execute(q)
continue
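            # Resolve the effective license: per-user license data on AWS
            # setups, the system-wide license on on-premise setups.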
if is_setup_type_aws():
user_license = BaseLicense.get_license_from_license_data(
user_data["license_data"]
)
            elif is_setup_type_on_premise():
                user_license = BaseLicense.get_system_license()
            else:
                logger.error("unsupported setup type")
                continue
            failed_count = sum(
                job["status"] == ScanStatusTypes.FAILED for job in jobs.values()
            )
scan_session_update = dict(start_date=start_date, end_date=end_date)
            if aborted:
                scan_session_update["status"] = ScanStatusTypes.ABORTED
            elif paused:
                scan_session_update["status"] = ScanStatusTypes.PAUSED
            else:
                scan_session_update["status"] = ScanStatusTypes.COMPLETED
                if failed_count == len(jobs):
                    scan_session_update["status"] = ScanStatusTypes.FAILED
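            # Sessions with critical-level job events are cross-checked
            # against stored error events; type ids 430/431 presumably denote
            # critical scan errors (their definition is not visible here).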
if ScanJobEventLevel.CRITICAL in [
job["event_level"] for job in jobs.values()
]:
                with Connection(self.shard_name) as db:
q = (
sql_select((sql_func.count(EventRow.event_id),))
.where(EventRow.resource_id == scan_session_id)
.where(EventRow.type_id.in_((430, 431)))
)
if db.execute(q).scalar():
scan_session_update["status"] = ScanStatusTypes.FAILED
            if paused and scan_session_update["status"] != ScanStatusTypes.FAILED:
                with Connection(self.shard_name) as db:
                    q = (
                        ScanSessionsTable.update(values=scan_session_update)
                    ).where(ScanSessionRow.scan_session_id == scan_session_id)
                    db.execute(q)
                    q = (
                        ScanSessionJobsTable.update(
                            values=dict(result_processed=True)
                        )
                    ).where(ScanSessionJobRow.scan_session_id == scan_session_id)
                    db.execute(q)
                continue
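            # Sales hook for trials that produced at least one successful job;
            # the local import presumably avoids a hard dependency when
            # SALES_UPDATES is disabled.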
            if (
                not aborted
                and has_feature(SetupFeatures.SALES_UPDATES)
                and user_data["license_type"] == "trial"
                and failed_count < len(jobs)
            ):
                from helpers.sales.events import trial_scan_done

                trial_scan_done(user_data)
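            # Continuous-mode sessions are finalized entirely in this branch:
            # either continuous mode is disabled (license/abort reasons) or
            # the next run is scheduled from the recurrence rule.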
            if this_is_continuous_mode_scan:
                if not target_in_continuous_mode:
                    with Connection(
                        self.shard_name, create_transactions=True
                    ) as db:
                        q = (
                            ScanSessionsTable.update(values=scan_session_update)
                        ).where(ScanSessionRow.scan_session_id == scan_session_id)
                        db.execute(q)
                        q = (
                            ScanSessionJobsTable.update(
                                values=dict(result_processed=True)
                            )
                        ).where(
                            ScanSessionJobRow.scan_session_id == scan_session_id
                        )
                        db.execute(q)
                    continue
                disable_recurrence = False
                if not user_license.has_feature(Features.CONTINUOUS_SCANS):
                    logger.debug(
                        "disable_recurrence %s continuous scan not allowed",
                        scan_session_id,
                    )
                    disable_recurrence = AbortReason.license_error
                elif aborted:
                    status_data_list = [
                        job["status_data"] for job in jobs.values()
                    ]
                    if AbortReason.user_deleted in status_data_list:
                        logger.debug(
                            "disable_recurrence %s user_deleted", scan_session_id
                        )
                        disable_recurrence = 1
                    elif AbortReason.user_disabled in status_data_list:
                        logger.debug(
                            "disable_recurrence %s user disabled", scan_session_id
                        )
                        disable_recurrence = 1
                    elif AbortReason.network_scan_only in status_data_list:
                        logger.debug(
                            "disable_recurrence %s network scan only",
                            scan_session_id,
                        )
                        disable_recurrence = 1
                    elif AbortReason.too_many_web_scans in status_data_list:
                        logger.debug(
                            "disable_recurrence %s too many web scans",
                            scan_session_id,
                        )
                        disable_recurrence = 1
                    elif AbortReason.license_error in status_data_list:
                        logger.debug(
                            "disable_recurrence %s license error",
                            scan_session_id,
                        )
                        disable_recurrence = 1
                    elif AbortReason.user_not_validated in status_data_list:
                        scan_session_update["status"] = ScanStatusTypes.COMPLETED
                    elif AbortReason.start_timeout in status_data_list:
                        logger.debug(
                            "disable_recurrence %s start timeout",
                            scan_session_id,
                        )
                        disable_recurrence = 1
                    elif AbortReason.scan_deleted in status_data_list:
                        logger.debug(
                            "disable_recurrence %s scan deleted",
                            scan_session_id,
                        )
                        disable_recurrence = 1
                    elif AbortReason.abort_requested in status_data_list:
                        pass
                    elif AbortReason.excluded_hours in status_data_list:
                        pass
                    elif AbortReason.target_verification_failed in status_data_list:
                        pass
                    elif AbortReason.scan_timeout in status_data_list:
                        scan_session_update["status"] = ScanStatusTypes.COMPLETED
                        scan_session_update["progress"] = 100
                    else:
                        disable_recurrence = 1
                        logger.warning(
                            "disabling recurrence for scan %s", scan_session_id
                        )
                if disable_recurrence:
                    disable_cm_query = (
                        TargetConfigurationTable.update(values=dict(value=None))
                        .where(TargetConfigurationRow.target_id == target_id)
                        .where(TargetConfigurationRow.name == "continuous_mode")
                    )
                    event = None
                    if isinstance(disable_recurrence, str):
                        event = dict(
                            name="target_continuous_mode_disabled",
                            user_id=scan_descriptor["creator_id"],
                            owner_id=scan_descriptor["owner_id"],
                            resource_id=scan_descriptor["scan_id"],
                            resource_type=EventResources.scan,
                            shard=self.shard_name,
                            data=dict(
                                reason=disable_recurrence,
                                target_id=target_id,
                                target_desc=(
                                    scan_descriptor["target_address"],
                                    scan_descriptor["target_description"],
                                ),
                            ),
                        )
                    with Connection(
                        self.shard_name, create_transactions=True
                    ) as db:
                        db.execute(disable_cm_query)
                        if event:
                            db.execute(create_event(**event))
                        query = (
                            ScanSessionsTable.update(values=scan_session_update)
                        ).where(ScanSessionRow.scan_session_id == scan_session_id)
                        db.execute(query)
                        query = (
                            ScanSessionJobsTable.update(
                                values=dict(result_processed=True)
                            )
                        ).where(
                            ScanSessionJobRow.scan_session_id == scan_session_id
                        )
                        db.execute(query)
                else:
                    rs = dateutil.rrule.rruleset()
                    rs.rrule(dateutil.rrule.rrulestr(recurrence))
                    next_run = rs.after(datetime.datetime.now(tz=tzlocal()), True)
                    with Connection(
                        self.shard_name, create_transactions=True
                    ) as db:
                        query = (
                            ScanSessionsTable.update(values=scan_session_update)
                        ).where(ScanSessionRow.scan_session_id == scan_session_id)
                        db.execute(query)
                        if next_run:
                            query = (
                                ScansTable.update(values=dict(next_run=next_run))
                            ).where(ScanRow.scan_id == scan_id)
                            db.execute(query)
                        query = (
                            ScanSessionJobsTable.update(
                                values=dict(result_processed=True)
                            )
                        ).where(
                            ScanSessionJobRow.scan_session_id == scan_session_id
                        )
                        db.execute(query)
                # NOTE: assumption from the decompiled control flow -- the
                # continuous-mode path ends here; without this `continue` the
                # generic finalization below would run again for this session.
                continue
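            # Generic finalization: decide on recurrence and event generation
            # for aborted sessions, build the scan_done/scan_failed event, and
            # compute the next run from the RRULE recurrence string (e.g.
            # "RRULE:FREQ=WEEKLY").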
            disable_recurrence = False
            generate_event = True
            if aborted:
                generate_event = False
                status_data_list = [job["status_data"] for job in jobs.values()]
                if AbortReason.user_deleted in status_data_list:
                    logger.debug(
                        "disable_recurrence %s user deleted", scan_session_id
                    )
                    disable_recurrence = 1
                elif AbortReason.user_disabled in status_data_list:
                    logger.debug(
                        "disable_recurrence %s user disabled", scan_session_id
                    )
                    disable_recurrence = 1
                elif AbortReason.network_scan_only in status_data_list:
                    logger.debug(
                        "disable_recurrence %s network scans only",
                        scan_session_id,
                    )
                    disable_recurrence = 1
                elif AbortReason.too_many_web_scans in status_data_list:
                    logger.debug(
                        "disable_recurrence %s too many web scans",
                        scan_session_id,
                    )
                    disable_recurrence = 1
                elif AbortReason.license_error in status_data_list:
                    logger.debug(
                        "disable_recurrence %s license error", scan_session_id
                    )
                    disable_recurrence = 1
                elif AbortReason.user_not_validated in status_data_list:
                    logger.debug(
                        "disable_recurrence %s user not validated",
                        scan_session_id,
                    )
                    disable_recurrence = 1
                elif AbortReason.start_timeout in status_data_list:
                    logger.debug(
                        "disable_recurrence %s start timeout", scan_session_id
                    )
                    generate_event = True
                elif AbortReason.scan_deleted in status_data_list:
                    logger.debug(
                        "disable_recurrence %s scan deleted", scan_session_id
                    )
                    disable_recurrence = 1
                elif AbortReason.abort_requested in status_data_list:
                    pass
                elif AbortReason.excluded_hours in status_data_list:
                    pass
                elif AbortReason.target_verification_failed in status_data_list:
                    generate_event = True
                elif AbortReason.scan_timeout in status_data_list:
                    scan_session_update["status"] = ScanStatusTypes.COMPLETED
                    scan_session_update["progress"] = 100
                    generate_event = True
                else:
                    disable_recurrence = 1
                    logger.warning(
                        "disabling recurrence for scan %s", scan_session_id
                    )
            event = dict(
                name="scan_done"
                if scan_session_update["status"] == "completed"
                else "scan_failed",
                user_id=scan_descriptor["creator_id"],
                owner_id=scan_descriptor["owner_id"],
                resource_id=scan_descriptor["scan_id"],
                resource_type=EventResources.scan,
                shard=self.shard_name,
                data=dict(
                    scan_session_id=scan_session_id,
                    target_id=scan_descriptor["target_id"],
                    target_desc=(
                        scan_descriptor["target_address"],
                        scan_descriptor["target_description"],
                    ),
                    failed_job_count=failed_count,
                    event_level=event_level,
                    started=start_date,
                    ended=end_date,
                    vuln_stats=scan_descriptor["vuln_stats"],
                    profile_name=scan_descriptor["profile_name"],
                ),
            )
            next_run = None
            if scan_session_update["status"] == ScanStatusTypes.FAILED:
                if not is_setup_type_on_premise():
                    logger.debug(
                        "disable_recurrence %s scan failed", scan_session_id
                    )
                    disable_recurrence = True
            if disable_recurrence:
                event["data"]["recurrence_disabled"] = True
            elif scan_descriptor["recurrence"]:
                if scan_descriptor["schedule_disabled"] is not True:
                    rs = dateutil.rrule.rruleset()
                    rs.rrule(
                        dateutil.rrule.rrulestr(scan_descriptor["recurrence"])
                    )
                    next_run = rs.after(
                        datetime.datetime.now(tz=tzlocal()), True
                    )
                    if next_run:
                        event["data"]["next_run"] = next_run
            with Connection(self.shard_name, create_transactions=True) as db:
                query = (
                    ScanSessionsTable.update(values=scan_session_update)
                ).where(ScanSessionRow.scan_session_id == scan_session_id)
                db.execute(query)
                if next_run:
                    query = (
                        ScansTable.update(values=dict(next_run=next_run))
                    ).where(ScanRow.scan_id == scan_id)
                    db.execute(query)
                elif scan_descriptor["recurrence"]:
                    query = (
                        ScansTable.update(values=dict(schedule_disabled=True))
                    ).where(ScanRow.scan_id == scan_id)
                    db.execute(query)
                if generate_event:
                    db.execute(create_event(**event))
                query = (
                    ScanSessionJobsTable.update(
                        values=dict(result_processed=True)
                    )
                ).where(ScanSessionJobRow.scan_session_id == scan_session_id)
                db.execute(query)
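            # Auto-generate a report for completed scans that carry a report
            # template, unless the template requires a license feature
            # (compliance reports / WAF export) that the user lacks.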
if scan_descriptor["report_template_id"]:
if scan_session_update["status"] == ScanStatusTypes.COMPLETED:
if not (
not user_license.has_feature(Features.COMPLIANCE_REPORTS)
and is_report_id_compliance_id(
scan_descriptor["report_template_id"]
)
or not user_license.has_feature(Features.EXPORT_WAF)
and is_report_id_waf_export_id(
scan_descriptor["report_template_id"]
)
):
report_id = str(uuid.uuid4())
query = ReportsTable.insert().values(
report_id=report_id,
owner_id=scan_descriptor["owner_id"],
creator_id=scan_descriptor["creator_id"],
source=dict(
list_type="scan_result",
id_list=[scan_session_id],
description=(";").join(
(
scan_descriptor["target_address"],
scan_descriptor["target_description"],
)
),
),
report_template_id=scan_descriptor[
"report_template_id"
],
source_type="scan_result",
)
                        with Connection(self.shard_name) as db:
db.execute(query)