/*
 * This script creates the DFS databases and partitioned tables that hold
 * daily and minute-level K-line (OHLC) data.
 * You can either write your own data into these tables or use the simulated
 * data generated by this script.
 */

// Create (or re-create) the minute-level database and its partitioned table.
//   dbName: DFS database path, e.g. "dfs://k_minute_level"
//   tbName: name of the partitioned table to create inside the database
// WARNING: an existing database at dbName is dropped first.
def createMinuteDbTable(dbName, tbName){
    if(existsDatabase(dbName)){
        dropDatabase(dbName)
    }
    // Composite partitioning: VALUE on month (2020.01M..2020.12M), then HASH on symbol into 3 buckets.
    dbDate = database("", VALUE, 2020.01M..2020.12M)
    dbSym = database("", HASH, [SYMBOL, 3])
    db = database(dbName, COMPO, [dbDate, dbSym])
    // Empty schema table; it only supplies column names and types.
    t = table(1:0, `securityid`tradetime`open`close`high`low`vol`val`vwap, [SYMBOL,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,DOUBLE])
    db.createPartitionedTable(t, tbName, `tradetime`securityid)
}

// Create (or re-create) the daily database and its partitioned table.
//   dbName: DFS database path, e.g. "dfs://k_day_level"
//   tbName: name of the partitioned table to create inside the database
// WARNING: an existing database at dbName is dropped first.
def createDayDbTable(dbName, tbName){
    if(existsDatabase(dbName)){
        dropDatabase(dbName)
    }
    // RANGE partitioning with boundaries 12 months apart, starting at 2000.01M (31 boundaries).
    db = database(dbName, RANGE, 2000.01M + (0..30)*12)
    t = table(1:0, `securityid`tradetime`open`close`high`low`vol`val`vwap, [SYMBOL,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,DOUBLE])
    db.createPartitionedTable(t, tbName, `tradetime)
}

//Simulated data

// Build simulated bars for every (security, tradetime) combination.
//   tradetime: a one-column table named `tradetime` holding the bar timestamps
// Returns a table with columns
//   securityid, tradetime, open, close, high, low, vol, val, vwap
// where securityid takes the values "sz000001".."sz004000".
// The generated prices satisfy low <= min(open, close) and max(open, close) <= high,
// so close (which is also used as vwap) always lies inside [low, high].
def genSimulatedBars(tradetime){
    securityid = "sz" + lpad(string(000001..004000), 6, `0)
    keyTable = cj(table(securityid as securityid), tradetime)
    // Derive the row count from the cross join instead of hard-coding the number of symbols.
    rowCount = size(keyTable)
    open = rand(100.0, rowCount)
    close = open + norm(0, 2, rowCount)
    // high sits above the larger of open/close, low below the smaller one.
    high = iif(open > close, open, close) + rand(1.0, rowCount)
    low = iif(open < close, open, close) - rand(1.0, rowCount)
    vol = rand(100000, rowCount)
    val = close * vol
    vwap = close
    // Column names of the right-hand table come from the variable names; order matches the table schema.
    return keyTable join table(open, close, high, low, vol, val, vwap)
}

// Generate one month of simulated minute bars and append them to the minute table.
//   n: month offset from 2020.01 (0 -> January 2020, 1 -> February 2020, ...)
// The target is fixed to table "k_minute" in database "dfs://k_minute_level".
def genKminute(n){
    firstDay = temporalAdd(2020.01.01, n, "M")
    // businessDay presumably maps non-business days onto a neighbouring business day, which can
    // fall before the month starts; the where-clause drops such dates.
    tradeDate = select * from table(distinct(businessDay(firstDay..monthEnd(firstDay))).sort() as tradeDate) where tradeDate >= firstDay
    // Intraday bar times for the two sessions starting at 09:30 and 13:00 (expression kept verbatim).
    tradeMin = table(09:30:00.000+0..120*60*1000 join (13:00:00.000+0..120*60*1000) as tradeMin)
    tradetime = select concatDateTime(tradeDate, tradeMin) as tradetime from cj(tradeDate, tradeMin)
    resTable = genSimulatedBars(tradetime)
    // Drop references to intermediates before the write so their memory can be reclaimed.
    tradeDate = NULL
    tradeMin = NULL
    tradetime = NULL
    kMinute = loadTable("dfs://k_minute_level", "k_minute")
    kMinute.append!(resTable)
}

// Submit one background job per month; job n writes month offset n via genKminute.
//   numOfMonth: number of months to generate, starting from 2020.01
def writeInMinuteByMonth(numOfMonth){
    // 0..(numOfMonth-1) would be a descending sequence for numOfMonth <= 0; submit nothing instead.
    if(numOfMonth <= 0){
        return
    }
    for(n in 0..(numOfMonth-1)){
        submitJob("genKminute_"+string(n), "genKminute_"+string(n), genKminute, n)
    }
}

// Generate one year of simulated daily bars and append them to the daily table.
//   n: year offset from 2010 (0 -> 2010, 1 -> 2011, ...)
// The target is fixed to table "k_day" in database "dfs://k_day_level".
def genKday(n){
    firstDay = temporalAdd(2010.01.01, n, "y")
    // Same business-day handling as in genKminute: dates that fall before the year starts are filtered out.
    tradetime = select * from table(timestamp(distinct(businessDay(firstDay..yearEnd(firstDay)))).sort() as tradetime) where tradetime >= firstDay
    resTable = genSimulatedBars(tradetime)
    // Drop the reference to the intermediate before the write so its memory can be reclaimed.
    tradetime = NULL
    kDay = loadTable("dfs://k_day_level", "k_day")
    kDay.append!(resTable)
}

// Submit one background job per year; job n writes year offset n via genKday.
//   numOfYear: number of years to generate, starting from 2010
def writeInDayByYear(numOfYear){
    // 0..(numOfYear-1) would be a descending sequence for numOfYear <= 0; submit nothing instead.
    if(numOfYear <= 0){
        return
    }
    for(n in 0..(numOfYear-1)){
        submitJob("genKday_"+string(n), "genKday_"+string(n), genKday, n)
    }
}

//1. Create database and table with minute-level data
createMinuteDbTable("dfs://k_minute_level", "k_minute")

//2. Create database and table with daily data
createDayDbTable("dfs://k_day_level", "k_day")

//3. Append simulated data to the tables (12 months of minute bars, 10 years of daily bars)
writeInMinuteByMonth(12)
writeInDayByYear(10)