量化策略:量化买卖压力选股策略(附源码)
股票的任何一笔交易都需要对手盘的存在,这是一个双方买卖撮合的过程。在这一过程中,股票价格的变动与买压(买入强度)、卖压(卖出强度)相关。
买压越大,价格上涨的概率越大,卖压越大,价格下跌的概率越大。
那如何衡量股票的买压和卖压呢?我们来看一份由东方证券发布的研报——《基于量价关系度量股票的买卖压力》。
研报中认为:成交量在价格高位放大时,卖压较大;成交量在价格低位放大时,买压更大。那么当价格高位下的成交量放大时,vwap(成交量加权价格)就比较高,当价格低位下的成交量放大时,vwap就比较低。
为此,其中提出了用股票i在第m个月的均价偏差(average price bias, APB)以度量买卖压力,具体定义如下:
基于APB因子进一步构建APB_5d指标:基于过去 5 个交易日滚动计算 APB,过去 1 个月求均值。其中,研报中并没有给出vwap均价的计算方式,这里为了计算的方便以当天成交额比成交量计算平均价格作为代替。
进一步编写回测,相关参数如下:
初始资金:1000万
基准指数:沪深300
回测品种:沪深300成分股/全A
回测区间:2017年12月29日-2022年01月07日
样本过滤:剔除停牌股、ST股、次新股(一年期)
因子参数:N为9个月,M为3个月
数据预处理:异常值处理,标准化、市值中性化
交易逻辑:买入因子最大前N只股票,月末调仓换股;涨停买不入,跌停卖不出。
我们分别以沪深300成分股和全A股票进行回测,持仓数量分别为30、50和100只股票。
整体而言,策略效果较为一般。
在全A股票中持股100只的形式下该策略能够获得较好收益,只是持股50只与100只的策略表现差异较大,需要进一步细分持股数量以观察该参数是否存在幸存偏差。
策略python代码
# coding=utf-8from __future__ importprint_function, absolute_import
from gm.api import*
importmath
importdatetime
import numpy asnp
import pandas aspd
import statsmodels.api assm
importmultiprocessing
# 策略中必须有init方法def init(context): # 每天的09:30 定时执行algo任务 schedule(schedule_func=algo, date_rule=1d, time_rule=09:31:00)
context.N = 5 context.M = 21def algo(context): # 当前时间str today = context.now.strftime(“%Y-%m-%d %H:%M:%S”)
# 下一个交易日 next_date = get_next_trading_date(exchange=SZSE, date=today)
# 上一个交易日 last_date = get_previous_trading_date(exchange=SHSE, date=context.now)
# 每月最后一个交易日时换股 if today[5:7]!=next_date[5:7]:
if context.base_security==ALL:
# 获取全A股票(剔除停牌股和ST股)all_stocks,all_stocks_str = get_normal_stocks(context.now)
else:
# 获取指数成分股all_stocks = get_history_constituents(index=context.base_security, start_date=last_date,end_date=last_date)
all_stocks_str = ,.join(all_stocks[0][constituents].keys())
# 计算新版动量因子factor = get_alpha_APB(context,all_stocks_str,last_date,context.N,context.M)
# 获取最大因子的前N只股票 trade_stocks = list(factor.replace([-np.inf,np.inf],np.nan).dropna().sort_values(ascending=False)[:context.num].index)
print(context.now,待买入股票{}只:{}.format(len(trade_stocks),trade_stocks))
## 股票交易 # 获取持仓positions = context.account().positions()
# 卖出不在trade_stocks中的持仓(跌停不卖出) for position inpositions:
symbol = position[symbol]
if symbol not intrade_stocks:
price_limit = get_history_instruments(symbol, fields=lower_limit, start_date=context.now, end_date=context.now, df=True)
new_price = history(symbol=symbol, frequency=60s, start_time=context.now, end_time=context.now, fields=close, df=True)
if symbol not in trade_stocks and (len(new_price)==0 or len(price_limit)==0 or price_limit[lower_limit][0]!=round(new_price[close][0],2)):
# new_price为空时,是开盘后无成交的现象,此处忽略该情况,可能会包含涨跌停的股票 order_target_percent(symbol=symbol, percent=0, order_type=OrderType_Market, position_side=PositionSide_Long)
# 买入股票(涨停不买入) for symbol intrade_stocks:
price_limit = get_history_instruments(symbol, fields=upper_limit, start_date=context.now, end_date=context.now, df=True)
new_price = history(symbol=symbol, frequency=60s, start_time=context.now, end_time=context.now, fields=close, df=True)
if len(new_price)==0 or len(price_limit)==0 or price_limit[upper_limit][0]!=round(new_price[close][0],2):
# new_price为空时,是开盘后无成交的现象,此处忽略该情况,可能会包含涨跌停的股票 order_target_percent(symbol=symbol, percent=1/len(trade_stocks), order_type=OrderType_Market, position_side=PositionSide_Long)
# 获取每次回测的报告数据def on_backtest_finished(context, indicator): data = [indicator[pnl_ratio], indicator[pnl_ratio_annual], indicator[sharp_ratio], indicator[max_drawdown],
context.num]
# 将超参加入context.resultcontext.result.append(data)
def get_normal_stocks(date):“””
获取目标日期date的A股代码(剔除停牌股、ST股、次新股(一年期))
:param date:目标日期
“”” ifisinstance(date,str):
date = datetime.datetime.strptime(date,“%Y-%m-%d %H:%M:%S”)
df_code = get_instruments(sec_types=SEC_TYPE_STOCK, skip_suspended=True, skip_st=True, fields=symbol, sec_level, is_suspended, listed_date, delisted_date, df=True)
all_stocks = [code for code in df_code[(df_code[sec_level]==1)&(df_code[is_suspended]==0)&(df_code[listed_date]<=date-datetime.timedelta(days=365))&(df_code[delisted_date]>date)].symbol.to_list() if code[:6]!=SHSE.9 and code[:6]!=SZSE.2]
all_stocks_str = ,.join(all_stocks)
returnall_stocks,all_stocks_str
def get_previous_N_trading_date(date,counts=1):“””
获取end_date前N个交易日,end_date为datetime格式
:param date:目标日期
:param counts:历史回溯天数,默认为1,即前一天
“”” if isinstance(date,str) and len(date)>10:
date = datetime.datetime.strptime(date,%Y-%m-%d %H:%M:%S)
if isinstance(date,str) and date[4]==–:
date = datetime.datetime.strptime(date,%Y-%m-%d)
previous_N_trading_date = get_trading_dates(exchange=SHSE, start_date=date-datetime.timedelta(days=counts+30), end_date=date)[-counts]
returnprevious_N_trading_date
def get_alpha_APB(context,security,date,N=5,M=21,frequency=1d):“””计算 买卖压力 因子数据
:param security:目标股票,***,***,***格式
:param date:目标日期
:param N:基于过去N个交易日滚动计算APB
:param M:过去M个交易日求均值(21天为一个月)
“”” # 获取结束日期end_date = date
# 获取开始日期start_date = get_previous_N_trading_date(date,counts=N+M)
amount = history_new(security,frequency=frequency,start_time=start_date,end_time=end_date,fields=eob,symbol,amount,skip_suspended=True,fill_missing=None,adjust=ADJUST_POST,df=True).fillna(method=ffill)
volume = history_new(security,frequency=frequency,start_time=start_date,end_time=end_date,fields=eob,symbol,volume,skip_suspended=True,fill_missing=None,adjust=ADJUST_POST,df=True).fillna(method=ffill)
vwap = amount/volume
mean_vwap = vwap.rolling(window=N,min_periods=N).mean()
volume_sum = volume.rolling(window=N,min_periods=N).sum()
volume_vwap = vwap*volume
volume_vwap_sum = volume_vwap.rolling(window=N,min_periods=N).sum()
denominator = volume_vwap_sum/volume_sum# 分母 APB = (mean_vwap/denominator).apply(np.log).fillna(method=ffill).dropna(how=all)
alpha_factor = APB.iloc[-M:,:].mean()
# 去极值alpha_factor = winsorize_med(alpha_factor)
# 标准化alpha_factor = standardlize(alpha_factor)
# 市值中性化alpha_factor = neutralize_MarketValue(context,alpha_factor,date)
returnalpha_factor
def history_new(security,frequency,start_time,end_time,fields,skip_suspended=True,fill_missing=None,adjust=ADJUST_POST,df=True): # 分区间获取数据(以避免超出数据限制)(start_time和end_date为字符串,fields需包含eob和symbol)Data = pd.DataFrame()
trading_date = get_trading_dates(exchange=SZSE, start_date=start_time, end_date=end_time)
space = 5 iflen(trading_date)<=space:
Data = history(security, frequency=frequency, start_time=start_time, end_time=end_time, fields=fields, skip_suspended=skip_suspended, fill_missing=fill_missing, adjust=adjust, df=df)
else:
for n inrange(int(np.floor(len(trading_date)/space))):
start = n*space
end = start+space
ifend==len(trading_date):
data = history(security, frequency=frequency, start_time=trading_date[start], end_time=trading_date[end-1], fields=fields, skip_suspended=skip_suspended, fill_missing=fill_missing, adjust=adjust, df=df)
else:
data = history(security, frequency=frequency, start_time=trading_date[start], end_time=trading_date[end], fields=fields, skip_suspended=skip_suspended, fill_missing=fill_missing, adjust=adjust, df=df)
if len(data)>=33000:
print(请检查返回数据量,可能超过系统限制,缺少数据!!!!!!!!!!)
Data = pd.concat([Data,data])
Data.drop_duplicates(keep=first,inplace=True)
Data = Data.set_index([eob,symbol])
Data = Data.unstack()
Data.columns = Data.columns.droplevel(level=0)
returnData
def valid_sample_size(data,min_size_rate=2/3,axis=0): “””有效样本数量,默认最低比例为2/3″””min_size = int(round(len(data)*min_size_rate))
nan_data = np.isnan(data).sum(axis=axis)
security = nan_data[nan_data
data = data.loc[:,security]
returndata
def winsorize_med(data, scale=3, inclusive=True, inf2nan=True):“””
去极值
:param data:待处理数据[Series]
:param scale:标准差倍数,默认为3
:param inclusive:True为将边界外的数值调整为边界值,False为将边界外的数值调整为NaN
:param inf2nan:True为将inf转化为nan,False不转化
“”” data = data.astype(float)
ifinf2nan:
data = data.replace([np.inf, -np.inf], np.nan)
std_ = data.std()
mean_ = data.mean()
ifinclusive:
data[data>mean_+std_*scale]=mean_+std_*scale
data[data
else:
data[data>mean_+std_*scale]=np.nan
data[data
returndata
def standardlize(data, inf2nan=True):“””
标准化
:param data:待处理数据
:param inf2nan:是否将inf转化为nan
“”” ifinf2nan:
data = data.replace([np.inf, -np.inf], np.nan)
return(data – data.mean()) / data.std()
def neutralize_MarketValue(context,data,date,counts=1):“””
市值中性化
:param data:待处理数据
:param date:目标日期
:param counts:历史回溯天数
“”” ifisinstance(data,pd.Series):
data = data.to_frame()
data = data.dropna(how=any)
security = data.index.to_list()
market_value = get_fundamentals_n(table=trading_derivative_indicator, symbols=security, end_date=date, fields=TOTMKTCAP,count=counts, df=True)
max_date = market_value[pub_date].max()
market_value = market_value[market_value[pub_date]==max_date][[symbol,TOTMKTCAP]].set_index(symbol).dropna(how=any)
x = sm.add_constant(market_value)
common_index = list(set(x.index) & set(data.index))
x = x.loc[common_index,:]
data = data.loc[common_index,:]
residual = sm.OLS(data, x).fit().resid# 此处使用最小二乘回归计算 returnresidual
def run_strategy(num,base_security): from gm.model.storage importcontext
# 用context传入回测次数参数context.num = num
# 股票池基准context.base_security = base_security
# context.result用以存储超参context.result = []
# 循环输入参数 base_security = ALL# 基础股票池:ALL为全A股票,SHSE.000300为沪深300成分股,SHSE.000905为中证500成分股 num = [30,50,100]
a_list = []
pool = multiprocessing.Pool(processes=len(num), maxtasksperchild=1) # create processes for i inrange(len(num)):
a_list.append(pool.apply_async(func=run_strategy, args=(num[i],base_security,)))
pool.close()
pool.join()
info = []
for pro ina_list:
print(pro, pro.get()[0])
info.append(pro.get()[0])
print(info)
info = pd.DataFrame(np.array(info), columns=[pnl_ratio, pnl_ratio_annual, sharp_ratio, max_drawdown, holding_num])
info.to_csv(买卖压力因子策略:{}.csv.format(base_security), index=False)
return context.result
if __name__ == __main__:
strategy_id策略ID, 由系统生成
filename文件名, 请与本文件名保持一致
mode运行模式, 实时模式:MODE_LIVE回测模式:MODE_BACKTEST
token绑定计算机的ID, 可在系统设置-密钥管理中生成
backtest_start_time回测开始时间
backtest_end_time回测结束时间
backtest_adjust股票复权方式, 不复权:ADJUST_NONE前复权:ADJUST_PREV后复权:ADJUST_POST
backtest_initial_cash回测初始资金
backtest_commission_ratio回测佣金比例
backtest_slippage_ratio回测滑点比例
run(strategy_id=adbd86dd-5d90-11ec-9850-7085c223669d,
filename=main.py,
mode=MODE_BACKTEST,
token={{token}},
backtest_start_time=2017-12-29 08:00:00,
backtest_end_time=2022-01-07 16:00:00,
backtest_adjust=ADJUST_PREV,
backtest_initial_cash=10000000,
backtest_commission_ratio=0.0016,# 买入万三手续费+卖出万三手续费和千1印花税,免5 backtest_slippage_ratio=0.00246)
声明:本内容仅供学习、交流、演示之用,不构成任何投资建议!点击【阅读原文】
了解东财证券机构交易平台。
发表回复