package com.alibaba.ververica.connectors.datahub.source;

import com.alibaba.ververica.connectors.common.errorcode.ConnectorErrors;
import com.alibaba.ververica.connectors.common.source.reader.AbstractPartitionNumsListener;
import com.alibaba.ververica.connectors.common.source.reader.Interruptible;
import com.alibaba.ververica.connectors.common.source.reader.RecordReader;
import com.alibaba.ververica.connectors.datahub.DatahubClientProvider;
import com.alibaba.ververica.connectors.datahub.DatahubUtils;
import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.http.HttpConfig;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.clientlibrary.callback.ShardReadEndCallback;
import com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import com.aliyun.datahub.clientlibrary.consumer.Consumer;
import com.aliyun.datahub.clientlibrary.exception.ExceptionRetryer;
import com.aliyun.datahub.clientlibrary.models.Offset;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.SimpleCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/ververica/connectors/datahub/source/DatahubRecordReader.class */
public class DatahubRecordReader extends AbstractPartitionNumsListener implements RecordReader<RecordEntry, Long>, Interruptible {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) DatahubRecordReader.class);
    private static final int CONSUMER_RETRY_COUNT = 0;
    public static final int DEFAULT_FETCH_SIZE = 50;
    public static final int DEFAULT_RETRY_TIMEOUT = 1800000;
    public static final int DEFAULT_RETRY_INTERVAL = 1000;
    public static final int DEFAULT_FETCH_LATEST_DELAY = 500;
    public static final int DEFAULT_REQUEST_TIMEOUT = 30000;
    public static final int PRINT_LOG_INTERVAL = 60000;
    private static final String DEFAULT_RECORD_READER_NAME = "datahub";
    protected String endpoint;
    protected String projectName;
    protected String topicName;
    protected String subId;
    protected String accessId;
    protected String accessKey;
    protected Configuration properties;
    protected Long sequence;
    protected DatahubClientProvider clientProvider;
    private long startTimeInMs;
    private long stopTimeInMs;
    protected DatahubShardInputSplit inputSplit;
    protected transient ConsumerConfig consumerConfig;
    protected transient Consumer consumer;
    protected RecordEntry currentElement;
    private transient DoubleGauge latencyGauge;
    private transient Counter exceptionCounter;
    private transient Counter readCounter;
    private transient Counter readNullCounter;
    private transient Counter readInvalidCounter;
    protected String name = DEFAULT_RECORD_READER_NAME;
    protected int requestTimeout = 30000;
    protected int maxFetchSize = 50;
    protected int maxBufferSize = 50;
    protected int retryInterval = 1000;
    protected int retryTimeout = DEFAULT_RETRY_TIMEOUT;
    protected int fetchLatestDelay = 500;
    protected long dataDeliveryDelayInMs = -1;
    protected boolean autoResetToOldest = false;
    private long lastPrintLogTime = 0;
    private volatile long currentWatermark = Long.MIN_VALUE;
    private volatile long currentMessageTimestamp = Long.MIN_VALUE;
    private long dataFetchedDelay = 0;
    private boolean readFinished = false;
    private volatile boolean interrupted = false;

    /* loaded from: input_file:com/alibaba/ververica/connectors/datahub/source/DatahubRecordReader$DoubleGauge.class */
    public static class DoubleGauge implements Gauge<Double> {
        private double value;

        void report(long j) {
            this.value = j * 1.0d;
        }

        void report(double d) {
            this.value = d;
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public Double m1302getValue() {
            return Double.valueOf(this.value);
        }
    }

    public DatahubRecordReader(String str, String str2, String str3, String str4, String str5, String str6, int i, long j, long j2) {
        this.endpoint = str;
        this.projectName = str2;
        this.topicName = str3;
        this.subId = str4;
        this.accessId = str5;
        this.accessKey = str6;
        this.startTimeInMs = j;
        this.stopTimeInMs = j2;
        setInitPartitionCount(i);
    }

    public DatahubRecordReader(String str, String str2, String str3, String str4, Configuration configuration, int i, long j, long j2) {
        this.endpoint = str;
        this.projectName = str2;
        this.topicName = str3;
        this.subId = str4;
        this.properties = configuration;
        this.startTimeInMs = j;
        this.stopTimeInMs = j2;
        setInitPartitionCount(i);
    }

    public DatahubRecordReader setRequestTimeout(int i) {
        this.requestTimeout = i;
        return this;
    }

    public DatahubRecordReader setMaxFetchSize(int i) {
        this.maxFetchSize = i;
        return this;
    }

    public DatahubRecordReader setMaxBufferSize(int i) {
        this.maxBufferSize = i;
        return this;
    }

    public DatahubRecordReader setRetryInterval(int i) {
        this.retryInterval = i;
        return this;
    }

    public DatahubRecordReader setRetryTimeout(int i) {
        this.retryTimeout = i;
        return this;
    }

    public DatahubRecordReader setFetchLatestDelay(int i) {
        this.fetchLatestDelay = i;
        return this;
    }

    public DatahubRecordReader setDataDeliveryDelayInMs(long j) {
        this.dataDeliveryDelayInMs = j;
        return this;
    }

    public DatahubRecordReader setAutoResetToOldest(boolean z) {
        this.autoResetToOldest = z;
        return this;
    }

    public DatahubRecordReader setClientProvider(DatahubClientProvider datahubClientProvider) {
        this.clientProvider = datahubClientProvider;
        return this;
    }

    protected DatahubClient getClient() {
        return (DatahubClient) this.clientProvider.getClient();
    }

    protected void initClientProvider() throws IOException {
        if (null == this.clientProvider) {
            this.clientProvider = new DatahubClientProvider(this.endpoint, this.accessId, this.accessKey, this.properties, new HttpConfig().setConnTimeout(this.requestTimeout).setReadTimeout(this.requestTimeout));
        }
    }

    protected ConsumerConfig createConsumerConfig() {
        ConsumerConfig shardReadEndCallback = new ConsumerConfig(this.endpoint, this.accessId, this.accessKey).setUserAgent(DatahubUtils.VERVERICA_VERSION).setClientProvider(this.clientProvider).setAutoCommit(true).setFetchSize(this.maxFetchSize).setMaxBufferSize(this.maxBufferSize).setFetchLatestDelayMs(this.fetchLatestDelay).setShardReadEndCallback(new ShardReadEndCallback() { // from class: com.alibaba.ververica.connectors.datahub.source.DatahubRecordReader.1
            @Override // com.aliyun.datahub.clientlibrary.callback.ShardReadEndCallback
            public void onShardReadEnd(List<String> list) {
                DatahubRecordReader.LOG.warn("Shard [{}] read to end", DatahubRecordReader.this.inputSplit.getShardId());
                DatahubRecordReader.this.readFinished = true;
                try {
                    DatahubRecordReader.this.partitionNumsChangeListener(DatahubRecordReader.this.getPartitionsNums(), DatahubRecordReader.this.initPartitionCount);
                } catch (Throwable th) {
                    DatahubRecordReader.LOG.error("Get partition of {} error: ", DatahubRecordReader.this.getReaderName(), th);
                }
            }
        });
        shardReadEndCallback.getHttpConfig().setCompressType(null).setConnTimeout(this.requestTimeout).setReadTimeout(this.requestTimeout);
        return shardReadEndCallback;
    }

    @Override // com.alibaba.ververica.connectors.common.source.reader.RecordReader
    public void open(InputSplit inputSplit, RuntimeContext runtimeContext) throws IOException {
        this.inputSplit = (DatahubShardInputSplit) inputSplit;
        initClientProvider();
        if (this.consumerConfig != null) {
            throw new IOException("consumerConfig is not null, not expected!");
        }
        this.consumerConfig = createConsumerConfig();
        if (this.consumer != null) {
            throw new IOException("consumer is not null, not expected!");
        }
        HashMap hashMap = new HashMap();
        Offset offset = new Offset();
        offset.setTimestamp(Math.max(0L, this.startTimeInMs));
        hashMap.put(this.inputSplit.getShardId(), offset);
        this.consumer = createConsumer(hashMap);
        if (runtimeContext != null) {
            this.latencyGauge = (DoubleGauge) runtimeContext.getMetricGroup().addGroup("input").addGroup(getName()).gauge("readLatency", new DoubleGauge());
            this.exceptionCounter = runtimeContext.getMetricGroup().addGroup("input").addGroup(getName()).counter("exceptionCounter", new SimpleCounter());
            this.readCounter = runtimeContext.getMetricGroup().addGroup("input").addGroup(getName()).counter("readCounter", new SimpleCounter());
            this.readNullCounter = runtimeContext.getMetricGroup().addGroup("input").addGroup(getName()).counter("nullCounter", new SimpleCounter());
            this.readInvalidCounter = runtimeContext.getMetricGroup().addGroup("input").addGroup(getName()).counter("invalidCounter", new SimpleCounter());
        }
        initPartitionNumsListener();
        LOG.info("Open [{}] using startTimeInMs: {}", inputSplit.toString(), Long.valueOf(this.startTimeInMs));
    }

    /* JADX WARN: Removed duplicated region for block: B:74:0x01b9 A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:77:0x0004 A[SYNTHETIC] */
    @Override // com.alibaba.ververica.connectors.common.source.reader.RecordReader
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public boolean next() throws java.io.IOException, java.lang.InterruptedException {
        /*
            Method dump skipped, instructions count: 608
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.alibaba.ververica.connectors.datahub.source.DatahubRecordReader.next():boolean");
    }

    @Override // com.alibaba.ververica.connectors.common.source.reader.RecordReader
    public void seek(Long l) throws IOException {
        long currentTimeMillis;
        this.sequence = l;
        long longValue = this.sequence.longValue() + serialVersionUID;
        if (l.longValue() == Long.MIN_VALUE) {
            this.readFinished = true;
            return;
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        do {
            try {
                if (this.consumer != null) {
                    this.consumer.close();
                }
                HashMap hashMap = new HashMap();
                Offset offset = new Offset();
                offset.setSequence(longValue);
                if (this.autoResetToOldest) {
                    offset.setTimestamp(0L);
                }
                hashMap.put(this.inputSplit.getShardId(), offset);
                this.consumer = createConsumer(hashMap);
                LOG.info("Seek shardId [{}] by sequence [{}], auto reset to oldest if expired: {}", this.inputSplit.getShardId(), Long.valueOf(longValue), Boolean.valueOf(this.autoResetToOldest));
                return;
            } catch (DatahubClientException | IOException e) {
                LOG.warn("Failed to seek cursor", e);
                currentTimeMillis = System.currentTimeMillis() - currentTimeMillis2;
            }
        } while (currentTimeMillis <= this.retryTimeout);
        throw new IOException(ConnectorErrors.INST.retryTimeExceedTimeoutError(currentTimeMillis, this.retryTimeout));
    }

    @Override // com.alibaba.ververica.connectors.common.source.reader.RecordReader
    public void close() throws IOException {
        try {
            destroyPartitionNumsListener();
        } catch (Throwable th) {
            LOG.warn("destroyPartitionNumsListener error in close", th);
        }
        try {
            if (this.consumer != null) {
                this.consumer.close();
                this.consumer = null;
            }
            this.consumerConfig = null;
        } catch (Throwable th2) {
            LOG.warn("close consumer error in close", th2);
        }
    }

    protected boolean isValid(RecordEntry recordEntry) {
        return true;
    }

    @Override // com.alibaba.ververica.connectors.common.source.reader.AbstractPartitionNumsListener
    public int getPartitionsNums() {
        return getClient().getTopic(this.projectName, this.topicName).getShardCount();
    }

    @Override // com.alibaba.ververica.connectors.common.source.reader.AbstractPartitionNumsListener
    public String getReaderName() {
        return getName() + "-" + this.projectName + ":" + this.topicName;
    }

    @Override // com.alibaba.ververica.connectors.common.source.reader.Interruptible
    public void interrupt() {
        this.interrupted = true;
    }

    @Override // com.alibaba.ververica.connectors.common.source.WatermarkProvider
    public long getWatermark() {
        return this.currentWatermark;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.alibaba.ververica.connectors.common.source.reader.RecordReader
    public RecordEntry getMessage() {
        return this.currentElement;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // com.alibaba.ververica.connectors.common.source.reader.RecordReader
    public Long getProgress() throws IOException {
        return this.sequence;
    }

    @Override // com.alibaba.ververica.connectors.common.source.reader.RecordReader
    public long getDelay() {
        return this.currentMessageTimestamp;
    }

    @Override // com.alibaba.ververica.connectors.common.source.reader.RecordReader
    public long getFetchedDelay() {
        return this.dataFetchedDelay;
    }

    @Override // com.alibaba.ververica.connectors.common.source.reader.RecordReader
    public boolean isHeartBeat() {
        return false;
    }

    private String getName() {
        return this.name;
    }

    private Consumer createConsumer(Map<String, Offset> map) throws IOException {
        long currentTimeMillis = System.currentTimeMillis();
        DatahubClientException datahubClientException = null;
        for (int i = 0; i <= 3; i++) {
            try {
                this.consumer = new Consumer(this.projectName, this.topicName, this.subId, map, this.consumerConfig);
                return this.consumer;
            } catch (DatahubClientException e) {
                if (ExceptionRetryer.isFatalException(e)) {
                    throw e;
                }
                datahubClientException = e;
                LOG.warn("fail to create consumer, retry {} times", Integer.valueOf(i), e);
                sleepAndRetry(currentTimeMillis);
            }
        }
        throw new DatahubClientException(String.format("create consumer failed after retrying %s times", 3), datahubClientException);
    }

    protected void sleepAndRetry(long j) throws IOException {
        try {
            Thread.sleep(this.retryInterval);
        } catch (InterruptedException e) {
        }
        long currentTimeMillis = System.currentTimeMillis() - j;
        if (currentTimeMillis > this.retryTimeout) {
            throw new IOException(ConnectorErrors.INST.retryTimeExceedTimeoutError(currentTimeMillis, this.retryTimeout));
        }
    }
}
