package com.aliyun.datahub.clientlibrary.producer;

import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.exception.MalformedRecordException;
import com.aliyun.datahub.client.exception.ShardNotFoundException;
import com.aliyun.datahub.client.exception.ShardSealedException;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.util.FormatUtils;
import com.aliyun.datahub.clientlibrary.common.BackEndTask;
import com.aliyun.datahub.clientlibrary.common.ClientHelper;
import com.aliyun.datahub.clientlibrary.config.ProducerConfig;
import com.aliyun.datahub.clientlibrary.exception.ClientException;
import com.aliyun.datahub.clientlibrary.exception.ExceptionRetryer;
import com.aliyun.datahub.clientlibrary.interceptor.WriteInterceptor;
import com.aliyun.datahub.clientlibrary.models.Assignment;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/datahub/clientlibrary/producer/Producer.class */
public class Producer {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) Producer.class);
    private ClientHelper clientHelper;
    private ShardGroupWriter shardGroupWriter;
    private ShardAssigner shardAssigner;
    private boolean autoAssigned;
    private Heartbeat heartbeat;
    private final WriteInterceptor interceptor;
    private volatile long assignmentVersion = -1;
    private final Object updateLock = new Object();
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/aliyun/datahub/clientlibrary/producer/Producer$Heartbeat.class */
    public class Heartbeat extends BackEndTask {
        private String projectName;
        private String topicName;
        private ProducerConfig config;
        private final Set<String> visited = new HashSet();

        Heartbeat(String str, String str2, ProducerConfig producerConfig) {
            this.taskName = "producer-heartbeat";
            this.projectName = str;
            this.topicName = str2;
            this.config = producerConfig;
        }

        void heartbeat() {
            RecordEntry genHeartbeat = this.config.getHeartbeatGenerator().genHeartbeat();
            if (genHeartbeat != null) {
                this.visited.clear();
                Producer.this.syncAssignmentIfNeeded();
                int size = Producer.this.shardAssigner.getAssignment().getSize();
                List<RecordEntry> beforeWrite = Producer.this.interceptor.beforeWrite(Collections.singletonList(genHeartbeat));
                for (int i = 0; i < size; i++) {
                    String write = Producer.this.shardGroupWriter.write(beforeWrite, null);
                    if (this.visited.contains(write)) {
                        break;
                    }
                    this.visited.add(write);
                }
                Producer.LOG.info("Send heartbeat, Project: {}, Topic: {}, Shards: {}", this.projectName, this.topicName, this.visited.toString());
            }
        }

        @Override // com.aliyun.datahub.clientlibrary.common.BackEndTask
        protected void run() {
            Producer.LOG.info("Producer heartbeat start, Project: {}, Topic: {}, IntervalMs: {}", this.projectName, this.topicName, 300000);
            while (true) {
                if (!isRunning()) {
                    break;
                }
                waitSignal(300000L, 300000L);
                if (!isRunning()) {
                    break;
                }
                try {
                    heartbeat();
                    Producer.LOG.debug("Producer send heartbeat message, Project: {}, Topic: {}", this.projectName, this.topicName);
                } catch (DatahubClientException e) {
                    if (ExceptionRetryer.isFatalException(e)) {
                        Producer.LOG.error("Producer send heartbeat fatal error, task stopped, Project: {}, Topic: {}", this.projectName, this.topicName, e);
                        stop();
                        break;
                    }
                } catch (Throwable th) {
                    Producer.LOG.error("Producer send heartbeat failed, Project: {}, Topic: {}", this.projectName, this.topicName, th);
                }
            }
            Producer.LOG.info("Producer heartbeat stop, Project: {}, Topic: {}", this.projectName, this.topicName);
        }
    }

    public Producer(String str, String str2, ProducerConfig producerConfig) {
        preCheck(str, str2);
        this.clientHelper = producerConfig.getHelperBuilder().setProjectName(str).setTopicName(str2).build();
        this.shardGroupWriter = new ShardGroupWriter(str, str2, producerConfig, this.clientHelper);
        this.shardAssigner = new ShardAssigner(this.clientHelper.getShardManager());
        this.autoAssigned = true;
        this.interceptor = producerConfig.getInterceptorBuilder().buildWriteInterceptor();
        this.interceptor.setProperty("topic", str2);
        startHeartbeat(str, str2, producerConfig);
    }

    public Producer(String str, String str2, List<String> list, ProducerConfig producerConfig) {
        preCheck(str, str2, list);
        this.clientHelper = producerConfig.getHelperBuilder().setProjectName(str).setTopicName(str2).build();
        this.shardAssigner = new ShardAssigner(this.clientHelper.getShardManager());
        if (!this.shardAssigner.checkAllActive(list)) {
            throw new InvalidParameterException("Shard must be valid and active");
        }
        this.shardGroupWriter = new ShardGroupWriter(str, str2, producerConfig, this.clientHelper);
        this.shardGroupWriter.createShardWriter(list);
        this.autoAssigned = false;
        this.interceptor = producerConfig.getInterceptorBuilder().buildWriteInterceptor();
        this.interceptor.setProperty("topic", str2);
        startHeartbeat(str, str2, producerConfig);
    }

    public void send(List<RecordEntry> list, int i) {
        send(list, null, i);
    }

    public void send(List<RecordEntry> list, String str, int i) {
        if (this.closed.get()) {
            throw new ClientException("This producer has already been closed");
        }
        if (i < 0) {
            throw new InvalidParameterException("Retry must not be negative");
        }
        if (this.heartbeat != null && this.heartbeat.isStopped()) {
            this.heartbeat.start();
        }
        List<RecordEntry> beforeWrite = this.interceptor.beforeWrite(list);
        int i2 = 0;
        while (i2 <= i && !this.closed.get()) {
            try {
                syncAssignmentIfNeeded();
                this.shardGroupWriter.write(beforeWrite, str);
                return;
            } catch (InvalidParameterException | MalformedRecordException | ClientException e) {
                fail(e, true);
                i2++;
            } catch (ShardNotFoundException | ShardSealedException e2) {
                if (i2 < i) {
                    this.shardAssigner.triggerUpdateAndWait();
                } else {
                    this.shardAssigner.triggerUpdate();
                }
                fail(e2, i2 == i);
                i2++;
            } catch (DatahubClientException e3) {
                fail(e3, i2 == i);
                i2++;
            }
        }
        throw new ClientException("Send records failed, retry limit exceeded");
    }

    public void close() {
        if (this.heartbeat != null) {
            this.heartbeat.stop();
        }
        if (this.closed.compareAndSet(false, true)) {
            this.shardGroupWriter.close();
        }
        if (this.clientHelper != null) {
            this.clientHelper.close();
        }
    }

    private void preCheck(String str, String str2) {
        if (!FormatUtils.checkProjectName(str)) {
            throw new InvalidParameterException("ProjectName format is invalid");
        }
        if (!FormatUtils.checkTopicName(str2)) {
            throw new InvalidParameterException("TopicName format is invalid");
        }
    }

    private void preCheck(String str, String str2, List<String> list) {
        preCheck(str, str2);
        if (list == null || list.isEmpty()) {
            throw new InvalidParameterException("ShardIds must not be empty");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void syncAssignmentIfNeeded() {
        if (this.autoAssigned) {
            Assignment assignment = this.shardAssigner.getAssignment();
            if (this.assignmentVersion != assignment.getVersion()) {
                synchronized (this.updateLock) {
                    if (this.assignmentVersion != assignment.getVersion()) {
                        List<String> newShardList = assignment.getNewShardList(this.shardGroupWriter.getShards());
                        List<String> releaseShardList = assignment.getReleaseShardList(this.shardGroupWriter.getShards());
                        this.shardGroupWriter.createShardWriter(newShardList);
                        this.shardGroupWriter.removeShardWriter(releaseShardList);
                        this.assignmentVersion = assignment.getVersion();
                    }
                }
            }
        }
    }

    private void fail(DatahubClientException datahubClientException, boolean z) {
        if (z) {
            LOG.error("Send records failed", (Throwable) datahubClientException);
            throw datahubClientException;
        }
        try {
            Thread.sleep(50L);
        } catch (InterruptedException e) {
        }
        ExceptionRetryer.invalidUriProtect(datahubClientException);
        LOG.debug("Send records retrying, Exception: {}", datahubClientException.getMessage());
    }

    private void startHeartbeat(String str, String str2, ProducerConfig producerConfig) {
        if (!producerConfig.isSendHeartbeat() || producerConfig.getHeartbeatGenerator() == null) {
            return;
        }
        this.heartbeat = new Heartbeat(str, str2, producerConfig);
        this.heartbeat.start();
    }
}
