package shaded.com.aliyun.datahub.clientlibrary.consumer;

import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shaded.com.aliyun.datahub.client.exception.DatahubClientException;
import shaded.com.aliyun.datahub.client.exception.ExpiredAccessTokenException;
import shaded.com.aliyun.datahub.client.exception.InvalidParameterException;
import shaded.com.aliyun.datahub.client.exception.SeekOutOfRangeException;
import shaded.com.aliyun.datahub.client.exception.ShardNotFoundException;
import shaded.com.aliyun.datahub.client.exception.ShardSealedException;
import shaded.com.aliyun.datahub.client.model.CursorType;
import shaded.com.aliyun.datahub.client.model.GetCursorResult;
import shaded.com.aliyun.datahub.client.model.GetRecordsResult;
import shaded.com.aliyun.datahub.client.model.RecordEntry;
import shaded.com.aliyun.datahub.client.model.ShardState;
import shaded.com.aliyun.datahub.clientlibrary.common.ClientHelper;
import shaded.com.aliyun.datahub.clientlibrary.config.ConsumerConfig;
import shaded.com.aliyun.datahub.clientlibrary.exception.ExceptionRetryer;
import shaded.com.aliyun.datahub.clientlibrary.interceptor.ReadInterceptor;
import shaded.com.aliyun.datahub.clientlibrary.models.Offset;

