login("admin","123456") /** * 因子1:年化收益率 (1 + 当前收益率) ** (252\区间天数) -1 */ defg getAnnualReturn(value){ return pow(1 + ((last(value) - first(value))\first(value)), 252\730) - 1 } /** * 因子2:年化波动率 净值波动率:指净值的波动程度,某段时间内,净值的变动的标准差。 净值年化波动率:指将净值波动率年化处理。计算方式为波动率* sqrt(N) 。 (日净值N=250,周净值N=52,月净值N=12) */ defg getAnnualVolatility(value){ return std(deltas(value)\prev(value)) * sqrt(252) } /** * 因子3:偏度 偏度:指将收益率偏度年化处理。计算方式为偏度* sqrt(N) 。 (日净值N=250,周净值N=52,月净值N=12) */ defg getAnnualSkew(value){ return skew(deltas(value)\prev(value)) } /** * 因子4:峰度 峰度:指将净值峰度年化处理。计算方式为峰度* sqrt(N) 。 (日净值N=250,周净值N=52,月净值N=12) */ defg getAnnualKur(value){ return kurtosis(deltas(value)\prev(value)) } /** * 因子5:夏普比率 无风险收益率按3% 夏普比率:(年化收益率 - 无风险利率) / 收益年化波动率 */ defg getSharp(value){ return (getAnnualReturn(value) - 0.03)\getAnnualVolatility(value) as sharpeRat } /** * 因子6:最大回撤率 最大回撤率:max(1-第i天净值 / 第j天净值) */ def getMaxDrawdown(value){ i = imax((cummax(value) - value) \ cummax(value)) if (i==0){ return 0 } j = imax(value[:i]) return (value[j] - value[i]) \ (value[j]) } /** * 因子7:收益回撤比 收益回撤比:年化收益率 / 最大回撤率 */ def getDrawdownRatio(value){ return getAnnualReturn(value) \ getMaxDrawdown(value) } /** * 因子8:β指数 */ def getBeta(value, price){ return covar(deltas(value)\prev(value), deltas(price)\prev(price)) \ std(deltas(price)\prev(price)) } /** * 因子9:α指数 */ def getAlpha(value, price){ return getAnnualReturn(value) - 0.03 - getBeta(value, price) * (getAnnualReturn(price) - 0.03) } /** * 因子10:hist 指数 * */ def calAllRs2(mret, symList, k){ rowCount = mret.rows()/k * k demeanCum = rolling(cumsum, mret[0:rowCount,] - each(stretch{, rowCount}, rolling(avg, mret, k, k)), k, k) a = rolling(max, demeanCum, k, k) - rolling(min, demeanCum, k, k) RS = nullFill!(a/rolling(stdp, mret, k, k), 1.0).mean().log() return table(symList as fundNum, take(log(k), symList.size()) as knum, RS as factor1) } /** * 因子执行时间统计 */ def getFactor(result2, symList){ Return = select fundNum, getAnnualReturn(value) as annualReturn, getAnnualVolatility(value) as annualVolRat, getAnnualSkew(value) as skewValue, getAnnualKur(value) as kurValue, getSharp(value) as sharpValue, getMaxDrawdown(value) as MaxDrawdown, getDrawdownRatio(value) as DrawdownRatio, getBeta(value, price) as Beta, getAlpha(value, price) as Alpha from result2 where tradingDate in 2018.05.24..2021.05.27 and fundNum in symList group by fundNum }//定义计算九个因子的函数 def parJob(){ timer{fund_OLAP=select * from loadTable("dfs://fund_OLAP", "fund_OLAP") fund_hs_OLAP=select * from loadTable("dfs://fund_OLAP", "fund_hs_OLAP") ajResult = select tradingdate, fundNum, value, fund_hs_OLAP.tradingDate as hstradingDate, fund_hs_OLAP.value as price from aj(fund_OLAP, fund_hs_OLAP, `tradingDate) result2 = select tradingdate, fundNum, iif(isNull(value), ffill!(value), value) as value,price from ajResult where tradingDate == hstradingDate symList = exec distinct(fundNum) as fundNum from result2 order by fundNum symList2 = symList.cut(250) portfolio = select fundNum as fundNum, (deltas(value)\prev(value)) as log, tradingDate as tradingDate from result2 where tradingDate in 2018.05.24..2021.05.27 and fundNum in symList m_log = exec log from portfolio pivot by tradingDate, fundNum mlog = m_log[1:,] knum = 2..365 }//此处,将任务切分,按每次250个不同基金数据进行计算. timer{ploop(getFactor{result2}, symList2) a = ploop(calAllRs2{mlog,symList}, knum).unionAll(false) res2 = select fundNum, ols(factor1, kNum)[0] as hist, ols(factor1, kNum)[1] as hist2, ols(factor1, kNum)[2] as hist3 from a group by fundNum} }//定义获取十个因子计算和数据操作的时间的函数 /** * 提交1个job(单用户) */ submitJob("parallJob1", "parallJob_single_ten", parJob) /** * 提交5个job(多用户) */ for(i in 0..4){ submitJob("parallJob5", "parallJob_multi_ten", parJob) } //获取单个用户的运行时间 select endTime - startTime from getRecentJobs() where jobDesc = "parallJob_single_ten" //获取多个用户的运行时间 select max(endTime) - min(startTime) from getRecentJobs() where jobDesc = "parallJob_multi_ten"