def getSchema(csv){ schema1 = extractTextSchema(csv) update schema1 set name=convertEncode(name,"gbk","utf-8") newName = `Symbol`Market`DateTime`Price`TickCount`Volume`Amount`AskPrice1`AskPrice2`AskPrice3`AskPrice4`AskPrice5`BidPrice1`BidPrice2`BidPrice3`BidPrice4`BidPrice5`AskVolume1`AskVolume2`AskVolume3`AskVolume4`AskVolume5`BidVolume1`BidVolume2`BidVolume3`BidVolume4`BidVolume5 oldName = `证券代码`市场代码`时间`最新价`成交笔数`成交量`成交额`卖一价`卖二价`卖三价`卖四价`卖五价`买一价`买二价`买三价`买四价`买五价`卖一量`卖二量`卖三量`卖四量`卖五量`买一量`买二量`买三量`买四量`买五量 nameMap = dict(oldName, newName) update schema1 set name = nameMap[name] where name in nameMap update schema1 set col = rowNo(name) delete from schema1 where name in [`方向] return schema1 } def loadOneFile(csvFile,orderbooktb,schema1){ t = loadText(csvFile,,schema1) t["market"] = upper(t["market"]) t[`Volume] = t["Volume"]*100 t.addColumn(`Status`PreClose`Open`High`Low`AskPrice6`AskPrice7`AskPrice8`AskPrice9`AskPrice10`BidPrice6`BidPrice7`BidPrice8`BidPrice9`BidPrice10`AskVolume6`AskVolume7`AskVolume8`AskVolume9`AskVolume10`BidVolume6`BidVolume7`BidVolume8`BidVolume9`BidVolume10`BidOrderTotalVolume`AskOrderTotalVolume`AvgBidOrderPrice`AvgAskOrderPrice`LimitHighestPrice`LimitLowestPrice,[INT,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE]) t.reorderColumns!(orderbooktb.schema().colDefs[`name]) return t } def loadOneDayFiles(dbName,tableName,path,schema1){ tb = loadTable(dbName,tableName) fileList = exec filename from files(path, "%.csv") fs= fileList.cut(100) for(i in 0:fs.size()){ bigTable=table(500000:0,tb.schema().colDefs[`name],tb.schema().colDefs[`typeString]) for(f in fs[i]) { try { bigTable.append!(loadOneFile(path+"/"+f,bigTable,schema1)) } catch(ex){ print f + ": "+ex } } tb.append!(bigTable) } } def loopLoadOneYearFiles(dbName,tableName, filePath,schema1){ dirs = exec filename from files(filePath) where isDir=true for (path in dirs){ submitJob("old"+path,"loadOrderDir"+path,loadOneDayFiles{dbName,tableName,filePath+"/"+path,schema1}) } } dbName = "dfs://stocks_orderbook" tableName = "orderBook" filePath="/hdd/hdd9/data/quotes/2013" csv="/hdd/hdd9/data/quotes/2013/20130104/SH000001_20130104.csv" schema1 = getSchema(csv) loopLoadOneYearFiles(dbName,tableName, filePath,schema1)