package com.xxdb.streaming.client;

import com.xxdb.DBConnection;
import com.xxdb.data.BasicInt;
import com.xxdb.data.BasicString;
import com.xxdb.data.Vector;
import com.xxdb.streaming.client.AbstractClient;
import java.io.IOException;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/* loaded from: input_file:com/xxdb/streaming/client/ThreadPooledClient.class */
public class ThreadPooledClient extends AbstractClient {
    private static int CORES = Runtime.getRuntime().availableProcessors();
    private ExecutorService threadPool;
    private HashMap<String, List<String>> users;
    private Object lock;
    private HashMap<String, QueueHandlerBinder> queueHandlers;

    /* loaded from: input_file:com/xxdb/streaming/client/ThreadPooledClient$HandlerRunner.class */
    class HandlerRunner implements Runnable {
        MessageHandler handler;
        IMessage message;

        HandlerRunner(MessageHandler messageHandler, IMessage iMessage) {
            this.handler = messageHandler;
            this.message = iMessage;
        }

        @Override // java.lang.Runnable
        public void run() {
            this.handler.doEvent(this.message);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/xxdb/streaming/client/ThreadPooledClient$QueueHandlerBinder.class */
    public class QueueHandlerBinder {
        private BlockingQueue<List<IMessage>> queue;
        private MessageHandler handler;

        public QueueHandlerBinder(BlockingQueue<List<IMessage>> blockingQueue, MessageHandler messageHandler) {
            this.queue = blockingQueue;
            this.handler = messageHandler;
        }
    }

    public ThreadPooledClient() throws SocketException {
        this("", 8849, CORES);
    }

    public ThreadPooledClient(int i, int i2) throws SocketException {
        this("", i, i2);
    }

    /* JADX WARN: Type inference failed for: r0v5, types: [com.xxdb.streaming.client.ThreadPooledClient$1] */
    public ThreadPooledClient(String str, int i, int i2) throws SocketException {
        super(str, i);
        this.users = new HashMap<>();
        this.lock = new Object();
        this.queueHandlers = new HashMap<>();
        this.threadPool = Executors.newFixedThreadPool(i2);
        new Thread() { // from class: com.xxdb.streaming.client.ThreadPooledClient.1
            private LinkedList<IMessage> backlog = new LinkedList<>();

            private boolean fillBacklog() {
                boolean z = false;
                synchronized (ThreadPooledClient.this.queueHandlers) {
                    Iterator it = ThreadPooledClient.this.queueHandlers.keySet().iterator();
                    while (it.hasNext()) {
                        List list = (List) ((QueueHandlerBinder) ThreadPooledClient.this.queueHandlers.get((String) it.next())).queue.poll();
                        if (list != null) {
                            this.backlog.addAll(list);
                            z = true;
                        }
                    }
                }
                return z;
            }

            private void refill() {
                int i3 = 200;
                while (true) {
                    int i4 = i3;
                    if (fillBacklog()) {
                        return;
                    }
                    if (i4 <= 100) {
                        if (i4 > 0) {
                            Thread.yield();
                        } else {
                            Thread.yield();
                        }
                    }
                    i3 = i4 - 1;
                }
            }

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                QueueHandlerBinder queueHandlerBinder;
                while (true) {
                    IMessage poll = this.backlog.poll();
                    if (poll != null) {
                        synchronized (ThreadPooledClient.this.queueHandlers) {
                            queueHandlerBinder = (QueueHandlerBinder) ThreadPooledClient.this.queueHandlers.get(poll.getTopic());
                        }
                        ThreadPooledClient.this.threadPool.execute(new HandlerRunner(queueHandlerBinder.handler, poll));
                    } else {
                        refill();
                    }
                }
            }
        }.start();
    }

