try{loadPlugin("./path/PluginRedis.txt")}catch(ex){print ex}; go def saveHashDataToRedis(conn, tb){ id = exec id from tb // 通过 string 函数将数据转为 string 类型 data = select string(time_stamp) as time_stamp, exchange, string(last_price) as last_price, string(volume) as volume, string(bid_price1) as bid_price1, string(bid_volume1) as bid_volume1, string(bid_price2) as bid_price2, string(bid_volume2) as bid_volume2, string(bid_price3) as bid_price3, string(bid_volume3) as bid_volume3, string(bid_price4) as bid_price4, string(bid_volume4) as bid_volume4, string(bid_price5) as bid_price5, string(bid_volume5) as bid_volume5, string(ask_price1) as ask_price1, string(ask_volume1) as ask_volume1, string(ask_price2) as ask_price2, string(ask_volume2) as ask_volume2, string(ask_price3) as ask_price3, string(ask_volume3) as ask_volume3, string(ask_price4) as ask_price4, string(ask_volume4) as ask_volume4, string(ask_price5) as ask_price5, string(ask_volume5) as ask_volume5 from tb redis::batchHashSet(conn, id, data) } def redisjob(){ conn=redis::connect("192.168.0.75", 6379) redis::run(conn, "AUTH", "password") go cffex_data = select * from loadTable("dfs://CFFEX", "CFFEX_PRICE") context by id order by time_stamp limit -1 czce_data = select * from loadTable("dfs://CZCE", "CZCE_PRICE") context by id order by time_stamp limit -1 saveHashDataToRedis(conn, cffex_data) saveHashDataToRedis(conn, czce_data) redis::release(conn) } def submitByHalfSecond(){ do { redisjob() sleep(500) } while(true) } submitJob("submitByHalfSecond", "submitByHalfSecond", submitByHalfSecond)