package com.xxdb.streaming.client;

import com.xxdb.DBConnection;
import com.xxdb.comm.SqlStdEnum;
import com.xxdb.data.BasicAnyVector;
import com.xxdb.data.BasicBoolean;
import com.xxdb.data.BasicDictionary;
import com.xxdb.data.BasicInt;
import com.xxdb.data.BasicLong;
import com.xxdb.data.BasicString;
import com.xxdb.data.BasicStringVector;
import com.xxdb.data.Entity;
import com.xxdb.data.Vector;
import com.xxdb.data.Void;
import java.io.IOException;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.StringUtils;

/* loaded from: input_file:com/xxdb/streaming/client/AbstractClient.class */
abstract class AbstractClient implements MessageDispatcher {
    protected static final int DEFAULT_PORT = 8849;
    protected static final String DEFAULT_HOST = "localhost";
    protected static final String DEFAULT_ACTION_NAME = "javaStreamingApi";
    protected ConcurrentHashMap<String, ReconnectItem> reconnectTable;
    protected int listeningPort;
    protected String listeningHost;
    protected QueueManager queueManager;
    protected ConcurrentHashMap<String, List<IMessage>> messageCache;
    protected HashMap<String, String> tableNameToTrueTopic;
    protected HashMap<String, String> HATopicToTrueTopic;
    protected HashMap<String, Boolean> hostEndian;
    protected Thread pThread;
    protected ConcurrentHashMap<String, Site[]> trueTopicToSites;
    protected CopyOnWriteArraySet<String> waitReconnectTopic;
    protected Map<String, StreamDeserializer> subInfos_;
    protected HashMap<List<String>, List<String>> users;
    protected boolean isClose_;
    protected LinkedBlockingQueue<DBConnection> connList;
    private Daemon daemon;

    /* loaded from: input_file:com/xxdb/streaming/client/AbstractClient$ReconnectItem.class */
    class ReconnectItem {
        private int reconnectState;
        private long lastReconnectTimestamp;
        private List<String> topics = new ArrayList();

        public ReconnectItem(int i, long j) {
            this.reconnectState = i;
            this.lastReconnectTimestamp = j;
        }

        public void setState(int i) {
            this.reconnectState = i;
        }

        public int getState() {
            return this.reconnectState;
        }

        public void setTimestamp(long j) {
            this.lastReconnectTimestamp = j;
        }

        public long getTimestamp() {
            return this.lastReconnectTimestamp;
        }

        public void putTopic(String str) {
            if (this.topics == null) {
                this.topics = new ArrayList();
                this.topics.add(str);
            } else {
                if (this.topics.contains(this.topics)) {
                    return;
                }
                this.topics.add(str);
            }
        }

        public List<String> getTopics() {
            return this.topics;
        }
    }

    /* loaded from: input_file:com/xxdb/streaming/client/AbstractClient$Site.class */
    public class Site {
        String host;
        int port;
        String tableName;
        String actionName;
        MessageHandler handler;
        long msgId;
        boolean reconnect;
        Vector filter;
        boolean closed = false;
        boolean allowExistTopic;
        StreamDeserializer deserializer;
        String userName;
        String passWord;
        boolean msgAstable;

        Site(String str, int i, String str2, String str3, MessageHandler messageHandler, long j, boolean z, Vector vector, StreamDeserializer streamDeserializer, boolean z2, String str4, String str5, boolean z3) {
            this.filter = null;
            this.allowExistTopic = false;
            this.userName = "";
            this.passWord = "";
            this.msgAstable = false;
            this.host = str;
            this.port = i;
            this.tableName = str2;
            this.actionName = str3;
            this.handler = messageHandler;
            this.msgId = j;
            this.reconnect = z;
            this.filter = vector;
            this.allowExistTopic = z2;
            this.deserializer = streamDeserializer;
            this.userName = str4;
            this.passWord = str5;
            this.msgAstable = z3;
        }
    }

