C++ Workflow是一个高性能的异步引擎,本项目着力于实现一个Python版的Workflow,让Python用户也能享受Workflow带来的绝佳体验。
在用户深入了解Workflow相关概念之前,先来看几个简单的示例,可以对使用方法有一个初步印象。pywf是本项目Python包的名称,在文档中有时会直接使用wf
作为其简称。
import pywf as wf
def http_callback(http_task):
resp = http_task.get_resp()
print("Http status:{}\n{}".format(
resp.get_status_code(), resp.get_body())) # body is bytes
http_task = wf.create_http_task("http://www.sogou.com/", redirect_max=4, retry_max=2, callback=http_callback)
http_task.start()
wf.wait_finish()
import pywf as wf
def series_callback(s):
print("All task in this series is done")
def http_callback(http_task):
req = http_task.get_req()
resp = http_task.get_resp()
print("uri:{} status:{}".format(
req.get_request_uri(),
resp.get_status_code()))
def create_http_task(url):
return wf.create_http_task(url, 4, 2, http_callback)
first_task = create_http_task("http://www.sogou.com")
series = wf.create_series_work(first_task, series_callback)
series.push_back(create_http_task("https://www.zhihu.com/people/kedixa"))
series.push_back(create_http_task("https://fanyi.sogou.com/document"))
series.start()
wf.wait_finish()
import pywf as wf
def parallel_callback(p):
print("All series in this parallel is done")
def http_callback(http_task):
req = http_task.get_req()
resp = http_task.get_resp()
print("uri:{} status:{}".format(
req.get_request_uri(),
resp.get_status_code()))
url = [
"http://www.sogou.com",
"https://www.zhihu.com/people/kedixa",
"https://fanyi.sogou.com/document"
]
parallel = wf.create_parallel_work(parallel_callback)
for u in url:
task = wf.create_http_task(u, 4, 2, http_callback)
series = wf.create_series_work(task, None) # without callback
parallel.add_series(series)
parallel.start()
wf.wait_finish()
通过create_xxx_task
等工厂函数创建的对象称作任务(task),例如create_http_task
。一个任务被创建后,必须被启动或取消,通过执行http_task.start()
,会自动以http_task
为first_task
创建一个串行并立即启动任务。如果用户指定了回调函数,当任务完成时回调函数会被调用,但在任务启动后且回调函数前,用户不能再操作该任务。当回调函数结束后,该任务被立即释放,用户也不能再操作该任务。
通过create_series_work
创建的对象称作串行(series),用户在创建时需要指定一个first_task
来作为启动该series
启动时应当执行的任务,用户可选地指定一个回调函数,当所有任务执行完成后,回调函数会被调用。
series
的回调函数用于通知用户该串行中的任务均已完成,不能再继续添加新的任务,且回调函数结束后,该串行会立即被销毁。
通过create_parallel_work
创建的对象称作并行(parallel),用户可以创建一个空的并行,然后通过add_series
接口向并行中添加串行,也可以在创建时指定一组串行。并行本身也是一种任务,所以并行也可以放到串行中。parallel.start()
就会自动创建一个串行,并将parallel
作为first_task
立即开始执行。
parallel
的回调函数用于通知用户该并行中的串行均已完成,不能再继续添加新的串行,且回调函数结束后,该并行会立即被销毁。
有了上述三个概念,就可以构建出各种复杂的任务结构,并在Workflow的管理下高效执行。
Workflow认为,一个典型的后端程序由三个部分组成,并且完全独立开发。即:程序=协议+算法+任务流。
Python Workflow将会逐步支持Workflow的六种基础任务:通讯,文件IO,CPU,GPU,定时器,计数器。
get
接口获取的对象可以自由使用,例如Http中的get_body
。本项目支持Python 3.6以上,在pypi上发布了一组Python 3.6、3.7、3.8、3.9的manylinux2014
版本,用户可以通过较高版本的pip直接安装。
pip3 install pywf
用户可以下载本项目源码进行编译安装。
# CentOS 7
yum install cmake3 ninja-build python36 python36-devel python36-pip
yum install gcc-c++ # if needed
git clone https://github.com/sogou/pyworkflow --recursive
cd pyworkflow
pip3 install wheel
python3 setup.py bdist_wheel
pip3 install dist/*.whl --user
# CentOS 8
yum install cmake ninja-build python36 python36-devel python3-pip
git clone https://github.com/sogou/pyworkflow --recursive
cd pyworkflow
pip3 install wheel
python3 setup.py bdist_wheel
pip3 install dist/*.whl --user
# Mac
# Build Requirement
brew install ninja cmake openssl
pip3 install wheel
# OpenSSL Env, see 'brew info openssl' for more infomation
echo 'export PATH="/usr/local/opt/openssl@1.1/bin:$PATH"' >> ~/.bash_profile
echo 'export LDFLAGS="-L/usr/local/opt/openssl@1.1/lib"' >> ~/.bash_profile
echo 'export CPPFLAGS="-I/usr/local/opt/openssl@1.1/include"' >> ~/.bash_profile
echo 'export PKG_CONFIG_PATH="/usr/local/opt/openssl@1.1/lib/pkgconfig"' >> ~/.bash_profile
echo 'export OPENSSL_ROOT_DIR="/usr/local/opt/openssl@1.1/"' >> ~/.bash_profile
# zsh Env, if needed
echo 'test -f ~/.bash_profile && source ~/.bash_profile' >> ~/.zshrc
source ~/.zshrc
# Build
git clone https://github.com/sogou/pyworkflow --recursive
cd pyworkflow
python3 setup.py bdist_wheel
pip3 install dist/*.whl --user
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。