package com.alibaba.ververica.connectors.hologres.rpc;

import com.alibaba.blink.store.client.BlinkStore;
import com.alibaba.blink.store.client.Cell;
import com.alibaba.blink.store.client.PkCells;
import com.alibaba.blink.store.client.QueryOptions;
import com.alibaba.blink.store.client.Table;
import com.alibaba.blink.store.core.rpc.RpcException;
import com.alibaba.ververica.connectors.hologres.api.AbstractHologresReader;
import com.alibaba.ververica.connectors.hologres.api.HologresRecordConverter;
import com.alibaba.ververica.connectors.hologres.api.HologresTableSchema;
import com.alibaba.ververica.connectors.hologres.api.table.HologresRowDataConverter;
import com.alibaba.ververica.connectors.hologres.config.HologresConnectionParam;
import com.alibaba.ververica.connectors.hologres.utils.HologresUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/ververica/connectors/hologres/rpc/HologresRpcReader.class */
public class HologresRpcReader<T> extends AbstractHologresReader<T> {
    private static final transient Logger LOG = LoggerFactory.getLogger((Class<?>) HologresRpcReader.class);
    protected transient BlinkStore blinkStore;
    protected transient AtomicReference<Table> table;
    private AtomicBoolean failing;
    private LinkedBlockingQueue<FailedInvoke> failedInvokes;
    private ScheduledExecutorService executorService;
    private transient QueryOptions queryOptions;
    private final HologresRecordConverter<T, List<Cell>> recordConverter;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alibaba/ververica/connectors/hologres/rpc/HologresRpcReader$FailedInvoke.class */
    public static class FailedInvoke {
        PkCells pkCells;
        CompletableFuture resultFuture;

        FailedInvoke(PkCells pkCells, CompletableFuture completableFuture) {
            this.pkCells = pkCells;
            this.resultFuture = completableFuture;
        }
    }

    public HologresRpcReader(String[] strArr, HologresConnectionParam hologresConnectionParam, TableSchema tableSchema, HologresRecordConverter<T, List<Cell>> hologresRecordConverter) {
        super(hologresConnectionParam, tableSchema, strArr);
        this.blinkStore = null;
        this.table = null;
        this.recordConverter = hologresRecordConverter;
    }

    public static HologresRpcReader<RowData> createTableReader(HologresConnectionParam hologresConnectionParam, TableSchema tableSchema, String[] strArr, HologresTableSchema hologresTableSchema) {
        return new HologresRpcReader<>(strArr, hologresConnectionParam, tableSchema, new HologresRowDataConverter(strArr, tableSchema, hologresConnectionParam, new HologresRpcRecordWriter(), new HologresRpcRecordReader(), hologresTableSchema));
    }

