package com.alibaba.ververica.connectors.hologres.binlog.source.reader;

import com.alibaba.ververica.connectors.hologres.binlog.HologresBinlogConfigs;
import com.alibaba.ververica.connectors.hologres.binlog.HolohubClientProvider;
import com.alibaba.ververica.connectors.hologres.binlog.RecordConverter;
import com.alibaba.ververica.connectors.hologres.binlog.source.split.HologresBinlogInputSplit;
import com.alibaba.ververica.connectors.hologres.config.JDBCOptions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shaded.com.aliyun.datahub.client.model.RecordEntry;
import shaded.com.aliyun.datahub.client.model.TupleRecordData;
import shaded.com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import shaded.com.aliyun.datahub.clientlibrary.consumer.ShardGroupReader;
import shaded.com.aliyun.datahub.clientlibrary.models.Offset;
import shaded.com.aliyun.datahub.exception.DatahubClientException;

/* loaded from: input_file:com/alibaba/ververica/connectors/hologres/binlog/source/reader/HologresBinlogShardSplitReader.class */
public class HologresBinlogShardSplitReader<T> implements SplitReader<Tuple3<T, Long, Long>, HologresBinlogInputSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(HologresBinlogShardSplitReader.class);

    /** Default for {@link #fetchLatestDelay}; the value is handed to the consumer config's {@code setFetchLatestDelayMs}. */
    public static final int DEFAULT_FETCH_LATEST_DELAY = 500;

    /** Turns a raw binlog tuple into the element type emitted by this reader. */
    private final RecordConverter<T> recordConverter;
    /** Index of the Flink subtask owning this reader; used in log output only. */
    private final int subtaskId;
    private final Configuration configuration;
    /** Connection settings (endpoint, credentials, database, binlog table name). */
    private final JDBCOptions jdbcOptions;
    /** Created lazily on the first split assignment; {@code null} until then. */
    private ShardGroupReader shardGroupReader;

    // Tuning knobs read from the Flink configuration in the constructor.
    protected int internalFetchSize;
    protected int requestTimeout;
    protected int retryInterval;
    protected int retryTimesLimit;

    // Metrics registered under the "input"/"hologres" metric group.
    private DoubleGauge latencyGauge;
    private Counter exceptionCounter;
    private Counter readCounter;
    private Counter readNullCounter;

    /** Set by {@code wakeUp()} and polled by the fetch loop; volatile because it is written and read without locking. */
    private volatile boolean wakeup = false;
    protected int fetchLatestDelay = DEFAULT_FETCH_LATEST_DELAY;
    /** Highest LSN observed so far for each shard id. */
    protected Map<String, Long> shardIdToMaxLsn = new HashMap<>();

    /* loaded from: input_file:com/alibaba/ververica/connectors/hologres/binlog/source/reader/HologresBinlogShardSplitReader$DoubleGauge.class */
    /**
     * Minimal {@link Gauge} that exposes the most recently reported value as a {@code Double}.
     */
    public static class DoubleGauge implements Gauge<Double> {
        private double value;

        /** Stores {@code j}, widened to a double, as the current gauge value. */
        void report(long j) {
            this.value = (double) j;
        }

        /** Stores {@code d} as the current gauge value. */
        void report(double d) {
            this.value = d;
        }

        /**
         * Returns the last reported value. This is the accessor required by {@link Gauge};
         * the decompiled source had it under a mangled name, which left the interface unimplemented.
         */
        public Double getValue() {
            return Double.valueOf(this.value);
        }

        /**
         * Decompiler-generated alias retained so existing references keep compiling.
         *
         * @deprecated use {@link #getValue()}.
         */
        @Deprecated
        public Double m1314getValue() {
            return getValue();
        }
    }

    /* loaded from: input_file:com/alibaba/ververica/connectors/hologres/binlog/source/reader/HologresBinlogShardSplitReader$HologresShardSplitRecords.class */
    /**
     * A batch of records grouped by split (shard) id, iterated split by split.
     *
     * <p>Usage order: fill the batch through {@link #recordsForSplit(String)}, call
     * {@link #prepareForRead()} once, then alternate {@link #nextSplit()} and
     * {@link #nextRecordFromSplit()}. This batch never reports a finished split.
     */
    private static class HologresShardSplitRecords<T> implements RecordsWithSplitIds<T> {
        private final Map<String, Collection<T>> recordsBySplits = new HashMap<>();
        private Iterator<Map.Entry<String, Collection<T>>> splitIterator;
        private String currentSplitId;
        private Iterator<T> recordIterator;

        private HologresShardSplitRecords() {
        }

        /**
         * Returns the mutable record collection for the given split id, creating an empty
         * one on first access.
         */
        public Collection<T> recordsForSplit(String str) {
            return this.recordsBySplits.computeIfAbsent(str, ignored -> new ArrayList<>());
        }

        /** Positions the split iterator at the start; must be called before {@link #nextSplit()}. */
        public void prepareForRead() {
            this.splitIterator = this.recordsBySplits.entrySet().iterator();
        }

        /**
         * Advances to the next split that has buffered records.
         *
         * @return the split id, or {@code null} when all splits have been visited
         */
        @Nullable
        public String nextSplit() {
            if (!this.splitIterator.hasNext()) {
                this.currentSplitId = null;
                this.recordIterator = null;
                return null;
            }
            Map.Entry<String, Collection<T>> next = this.splitIterator.next();
            this.currentSplitId = next.getKey();
            this.recordIterator = next.getValue().iterator();
            return this.currentSplitId;
        }

        /**
         * Returns the next record of the current split.
         *
         * @return the record, or {@code null} when the current split is exhausted
         * @throws NullPointerException if no split is currently selected
         */
        @Nullable
        public T nextRecordFromSplit() {
            Preconditions.checkNotNull(this.currentSplitId, "Make sure nextSplit() did not return null before iterate over the records split.");
            return this.recordIterator.hasNext() ? this.recordIterator.next() : null;
        }

        /**
         * Always empty. An immutable set is returned so that no caller can modify state
         * shared between batches (the previous implementation exposed a mutable static set).
         */
        public Set<String> finishedSplits() {
            return Collections.emptySet();
        }
    }

    /**
     * Creates a split reader; the underlying shard group reader is not opened here.
     *
     * @param recordConverter converter applied to every binlog tuple that is read
     * @param configuration source of the request timeout, retry and batch size settings
     * @param jDBCOptions connection options kept for later client creation
     * @param sourceReaderContext supplies the subtask index and the metric group
     */
    public HologresBinlogShardSplitReader(RecordConverter<T> recordConverter, Configuration configuration, JDBCOptions jDBCOptions, SourceReaderContext sourceReaderContext) {
        // Collaborators and identity.
        this.subtaskId = sourceReaderContext.getIndexOfSubtask();
        this.jdbcOptions = jDBCOptions;
        this.configuration = configuration;
        this.recordConverter = recordConverter;

        // Read tuning values.
        this.internalFetchSize = configuration.getInteger(HologresBinlogConfigs.BINLOG_BATCH_READ_SIZE);
        this.retryInterval = configuration.getInteger(HologresBinlogConfigs.BINLOG_RETRY_INTERVAL_MS);
        this.retryTimesLimit = configuration.getInteger(HologresBinlogConfigs.BINLOG_MAX_RETRY_TIMES);
        this.requestTimeout = configuration.getInteger(HologresBinlogConfigs.BINLOG_REQUEST_TIMEOUT);

        // Metrics, each registered under <reader metric group>.input.hologres.
        this.latencyGauge = (DoubleGauge) sourceReaderContext.metricGroup()
                .addGroup("input")
                .addGroup("hologres")
                .gauge("readLatency", new DoubleGauge());
        this.exceptionCounter = sourceReaderContext.metricGroup()
                .addGroup("input")
                .addGroup("hologres")
                .counter("exceptionCounter", new SimpleCounter());
        this.readCounter = sourceReaderContext.metricGroup()
                .addGroup("input")
                .addGroup("hologres")
                .counter("readCounter", new SimpleCounter());
        this.readNullCounter = sourceReaderContext.metricGroup()
                .addGroup("input")
                .addGroup("hologres")
                .counter("nullCounter", new SimpleCounter());
    }

    /**
     * Reads a batch of binlog records from the shard group reader.
     *
     * <p>The loop ends when {@code internalFetchSize} records were collected, when
     * {@link #wakeUp()} was called, or when a read returns {@code null} after at least one
     * record was already collected. Each emitted element is a {@link Tuple3} of the converted
     * record, the entry's sequence and the entry's system time, grouped by shard id.
     *
     * <p>A {@link DatahubClientException} raised by the read is counted, logged and retried
     * after sleeping {@code retryInterval} ms. Previously the read call sat outside the
     * {@code try}, so this retry path could never be taken, and the retry limit was only
     * checked after a successful read.
     *
     * @return the records read, possibly empty
     * @throws IOException if the read failed more than {@code retryTimesLimit} times during this
     *     call, or if the thread was interrupted while waiting to retry
     * @throws RuntimeException wrapping any other exception thrown by the read
     */
    public RecordsWithSplitIds<Tuple3<T, Long, Long>> fetch() throws IOException {
        HologresShardSplitRecords<Tuple3<T, Long, Long>> batch = new HologresShardSplitRecords<>();
        int fetched = 0;
        int failedReads = 0;
        boolean sawRecord = false;
        while (fetched < this.internalFetchSize && !this.wakeup) {
            RecordEntry entry;
            long elapsedNanos;
            try {
                // Latency is sampled only for the first successful read of a batch.
                long startNanos = sawRecord ? 0L : System.nanoTime();
                entry = this.shardGroupReader.read();
                elapsedNanos = sawRecord ? 0L : System.nanoTime() - startNanos;
            } catch (DatahubClientException e) {
                this.exceptionCounter.inc();
                LOG.warn("Failed to read record", e);
                failedReads++;
                if (failedReads > this.retryTimesLimit) {
                    throw new IOException("Failed to read hologres binlog after retries: ", e);
                }
                try {
                    Thread.sleep(this.retryInterval);
                } catch (InterruptedException interrupted) {
                    // Keep the interrupt visible to the caller and stop retrying.
                    Thread.currentThread().interrupt();
                    IOException failure = new IOException("Interrupted while waiting to retry reading hologres binlog", e);
                    failure.addSuppressed(interrupted);
                    throw failure;
                }
                continue;
            } catch (Exception e) {
                LOG.warn("Failed to read record", e);
                throw new RuntimeException(e);
            }

            this.readCounter.inc();
            if (entry == null) {
                this.readNullCounter.inc();
                if (sawRecord) {
                    // Nothing more is available right now; hand over what was collected.
                    break;
                }
                continue;
            }

            if (elapsedNanos > 0) {
                // Reported in microseconds.
                this.latencyGauge.report(elapsedNanos / 1000);
            }
            TupleRecordData data = (TupleRecordData) entry.getRecordData();
            Long lsn = (Long) data.getField(0);
            String shardId = entry.getShardId();
            Long knownMaxLsn = this.shardIdToMaxLsn.get(shardId);
            if (knownMaxLsn == null || lsn.longValue() > knownMaxLsn.longValue()) {
                this.shardIdToMaxLsn.put(shardId, lsn);
            }
            sawRecord = true;
            fetched++;
            batch.recordsForSplit(shardId).add(new Tuple3<>(this.recordConverter.convert(data), Long.valueOf(entry.getSequence()), Long.valueOf(entry.getSystemTime())));
        }
        // Consume the wake-up request so the next fetch starts normally.
        this.wakeup = false;
        batch.prepareForRead();
        return batch;
    }

    /**
     * Registers newly assigned splits with the shard group reader, creating the reader on
     * the first call.
     *
     * <p>A split whose LSN is positive starts from that LSN; any other split starts from its
     * configured start time.
     *
     * @param splitsChange the change to apply; only {@link SplitsAddition} is accepted
     * @throws UnsupportedOperationException for any other kind of change
     */
    public void handleSplitsChanges(SplitsChange<HologresBinlogInputSplit> splitsChange) {
        boolean isAddition = splitsChange instanceof SplitsAddition;
        if (!isAddition) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChange.getClass()));
        }
        LOG.info(String.format("Subtask %s got splits %s", this.subtaskId, splitsChange));

        // Shard id -> position to start reading from.
        HashMap startOffsets = new HashMap();
        for (HologresBinlogInputSplit split : splitsChange.splits()) {
            Offset start = split.getLsn() > 0
                    ? new Offset(split.getLsn(), 0L)
                    : new Offset(-1L, split.getStartTimeInMs());
            startOffsets.put(split.getShardId(), start);
        }

        if (this.shardGroupReader == null) {
            // The reader pool is sized from the number of splits in this first assignment.
            ConsumerConfig consumerConfig = getConsumerConfig(splitsChange.splits().size());
            this.shardGroupReader = new ShardGroupReader(this.jdbcOptions.getDatabase(), this.jdbcOptions.getBinlogTableName(), consumerConfig);
        }
        this.shardGroupReader.createShardReader(startOffsets);
    }

    /**
     * Raises the wake-up flag. The fetch loop checks this flag before every read and stops
     * collecting once it is set.
     */
    public void wakeUp() {
        wakeup = true;
    }

    /**
     * Closes the shard group reader if one was created; otherwise does nothing.
     *
     * @throws Exception if closing the reader fails
     */
    public void close() throws Exception {
        ShardGroupReader reader = this.shardGroupReader;
        if (reader == null) {
            // No split was ever assigned, so there is nothing to release.
            return;
        }
        reader.close();
    }

    /**
     * Builds the consumer configuration for the shard group reader from the JDBC options and
     * this reader's tuning fields.
     *
     * @param i maximum size of the shard reader pool
     * @return the configured consumer config, with its HTTP timeouts set to {@code requestTimeout}
     */
    private ConsumerConfig getConsumerConfig(int i) {
        ConsumerConfig consumerConfig =
                new ConsumerConfig(this.jdbcOptions.getEndpoint(), this.jdbcOptions.getUsername(), this.jdbcOptions.getPassword())
                        .setClientProvider(HolohubClientProvider.newHolohubClientProvider(this.jdbcOptions))
                        .setEnableBinary(false)
                        .setAutoCommit(false)
                        .setFetchSize(this.internalFetchSize)
                        .setMaxShardReaderPoolSize(i)
                        // Buffer twice the fetch size.
                        .setMaxBufferSize(this.internalFetchSize * 2)
                        .setFetchLatestDelayMs(this.fetchLatestDelay);
        consumerConfig.getHttpConfig()
                .setCompressType(null)
                .setConnTimeout(this.requestTimeout)
                .setReadTimeout(this.requestTimeout);
        return consumerConfig;
    }
}
