1 Star 0 Fork 67

lilinfeng88 / pyctp

forked from 海风 / pyctp 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
test_trade.py 3.05 KB
一键复制 编辑 原始数据 按行查看 历史
海风 提交于 2024-03-15 18:42 . 升级: v6.7.2.20230913
#!/usr/bin/env python
# -*- coding: utf-8 -*-
__title__ = 'test py ctp of se'
__author__ = 'HaiFeng'
__mtime__ = '20190506'
from py_ctp.trade import CtpTrade
from py_ctp.quote import CtpQuote
from py_ctp.enums import *
import time
class TestTrade(object):
    """Demo driver around a ``CtpTrade`` session with print-style callbacks.

    The constructor stores the login parameters, creates the ``CtpTrade``
    object as ``self.t`` and assigns a handler to each of its ``On*``
    callback attributes.  ``run`` starts the connection, ``on_connect``
    issues the login request, and ``release`` logs out.
    """

    def __init__(self, addr: str, broker: str, investor: str, pwd: str, appid: str, auth_code: str):
        """Store the connection parameters and register the callbacks.

        Args:
            addr: Front address handed to ``ReqConnect`` by :meth:`run`.
            broker: Broker id forwarded to ``ReqUserLogin``.
            investor: Investor id forwarded to ``ReqUserLogin``.
            pwd: Password forwarded to ``ReqUserLogin``.
            appid: Application id forwarded to ``ReqUserLogin``.
            auth_code: Authentication code forwarded to ``ReqUserLogin``
                (stored as ``self.authcode``).
        """
        self.front = addr
        self.broker = broker
        self.investor = investor
        self.pwd = pwd
        self.appid = appid
        self.authcode = auth_code

        self.t = CtpTrade()

        # Connection life cycle: log in once connected, print login/disconnect info.
        self.t.OnConnected = self.on_connect
        self.t.OnUserLogin = lambda o, x: print('Trade logon:', x)
        self.t.OnDisConnected = lambda o, x: print('Trade release:', x)

        # The ``time`` parameter below shadows the ``time`` module only inside
        # this lambda; the parameter names are kept as they were.
        self.t.OnRtnNotice = lambda obj, time, msg: print(f'OnNotice: {time}:{msg}')

        # Quote-related error callbacks are deliberately ignored.
        self.t.OnErrRtnQuote = lambda obj, quote, info: None
        self.t.OnErrRtnQuoteInsert = lambda obj, o: None

        # Order / trade callbacks: print whatever is delivered, ignore fills.
        self.t.OnOrder = lambda obj, o: print(o)
        self.t.OnErrOrder = lambda obj, f, info: print(info)
        self.t.OnTrade = lambda obj, o: None
        self.t.OnCancel = lambda obj, o: print(o)

        self.t.OnInstrumentStatus = lambda obj, inst, stat: None
        self.t.OnRspError = lambda obj, info: print(info)

    def on_connect(self, obj):
        """Send the login request using the stored credentials.

        Registered as the ``OnConnected`` callback; ``obj`` is whatever the
        session passes to that callback and is not used here.
        """
        self.t.ReqUserLogin(self.investor, self.pwd, self.broker, self.appid, self.authcode)

    def run(self):
        """Print a status line and connect to the stored front address."""
        print('trade connect...')
        self.t.ReqConnect(self.front)

    def release(self):
        """Request logout on the trade session."""
        self.t.ReqUserLogout()
class TestQuote(object):
    """Demo driver around a ``CtpQuote`` session.

    The constructor stores the login parameters, creates the ``CtpQuote``
    object as ``self.q`` and registers callbacks that print each tick, log
    in once connected, and subscribe to market data after login.
    """

    def __init__(self, addr: str, broker: str, investor: str, pwd: str):
        """Store the connection parameters and register the callbacks.

        Args:
            addr: Front address handed to ``ReqConnect`` by :meth:`run`.
            broker: Broker id forwarded to ``ReqUserLogin``.
            investor: Investor id forwarded to ``ReqUserLogin``.
            pwd: Password forwarded to ``ReqUserLogin``.
        """
        self.front = addr
        self.broker = broker
        self.investor = investor
        self.pwd = pwd

        self.q = CtpQuote()

        # Print every tick object delivered to the callback.
        self.q.OnTick = lambda o, x: print(x)
        # Log in as soon as the connection callback fires.
        self.q.OnConnected = lambda x: self.q.ReqUserLogin(self.investor, self.pwd, self.broker)
        self.q.OnUserLogin = self.onlogin

    def run(self):
        """Print a status line and connect to the stored front address."""
        print('quote connecting...')
        self.q.ReqConnect(self.front)

    def release(self):
        """Request logout on the quote session."""
        self.q.ReqUserLogout()

    def onlogin(self, obj, info):
        """Print the login info and subscribe to the ``rb2409`` instrument.

        Registered as the ``OnUserLogin`` callback; ``obj`` is not used.
        """
        print(info)
        self.q.ReqSubscribeMarketData('rb2409')
if __name__ == "__main__":
    # Demo front addresses and login parameters.  NOTE(review): the app id
    # suggests a SimNow test environment -- confirm before reusing them.
    front_trade = 'tcp://180.168.146.187:10130'
    front_quote = 'tcp://180.168.146.187:10131'
    broker = '9999'
    investor = '008107'
    pwd = '1'
    appid = 'simnow_client_test'
    auth_code = '0000000000000000'

    # Start the trade session.
    tt = TestTrade(front_trade, broker, investor, pwd, appid, auth_code)
    tt.run()

    # Poll every 3 seconds until the session's ``logined`` flag is truthy.
    # There is no timeout: the loop keeps running for as long as it is falsy.
    while not tt.t.logined:
        time.sleep(3)

    print('account info')
    print(tt.t.account)
    print(len(tt.t.instruments))

    # Submit one demo order: sell/open rb2409, with 4002 and 2 as the last
    # two arguments (presumably price and volume -- verify against CtpTrade).
    tt.t.ReqOrderInsert('rb2409', DirectType.Sell, OffsetType.Open, 4002, 2)

    # Start the quote session and give it a few seconds before prompting.
    qq = TestQuote(front_quote, broker, investor, pwd)
    qq.run()
    time.sleep(5)

    print('press ENTER key to release')
    input()

    # for inst in tt.t.instruments.values():
    #     print(inst)

    print('trade release')
    tt.release()
    print('quote release')
    qq.release()
Python
1
https://gitee.com/lilinfeng88/pyctp.git
git@gitee.com:lilinfeng88/pyctp.git
lilinfeng88
pyctp
pyctp
master

搜索帮助