/*
 * Bulk-load per-symbol CSV quote files, organised as one sub-directory per
 * day, into a DFS table. One background job is submitted per day directory.
 */

// Derive the text schema from a sample CSV file and rename the
// TotalVolume / TotalAmount columns to Volume / Amount.
// csv: path of a sample CSV file.
// Returns the adjusted schema table, suitable for loadText's schema argument.
def getSchema(csv){
	schema1 = extractTextSchema(csv)
	update schema1 set name=`Volume where name="TotalVolume"
	update schema1 set name=`Amount where name="TotalAmount"
	return schema1
}

// Load one CSV file into an in-memory table shaped like orderbooktb.
// csvFile:     path of the CSV file.
// orderbooktb: table whose column order the result is aligned to.
// schema1:     schema table produced by getSchema.
// Returns the transformed in-memory table.
def loadOneFile(csvFile, orderbooktb, schema1){
	t = loadText(csvFile, , schema1)
	// The first two characters of Symbol become the market column; the rest stays in Symbol.
	t["market"] = left(t["Symbol"], 2)
	t["Symbol"] = substr(t["Symbol"], 2)
	// Replace each value with its difference from the previous row (the first row
	// is differenced against 0). Presumably the source columns are running totals,
	// given they are renamed from TotalVolume / TotalAmount -- confirm against the data.
	// The column is addressed as "Volume" throughout, matching the schema rename.
	t["Volume"] = eachPre(-, t["Volume"], 0)
	t["Amount"] = eachPre(-, t["Amount"], 0)
	t.reorderColumns!(orderbooktb.schema().colDefs[`name])
	return t
}

// Load every CSV file found in one directory into the target DFS table.
// Files are processed in batches of 100: each batch is accumulated in an
// in-memory table and then appended to the DFS table in a single call.
// dbName, tableName: target database and table.
// path:              directory containing the CSV files.
// schema1:           schema table produced by getSchema.
def loadOneDayFiles(dbName, tableName, path, schema1){
	tb = loadTable(dbName, tableName)
	fileList = exec filename from files(path, "%.csv")
	// Nothing to do for a directory without CSV files.
	if(fileList.size() > 0){
		// The target table's column definitions do not change between batches.
		colDefs = tb.schema().colDefs
		fs = fileList.cut(100)
		for(i in 0:fs.size()){
			bigTable = table(500000:0, colDefs[`name], colDefs[`typeString])
			for(f in fs[i]){
				// A file that fails to load is reported and skipped so the
				// remaining files of the batch are still imported.
				try{
					bigTable.append!(loadOneFile(path + "/" + f, bigTable, schema1))
				}
				catch(ex){
					print f + ": " + ex
				}
			}
			tb.append!(bigTable)
		}
	}
}

// Submit one background job per sub-directory of filePath; each job runs
// loadOneDayFiles on that sub-directory.
// dbName, tableName: target database and table.
// filePath:          directory whose sub-directories hold the CSV files.
// schema1:           schema table produced by getSchema.
def loopLoadOneYearFiles(dbName, tableName, filePath, schema1){
	dirs = exec filename from files(filePath) where isDir=true
	for(path in dirs){
		submitJob("new" + path, "loadOrderDir" + path, loadOneDayFiles{dbName, tableName, filePath + "/" + path, schema1})
	}
}

// ---- driver ----
dbName = "dfs://stocks_orderbook"
tableName = "orderBook"
filePath = "/hdd/hdd9/data/quotes/2020"
// Sample file used only to derive the schema shared by all files.
csv = "/hdd/hdd9/data/quotes/2020/20200102/SH501000.csv"
schema1 = getSchema(csv)
loopLoadOneYearFiles(dbName, tableName, filePath, schema1)