// Create a COMPO-partitioned DFS database and an empty partitioned table in it.
//   dbName    : database path (e.g. "dfs://...")
//   tableName : name of the partitioned table to create
//   ps1       : partition scheme for the first (VALUE) level
//   ps2       : partition scheme for the second (RANGE) level
// The table has columns id (INT), datetime (DATETIME), value (FLOAT) and is
// partitioned on datetime (first level) and id (second level).
def createDatabase(dbName, tableName, ps1, ps2){
	tableSchema = table(1:0, `id`datetime`value, [INT, DATETIME, FLOAT])
	db1 = database("", VALUE, ps1)
	db2 = database("", RANGE, ps2)
	db = database(dbName, COMPO, [db1, db2])
	db.createPartitionedTable(tableSchema, tableName, `datetime`id)
}

// Build an in-memory table holding one day of data for the given ids.
//   day        : the day to generate; converted with datetime() to the start time
//   id         : vector of ids
//   freqPerDay : number of records generated per id
// Returns a table with freqPerDay * size(id) rows: each id repeated freqPerDay
// times in a row, timestamps startTime + 0..(freqPerDay-1) repeated for every
// id, and a value column produced by rand(1.0, ...).
def generate1DayData(day, id, freqPerDay){
	startTime = day.datetime()
	idSize = size(id)
	numRecords = freqPerDay * idSize
	// NOTE: the variable name idVec becomes the first column's name in the result.
	idVec = array(INT, numRecords)
	for(i in 0:idSize){
		// fill the i-th run of freqPerDay slots with the i-th id
		idVec[(i * freqPerDay) : ((i + 1) * freqPerDay)] = id[i]
	}
	return table(idVec, take(startTime + 0..(freqPerDay - 1), numRecords) as datetime, rand(1.0, numRecords) as value)
}

// Append `days` days of generated data for all ids to dfs://svmDemo/sensors,
// one append per (day, group of numIdPerPartition ids).
//   id                : vector of ids handled by this writer
//   startDay          : first day to generate
//   days              : number of consecutive days to generate
//   freqPerDay        : records per id per day
//   numIdPerPartition : number of ids written per append
def singleThreadWriting(id, startDay, days, freqPerDay, numIdPerPartition){
	// Nothing to write for an empty id list.
	if(size(id) > 0){
		t = loadTable("dfs://svmDemo", "sensors")
		// Split once, outside the day loop. cut() keeps the last group short
		// instead of indexing past the end of id when size(id) is not a
		// multiple of numIdPerPartition.
		idGroups = id.cut(numIdPerPartition)
		for(d in 0:days){
			for(idGroup in idGroups){
				t.append!(generate1DayData(startDay + d, idGroup, freqPerDay))
			}
		}
	}
}

// Write the data in parallel: split id into per-thread slices and run
// singleThreadWriting on each slice with ploop.
//   threads : number of slices the id list is divided into (approximately;
//             the slice size is rounded up to a multiple of numIdPerPartition)
def multipleThreadWriting(id, startDay, days, freqPerDay, numIdPerPartition, threads){
	// split id into multiple parts for parallel writing; the slice size is
	// rounded up to a whole number of numIdPerPartition-sized groups
	idCountPerThread = ceil(id.size() \ threads / numIdPerPartition) * numIdPerPartition
	ploop(singleThreadWriting{, startDay, days, freqPerDay, numIdPerPartition}, id.cut(idCountPerThread))
}

// Recreate dfs://svmDemo (dropping it first if it exists), create the sensors
// table, and submit the writing work as a batch job: single-threaded when
// threads == 1, otherwise the ploop-based parallel writer.
def mainJob(id, startDay, days, ps1, ps2, freqPerDay, numIdPerPartition, threads){
	if(existsDatabase("dfs://svmDemo")){
		dropDatabase("dfs://svmDemo")
	}
	createDatabase("dfs://svmDemo", "sensors", ps1, ps2)
	if(threads == 1){
		submitJob("submit_singleThreadWriting", "write data", singleThreadWriting{id, startDay, days, freqPerDay, numIdPerPartition})
	} else {
		submitJob("submit_multipleThreadWriting", "write data", multipleThreadWriting{id, startDay, days, freqPerDay, numIdPerPartition, threads})
	}
}

// ---------------------------------------------------------------------------
// Driver script
// ---------------------------------------------------------------------------
login("admin", "123456")

// records generated per id per day
freqPerDay = 86400
numMachines = 100
numMetrics = 50
numMachinesPerPartition = 2
// Number of ids grouped into one RANGE partition (and written per append).
numIdPerPartition = numMachinesPerPartition * numMetrics

// First-level (VALUE) partition scheme: one partition per day.
ps1 = 2020.09.01..2020.12.31
// Second-level (RANGE) partition boundaries on id: 1, 1+n, 1+2n, ... where
// n = numIdPerPartition.
// NOTE(review): relies on numMachines being a multiple of
// numMachinesPerPartition so the last boundary covers every id — confirm if
// the constants change.
ps2 = numIdPerPartition * (0..(numMachines / numMachinesPerPartition)) + 1

// One id per (machine, metric) pair.
id = 1..(numMachines * numMetrics)

startDay = 2020.09.01
// number of days of data to write
days = 5
// number of threads writing in parallel
threads = 20

mainJob(id, startDay, days, ps1, ps2, freqPerDay, numIdPerPartition, threads)