diff --git a/README.md b/README.md index f717bf796239fe2ab466b4b56925bc631d0ff008..746e6be1fdb3ab7a638bbb887dce7d39f53ddd29 100644 --- a/README.md +++ b/README.md @@ -133,6 +133,7 @@ initdb EXITED Dec 22 01:15 PM nginx RUNNING pid 15, uptime 0:10:03 postgres RUNNING pid 17, uptime 0:10:03 scheduler RUNNING pid 19, uptime 0:10:03 +seaweedfs RUNNING pid 24, uptime 0:10:03 worker RUNNING pid 21, uptime 0:10:03 ``` @@ -142,7 +143,7 @@ worker RUNNING pid 21, uptime 0:10:03 **大功告成,返回首页即可开始使用了!** 如需停止服务,执行: `docker rm -f rssant` -如需备份数据,备份 `rssant-postgres-data` 这个卷即可,其他卷可忽略。 +如需备份数据,备份 `rssant-postgres-data` 和 `rssant-data` 这两个卷,其他卷可忽略。 ## 反馈与协作 diff --git a/actorlib/prometheus.py b/actorlib/prometheus.py index 9ab0965eef05b687d026e989b6fd5c3223e98793..a9b8ae5c06e0d61f852a18934ea2a3ef4ca2480c 100644 --- a/actorlib/prometheus.py +++ b/actorlib/prometheus.py @@ -62,7 +62,6 @@ ACTOR_EXECUTOR_TIME = Histogram( 'dst', ], buckets=( - .005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, - 10.0, 15.0, 20.0, 25.0, 30.0, 35.0, 40.0, 45.0, 50.0, 55.0, 60.0, 120.0, + 0.001, 0.005, 0.025, 0.100, 0.500, 2.5, 10.0, 50.0, 200.0, ) ) diff --git a/box/Dockerfile b/box/Dockerfile index 160e031a430b4307bcad87a0b6197e99f15dd3f4..caddcf4902db8ce066483bd38377da387dc4e7e5 100644 --- a/box/Dockerfile +++ b/box/Dockerfile @@ -34,6 +34,17 @@ RUN sh -c "echo 'deb http://mirrors.zju.edu.cn/postgresql/repos/apt/ stretch-pgd apt-get update && \ apt-get install -y postgresql-11 +# install seaweedfs +RUN mkdir -p /tmp/seaweedfs && \ + wget -O /tmp/seaweedfs/weed.tar.gz 'https://github.com/chrislusf/seaweedfs/releases/download/1.79/linux_amd64.tar.gz' && \ + bash -exc '\ + pushd /tmp/seaweedfs; \ + tar -xzf weed.tar.gz; \ + mv weed /usr/bin/ && chmod u+x /usr/bin/weed; \ + popd;' && \ + weed version && \ + rm -r /tmp/seaweedfs + # install rssant ARG PYPI_MIRROR="https://mirrors.aliyun.com/pypi/simple/" ENV PIP_INDEX_URL=$PYPI_MIRROR PIP_DISABLE_PIP_VERSION_CHECK=1 @@ -51,6 +62,6 @@ RUN bash box/setup-container.sh VOLUME /var/lib/postgresql/11/main VOLUME /var/log/postgresql VOLUME /app/data -EXPOSE 80 5432 6786 6788 6790 6791 6792 9001 +EXPOSE 80 5432 6786 6788 6790 6791 6792 9001 9333 9080 19333 19080 CMD ["/bin/bash", "-c", "/usr/local/bin/supervisord -c /etc/supervisord.conf"] diff --git a/box/bin/start-initdb.sh b/box/bin/start-initdb.sh index d609b7580b96b9d5f285bbca0c35921d16ab040d..efd500c0652ba702aba03ae86f202a8440951f79 100644 --- a/box/bin/start-initdb.sh +++ b/box/bin/start-initdb.sh @@ -15,5 +15,10 @@ python manage.py runscript django_pre_migrate python manage.py migrate python manage.py runscript django_db_init +# wait and init seaweedfs +python scripts/seaweedfs_wait_and_init.py http://127.0.0.1:9333/dir/assign +curl -v http://127.0.0.1:9333/dir/status?pretty=y +curl -v http://127.0.0.1:9080/status?pretty=y + touch /app/data/initdb.ready exit 0 diff --git a/box/bin/start-seaweedfs.sh b/box/bin/start-seaweedfs.sh new file mode 100644 index 0000000000000000000000000000000000000000..fe13c8acdca47e49ab5e898529572c67fc528e61 --- /dev/null +++ b/box/bin/start-seaweedfs.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +set -ex + +/usr/bin/weed server \ + -dir /app/data/seaweedfs \ + -volume.max 1 \ + -volume.index leveldb \ + -master.port=9333 \ + -volume.port=9080 \ + -ip 127.0.0.1 diff --git a/box/bin/wait-initdb.sh b/box/bin/wait-initdb.sh index 260ccf0dd9c0e5c73d0f95c2e55241df13ebf4b9..ea6679c6fc940b0d6d7632dea838b279ada3e201 100644 --- a/box/bin/wait-initdb.sh +++ b/box/bin/wait-initdb.sh @@ -1,11 +1,11 @@ #!/bin/bash 
wait_file() { - local file="$1"; shift - # 180 seconds as default timeout - local wait_seconds="${1:-180}"; shift - until test $((wait_seconds--)) -eq 0 -o -f "$file" ; do sleep 1; done - ((++wait_seconds)) + local file="$1"; shift + # 180 seconds as default timeout + local wait_seconds="${1:-180}"; shift + until test $((wait_seconds--)) -eq 0 -o -f "$file" ; do sleep 1; done + ((++wait_seconds)) } wait_file "/app/data/initdb.ready" diff --git a/box/nginx/nginx.conf b/box/nginx/nginx.conf index 3f352dc4eb45ceccff9b21bb039f0d21ce09849d..cdbdd37fa27c8c438a11c9305d881faca83c13bf 100644 --- a/box/nginx/nginx.conf +++ b/box/nginx/nginx.conf @@ -58,7 +58,6 @@ http { text/css text/javascript text/plain - text/html text/markdown text/vcard text/calendar diff --git a/box/run.sh b/box/run.sh index 1501a71202f63b35e9acc3119c17812ccf59af39..c3c2813087e3d88b4e94f5fb80557d39fd7b7c51 100755 --- a/box/run.sh +++ b/box/run.sh @@ -15,4 +15,4 @@ docker run -ti --name rssant -d \ --restart unless-stopped \ guyskk/rssant:latest $@ -docker logs --tail 1000 -f rssant +# docker logs --tail 1000 -f rssant diff --git a/box/setup-container.sh b/box/setup-container.sh index d4ad8233167b7281abbaaa9859473d2bd4c9195c..528742dd9e26ab04c864f05371c51c17c5c810e6 100644 --- a/box/setup-container.sh +++ b/box/setup-container.sh @@ -4,6 +4,7 @@ set -e mkdir -p logs/ mkdir -p data/ +mkdir -p data/seaweedfs/ chmod a+x box/bin/* # config postgres diff --git a/box/supervisord.conf b/box/supervisord.conf index d33f7b131d05facb51f4f7f420802a3530f0f16b..cba6483470be355334614ad98e1ce13f6c9bbc68 100644 --- a/box/supervisord.conf +++ b/box/supervisord.conf @@ -1,99 +1,25 @@ ; For more information on the config file, please see: ; http://supervisord.org/configuration.html -; -; Notes: -; - Shell expansion ("~" or "$HOME") is not supported. Environment -; variables can be expanded using this syntax: "%(ENV_HOME)s". -; - Quotes around values are not supported, except in the case of -; the environment= options as shown below. -; - Comments must have a leading space: "a=b ;comment" not "a=b;comment". -; - Command will be truncated if it looks like a config file comment, e.g. -; "command=bash -c 'foo ; bar'" will truncate to "command=bash -c 'foo ". [unix_http_server] -file=/var/supervisor.sock ; the path to the socket file +file=/var/supervisor.sock -; Security Warning: Never expose the inet HTTP server to the public internet. -[inet_http_server] ; inet (TCP) server disabled by default -port=0.0.0.0:9001 ; ip_address:port specifier, *:port for all iface +[inet_http_server] +port=0.0.0.0:9001 [supervisord] user=root -logfile=/dev/stdout logfile_maxbytes=0 logfile_backups=0 -;logfile=/tmp/supervisord.log ; main log file; default $CWD/supervisord.log -;logfile_maxbytes=50MB ; max main logfile bytes b4 rotation; default 50MB -;logfile_backups=10 ; # of main logfile backups; 0 means none, default 10 -loglevel=info ; log level; default info; others: debug,warn,trace -pidfile=/var/supervisord.pid ; supervisord pidfile; default supervisord.pid -nodaemon=true ; start in foreground if true; default false -minfds=1024 ; min. avail startup file descriptors; default 1024 -minprocs=200 ; min. 
avail process descriptors;default 200 -;umask=022 ; process file creation umask; default 022 -;user=supervisord ; setuid to this UNIX account at startup; recommended if root -;identifier=supervisor ; supervisord identifier, default is 'supervisor' -;directory=/app ; default is not to cd during start -;nocleanup=true ; don't clean up tempfiles at start; default false -childlogdir=/tmp ; 'AUTO' child log dir, default $TEMP -;environment=KEY="value" ; key value pairs to add to environment -;strip_ansi=false ; strip ansi escape codes in logs; def. false - -; The rpcinterface:supervisor section must remain in the config file for -; RPC (supervisorctl/web interface) to work. Additional interfaces may be -; added by defining them in separate [rpcinterface:x] sections. +loglevel=info +pidfile=/var/supervisord.pid +nodaemon=true [rpcinterface:supervisor] -supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface - -; The supervisorctl section configures how supervisorctl will connect to -; supervisord. configure it match the settings in either the unix_http_server -; or inet_http_server section. +supervisor.rpcinterface_factory=supervisor.rpcinterface:make_main_rpcinterface [supervisorctl] -serverurl=unix:////var/supervisor.sock ; use a unix:// URL for a unix socket -;serverurl=http://127.0.0.1:9001 ; use an http:// url to specify an inet socket -;username= ; should be same as in [*_http_server] if set -;password= ; should be same as in [*_http_server] if set -;prompt=supervisor ; cmd line prompt (default "supervisor") -;history_file=~/.sc_history ; use readline history if available - -; The sample program section below shows all possible program subsection values. -; Create one or more 'real' program: sections to be able to control them under -; supervisor. - -;[program:theprogramname] -;command=/bin/cat ; the program (relative uses PATH, can take args) -;process_name=%(program_name)s ; process_name expr (default %(program_name)s) -;numprocs=1 ; number of processes copies to start (def 1) -;directory=/tmp ; directory to cwd to before exec (def no cwd) -;umask=022 ; umask for process (default None) -;priority=999 ; the relative start priority (default 999) -;autostart=true ; start at supervisord start (default: true) -;startsecs=1 ; # of secs prog must stay up to be running (def. 
1) -;startretries=3 ; max # of serial start failures when starting (default 3) -;autorestart=unexpected ; when to restart if exited after running (def: unexpected) -;exitcodes=0 ; 'expected' exit codes used with autorestart (default 0) -;stopsignal=QUIT ; signal used to kill process (default TERM) -;stopwaitsecs=10 ; max num secs to wait b4 SIGKILL (default 10) -;stopasgroup=false ; send stop signal to the UNIX process group (default false) -;killasgroup=false ; SIGKILL the UNIX process group (def false) -;user=chrism ; setuid to this UNIX account to run the program -;redirect_stderr=true ; redirect proc stderr to stdout (default false) -;stdout_logfile=/a/path ; stdout log path, NONE for none; default AUTO -;stdout_logfile_maxbytes=1MB ; max # logfile bytes b4 rotation (default 50MB) -;stdout_logfile_backups=10 ; # of stdout logfile backups (0 means none, default 10) -;stdout_capture_maxbytes=1MB ; number of bytes in 'capturemode' (default 0) -;stdout_events_enabled=false ; emit events on stdout writes (default false) -;stdout_syslog=false ; send stdout to syslog with process name (default false) -;stderr_logfile=/a/path ; stderr log path, NONE for none; default AUTO -;stderr_logfile_maxbytes=1MB ; max # logfile bytes b4 rotation (default 50MB) -;stderr_logfile_backups=10 ; # of stderr logfile backups (0 means none, default 10) -;stderr_capture_maxbytes=1MB ; number of bytes in 'capturemode' (default 0) -;stderr_events_enabled=false ; emit events on stderr writes (default false) -;stderr_syslog=false ; send stderr to syslog with process name (default false) -;environment=A="1",B="2" ; process environment additions (def no adds) -;serverurl=AUTO ; override serverurl computation (childutils) +serverurl=unix:////var/supervisor.sock ;============================================================================= ; RSSAnt Process Configs @@ -115,6 +41,14 @@ redirect_stderr=true stdout_logfile=/dev/stdout stdout_logfile_maxbytes=0 +[program:seaweedfs] +user=root +directory=/app +command=/app/box/bin/start-seaweedfs.sh +redirect_stderr=true +stdout_logfile=/dev/stdout +stdout_logfile_maxbytes=0 + [program:initdb] user=root directory=/app diff --git a/box/test.sh b/box/test.sh index e3775353f2af9c6d1542dd0d80655c28f314e843..a2aa0fc4366ef379d6a3b3040278400274164b1c 100755 --- a/box/test.sh +++ b/box/test.sh @@ -2,4 +2,13 @@ set -e -docker run -ti guyskk/rssant:latest pytest +bash ./box/run.sh +sleep 3 +docker ps --latest +docker logs --tail 1000 rssant + +docker run -ti guyskk/rssant:latest pytest -m 'not dbtest' + +docker exec -ti rssant bash box/bin/wait-initdb.sh +docker exec -ti rssant pytest -m 'dbtest' +docker rm -f rssant || true diff --git a/django_rest_validr/__init__.py b/django_rest_validr/__init__.py index e66726b43a379a46243c50e562d68fcb2d29a2c9..c3196f579391fc35e249927b861b113f58271d37 100644 --- a/django_rest_validr/__init__.py +++ b/django_rest_validr/__init__.py @@ -1,4 +1,5 @@ import time +import json import itertools from collections import ChainMap @@ -7,6 +8,7 @@ import coreschema from validr import T, Compiler, Invalid from django.urls import path from django.http import HttpResponse +from django.core.serializers.json import DjangoJSONEncoder from rest_framework.response import Response from rest_framework.views import APIView from rest_framework.schemas import AutoSchema @@ -47,6 +49,9 @@ def coreschema_from_validr(item): return schema_cls(default=default, description=description) +JSON_TYPE = 'application/json; charset=utf-8' + + class RestViewSchema(AutoSchema): """ 
Overrides `get_link()` to provide Custom Behavior X @@ -119,18 +124,28 @@ class RestRouter: urls_priority.append(urls_map[url]) return urls_priority + urls - @staticmethod - def _response_from_invalid(ex): - return Response({ + @classmethod + def _response_from_invalid(cls, ex): + return cls._json_response({ 'description': str(ex), 'position': ex.position, 'message': ex.message, 'field': ex.field, 'value': ex.value, - }, status=400) - - @staticmethod - def _make_method(method, f, params, returns): + }, status=400, content_type=JSON_TYPE) + + @classmethod + def _json_response(cls, data, status=200): + # django.conf.Settings.DEFAULT_CONTENT_TYPE implement is slow !!! + text = json.dumps(data, ensure_ascii=False, cls=DjangoJSONEncoder) + return HttpResponse( + content=text.encode('utf-8'), + status=status, + content_type=JSON_TYPE, + ) + + @classmethod + def _make_method(cls, method, f, params, returns): def rest_method(self, request, format=None, **kwargs): ret = None validr_cost = 0 @@ -155,9 +170,9 @@ class RestRouter: t_begin = time.time() ret = returns(ret) validr_cost += time.time() - t_begin - ret = Response(ret) + ret = cls._json_response(ret) elif ret is None: - ret = Response(status=204) + ret = HttpResponse(status=204, content_type=JSON_TYPE) if validr_cost > 0: ret['X-Validr-Time'] = '{:.0f}ms'.format(validr_cost * 1000) if api_cost > 0: diff --git a/docs/design/seaweedfs-benchmark.md b/docs/design/seaweedfs-benchmark.md new file mode 100644 index 0000000000000000000000000000000000000000..0342be764040b23e875f01b542a5924af11a3592 --- /dev/null +++ b/docs/design/seaweedfs-benchmark.md @@ -0,0 +1,57 @@ +## SeaweedFS Benchmark + +阿里云 200G 高效云盘,大约 3000 IOPS。 + +### Write + +``` +Concurrency Level: 16 +Time taken for tests: 384.116 seconds +Complete requests: 1048576 +Failed requests: 0 +Total transferred: 1106812710 bytes +Requests per second: 2729.84 [#/sec] +Transfer rate: 2813.92 [Kbytes/sec] + +Connection Times (ms) + min avg max std +Total: 0.5 5.8 107.9 3.5 + +Percentage of the requests served within a certain time (ms) + 50% 4.8 ms + 66% 6.4 ms + 75% 7.2 ms + 80% 7.9 ms + 90% 10.0 ms + 95% 12.4 ms + 98% 15.5 ms + 99% 18.2 ms + 100% 107.9 ms +``` + +### Read + +``` +Concurrency Level: 16 +Time taken for tests: 149.622 seconds +Complete requests: 1048576 +Failed requests: 0 +Total transferred: 1106776080 bytes +Requests per second: 7008.19 [#/sec] +Transfer rate: 7223.80 [Kbytes/sec] + +Connection Times (ms) + min avg max std +Total: 0.1 2.2 395.3 2.2 + +Percentage of the requests served within a certain time (ms) + 50% 1.7 ms + 66% 2.4 ms + 75% 2.9 ms + 80% 3.1 ms + 90% 3.6 ms + 95% 5.0 ms + 98% 7.6 ms + 99% 9.7 ms + 100% 395.3 ms +``` diff --git a/manage.py b/manage.py index 482cd91d4bf6e24e042b85bd4102a1148e074100..cdf7d067c0dba2c0a67bc682c5bf674b0cd060cf 100755 --- a/manage.py +++ b/manage.py @@ -1,6 +1,5 @@ #!/usr/bin/env python import sys -import time import rssant_common.django_setup # noqa:F401 from rssant_config import CONFIG @@ -17,11 +16,4 @@ if __name__ == '__main__': "forget to activate a virtual environment?" 
) from exc configure_logging(level=CONFIG.log_level) - while True: - try: - execute_from_command_line(sys.argv) - except Exception as ex: - print(f'* {type(ex).__name__}: {ex}') - time.sleep(3) - else: - break + execute_from_command_line(sys.argv) diff --git a/pytest.ini b/pytest.ini index b263f5a93b034e8244ce44d818847dce7fe66871..acf60022527d04b9a7a87cd2bad2d614c2b7d3f8 100644 --- a/pytest.ini +++ b/pytest.ini @@ -2,3 +2,7 @@ norecursedirs=data .vscode addopts=--doctest-modules -v --cov . -W all --ignore=data --ignore=box doctest_encoding=UTF-8 +python_files=test_*.py *_test.py tests.py *_tests.py +DJANGO_SETTINGS_MODULE=rssant.settings +markers= + dbtest: mark a test which need access django database. diff --git a/requirements.in b/requirements.in index 30038b16872e794e717abe56df156bec1a66701a..fe05eb552055020829a664bb3e787eaac871d887 100644 --- a/requirements.in +++ b/requirements.in @@ -8,7 +8,9 @@ django-rest-auth==0.9.5 django-rest-swagger==2.2.0 django-timezone-field==4.0 django-optimistic-lock==1.0.0 +django-seal==1.2.3 jsonfield==3.1.0 +pytest-django==3.9.0 # wsgi whitenoise==5.0.1 diff --git a/requirements.txt b/requirements.txt index a274bae673cc478b58f9b3306688718a17b31159..895d5f271bde1b2548eb15585dfd0d9f28f41605 100644 --- a/requirements.txt +++ b/requirements.txt @@ -43,8 +43,9 @@ django-optimistic-lock==1.0.0 # via -r requirements.in django-postgrespool2==1.0.1 # via -r requirements.in django-rest-auth==0.9.5 # via -r requirements.in django-rest-swagger==2.2.0 # via -r requirements.in +django-seal==1.2.3 # via -r requirements.in django-timezone-field==4.0 # via -r requirements.in -django==2.2.12 # via -r requirements.in, django-allauth, django-debug-toolbar, django-optimistic-lock, django-postgrespool2, django-rest-auth, django-timezone-field, djangorestframework, jsonfield +django==2.2.12 # via -r requirements.in, django-allauth, django-debug-toolbar, django-optimistic-lock, django-postgrespool2, django-rest-auth, django-seal, django-timezone-field, djangorestframework, jsonfield djangorestframework==3.11.0 # via -r requirements.in, django-rest-auth, django-rest-swagger entrypoints==0.3 # via flake8 feedparser==6.0.0b3 # via -r requirements.in @@ -101,8 +102,9 @@ pyinstrument==3.1.3 # via -r requirements.in pynliner==0.8.0 # via -r requirements.in pyparsing==2.4.2 # via packaging, validr pytest-cov==2.8.1 # via -r requirements.in +pytest-django==3.9.0 # via -r requirements.in pytest-httpserver==0.3.4 # via -r requirements.in -pytest==5.4.1 # via -r requirements.in, pytest-cov +pytest==5.4.1 # via -r requirements.in, pytest-cov, pytest-django python-dateutil==2.8.1 # via -r requirements.in, atoma python-dotenv==0.12.0 # via -r requirements.in python-slugify==3.0.2 # via -r requirements.in diff --git a/rssant/middleware/debug_toolbar.py b/rssant/middleware/debug_toolbar.py new file mode 100644 index 0000000000000000000000000000000000000000..9173e763010112e1912ebbb42a6340179f351a15 --- /dev/null +++ b/rssant/middleware/debug_toolbar.py @@ -0,0 +1,112 @@ +import logging +from debug_toolbar.toolbar import DebugToolbar +from debug_toolbar.middleware import DebugToolbarMiddleware + + +LOG = logging.getLogger(__name__) + + +def ms(t): + return '%dms' % int(t) if t is not None else '#ms' + + +def s_ms(t): + return ms(t * 1000) if t is not None else '#ms' + + +class RssantDebugToolbarMiddleware(DebugToolbarMiddleware): + """ + Middleware to set up Debug Toolbar on incoming request and render toolbar + on outgoing response. 
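+
+    Unlike the stock DebugToolbarMiddleware, this variant does not inject the
+    HTML toolbar into the response. It runs the enabled panels for every
+    request, collects their stats (timer, SQL, seaweed) and condenses them
+    into an ``X-Time`` response header via ``_stats_message`` below.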
+ + See also: + https://github.com/jazzband/django-debug-toolbar/blob/master/debug_toolbar/middleware.py + """ + + def __init__(self, get_response): + self.get_response = get_response + + def __call__(self, request): + toolbar = DebugToolbar(request, self.get_response) + + # Activate instrumentation ie. monkey-patch. + for panel in toolbar.enabled_panels: + panel.enable_instrumentation() + try: + # Run panels like Django middleware. + response = toolbar.process_request(request) + finally: + # Deactivate instrumentation ie. monkey-unpatch. This must run + # regardless of the response. Keep 'return' clauses below. + for panel in reversed(toolbar.enabled_panels): + panel.disable_instrumentation() + + # generate stats and timing + for panel in reversed(toolbar.enabled_panels): + panel.generate_stats(request, response) + panel.generate_server_timing(request, response) + stats = self._extract_panel_stats(toolbar.enabled_panels) + message = self._stats_message(stats) + LOG.info(f'X-Time: {message}') + response['X-Time'] = message + return response + + def _stats_message(self, stats): + timer_msg = '0ms' + total_time = int(stats['timer'].get('total_time') or 0) + if total_time > 0: + timer_msg = '{},utime={},stime={}'.format( + ms(total_time), + ms(stats['timer']['utime']), + ms(stats['timer']['stime']), + ) + + sql_msg = 'sql=0' + if stats['sql']: + sql_msg = 'sql={},{}'.format( + stats['sql']['num_queries'] or 0, + ms(stats['sql']['time_spent']), + ) + similar_count = stats['sql']['similar_count'] + if similar_count and similar_count > 0: + sql_msg += f',similar={similar_count}' + duplicate_count = stats['sql']['duplicate_count'] + if duplicate_count and duplicate_count > 0: + sql_msg += f',duplicate={duplicate_count}' + + seaweed_msg = 'seaweed=0' + if stats['seaweed']: + seaweed_items = [] + for op in ['get', 'put', 'delete']: + count = stats['seaweed'].get(op) + if count and count > 0: + seaweed_items.append('{}:{}:{}'.format( + op, count, s_ms(stats['seaweed'].get(f'{op}_time')))) + if seaweed_items: + seaweed_msg = 'seaweed=' + ','.join(seaweed_items) + + return ';'.join([timer_msg, sql_msg, seaweed_msg]) + + def _extract_panel_stats(self, panels): + stats_map = {} + for panel in panels: + stats = panel.get_stats() + if not stats: + continue + stats_map[panel.__class__.__name__] = stats + result = {'sql': {}, 'timer': {}, 'seaweed': {}} + sql_panel_stats = stats_map.get('SQLPanel') + if sql_panel_stats and sql_panel_stats['databases']: + _, sql_stats = sql_panel_stats['databases'][0] + keys = ['time_spent', 'num_queries', 'similar_count', 'duplicate_count'] + for key in keys: + result['sql'][key] = sql_stats.get(key) + timer_stats = stats_map.get('TimerPanel') + if timer_stats: + keys = ['total_time', 'utime', 'stime', 'total'] + for key in keys: + result['timer'][key] = timer_stats.get(key) + seaweed_stats = stats_map.get('SeaweedPanel') + if seaweed_stats: + result['seaweed'].update(seaweed_stats) + return result diff --git a/rssant/middleware/profiler.py b/rssant/middleware/profiler.py new file mode 100644 index 0000000000000000000000000000000000000000..0310162480997012812953c6da341629fdb44df7 --- /dev/null +++ b/rssant/middleware/profiler.py @@ -0,0 +1,55 @@ +import time +import logging +from collections import OrderedDict + +from pyinstrument import Profiler +from django.http import HttpResponse, HttpRequest + + +LOG = logging.getLogger(__name__) + +_PROFILER_RECORDS = OrderedDict() + + +class RssantProfilerMiddleware: + def __init__(self, get_response): + self.get_response = 
get_response + + def __call__(self, request: HttpRequest): + response = self._render_profiler_record(request) + if response: + return response + profiler = None + try: + profiler = Profiler() + profiler.start() + response = self.get_response(request) + finally: + if profiler is not None: + profiler.stop() + print(profiler.output_text(unicode=True, color=True)) + link = self._output_html(request, profiler) + print(f'* Profiler HTML: {link}\n') + return response + + def _output_html(self, request: HttpRequest, profiler: Profiler): + html = profiler.output_html() + t = int(time.time() * 1000) + key = '{}-{}-{}'.format(t, request.method, request.path) + _PROFILER_RECORDS[key] = html + while len(_PROFILER_RECORDS) > 20: + _PROFILER_RECORDS.popitem(False) + port = request.META['SERVER_PORT'] + link = f'http://localhost:{port}/__profiler__/{key}' + return link + + def _render_profiler_record(self, request: HttpRequest): + prefix = '/__profiler__/' + if not request.path.startswith(prefix): + return None + key = request.path[len(prefix):] + html = _PROFILER_RECORDS.get(key) + if not html: + return None + content = html.encode('utf-8') + return HttpResponse(content, content_type='text/html', charset='utf-8') diff --git a/rssant/middleware/seaweed_panel.py b/rssant/middleware/seaweed_panel.py new file mode 100644 index 0000000000000000000000000000000000000000..38f9d83f4291d9159de523d31184f5d155d91059 --- /dev/null +++ b/rssant/middleware/seaweed_panel.py @@ -0,0 +1,62 @@ +import threading +import time +from contextlib import contextmanager + +from debug_toolbar.panels import Panel + + +_seaweed_metrics = threading.local() + + +def _init_seaweed_metrics(): + _seaweed_metrics.value = { + 'get': 0, + 'get_time': 0.0, + 'put': 0, + 'put_time': 0.0, + 'delete': 0, + 'delete_time': 0.0, + } + + +def _close_seaweed_metrics(): + delattr(_seaweed_metrics, 'value') + + +class SeaweedMetrics: + + GET = 'get' + PUT = 'put' + DELETE = 'delete' + + @classmethod + @contextmanager + def record(cls, op: str, n: int = 1): + if op not in (cls.GET, cls.PUT, cls.DELETE): + raise ValueError(f'unknown seaweed operation {op!r}') + if getattr(_seaweed_metrics, 'value', None) is None: + _init_seaweed_metrics() + t_begin = time.time() + try: + yield + finally: + cost = time.time() - t_begin + _seaweed_metrics.value[op] += n + _seaweed_metrics.value[f'{op}_time'] += cost + + +class SeaweedPanel(Panel): + + has_content = False + + def enable_instrumentation(self): + _init_seaweed_metrics() + + def process_request(self, request): + response = super().process_request(request) + stats = dict(_seaweed_metrics.value) + self.record_stats(stats) + return response + + def disable_instrumentation(self): + _close_seaweed_metrics() diff --git a/rssant/middleware/time.py b/rssant/middleware/time.py deleted file mode 100644 index f0590b347efd4696dfd809b4919a6f3f4edcd468..0000000000000000000000000000000000000000 --- a/rssant/middleware/time.py +++ /dev/null @@ -1,18 +0,0 @@ -import time -from django.db import connection - - -class TimeMiddleware: - def __init__(self, get_response): - self.get_response = get_response - - def __call__(self, request): - t_begin = time.time() - response = self.get_response(request) - cost = (time.time() - t_begin) * 1000 - response['X-Time'] = f'{cost:.0f}ms' - num_sqls = len(connection.queries) - sql_cost = sum(float(x['time']) for x in connection.queries) * 1000 - sql_time = f'{num_sqls};{sql_cost:.0f}ms' - response['X-SQL-Time'] = sql_time - return response diff --git a/rssant/settings/settings.py 
b/rssant/settings/settings.py index 448f072e689e5c2ff333380f7f7792b3d06966ee..203d7d519528783d4618bb3e207331a6755547b0 100644 --- a/rssant/settings/settings.py +++ b/rssant/settings/settings.py @@ -63,18 +63,22 @@ INSTALLED_APPS.extend([ 'rssant_api', ]) -MIDDLEWARE = [ - 'rssant.middleware.time.TimeMiddleware', - 'debug_toolbar.middleware.DebugToolbarMiddleware', - 'django.middleware.security.SecurityMiddleware', - 'whitenoise.middleware.WhiteNoiseMiddleware', - 'django.contrib.sessions.middleware.SessionMiddleware', - 'django.middleware.common.CommonMiddleware', - 'django.middleware.csrf.CsrfViewMiddleware', - 'django.contrib.auth.middleware.AuthenticationMiddleware', - 'django.contrib.messages.middleware.MessageMiddleware', - 'django.middleware.clickjacking.XFrameOptionsMiddleware', -] + +def _gen_middleware(): + yield 'rssant.middleware.debug_toolbar.RssantDebugToolbarMiddleware' + if ENV_CONFIG.profiler_enable: + yield 'rssant.middleware.profiler.RssantProfilerMiddleware' + yield 'django.middleware.security.SecurityMiddleware' + yield 'whitenoise.middleware.WhiteNoiseMiddleware' + yield 'django.contrib.sessions.middleware.SessionMiddleware' + yield 'django.middleware.common.CommonMiddleware' + yield 'django.middleware.csrf.CsrfViewMiddleware' + yield 'django.contrib.auth.middleware.AuthenticationMiddleware' + yield 'django.contrib.messages.middleware.MessageMiddleware' + yield 'django.middleware.clickjacking.XFrameOptionsMiddleware' + + +MIDDLEWARE = list(_gen_middleware()) INTERNAL_IPS = ['127.0.0.1'] @@ -226,3 +230,13 @@ REST_FRAMEWORK = { # TODO: https://github.com/encode/django-rest-framework/issues/6809 'DEFAULT_SCHEMA_CLASS': 'rest_framework.schemas.coreapi.AutoSchema' } + +# Django debug toolbar and X-Time header +DEBUG_TOOLBAR_PANELS = [ + "debug_toolbar.panels.timer.TimerPanel", + "rssant.middleware.seaweed_panel.SeaweedPanel", + "debug_toolbar.panels.sql.SQLPanel", +] +DEBUG_TOOLBAR_CONFIG = { + "ENABLE_STACKTRACES": False, +} diff --git a/rssant_api/migrations/0026_feedstorystat.py b/rssant_api/migrations/0026_feedstorystat.py new file mode 100644 index 0000000000000000000000000000000000000000..6fc4196c8c58fad4849b74ed52e6d7bc38cc875c --- /dev/null +++ b/rssant_api/migrations/0026_feedstorystat.py @@ -0,0 +1,30 @@ +# Generated by Django 2.2.12 on 2020-05-23 14:00 + +from django.db import migrations, models +import ool + + +class Migration(migrations.Migration): + + dependencies = [ + ('rssant_api', '0025_auto_20200516_0959'), + ] + + operations = [ + migrations.CreateModel( + name='FeedStoryStat', + fields=[ + ('_version', ool.VersionField(default=0)), + ('_created', models.DateTimeField(auto_now_add=True, help_text='创建时间')), + ('_updated', models.DateTimeField(auto_now=True, help_text='更新时间')), + ('id', models.PositiveIntegerField(help_text='feed id', primary_key=True, serialize=False)), + ('monthly_story_count_data', models.BinaryField(blank=True, help_text='monthly story count data', max_length=514, null=True)), + ('checksum_data', models.BinaryField(blank=True, help_text='feed checksum data', max_length=4096, null=True)), + ('unique_ids_data', models.BinaryField(blank=True, help_text='unique ids data', max_length=102400, null=True)), + ], + options={ + 'abstract': False, + }, + bases=(ool.VersionedMixin, models.Model), + ), + ] diff --git a/rssant_api/migrations/0027_storyinfo.py b/rssant_api/migrations/0027_storyinfo.py new file mode 100644 index 0000000000000000000000000000000000000000..7dcd03f156986b847dd7d98284d9af5c8eda0d0e --- /dev/null +++ 
b/rssant_api/migrations/0027_storyinfo.py @@ -0,0 +1,38 @@ +# Generated by Django 2.2.12 on 2020-05-28 15:48 + +from django.db import migrations, models +import ool + + +class Migration(migrations.Migration): + + dependencies = [ + ('rssant_api', '0026_feedstorystat'), + ] + + operations = [ + migrations.CreateModel( + name='StoryInfo', + fields=[ + ('_version', ool.VersionField(default=0)), + ('id', models.BigIntegerField(help_text='feed_id and offset', primary_key=True, serialize=False)), + ('unique_id', models.CharField(help_text='Unique ID', max_length=200)), + ('title', models.CharField(help_text='标题', max_length=200)), + ('link', models.TextField(help_text='文章链接')), + ('author', models.CharField(blank=True, help_text='作者', max_length=200, null=True)), + ('image_url', models.TextField(blank=True, help_text='图片链接', null=True)), + ('audio_url', models.TextField(blank=True, help_text='播客音频链接', null=True)), + ('iframe_url', models.TextField(blank=True, help_text='视频iframe链接', null=True)), + ('has_mathjax', models.BooleanField(blank=True, help_text='has MathJax', null=True)), + ('is_user_marked', models.BooleanField(blank=True, help_text='is user favorited or watched ever', null=True)), + ('dt_published', models.DateTimeField(help_text='发布时间')), + ('dt_updated', models.DateTimeField(blank=True, help_text='更新时间', null=True)), + ('dt_created', models.DateTimeField(auto_now_add=True, help_text='创建时间')), + ('dt_synced', models.DateTimeField(blank=True, help_text='最近一次同步时间', null=True)), + ('content_length', models.IntegerField(blank=True, help_text='content length', null=True)), + ('summary', models.TextField(blank=True, help_text='摘要或较短的内容', null=True)), + ('content_hash_base64', models.CharField(blank=True, help_text='base64 hash value of content', max_length=200, null=True)), + ], + bases=(ool.VersionedMixin, models.Model), + ), + ] diff --git a/rssant_api/models/__init__.py b/rssant_api/models/__init__.py index 0c9fc236b50d62a4b7d28399826a6d066644d7e2..64c0e0492997f1274bdada82190ee71a7635fdf4 100644 --- a/rssant_api/models/__init__.py +++ b/rssant_api/models/__init__.py @@ -2,12 +2,15 @@ from .feed import Feed, RawFeed, UserFeed, FeedStatus from .feed_creation import FeedCreation, FeedUrlMap from .union_feed import FeedUnionId, UnionFeed from .story import Story, UserStory +from .feed_story_stat import FeedStoryStat from .union_story import UnionStory, StoryUnionId from .registery import Registery from .image import ImageInfo +from .story_service import STORY_SERVICE, CommonStory +from .story_info import StoryId, StoryInfo __models__ = ( FeedCreation, Feed, RawFeed, UserFeed, - Story, UserStory, FeedUrlMap, + Story, StoryInfo, UserStory, FeedUrlMap, FeedStoryStat, Registery, ImageInfo, ) diff --git a/rssant_api/models/feed_story_stat.py b/rssant_api/models/feed_story_stat.py new file mode 100644 index 0000000000000000000000000000000000000000..7b1f2dc282bdbb20f6141c00961fa508624a55b9 --- /dev/null +++ b/rssant_api/models/feed_story_stat.py @@ -0,0 +1,46 @@ +from .helper import Model, models, optional + + +class FeedStoryStat(Model): + """ + feed_id -> story stats + """ + + id = models.PositiveIntegerField(primary_key=True, help_text='feed id') + + # TODO: will migrate from feed table + monthly_story_count_data = models.BinaryField( + **optional, max_length=514, help_text="monthly story count data") + + # TODO: will migrate from feed table + checksum_data = models.BinaryField( + **optional, max_length=4096, help_text="feed checksum data") + + unique_ids_data = models.BinaryField( + 
**optional, max_length=100 * 1024, help_text="unique ids data") + + @classmethod + def _create_if_not_exists(cls, feed_id: int) -> bool: + is_exists = FeedStoryStat.objects.filter(pk=feed_id).exists() + if not is_exists: + FeedStoryStat(id=feed_id).save() + + @classmethod + def _create_or_update(cls, feed_id, **kwargs): + cls._create_if_not_exists(feed_id) + updated = FeedStoryStat.objects.filter(pk=feed_id)\ + .update(**kwargs) + if updated <= 0: + raise ValueError(f'update FeedStoryStat#{feed_id} failed') + + @classmethod + def save_unique_ids_data(cls, feed_id: int, unique_ids_data: bytes): + cls._create_or_update(feed_id, unique_ids_data=unique_ids_data) + + @classmethod + def save_monthly_story_count_data(cls, feed_id: int, monthly_story_count_data: bytes): + cls._create_or_update(feed_id, monthly_story_count_data=monthly_story_count_data) + + @classmethod + def save_checksum_data(cls, feed_id: int, checksum_data: bytes): + cls._create_or_update(feed_id, checksum_data=checksum_data) diff --git a/rssant_api/models/helper.py b/rssant_api/models/helper.py index defd8b807d99ce0e9bd92d928ce854747fc1988a..d63cc8537c94af1f2a61de7065738528e94cac4d 100644 --- a/rssant_api/models/helper.py +++ b/rssant_api/models/helper.py @@ -3,6 +3,8 @@ from django.contrib.auth import get_user_model from django.contrib.postgres.fields import JSONField # https://github.com/gavinwahl/django-optimistic-lock from ool import VersionField, VersionedMixin, ConcurrentUpdate as ConcurrentUpdateError +# https://github.com/charettes/django-seal +from seal.models import SealableModel from rssant.helper.content_hash import compute_hash_base64 @@ -14,7 +16,7 @@ def extract_choices(cls): return [(v, v)for k, v in vars(cls).items() if k.isupper()] -class Model(VersionedMixin, models.Model): +class Model(VersionedMixin, SealableModel): class Meta: abstract = True @@ -25,7 +27,7 @@ class Model(VersionedMixin, models.Model): def __str__(self): default = f'{self.__class__.__name__}#{self.id}' - admin = getattr(self.__class__, 'Admin') + admin = getattr(self.__class__, 'Admin', None) if not admin: return default fields = getattr(admin, 'display_fields') @@ -39,7 +41,7 @@ class Model(VersionedMixin, models.Model): return f'{self.__class__.__name__}#{self.id} {details}' -class ContentHashMixin(models.Model): +class ContentHashMixin(SealableModel): class Meta: abstract = True diff --git a/rssant_api/models/story.py b/rssant_api/models/story.py index 343c37d7fede437330ef662fcda6d4ef3970d250..eca10f08a8d438cb63b0183c72dded4337601270 100644 --- a/rssant_api/models/story.py +++ b/rssant_api/models/story.py @@ -72,12 +72,49 @@ class Story(Model, ContentHashMixin): summary = models.TextField(**optional, help_text="摘要或较短的内容") content = models.TextField(**optional, help_text="文章内容") + _STORY_FIELD_NAMES = None + + @classmethod + def _story_field_names(cls): + if cls._STORY_FIELD_NAMES is None: + names = set() + for field in cls._meta.get_fields(): + column = getattr(field, 'column', None) + if column: + names.add(column) + cls._STORY_FIELD_NAMES = list(sorted(names)) + return cls._STORY_FIELD_NAMES + @staticmethod def get_by_offset(feed_id, offset, detail=False) -> 'Story': q = Story.objects.filter(feed_id=feed_id, offset=offset) - if not detail: - q = q.defer(*STORY_DETAIL_FEILDS) - return q.get() + detail = Detail.from_schema(detail, StoryDetailSchema) + q = q.defer(*detail.exclude_fields) + return q.seal().get() + + @classmethod + def batch_get_by_offset(cls, keys, detail=False): + if not keys: + return [] + detail = 
Detail.from_schema(detail, StoryDetailSchema) + select_fields = set(cls._story_field_names()) - set(detail.exclude_fields) + select_fields_quoted = ','.join(['"{}"'.format(x) for x in select_fields]) + # Note: below query can not use index, it's very slow + # WHERE ("feed_id","offset")=Any(%s) + # WHERE ("feed_id","offset")=Any(ARRAY[(XX, YY), ...]) + where_items = [] + for feed_id, offset in keys: + # ensure integer, avoid sql inject attack + feed_id, offset = int(feed_id), int(offset) + where_items.append(f'("feed_id"={feed_id} AND "offset"={offset})') + where_clause = ' OR '.join(where_items) + sql = f""" + SELECT {select_fields_quoted} + FROM rssant_api_story + WHERE {where_clause} + """ + storys = list(Story.objects.seal().raw(sql)) + return storys @staticmethod def _dedup_sort_storys(storys): @@ -100,6 +137,9 @@ class Story(Model, ContentHashMixin): @staticmethod def bulk_save_by_feed(feed_id, storys, batch_size=100, is_refresh=False): + """ + Deprecated since 1.5.0 + """ if not storys: return [] # modified_story_objects storys = Story._dedup_sort_storys(storys) @@ -273,6 +313,8 @@ class Story(Model, ContentHashMixin): @staticmethod def delete_by_retention(feed_id, retention=5000, limit=5000): """ + Deprecated since 1.5.0 + Params: feed_id: feed ID retention: num storys to keep diff --git a/rssant_api/models/story_info.py b/rssant_api/models/story_info.py new file mode 100644 index 0000000000000000000000000000000000000000..8ecd36b11dfbcb239d19b122aeb3453845ee61e0 --- /dev/null +++ b/rssant_api/models/story_info.py @@ -0,0 +1,83 @@ +from rssant_common.detail import Detail +from .helper import models, SealableModel, VersionedMixin, VersionField, optional +from .story_storage import StoryId +from .story import STORY_DETAIL_FEILDS, StoryDetailSchema + + +STORY_INFO_DETAIL_FEILDS = list(STORY_DETAIL_FEILDS) +STORY_INFO_DETAIL_FEILDS.remove('content') + + +class StoryInfo(VersionedMixin, SealableModel): + + class Admin: + display_fields = ['id', 'title', 'link'] + + _version = VersionField() + + id = models.BigIntegerField(primary_key=True, help_text='feed_id and offset') + unique_id = models.CharField(max_length=200, help_text="Unique ID") + title = models.CharField(max_length=200, help_text="标题") + link = models.TextField(help_text="文章链接") + author = models.CharField(max_length=200, **optional, help_text='作者') + image_url = models.TextField(**optional, help_text="图片链接") + audio_url = models.TextField(**optional, help_text="播客音频链接") + iframe_url = models.TextField(**optional, help_text="视频iframe链接") + has_mathjax = models.BooleanField(**optional, help_text='has MathJax') + is_user_marked = models.BooleanField( + **optional, help_text='is user favorited or watched ever') + dt_published = models.DateTimeField(help_text="发布时间") + dt_updated = models.DateTimeField(**optional, help_text="更新时间") + dt_created = models.DateTimeField(auto_now_add=True, help_text="创建时间") + dt_synced = models.DateTimeField(**optional, help_text="最近一次同步时间") + content_length = models.IntegerField(**optional, help_text='content length') + summary = models.TextField(**optional, help_text="摘要或较短的内容") + content_hash_base64 = models.CharField( + max_length=200, **optional, help_text='base64 hash value of content') + + @property + def feed_id(self) -> int: + feed_id, offset = StoryId.decode(self.id) + return feed_id + + @property + def offset(self) -> int: + feed_id, offset = StoryId.decode(self.id) + return offset + + @classmethod + def _get_exclude_fields(cls, detail): + detail = Detail.from_schema(detail, 
StoryDetailSchema) + exclude_fields = set(detail.exclude_fields) + exclude_fields.discard('content') + return exclude_fields + + @classmethod + def get(cls, feed_id, offset, detail=False): + q = StoryInfo.objects.filter(pk=StoryId.encode(feed_id, offset)) + q = q.defer(*cls._get_exclude_fields(detail)) + return q.seal().first() + + @classmethod + def batch_get(cls, keys, detail=False): + if not keys: + return [] + story_ids = [] + for feed_id, offset in keys: + story_ids.append(StoryId.encode(feed_id, offset)) + q = StoryInfo.objects.filter(pk__in=story_ids) + q = q.defer(*cls._get_exclude_fields(detail)) + return list(q.seal().all()) + + @staticmethod + def delete_by_retention_offset(feed_id, retention_offset): + """ + delete storys < retention_offset and not is_user_marked + """ + begin_story_id = StoryId.encode(feed_id, 0) + retention_story_id = StoryId.encode(feed_id, retention_offset) + n, __ = StoryInfo.objects\ + .filter(pk__gte=begin_story_id, pk__lt=retention_story_id)\ + .exclude(is_user_marked=True)\ + .delete() + return n diff --git a/rssant_api/models/story_service.py b/rssant_api/models/story_service.py new file mode 100644 index 0000000000000000000000000000000000000000..ce9e637088d07d2abec1164153bae2c4a2159971 --- /dev/null +++ b/rssant_api/models/story_service.py @@ -0,0 +1,402 @@ +import logging +import time +import datetime + +from django.utils import timezone +from django.db import transaction +from validr import T, asdict, modelclass, fields + +from rssant_common.detail import Detail +from rssant_common.validator import compiler +from rssant_config import CONFIG +from .story import Story, StoryDetailSchema +from .story_info import StoryInfo, StoryId +from .feed import Feed +from .feed_story_stat import FeedStoryStat +from .story_unique_ids import StoryUniqueIdsData +from .errors import StoryNotFoundError + +from .story_storage import SeaweedClient, SeaweedStoryStorage +from .story_storage import PostgresClient, PostgresStoryStorage + + +LOG = logging.getLogger(__name__) + + +@modelclass(compiler=compiler) +class CommonStory: + feed_id: int = T.int + offset: int = T.int + unique_id: str = T.str + title: str = T.str + link: str = T.str.optional + author: str = T.str.optional + image_url: str = T.str.optional + audio_url: str = T.str.optional + iframe_url: str = T.str.optional + has_mathjax: bool = T.bool.optional + dt_published: datetime.datetime = T.datetime.object.optional + dt_updated: datetime.datetime = T.datetime.object.optional + dt_created: datetime.datetime = T.datetime.object.optional + dt_synced: datetime.datetime = T.datetime.object.optional + summary: str = T.str.optional + content: str = T.str.optional + content_length: int = T.int.min(0).optional + content_hash_base64: str = T.str.optional + + def to_dict(self): + return asdict(self) + + def __repr__(self): + base = f'{type(self).__name__}#{self.feed_id},{self.offset}' + return f'<{base} unique_id={self.unique_id!r} title={self.title!r}>' + + +class StoryService: + def __init__(self, storage: SeaweedStoryStorage): + self._storage = storage + + @staticmethod + def to_common(story: Story): + d = {} + for key in fields(CommonStory): + value = story.__dict__.get(key, None) + if value is not None: + d[key] = value + content = d.get('content', None) + if content: + d['content_length'] = len(content) + d['dt_created'] = story.dt_created + d['feed_id'] = story.feed_id + d['offset'] = story.offset + return CommonStory(d) + + def _is_include_content(self, detail): + detail = Detail.from_schema(detail, 
StoryDetailSchema) + return 'content' in detail.include_fields + + def get_by_offset(self, feed_id, offset, detail=False) -> CommonStory: + story_info = StoryInfo.get(feed_id, offset, detail=detail) + if story_info: + story = self.to_common(story_info) + include_content = self._is_include_content(detail) + if include_content: + content = self._storage.get_content(feed_id, offset) + story.content = content + return story + try: + story = Story.get_by_offset(feed_id, offset, detail=detail) + except Story.DoesNotExist: + return None + return self.to_common(story) + + def set_user_marked(self, feed_id, offset, is_user_marked=True): + try: + story = Story.get_by_offset(feed_id, offset) + except Story.DoesNotExist: + story = None + if (not story) and is_user_marked: + common_story = self.get_by_offset(feed_id, offset, detail=True) + d = asdict(common_story) + d.pop('content_length', None) + story = Story(**d) + story.is_user_marked = is_user_marked + story.save() + elif story: + Story.set_user_marked_by_id(story.id, is_user_marked=is_user_marked) + return story + + def update_story(self, feed_id, offset, data: dict): + story = self.get_by_offset(feed_id, offset, detail=True) + if not story: + raise StoryNotFoundError(f'story#{feed_id},{offset} not found') + for k, v in data.items(): + if v is not None: + setattr(story, k, v) + self._storage.save_content(feed_id, offset, story.content) + update_params = story.to_dict() + update_params.pop('content', None) + update_params.pop('feed_id', None) + update_params.pop('offset', None) + story_id = StoryId.encode(feed_id, offset) + updated = StoryInfo.objects\ + .filter(pk=story_id)\ + .update(**update_params) + if updated <= 0: + story_info = StoryInfo(pk=story_id, **update_params) + story_info.save() + + def _get_unique_ids_by_stat(self, feed_id): + stat = FeedStoryStat.objects\ + .only('unique_ids_data')\ + .filter(pk=feed_id).seal().first() + if not stat or not stat.unique_ids_data: + return None + result = {} + unique_ids_data = StoryUniqueIdsData.decode(stat.unique_ids_data) + for i, unique_id in enumerate(unique_ids_data.unique_ids): + offset = unique_ids_data.begin_offset + i + result[unique_id] = offset + return result + + def _get_unique_ids_by_story(self, feed_id, begin_offset, end_offset): + story_s = Story.objects\ + .only('offset', 'unique_id')\ + .filter(feed_id=feed_id)\ + .filter(offset__gte=begin_offset, offset__lt=end_offset)\ + .seal().all() + result = {} + for story in story_s: + result[story.unique_id] = story.offset + return result + + def _get_unique_ids(self, feed_id, feed_total_story): + unique_ids_map = self._get_unique_ids_by_stat(feed_id) + if unique_ids_map is None: + unique_ids_map = self._get_unique_ids_by_story( + feed_id, feed_total_story - 300, feed_total_story) + return unique_ids_map + + def _group_storys(self, storys, unique_ids_map): + old_storys_map = {} + new_storys = [] + for story in storys: + offset = unique_ids_map.get(story['unique_id']) + if offset is None: + new_storys.append(story) + else: + old_storys_map[offset] = story + return old_storys_map, new_storys + + def _query_story_infos(self, feed_id, offset_s): + keys = [(feed_id, offset) for offset in offset_s] + detail = '+dt_published,content_hash_base64' + story_infos = StoryInfo.batch_get(keys, detail=detail) + return story_infos + + def _query_story_objects(self, feed_id, offset_s): + keys = [(feed_id, offset) for offset in offset_s] + detail = '+dt_published,content_hash_base64' + storys = Story.batch_get_by_offset(keys, detail=detail) + return storys 
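The helpers above and below all address a story by the pair `(feed_id, offset)`; the new `StoryInfo` table stores that pair packed into one 64-bit primary key via `StoryId.encode` (defined in `story_storage/common/story_key.py` later in this diff). A minimal round-trip sketch of that packing, with made-up feed_id/offset values:

```python
# Sketch of the StoryId packing used by StoryInfo in this diff:
# 4 reserve bits | 28-bit feed_id | 28-bit offset | 4 reserve bits.
# Sample numbers are made up for illustration.

def encode(feed_id: int, offset: int) -> int:
    return (feed_id << 32) + (offset << 4)   # both reserve fields left at 0

def decode(story_id: int) -> tuple:
    offset = (story_id >> 4) & 0x0fffffff
    feed_id = (story_id >> 32) & 0x0fffffff
    return feed_id, offset

assert encode(5, 3) == 21474836528
assert decode(21474836528) == (5, 3)
```

Because offsets occupy the low-order bits of the key, a contiguous offset range for one feed maps to a contiguous primary-key range, which is what lets `StoryInfo.delete_by_retention_offset` delete old stories with a simple `pk__gte`/`pk__lt` filter.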
+ + def _query_old_story_objects(self, feed_id, old_storys_map): + old_story_infos = self._query_story_infos(feed_id, old_storys_map.keys()) + remain_offsets = set(old_storys_map.keys()) - {x.offset for x in old_story_infos} + old_story_objects = self._query_story_objects(feed_id, remain_offsets) + old_story_objects_map = {} + for story_info in old_story_infos: + old_story_objects_map[story_info.offset] = (True, story_info) + for story_object in old_story_objects: + old_story_objects_map[story_object.offset] = (False, story_object) + return old_story_objects_map + + def _compute_modified_storys(self, feed_id, old_storys_map, new_storys, is_refresh): + old_story_objects_map = self._query_old_story_objects(feed_id, old_storys_map) + modified_story_objects = {} + new_storys = list(new_storys) + for offset, story in old_storys_map.items(): + if offset not in old_story_objects_map: + msg = 'story feed_id=%s offset=%r not consitent with unique_ids_data' + LOG.error(msg, feed_id, offset) + new_storys.append(story) + else: + is_story_info, old_story = old_story_objects_map[offset] + new_hash = story['content_hash_base64'] + if not new_hash: + is_modified = True + else: + is_modified = old_story.content_hash_base64 != new_hash + if is_refresh or is_modified: + modified_story_objects[offset] = (is_story_info, old_story, story) + return new_storys, modified_story_objects + + def _common_story_of(self, story, feed_id, offset, now): + story['feed_id'] = feed_id + story['offset'] = offset + story['dt_synced'] = now + for key in ['dt_created', 'dt_updated']: + if not story.get(key): + story[key] = now + story = CommonStory(story) + return story + + def _compute_storys_map(self, feed_id, feed_total_story, new_storys, modified_story_objects): + storys_map = {} + now = timezone.now() + for offset, story in enumerate(new_storys, feed_total_story): + story = self._common_story_of(story, feed_id, offset, now) + storys_map[offset] = (story, None, False) + for offset, (is_story_info, old_story, story) in modified_story_objects.items(): + story = self._common_story_of(story, feed_id, offset, now) + # 发布时间只第一次赋值,不更新 + if old_story.dt_published: + story.dt_published = old_story.dt_published + storys_map[offset] = (story, old_story, is_story_info) + return storys_map + + def _compute_story_infos(self, storys_map): + new_story_infos = [] + modified_story_infos = [] + for offset, (story, old_story, is_story_info) in storys_map.items(): + if (not old_story) or (not is_story_info): + story_id = StoryId.encode(story.feed_id, story.offset) + story_info = StoryInfo(id=story_id) + new_story_infos.append(story_info) + else: + assert isinstance(old_story, StoryInfo) + story_info = old_story + modified_story_infos.append(old_story) + update_params = story.to_dict() + update_params.pop('content', None) + update_params.pop('feed_id', None) + update_params.pop('offset', None) + for k, v in update_params.items(): + setattr(story_info, k, v) + new_story_infos = list(sorted(new_story_infos, key=lambda x: x.offset)) + modified_story_infos = list(sorted(modified_story_infos, key=lambda x: x.offset)) + return new_story_infos, modified_story_infos + + def _compute_new_unique_ids_data(self, feed_id, new_total_storys, modified_storys, unique_ids_map) -> bytes: + # 容忍旧的 unique_ids_map 有错,用新的正确的值覆盖旧值 + tmp_unique_ids = dict(unique_ids_map) + for story in modified_storys: + tmp_unique_ids[story.unique_id] = story.offset + tmp_unique_ids = {y: x for x, y in tmp_unique_ids.items()} + new_unique_ids = [] + size = min(len(tmp_unique_ids), 300) + 
begin_offset = max(0, new_total_storys - size) + new_begin_offset = new_total_storys + for offset in reversed(range(begin_offset, new_total_storys)): + unique_id = tmp_unique_ids.get(offset, None) + if not unique_id: + msg = 'wrong unique_ids_data, feed_id=%s offset=%s: %r' + LOG.error(msg, feed_id, offset, tmp_unique_ids) + break + new_unique_ids.append(unique_id) + new_begin_offset = offset + new_unique_ids = list(reversed(new_unique_ids)) + if len(new_unique_ids) != len(set(new_unique_ids)): + msg = 'found feed_id=%s begin_offset=%s duplicate new_unique_ids, will discard it: %r' + LOG.error(msg, feed_id, new_begin_offset, new_unique_ids) + return None + unique_ids_data = StoryUniqueIdsData( + new_begin_offset, new_unique_ids).encode() + return unique_ids_data + + def bulk_save_by_feed(self, feed_id, storys, batch_size=100, is_refresh=False): + if not storys: + return [] # modified_common_storys + storys = Story._dedup_sort_storys(storys) + + feed = Feed.get_by_pk(feed_id) + unique_ids_map = self._get_unique_ids(feed_id, feed.total_storys) + + old_storys_map, new_storys = self._group_storys(storys, unique_ids_map) + + new_storys, modified_story_objects = self._compute_modified_storys( + feed_id, old_storys_map, new_storys, is_refresh=is_refresh, + ) + new_storys = Story._dedup_sort_storys(new_storys) + new_total_storys = feed.total_storys + len(new_storys) + + storys_map = self._compute_storys_map( + feed_id, feed.total_storys, new_storys, modified_story_objects, + ) + + new_common_storys = [] + modified_common_storys = [] + save_story_contents = [] + for offset, (story, old_story, is_story_info) in storys_map.items(): + modified_common_storys.append(story) + if not old_story: + new_common_storys.append(story) + save_story_contents.append(((feed_id, offset), story.content)) + + new_story_infos, modified_story_infos = self._compute_story_infos(storys_map) + + unique_ids_data = self._compute_new_unique_ids_data( + feed_id, new_total_storys, modified_common_storys, unique_ids_map, + ) + + save_content_begin = time.time() + self._storage.batch_save_content(save_story_contents) + save_content_cost = int((time.time() - save_content_begin) * 1000) + LOG.info('storage.save_content %d cost=%dms', len(save_story_contents), save_content_cost) + + with transaction.atomic(): + for story_info in modified_story_infos: + story_info.save() + if new_story_infos: + StoryInfo.objects.bulk_create(new_story_infos, batch_size=batch_size) + if new_common_storys: + FeedStoryStat.save_unique_ids_data(feed_id, unique_ids_data) + Story._update_feed_monthly_story_count(feed, new_common_storys) + feed.total_storys = new_total_storys + if feed.dt_first_story_published is None: + feed.dt_first_story_published = new_common_storys[0].dt_published + feed.dt_latest_story_published = new_common_storys[-1].dt_published + feed.save() + + return modified_common_storys + + def _delete_content_by_retention(self, feed_id, begin_offset, end_offset): + keys = [] + for offset in range(begin_offset, end_offset, 1): + keys.append((feed_id, offset)) + self._storage.batch_delete_content(keys) + + def delete_by_retention(self, feed_id, retention=3000, limit=1000): + """ + Params: + feed_id: feed ID + retention: num storys to keep + limit: delete at most limit rows + """ + feed = Feed.get_by_pk(feed_id) + offset = feed.retention_offset or 0 + # delete at most limit rows, avoid out of memory and timeout + new_offset = min(offset + limit, feed.total_storys - retention) + if new_offset <= offset: + return 0 + 
self._delete_content_by_retention(feed_id, offset, new_offset) + with transaction.atomic(): + n = StoryInfo.delete_by_retention_offset(feed_id, new_offset) + m = Story.delete_by_retention_offset(feed_id, new_offset) + feed.retention_offset = new_offset + feed.save() + return n + m + return 0 + + +SEAWEED_CLIENT = SeaweedClient( + CONFIG.seaweed_volume_url, + thread_pool_size=CONFIG.seaweed_thread_pool_size, +) +SEAWEED_STORY_STORAGE = SeaweedStoryStorage(SEAWEED_CLIENT) + + +POSTGRES_CLIENT = PostgresClient(CONFIG.pg_story_volumes_parsed) +POSTGRES_STORY_STORAGE = PostgresStoryStorage(POSTGRES_CLIENT) + + +class MirrorStoryStorage: + + def _make_mirror(self, name): + + f_seaweed = getattr(SEAWEED_STORY_STORAGE, name) + f_postgres = getattr(POSTGRES_STORY_STORAGE, name) + + def mirror(*args, **kwargs): + r_seaweed = f_seaweed(*args, **kwargs) + r_postgres = f_postgres(*args, **kwargs) + assert r_seaweed == r_postgres + return r_seaweed + + return mirror + + def __getattr__(self, name): + return self._make_mirror(name) + + +STORY_SERVICE = StoryService(MirrorStoryStorage()) diff --git a/rssant_api/models/story_storage/__init__.py b/rssant_api/models/story_storage/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e8009709def7087c0e0cc80e1c90f177c64046c1 --- /dev/null +++ b/rssant_api/models/story_storage/__init__.py @@ -0,0 +1,6 @@ +from .common.story_data import StoryData +from .common.story_key import StoryId, StoryKey, hash_feed_id +from .postgres.postgres_story import PostgresStoryStorage +from .postgres.postgres_client import PostgresClient +from .seaweed.seaweed_story import SeaweedStoryStorage, SeaweedFileType +from .seaweed.seaweed_client import SeaweedClient, SeaweedError diff --git a/rssant_api/models/story_storage/common/__init__.py b/rssant_api/models/story_storage/common/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/rssant_api/models/story_storage/common/story_data.py b/rssant_api/models/story_storage/common/story_data.py new file mode 100644 index 0000000000000000000000000000000000000000..19a7fb52a99ecda8a011f4b79354770e1d3ee337 --- /dev/null +++ b/rssant_api/models/story_storage/common/story_data.py @@ -0,0 +1,87 @@ +import json +import gzip +import datetime +import struct + +from validr import T +import lz4.frame as lz4 + +from rssant_common.validator import compiler + + +_dump_datetime = compiler.compile(T.datetime) + + +def _json_default(obj): + if isinstance(obj, (datetime.date, datetime.datetime)): + return _dump_datetime(obj) + raise TypeError("Type %s not serializable" % type(obj)) + + +class StoryData: + + VERSION_GZIP = 1 + VERSION_LZ4 = 2 + + __slots__ = ('_value', '_version') + + def __init__(self, value: bytes, version: int): + self._value = value + self._check_version(version) + self._version = version + + @classmethod + def _check_version(cls, version: int): + supported = (cls.VERSION_GZIP, cls.VERSION_LZ4,) + if version not in supported: + raise ValueError(f'not support version {version}') + + @property + def value(self) -> bytes: + return self._value + + @property + def version(self) -> int: + return self._version + + def encode(self) -> bytes: + version = struct.pack('>B', self._version) + if self._version == self.VERSION_GZIP: + data_bytes = gzip.compress(self._value) + elif self._version == self.VERSION_LZ4: + data_bytes = lz4.compress(self._value) + else: + assert False, f'unknown version {version}' + return version + data_bytes + + 
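+    # Wire format produced by encode(): a single version byte (1 = gzip,
+    # 2 = lz4) followed by the compressed payload; decode() reads that byte
+    # first and decompresses the remainder accordingly.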
@classmethod + def decode(cls, data: bytes) -> "StoryData": + (version,) = struct.unpack('>B', data[:1]) + cls._check_version(version) + if version == cls.VERSION_GZIP: + value = gzip.decompress(data[1:]) + elif version == cls.VERSION_LZ4: + value = lz4.decompress(data[1:]) + else: + assert False, f'unknown version {version}' + return cls(value, version=version) + + @classmethod + def encode_json(cls, value: dict, version: int = VERSION_LZ4) -> bytes: + value = json.dumps(value, ensure_ascii=False, default=_json_default).encode('utf-8') + return cls(value, version=version).encode() + + @classmethod + def decode_json(cls, data: bytes) -> dict: + value = cls.decode(data).value + return json.loads(value.decode('utf-8')) + + @classmethod + def encode_text(cls, value: str, version: int = VERSION_LZ4) -> bytes: + value = value.encode('utf-8') + return cls(value, version=version).encode() + + @classmethod + def decode_text(cls, data: bytes) -> str: + value = cls.decode(data).value + return value.decode('utf-8') diff --git a/rssant_api/models/story_storage/common/story_key.py b/rssant_api/models/story_storage/common/story_key.py new file mode 100644 index 0000000000000000000000000000000000000000..0a4c269b6a90ba0cb064c2be72ec03d9100fe8db --- /dev/null +++ b/rssant_api/models/story_storage/common/story_key.py @@ -0,0 +1,51 @@ +import typing +import struct +import binascii + + +def hash_feed_id(feed_id: int) -> int: + data = struct.pack('>I', feed_id) + value = binascii.crc32(data) & 0xffffffff + return value + + +class StoryKey: + """ + story key: 64 bits + +----------+---------+--------+----------+ + | 4 | 28 | 28 | 4 | + +----------+---------+--------+----------+ + | reserve1 | feed_id | offset | reserve2 | + +----------+---------+-------------------+ + """ + + @staticmethod + def encode(feed_id: int, offset: int, reserve1: int = 0, reserve2: int = 0) -> int: + assert 0 <= reserve1 <= 255, 'expect 0 <= reserve1 <= 255' + assert 0 <= reserve2 <= 255, 'expect 0 <= reserve2 <= 255' + assert 0 <= feed_id <= 0x0fffffff, 'expect 0 <= feed_id <= 0x0fffffff' + assert 0 <= offset <= 0x0fffffff, 'expect 0 <= offset <= 0x0fffffff' + return (reserve1 << 60) + (feed_id << 32) + (offset << 4) + reserve2 + + @staticmethod + def decode(key: int) -> typing.Tuple[int, int, int, int]: + reserve2 = key & 0b00001111 + offset = (key >> 4) & 0x0fffffff + feed_id = (key >> 32) & 0x0fffffff + reserve1 = (key >> 60) & 0b00001111 + return feed_id, offset, reserve1, reserve2 + + +class StoryId: + """ + virtual story id, composited by feed_id and offset + """ + + @staticmethod + def encode(feed_id: int, offset: int) -> int: + return StoryKey.encode(feed_id, offset) + + @staticmethod + def decode(story_id: int) -> typing.Tuple[int, int]: + feed_id, offset, __, __ = StoryKey.decode(story_id) + return feed_id, offset diff --git a/rssant_api/models/story_storage/postgres/__init__.py b/rssant_api/models/story_storage/postgres/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/rssant_api/models/story_storage/postgres/postgres_client.py b/rssant_api/models/story_storage/postgres/postgres_client.py new file mode 100644 index 0000000000000000000000000000000000000000..1e0788bfda87e0b99c9a1e4a6aa73fdb30acfeba --- /dev/null +++ b/rssant_api/models/story_storage/postgres/postgres_client.py @@ -0,0 +1,66 @@ +import logging +from threading import RLock +from sqlalchemy import create_engine +from sqlalchemy.engine import Engine + + +LOG = 
logging.getLogger(__name__) + + +class PostgresClient: + def __init__(self, volumes: dict, pool_size=5, pool_recycle=600): + self._volumes = volumes + self._pool_size = pool_size + self._pool_recycle = pool_recycle + self._dsn_s = {} + self._table_s = {} + for volume, cfg in volumes.items(): + dsn = 'postgresql://{user}:{password}@{host}:{port}/{db}'.format( + user=cfg['user'], + password=cfg['password'], + host=cfg['host'], + port=cfg['port'], + db=cfg['db'], + ) + self._dsn_s[volume] = dsn + self._table_s[volume] = cfg['table'] + self._engine_s = {} + self._lock = RLock() + + def _init_table(self, engine: Engine, table: str): + query = """ + CREATE TABLE IF NOT EXISTS {table} ( + id BIGINT PRIMARY KEY, + content BYTEA NOT NULL + ) + """.format(table=table) + with engine.connect() as conn: + conn.execute(query) + + def _create_engine(self, volume: int) -> Engine: + LOG.info('create sqlalchemy engine for volume %s', volume) + # https://docs.sqlalchemy.org/en/13/core/pooling.html + engine: Engine = create_engine( + self._dsn_s[volume], + pool_size=self._pool_size, + max_overflow=0, + pool_pre_ping=False, + pool_recycle=self._pool_recycle, + ) + self._init_table(engine, self._table_s[volume]) + return engine + + def get_engine(self, volume: int) -> Engine: + if volume not in self._volumes: + raise ValueError(f'story volume {volume} not exists') + if volume not in self._engine_s: + with self._lock: + if volume not in self._engine_s: + engine = self._create_engine(volume) + self._engine_s[volume] = engine + return self._engine_s[volume] + + def get_table(self, volume: int) -> str: + if volume not in self._volumes: + raise ValueError(f'story volume {volume} not exists') + return self._table_s[volume] diff --git a/rssant_api/models/story_storage/postgres/postgres_sharding.py b/rssant_api/models/story_storage/postgres/postgres_sharding.py new file mode 100644 index 0000000000000000000000000000000000000000..f4f766fb57e5ee718424172de1ba4c4158df7626 --- /dev/null +++ b/rssant_api/models/story_storage/postgres/postgres_sharding.py @@ -0,0 +1,11 @@ + + +VOLUME_SIZE = 64 * 1024 + + +def sharding_for(feed_id: int) -> int: + """ + 数据分片算法,按 FeedID 范围分片。 + 每卷存储 64K 订阅的故事数据,大约64GB,1千万行记录。 + """ + return feed_id // VOLUME_SIZE diff --git a/rssant_api/models/story_storage/postgres/postgres_story.py b/rssant_api/models/story_storage/postgres/postgres_story.py new file mode 100644 index 0000000000000000000000000000000000000000..3370eeaf11374b146cbfbadf75d87778a2b71bc2 --- /dev/null +++ b/rssant_api/models/story_storage/postgres/postgres_story.py @@ -0,0 +1,116 @@ +from typing import List, Tuple, Dict +from collections import defaultdict + +from sqlalchemy.sql import text as sql + +from ..common.story_key import StoryId +from ..common.story_data import StoryData +from .postgres_client import PostgresClient +from .postgres_sharding import sharding_for + + +_KEY = Tuple[int, int] + + +class PostgresStoryStorage: + + def __init__(self, client: PostgresClient): + self._client = client + + def get_content(self, feed_id: int, offset: int) -> str: + r = self.batch_get_content([(feed_id, offset)]) + if not r: + return None + _, content = r[0] + return content + + def delete_content(self, feed_id: int, offset: int) -> None: + self.batch_delete_content([(feed_id, offset)]) + + def save_content(self, feed_id: int, offset: int, content: str) -> None: + self.batch_save_content([((feed_id, offset), content)]) + + @classmethod + def _split_by(cls, items: list, by: callable) -> dict: + groups = defaultdict(list) + for item in 
items: + groups[by(item)].append(item) + return groups + + @classmethod + def _split_keys(cls, keys: List[_KEY]) -> Dict[int, List[_KEY]]: + return cls._split_by(keys, lambda x: sharding_for(x[0])) + + @classmethod + def _split_items(cls, items: List[Tuple[_KEY, str]]) -> Dict[int, List[Tuple[_KEY, str]]]: + return cls._split_by(items, lambda x: sharding_for(x[0][0])) + + @staticmethod + def _to_id_list(keys: List[_KEY]): + return tuple(StoryId.encode(feed_id, offset) for feed_id, offset in keys) + + def batch_get_content(self, keys: List[_KEY]) -> List[Tuple[_KEY, str]]: + result = [] + if not keys: + return result + groups = self._split_keys(keys) + for volume, group_keys in groups.items(): + result.extend(self._batch_get_content(volume, group_keys)) + return result + + def _batch_get_content(self, volume: int, keys: List[_KEY]) -> List[Tuple[_KEY, str]]: + q = sql(""" + SELECT id, content FROM {table} WHERE id IN :id_list + """.format(table=self._client.get_table(volume))) + id_list = self._to_id_list(keys) + with self._client.get_engine(volume).connect() as conn: + rows = list(conn.execute(q, id_list=id_list).fetchall()) + result = [] + for story_id, content_data in rows: + key = StoryId.decode(story_id) + if content_data: + content = StoryData.decode_text(content_data) + else: + content = None + result.append((key, content)) + return result + + def batch_delete_content(self, keys: List[_KEY]) -> None: + if not keys: + return + groups = self._split_keys(keys) + for volume, group_keys in groups.items(): + self._batch_delete_content(volume, group_keys) + + def _batch_delete_content(self, volume: int, keys: List[_KEY]) -> None: + q = sql(""" + DELETE FROM {table} WHERE id IN :id_list + """.format(table=self._client.get_table(volume))) + id_list = self._to_id_list(keys) + with self._client.get_engine(volume).connect() as conn: + with conn.begin(): + conn.execute(q, id_list=id_list) + + def batch_save_content(self, items: List[Tuple[_KEY, str]]) -> None: + if not items: + return + groups = self._split_items(items) + for volume, group_items in groups.items(): + self._batch_save_content(volume, group_items) + + def _batch_save_content(self, volume: int, items: List[Tuple[_KEY, str]]) -> None: + q = sql(""" + INSERT INTO {table} (id, content) VALUES (:id, :content) + ON CONFLICT (id) DO UPDATE SET content = EXCLUDED.content + """.format(table=self._client.get_table(volume))) + params = [] + for (feed_id, offset), content in items: + story_id = StoryId.encode(feed_id, offset) + if content: + content_data = StoryData.encode_text(content) + else: + content_data = b'' + params.append({'id': story_id, 'content': content_data}) + with self._client.get_engine(volume).connect() as conn: + with conn.begin(): + conn.execute(q, params) diff --git a/rssant_api/models/story_storage/seaweed/__init__.py b/rssant_api/models/story_storage/seaweed/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/rssant_api/models/story_storage/seaweed/seaweed_client.py b/rssant_api/models/story_storage/seaweed/seaweed_client.py new file mode 100644 index 0000000000000000000000000000000000000000..37ae470a4196c868a3d2cc04c19609bf314210e2 --- /dev/null +++ b/rssant_api/models/story_storage/seaweed/seaweed_client.py @@ -0,0 +1,111 @@ +import typing +from concurrent.futures import ThreadPoolExecutor +import requests + +from rssant.middleware.seaweed_panel import SeaweedMetrics + + +class SeaweedError(Exception): + """ + SeaweedError + """ + + 
+class SeaweedClient: + def __init__(self, volume_url: str, thread_pool_size: int = 30, timeout: int = 3): + self.volume_url = volume_url.rstrip('/') + self.thread_pool_size = thread_pool_size + self.timeout = timeout + self._m_session: requests.Session = None + self._m_executor: ThreadPoolExecutor = None + + @property + def _session(self): + self._init() + return self._m_session + + @property + def _executor(self): + self._init() + return self._m_executor + + def _get_file_url(self, fid: str): + try: + volume_id, rest = fid.strip().split(",") + except ValueError: + raise ValueError( + "fid must be in format: ,") + return self.volume_url + f'/{fid}' + + def put(self, fid: str, data: bytes) -> None: + url = self._get_file_url(fid) + with SeaweedMetrics.record('put'): + response = self._session.post(url, files={'file': data}, timeout=self.timeout) + if response.status_code not in (200, 201, 204): + raise SeaweedError(self._err('put', fid, response)) + + def get(self, fid: str) -> bytes: + url = self._get_file_url(fid) + with SeaweedMetrics.record('get'): + return self._get_by_url(fid, url) + + def _get_by_url(self, fid, url: str) -> bytes: + response = self._session.get(url, timeout=self.timeout) + if response.status_code == 404: + return None + if response.status_code not in (200,): + raise SeaweedError(self._err('get', fid, response)) + return response.content + + def batch_get(self, fid_s: typing.List[str]) -> typing.Dict[str, bytes]: + result = {} + if not fid_s: + return result + url_s = [self._get_file_url(fid) for fid in fid_s] + fut_s = [] + with SeaweedMetrics.record('get', len(fid_s)): + for fid, url in zip(fid_s, url_s): + fut = self._executor.submit(self._get_by_url, fid, url) + fut_s.append((fid, fut)) + for fid, fut in fut_s: + result[fid] = fut.result() + return result + + def delete(self, fid: str) -> None: + url = self._get_file_url(fid) + with SeaweedMetrics.record('delete'): + response = self._session.delete(url, timeout=self.timeout) + if response.status_code not in (200, 202, 204, 404): + raise SeaweedError(self._err('delete', fid, response)) + + def __enter__(self): + self._init() + return self + + def __exit__(self, *exc_info): + self.close() + + def close(self): + if self._m_session: + self._m_session.close() + if self._m_executor: + self._m_executor.shutdown(wait=False) + + def _init(self): + if self._m_session is None: + self._m_session = requests.Session() + # requests get_environ_proxies is too slow + self._m_session.trust_env = False + pool_maxsize = 10 + self.thread_pool_size + for scheme in ['http://', 'https://']: + adapter = requests.adapters.HTTPAdapter( + pool_maxsize=pool_maxsize, pool_connections=pool_maxsize) + self._m_session.mount(scheme, adapter) + if self._m_executor is None: + self._m_executor = ThreadPoolExecutor( + self.thread_pool_size, thread_name_prefix='seaweed_client') + + def _err(self, method: str, fid: str, response: requests.Response) -> str: + msg = f': {response.text}' if response.text else '' + return (f'seaweed {method} failed, fid={fid} ' + f'status={response.status_code}{msg}') diff --git a/rssant_api/models/story_storage/seaweed/seaweed_sharding.py b/rssant_api/models/story_storage/seaweed/seaweed_sharding.py new file mode 100644 index 0000000000000000000000000000000000000000..7612ada9287d1b42e7ce21006950caf3319ce35c --- /dev/null +++ b/rssant_api/models/story_storage/seaweed/seaweed_sharding.py @@ -0,0 +1,58 @@ +from ..common.story_key import StoryKey, hash_feed_id + + +VOLUME_SIZE = 1 * 1024 +VOLUME_GROUP = 8 +VOLUME_GROUP_SIZE = 
VOLUME_GROUP * VOLUME_SIZE + + +def sharding_for(feed_id: int) -> int: + """ + 数据分片算法,按 FeedID 先范围分组,组内再哈希分片。 + 每卷存储 1K 订阅的故事数据(大约1GB),每 8 卷为一组。 + + [ 0 , 8K ) -> [ 0, 8 ) + [ 8K , 16K ) -> [ 8, 16 ) + [ 16K , 24K ) -> [ 16, 24 ) + """ + group = feed_id // VOLUME_GROUP_SIZE + index = hash_feed_id(feed_id) % VOLUME_GROUP + return group * VOLUME_GROUP + index + + +def seaweed_volume_for(feed_id: int) -> int: + """ + 返回seaweedfs数据卷ID + """ + return sharding_for(feed_id) + 1 + + +class SeaweedFileType: + CONTENT = 2 + + +def seaweed_fid_encode(feed_id: int, offset: int, ftype: int) -> str: + """ + 返回seaweedfs文件ID,其中cookie值设为0 + https://github.com/chrislusf/seaweedfs/blob/master/README.md#save-file-id + """ + volume_id = seaweed_volume_for(feed_id) + file_key = StoryKey.encode(feed_id, offset, reserve2=ftype) + file_key_hex = '%x' % file_key + cookie_hex = '00000000' + return f'{volume_id},{file_key_hex}{cookie_hex}' + + +def seaweed_fid_decode(fid: str) -> tuple: + """ + decode seaweed fid, ignore cookie: + -> (volume_id, feed_id, reserve, offset, ftype) + """ + try: + volume_id_str, remain = fid.split(',') + volume_id = int(volume_id_str) + file_key = int(remain[:-8], base=16) + except ValueError as ex: + raise ValueError('invalid seaweed fid') from ex + feed_id, offset, __, ftype = StoryKey.decode(file_key) + return volume_id, feed_id, offset, ftype diff --git a/rssant_api/models/story_storage/seaweed/seaweed_story.py b/rssant_api/models/story_storage/seaweed/seaweed_story.py new file mode 100644 index 0000000000000000000000000000000000000000..ba2e958b176bf0607216dc4449f9b1523b1a7ef6 --- /dev/null +++ b/rssant_api/models/story_storage/seaweed/seaweed_story.py @@ -0,0 +1,52 @@ +from typing import List, Tuple + +from ..common.story_data import StoryData +from .seaweed_sharding import seaweed_fid_encode, SeaweedFileType +from .seaweed_client import SeaweedClient + + +_KEY = Tuple[int, int] + + +class SeaweedStoryStorage: + def __init__(self, client: SeaweedClient): + self._client: SeaweedClient = client + + def _content_fid(self, feed_id: int, offset: int) -> str: + return seaweed_fid_encode(feed_id, offset, SeaweedFileType.CONTENT) + + def get_content(self, feed_id: int, offset: int) -> str: + content_data = self._client.get(self._content_fid(feed_id, offset)) + if content_data: + content = StoryData.decode_text(content_data) + else: + content = None + return content + + def delete_content(self, feed_id: int, offset: int) -> None: + self._client.delete(self._content_fid(feed_id, offset)) + + def save_content(self, feed_id: int, offset: int, content: str) -> None: + if not content: + return self.delete_content(feed_id, offset) + content_data = StoryData.encode_text(content) + self._client.put(self._content_fid(feed_id, offset), content_data) + + def batch_get_content(self, keys: List[_KEY]) -> List[Tuple[_KEY, str]]: + fid_s = {} + for feed_id, offset in keys: + fid = self._content_fid(feed_id, offset) + fid_s[fid] = (feed_id, offset) + result = [] + for fid, content in self._client.batch_get(fid_s.keys()).items(): + feed_id, offset = fid_s[fid] + result.append(((feed_id, offset), content)) + return result + + def batch_delete_content(self, keys: List[_KEY]) -> None: + for feed_id, offset in keys: + self.delete_content(feed_id, offset) + + def batch_save_content(self, items: List[Tuple[_KEY, str]]) -> None: + for (feed_id, offset), content in items: + self.save_content(feed_id, offset, content) diff --git a/rssant_api/models/story_unique_ids.py b/rssant_api/models/story_unique_ids.py new file 
mode 100644 index 0000000000000000000000000000000000000000..23d8653d1220481769c4f4535e4b0b4681a8420e --- /dev/null +++ b/rssant_api/models/story_unique_ids.py @@ -0,0 +1,46 @@ +import gzip +import struct + + +class StoryUniqueIdsData: + """ + +---------+--------------+-----------------+ + | 1 byte | 4 bytes | about 10KB | + +---------+--------------+-----------------+ + | version | begin_offset | unique_ids_gzip | + +---------+--------------+-----------------+ + """ + + def __init__(self, begin_offset: int, unique_ids: list, version=1): + self._version = version + self._begin_offset = begin_offset + for x in unique_ids: + if not x: + raise ValueError('unique_id can not be empty') + if '\n' in x: + raise ValueError(r"unique_id can not contains '\n' character") + self._unique_ids = unique_ids + + @property + def unique_ids(self) -> list: + return self._unique_ids + + @property + def begin_offset(self) -> int: + return self._begin_offset + + def encode(self) -> bytes: + value = '\n'.join(self._unique_ids).encode('utf-8') + unique_ids_gzip = gzip.compress(value) + header = struct.pack('>BI', self._version, self._begin_offset) + return header + unique_ids_gzip + + @classmethod + def decode(cls, data: bytes) -> "StoryUniqueIdsData": + (version,) = struct.unpack('>B', data[:1]) + if version != 1: + raise ValueError(f'not support version {version}') + (begin_offset,) = struct.unpack('>I', data[1:5]) + value = gzip.decompress(data[5:]).decode('utf-8') + unique_ids = value.split('\n') if value else [] + return cls(begin_offset, unique_ids, version=version) diff --git a/rssant_api/models/union_story.py b/rssant_api/models/union_story.py index 5e2933f3be040b783edee0caa33ef4466a4b6cec..6cb66c935a84e5fa6b8e7a0a878121d29e03898e 100644 --- a/rssant_api/models/union_story.py +++ b/rssant_api/models/union_story.py @@ -7,7 +7,9 @@ from rssant_common.validator import StoryUnionId, FeedUnionId from rssant_common.detail import Detail from .feed import UserFeed from .story import Story, UserStory, StoryDetailSchema, USER_STORY_DETAIL_FEILDS +from .story_info import StoryInfo, StoryId, STORY_INFO_DETAIL_FEILDS from .errors import FeedNotFoundError, StoryNotFoundError +from .story_service import STORY_SERVICE def convert_summary(summary): @@ -16,19 +18,6 @@ def convert_summary(summary): class UnionStory: - _STORY_FIELD_NAMES = None - - @classmethod - def _story_field_names(cls): - if cls._STORY_FIELD_NAMES is None: - names = set() - for field in Story._meta.get_fields(): - column = getattr(field, 'column', None) - if column: - names.add(column) - cls._STORY_FIELD_NAMES = list(sorted(names)) - return cls._STORY_FIELD_NAMES - def __init__(self, story, *, user_id, user_feed_id, user_story=None, detail=False): self._story = story self._user_id = user_id @@ -175,9 +164,8 @@ class UnionStory: user_story = q.get() except UserStory.DoesNotExist: user_story = None - try: - story = Story.get_by_offset(feed_id, offset, detail=detail) - except Story.DoesNotExist: + story = STORY_SERVICE.get_by_offset(feed_id, offset, detail=detail) + if not story: raise StoryNotFoundError() else: story = user_story.story @@ -196,14 +184,14 @@ class UnionStory: @staticmethod def _merge_storys(storys, user_storys, *, user_id, user_feeds=None, detail=False): - user_storys_map = {x.story_id: x for x in user_storys} + user_storys_map = {(x.feed_id, x.offset): x for x in user_storys} if user_feeds: user_feeds_map = {x.feed_id: x.id for x in user_feeds} else: user_feeds_map = {x.feed_id: x.user_feed_id for x in user_storys} ret = [] for story in 
storys: - user_story = user_storys_map.get(story.id) + user_story = user_storys_map.get((story.feed_id, story.offset)) user_feed_id = user_feeds_map.get(story.feed_id) ret.append(UnionStory( story, @@ -214,8 +202,47 @@ class UnionStory: )) return ret - @staticmethod - def query_by_feed(feed_unionid, offset=None, size=10, detail=False): + @classmethod + def _query_storys_by_feed(cls, feed_id, offset, size, detail): + q = Story.objects.filter(feed_id=feed_id, offset__gte=offset) + detail = Detail.from_schema(detail, StoryDetailSchema) + q = q.defer(*detail.exclude_fields) + q = q.order_by('offset')[:size] + storys = list(q.all()) + return storys + + @classmethod + def _query_storys_by_story_service(cls, feed_id, offset, size, detail): + begin_id = StoryId.encode(feed_id, offset) + end_id = StoryId.encode(feed_id, offset + size - 1) + q = StoryInfo.objects\ + .filter(pk__gte=begin_id, pk__lte=end_id) + if not detail: + q = q.defer(*STORY_INFO_DETAIL_FEILDS) + story_info_s = list(q.all()) + storys = [STORY_SERVICE.to_common(x) for x in story_info_s] + return storys + + @classmethod + def _query_user_storys_by_offset(cls, user_id, feed_id, offset_s): + q = UserStory.objects.filter(user_id=user_id, feed_id=feed_id, offset__in=offset_s) + q = q.exclude(is_favorited=False, is_watched=False) + user_storys = list(q.all()) + return user_storys + + @classmethod + def _query_storys(cls, feed_id, offset, size, detail): + storys = cls._query_storys_by_story_service(feed_id, offset, size, detail=detail) + got_offset_s = set(x.offset for x in storys) + if len(storys) < size: + for story in cls._query_storys_by_feed(feed_id, offset, size, detail=detail): + if story.offset not in got_offset_s: + storys.append(story) + storys = list(sorted(storys, key=lambda x: x.offset)) + return storys + + @classmethod + def query_by_feed(cls, feed_unionid, offset=None, size=10, detail=False): user_id, feed_id = feed_unionid q = UserFeed.objects.select_related('feed')\ .filter(user_id=user_id, feed_id=feed_id)\ @@ -227,15 +254,11 @@ class UnionStory: total = user_feed.feed.total_storys if offset is None: offset = user_feed.story_offset - q = Story.objects.filter(feed_id=feed_id, offset__gte=offset) - detail = Detail.from_schema(detail, StoryDetailSchema) - q = q.defer(*detail.exclude_fields) - q = q.order_by('offset')[:size] - storys = list(q.all()) - story_ids = [x.id for x in storys] - q = UserStory.objects.filter(user_id=user_id, feed_id=feed_id, story_id__in=story_ids) - q = q.exclude(is_favorited=False, is_watched=False) - user_storys = list(q.all()) + if offset + size > total: + size = total - offset + storys = cls._query_storys(feed_id, offset, size, detail=detail) + offset_s = [x.offset for x in storys] + user_storys = cls._query_user_storys_by_offset(user_id, feed_id, offset_s) ret = UnionStory._merge_storys( storys, user_storys, user_feeds=[user_feed], user_id=user_id, detail=detail) return total, offset, ret @@ -276,6 +299,9 @@ class UnionStory: @classmethod def _query_union_storys(cls, user_id, storys, detail): + """ + Deprecated since 1.5.0 + """ story_ids = [x.id for x in storys] feed_ids = list(set([x.feed_id for x in storys])) q = UserStory.objects.filter( @@ -286,6 +312,23 @@ class UnionStory: storys, user_storys, user_id=user_id, detail=detail) return union_storys + @classmethod + def _query_union_storys_by_offset(cls, user_id, storys, detail): + where_items = [] + for story in storys: + # ensure integer, avoid sql inject attack + feed_id, offset = int(story.feed_id), int(story.offset) + 
where_items.append(f'("feed_id"={feed_id} AND "offset"={offset})') + where_clause = ' OR '.join(where_items) + sql = f""" + SELECT * FROM rssant_api_userstory + WHERE user_id=%s AND ({where_clause}) + """ + user_storys = list(UserStory.objects.raw(sql, [user_id])) + union_storys = UnionStory._merge_storys( + storys, user_storys, user_id=user_id, detail=detail) + return union_storys + @classmethod def _validate_story_keys(cls, user_id, story_keys): if not story_keys: @@ -297,8 +340,15 @@ class UnionStory: for feed_id, offset in story_keys: if feed_id in feed_ids: verified_story_keys.append((feed_id, offset)) + verified_story_keys = list(sorted(verified_story_keys)) return verified_story_keys + @classmethod + def _batch_get_story_infos(cls, story_keys, detail): + story_info_s = StoryInfo.batch_get(story_keys, detail=detail) + storys = [STORY_SERVICE.to_common(x) for x in story_info_s] + return storys + @classmethod def batch_get_by_feed_offset(cls, user_id, story_keys, detail=False): """ @@ -307,25 +357,12 @@ class UnionStory: story_keys = cls._validate_story_keys(user_id, story_keys) if not story_keys: return [] - detail = Detail.from_schema(detail, StoryDetailSchema) - select_fields = set(cls._story_field_names()) - set(detail.exclude_fields) - select_fields_quoted = ','.join(['"{}"'.format(x) for x in select_fields]) - # Note: below query can not use index, it's very slow - # WHERE ("feed_id","offset")=Any(%s) - # WHERE ("feed_id","offset")=Any(ARRAY[(XX, YY), ...]) - where_items = [] - for feed_id, offset in story_keys: - # ensure integer, avoid sql inject attack - feed_id, offset = int(feed_id), int(offset) - where_items.append(f'("feed_id"={feed_id} AND "offset"={offset})') - where_clause = ' OR '.join(where_items) - sql = f""" - SELECT {select_fields_quoted} - FROM rssant_api_story - WHERE {where_clause} - """ - storys = list(Story.objects.raw(sql)) - union_storys = cls._query_union_storys( + storys = cls._batch_get_story_infos(story_keys, detail=detail) + finish_story_keys = set((x.feed_id, x.offset) for x in storys) + remain_story_keys = list(sorted(set(story_keys) - finish_story_keys)) + if remain_story_keys: + storys.extend(Story.batch_get_by_offset(remain_story_keys, detail=detail)) + union_storys = cls._query_union_storys_by_offset( user_id=user_id, storys=storys, detail=detail) return union_storys @@ -354,18 +391,26 @@ class UnionStory: @staticmethod def _set_tag_by_id(story_unionid, is_favorited=None, is_watched=None): - union_story = UnionStory.get_by_id(story_unionid) - user_feed_id = union_story._user_feed_id - user_story = union_story._user_story + user_id, feed_id, offset = story_unionid + story = STORY_SERVICE.set_user_marked(feed_id, offset) + if not story: + story = Story.get_by_offset(feed_id, offset, detail=False) + user_feed = UserFeed.objects\ + .only('id', 'user_id', 'feed_id')\ + .get(user_id=user_id, feed_id=feed_id) + user_feed_id = user_feed.id + try: + user_story = UserStory.get_by_offset(user_id, feed_id, offset, detail=False) + except UserStory.DoesNotExist: + user_story = None with transaction.atomic(): if user_story is None: - user_id, feed_id, offset = story_unionid user_story = UserStory( user_id=user_id, feed_id=feed_id, user_feed_id=user_feed_id, - story_id=union_story._story.id, - offset=union_story._story.offset + story_id=story.id, + offset=offset ) if is_favorited is not None: user_story.is_favorited = is_favorited @@ -375,9 +420,12 @@ class UnionStory: user_story.dt_watched = timezone.now() user_story.save() if is_favorited or is_watched: - 
union_story._story.is_user_marked = True - union_story._story.save() - union_story._user_story = user_story + if not story.is_user_marked: + story.is_user_marked = True + story.save() + union_story = UnionStory( + story, user_id=user_id, user_feed_id=user_feed_id, + user_story=user_story, detail=False) return union_story @staticmethod diff --git a/rssant_api/tests/__init__.py b/rssant_api/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/rssant_api/tests/feed_tests.py b/rssant_api/tests/feed_tests.py new file mode 100644 index 0000000000000000000000000000000000000000..f00670d4ec2f4e9e69d0b6897895c8233e6927bb --- /dev/null +++ b/rssant_api/tests/feed_tests.py @@ -0,0 +1,22 @@ +import pytest +from django.utils import timezone +from django.test import TestCase + +from rssant_api.models import Feed, FeedStatus + + +@pytest.mark.dbtest +class FeedSimpleTestCase(TestCase): + def setUp(self): + feed = Feed( + title='test feed', + url='https://blog.example.com/feed.xml', + status=FeedStatus.READY, + dt_updated=timezone.now(), + ) + feed.save() + + def test_get_feed_by_url(self): + url = 'https://blog.example.com/feed.xml' + got = Feed.get_first_by_url(url) + self.assertEqual(got.title, 'test feed') diff --git a/rssant_api/tests.py b/rssant_api/tests/reverse_url_tests.py similarity index 85% rename from rssant_api/tests.py rename to rssant_api/tests/reverse_url_tests.py index 0c201689d6e073e3fe66b17ae194456836029762..463bc11cb7e788da1b1f50e68a900a20c997941d 100644 --- a/rssant_api/tests.py +++ b/rssant_api/tests/reverse_url_tests.py @@ -1,7 +1,6 @@ import pytest import yarl - -from .helper import reverse_url, forward_url +from rssant_api.helper import reverse_url, forward_url reverse_and_forward_url_cases = [ @@ -44,7 +43,8 @@ reverse_and_forward_url_cases = [ ] -@pytest.mark.parametrize('url,rev_url', reverse_and_forward_url_cases) -def test_reverse_and_forward_url(url, rev_url): +@pytest.mark.parametrize('n', range(len(reverse_and_forward_url_cases))) +def test_reverse_and_forward_url(n): + url, rev_url = reverse_and_forward_url_cases[n] assert reverse_url(url) == rev_url assert yarl.URL(forward_url(rev_url)) == yarl.URL(url) diff --git a/rssant_api/tests/story_tests.py b/rssant_api/tests/story_tests.py new file mode 100644 index 0000000000000000000000000000000000000000..f25dc59becd55a8972c0be8775243479616f29aa --- /dev/null +++ b/rssant_api/tests/story_tests.py @@ -0,0 +1,182 @@ +import pytest +from django.utils import timezone +from django.test import TransactionTestCase +from validr import T + +from rssant_common.validator import compiler +from rssant_api.models import Feed, FeedStatus +from rssant_api.models import STORY_SERVICE, Story, StoryInfo +from rssant.helper.content_hash import compute_hash_base64 + + +StorySchema = T.dict( + unique_id=T.str, + title=T.str, + content_hash_base64=T.str, + author=T.str.optional, + link=T.str.optional, + image_url=T.url.optional, + iframe_url=T.url.optional, + audio_url=T.url.optional, + has_mathjax=T.bool.optional, + dt_published=T.datetime.object.optional.invalid_to_default, + dt_updated=T.datetime.object.optional, + summary=T.str.optional, + content=T.str.optional, +) + +validate_story = compiler.compile(StorySchema) + + +@pytest.mark.dbtest +class StoryTestCase(TransactionTestCase): + + def setUp(self): + print('setUp') + storys = [] + updated_storys = [] + now = timezone.datetime(2020, 6, 1, 12, 12, 12) + for i in range(200): + dt = now + 
timezone.timedelta(minutes=i) + content = f'test story content {i}' * (i % 5) + content_hash_base64 = compute_hash_base64(content) + summary = content[:30] + story = { + 'unique_id': f'blog.example.com/{i}', + 'title': f'test story {i}', + 'content_hash_base64': content_hash_base64, + 'author': 'tester', + 'link': f'https://blog.example.com/{i}.html', + 'dt_published': dt, + 'dt_updated': dt, + 'summary': summary, + 'content': content, + } + storys.append(validate_story(story)) + updated_story = dict(story) + updated_content = f'test story content updated {i}' * (i % 5 + 1) + updated_story.update( + content=updated_content, + content_hash_base64=compute_hash_base64(updated_content), + ) + updated_storys.append(validate_story(updated_story)) + self.storys = storys + self.updated_storys = updated_storys + + feed = Feed( + title='test feed', + url='https://blog.example.com/feed.xml', + status=FeedStatus.READY, + dt_updated=timezone.now(), + ) + feed.save() + self.feed_id = feed.id + + def assert_feed_total_storys(self, expect): + total_storys = Feed.get_by_pk(self.feed_id).total_storys + self.assertEqual(total_storys, expect) + + def assert_total_story_infos(self, expect): + total_storys = StoryInfo.objects.count() + self.assertEqual(total_storys, expect) + + def test_new_bulk_save_by_feed(self): + storys_0_30 = self.storys[:30] + modified = STORY_SERVICE.bulk_save_by_feed( + self.feed_id, storys_0_30, batch_size=10) + self.assertEqual(len(modified), 30) + self.assert_feed_total_storys(30) + self.assert_total_story_infos(30) + + storys_20_50 = self.storys[20:50] + modified = STORY_SERVICE.bulk_save_by_feed( + self.feed_id, storys_20_50, batch_size=10) + self.assertEqual(len(modified), 20) + self.assert_feed_total_storys(50) + self.assert_total_story_infos(50) + + updated_storys_30_50 = self.updated_storys[30:50] + modified = STORY_SERVICE.bulk_save_by_feed( + self.feed_id, updated_storys_30_50, batch_size=10) + self.assertEqual(len(modified), 20) + self.assert_feed_total_storys(50) + self.assert_total_story_infos(50) + + def test_mix_bulk_save_by_feed(self): + storys_0_30 = self.storys[:30] + modified = Story.bulk_save_by_feed( + self.feed_id, storys_0_30, batch_size=10) + self.assertEqual(len(modified), 30) + self.assert_feed_total_storys(30) + self.assert_total_story_infos(0) + + storys_10_50 = self.updated_storys[10:30] + self.storys[30:50] + modified = STORY_SERVICE.bulk_save_by_feed( + self.feed_id, storys_10_50, batch_size=10) + self.assertEqual(len(modified), 40) + self.assert_feed_total_storys(50) + self.assert_total_story_infos(40) + + storys_40_60 = self.storys[40:60] + modified = STORY_SERVICE.bulk_save_by_feed( + self.feed_id, storys_40_60, batch_size=10) + self.assertEqual(len(modified), 10) + self.assert_feed_total_storys(60) + self.assert_total_story_infos(50) + + def test_bulk_save_by_feed_refresh(self): + storys_0_20 = self.storys[:20] + modified = STORY_SERVICE.bulk_save_by_feed( + self.feed_id, storys_0_20, batch_size=10) + self.assertEqual(len(modified), 20) + self.assert_feed_total_storys(20) + self.assert_total_story_infos(20) + + modified = STORY_SERVICE.bulk_save_by_feed( + self.feed_id, storys_0_20, batch_size=10) + self.assertEqual(len(modified), 0) + self.assert_feed_total_storys(20) + self.assert_total_story_infos(20) + + modified = STORY_SERVICE.bulk_save_by_feed( + self.feed_id, storys_0_20, batch_size=10, is_refresh=True) + self.assertEqual(len(modified), 20) + self.assert_feed_total_storys(20) + self.assert_total_story_infos(20) + + def test_update_story(self): 
+ storys_0_20 = self.storys[:20] + modified = STORY_SERVICE.bulk_save_by_feed( + self.feed_id, storys_0_20, batch_size=10) + self.assertEqual(len(modified), 20) + self.assert_feed_total_storys(20) + self.assert_total_story_infos(20) + + story_10 = self.updated_storys[10] + data = {k: story_10[k] for k in ['content', 'summary', 'dt_published']} + STORY_SERVICE.update_story(self.feed_id, 10, data) + + def test_delete_by_retention(self): + storys_0_30 = self.storys[:30] + modified = Story.bulk_save_by_feed( + self.feed_id, storys_0_30, batch_size=10) + self.assertEqual(len(modified), 30) + self.assert_feed_total_storys(30) + self.assert_total_story_infos(0) + + storys_20_50 = self.storys[20:50] + modified = STORY_SERVICE.bulk_save_by_feed( + self.feed_id, storys_20_50, batch_size=10) + self.assertEqual(len(modified), 20) + self.assert_feed_total_storys(50) + self.assert_total_story_infos(20) + + n = STORY_SERVICE.delete_by_retention(self.feed_id, retention=10, limit=10) + self.assertEqual(n, 10) + self.assert_feed_total_storys(50) + self.assert_total_story_infos(20) + + n = STORY_SERVICE.delete_by_retention(self.feed_id, retention=10, limit=50) + self.assertEqual(n, 30) + self.assert_feed_total_storys(50) + self.assert_total_story_infos(10) diff --git a/rssant_api/views/feed.py b/rssant_api/views/feed.py index 99d0011de1520c44f46caf9dd37581942b740d1a..20b151e26e71337e755a7e219e5b3a03d8008c41 100644 --- a/rssant_api/views/feed.py +++ b/rssant_api/views/feed.py @@ -81,7 +81,7 @@ FeedView = RestRouter() @FeedView.post('feed/query') def feed_query( request, - hints: T.list(T.dict(id=T.feed_unionid.object, dt_updated=T.datetime.object)).optional, + hints: T.list(T.dict(id=T.feed_unionid.object, dt_updated=T.datetime.object)).maxlen(5000).optional, detail: FeedDetailSchema, ) -> T.dict( total=T.int.optional, @@ -155,7 +155,7 @@ def feed_query_creation( ) -> T.dict( total=T.int.min(0), size=T.int.min(0), - feed_creations=T.list(FeedCreationSchema), + feed_creations=T.list(FeedCreationSchema).maxlen(2000), ): feed_creations = FeedCreation.query_by_user(request.user.id, limit=limit, detail=detail) feed_creations = [x.to_dict() for x in feed_creations] diff --git a/rssant_common/actor_helper.py b/rssant_common/actor_helper.py index 0b4cac0483698bec936af7924bc98d4016859df4..4eb7011795b785a39fa788e01d9cf3f15d87cbbb 100644 --- a/rssant_common/actor_helper.py +++ b/rssant_common/actor_helper.py @@ -1,12 +1,15 @@ import logging import time import functools +from contextlib import contextmanager from urllib.parse import urlparse import click from django import db +from django.db import connection from validr import T import backdoor +from pyinstrument import Profiler from actorlib import actor, collect_actors, ActorNode, NodeSpecSchema from actorlib.sentry import sentry_init @@ -26,16 +29,46 @@ def django_context(f): @functools.wraps(f) def wrapper(*args, **kwargs): - db.reset_queries() - db.close_old_connections() + with log_django_context_metric(f.__name__): + db.reset_queries() + db.close_old_connections() + try: + return f(*args, **kwargs) + finally: + db.close_old_connections() + + return wrapper + + +def profile_django_context(f): + @functools.wraps(f) + def wrapper(*args, **kwargs): + profiler = None + if CONFIG.profiler_enable: + profiler = Profiler() + profiler.start() try: return f(*args, **kwargs) finally: - db.close_old_connections() - + if profiler is not None: + profiler.stop() + print(profiler.output_text(unicode=True, color=True)) return wrapper +@contextmanager +def 
log_django_context_metric(name, profiler_enable=False): + begin_time = time.time() + try: + yield + finally: + num_sqls = len(connection.queries) + sql_cost = sum(float(x['time']) for x in connection.queries) * 1000 + sql_time = f'{num_sqls},{sql_cost:.0f}ms' + cost = int((time.time() - begin_time) * 1000) + LOG.info(f'{name} sql=%s cost=%dms', sql_time, cost) + + @actor('actor.update_registery') def do_update_registery(ctx, nodes: T.list(NodeSpecSchema)): LOG.info(f'update registery {ctx.message}') diff --git a/rssant_common/dns_service.py b/rssant_common/dns_service.py index ae31d1f336665e6427fa20caa6691927beb6117d..bfb0a4d941ed6fe822be47c8d9db1dc4ecaeefaf 100644 --- a/rssant_common/dns_service.py +++ b/rssant_common/dns_service.py @@ -179,7 +179,7 @@ class DNSService: response = self.client.request('GET', url, headers=headers) response.raise_for_status() except Exception as ex: - LOG.warning(ex, exc_info=ex) + LOG.warning(f'{type(ex).__name__}: {ex}') continue for item in response.json()['Answer']: if item['type'] == 1: # ipv4 diff --git a/rssant_config/env.py b/rssant_config/env.py index 2068d23b47c70c0473d63f7ebdd8e87abe99f0e7..0e0f899d48e40fc7c9fae04a018a0954692aee94 100644 --- a/rssant_config/env.py +++ b/rssant_config/env.py @@ -1,4 +1,5 @@ import os.path +import re from dotenv import load_dotenv from validr import T, modelclass, fields, Invalid @@ -20,6 +21,7 @@ class ConfigModel: class EnvConfig(ConfigModel): debug: bool = T.bool.default(False).desc('debug') + profiler_enable: bool = T.bool.default(False).desc('enable profiler or not') log_level: str = T.enum('DEBUG,INFO,WARNING,ERROR').default('INFO') root_url: str = T.url.relaxed.default('http://localhost:6789') scheduler_network: str = T.str.default('localhost') @@ -29,6 +31,9 @@ class EnvConfig(ConfigModel): allow_private_address: bool = T.bool.default(False) check_feed_minutes: int = T.int.min(1).default(30) feed_story_retention: int = T.int.min(1).default(5000).desc('max storys to keep per feed') + seaweed_volume_url: str = T.url.relaxed.default('http://localhost:9080') + seaweed_thread_pool_size: int = T.int.min(1).default(30) + pg_story_volumes: str = T.str.optional # actor actor_storage_path: str = T.str.default('data/actor_storage') actor_storage_compact_wal_delta: int = T.int.min(1).default(5000) @@ -74,6 +79,37 @@ class EnvConfig(ConfigModel): networks = validate_extra_networks(networks) return list(networks) + @classmethod + def _parse_story_volumes(cls, text: str): + """ + Format: + {volume}:{user}:{password}@{host}:{port}/{db}/{table} + + >>> volumes = EnvConfig._parse_story_volumes('0:user:password@host:5432/db/table') + >>> expect = {0: dict( + ... user='user', password='password', + ... host='host', port=5432, db='db', table='table' + ... 
)} + >>> volumes == expect + True + """ + re_volume = re.compile(r'^(\d+)\:(\w+)\:(\w+)\@(\w+)\:(\d+)\/(\w+)\/(\w+)$') + volumes = {} + for part in text.split(','): + match = re_volume.match(part) + if not match: + raise Invalid(f'invalid story volume {part!r}') + volume = int(match.group(1)) + volumes[volume] = dict( + user=match.group(2), + password=match.group(3), + host=match.group(4), + port=int(match.group(5)), + db=match.group(6), + table=match.group(7), + ) + return volumes + def __post_init__(self): if self.sentry_enable and not self.sentry_dsn: raise Invalid('sentry_dsn is required when sentry_enable=True') @@ -99,6 +135,18 @@ class EnvConfig(ConfigModel): 'url': None, }] } + if self.pg_story_volumes: + volumes = self._parse_story_volumes(self.pg_story_volumes) + else: + volumes = {0: dict( + user=self.pg_user, + password=self.pg_password, + host=self.pg_host, + port=self.pg_port, + db=self.pg_db, + table='story_volume_0', + )} + self.pg_story_volumes_parsed = volumes def load_env_config() -> EnvConfig: diff --git a/rssant_harbor/actors/rss.py b/rssant_harbor/actors/rss.py index 66ee2cef81cb6f8310e316ddebff83e0546c3c7f..4fb136505446d41174b6e8735ed8af017c822b41 100644 --- a/rssant_harbor/actors/rss.py +++ b/rssant_harbor/actors/rss.py @@ -13,10 +13,10 @@ from rssant_feedlib import processor from rssant_feedlib.reader import FeedResponseStatus from rssant_feedlib.processor import StoryImageProcessor, RSSANT_IMAGE_TAG, is_replaced_image from rssant_feedlib.fulltext import split_sentences, is_summary, is_fulltext_content -from rssant_api.models import UserFeed, Feed, Story, FeedUrlMap, FeedStatus, FeedCreation, ImageInfo +from rssant_api.models import UserFeed, Feed, STORY_SERVICE, FeedUrlMap, FeedStatus, FeedCreation, ImageInfo from rssant_api.helper import reverse_url from rssant_common.image_url import encode_image_url -from rssant_common.actor_helper import django_context +from rssant_common.actor_helper import django_context, profile_django_context from rssant_common.validator import compiler from rssant_config import CONFIG @@ -113,6 +113,7 @@ def do_update_feed_creation_status( @actor('harbor_rss.save_feed_creation_result') @django_context +@profile_django_context def do_save_feed_creation_result( ctx: ActorContext, feed_creation_id: T.int, @@ -143,6 +144,7 @@ def do_save_feed_creation_result( feed = Feed( url=url, status=FeedStatus.READY, reverse_url=reverse_url(url), + title=feed_dict['title'], dt_updated=now, dt_checked=now, dt_synced=now) feed.save() feed_creation.status = FeedStatus.READY @@ -171,6 +173,7 @@ def do_save_feed_creation_result( @actor('harbor_rss.update_feed') @django_context +@profile_django_context def do_update_feed( ctx: ActorContext, feed_id: T.int, @@ -208,19 +211,20 @@ def do_update_feed( feed.reverse_url = reverse_url(feed.url) feed.status = FeedStatus.READY feed.save() - for s in storys: - if not s['dt_updated']: - s['dt_updated'] = now - if not s['dt_published']: - # set dt_published to now - 30d to avoid these storys - # take over mushroom page, i.e. 
Story.query_recent_by_user - s['dt_published'] = now_sub_30d - modified_storys = Story.bulk_save_by_feed(feed.id, storys, is_refresh=is_refresh) - LOG.info( - 'feed#%s save storys total=%s num_modified=%s', - feed.id, len(storys), len(modified_storys) - ) - feed.refresh_from_db() + # save storys, bulk_save_by_feed has standalone transaction + for s in storys: + if not s['dt_updated']: + s['dt_updated'] = now + if not s['dt_published']: + # set dt_published to now - 30d to avoid these storys + # take over mushroom page, i.e. Story.query_recent_by_user + s['dt_published'] = now_sub_30d + modified_storys = STORY_SERVICE.bulk_save_by_feed(feed.id, storys, is_refresh=is_refresh) + LOG.info( + 'feed#%s save storys total=%s num_modified=%s', + feed.id, len(storys), len(modified_storys) + ) + feed = Feed.get_by_pk(feed_id) if modified_storys: feed.unfreeze() need_fetch_story = _is_feed_need_fetch_storys(feed, modified_storys) @@ -233,7 +237,8 @@ def do_update_feed( ctx.tell('worker_rss.fetch_story', dict( url=story.link, use_proxy=feed.use_proxy, - story_id=str(story.id), + feed_id=story.feed_id, + offset=story.offset, num_sub_sentences=num_sub_sentences, )) else: @@ -325,7 +330,7 @@ def _detect_story_images(ctx, story): url_root = ImageInfo.extract_url_root(url) todo_url_roots[url_root].append(url) LOG.info( - f'story#{story.id} {story.link} has {len(image_urls)} images, ' + f'story#{story.feed_id},{story.offset} {story.link} has {len(image_urls)} images, ' f'need detect {num_todo_image_urls} images ' f'from {len(todo_url_roots)} url_roots' ) @@ -337,40 +342,45 @@ def _detect_story_images(ctx, story): else: todo_urls.extend(items) ctx.hope('worker_rss.detect_story_images', dict( - story_id=story.id, + feed_id=story.feed_id, + offset=story.offset, story_url=story.link, image_urls=list(set(todo_urls)), )) else: - _replace_story_images(story.id) + _replace_story_images(feed_id=story.feed_id, offset=story.offset) @actor('harbor_rss.update_story') @django_context +@profile_django_context def do_update_story( ctx: ActorContext, - story_id: T.int, + feed_id: T.int, + offset: T.int, content: T.str, summary: T.str, has_mathjax: T.bool.optional, url: T.url, ): - story = Story.objects.get(pk=story_id) + story = STORY_SERVICE.get_by_offset(feed_id, offset, detail=True) + if not story: + LOG.error('story#%s,%s not found', feed_id, offset) + return if not is_fulltext_content(content): story_text = processor.story_html_to_text(story.content) text = processor.story_html_to_text(content) if not is_summary(story_text, text): - msg = 'fetched story#%s url=%r is not fulltext of feed story content' - LOG.info(msg, story_id, url) + msg = 'fetched story#%s,%s url=%r is not fulltext of feed story content' + LOG.info(msg, feed_id, offset, url) return - with transaction.atomic(): - story.refresh_from_db() - story.link = url - story.content = content - story.summary = summary - if has_mathjax is not None: - story.has_mathjax = has_mathjax - story.save() + data = dict( + link=url, + content=content, + summary=summary, + has_mathjax=has_mathjax, + ) + STORY_SERVICE.update_story(feed_id, offset, data) _detect_story_images(ctx, story) @@ -383,9 +393,11 @@ IMAGE_REFERER_DENY_STATUS = set([ @actor('harbor_rss.update_story_images') @django_context +@profile_django_context def do_update_story_images( ctx: ActorContext, - story_id: T.int, + feed_id: T.int, + offset: T.int, story_url: T.url, images: T.list(T.dict( url=T.url, @@ -412,11 +424,11 @@ def do_update_story_images( )) LOG.info(f'bulk create {len(image_info_objects)} 
ImageInfo objects') ImageInfo.objects.bulk_create(image_info_objects) - _replace_story_images(story_id) + _replace_story_images(feed_id, offset) -def _replace_story_images(story_id): - story = Story.objects.get(pk=story_id) +def _replace_story_images(feed_id, offset): + story = STORY_SERVICE.get_by_offset(feed_id, offset, detail=True) image_processor = StoryImageProcessor(story.link, story.content) image_indexs = image_processor.parse() image_urls = _image_urls_of_indexs(image_indexs) @@ -428,13 +440,12 @@ def _replace_story_images(story_id): if status in IMAGE_REFERER_DENY_STATUS: new_url_data = encode_image_url(url, story.link) image_replaces[url] = '/api/v1/image/{}?{}'.format(new_url_data, RSSANT_IMAGE_TAG) - LOG.info(f'story#{story_id} {story.link} ' + LOG.info(f'story#{feed_id},{offset} {story.link} ' f'replace {len(image_replaces)} referer deny images') # image_processor.process will (1) fix relative url (2) replace image url # call image_processor.process regardless of image_replaces is empty or not content = image_processor.process(image_indexs, image_replaces) - story.content = content - story.save() + STORY_SERVICE.update_story(feed_id, offset, {'content': content}) @actor('harbor_rss.check_feed') @@ -501,7 +512,7 @@ def do_clean_by_retention(ctx: ActorContext): for feed in feeds: feed_id = feed['feed_id'] url = feed['url'] - n = Story.delete_by_retention(feed_id, retention=retention) + n = STORY_SERVICE.delete_by_retention(feed_id, retention=retention) LOG.info(f'deleted {n} storys of feed#{feed_id} {url} by retention') diff --git a/rssant_worker/actors/rss.py b/rssant_worker/actors/rss.py index fffb3e7819dbd5a9b6eff078f03a852ac66eb4d4..2108e8c5d36dfe2ff2671b80062696ec9318ab08 100644 --- a/rssant_worker/actors/rss.py +++ b/rssant_worker/actors/rss.py @@ -232,12 +232,13 @@ def _update_feed_info(ctx, feed_id, response: FeedResponse, status: str = None, )) -async def _fetch_story(reader, story_id, url, use_proxy): +async def _fetch_story(reader, feed_id, offset, url, use_proxy): for i in range(2): response = await reader.read(url, use_proxy=use_proxy) if response and response.url: url = str(response.url) - LOG.info(f'fetch story#{story_id} url={unquote(url)} status={response.status} finished') + LOG.info( + f'fetch story#{feed_id},{offset} url={unquote(url)} status={response.status} finished') if not (response and response.ok and response.content): return None try: @@ -248,7 +249,7 @@ async def _fetch_story(reader, story_id, url, use_proxy): html_redirect = get_html_redirect_url(content) if (not html_redirect) or html_redirect == url: return url, content - LOG.info('story#%s resolve html redirect to %r', story_id, html_redirect) + LOG.info('story#%s,%s resolve html redirect to %r', feed_id, offset, html_redirect) url = html_redirect return url, content @@ -256,30 +257,32 @@ async def _fetch_story(reader, story_id, url, use_proxy): @actor('worker_rss.fetch_story') async def do_fetch_story( ctx: ActorContext, - story_id: T.int, + feed_id: T.int, + offset: T.int, url: T.url, use_proxy: T.bool.default(False), num_sub_sentences: T.int.optional, ): - LOG.info(f'fetch story#{story_id} url={unquote(url)} begin') + LOG.info(f'fetch story#{feed_id},{offset} url={unquote(url)} begin') options = _get_proxy_options() options.update(allow_private_address=CONFIG.allow_private_address) if DNS_SERVICE.is_resolved_url(url): use_proxy = False async with AsyncFeedReader(**options) as reader: use_proxy = use_proxy and reader.has_rss_proxy - url_content = await _fetch_story(reader, story_id, url, 
use_proxy=use_proxy) + url_content = await _fetch_story(reader, feed_id, offset, url, use_proxy=use_proxy) if not url_content: return url, content = url_content if len(content) >= _MAX_STORY_HTML_LENGTH: content = story_html_clean(content) if len(content) >= _MAX_STORY_HTML_LENGTH: - msg = 'too large story#%s size=%s url=%r' - LOG.warning(msg, story_id, len(content), url) + msg = 'too large story#%s,%s size=%s url=%r' + LOG.warning(msg, feed_id, offset, len(content), url) content = story_html_to_text(content)[:_MAX_STORY_HTML_LENGTH] await ctx.hope('worker_rss.process_story_webpage', dict( - story_id=story_id, + feed_id=feed_id, + offset=offset, url=url, text=content, num_sub_sentences=num_sub_sentences, @@ -289,7 +292,8 @@ async def do_fetch_story( @actor('worker_rss.process_story_webpage') def do_process_story_webpage( ctx: ActorContext, - story_id: T.int, + feed_id: T.int, + offset: T.int, url: T.url, text: T.str.maxlen(_MAX_STORY_HTML_LENGTH), num_sub_sentences: T.int.optional, @@ -307,22 +311,23 @@ def do_process_story_webpage( content = story_readability(text) content = process_story_links(content, url) if len(content) > _MAX_STORY_CONTENT_LENGTH: - msg = 'too large story#%s size=%s url=%r, will only save plain text' - LOG.warning(msg, story_id, len(content), url) + msg = 'too large story#%s,%s size=%s url=%r, will only save plain text' + LOG.warning(msg, feed_id, offset, len(content), url) content = shorten(story_html_to_text(content), width=_MAX_STORY_CONTENT_LENGTH) # 如果取回的内容比RSS内容更短,就不是正确的全文 if num_sub_sentences is not None: if not is_fulltext_content(content): num_sentences = len(split_sentences(story_html_to_text(content))) if num_sentences <= num_sub_sentences: - msg = 'fetched story#%s url=%s num_sentences=%s less than num_sub_sentences=%s' - LOG.info(msg, story_id, url, num_sentences, num_sub_sentences) + msg = 'fetched story#%s,%s url=%s num_sentences=%s less than num_sub_sentences=%s' + LOG.info(msg, feed_id, offset, url, num_sentences, num_sub_sentences) return summary = shorten(story_html_to_text(content), width=_MAX_STORY_SUMMARY_LENGTH) if not summary: return ctx.hope('harbor_rss.update_story', dict( - story_id=story_id, + feed_id=feed_id, + offset=offset, content=content, summary=summary, url=url, @@ -332,11 +337,12 @@ def do_process_story_webpage( @actor('worker_rss.detect_story_images') async def do_detect_story_images( ctx: ActorContext, - story_id: T.int, + feed_id: T.int, + offset: T.int, story_url: T.url, image_urls: T.list(T.url).unique, ): - LOG.info(f'detect story images story_id={story_id} num_images={len(image_urls)} begin') + LOG.info(f'detect story images story={feed_id},{offset} num_images={len(image_urls)} begin') options = dict( allow_non_webpage=True, allow_private_address=CONFIG.allow_private_address, @@ -368,11 +374,12 @@ async def do_detect_story_images( else: num_error += 1 images.append(dict(url=url, status=status)) - LOG.info(f'detect story images story_id={story_id} ' + LOG.info(f'detect story images story={feed_id},{offset} ' f'num_images={len(image_urls)} finished, ' f'ok={num_ok} error={num_error} cost={cost_ms:.0f}ms') await ctx.hope('harbor_rss.update_story_images', dict( - story_id=story_id, + feed_id=feed_id, + offset=offset, story_url=story_url, images=images, )) diff --git a/runserver.sh b/runserver.sh new file mode 100755 index 0000000000000000000000000000000000000000..8349d2cb2a99b325eceedd47583ea1cc0b334588 --- /dev/null +++ b/runserver.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +while true; do + python manage.py runserver 0.0.0.0:6788 + if 
[ $? -eq 0 ]; then + break + fi + echo '* ----------------------------------------------------------------------' + sleep 3 +done diff --git a/scripts/seaweedfs_start.sh b/scripts/seaweedfs_start.sh new file mode 100755 index 0000000000000000000000000000000000000000..4bc4888f80ee406278534fec053141216b0acb7e --- /dev/null +++ b/scripts/seaweedfs_start.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +docker network create rssant || true +docker volume create rssant_seaweedfs +docker rm -f rssant-seaweedfs +docker run -d \ + --name rssant-seaweedfs \ + --log-driver json-file --log-opt max-size=50m --log-opt max-file=10 \ + --restart unless-stopped \ + --memory=500M \ + --cpus=0.5 \ + --network rssant \ + -p 127.0.0.1:9333:9333 \ + -p 127.0.0.1:19333:19333 \ + -p 127.0.0.1:9080:9080 \ + -p 127.0.0.1:19080:19080 \ + -v rssant_seaweedfs:/data \ + chrislusf/seaweedfs:1.77 \ + server \ + -dir /data \ + -volume.max 2 \ + -volume.index leveldb \ + -master.port=9333 \ + -volume.port=9080 \ + -ip 127.0.0.1 diff --git a/scripts/seaweedfs_wait_and_init.py b/scripts/seaweedfs_wait_and_init.py new file mode 100644 index 0000000000000000000000000000000000000000..ab11dd06eb6b8cab5013598443cda25de17c90cf --- /dev/null +++ b/scripts/seaweedfs_wait_and_init.py @@ -0,0 +1,28 @@ +import time +import sys +import requests + + +DEFAULT_URL = "http://127.0.0.1:9333/dir/assign" + + +def wait_and_init(url=None): + if not url and len(sys.argv) >= 2: + url = sys.argv[1] + if not url: + url = DEFAULT_URL + timeout = time.time() + 180 + while time.time() < timeout: + try: + response = requests.get(url) + if response.ok: + print(response.json()) + break + print("seaweedfs response:", response.status_code) + except Exception as ex: + print(ex) + time.sleep(1) + + +if __name__ == "__main__": + wait_and_init() diff --git a/tests/models/__init__.py b/tests/models/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tests/models/test_seaweed_client.py b/tests/models/test_seaweed_client.py new file mode 100644 index 0000000000000000000000000000000000000000..aed72899f074611cf5ee38b0e22ec2d553eb1ee6 --- /dev/null +++ b/tests/models/test_seaweed_client.py @@ -0,0 +1,48 @@ +import pytest +from rssant_api.models.story_storage import ( + SeaweedClient, SeaweedError +) + + +@pytest.mark.xfail(run=False, reason='depends on seaweed service') +def test_seaweed_client(): + volume_url = 'http://127.0.0.1:9080' + client = SeaweedClient(volume_url) + try: + fid = '1,01637037d6' + client.delete(fid) + + got = client.get(fid) + assert got is None + client.put(fid, b'hello world') + got = client.get(fid) + assert got == b'hello world' + + bad_fid = '1,01637037d7' + assert client.get(bad_fid) is None + with pytest.raises(SeaweedError): + client.put(bad_fid, b'hello seaweed') + + client.delete(fid) + got = client.get(fid) + assert got is None + finally: + client.close() + + +def test_seaweed_client_context(): + volume_url = 'http://127.0.0.1:9080' + with SeaweedClient(volume_url) as client: + assert client + + +@pytest.mark.xfail(run=False, reason='depends on seaweed service') +def test_batch_get(): + volume_url = 'http://127.0.0.1:9080' + with SeaweedClient(volume_url) as client: + fid = '1,01637037d6' + client.delete(fid) + client.put(fid, b'hello world') + bad_fid = '1,01637037d7' + got = client.batch_get([fid, bad_fid]) + assert set(got.keys()) == {fid, bad_fid} diff --git a/tests/models/test_seaweed_sharding.py b/tests/models/test_seaweed_sharding.py new file mode 100644 
index 0000000000000000000000000000000000000000..d6f06b5df3b662f23026d3995a174593c1ea45ad
--- /dev/null
+++ b/tests/models/test_seaweed_sharding.py
@@ -0,0 +1,75 @@
+import random
+from collections import defaultdict
+
+from rssant_api.models.story_storage.seaweed.seaweed_sharding import (
+    sharding_for, seaweed_volume_for,
+    SeaweedFileType, seaweed_fid_encode, seaweed_fid_decode,
+)
+from rssant_api.models.story_storage.common.story_key import StoryId, hash_feed_id
+
+
+def test_hash_feed_id():
+    for i in [0, 1, 2, 7, 1024, 2**31, 2**32 - 1]:
+        val = hash_feed_id(i)
+        assert val >= 0 and val < 2**32
+
+
+def test_sharding_for_0():
+    for i in [0, 1, 2, 1, 1024 - 1, 2 * 1024, 8 * 1024 - 1]:
+        v = sharding_for(i)
+        assert v >= 0 and v < 8
+    assert sharding_for(8 * 1024) >= 8
+
+
+def test_seaweed_volume_for():
+    assert seaweed_volume_for(0) >= 1
+    assert seaweed_volume_for(8 * 1024 - 1) >= 1
+    assert seaweed_volume_for(8 * 1024) >= 9
+
+
+def test_sharding_for_group():
+    for i in [0, 1024 - 1, 1024, 8 * 1024 - 1]:
+        val = sharding_for(i)
+        assert val >= 0 and val < 8
+    for i in [8 * 1024, 8 * 1024 + 1, 16 * 1024 - 1]:
+        val = sharding_for(i)
+        assert val >= 8 and val < 16
+
+
+def test_sharding_for_uniform():
+    volumes = defaultdict(lambda: 0)
+    N = 100000
+    for i in range(N):
+        feed_id = random.randint(0, 8 * 1024 - 1)
+        volumes[sharding_for(feed_id)] += 1
+    min_count = min(volumes.values())
+    max_count = max(volumes.values())
+    msg = f'min={min_count} max={max_count} volumes={dict(volumes)}'
+    assert (max_count - min_count) / N < 0.01, msg
+
+
+def test_seaweed_fid():
+    cases = [
+        (123, 10, SeaweedFileType.CONTENT, '1,7b000000a200000000'),
+        (123, 1023, SeaweedFileType.CONTENT, '1,7b00003ff200000000'),
+        (123, 1023, SeaweedFileType.CONTENT, '1,7b00003ff200000000'),
+    ]
+    for feed_id, offset, ftype, expect in cases:
+        fid = seaweed_fid_encode(feed_id, offset, ftype)
+        msg = f'expect {feed_id, offset, ftype} -> {expect}, got {fid}'
+        assert fid == expect, msg
+        volume_id, x_feed_id, x_offset, x_ftype = seaweed_fid_decode(fid)
+        assert volume_id >= 1 and volume_id < 8
+        assert x_feed_id == feed_id
+        assert x_offset == offset
+        assert x_ftype == ftype
+
+
+def test_story_id():
+    cases = [
+        (123, 10, 0x7b000000a0),
+        (123, 1023, 0x7b00003ff0),
+    ]
+    for feed_id, offset, story_id in cases:
+        assert StoryId.encode(feed_id, offset) == story_id
+        assert StoryId.decode(story_id) == (feed_id, offset)
diff --git a/tests/models/test_seaweed_story.py b/tests/models/test_seaweed_story.py
new file mode 100644
index 0000000000000000000000000000000000000000..483aa95ce2136e7d4caddd3a815acc6cd4d53f6d
--- /dev/null
+++ b/tests/models/test_seaweed_story.py
@@ -0,0 +1,63 @@
+import datetime
+
+import pytest
+
+from rssant_api.models.story_storage import SeaweedStoryStorage
+from rssant_api.models.story_storage import StoryData
+
+
+class MockSeaweedClient:
+    def __init__(self):
+        self._store = {}
+
+    def get(self, fid: str) -> bytes:
+        return self._store.get(fid)
+
+    def batch_get(self, fid_s: list) -> dict:
+        return {fid: self._store.get(fid) for fid in fid_s}
+
+    def put(self, fid: str, data: bytes) -> None:
+        self._store[fid] = data
+
+    def delete(self, fid: str) -> None:
+        self._store.pop(fid, None)
+
+
+def test_encode_decode_json():
+    dt = datetime.datetime(2020, 5, 23, 12, 12, 12, tzinfo=datetime.timezone.utc)
+    base = {
+        'key': 'value',
+        'text': '你好',
+        'number': 123,
+    }
+    value = {**base, 'datetime': dt}
+    expect = {**base, 'datetime': '2020-05-23T12:12:12.000000Z'}
+    data = StoryData.encode_json(value)
+    got = StoryData.decode_json(data)
+    assert got == expect
+
+
+def test_encode_decode_text():
+    text = 'hello world\n你好世界\n'
+    data = StoryData.encode_text(text)
+    got = StoryData.decode_text(data)
+    assert got == text
+
+
+CONTENTS = {
+    'empty': None,
+    'simple': 'hello world\n你好世界\n',
+}
+
+
+@pytest.mark.parametrize('content_name', list(CONTENTS))
+def test_seaweed_story_storage(content_name):
+    client = MockSeaweedClient()
+    storage = SeaweedStoryStorage(client)
+    content = CONTENTS[content_name]
+    storage.save_content(123, 234, content)
+    got = storage.get_content(123, 234)
+    assert got == content
+    storage.delete_content(123, 234)
+    got = storage.get_content(123, 234)
+    assert got is None
diff --git a/tests/models/test_story_unique_ids.py b/tests/models/test_story_unique_ids.py
new file mode 100644
index 0000000000000000000000000000000000000000..ed8030a934a2ed800a70fe959405a9a7823e4f58
--- /dev/null
+++ b/tests/models/test_story_unique_ids.py
@@ -0,0 +1,24 @@
+import pytest
+from rssant_api.models.story_unique_ids import StoryUniqueIdsData
+
+
+CASES = {
+    'empty': (0, []),
+    'one': (1, [
+        '93C07B6C-D848-4405-A349-07A3775FA0A9',
+    ]),
+    'two': (3, [
+        'https://www.example.com/2.html',
+        'https://www.example.com/3.html',
+    ])
+}
+
+
+@pytest.mark.parametrize('case_name', list(CASES))
+def test_encode_decode(case_name):
+    begin_offset, unique_ids = CASES[case_name]
+    data = StoryUniqueIdsData(begin_offset, unique_ids=unique_ids)
+    data_bytes = data.encode()
+    got = StoryUniqueIdsData.decode(data_bytes)
+    assert got.begin_offset == begin_offset
+    assert got.unique_ids == unique_ids