/*
 * Create a COMPO-partitioned DFS database and its partitioned table.
 *
 * dbName     : DFS database path, e.g. "dfs://mvmDemo"
 * tableName  : name of the partitioned table to create
 * ps1        : partition scheme for the VALUE level (applied to column `datetime`)
 * ps2        : partition scheme for the RANGE level (applied to column `id`)
 * numMetrics : number of FLOAT metric columns, named tag1 .. tag<numMetrics>
 */
def createDatabase(dbName, tableName, ps1, ps2, numMetrics){
	metricNames = "tag" + string(1..numMetrics)
	// The number of FLOAT columns must follow numMetrics so that the name
	// vector and the type vector always have the same length.
	schema = table(1:0, `id`datetime join metricNames, [INT,DATETIME] join take(FLOAT, numMetrics))
	db1 = database("", VALUE, ps1)
	db2 = database("", RANGE, ps2)
	db = database(dbName, COMPO, [db1, db2])
	db.createPartitionedTable(schema, tableName, `datetime`id)
}

/*
 * Build one day of random data for the given device ids.
 *
 * day        : the day to generate; converted to DATETIME as the first timestamp
 * id         : vector of device ids
 * freqPerDay : number of records per device; timestamps are startTime + 0 .. freqPerDay-1
 * numMetrics : number of metric columns tag1 .. tag<numMetrics>, filled with rand(1.0, n)
 *
 * Returns an in-memory table with columns id, datetime, tag1 .. tag<numMetrics>,
 * holding freqPerDay consecutive rows per device.
 */
def generate1DayData(day, id, freqPerDay, numMetrics){
	startTime = day.datetime()
	idSize = size(id)
	numRecords = freqPerDay * idSize
	idVec = array(INT, numRecords)
	// Each device owns a contiguous run of freqPerDay rows.
	for(i in 0:idSize){
		idVec[(i*freqPerDay) : ((i+1)*freqPerDay)] = id[i]
	}
	// Column names mirror the schema built in createDatabase (id, datetime).
	t = table(idVec as id, take(startTime + (0..(freqPerDay-1)), numRecords) as datetime)
	metricNames = "tag" + string(1..numMetrics)
	for(i in 0:numMetrics){
		t[metricNames[i]] = rand(1.0, numRecords)
	}
	return t
}

/*
 * Write `days` days of generated data for the devices in `id`, one batch of
 * at most numMachinesPerPartition devices per append.
 *
 * id                      : vector of device ids handled by this writer
 * startDay                : first day to write
 * days                    : number of consecutive days to write
 * freqPerDay              : records per device per day
 * numMachinesPerPartition : maximum number of devices per appended batch
 * numMetrics              : number of metric columns
 * dbName, tableName       : target DFS database and table
 */
def singleThreadWriting(id, startDay, days, freqPerDay, numMachinesPerPartition, numMetrics, dbName, tableName){
	t = loadTable(dbName, tableName)
	idSize = size(id)
	// Nothing to write; also keeps the do-while below from running once
	// with an empty id vector.
	if(idSize <= 0) return
	for(d in 0:days){
		index = 0
		do{
			idMax = numMachinesPerPartition - 1
			// The last batch may hold fewer devices than a full partition.
			if(idSize - index < numMachinesPerPartition) idMax = idSize - index - 1
			t.append!(generate1DayData(startDay + d, id[index + (0..idMax)], freqPerDay, numMetrics))
			index += numMachinesPerPartition
		}while(index < idSize)
	}
}

/*
 * Split the device ids into chunks and write the chunks in parallel with ploop.
 *
 * The chunk size is rounded up to a multiple of numMachinesPerPartition so that
 * each parallel writer is handed whole batches of devices.
 *
 * threads : desired number of parallel writers; the remaining parameters are
 *           passed through to singleThreadWriting.
 */
def multipleThreadWriting(id, startDay, days, freqPerDay, numMachinesPerPartition, numMetrics, threads, dbName, tableName){
	// `\` is the ratio (floating-point division) operator, so ceil() rounds the
	// per-thread share up to a whole number of partitions.
	idCountPerThread = ceil(id.size() \ threads / numMachinesPerPartition) * numMachinesPerPartition
	ploop(singleThreadWriting{, startDay, days, freqPerDay, numMachinesPerPartition, numMetrics, dbName, tableName}, id.cut(idCountPerThread))
}

/*
 * Recreate the demo database and submit the write workload as a batch job.
 *
 * An existing database at dfs://mvmDemo is dropped first. With threads == 1 the
 * single-threaded writer is submitted; otherwise the parallel writer is submitted.
 */
def mainJob(id, startDay, days, ps1, ps2, freqPerDay, numMachinesPerPartition, threads){
	dbName = "dfs://mvmDemo"
	tableName = "machines"
	numMetrics = 50
	if(existsDatabase(dbName)) dropDatabase(dbName)
	createDatabase(dbName, tableName, ps1, ps2, numMetrics)
	if(threads == 1){
		submitJob("submit_singleThreadWriting", "write data",
			singleThreadWriting{id, startDay, days, freqPerDay, numMachinesPerPartition, numMetrics, dbName, tableName})
	}
	else{
		submitJob("submit_multiThreadWriting", "write data",
			multipleThreadWriting{id, startDay, days, freqPerDay, numMachinesPerPartition, numMetrics, threads, dbName, tableName})
	}
}

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
login("admin","123456")
freqPerDay = 86400
numMachines = 100
id = 1..numMachines              // device ids
startDay = 2020.09.01            // first day to write
days = 5                         // number of days of data to write
threads = 20                     // number of threads writing concurrently
numMachinesPerPartition = 10
ps1 = 2020.09.01..2020.12.31
// RANGE boundaries 1, 1+n, 1+2n, ... ; NOTE(review): this covers every id only
// when numMachines is a multiple of numMachinesPerPartition — confirm before
// changing either value.
ps2 = numMachinesPerPartition*(0..(numMachines/numMachinesPerPartition))+1
mainJob(id, startDay, days, ps1, ps2, freqPerDay, numMachinesPerPartition, threads)