量化回测引擎之DataHandler实现
还是在aiquant/engine/data.py下,继承DataHandler。
有几个关键点: 1,事件管理器,update_bars之后会触发onbar事件。
2,起止日期:回测周期
3,benchmark:参考标的,是某一个指数
4,universe需要重点说明,如果传入一个str型,代表取这个指数的成份股,比如000300是沪深300,如果传入list型,则代表一个股票池,一般是自己的自选股池。为None入代表沪深股票全集
5,针对benchmark指数的trading_days作循环。
6,每一个bar,会把数据缓存在symbol_bars_list,symbol_latest_bars里,前者是{‘code’:[bars(时间正序)]},后者是{‘code’:bar}
7,沿着trading_day从起始时间开始,往终止时间。
class MongodbHandler(DataHandler):
def __init__(self,events,start_date,end_date,benchmark='000300',universe='000300'):
#如果universe是一个字符串,则代表指数,如果是[],代表一个集合,为空则为全集
if type(universe) is str:
self.universe = list(pd.DataFrame(data_utils.get_index_components(universe))['code'])
elif type(universe) is list:
self.universe = universe
else:
self.universe = []
self.events = events
self.benchmark = Benchmark(benchmark,start_date,end_date)
self.trading_days = self.benchmark.get_trading_days()
self.update_index = 0
self.symbol_bars_list = {}#这是{symbol:[]}
self.symbol_latest_bars = {}#这是最近一天的{symbol:bar}
更新数据并触发onbar事件:
#这个函数向服务器请求新的bar,然后出发onbar事件,表示新一期收盘
def update_bars(self):
if self.update_index > (len(self.trading_days) -1):
return False
self.dt_current = self.trading_days[self.update_index]
self.update_index += 1
#这里查出当天的universe里的列表[{},{}]
daily_datas = data_utils.get_prices_by_date(self.dt_current,self.universe)
for data in daily_datas:
code = data['code']
if code not in self.symbol_bars_list.keys():
self.symbol_bars_list[code] = []
self.symbol_bars_list[code].append(data)
self.symbol_latest_bars[code] = data
self.symbol_latest_bars['date'] = self.dt_current
self.events.put(BarEvent())
return True
在数据预加载之后,提供接口供调用:
#取某symbol的最近N个bar,比如计算5日均线
def get_latest_bars(self, symbol, N=1):
try:
bars_list = self.symbol_bars_list[symbol]
except KeyError:
print("symbol:%s没有被加载,请检查universe配置。"%(symbol))
else:
return bars_list[-N:]
至此,一个基于mongo数据库的数据管理器就搞定 了,只需要配置起始时间,就会每天从服务器读取k线数据并缓存在内存中供回测使用。而且这种方式,有效规避使用未来数据的问题。因为在当天收盘之前,是取不到后面的数据。