package com.alibaba.blink.store.core.rpc;

import com.alibaba.blink.store.core.ActorLocation;
import com.alibaba.blink.store.core.InterfaceAudience;
import com.alibaba.blink.store.core.ServerName;
import com.alibaba.blink.store.core.configuration.Configuration;
import com.alibaba.blink.store.core.metric.MetricRegistry;
import com.alibaba.blink.store.core.util.MetricHelper;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.NoRouteToHostException;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shaded.store.client.com.alibaba.erpc.EasyRPCChannel;
import shaded.store.client.com.alibaba.erpc.EasyRPCClient;
import shaded.store.client.com.google.common.collect.Maps;

/**
 * {@link RpcChannelPool} implementation that keeps at most one pooled {@link EasyRPCChannel}
 * per {@link ServerName}.
 *
 * <p>Channels are created lazily in {@link #getRpcChannel(ServerName)}, tracked through a
 * {@code ReferenceCountingChannel} wrapper, and closed by a periodic timer task once the wrapper
 * reports them idle. Creation, replacement and eviction of the channel of a given server are
 * serialized on a per-server lock object held in {@code channelLocks}.
 *
 * <p>When {@code useRpcProxy} is enabled and a proxy server has been set, new channels are opened
 * against the proxy address instead of the target server address.
 */
@InterfaceAudience.Internal
public class RpcChannelPoolImpl implements RpcChannelPool {
    private static final Logger LOG = LoggerFactory.getLogger(RpcChannelPoolImpl.class);
    private static final String RPC_CHANNEL_COUNT = "rpc.channel.count";

    private final int channelThreadNum;
    private final long evictionIdleChannelMs;
    private final MetricRegistry metricRegistry;
    private final int rpcTimeoutMs;
    private volatile ServerName proxyServer;
    private final boolean useRpcProxy;
    /** One monitor object per server; guards create/replace/evict of that server's channel. */
    private final Map<ServerName, Object> channelLocks = Maps.newConcurrentMap();
    private final Map<ServerName, ReferenceCountingChannel> channelInstances = Maps.newConcurrentMap();
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final Timer timer = new Timer();
    private final EasyRPCClient client = new EasyRPCClient();

    /**
     * Starts the underlying RPC client, reads the pool settings from {@code configuration},
     * schedules the periodic idle-channel eviction and registers the channel-count gauge.
     *
     * @param configuration source of the thread number, idle/eviction intervals, proxy flag and
     *     RPC timeout settings
     * @param metricRegistry registry on which the {@code rpc.channel.count} gauge is registered
     */
    public RpcChannelPoolImpl(Configuration configuration, MetricRegistry metricRegistry) {
        this.client.start();
        this.channelThreadNum = configuration.getInteger(RpcConstants.STORE_RPC_CHANNEL_THREADNUM_KEY, 10);
        this.evictionIdleChannelMs = configuration.getLong(RpcConstants.STORE_EVICTION_IDLECHANNEL_TIME_MILLIS, RpcConstants.DEFAULT_STORE_EVICTION_IDLECHANNEL_TIME_MILLIS);
        this.useRpcProxy = configuration.getBoolean(RpcConstants.STORE_RPC_CHANNEL_USE_PROXY, true);
        long evictionPeriodMs = configuration.getLong(RpcConstants.STORE_TIME_BETWEEN_EVICTION_RUNS_MILLIS, RpcConstants.DEFAULT_STORE_TIME_BETWEEN_EVICTION_RUNS_MILLIS);
        this.rpcTimeoutMs = configuration.getInteger(RpcConstants.STORE_RPC_TIMEOUT_KEY, 60000);
        this.metricRegistry = metricRegistry;
        this.timer.schedule(new TimerTask() {
            @Override
            public void run() {
                // An exception escaping a TimerTask terminates the Timer thread, which would
                // stop all future evictions and make later schedule() calls fail.
                try {
                    RpcChannelPoolImpl.this.evictIdleChannel();
                } catch (RuntimeException e) {
                    LOG.error("evict idle channel failed", e);
                }
            }
        }, 0L, evictionPeriodMs);
        this.metricRegistry.gauge(MetricHelper.formatMetricName(RPC_CHANNEL_COUNT, new String[0]), this::reportChannnelSize);
        if (!this.useRpcProxy) {
            LOG.info("Not using hologres rpc proxy.");
        }
    }