    @Override // com.xxdb.streaming.client.MessageDispatcher
    public void setNeedReconnect(String str, int i) {
        if (str.equals("")) {
            return;
        }
        String substring = str.substring(0, str.indexOf("/"));
        if (!this.reconnectTable.keySet().contains(substring)) {
            ReconnectItem reconnectItem = new ReconnectItem(i, System.currentTimeMillis());
            reconnectItem.putTopic(str);
            this.reconnectTable.put(substring, reconnectItem);
        } else {
            ReconnectItem reconnectItem2 = this.reconnectTable.get(substring);
            reconnectItem2.setState(i);
            reconnectItem2.setTimestamp(System.currentTimeMillis());
            reconnectItem2.putTopic(str);
        }
    }

    @Override // com.xxdb.streaming.client.MessageDispatcher
    public int getNeedReconnect(String str) {
        ReconnectItem reconnectItem = this.reconnectTable.get(str);
        if (reconnectItem != null) {
            return reconnectItem.getState();
        }
        return 0;
    }

    @Override // com.xxdb.streaming.client.MessageDispatcher
    public long getReconnectTimestamp(String str) {
        ReconnectItem reconnectItem = this.reconnectTable.get(str);
        if (reconnectItem != null) {
            return reconnectItem.getTimestamp();
        }
        return 0L;
    }

    @Override // com.xxdb.streaming.client.MessageDispatcher
    public void setReconnectTimestamp(String str, long j) {
        ReconnectItem reconnectItem = this.reconnectTable.get(str);
        if (reconnectItem != null) {
            reconnectItem.setTimestamp(j);
        }
    }

    @Override // com.xxdb.streaming.client.MessageDispatcher
    public List<String> getAllTopicsBySite(String str) {
        ArrayList arrayList = new ArrayList();
        Iterator it = this.trueTopicToSites.keySet().iterator();
        while (it.hasNext()) {
            String str2 = (String) it.next();
            if (str2.substring(0, str2.indexOf("/")).equals(str)) {
                arrayList.add(str2);
            }
        }
        return arrayList;
    }

    @Override // com.xxdb.streaming.client.MessageDispatcher
    public Set<String> getAllReconnectTopic() {
        return this.waitReconnectTopic;
    }

    @Override // com.xxdb.streaming.client.MessageDispatcher
    public List<String> getAllReconnectSites() {
        ArrayList arrayList = new ArrayList();
        Iterator it = this.reconnectTable.keySet().iterator();
        while (it.hasNext()) {
            String str = (String) it.next();
            if (this.reconnectTable.get(str).getState() > 0) {
                arrayList.add(str);
            }
        }
        return arrayList;
    }

    @Override // com.xxdb.streaming.client.MessageDispatcher
    public Site getSiteByName(String str) {
        List<String> allTopicsBySite = getAllTopicsBySite(str);
        if (allTopicsBySite.size() <= 0) {
            return null;
        }
        Site[] siteArr = this.trueTopicToSites.get(allTopicsBySite.get(0));
        if (siteArr.length > 0) {
            return getActiveSite(siteArr);
        }
        return null;
    }

    @Override // com.xxdb.streaming.client.MessageDispatcher
    public Map<String, StreamDeserializer> getSubInfos() {
        return this.subInfos_;
    }

    protected abstract boolean doReconnect(Site site);

    @Override // com.xxdb.streaming.client.MessageDispatcher
    public void setMsgId(String str, long j) {
        synchronized (this.trueTopicToSites) {
            Site[] siteArr = this.trueTopicToSites.get(str);
            if (siteArr == null || siteArr.length == 0) {
                return;
            }
            if (siteArr.length == 1) {
                siteArr[0].msgId = j;
            }
        }
    }