/* loaded from: input_file:shaded/com/aliyun/datahub/clientlibrary/consumer/ShardReader.class */
public class ShardReader {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ShardReader.class);
    private ConsumerConfig config;
    private ClientHelper clientHelper;
    private String projectName;
    private String topicName;
    private String shardId;
    private volatile Offset offset;
    private ExecutorService executor;
    private volatile String cursor;
    private volatile DatahubClientException exception;
    private final ReadInterceptor interceptor;
    private final Object emptyCond;
    private final AtomicInteger feasibleBufferSize;
    private volatile boolean fetchEnd = false;
    private volatile boolean readEnd = false;
    private volatile long endSequence = -1;
    private final ConcurrentLinkedQueue<RecordEntry> fetchedQueue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicInteger queueSize = new AtomicInteger(0);
    private final AtomicBoolean needMore = new AtomicBoolean(false);
    private volatile boolean fetchLatest = false;
    private volatile boolean shardNotFound = false;
    private String subId = "";
    private volatile Future currentTask = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:shaded/com/aliyun/datahub/clientlibrary/consumer/ShardReader$FetchTask.class */
    public class FetchTask implements Runnable {
        private int fetchSize;

        FetchTask(int i) {
            this.fetchSize = i;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                ShardReader.this.fetch(this.fetchSize);
                ShardReader.this.exception = null;
            } catch (ExpiredAccessTokenException e) {
                ShardReader.LOG.info("Refresh token, Project: {}, Topic: {}, ShardId: {}, SubId: {}", ShardReader.this.projectName, ShardReader.this.topicName, ShardReader.this.shardId, ShardReader.this.subId);
            } catch (ShardNotFoundException e2) {
                ShardReader.this.clientHelper.getShardManager().triggerUpdateAndWait();
                ShardReader.this.shardNotFound = !ShardReader.this.clientHelper.getShardManager().getShardMeta().getShardIds().contains(ShardReader.this.shardId);
                ShardReader.LOG.info("Trigger refresh shard info, Project: {}, Topic: {}, ShardId: {}, SubId: {}, Msg: {}", ShardReader.this.projectName, ShardReader.this.topicName, ShardReader.this.shardId, ShardReader.this.subId, e2.getErrorMessage());
            } catch (ShardSealedException e3) {
                ShardReader.this.clientHelper.getShardManager().triggerUpdateAndWait();
                ShardReader.LOG.info("Fetch end of shard, Project: {}, Topic: {}, ShardId: {}, SubId: {}, endSeq: {}", ShardReader.this.projectName, ShardReader.this.topicName, ShardReader.this.shardId, ShardReader.this.subId, Long.valueOf(ShardReader.this.endSequence));
                ShardReader.this.fetchEnd = true;
            } catch (DatahubClientException e4) {
                if (!ShardReader.this.closed.get()) {
                    ShardReader.LOG.warn("Fetch records fail, Project: {}, Topic: {}, ShardId: {}, SubId: {}", ShardReader.this.projectName, ShardReader.this.topicName, ShardReader.this.shardId, ShardReader.this.subId, e4);
                    if (!ExceptionRetryer.canSwallow(e4)) {
                        ShardReader.this.exception = e4;
                    }
                }
            } catch (Exception e5) {
                if (!ShardReader.this.closed.get()) {
                    ShardReader.LOG.error("Fetch records fail, Project: {}, Topic: {}, ShardId: {}, SubId: {}", ShardReader.this.projectName, ShardReader.this.topicName, ShardReader.this.shardId, ShardReader.this.subId, e5);
                    ShardReader.this.exception = new DatahubClientException(e5.getMessage());
                }
            }
            if (ShardReader.this.needMore.compareAndSet(true, false)) {
                ShardReader.this.submitFetchTask();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ShardReader(String str, String str2, String str3, Offset offset, ConsumerConfig consumerConfig, ExecutorService executorService, ClientHelper clientHelper, Object obj, ReadInterceptor readInterceptor) {
        this.projectName = str;
        this.topicName = str2;
        this.shardId = str3;
        this.offset = offset;
        this.config = consumerConfig;
        this.executor = executorService;
        this.clientHelper = clientHelper;
        this.emptyCond = obj;
        this.feasibleBufferSize = new AtomicInteger(Math.max(1, consumerConfig.getMaxBufferSize()));
        this.interceptor = readInterceptor;
        if (offset.hasCursor()) {
            this.cursor = offset.getCursor();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String getShardId() {
        return this.shardId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isClosed() {
        return this.closed.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void initCursor() {
        if (this.cursor == null) {
            this.cursor = seekCursor(this.offset);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RecordEntry read() {
        if (this.closed.get() || isReadEnd()) {
            return null;
        }
        this.readEnd = this.fetchEnd && isShardClosed() && this.fetchedQueue.isEmpty();
        fetchIfNeeded();
        RecordEntry peek = this.fetchedQueue.peek();
        if (peek != null) {
            this.fetchedQueue.poll();
            this.queueSize.decrementAndGet();
            fetchIfNeeded();
            return peek;
        }
        if (this.exception == null) {
            return null;
        }
        DatahubClientException datahubClientException = this.exception;
        this.exception = null;
        throw datahubClientException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isReadEnd() {
        return this.readEnd && isShardClosed();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long getEndSequence() {
        return this.endSequence;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long frontRecordTime() {
        if (this.fetchedQueue.isEmpty() || this.fetchedQueue.peek() == null) {
            return Long.MIN_VALUE;
        }
        return this.fetchedQueue.peek().getSystemTime();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void close() {
        if (!this.closed.compareAndSet(false, true) || this.currentTask == null || this.currentTask.isDone()) {
            return;
        }
        this.currentTask.cancel(false);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setSubId(String str) {
        this.subId = str;
    }

    private ShardState getShardState() {
        return this.clientHelper.getShardManager().getShardMeta().getStateMap().get(this.shardId);
    }

    private boolean isShardClosed() {
        return ShardState.CLOSED.equals(getShardState());
    }

    private boolean isShardInactive() {
        return ShardState.INACTIVE.equals(getShardState());
    }

    private void fetchIfNeeded() {
        if (this.closed.get() || isShardInactive()) {
            return;
        }
        if (this.fetchEnd && isShardClosed()) {
            return;
        }
        this.feasibleBufferSize.set(Math.min(this.feasibleBufferSize.get(), this.config.getMaxBufferSize()));
        if (this.queueSize.get() > this.feasibleBufferSize.get()) {
            return;
        }
        this.needMore.set(true);
        if (isTaskRunning()) {
            return;
        }
        this.needMore.set(false);
        submitFetchTask();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void submitFetchTask() {
        if (this.shardNotFound) {
            this.shardNotFound = !this.clientHelper.getShardManager().getShardMeta().getShardIds().contains(this.shardId);
            if (this.shardNotFound) {
                return;
            }
        }
        try {
            this.currentTask = this.executor.submit(new FetchTask(this.config.getFetchSize()));
        } catch (RejectedExecutionException e) {
            if (this.closed.get()) {
                return;
            }
            LOG.warn("Submit fetch task failed, Project: {}, Topic: {}, ShardId: {}, SubId: {}, Exception: {}", this.projectName, this.topicName, this.shardId, this.subId, e.getMessage());
        }
    }

    private String seekCursor(Offset offset) {
        if (offset.isInvalid()) {
            throw new InvalidParameterException("Sequence and system time are all invalid");
        }
        CursorType cursorType = offset.hasSequence() ? CursorType.SEQUENCE : CursorType.SYSTEM_TIME;
        try {
            return getCursor(cursorType, cursorType.equals(CursorType.SEQUENCE) ? offset.getSequence() : offset.getTimestamp(), 3);
        } catch (SeekOutOfRangeException e) {
            if (cursorType.equals(CursorType.SYSTEM_TIME) || !offset.hasTimestamp()) {
                throw e;
            }
            LOG.warn("Get cursor by sequence failed, use system time instead, Project: {}, Topic: {}, ShardId: {}, SubId: {}, Exception: {}", this.projectName, this.topicName, this.shardId, this.subId, e.getMessage());
            return getCursor(CursorType.SYSTEM_TIME, offset.getTimestamp(), 3);
        }
    }

    private String getCursor(final CursorType cursorType, final long j, int i) {
        return new ExceptionRetryer<String>() { // from class: shaded.com.aliyun.datahub.clientlibrary.consumer.ShardReader.1
            /* JADX INFO: Access modifiers changed from: protected */
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // shaded.com.aliyun.datahub.clientlibrary.exception.ExceptionRetryer
            public String func() {
                ShardReader.LOG.info("Get cursor, Project: {}, Topic: {}, ShardId: {}, SubId: {}, CursorType: {}, Param: {}", ShardReader.this.projectName, ShardReader.this.topicName, ShardReader.this.shardId, ShardReader.this.subId, cursorType.name(), Long.valueOf(j));
                GetCursorResult cursor = ShardReader.this.clientHelper.getDataClient().getCursor(ShardReader.this.shardId, cursorType, j);
                ShardReader.LOG.info("Get cursor result, Project: {}, Topic: {}, ShardId: {}, SubId: {}, Cursor: {}, Sequence: {}, Timestamp: {}", ShardReader.this.projectName, ShardReader.this.topicName, ShardReader.this.shardId, ShardReader.this.subId, cursor.getCursor(), Long.valueOf(cursor.getSequence()), Long.valueOf(cursor.getTimestamp()));
                return cursor.getCursor();
            }

            @Override // shaded.com.aliyun.datahub.clientlibrary.exception.ExceptionRetryer
            protected void onExceedRetryLimit(DatahubClientException datahubClientException) {
                if (ShardReader.this.closed.get()) {
                    return;
                }
                ShardReader.LOG.error("Get cursor failed, Project: {}, Topic: {}, ShardId: {}, SubId: {}, CursorType: {}, Param: {}", ShardReader.this.projectName, ShardReader.this.topicName, ShardReader.this.shardId, ShardReader.this.subId, cursorType.name(), Long.valueOf(j), datahubClientException);
            }

            @Override // shaded.com.aliyun.datahub.clientlibrary.exception.ExceptionRetryer
            protected boolean isTerminated() {
                return ShardReader.this.closed.get();
            }
        }.run(i, 1000L);
    }

    private boolean isTaskRunning() {
        if (this.currentTask != null && this.currentTask.isDone()) {
            try {
                this.currentTask.get();
            } catch (InterruptedException | CancellationException e) {
            } catch (ExecutionException e2) {
                LOG.error("Fetch task failed, Project: {}, Topic: {}, ShardId: {}, SubId: {}", this.projectName, this.topicName, this.shardId, this.subId, e2);
                this.exception = new DatahubClientException(e2.getMessage());
            }
            this.currentTask = null;
        }
        return this.currentTask != null;
    }

    private void wakeEmpty() {
        synchronized (this.emptyCond) {
            this.emptyCond.notifyAll();
        }
    }

    private GetRecordsResult fetchRecords(final int i) {
        return new ExceptionRetryer<GetRecordsResult>() { // from class: shaded.com.aliyun.datahub.clientlibrary.consumer.ShardReader.2
            /* JADX INFO: Access modifiers changed from: protected */
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // shaded.com.aliyun.datahub.clientlibrary.exception.ExceptionRetryer
            public GetRecordsResult func() {
                return ShardReader.this.clientHelper.getDataClient().getRecords(ShardReader.this.shardId, ShardReader.this.cursor, i);
            }

            @Override // shaded.com.aliyun.datahub.clientlibrary.exception.ExceptionRetryer
            protected void onExceedRetryLimit(DatahubClientException datahubClientException) {
                if (ShardReader.this.closed.get()) {
                    return;
                }
                ShardReader.LOG.error("Fetch task failed, Project: {}, Topic: {}, ShardId: {}, SubId: {}", ShardReader.this.projectName, ShardReader.this.topicName, ShardReader.this.shardId, ShardReader.this.subId, datahubClientException);
            }

            @Override // shaded.com.aliyun.datahub.clientlibrary.exception.ExceptionRetryer
            protected boolean isTerminated() {
                return ShardReader.this.closed.get();
            }
        }.run(30, 1000L);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void fetch(int i) {
        try {
            initCursor();
            if (this.fetchLatest) {
                this.fetchLatest = false;
                if (this.config.getFetchLatestDelayMs() > 0) {
                    Thread.sleep(this.config.getFetchLatestDelayMs());
                }
            }
            GetRecordsResult fetchRecords = fetchRecords(i);
            if (fetchRecords.getRecordCount() > 0) {
                this.fetchedQueue.addAll(this.interceptor.afterRead(fetchRecords.getRecords()));
                RecordEntry recordEntry = fetchRecords.getRecords().get(fetchRecords.getRecords().size() - 1);
                this.queueSize.addAndGet(fetchRecords.getRecordCount());
                this.endSequence = recordEntry.getSequence();
                this.feasibleBufferSize.set(((fetchRecords.getRecordCount() + this.feasibleBufferSize.get()) + this.config.getFetchSize()) / 3);
                this.fetchEnd = false;
                this.readEnd = false;
                wakeEmpty();
                LOG.debug("Fetch records, Project: {}, Topic: {}, ShardId: {}, SubId: {}, Num: {}, QueueSize: {}", this.projectName, this.topicName, this.shardId, this.subId, Integer.valueOf(fetchRecords.getRecordCount()), Integer.valueOf(this.queueSize.get()));
            } else if (this.cursor.equals(fetchRecords.getNextCursor())) {
                this.fetchLatest = true;
            }
            this.cursor = fetchRecords.getNextCursor();
        } catch (InterruptedException e) {
        } catch (InvalidParameterException e2) {
            if (!"InvalidCursor".equalsIgnoreCase(e2.getErrorCode())) {
                throw e2;
            }
            LOG.warn("Cursor is expired, seek to OLDEST, Project: {}, Topic: {}, ShardId: {}, SubId: {}, Cursor: {}, Exception: {}", this.projectName, this.topicName, this.shardId, this.subId, this.cursor, e2.getMessage());
            try {
                this.cursor = getCursor(CursorType.OLDEST, 0L, 1);
            } catch (DatahubClientException e3) {
            }
        }
    }
}
