// Creates a COMPO-partitioned DFS database (date RANGE x symbol RANGE) and
// loads the CSV files found under rootDir/<yearDir>/ into it, submitting one
// batch job per year directory, distributed round-robin over the data nodes.

login(`admin,`123456)

dbPath = "dfs://SAMPLE_TRDDB"
tableName = `tradingDay
rootDir = "/home/data/sample"

// First-level partition boundaries: 23 dates, 12 months apart, starting 2008.01.
yearRange = date(2008.01M + 12*0..22)

// Collect the symbols from every year directory. File names are upper-cased
// and the ".CSV" suffix stripped to obtain the symbol.
// The results are appended (not assigned) so that the symbols of ALL year
// directories contribute to the partition scheme, not only the last one.
symbols = array(SYMBOL, 0, 100)
yearDirs = files(rootDir)[`filename]
for(yearDir in yearDirs){
	path = rootDir + "/" + yearDir
	symbols.append!(files(path)[`filename].upper().strReplace(".CSV",""))
}

// De-duplicate and sort, then add "999999" as the final boundary value
// before cutting the list into 100 symbol ranges.
symbols = symbols.distinct().sort!().append!("999999");
symRanges = symbols.cutPoints(100)

// Second-level partition is by symbol range; combine both into a COMPO database.
dbDate = database("", RANGE, yearRange)
dbID = database("", RANGE, symRanges)
db = database(dbPath, COMPO, [dbDate, dbID])

// Empty template table defining the schema; partitioned on tradingDay and symbol.
pt = db.createPartitionedTable(table(1000000:0, `symbol`exchange`cycle`tradingDay`date`time`open`high`low`close`volume`turnover`unixTime, [SYMBOL,SYMBOL,INT,DATE,DATE,TIME,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG,DOUBLE,LONG]), tableName, `tradingDay`symbol)

// Loads every file in `path` with loadText and appends it to the DFS table.
//   path      - directory containing the files of one year
//   dbPath    - DFS database path, e.g. "dfs://SAMPLE_TRDDB"
//   tableName - name of the partitioned table inside that database
// The `time` column is formatted to a 9-digit zero-padded string and parsed
// with the pattern "HHmmssSSS" before being appended.
def loadCsvFromYearPath(path, dbPath, tableName){
	// Resolve the target table once; it does not change between files.
	target = loadTable(dbPath, tableName)
	fileNames = files(path)[`filename]
	for(fileName in fileNames){
		filePath = path + "/" + fileName
		t = loadText(filePath)
		target.append!(select symbol, exchange, cycle, tradingDay, date, datetimeParse(format(time,"000000000"),"HHmmssSSS"), open, high, low, close, volume, turnover, unixTime from t)
	}
}

// datanode alias
nodesAlias = "NODE" + string(0..2)

// Submit one batch job per year directory, cycling through the node aliases.
index = 0;
for(year in yearDirs){
	yearPath = rootDir + "/" + year
	des = "loadCsv_" + year
	rpc(nodesAlias[index%nodesAlias.size()], submitJob, des, des, loadCsvFromYearPath, yearPath, dbPath, tableName)
	index = index + 1
}