    @Override // com.xxdb.streaming.client.MessageDispatcher
    public boolean tryReconnect(String str) {
        Site[] siteArr;
        synchronized (this.reconnectTable) {
            String str2 = this.HATopicToTrueTopic.get(str);
            this.queueManager.removeQueue(str2);
            synchronized (this.trueTopicToSites) {
                siteArr = this.trueTopicToSites.get(str2);
            }
            if (siteArr == null || siteArr.length == 0) {
                return false;
            }
            if (siteArr.length == 1 && !siteArr[0].reconnect) {
                return false;
            }
            Site activeSite = getActiveSite(siteArr);
            if (activeSite == null) {
                return false;
            }
            if (doReconnect(activeSite)) {
                this.waitReconnectTopic.remove(str2);
                return true;
            }
            this.waitReconnectTopic.add(str2);
            return false;
        }
    }

    private Site getActiveSite(Site[] siteArr) {
        int i = 0;
        int length = siteArr.length;
        while (true) {
            Site site = siteArr[i];
            i = (i + 1) % length;
            try {
                DBConnection dBConnection = new DBConnection();
                dBConnection.connect(site.host, site.port);
                try {
                    try {
                        dBConnection.run("1");
                        dBConnection.close();
                        return site;
                    } catch (Throwable th) {
                        dBConnection.close();
                        throw th;
                    }
                } catch (IOException e) {
                    throw e;
                }
            } catch (Exception e2) {
                e2.printStackTrace();
                try {
                    Thread.sleep(500L);
                } catch (Exception e3) {
                    e3.printStackTrace();
                }
            }
        }
    }

    private int getVersionNumber(String str) {
        try {
            String[] split = str.split(StringUtils.SPACE);
            if (split.length >= 2) {
                return Integer.parseInt(split[0].replace(".", ""));
            }
            return 0;
        } catch (Exception e) {
            return 0;
        }
    }

