/**
publishToKafka.txt
Script to publish to kafka streaming services with DolphinDB Kafka Plugin
DolphinDB Inc.
DolphinDB server version: 2.00.6 2022.05.31
Storage engine: TSDB
Last modification time: 2022.07.07
*/

//login account
//NOTE(review): credentials are hard-coded in the script; replace them for any non-demo deployment.
login("admin", "123456")

/*
 * Create the persisted, shared stream table "messageStream".
 * Columns: msgTime (TIMESTAMP), msgType (SYMBOL), msgBody (BLOB).
 * The local handle is set to NULL once the table has been shared under its public name.
 */
def createStreamTableFunc(){
	colName = `msgTime`msgType`msgBody
	colType = [TIMESTAMP,SYMBOL, BLOB]
	messageTemp = streamTable(1000000:0, colName, colType)
	enableTableShareAndPersistence(table=messageTemp, tableName="messageStream", asynWrite=true, compress=true, cacheSize=1000000, retentionMinutes=1440, flushMode=0, preCache=10000)
	messageTemp = NULL
}
createStreamTableFunc()
//"go" runs everything above before the code below is parsed,
//so messageStream already exists when later code refers to it
go

//load kafka plugin
//NOTE(review): "/yourPluginsPath" is a placeholder; point it at the actual plugin directory.
loadPlugin("/yourPluginsPath/kafka/PluginKafka.txt")
//the kafka:: functions used below are only resolvable after the plugin has been loaded
go

/*
 * Initialize a kafka producer.
 * metadataBrokerList: value stored under the "metadata.broker.list" key of the producer config.
 * Returns the handle created by kafka::producer.
 */
def initKafkaProducerFunc(metadataBrokerList){
	producerCfg = dict(STRING, ANY)
	producerCfg["metadata.broker.list"] = metadataBrokerList
	return kafka::producer(producerCfg)
}
producer = initKafkaProducerFunc("localhost")

/*
 * Publish one batch of messages to the kafka topic "topic-message".
 * dataType: label used only in the log line.
 * producer: handle returned by initKafkaProducerFunc.
 * msg:      data to send; msg.size() is reported as the row count.
 * A failure is written to the log and is not rethrown.
 */
def sendMsgToKafkaFunc(dataType, producer, msg){
	startTime = now()
	try {
		//NOTE(review): the arguments 1 and true are presumably the message key and the JSON flag - confirm against the plugin documentation
		kafka::produce(producer, "topic-message", 1, msg, true)
		cost = now() - startTime
		writeLog("[Kafka Plugin] Successed to send " + dataType + " : " + msg.size() + " rows, " + cost + " ms.")
	}
	catch(ex) {
		writeLog("[Kafka Plugin] Failed to send msg to kafka with error: " +ex)
	}
}

/*
 * Register the stream filter engine and subscribe it to messageStream.
 * One filter is built per message type ("order", "trade", "snapshot"); each handler is
 * sendMsgToKafkaFunc with dataType and producer already bound, so only msg is left open.
 * The schema dictionary maps each message type to the matching DFS table.
 */
def filterAndParseStreamFunc(producer){
	filter1 = dict(STRING,ANY)
	filter1["condition"] = "order"
	filter1["handler"] = sendMsgToKafkaFunc{"order", producer}
	filter2 = dict(STRING,ANY)
	filter2["condition"] = "trade"
	filter2["handler"] = sendMsgToKafkaFunc{"trade", producer}
	filter3 = dict(STRING,ANY)
	filter3["condition"] = "snapshot"
	filter3["handler"] = sendMsgToKafkaFunc{"snapshot", producer}
	schema = dict(["order","trade", "snapshot"], [loadTable("dfs://order", "order"), loadTable("dfs://trade", "trade"), loadTable("dfs://snapshot", "snapshot")])
	engine = streamFilter(name="streamFilter", dummyTable=messageStream, filter=[filter1, filter2, filter3], msgSchema=schema)
	subscribeTable(tableName="messageStream", actionName="sendMsgToKafka", offset=-1, handler=engine, msgAsTable=true, reconnect=true)
}
filterAndParseStreamFunc(producer)

//replay history data
//NOTE(review): the sqlObj arguments below are empty in this text; the SQL metacode they
//should carry is not present here and must be restored before this function can run
def replayStockMarketData(){
	timeRS = cutPoints(09:15:00.000..15:00:00.000, 100)
	orderDS = replayDS(sqlObj=, dateColumn=`Date, timeColumn=`Time, timeRepartitionSchema=timeRS)
	snapshotDS = replayDS(sqlObj=