/*
 * This script is used to create database and tables with daily or minute-level data.
 * You can either write your data into database or use the simulated data given in this script.
 */

def createMinuteDbTable(dbName,tbName){
	if(existsDatabase(dbName)){
		dropDatabase(dbName)
	}
	dbDate = database("", VALUE,2020.01M..2020.12M )
	dbSym= database("", HASH, [SYMBOL,3])
	db = database(dbName, COMPO, [dbDate, dbSym])

	t =table(1:0, `securityid`tradetime`open`close`high`low`vol`val`vwap, [SYMBOL,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,DOUBLE])
	db.createPartitionedTable(t, tbName, `tradetime`securityid)
}

def createDayDbTable(dbName, tbName){
	if(existsDatabase(dbName)){
		dropDatabase(dbName)
	}
	db = database(dbName, RANGE,  2000.01M + (0..30)*12 )
	t =table(1:0, `securityid`tradetime`open`close`high`low`vol`val`vwap, [SYMBOL,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,DOUBLE])
	db.createPartitionedTable(t, tbName, `tradetime)
}

//Simulated data
def genKminute(n){
	tradeDate= select * from table(distinct(businessDay(temporalAdd(2020.01.01,n, "M")..monthEnd(temporalAdd(2020.01.01,n, "M")))).sort() as tradeDate) where tradeDate>=temporalAdd(2020.01.01,n, "M")
	tradeMin = table(09:30:00.000+0..120*60*1000 join (13:00:00.000+0..120*60*1000) as tradeMin)
	tradetime = select concatDateTime(tradeDate,tradeMin) as tradetime from cj(tradeDate,tradeMin)
	securityid ="sz"+lpad(string(000001..004000), 6, `0)
	tmpTable = cj(table(securityid as securityid),tradetime)

	open = rand(100.0, size(tradetime)*4000)
	high = open + rand(1.0,size(tradetime)*4000)
	low = high -  rand(2.0,size(tradetime)*4000)
	close = open + norm(0,2,size(tradetime)*4000)
	vol = rand(100000,size(tradetime)*4000)
	val = close*vol
	vwap = close
		
	resTable = tmpTable join table(open,close, high, low, vol, val, vwap)

	tradeDate=NULL
	tradeMin = NULL
	tradetime =NULL
	securityid =NULL
	tmpTable = NULL
	open =NULL
	high = NULL
	low = NULL
	close =NULL
	vol = NULL
	val = NULL
	vwap = NULL
	
	db = loadTable("dfs://k_minute_level","k_minute")
	db.append!(resTable)	
}

def writeInMinuteByMonth(numOfMonth){
	for (n in 0..(numOfMonth-1)){
		submitJob("genKminute_"+string(n),"genKminute_"+string(n),genKminute,n)
	}
}

def genKday(n){
	tradetime = select * from table(timestamp(distinct(businessDay(temporalAdd(2010.01.01,n, "y")..yearEnd(temporalAdd(2010.01.01,n, "y"))))).sort() as tradetime) where tradetime >=temporalAdd(2010.01.01,n, "y")
	securityid ="sz"+lpad(string(000001..004000), 6, `0)
	tmpTable = cj(table(securityid as securityid),tradetime)

	open = rand(100.0, size(tradetime)*4000)
	high = open + rand(1.0,size(tradetime)*4000)
	low = high -  rand(2.0,size(tradetime)*4000)
	close = open + norm(0,2,size(tradetime)*4000)
	vol = rand(100000,size(tradetime)*4000)
	val = close*vol
	vwap = close
		
	resTable = tmpTable join table(open,close, high, low, vol, val, vwap)

	tradeDate=NULL
	tradeMin = NULL
	tradetime =NULL
	securityid =NULL
	tmpTable = NULL
	open =NULL
	high = NULL
	low = NULL
	close =NULL
	vol = NULL
	val = NULL
	vwap = NULL
	
	db = loadTable("dfs://k_day_level","k_day")
	db.append!(resTable)	
}
	
def writeInDayByYear(numOfYear){
	for (n in 0..(numOfYear-1)){
		submitJob("genKday_"+string(n),"genKday_"+string(n),genKday,n)	
	}
}

//1.Create database and table with minute-level data
createMinuteDbTable("dfs://k_minute_level","k_minute")

//2.Create database and table with daily data
createDayDbTable("dfs://k_day_level","k_day")

//3. Append data to tables
writeInMinuteByMonth(12)
writeInDayByYear(10)