login("admin","123456")

defg getAnnualReturn(value){
      return pow(1 + ((last(value) - first(value))\first(value)), 252\730) - 1
}


defg getAnnualVolatility(value){
	return std(deltas(value)\prev(value)) * sqrt(252)
}


defg getAnnualSkew(value){
	return skew(deltas(value)\prev(value))
}


defg getAnnualKur(value){
	return kurtosis(deltas(value)\prev(value)) 
}


defg getSharp(value){
	return (getAnnualReturn(value) - 0.03)\getAnnualVolatility(value) as sharpeRat
}


def getMaxDrawdown(value){
	i = imax((cummax(value) - value) \ cummax(value))
	if (i==0){
		return 0
	}
	j = imax(value[:i])
	return (value[j] - value[i]) \ (value[j])
}


def getDrawdownRatio(value){
	return getAnnualReturn(value) \ getMaxDrawdown(value)
}


def getBeta(value, price){
	return covar(deltas(value)\prev(value), deltas(price)\prev(price)) \ std(deltas(price)\prev(price))
}


def getAlpha(value, price){
	return getAnnualReturn(value) - 0.03 - getBeta(value, price) * (getAnnualReturn(price) - 0.03)
}


def calAllRs2(mret, symList, k){
        rowCount = mret.rows()/k * k
        demeanCum = rolling(cumsum, mret[0:rowCount,] - each(stretch{, rowCount}, rolling(avg, mret, k, k)), k, k)
        a = rolling(max, demeanCum, k, k) - rolling(min, demeanCum, k, k)
        RS = nullFill!(a/rolling(stdp, mret, k, k), 1.0).mean().log()
        return table(symList as fundNum, take(log(k), symList.size()) as knum, RS as factor1)
}


def getFactor(result2, symList){
	Return = select fundNum, 
	            getAnnualReturn(value) as annualReturn,
	            getAnnualVolatility(value) as annualVolRat,
	            getAnnualSkew(value) as skewValue,
	            getAnnualKur(value) as kurValue,
	            getSharp(value) as sharpValue,
	            getMaxDrawdown(value) as MaxDrawdown,
	            getDrawdownRatio(value) as DrawdownRatio,
	            getBeta(value, price) as Beta,
	            getAlpha(value, price) as Alpha	
             from result2
             where tradingDate in 2018.05.24..2021.05.27 and fundNum in symList group by fundNum
 }
 
def parJob(){
  	timer{fund_OLAP=select * from loadTable("dfs://fund_OLAP", "fund_OLAP")
  		  fund_hs_OLAP=select * from loadTable("dfs://fund_OLAP", "fund_hs_OLAP")
  		  ajResult = select tradingDate, fundNum, value, fund_hs_OLAP.tradingDate as hstradingDate, fund_hs_OLAP.value as price from aj(fund_OLAP, fund_hs_OLAP, `tradingDate)
  		  result2 = select tradingDate, fundNum, iif(isNull(value), ffill!(value), value) as value,price from ajResult where tradingDate == hstradingDate
  		  symList = exec distinct(fundNum) as fundNum from result2 order by fundNum
          symList2 = symList.cut(250)
  		  portfolio = select fundNum as fundNum, (deltas(value)\prev(value)) as log, tradingDate as tradingDate from result2 where tradingDate in 2018.05.24..2021.05.27 and fundNum in symList
          m_log = exec log from portfolio pivot by tradingDate, fundNum
          mlog =  m_log[1:,]
          knum = 2..365
            }
    timer{ploop(getFactor{result2}, symList2)
          a = ploop(calAllRs2{mlog,symList}, knum).unionAll(false)
          res2 = select fundNum, ols(factor1, kNum)[0] as hist, ols(factor1, kNum)[1] as hist2, ols(factor1, kNum)[2] as hist3 from a group by fundNum}
  }

/**
 * 1 job
 */
submitJob("parallJob1", "parallJob_single_ten", parJob)

/**
 * 5 jobs
 */
for(i in 0..4){
	submitJob("parallJob5", "parallJob_multi_ten", parJob)
}

//1 job
select endTime - startTime from getRecentJobs() where jobDesc = "parallJob_single_ten"
//5 jobs
select max(endTime) - min(startTime) from getRecentJobs() where jobDesc = "parallJob_multi_ten"