    /**
     * Closes and removes every pooled channel whose wrapper reports it as idle.
     *
     * <p>Invoked periodically by the pool timer; not intended to be called by pool users. Each
     * server is processed while holding that server's lock so eviction cannot interleave with
     * {@link #getRpcChannel(ServerName)} for the same server.
     */
    public void evictIdleChannel() {
        for (ServerName serverName : this.channelInstances.keySet()) {
            Object lock = this.channelLocks.get(serverName);
            if (lock == null) {
                continue;
            }
            synchronized (lock) {
                // Re-read under the lock: the entry may have been replaced or removed meanwhile.
                ReferenceCountingChannel pooled = this.channelInstances.get(serverName);
                if (pooled != null && pooled.checkIdleChannel()) {
                    LOG.info("remove idle channel: {}", serverName.toString());
                    this.channelInstances.remove(serverName);
                    closeChannelInstances(pooled);
                }
            }
        }
    }

    /**
     * Returns the number of channels currently held by the pool (used as the gauge value).
     *
     * @return current size of the pooled-channel map
     */
    public int reportChannnelSize() {
        return this.channelInstances.size();
    }

    /**
     * Returns a channel for {@code serverName}, opening a new one when none is pooled or the
     * pooled one is no longer active, and increments the pooled channel's reference count.
     *
     * <p>For an {@link ActorLocation} the pooled channel is wrapped in an {@code ActorChannel},
     * or in an {@code RpcProxyChannel} when the proxy is in use.
     *
     * @param serverName target server (or actor location)
     * @return the pooled channel, possibly wrapped for actor locations
     * @throws NoRouteToHostException if the client returned no channel for the address
     * @throws IOException if the pool is shut down or opening the channel failed
     */
    @Override
    public EasyRPCChannel getRpcChannel(ServerName serverName) throws IOException {
        if (this.isShutdown.get()) {
            throw new IOException("RpcChannelPool is shutting down, abort get channel to server: " + serverName);
        }
        // Resolve the lock exactly once and keep it in a local: a concurrent close() clears
        // channelLocks, so looking it up again inside synchronized(...) could yield null.
        final Object lock = lockFor(serverName);
        synchronized (lock) {
            // Snapshot the volatile proxy once so the null check, the address used to connect
            // and the wrapper choice below all agree even if setProxyServer() runs concurrently.
            final ServerName proxy = this.useRpcProxy ? getProxyServer() : null;
            ReferenceCountingChannel pooled = this.channelInstances.get(serverName);
            if (pooled == null || !pooled.getChannel().getChannel().isActive()) {
                if (this.isShutdown.get()) {
                    throw new IOException("RpcChannelPool is shutting down, abort create channel into server: " + serverName);
                }
                if (pooled != null) {
                    // Stale channel: drop it from the pool and close it (possibly delayed).
                    this.channelInstances.remove(serverName);
                    delayCloseChannelInstances(pooled);
                }
                try {
                    EasyRPCChannel opened;
                    if (proxy == null) {
                        opened = this.client.openChannel(new InetSocketAddress(serverName.getHostname(), serverName.getPort()), this.channelThreadNum);
                    } else {
                        opened = this.client.openChannel(new InetSocketAddress(proxy.getHostname(), proxy.getPort()), this.channelThreadNum * 2);
                    }
                    if (opened == null) {
                        throw new NoRouteToHostException("failed to create channel into server: " + serverName);
                    }
                    pooled = new ReferenceCountingChannel(serverName, opened, this.evictionIdleChannelMs);
                    this.channelInstances.put(serverName, pooled);
                } catch (NoRouteToHostException e) {
                    LOG.error("getRpcChannel {} from pool failed", serverName.toString(), e);
                    throw e;
                } catch (Exception e) {
                    LOG.error("getRpcChannel {} from pool other failed", serverName.toString(), e);
                    throw new IOException(e);
                }
            }
            pooled.incrCount();
            if (!(serverName instanceof ActorLocation)) {
                return pooled.getChannel();
            }
            if (proxy == null) {
                return new ActorChannel((ActorLocation) serverName, pooled.getChannel());
            }
            return new RpcProxyChannel((ActorLocation) serverName, pooled.getChannel());
        }
    }