    @Override // com.xxdb.streaming.client.AbstractClient
    protected boolean doReconnect(AbstractClient.Site site) {
        this.threadPool.shutdownNow();
        try {
            Thread.sleep(1000L);
            subscribe(site.host, site.port, site.tableName, site.actionName, site.handler, site.msgId + 1, true, site.filter, site.deserializer, site.allowExistTopic, site.userName, site.passWord);
            System.out.println("Successfully reconnected and subscribed " + site.host + ":" + site.port + "/" + site.tableName + site.actionName);
            return true;
        } catch (Exception e) {
            System.out.println("Unable to subscribe table. Will try again after 1 seconds.");
            e.printStackTrace();
            return false;
        }
    }

    public void subscribe(String str, int i, String str2, String str3, MessageHandler messageHandler, long j, boolean z, Vector vector, StreamDeserializer streamDeserializer, boolean z2, String str4, String str5) throws IOException {
        subscribe(str, i, str2, str3, messageHandler, j, z, vector, streamDeserializer, z2, str4, str5, false);
    }

    public void subscribe(String str, int i, String str2, String str3, MessageHandler messageHandler, long j, boolean z, Vector vector, StreamDeserializer streamDeserializer, boolean z2, String str4, String str5, boolean z3) throws IOException {
        BlockingQueue<List<IMessage>> subscribeInternal = subscribeInternal(str, i, str2, str3, messageHandler, j, z, vector, streamDeserializer, z2, str4, str5, z3);
        String str6 = str + ":" + i + "/" + str2 + "/" + str3;
        List<String> asList = Arrays.asList(str4, str5);
        synchronized (this.queueHandlers) {
            this.queueHandlers.put(this.tableNameToTrueTopic.get(str6), new QueueHandlerBinder(subscribeInternal, messageHandler));
            this.users.put(str6, asList);
        }
    }

    public void subscribe(String str, int i, String str2, String str3, MessageHandler messageHandler, long j, boolean z, Vector vector, boolean z2, String str4, String str5) throws IOException {
        subscribe(str, i, str2, str3, messageHandler, j, z, vector, null, z2, str4, str5);
    }

    public void subscribe(String str, int i, String str2, String str3, MessageHandler messageHandler, long j, boolean z, Vector vector, StreamDeserializer streamDeserializer, boolean z2) throws IOException {
        subscribe(str, i, str2, str3, messageHandler, j, z, vector, streamDeserializer, z2, "", "");
    }

    public void subscribe(String str, int i, String str2, String str3, MessageHandler messageHandler, long j, boolean z, Vector vector, boolean z2) throws IOException {
        subscribe(str, i, str2, str3, messageHandler, j, z, vector, null, z2);
    }

    public void subscribe(String str, int i, String str2, String str3, MessageHandler messageHandler, long j, boolean z, Vector vector, StreamDeserializer streamDeserializer) throws IOException {
        subscribe(str, i, str2, str3, messageHandler, j, z, vector, streamDeserializer, false);
    }

    public void subscribe(String str, int i, String str2, String str3, MessageHandler messageHandler, long j, boolean z) throws IOException {
        subscribe(str, i, str2, str3, messageHandler, j, z, null, null, false);
    }

    public void subscribe(String str, int i, String str2, String str3, MessageHandler messageHandler, long j, Vector vector) throws IOException {
        subscribe(str, i, str2, str3, messageHandler, j, false, vector, null, false);
    }

    public void subscribe(String str, int i, String str2, String str3, MessageHandler messageHandler, long j) throws IOException {
        subscribe(str, i, str2, str3, messageHandler, j, false);
    }

    public void subscribe(String str, int i, String str2, String str3, MessageHandler messageHandler) throws IOException {
        subscribe(str, i, str2, str3, messageHandler, -1L);
    }

    public void subscribe(String str, int i, String str2, String str3, MessageHandler messageHandler, boolean z) throws IOException {
        subscribe(str, i, str2, str3, messageHandler, -1L, z);
    }

    public void subscribe(String str, int i, String str2, MessageHandler messageHandler) throws IOException {
        subscribe(str, i, str2, "javaStreamingApi", messageHandler, -1L);
    }

