package com.alibaba.blink.store.client;

import com.alibaba.blink.store.client.rpc.request.ScanRecordRequest;
import com.alibaba.blink.store.core.InterfaceAudience;
import com.alibaba.blink.store.core.meta.IndexColumn;
import com.alibaba.blink.store.core.meta.ShardLocation;
import com.alibaba.blink.store.core.metric.MetricRegistry;
import com.alibaba.blink.store.core.rpc.Environment;
import com.alibaba.blink.store.core.rpc.RpcException;
import com.alibaba.blink.store.core.util.MetricHelper;
import com.alibaba.niagara.client.table.ServiceContractMsg;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shaded.store.client.com.google.common.base.Preconditions;
import shaded.store.client.com.google.common.collect.Lists;
import shaded.store.client.com.google.common.collect.Sets;
import shaded.store.client.com.google.common.util.concurrent.ThreadFactoryBuilder;

@InterfaceAudience.Internal
/* loaded from: input_file:com/alibaba/blink/store/client/AsyncClientScanner.class */
public class AsyncClientScanner implements ResultScanner {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) AsyncClientScanner.class);
    private static final List<ScanResult> STOP_OBJECT = Collections.emptyList();
    private final BlockingQueue<List<ScanResult>> cache;
    private final String storeName;
    private final Table table;
    private final QueryOptions queryOptions;
    private final int batchSize;
    private ScanOptions scan;
    private List<String> columnProjection;
    private static final String SCAN_BATCH_SIZE = "scan.batch.size";
    private static final String SCAN_BATCH_LATENCY = "scan.batch.latency";
    private List<ScanResult> currentReadBuffer = null;
    private int currentReadBufferIndex = 0;
    private AtomicBoolean closed = new AtomicBoolean(false);
    private boolean finishScan = false;
    private AtomicReference<Throwable> asyncException = new AtomicReference<>();
    private ShardLocation currentShard = null;
    private List<Integer> pkIndices = Lists.newArrayList();
    private List<Integer> scanColumnIndices = Lists.newArrayList();
    private int reportResultBatchSize = 0;
    private long reportBatchLatency = 0;
    private ExecutorService executor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new ThreadFactoryBuilder().setNameFormat("ClientScanner-pool").setDaemon(true).build());

    public AsyncClientScanner(Table table, ScanOptions scanOptions) throws RpcException {
        this.columnProjection = Lists.newArrayList();
        this.scan = scanOptions;
        this.table = table;
        try {
            MetricRegistry metricRegistry = Environment.getMetricRegistry(table.getSsProxyService().getEnvironment().getConf());
            metricRegistry.gauge(MetricHelper.formatMetricName(SCAN_BATCH_SIZE, new String[0]), this::reportBatchSize);
            metricRegistry.gauge(MetricHelper.formatMetricName(SCAN_BATCH_LATENCY, new String[0]), this::reportBatchLatency);
            Preconditions.checkArgument(checkAndReorderScanPkCells(scanOptions.getStartKey()), "StartKey key must be sorted by pk");
            Preconditions.checkArgument(checkAndReorderScanPkCells(scanOptions.getStopKey()), "StopKey key must be sorted by pk");
            Preconditions.checkArgument(scanOptions.getCachePreFetchCount() > 0, "cache pre-fetch count is not set");
            this.columnProjection.addAll(scanOptions.getColumns());
            if (null == this.columnProjection || this.columnProjection.isEmpty()) {
                this.columnProjection = (List) table.getSchema().getColumns().stream().map((v0) -> {
                    return v0.getName();
                }).collect(Collectors.toList());
                scanOptions.addColumns(this.columnProjection);
            } else {
                HashSet newHashSet = Sets.newHashSet();
                newHashSet.addAll(this.columnProjection);
                newHashSet.addAll((List) table.getSchema().getPkColumns().stream().map((v0) -> {
                    return v0.getName();
                }).collect(Collectors.toList()));
                this.columnProjection = new ArrayList(newHashSet);
            }
            Iterator<IndexColumn> it = table.getSchema().getPkColumns().iterator();
            while (it.hasNext()) {
                this.pkIndices.add(Integer.valueOf(this.columnProjection.indexOf(it.next().getName())));
            }
            Iterator<String> it2 = scanOptions.getColumns().iterator();
            while (it2.hasNext()) {
                this.scanColumnIndices.add(Integer.valueOf(this.columnProjection.indexOf(it2.next())));
            }
            this.queryOptions = new QueryOptions(this.columnProjection, scanOptions.getBatch());
            this.batchSize = scanOptions.getBatch();
            this.storeName = table.getSchema().getStoreName();
            this.cache = new LinkedBlockingQueue(scanOptions.getCachePreFetchCount());
            LOG.info("Scan store=" + table.getSchema().getStoreName() + ", table=" + table.getTableName() + ", startKey=" + scanOptions.getStartKey() + ", stopKey=" + scanOptions.getStopKey() + ", startShard=" + scanOptions.getStartShardValue() + ", stopShard=" + scanOptions.getStopShardValue() + ", keyPrefix=" + scanOptions.getKeyPrefix() + ", filterType=" + scanOptions.getFilterType() + ", batchSize=" + this.queryOptions.getBatchSize() + ", cachePreFetchCount=" + scanOptions.getCachePreFetchCount() + ", ScanIsolationLevel=" + scanOptions.getIsolationLevel() + ", scanColumns=" + scanOptions.getColumns() + ", columnProjection=" + this.columnProjection + ", pkIndices=" + this.pkIndices + ", scanColumnIndices=" + this.scanColumnIndices);
            ServiceContractMsg.Scanner buildScanner = null == scanOptions.getScannerPk() ? null : ScanRecordRequest.buildScanner(table, scanOptions.getScannerPk(), scanOptions, this.queryOptions);
            LOG.info("Init scan currentScanner = " + buildScanner);
            initializeScannerInConstruction(buildScanner);
        } catch (IOException e) {
            throw new RpcException((Exception) e);
        }
    }

    private boolean checkAndReorderScanPkCells(PkCells pkCells) {
        if (null == pkCells) {
            return true;
        }
        List<IndexColumn> pkColumns = this.table.getSchema().getPkColumns();
        if (pkCells.getCells().size() > pkColumns.size()) {
            return false;
        }
        List list = (List) pkCells.getColumns().stream().map((v0) -> {
            return v0.getColumnId();
        }).collect(Collectors.toList());
        for (int i = 0; i < list.size(); i++) {
            if (((Integer) list.get(i)).intValue() != pkColumns.get(i).getColumnId()) {
                return false;
            }
        }
        return true;
    }

    private void initializeScannerInConstruction(ServiceContractMsg.Scanner scanner) throws RpcException {
        nextScanner();
        doLoadCache(scanner);
    }

    protected boolean nextScanner() throws RpcException {
        int startShardValue;
        if (this.currentShard != null) {
            int endShardValue = this.currentShard.getEndShardValue();
            if (endShardValue == 65536 || checkScanStopRow(endShardValue)) {
                setClosed(null);
                if (!LOG.isDebugEnabled()) {
                    return false;
                }
                LOG.debug("Finished table scan, last shard " + this.currentShard);
                return false;
            }
            startShardValue = endShardValue;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Finished " + this.currentShard);
            }
        } else {
            startShardValue = this.scan.getStartShardValue();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Advancing internal scanner to localStartShardValue at '" + startShardValue + "'");
        }
        try {
            this.currentShard = this.table.getSsProxyService().getShardLocator().locateShardAndRefreshIfEmpty(this.table.getSchema().getTableGroupName(), startShardValue);
            return true;
        } catch (RpcException e) {
            setClosed(e);
            LOG.error("Open scanner Transaction error: storeName=" + this.storeName + ", tableName=" + this.table.getTableName() + ", localStartShardValue=" + startShardValue + ", msg=" + e.getMessage());
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setClosed(Throwable th) {
        this.closed.set(true);
        this.asyncException.compareAndSet(null, th);
    }

    private boolean checkScanStopRow(int i) {
        return this.scan.getStopShardValue() != 65536 && Integer.compare(this.scan.getStopShardValue(), i) <= 0;
    }

    private void doLoadCache(final ServiceContractMsg.Scanner scanner) {
        this.executor.submit(new Runnable() { // from class: com.alibaba.blink.store.client.AsyncClientScanner.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    AsyncClientScanner.this.loadCache(AsyncClientScanner.this.scan.getCachePreFetchCount(), scanner);
                } catch (Throwable th) {
                    AsyncClientScanner.LOG.error("Load cache with batch size = " + AsyncClientScanner.this.batchSize + " failed.", th);
                    AsyncClientScanner.this.setClosed(th);
                }
            }
        });
    }

    public int reportBatchSize() {
        return this.reportResultBatchSize;
    }

    public long reportBatchLatency() {
        return this.reportBatchLatency;
    }

    protected void loadCache(int i, ServiceContractMsg.Scanner scanner) throws RpcException {
        if (this.closed.get()) {
            this.cache.add(STOP_OBJECT);
        } else {
            long currentTimeMillis = System.currentTimeMillis();
            new ScanRecordRequest(this.table, this.table.getSsProxyService(), this.scan.getStartKey(), this.scan, this.queryOptions, this.currentShard.getShardId(), scanner).asyncHandle().whenCompleteAsync((rowSet, th) -> {
                this.reportBatchLatency = System.currentTimeMillis() - currentTimeMillis;
                if (null != th) {
                    LOG.error("Load cache with batch size = " + this.batchSize + " failed.", th);
                    if (th instanceof RpcException) {
                        setClosed(th);
                    } else {
                        setClosed(new RpcException(th.getMessage()));
                    }
                    this.cache.add(STOP_OBJECT);
                    return;
                }
                ServiceContractMsg.Scanner scanner2 = null;
                if (null != rowSet) {
                    try {
                        List<ScanResult> list = (List) rowSet.getRowSet().getRowsList().stream().map(rowData -> {
                            return new ScanResult(rowData.getDataList(), this.columnProjection, this.scanColumnIndices, this.pkIndices, this.table);
                        }).collect(Collectors.toList());
                        this.reportResultBatchSize = list.size();
                        try {
                            this.cache.put(list);
                        } catch (InterruptedException e) {
                            LOG.error("put results to cache failed", (Throwable) e);
                            setClosed(new RpcException(e.getMessage()));
                        }
                        if (rowSet.getRowSet().hasNext()) {
                            scanner2 = rowSet.getRowSet().getNext().getScanner();
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Load cache size = " + i + ", return cache size = " + list.size());
                        }
                    } catch (Throwable th) {
                        LOG.error("Prepare result failed: " + th.getMessage());
                        setClosed(th);
                        this.cache.add(STOP_OBJECT);
                        return;
                    }
                } else {
                    scanner2 = null;
                    try {
                        this.cache.put(STOP_OBJECT);
                    } catch (InterruptedException e2) {
                        setClosed(new RpcException(e2.getMessage()));
                    }
                }
                if (scanner2 == null) {
                    try {
                        if (!nextScanner()) {
                            setClosed(null);
                        }
                    } catch (RpcException e3) {
                        LOG.error("nextScanner failed", (Throwable) e3);
                        setClosed(e3);
                    }
                }
                if (this.closed.get()) {
                    LOG.info("Scan finished");
                } else {
                    doLoadCache(scanner2);
                }
            }, (Executor) this.executor);
        }
    }

    @Override // com.alibaba.blink.store.client.ResultScanner
    public ScanResult next() throws RpcException {
        if (this.finishScan) {
            return null;
        }
        while (true) {
            if (this.currentReadBuffer != null && this.currentReadBufferIndex != this.currentReadBuffer.size()) {
                ScanResult scanResult = this.currentReadBuffer.get(this.currentReadBufferIndex);
                this.currentReadBufferIndex++;
                return scanResult;
            }
            if (this.asyncException.get() != null) {
                throw new RpcException(this.asyncException.get().getMessage());
            }
            if (this.closed.get() && this.cache.isEmpty()) {
                this.finishScan = true;
                return null;
            }
            try {
                this.currentReadBuffer = this.cache.poll(1L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                LOG.warn("Waiting for cache failed.", (Throwable) e);
            }
            if (this.currentReadBuffer != null) {
                this.currentReadBufferIndex = 0;
            } else {
                LOG.warn("Still no result in cache.");
            }
        }
    }

    @Override // com.alibaba.blink.store.client.ResultScanner
    public List<ScanResult> next(int i) throws RpcException {
        ScanResult next;
        Preconditions.checkArgument(i > 0, "numRows must be positive integer");
        ArrayList arrayList = new ArrayList(Math.min(i, 1024));
        for (int i2 = 0; i2 < i && (next = next()) != null; i2++) {
            arrayList.add(next);
        }
        return arrayList;
    }

    @Override // com.alibaba.blink.store.client.ResultScanner, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        LOG.info("shutting down...");
        setClosed(null);
        this.cache.clear();
        this.executor.shutdown();
        LOG.info("shutdown successfully");
    }

    @Override // java.lang.Iterable
    public Iterator<ScanResult> iterator() {
        return new Iterator<ScanResult>() { // from class: com.alibaba.blink.store.client.AsyncClientScanner.2
            ScanResult next = null;

            @Override // java.util.Iterator
            public boolean hasNext() {
                if (this.next != null) {
                    return true;
                }
                try {
                    this.next = AsyncClientScanner.this.next();
                    return this.next != null;
                } catch (RpcException e) {
                    throw new RuntimeException(e);
                }
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.Iterator
            public ScanResult next() {
                if (!hasNext()) {
                    return null;
                }
                ScanResult scanResult = this.next;
                this.next = null;
                return scanResult;
            }

            @Override // java.util.Iterator
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}