    /**
     * Releases one reference on the pooled channel of {@code serverName}.
     *
     * <p>If the pool no longer holds a channel for that server, the given channel is closed
     * directly instead.
     *
     * @param serverName server the channel was obtained for
     * @param easyRPCChannel channel previously returned by {@link #getRpcChannel(ServerName)}
     */
    @Override
    public void releaseConnection(ServerName serverName, EasyRPCChannel easyRPCChannel) {
        if (serverName == null || easyRPCChannel == null) {
            LOG.warn("releaseConnection failed, sn or channel is null");
            return;
        }
        // NOTE(review): the count is decremented on whatever channel is currently pooled for
        // this server, which may differ from the one handed out if it was replaced meanwhile.
        ReferenceCountingChannel pooled = this.channelInstances.get(serverName);
        if (pooled == null) {
            LOG.warn("releaseConnection {} channel is null", serverName.toString());
            easyRPCChannel.getChannel().close();
            return;
        }
        pooled.decrCount();
    }

    /**
     * Shuts the pool down: closes every pooled channel, cancels the timer and stops the RPC
     * client on a separate thread. Calling it more than once has no further effect.
     */
    @Override
    public void close() {
        // Only the first caller performs the shutdown, so the client is never stopped twice.
        if (!this.isShutdown.compareAndSet(false, true)) {
            return;
        }
        LOG.warn("RpcChannelPool is shutting down");
        try {
            for (ReferenceCountingChannel pooled : this.channelInstances.values()) {
                pooled.decrCount();
                // Logs and swallows close failures so one bad channel cannot skip the others.
                closeChannelInstances(pooled);
            }
        } finally {
            // Always release the remaining resources, even if a channel failed above.
            this.channelInstances.clear();
            this.timer.cancel();
            this.channelLocks.clear();
            new Thread(() -> {
                this.client.stop();
            }).start();
        }
        LOG.warn("RpcChannelPool shutdown successfully");
    }

    /**
     * @return the proxy server currently configured, or {@code null} if none has been set
     */
    @Override
    public ServerName getProxyServer() {
        return this.proxyServer;
    }

    /**
     * Sets the proxy server used for channels opened from now on.
     *
     * @param serverName proxy address, or {@code null} to connect to target servers directly
     */
    @Override
    public void setProxyServer(ServerName serverName) {
        this.proxyServer = serverName;
    }

    /** Returns the lock object for {@code serverName}, creating it atomically if absent. */
    private Object lockFor(ServerName serverName) {
        Object lock = this.channelLocks.get(serverName);
        if (lock == null) {
            Object fresh = new Object();
            Object prior = this.channelLocks.putIfAbsent(serverName, fresh);
            lock = (prior != null) ? prior : fresh;
        }
        return lock;
    }

    /**
     * Closes a channel that has been removed from the pool: immediately when it has no
     * references left, otherwise after {@code rpcTimeoutMs} (presumably to let in-flight calls
     * on it finish or time out first).
     */
    private void delayCloseChannelInstances(final ReferenceCountingChannel referenceCountingChannel) {
        if (referenceCountingChannel == null) {
            return;
        }
        if (referenceCountingChannel.isZeroReference()) {
            closeChannelInstances(referenceCountingChannel);
            return;
        }
        try {
            this.timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    RpcChannelPoolImpl.this.closeChannelInstances(referenceCountingChannel);
                }
            }, this.rpcTimeoutMs);
        } catch (IllegalStateException e) {
            // The timer was cancelled (pool closing) or its thread terminated, so the delayed
            // close can never run; close now rather than leak the channel or fail the caller.
            LOG.warn("timer unavailable, close channel immediately sn={}", referenceCountingChannel.getSn(), e);
            closeChannelInstances(referenceCountingChannel);
        }
    }

    /**
     * Closes the underlying channel of {@code referenceCountingChannel}, logging instead of
     * propagating any failure. A {@code null} argument is ignored.
     *
     * <p>Used by the pool's timer tasks; not intended to be called by pool users.
     */
    public void closeChannelInstances(ReferenceCountingChannel referenceCountingChannel) {
        if (referenceCountingChannel == null) {
            return;
        }
        try {
            referenceCountingChannel.getChannel().getChannel().close();
            LOG.info("close channel instances sn={}", referenceCountingChannel.getSn());
        } catch (Throwable th) {
            // Trailing throwable argument makes SLF4J log the stack trace as well.
            LOG.error("close channel instance error sn={} errorMsg={}", referenceCountingChannel.getSn(), th.getMessage(), th);
        }
    }
}