    @Override // com.alibaba.ververica.connectors.hologres.api.AbstractHologresIOClient
    public void open(RuntimeContext runtimeContext) throws IOException {
        try {
            this.blinkStore = BlinkStore.get(HologresUtils.getConf(runtimeContext, this.param), this.param.getDatabase());
            this.table = new AtomicReference<>(this.blinkStore.getTable(this.param.getTable()));
            this.queryOptions = new QueryOptions(Arrays.asList(this.fieldNames));
            this.failing = new AtomicBoolean(false);
            this.failedInvokes = new LinkedBlockingQueue<>();
            this.executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat(this.table.get().getTableName()).setDaemon(true).build());
            this.executorService.scheduleAtFixedRate(() -> {
                if (this.failedInvokes.size() > 0) {
                    LOG.info("Pending failed " + this.failedInvokes.size());
                    retryFailedIfNeeded();
                }
            }, 0L, 1L, TimeUnit.SECONDS);
            LOG.info("Successfully initiated connection to database [{}] / table[{}]", this.param.getDatabase(), this.param.getDatabase());
        } catch (Exception e) {
            LOG.error("Exception while creating connection to Hologres.", (Throwable) e);
            throw new IOException("Cannot create connection to Hologres, endpoint: " + this.param.getEndpoint() + ", database: " + this.param.getDatabase() + ", tableName: " + this.param.getTable(), e);
        }
    }

    @Override // com.alibaba.ververica.connectors.hologres.api.AbstractHologresIOClient
    public void close() throws IOException {
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
    }

    @Override // com.alibaba.ververica.connectors.hologres.api.AbstractHologresReader
    public CompletableFuture<T> asyncGet(T t) throws IOException {
        retryFailedIfNeeded();
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        try {
            invoke(getPKCells(t), completableFuture);
            return completableFuture;
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    private void invoke(PkCells pkCells, CompletableFuture<T> completableFuture) throws InterruptedException {
        if (this.failing.get()) {
            this.failedInvokes.put(new FailedInvoke(pkCells, completableFuture));
        } else {
            long tableId = this.table.get().getTableId();
            this.table.get().asyncGet(pkCells, this.queryOptions).handle((rowData, th) -> {
                try {
                    if (th == null) {
                        completableFuture.complete(this.recordConverter.convertTo(rowData == null ? null : rowData.getCells()));
                    } else if (shouldRetryForJoin(th)) {
                        LOG.info("Retry for invalid table id");
                        synchronized (this) {
                            if (tableId == this.table.get().getTableId() && !this.failing.get()) {
                                this.failing.set(true);
                                scheduleRenewTable(tableId);
                            }
                        }
                        this.failedInvokes.put(new FailedInvoke(pkCells, completableFuture));
                    } else {
                        completableFuture.completeExceptionally(th);
                    }
                    return null;
                } catch (Throwable th) {
                    completableFuture.completeExceptionally(th);
                    return null;
                }
            });
        }
    }

    private void scheduleRenewTable(long j) {
        this.executorService.schedule(() -> {
            try {
                this.table.set(this.blinkStore.getTable(this.table.get().getTableName()));
                LOG.info("Retrieve new table info, old id:" + j + " new id: " + this.table.get().getTableId());
                this.failing.set(false);
            } catch (RpcException e) {
                e.printStackTrace();
                scheduleRenewTable(j);
            }
        }, new Random().nextInt(5000), TimeUnit.MILLISECONDS);
    }

    private boolean shouldRetryForJoin(Throwable th) {
        if (!(th instanceof RpcException)) {
            return false;
        }
        RpcException rpcException = (RpcException) th;
        if (rpcException.getErrorCode() == 112) {
            return true;
        }
        return rpcException.getErrorCode() == 14 && rpcException.getMessage().contains("Query is cancelled");
    }

    private void retryFailedIfNeeded() {
        if (this.failedInvokes.size() <= 0 || this.failing.get()) {
            return;
        }
        this.executorService.schedule(() -> {
            ArrayList<FailedInvoke> arrayList = new ArrayList(this.failedInvokes.size());
            this.failedInvokes.drainTo(arrayList);
            for (FailedInvoke failedInvoke : arrayList) {
                try {
                    invoke(failedInvoke.pkCells, failedInvoke.resultFuture);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, 1000L, TimeUnit.MILLISECONDS);
    }

    @Override // com.alibaba.ververica.connectors.hologres.api.AbstractHologresReader
    public CompletableFuture<List<T>> asyncGetMany(T t) {
        throw new UnsupportedOperationException("Hologres RPC dimension table does not support one to many join.");
    }

    @Override // com.alibaba.ververica.connectors.hologres.api.AbstractHologresReader
    public T get(T t) throws IOException {
        try {
            return asyncGet(t).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IOException(e);
        }
    }

    @Override // com.alibaba.ververica.connectors.hologres.api.AbstractHologresReader
    public List<T> getMany(T t) throws IOException {
        throw new UnsupportedOperationException("Hologres RPC dimension table does not support one to many join.");
    }

    private PkCells getPKCells(T t) {
        List<Cell> convertToPrimaryKey = this.recordConverter.convertToPrimaryKey(t);
        Table.PkCellsBuilder newPkCellsBuilder = this.table.get().newPkCellsBuilder();
        for (int i = 0; i < this.primaryKeys.length; i++) {
            newPkCellsBuilder.add(this.primaryKeys[i], convertToPrimaryKey.get(i));
        }
        return newPkCellsBuilder.build();
    }
}
