// NOTE(review): hard-coded default administrator credentials. Replace with a
// dedicated account / externalised secret before running outside a sandbox.
login("admin","123456")

/**
 * Annualised return of a net-value series.
 * Takes the total return between the first and last observation and scales it
 * with the exponent 252\730 (the 730 is hard-coded in this script).
 * Note: in DolphinDB `\` is ratio (floating point) division.
 */
defg getAnnualReturn(value){
	totalReturn = (last(value) - first(value)) \ first(value)
	return pow(1 + totalReturn, 252\730) - 1
}

/**
 * Annualised volatility: standard deviation of the period-over-period simple
 * returns, scaled by sqrt(252).
 */
defg getAnnualVolatility(value){
	return std(deltas(value)\prev(value)) * sqrt(252)
}

/**
 * Skewness of the period-over-period simple returns.
 */
defg getAnnualSkew(value){
	return skew(deltas(value)\prev(value))
}

/**
 * Kurtosis of the period-over-period simple returns.
 */
defg getAnnualKur(value){
	return kurtosis(deltas(value)\prev(value))
}

/**
 * Sharpe ratio: (annualised return - 0.03) divided by annualised volatility.
 * The `as sharpeRat` alias is preserved from the original implementation.
 */
defg getSharp(value){
	return (getAnnualReturn(value) - 0.03)\getAnnualVolatility(value) as sharpeRat
}

/**
 * Maximum drawdown of a net-value series.
 * Locates the index i with the largest relative distance below the running
 * maximum, then the peak j in value[:i], and returns (peak - trough) \ peak.
 * Returns 0 when the largest drawdown is found at index 0.
 */
def getMaxDrawdown(value){
	runningPeak = cummax(value)
	i = imax((runningPeak - value) \ runningPeak)
	if (i == 0){
		return 0
	}
	j = imax(value[:i])
	return (value[j] - value[i]) \ (value[j])
}

/**
 * Annualised return divided by maximum drawdown.
 */
def getDrawdownRatio(value){
	return getAnnualReturn(value) \ getMaxDrawdown(value)
}

/**
 * Beta of a fund against a benchmark price series.
 * beta = cov(fund returns, benchmark returns) / var(benchmark returns).
 * The previous version divided by std() of the benchmark returns, which is
 * not beta (it is off by a factor of the benchmark's standard deviation).
 */
def getBeta(value, price){
	fundRet = deltas(value)\prev(value)
	benchRet = deltas(price)\prev(price)
	return covar(fundRet, benchRet) \ var(benchRet)
}

/**
 * Alpha: fund excess annualised return (over 0.03) minus beta times the
 * benchmark's excess annualised return (over 0.03).
 */
def getAlpha(value, price){
	return getAnnualReturn(value) - 0.03 - getBeta(value, price) * (getAnnualReturn(price) - 0.03)
}

/**
 * Rescaled-range statistic for one window length k.
 *   mret    - return matrix (as built in parJob: one column per fund)
 *   symList - fund identifiers, used to label the output rows
 *   k       - window length
 * Steps: truncate to a whole number of k-row windows, de-mean each window,
 * take the cumulative sum inside each window, compute its range (max - min),
 * divide by the per-window population standard deviation, replace NULLs with
 * 1.0, average over windows and take the log.
 * Returns a table (fundNum, knum = log(k), factor1 = log mean R/S).
 */
def calAllRs2(mret, symList, k){
	// integer division: largest multiple of k not exceeding the row count
	rowCount = mret.rows()/k * k
	demeanCum = rolling(cumsum, mret[0:rowCount,] - each(stretch{, rowCount}, rolling(avg, mret, k, k)), k, k)
	a = rolling(max, demeanCum, k, k) - rolling(min, demeanCum, k, k)
	RS = nullFill!(a/rolling(stdp, mret, k, k), 1.0).mean().log()
	return table(symList as fundNum, take(log(k), symList.size()) as knum, RS as factor1)
}

/**
 * Computes all per-fund factors for the funds in symList over the fixed
 * window 2018.05.24..2021.05.27.
 *   result2 - table with tradingDate, fundNum, value, price
 *   symList - fund identifiers to include
 * Returns one row per fund. (The original assigned the query result to a
 * local and never returned it, so callers received nothing.)
 */
def getFactor(result2, symList){
	factors = select fundNum,
			getAnnualReturn(value) as annualReturn,
			getAnnualVolatility(value) as annualVolRat,
			getAnnualSkew(value) as skewValue,
			getAnnualKur(value) as kurValue,
			getSharp(value) as sharpValue,
			getMaxDrawdown(value) as MaxDrawdown,
			getDrawdownRatio(value) as DrawdownRatio,
			getBeta(value, price) as Beta,
			getAlpha(value, price) as Alpha
		from result2
		where tradingDate in 2018.05.24..2021.05.27 and fundNum in symList
		group by fundNum
	return factors
}

/**
 * Benchmark job: loads the fund and benchmark tables, aligns them, then times
 * (1) data preparation and (2) factor computation plus the regression of
 * factor1 on knum per fund. Results are not returned; the job exists to be
 * timed through getRecentJobs().
 */
def parJob(){
	timer{
		fund_OLAP = select * from loadTable("dfs://fund_OLAP", "fund_OLAP")
		fund_hs_OLAP = select * from loadTable("dfs://fund_OLAP", "fund_hs_OLAP")
		// as-of join on tradingDate, then keep only rows whose dates match exactly
		ajResult = select tradingDate, fundNum, value, fund_hs_OLAP.tradingDate as hstradingDate, fund_hs_OLAP.value as price from aj(fund_OLAP, fund_hs_OLAP, `tradingDate)
		result2 = select tradingDate, fundNum, iif(isNull(value), ffill!(value), value) as value, price from ajResult where tradingDate == hstradingDate
		symList = exec distinct(fundNum) as fundNum from result2 order by fundNum
		// batches of 250 funds for the parallel factor computation
		symList2 = symList.cut(250)
		portfolio = select fundNum as fundNum, (deltas(value)\prev(value)) as log, tradingDate as tradingDate from result2 where tradingDate in 2018.05.24..2021.05.27 and fundNum in symList
		m_log = exec log from portfolio pivot by tradingDate, fundNum
		// drop the first row of the pivoted return matrix
		mlog = m_log[1:,]
		// window lengths to evaluate; named differently from the `knum` column
		// produced by calAllRs2 so the SQL below cannot pick up this vector
		windowSizes = 2..365
	}
	timer{
		ploop(getFactor{result2}, symList2)
		a = ploop(calAllRs2{mlog, symList}, windowSizes).unionAll(false)
		// column is created as `knum` in calAllRs2; the original referenced `kNum`
		// NOTE(review): with a single regressor, ols may return only two
		// coefficients, in which case index [2] is empty - confirm intent.
		res2 = select fundNum,
				ols(factor1, knum)[0] as hist,
				ols(factor1, knum)[1] as hist2,
				ols(factor1, knum)[2] as hist3
			from a
			group by fundNum
	}
}

/**
 * 1 job
 */
submitJob("parallJob1", "parallJob_single_ten", parJob)

/**
 * 5 jobs
 */
for(i in 0..4){
	submitJob("parallJob5", "parallJob_multi_ten", parJob)
}

// Elapsed time of the single job.
// NOTE(review): presumably meaningful only once the job has finished - confirm.
select endTime - startTime from getRecentJobs() where jobDesc = "parallJob_single_ten"

// Wall-clock span covering all five concurrent jobs.
select max(endTime) - min(startTime) from getRecentJobs() where jobDesc = "parallJob_multi_ten"