    @Override // com.xxdb.streaming.client.MessageDispatcher
    public void activeCloseConnection(Site site) {
        DBConnection dBConnection;
        try {
            dBConnection = new DBConnection();
            dBConnection.connect(site.host, site.port);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("Unable to actively close the publish connection from site " + site.host + ":" + site.port);
        }
        try {
            try {
                int versionNumber = getVersionNumber(((BasicString) dBConnection.run("version()")).getString());
                String str = this.listeningHost;
                if (str.equals("")) {
                    str = dBConnection.getLocalAddress().getHostAddress();
                }
                ArrayList arrayList = new ArrayList();
                arrayList.add(new BasicString(str));
                arrayList.add(new BasicInt(this.listeningPort));
                if (versionNumber >= 995) {
                    arrayList.add(new BasicBoolean(true));
                }
                dBConnection.run("activeClosePublishConnection", arrayList);
                System.out.println("Successfully closed publish connection");
                dBConnection.close();
                try {
                    Thread.sleep(1000L);
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
            } catch (Throwable th) {
                dBConnection.close();
                throw th;
            }
        } catch (IOException e3) {
            throw e3;
        }
    }

    public AbstractClient() throws SocketException {
        this(DEFAULT_PORT);
    }

    public AbstractClient(int i) throws SocketException {
        this.reconnectTable = new ConcurrentHashMap<>();
        this.listeningHost = "";
        this.queueManager = new QueueManager();
        this.messageCache = new ConcurrentHashMap<>();
        this.tableNameToTrueTopic = new HashMap<>();
        this.HATopicToTrueTopic = new HashMap<>();
        this.hostEndian = new HashMap<>();
        this.trueTopicToSites = new ConcurrentHashMap<>();
        this.waitReconnectTopic = new CopyOnWriteArraySet<>();
        this.subInfos_ = new HashMap();
        this.users = new HashMap<>();
        this.isClose_ = false;
        this.connList = new LinkedBlockingQueue<>();
        this.daemon = null;
        this.listeningPort = i;
    }

    public AbstractClient(String str, int i) throws SocketException {
        this.reconnectTable = new ConcurrentHashMap<>();
        this.listeningHost = "";
        this.queueManager = new QueueManager();
        this.messageCache = new ConcurrentHashMap<>();
        this.tableNameToTrueTopic = new HashMap<>();
        this.HATopicToTrueTopic = new HashMap<>();
        this.hostEndian = new HashMap<>();
        this.trueTopicToSites = new ConcurrentHashMap<>();
        this.waitReconnectTopic = new CopyOnWriteArraySet<>();
        this.subInfos_ = new HashMap();
        this.users = new HashMap<>();
        this.isClose_ = false;
        this.connList = new LinkedBlockingQueue<>();
        this.daemon = null;
        this.listeningHost = str;
        this.listeningPort = i;
    }

    private void addMessageToCache(IMessage iMessage) {
        for (String str : iMessage.getTopic().split(",")) {
            String str2 = this.HATopicToTrueTopic.get(str);
            List<IMessage> list = this.messageCache.get(str2);
            if (list == null) {
                list = new ArrayList();
                this.messageCache.put(str2, list);
            }
            list.add(iMessage);
        }
    }

    private void flushToQueue() {
        for (String str : this.messageCache.keySet()) {
            try {
                BlockingQueue<List<IMessage>> queue = this.queueManager.getQueue(str);
                if (queue != null) {
                    queue.put(this.messageCache.get(str));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        this.messageCache.clear();
    }

    @Override // com.xxdb.streaming.client.MessageDispatcher
    public synchronized void dispatch(IMessage iMessage) {
        for (String str : iMessage.getTopic().split(",")) {
            BlockingQueue<List<IMessage>> queue = this.queueManager.getQueue(this.HATopicToTrueTopic.get(str));
            if (queue != null) {
                try {
                    queue.put(Arrays.asList(iMessage));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override // com.xxdb.streaming.client.MessageDispatcher
    public synchronized void batchDispatch(List<IMessage> list) {
        for (int i = 0; i < list.size(); i++) {
            addMessageToCache(list.get(i));
        }
        flushToQueue();
    }

    @Override // com.xxdb.streaming.client.MessageDispatcher
    public boolean isRemoteLittleEndian(String str) {
        if (this.hostEndian.containsKey(str)) {
            return this.hostEndian.get(str).booleanValue();
        }
        return false;
    }

    @Override // com.xxdb.streaming.client.MessageDispatcher
    public synchronized boolean isClosed(String str) {
        String str2 = this.HATopicToTrueTopic.get(str);
        synchronized (this.trueTopicToSites) {
            Site[] siteArr = this.trueTopicToSites.get(str2);
            if (siteArr == null || siteArr.length == 0) {
                return true;
            }
            return siteArr[0].closed;
        }
    }

    private String getTopic(String str, int i, String str2, String str3, String str4) {
        return String.format("%s:%d:%s/%s/%s", str, Integer.valueOf(i), str2, str3, str4);
    }

    protected BlockingQueue<List<IMessage>> subscribeInternal(String str, int i, String str2, String str3, MessageHandler messageHandler, long j, boolean z, Vector vector, StreamDeserializer streamDeserializer, boolean z2) throws IOException, RuntimeException {
        return subscribeInternal(str, i, str2, str3, messageHandler, j, z, vector, streamDeserializer, z2, "", "", false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public BlockingQueue<List<IMessage>> subscribeInternal(String str, int i, String str2, String str3, MessageHandler messageHandler, long j, boolean z, Vector vector, StreamDeserializer streamDeserializer, boolean z2, String str4, String str5, boolean z3) throws IOException, RuntimeException {
        checkServerVersion(str, i);
        this.users.put(Arrays.asList(str, String.valueOf(i), str2, str3), Arrays.asList(str4, str5));
        DBConnection dBConnection = this.listeningPort > 0 ? new DBConnection() : new DBConnection(false, false, false, false, false, true, SqlStdEnum.DolphinDB);
        if (str4.equals("")) {
            dBConnection.connect(str, i);
        } else {
            dBConnection.connect(str, i, str4, str5);
        }
        if (streamDeserializer != null && !streamDeserializer.isInited()) {
            streamDeserializer.init(dBConnection);
        }
        if (streamDeserializer != null) {
            streamDeserializer.checkSchema((BasicDictionary) dBConnection.run(str2 + ".schema()"));
        }
        try {
            try {
                String str6 = this.listeningHost;
                if (str6.equals("")) {
                    str6 = dBConnection.getLocalAddress().getHostAddress();
                }
                if (!this.hostEndian.containsKey(str)) {
                    this.hostEndian.put(str, Boolean.valueOf(dBConnection.getRemoteLittleEndian()));
                }
                ArrayList arrayList = new ArrayList();
                arrayList.add(new BasicString(str2));
                arrayList.add(new BasicString(str3));
                String string = ((BasicAnyVector) dBConnection.run("getSubscriptionTopic", arrayList)).getEntity(0).getString();
                arrayList.clear();
                arrayList.add(new BasicString(str6));
                arrayList.add(new BasicInt(this.listeningPort));
                arrayList.add(new BasicString(str2));
                arrayList.add(new BasicString(str3));
                arrayList.add(new BasicLong(j));
                if (vector != null) {
                    arrayList.add(vector);
                } else {
                    arrayList.add(new Void());
                }
                if (z2) {
                    arrayList.add(new BasicBoolean(z2));
                }
                Entity run = dBConnection.run("publishTable", arrayList);
                this.connList.add(dBConnection);
                if (run instanceof BasicAnyVector) {
                    BasicStringVector basicStringVector = (BasicStringVector) ((BasicAnyVector) run).getEntity(1);
                    int rows = basicStringVector.rows();
                    Site[] siteArr = new Site[rows];
                    for (int i2 = 0; i2 < rows; i2++) {
                        String[] split = basicStringVector.getString(i2).split(":");
                        String str7 = split[0];
                        int intValue = new Integer(split[1]).intValue();
                        String str8 = split[2];
                        siteArr[i2] = new Site(str7, intValue, str2, str3, messageHandler, j - 1, true, vector, streamDeserializer, z2, str4, str5, z3);
                        if (!z) {
                            siteArr[i2].closed = true;
                        }
                        synchronized (this.tableNameToTrueTopic) {
                            this.tableNameToTrueTopic.put(str7 + ":" + intValue + "/" + str2 + "/" + str3, string);
                        }
                        String topic = getTopic(str7, intValue, str8, str2, str3);
                        synchronized (this.HATopicToTrueTopic) {
                            this.HATopicToTrueTopic.put(topic, string);
                        }
                    }
                    if (this.subInfos_.containsKey(string)) {
                        throw new RuntimeException("Subscription with topic " + string + " exist. ");
                    }
                    this.subInfos_.put(string, streamDeserializer);
                    synchronized (this.trueTopicToSites) {
                        this.trueTopicToSites.put(string, siteArr);
                    }
                } else {
                    Site[] siteArr2 = {new Site(str, i, str2, str3, messageHandler, j - 1, z, vector, streamDeserializer, z2, str4, str5, z3)};
                    if (!z) {
                        siteArr2[0].closed = true;
                    }
                    synchronized (this.subInfos_) {
                        this.subInfos_.put(string, streamDeserializer);
                    }
                    synchronized (this.tableNameToTrueTopic) {
                        this.tableNameToTrueTopic.put(str + ":" + i + "/" + str2 + "/" + str3, string);
                    }
                    synchronized (this.HATopicToTrueTopic) {
                        this.HATopicToTrueTopic.put(string, string);
                    }
                    synchronized (this.trueTopicToSites) {
                        this.trueTopicToSites.put(string, siteArr2);
                    }
                }
                return this.queueManager.addQueue(string);
            } catch (Exception e) {
                throw e;
            }
        } finally {
            if (this.listeningPort > 0) {
                dBConnection.close();
            }
        }
    }

    protected BlockingQueue<List<IMessage>> subscribeInternal(String str, int i, String str2, String str3, long j, boolean z) throws IOException, RuntimeException {
        return subscribeInternal(str, i, str2, str3, null, j, z, null, null, false);
    }

    protected BlockingQueue<List<IMessage>> subscribeInternal(String str, int i, String str2, long j) throws IOException, RuntimeException {
        return subscribeInternal(str, i, str2, DEFAULT_ACTION_NAME, j, false);
    }

    protected BlockingQueue<List<IMessage>> subscribeInternal(String str, int i, String str2, String str3, long j) throws IOException, RuntimeException {
        return subscribeInternal(str, i, str2, str3, j, false);
    }

    protected void unsubscribeInternal(String str, int i, String str2, String str3) throws IOException {
        String str4;
        DBConnection dBConnection = new DBConnection();
        List<String> list = this.users.get(Arrays.asList(str, String.valueOf(i), str2, str3));
        String str5 = list.get(0);
        String str6 = list.get(1);
        if (str5.equals("")) {
            dBConnection.connect(str, i);
        } else {
            dBConnection.connect(str, i, str5, str6);
        }
        try {
            try {
                String str7 = this.listeningHost;
                if (str7.equals("")) {
                    str7 = dBConnection.getLocalAddress().getHostAddress();
                }
                ArrayList arrayList = new ArrayList();
                arrayList.add(new BasicString(str7));
                arrayList.add(new BasicInt(this.listeningPort));
                arrayList.add(new BasicString(str2));
                arrayList.add(new BasicString(str3));
                dBConnection.run("stopPublishTable", arrayList);
                String str8 = str + ":" + i + "/" + str2 + "/" + str3;
                synchronized (this.tableNameToTrueTopic) {
                    str4 = this.tableNameToTrueTopic.get(str8);
                }
                synchronized (this.trueTopicToSites) {
                    Site[] siteArr = this.trueTopicToSites.get(str4);
                    if (siteArr == null || siteArr.length == 0) {
                    }
                    for (Site site : siteArr) {
                        site.closed = true;
                    }
                }
                System.out.println("Successfully unsubscribed table " + str8);
                dBConnection.close();
            } catch (Exception e) {
                throw e;
            }
        } catch (Throwable th) {
            dBConnection.close();
            throw th;
        }
    }

    public void close() {
        if (this.pThread != null) {
            this.pThread.interrupt();
        }
        this.isClose_ = true;
    }

    public boolean isClose() {
        return this.isClose_;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void unsubscribeInternal(String str, int i, String str2) throws IOException {
        unsubscribeInternal(str, i, str2, DEFAULT_ACTION_NAME);
    }

    void checkServerVersion(String str, int i) throws IOException {
        DBConnection dBConnection = new DBConnection();
        dBConnection.connect(str, i);
        String[] split = dBConnection.run("version()").getString().split(StringUtils.SPACE)[0].split("\\.");
        int parseInt = Integer.parseInt(split[0]);
        int parseInt2 = Integer.parseInt(split[1]);
        int parseInt3 = Integer.parseInt(split[2]);
        if ((parseInt == 2 && parseInt2 == 0 && parseInt3 >= 9) || (parseInt == 2 && parseInt2 == 10)) {
            this.listeningPort = 0;
        } else if (this.listeningPort == 0) {
            throw new IOException("The server does not support subscription through reverse connection (connection initiated by the subscriber). Specify a valid port parameter.");
        }
        if (this.daemon == null) {
            synchronized (this.connList) {
                if (this.daemon == null) {
                    this.daemon = new Daemon(this.listeningPort, this, this.connList);
                    this.pThread = new Thread(this.daemon);
                    this.daemon.setRunningThread(this.pThread);
                    this.pThread.start();
                }
            }
        }
    }

    @Override // com.xxdb.streaming.client.MessageDispatcher
    public ConcurrentHashMap<String, Site[]> getTopicToSites() {
        return this.trueTopicToSites;
    }
}