    public void subscribe(String str, int i, String str2, MessageHandler messageHandler, boolean z) throws IOException {
        subscribe(str, i, str2, "javaStreamingApi", messageHandler, -1L, z);
    }

    public void subscribe(String str, int i, String str2, MessageHandler messageHandler, long j) throws IOException {
        subscribe(str, i, str2, "javaStreamingApi", messageHandler, j);
    }

    public void subscribe(String str, int i, String str2, MessageHandler messageHandler, long j, boolean z) throws IOException {
        subscribe(str, i, str2, "javaStreamingApi", messageHandler, j, z);
    }

    public void unsubscribe(String str, int i, String str2, String str3) throws IOException {
        unsubscribeInternal(str, i, str2, str3);
    }

    public void unsubscribe(String str, int i, String str2) throws IOException {
        unsubscribeInternal(str, i, str2);
    }

    @Override // com.xxdb.streaming.client.AbstractClient
    protected void unsubscribeInternal(String str, int i, String str2, String str3) throws IOException {
        String str4;
        DBConnection dBConnection = new DBConnection();
        String str5 = str + ":" + i + "/" + str2 + "/" + str3;
        List<String> list = this.users.get(str5);
        String str6 = list.get(0);
        String str7 = list.get(1);
        if (str6.equals("")) {
            dBConnection.connect(str, i);
        } else {
            dBConnection.connect(str, i, str6, str7);
        }
        try {
            try {
                String str8 = this.listeningHost;
                if (str8.equals("")) {
                    str8 = dBConnection.getLocalAddress().getHostAddress();
                }
                ArrayList arrayList = new ArrayList();
                arrayList.add(new BasicString(str8));
                arrayList.add(new BasicInt(this.listeningPort));
                arrayList.add(new BasicString(str2));
                arrayList.add(new BasicString(str3));
                dBConnection.run("stopPublishTable", arrayList);
                synchronized (this.tableNameToTrueTopic) {
                    str4 = this.tableNameToTrueTopic.get(str5);
                }
                synchronized (this.trueTopicToSites) {
                    AbstractClient.Site[] siteArr = this.trueTopicToSites.get(str4);
                    if (siteArr == null || siteArr.length == 0) {
                    }
                    for (AbstractClient.Site site : siteArr) {
                        site.closed = true;
                    }
                }
                synchronized (this.queueManager) {
                    this.queueManager.removeQueue(str4);
                }
                System.out.println("Successfully unsubscribed table " + str5);
                dBConnection.close();
                String str9 = str + ":" + i + "/" + str2 + "/" + str3;
                synchronized (this.queueHandlers) {
                    this.queueHandlers.get(str9);
                    this.queueHandlers.remove(str9);
                }
            } catch (Exception e) {
                throw e;
            }
        } catch (Throwable th) {
            dBConnection.close();
            String str10 = str + ":" + i + "/" + str2 + "/" + str3;
            synchronized (this.queueHandlers) {
                this.queueHandlers.get(str10);
                this.queueHandlers.remove(str10);
                throw th;
            }
        }
    }

    @Override // com.xxdb.streaming.client.AbstractClient
    public void close() {
        synchronized (this.queueHandlers) {
            this.queueHandlers = null;
        }
        this.threadPool.shutdownNow();
        if (this.pThread != null) {
            this.pThread.interrupt();
        }
        this.isClose_ = true;
    }

    @Override // com.xxdb.streaming.client.AbstractClient, com.xxdb.streaming.client.MessageDispatcher
    public /* bridge */ /* synthetic */ ConcurrentHashMap getTopicToSites() {
        return super.getTopicToSites();
    }

    @Override // com.xxdb.streaming.client.AbstractClient
    public /* bridge */ /* synthetic */ boolean isClose() {
        return super.isClose();
    }

