量化交易系统之数据管理器
MongodbDataHandler
本来打算先写一个简单的,基于csv文件的数据管理器,考虑到真实应用,还是直接连接数据库吧。这里我们使用mongodb,使用mysql等关系数据库也是一样的。我们主要使用pandas的Dataframe作用数据结构。
具体实现
在aiquant/engine/data.py下,继承自抽象基类DataHandler,需要实现update_bars函数。
class MongodbHandler(DataHandler):
#events是事件管理器
#start,end是起止日期
#benchmark是回测的参考标的,一般是指数,比如上证综指或沪深300指数
def __init__(self,events,start_date,end_date,benchmark='000300'):
#交易日列表,这个需要从指数里取,在这里最好从banchmark里取,因为普通股票有存在停牌的可能,在某几个交易日无交易数据
self.trading_days = []
#我们的投资组合空间,只加载这里的数据,如果为空,则全部加载
self.universe = []
#handler内部数据结构
self.symbol_bars_list = {}#这是{symbol:[]}
self.symbol_latest_bars = {}#这是最近一天的{symbol:bar}
def update_bars(self):
#这里查出当天的universe里的列表[{},{}]
daily_datas = data_utils.get_prices_by_date(self.dt_current,self.universe)
#当天的数据,保存到两个内存数据结构里,方便strategy调用
for data in daily_datas:
code = data['code']
if code not in self.symbol_bars.keys():
self.symbol_bars[code] = []
self.symbol_bars_list[code].append(data)
self.symbol_latest_bars[code] = data
#发生onbar事件,并返回True
self.events.put(BarEvent())
return True
在aiquant/data/data_utils.py里实现数据查询函数
#查询某个交易日下,codes指定的证券的k线
def get_prices_by_date(date,codes):
items = query_docs('prices', {'code':{'$in':codes}, 'date': date})
return list(items)
#返回格式如下:
_id amount close code com_code date high \
0 000001_2017-12-05 2.301868e+09 13.30 000001 3 2017-12-05 13.49
1 000002_2017-12-05 1.747241e+09 31.03 000002 6 2017-12-05 31.48
low open volume
0 13.1 13.15 172368132.0
1 30.2 30.60 56430201.0
#查询这个日期周期内的k线数据
def get_prices_by_code(code,start_date,end_date):
items = query_docs('price_no_restore', {'code': code, 'date': {'$gt': start_date,
'$lt': end_date}})
#返回如下格式:
_id amount close code com_code date high \
0 000001_2016-06-29 319908690.0 8.69 000001 3 2016-06-29 8.69
1 000001_2016-06-28 289122214.0 8.63 000001 3 2016-06-28 8.64
2 000001_2016-06-27 280044073.0 8.61 000001 3 2016-06-27 8.64
3 000001_2016-06-24 367771054.0 8.57 000001 3 2016-06-24 8.70
4 000001_2016-06-23 290490407.0 8.66 000001 3 2016-06-23 8.70
5 000001_2016-06-22 413947369.0 8.73 000001 3 2016-06-22 8.73
6 000001_2016-06-21 327940703.0 8.61 000001 3 2016-06-21 8.65
7 000001_2016-06-20 232205298.0 8.60 000001 3 2016-06-20 8.60
8 000001_2016-06-17 270236698.0 8.58 000001 3 2016-06-17 8.61
9 000001_2016-06-16 331192445.0 8.57 000001 3 2016-06-16 8.60
10 000001_2016-06-15 394033686.0 10.44 000001 3 2016-06-15 10.48
11 000001_2016-06-14 283578297.0 10.40 000001 3 2016-06-14 10.41
上面还有提到一个BarEvent(),定义在aiquant/engine/event.py里
class Event(object):
pass
#只有一个type,就是告知相关方,有一个新的bar到达,本身事件不传递数据结构,需要用到数据可以到DataHandler里去取。
class BarEvent(Event):
def __init__(self):
self.type = 'BAR'
关于作者:魏佳斌,互联网产品/技术总监,北京大学光华管理学院(MBA),特许金融分析师(CFA),资深产品经理/码农。偏爱python,深度关注互联网趋势,人工智能,AI金融量化。致力于使用最前沿的认知技术去理解这个复杂的世界。 扫描下方二维码,关注:AI量化实验室(ailabx),了解AI量化最前沿技术、资讯。