package com.alibaba.ververica.connectors.kafka.precheck;

import com.alibaba.ververica.connectors.common.precheck.SinkConnectivityChecker;
import com.alibaba.ververica.connectors.common.precheck.SourceConnectivityChecker;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.AdminClient;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.admin.TopicListing;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.connectors.kafka.table.KafkaOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/ververica/connectors/kafka/precheck/KafkaConnectivityChecker.class */
/**
 * Connectivity pre-check for Kafka tables, usable for both sources and sinks.
 *
 * <p>The check lists the topics visible through a Kafka {@link AdminClient} built from the table
 * options and, when the table is configured with a fixed topic list, verifies that every
 * configured topic is present in the listing.
 *
 * <p>The instance owns its {@link AdminClient}; call {@link #close()} to release it.
 */
public class KafkaConnectivityChecker implements SourceConnectivityChecker, SinkConnectivityChecker {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectivityChecker.class);
    private final AdminClient adminClient;
    private final Map<String, String> tableOptions;
    private final KafkaTopicsDescriptor topicsDescriptor;

    /**
     * Creates a checker from the raw table options.
     *
     * @param map table options; must contain the bootstrap servers key
     * @throws IllegalArgumentException if a required option key is missing
     */
    public KafkaConnectivityChecker(Map<String, String> map) {
        validateOptions(map, "checking connectivity", KafkaOptions.PROPS_BOOTSTRAP_SERVERS.key());
        this.tableOptions = map;
        // Resolve the topic options BEFORE creating the admin client: if building the descriptor
        // throws, no client has been created yet, so nothing is left unclosed.
        Configuration config = Configuration.fromMap(map);
        this.topicsDescriptor =
                new KafkaTopicsDescriptor(
                        KafkaOptions.getSourceTopics(config), KafkaOptions.getSourceTopicPattern(config));
        this.adminClient = getAdminClient(map, "Kafka Connectivity Checker");
    }

    /**
     * Asynchronously lists the cluster's topics and validates them against the configured topics.
     *
     * @param j timeout in milliseconds passed to the list-topics request; values outside the
     *     {@code int} range are clamped
     * @return a future that completes normally when the check passes, or exceptionally with the
     *     listing failure or an {@link IllegalArgumentException} naming the unknown topics
     */
    @Override // com.alibaba.ververica.connectors.common.precheck.ConnectivityChecker
    public CompletableFuture<Void> checkConnectivity(long j) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        ListTopicsOptions options = new ListTopicsOptions().timeoutMs(Integer.valueOf(toTimeoutMs(j)));
        this.adminClient.listTopics(options).listings().whenComplete((listings, failure) -> {
            if (failure != null) {
                result.completeExceptionally(failure);
                return;
            }
            try {
                checkTopicListing(listings, this.topicsDescriptor, result);
            } catch (Throwable t) {
                // Never leave the caller's future pending if the validation itself blows up.
                result.completeExceptionally(t);
            }
        });
        return result;
    }

    /** Clamps a {@code long} millisecond timeout into the {@code int} range without wrap-around. */
    private static int toTimeoutMs(long timeoutMs) {
        return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, timeoutMs));
    }

    /**
     * Builds an admin client from the Kafka properties contained in the table options.
     *
     * @param map table options from which the Kafka client properties are extracted
     * @param str value used for the {@code client.id} property
     */
    private static AdminClient getAdminClient(Map<String, String> map, String str) {
        Properties properties = new Properties();
        properties.putAll(KafkaOptions.getKafkaProperties(map));
        properties.setProperty("client.id", str);
        return AdminClient.create(properties);
    }

    /**
     * Completes {@code completableFuture} according to whether the listed topics satisfy the
     * descriptor.
     *
     * <p>In topic-pattern mode a successful listing is sufficient. In fixed-topic mode every
     * configured topic must appear in the listing; otherwise the future is failed with an
     * {@link IllegalArgumentException} naming the missing topics.
     */
    private void checkTopicListing(Collection<TopicListing> collection, KafkaTopicsDescriptor kafkaTopicsDescriptor, CompletableFuture<Void> completableFuture) {
        LOG.trace("Topics retrieved from Kafka cluster: {}", collection);
        List<String> matchedTopics =
                collection.stream()
                        .map(TopicListing::name)
                        .filter(kafkaTopicsDescriptor::isMatchingTopic)
                        .collect(Collectors.toList());
        if (!kafkaTopicsDescriptor.isFixedTopics()) {
            LOG.debug("Successfully retrieved topics with topic pattern mode: {}", matchedTopics);
            completableFuture.complete(null);
            return;
        }
        // Decide on the actual set difference rather than comparing sizes: a duplicated entry in
        // the configured list must not be reported as a missing topic.
        List<String> missingTopics = new ArrayList<>(kafkaTopicsDescriptor.getFixedTopics());
        missingTopics.removeAll(matchedTopics);
        if (!missingTopics.isEmpty()) {
            completableFuture.completeExceptionally(new IllegalArgumentException("Unknown topic names: " + String.join(", ", missingTopics)));
            return;
        }
        LOG.debug("Successfully retrieved topics with fixed topic mode: {}", matchedTopics);
        completableFuture.complete(null);
    }

    /**
     * Verifies that all required keys are present in the options.
     *
     * @param map options to inspect
     * @param str description of the purpose, used in the error message
     * @param strArr keys that must be present
     * @throws IllegalArgumentException listing every missing key
     */
    private static void validateOptions(Map<String, String> map, String str, String... strArr) {
        List<String> missingKeys = new ArrayList<>();
        for (String key : strArr) {
            if (!map.containsKey(key)) {
                missingKeys.add(key);
            }
        }
        if (!missingKeys.isEmpty()) {
            throw new IllegalArgumentException(String.format("Missing these keys for %s: %s", str, String.join(", ", missingKeys)));
        }
    }

    /** Closes the underlying admin client. */
    @Override // java.lang.AutoCloseable
    public void close() {
        this.adminClient.close();
    }
}