    @Override // com.xxdb.streaming.client.AbstractClient, com.xxdb.streaming.client.MessageDispatcher
    public /* bridge */ /* synthetic */ boolean isClosed(String str) {
        return super.isClosed(str);
    }

    @Override // com.xxdb.streaming.client.AbstractClient, com.xxdb.streaming.client.MessageDispatcher
    public /* bridge */ /* synthetic */ boolean isRemoteLittleEndian(String str) {
        return super.isRemoteLittleEndian(str);
    }

    @Override // com.xxdb.streaming.client.AbstractClient, com.xxdb.streaming.client.MessageDispatcher
    public /* bridge */ /* synthetic */ void batchDispatch(List list) {
        super.batchDispatch(list);
    }

    @Override // com.xxdb.streaming.client.AbstractClient, com.xxdb.streaming.client.MessageDispatcher
    public /* bridge */ /* synthetic */ void dispatch(IMessage iMessage) {
        super.dispatch(iMessage);
    }

    @Override // com.xxdb.streaming.client.AbstractClient, com.xxdb.streaming.client.MessageDispatcher
    public /* bridge */ /* synthetic */ void activeCloseConnection(AbstractClient.Site site) {
        super.activeCloseConnection(site);
    }

    @Override // com.xxdb.streaming.client.AbstractClient, com.xxdb.streaming.client.MessageDispatcher
    public /* bridge */ /* synthetic */ boolean tryReconnect(String str) {
        return super.tryReconnect(str);
    }

    @Override // com.xxdb.streaming.client.AbstractClient, com.xxdb.streaming.client.MessageDispatcher
    public /* bridge */ /* synthetic */ void setMsgId(String str, long j) {
        super.setMsgId(str, j);
    }

    @Override // com.xxdb.streaming.client.AbstractClient, com.xxdb.streaming.client.MessageDispatcher
    public /* bridge */ /* synthetic */ Map getSubInfos() {
        return super.getSubInfos();
    }

    @Override // com.xxdb.streaming.client.AbstractClient, com.xxdb.streaming.client.MessageDispatcher
    public /* bridge */ /* synthetic */ AbstractClient.Site getSiteByName(String str) {
        return super.getSiteByName(str);
    }

    @Override // com.xxdb.streaming.client.AbstractClient, com.xxdb.streaming.client.MessageDispatcher
    public /* bridge */ /* synthetic */ List getAllReconnectSites() {
        return super.getAllReconnectSites();
    }

    @Override // com.xxdb.streaming.client.AbstractClient, com.xxdb.streaming.client.MessageDispatcher
    public /* bridge */ /* synthetic */ Set getAllReconnectTopic() {
        return super.getAllReconnectTopic();
    }

    @Override // com.xxdb.streaming.client.AbstractClient, com.xxdb.streaming.client.MessageDispatcher
    public /* bridge */ /* synthetic */ List getAllTopicsBySite(String str) {
        return super.getAllTopicsBySite(str);
    }

    @Override // com.xxdb.streaming.client.AbstractClient, com.xxdb.streaming.client.MessageDispatcher
    public /* bridge */ /* synthetic */ void setReconnectTimestamp(String str, long j) {
        super.setReconnectTimestamp(str, j);
    }

    @Override // com.xxdb.streaming.client.AbstractClient, com.xxdb.streaming.client.MessageDispatcher
    public /* bridge */ /* synthetic */ long getReconnectTimestamp(String str) {
        return super.getReconnectTimestamp(str);
    }

    @Override // com.xxdb.streaming.client.AbstractClient, com.xxdb.streaming.client.MessageDispatcher
    public /* bridge */ /* synthetic */ int getNeedReconnect(String str) {
        return super.getNeedReconnect(str);
    }

    @Override // com.xxdb.streaming.client.AbstractClient, com.xxdb.streaming.client.MessageDispatcher
    public /* bridge */ /* synthetic */ void setNeedReconnect(String str, int i) {
        super.setNeedReconnect(str, i);
    }
}
