package org.apache.flink.streaming.connectors.kafka.table;

import com.alibaba.hologres.client.ddl.StatementKeywords;
import java.text.ParseException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TimeZone;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.InlineElement;
import org.apache.flink.configuration.description.TextElement;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.class */
public class KafkaOptions {
    public static final String VERVERICA_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    public static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
    public static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
    public static final String SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets";
    public static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
    public static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
    public static final String SINK_PARTITIONER_VALUE_DEFAULT = "default";
    public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
    public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin";
    public static final String SINK_SEMANTIC_VALUE_NONE = "none";
    public static final String PROPERTIES_PREFIX = "properties.";
    private static final String PARTITION = "partition";
    private static final String OFFSET = "offset";
    protected static final String AVRO_CONFLUENT = "avro-confluent";
    protected static final String DEBEZIUM_AVRO_CONFLUENT = "debezium-avro-confluent";
    private static final String TABLE_DYNAMIC_OPTION_PREFIX = "table.dynamic.option.";
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KafkaOptions.class);
    public static final ConfigOption<String> KEY_FORMAT = ConfigOptions.key("key.format").stringType().noDefaultValue().withDescription("Defines the format identifier for encoding key data. The identifier is used to discover a suitable format factory.");
    public static final ConfigOption<String> VALUE_FORMAT = ConfigOptions.key("value.format").stringType().noDefaultValue().withDescription("Defines the format identifier for encoding value data. The identifier is used to discover a suitable format factory.");
    public static final ConfigOption<List<String>> KEY_FIELDS = ConfigOptions.key("key.fields").stringType().asList().defaultValues(new String[0]).withDescription("Defines an explicit list of physical columns from the table schema that configure the data type for the key format. By default, this list is empty and thus a key is undefined.");
    public static final ConfigOption<ValueFieldsStrategy> VALUE_FIELDS_INCLUDE = ConfigOptions.key("value.fields-include").enumType(ValueFieldsStrategy.class).defaultValue(ValueFieldsStrategy.ALL).withDescription(String.format("Defines a strategy how to deal with key columns in the data type of the value format. By default, '%s' physical columns of the table schema will be included in the value format which means that the key columns appear in the data type for both the key and value format.", ValueFieldsStrategy.ALL));
    public static final ConfigOption<String> KEY_FIELDS_PREFIX = ConfigOptions.key("key.fields-prefix").stringType().noDefaultValue().withDescription(Description.builder().text("Defines a custom prefix for all fields of the key format to avoid name clashes with fields of the value format. By default, the prefix is empty.").linebreak().text(String.format("If a custom prefix is defined, both the table schema and '%s' will work with prefixed names.", KEY_FIELDS.key())).linebreak().text("When constructing the data type of the key format, the prefix will be removed and the non-prefixed names will be used within the key format.").linebreak().text(String.format("Please note that this option requires that '%s' must be '%s'.", VALUE_FIELDS_INCLUDE.key(), ValueFieldsStrategy.EXCEPT_KEY)).build());
    public static final ConfigOption<List<String>> TOPIC = ConfigOptions.key("topic").stringType().asList().noDefaultValue().withDescription("Topic names from which the table is read. Either 'topic' or 'topic-pattern' must be set for source. Option 'topic' is required for sink.");
    public static final ConfigOption<String> TOPIC_PATTERN = ConfigOptions.key("topic-pattern").stringType().noDefaultValue().withDescription("Optional topic pattern from which the table is read for source. Either 'topic' or 'topic-pattern' must be set.");
    public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS = ConfigOptions.key("properties.bootstrap.servers").stringType().noDefaultValue().withDescription("Required Kafka server connection string");
    public static final ConfigOption<String> PROPS_GROUP_ID = ConfigOptions.key("properties.group.id").stringType().noDefaultValue().withDescription("Required consumer group in Kafka consumer, no need for Kafka producer");
    public static final ConfigOption<String> SCAN_STARTUP_MODE = ConfigOptions.key("scan.startup.mode").stringType().defaultValue("group-offsets").withDescription(Description.builder().text("Optional startup mode for Kafka consumer, valid enumerations are").list(new InlineElement[]{TextElement.text("'earliest-offset'"), TextElement.text("'latest-offset'"), TextElement.text("'group-offsets'"), TextElement.text("'timestamp'"), TextElement.text("'specific-offsets'")}).build());
    public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS = ConfigOptions.key("scan.startup.specific-offsets").stringType().noDefaultValue().withDescription("Optional offsets used in case of \"specific-offsets\" startup mode");
    public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS = ConfigOptions.key("scan.startup.timestamp-millis").longType().noDefaultValue().withDescription("Optional timestamp used in case of \"timestamp\" startup mode");
    public static final ConfigOption<Duration> SCAN_TOPIC_PARTITION_DISCOVERY = ConfigOptions.key("scan.topic-partition-discovery.interval").durationType().noDefaultValue().withDescription("Optional interval for consumer to discover dynamically created Kafka partitions periodically.");
    public static final ConfigOption<String> VERVERICA_OPTIONAL_START_TIME = ConfigOptions.key("startTime").stringType().noDefaultValue().withDescription("Date-time string with ISO-8601 format specifying the time when the connector starts consuming from");
    public static final ConfigOption<Long> VERVERICA_START_TIME_MILLS = ConfigOptions.key("startTimeMs").longType().defaultValue(-1L).withDescription("Timestamp in millisecond specifying the time when the connector starts consuming from");
    public static final ConfigOption<String> VERVERICA_TIME_ZONE = ConfigOptions.key("timeZone").stringType().noDefaultValue();
    public static final ConfigOption<String> SINK_PARTITIONER = ConfigOptions.key("sink.partitioner").stringType().defaultValue("default").withDescription(Description.builder().text("Optional output partitioning from Flink's partitions into Kafka's partitions. Valid enumerations are").list(new InlineElement[]{TextElement.text("'default' (use kafka default partitioner to partition records)"), TextElement.text("'fixed' (each Flink partition ends up in at most one Kafka partition)"), TextElement.text("'round-robin' (a Flink partition is distributed to Kafka partitions round-robin when 'key.fields' is not specified)"), TextElement.text("custom class name (use custom FlinkKafkaPartitioner subclass)")}).build());
    public static final String SINK_SEMANTIC_VALUE_AT_LEAST_ONCE = "at-least-once";
    public static final String SINK_SEMANTIC_VALUE_EXACTLY_ONCE = "exactly-once";
    public static final ConfigOption<String> SINK_SEMANTIC = ConfigOptions.key("sink.semantic").stringType().defaultValue(SINK_SEMANTIC_VALUE_AT_LEAST_ONCE).withDescription(Description.builder().text("Optional semantic when committing. Valid enumerations are").list(new InlineElement[]{TextElement.text(SINK_SEMANTIC_VALUE_AT_LEAST_ONCE), TextElement.text(SINK_SEMANTIC_VALUE_EXACTLY_ONCE), TextElement.text("none")}).build());
    public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions.key("sink.buffer-flush.max-rows").intType().defaultValue(0).withDescription(Description.builder().text("The max size of buffered records before flushing. When the sink receives many updates on the same key, the buffer will retain the last records of the same key. This can help to reduce data shuffling and avoid possible tombstone messages to the Kafka topic.").linebreak().text("Can be set to '0' to disable it.").linebreak().text("Note both 'sink.buffer-flush.max-rows' and 'sink.buffer-flush.interval' must be set to be greater than zero to enable sink buffer flushing.").build());
    public static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions.key("sink.buffer-flush.interval").durationType().defaultValue(Duration.ofSeconds(0)).withDescription(Description.builder().text("The flush interval millis. Over this time, asynchronous threads will flush data. When the sink receives many updates on the same key, the buffer will retain the last record of the same key.").linebreak().text("Can be set to '0' to disable it.").linebreak().text("Note both 'sink.buffer-flush.max-rows' and 'sink.buffer-flush.interval' must be set to be greater than zero to enable sink buffer flushing.").build());
    private static final ConfigOption<String> SCHEMA_REGISTRY_SUBJECT = ConfigOptions.key("schema-registry.subject").stringType().noDefaultValue();
    private static final Set<String> SCAN_STARTUP_MODE_ENUMS = new HashSet(Arrays.asList("earliest-offset", "latest-offset", "group-offsets", "specific-offsets", "timestamp"));
    private static final Set<String> SINK_SEMANTIC_ENUMS = new HashSet(Arrays.asList(SINK_SEMANTIC_VALUE_AT_LEAST_ONCE, SINK_SEMANTIC_VALUE_EXACTLY_ONCE, "none"));
    private static final List<String> SCHEMA_REGISTRY_FORMATS = Arrays.asList("avro-confluent", "debezium-avro-confluent");

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/KafkaOptions$StartupOptions.class */
    public static class StartupOptions {
        public StartupMode startupMode;
        public Map<KafkaTopicPartition, Long> specificOffsets;
        public long startupTimestampMillis;
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/KafkaOptions$ValueFieldsStrategy.class */
    public enum ValueFieldsStrategy {
        ALL,
        EXCEPT_KEY
    }

    private KafkaOptions() {
    }

    public static void validateTableSourceOptions(ReadableConfig readableConfig) {
        validateSourceTopic(readableConfig);
        validateScanStartupMode(readableConfig);
    }

    public static void validateTableSinkOptions(ReadableConfig readableConfig) {
        validateSinkTopic(readableConfig);
        validateSinkPartitioner(readableConfig);
        validateSinkSemantic(readableConfig);
    }

    public static void validateSourceTopic(ReadableConfig readableConfig) {
        Optional optional = readableConfig.getOptional(TOPIC);
        Optional optional2 = readableConfig.getOptional(TOPIC_PATTERN);
        if (optional.isPresent() && optional2.isPresent()) {
            throw new ValidationException("Option 'topic' and 'topic-pattern' shouldn't be set together.");
        }
        if (!optional.isPresent() && !optional2.isPresent()) {
            throw new ValidationException("Either 'topic' or 'topic-pattern' must be set.");
        }
    }

    public static void validateSinkTopic(ReadableConfig readableConfig) {
        if (isSingleTopic(readableConfig)) {
            return;
        }
        if (!readableConfig.getOptional(TOPIC_PATTERN).isPresent()) {
            throw new ValidationException(String.format("Flink Kafka sink currently only supports single topic, but got %s: %s.", "'topic'", readableConfig.get(TOPIC)));
        }
        throw new ValidationException(String.format("Flink Kafka sink currently only supports single topic, but got %s: %s.", "'topic-pattern'", readableConfig.get(TOPIC_PATTERN)));
    }

    private static void validateScanStartupMode(ReadableConfig readableConfig) {
        readableConfig.getOptional(SCAN_STARTUP_MODE).map((v0) -> {
            return v0.toLowerCase();
        }).ifPresent(str -> {
            if (!SCAN_STARTUP_MODE_ENUMS.contains(str)) {
                throw new ValidationException(String.format("Invalid value for option '%s'. Supported values are %s, but was: %s", SCAN_STARTUP_MODE.key(), "[earliest-offset, latest-offset, group-offsets, specific-offsets, timestamp]", str));
            }
            if (str.equals("timestamp") && !readableConfig.getOptional(SCAN_STARTUP_TIMESTAMP_MILLIS).isPresent() && !readableConfig.getOptional(VERVERICA_START_TIME_MILLS).isPresent() && !readableConfig.getOptional(VERVERICA_OPTIONAL_START_TIME).isPresent()) {
                throw new ValidationException(String.format("'%s' is required in '%s' startup mode but missing.", SCAN_STARTUP_TIMESTAMP_MILLIS.key(), "timestamp"));
            }
            if (str.equals("specific-offsets")) {
                if (!readableConfig.getOptional(SCAN_STARTUP_SPECIFIC_OFFSETS).isPresent()) {
                    throw new ValidationException(String.format("'%s' is required in '%s' startup mode but missing.", SCAN_STARTUP_SPECIFIC_OFFSETS.key(), "specific-offsets"));
                }
                if (!isSingleTopic(readableConfig)) {
                    throw new ValidationException("Currently Kafka source only supports specific offset for single topic.");
                }
                parseSpecificOffsets((String) readableConfig.get(SCAN_STARTUP_SPECIFIC_OFFSETS), SCAN_STARTUP_SPECIFIC_OFFSETS.key());
            }
        });
    }

    private static void validateSinkPartitioner(ReadableConfig readableConfig) {
        readableConfig.getOptional(SINK_PARTITIONER).ifPresent(str -> {
            if (str.equals("round-robin") && readableConfig.getOptional(KEY_FIELDS).isPresent()) {
                throw new ValidationException("Currently 'round-robin' partitioner only works when option 'key.fields' is not specified.");
            }
            if (str.isEmpty()) {
                throw new ValidationException(String.format("Option '%s' should be a non-empty string.", SINK_PARTITIONER.key()));
            }
        });
    }

    private static void validateSinkSemantic(ReadableConfig readableConfig) {
        readableConfig.getOptional(SINK_SEMANTIC).ifPresent(str -> {
            if (!SINK_SEMANTIC_ENUMS.contains(str)) {
                throw new ValidationException(String.format("Unsupported value '%s' for '%s'. Supported values are ['at-least-once', 'exactly-once', 'none'].", str, SINK_SEMANTIC.key()));
            }
        });
    }

    public static KafkaSinkSemantic getSinkSemantic(ReadableConfig readableConfig) {
        String str = (String) readableConfig.get(SINK_SEMANTIC);
        boolean z = -1;
        switch (str.hashCode()) {
            case -286864670:
                if (str.equals(SINK_SEMANTIC_VALUE_EXACTLY_ONCE)) {
                    z = false;
                    break;
                }
                break;
            case 3387192:
                if (str.equals("none")) {
                    z = 2;
                    break;
                }
                break;
            case 2125618495:
                if (str.equals(SINK_SEMANTIC_VALUE_AT_LEAST_ONCE)) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                return KafkaSinkSemantic.EXACTLY_ONCE;
            case true:
                return KafkaSinkSemantic.AT_LEAST_ONCE;
            case true:
                return KafkaSinkSemantic.NONE;
            default:
                throw new TableException("Validator should have checked that");
        }
    }

    public static List<String> getSourceTopics(ReadableConfig readableConfig) {
        return (List) readableConfig.getOptional(TOPIC).orElse(null);
    }

    public static Pattern getSourceTopicPattern(ReadableConfig readableConfig) {
        return (Pattern) readableConfig.getOptional(TOPIC_PATTERN).map(Pattern::compile).orElse(null);
    }

    private static boolean isSingleTopic(ReadableConfig readableConfig) {
        return ((Boolean) readableConfig.getOptional(TOPIC).map(list -> {
            return Boolean.valueOf(list.size() == 1);
        }).orElse(false)).booleanValue();
    }

    public static StartupOptions getStartupOptions(ReadableConfig readableConfig) {
        HashMap hashMap = new HashMap();
        StartupMode startupMode = (StartupMode) readableConfig.getOptional(SCAN_STARTUP_MODE).map(str -> {
            boolean z = -1;
            switch (str.hashCode()) {
                case -1390285235:
                    if (str.equals("earliest-offset")) {
                        z = false;
                        break;
                    }
                    break;
                case -410146651:
                    if (str.equals("specific-offsets")) {
                        z = 3;
                        break;
                    }
                    break;
                case 55126294:
                    if (str.equals("timestamp")) {
                        z = 4;
                        break;
                    }
                    break;
                case 514263449:
                    if (str.equals("latest-offset")) {
                        z = true;
                        break;
                    }
                    break;
                case 1556617458:
                    if (str.equals("group-offsets")) {
                        z = 2;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    return StartupMode.EARLIEST;
                case true:
                    return StartupMode.LATEST;
                case true:
                    return StartupMode.GROUP_OFFSETS;
                case true:
                    buildSpecificOffsets(readableConfig, (String) ((List) readableConfig.get(TOPIC)).get(0), hashMap);
                    return StartupMode.SPECIFIC_OFFSETS;
                case true:
                    return StartupMode.TIMESTAMP;
                default:
                    throw new TableException("Unsupported startup mode. Validator should have checked that.");
            }
        }).orElse(StartupMode.GROUP_OFFSETS);
        StartupOptions startupOptions = new StartupOptions();
        startupOptions.startupMode = startupMode;
        Long l = (Long) readableConfig.get(VERVERICA_START_TIME_MILLS);
        if (!StringUtils.isNullOrWhitespaceOnly((String) readableConfig.get(VERVERICA_OPTIONAL_START_TIME)) || (readableConfig.get(VERVERICA_START_TIME_MILLS) != null && l.longValue() >= 0)) {
            LOG.warn("Overriding startup mode to TIMESTAMP because {} or {} is specified in table option", VERVERICA_START_TIME_MILLS, VERVERICA_OPTIONAL_START_TIME);
            startupOptions.startupMode = StartupMode.TIMESTAMP;
        }
        startupOptions.specificOffsets = hashMap;
        if (startupOptions.startupMode == StartupMode.TIMESTAMP) {
            try {
                startupOptions.startupTimestampMillis = getStartupTimestampFromTableOption(readableConfig);
            } catch (Exception e) {
                throw new IllegalArgumentException(String.format("Startup mode %s is chosen but failed to get startup timestamp from table options", StartupMode.TIMESTAMP), e);
            }
        }
        return startupOptions;
    }

    private static long getStartupTimestampFromTableOption(ReadableConfig readableConfig) {
        Long l = (Long) readableConfig.get(VERVERICA_START_TIME_MILLS);
        String str = (String) readableConfig.get(VERVERICA_OPTIONAL_START_TIME);
        Long l2 = (Long) readableConfig.get(SCAN_STARTUP_TIMESTAMP_MILLIS);
        String str2 = (String) readableConfig.get(VERVERICA_TIME_ZONE);
        if (l != null && l.longValue() >= 0) {
            if (!StringUtils.isNullOrWhitespaceOnly(str)) {
                LOG.warn("{}={} will override option {}={}", VERVERICA_START_TIME_MILLS.key(), l, VERVERICA_OPTIONAL_START_TIME.key(), str);
            }
            if (l2 != null && l2.longValue() >= 0) {
                LOG.warn("{}={} will override option {}={}", VERVERICA_START_TIME_MILLS.key(), l, SCAN_STARTUP_TIMESTAMP_MILLIS.key(), l2);
            }
            return l.longValue();
        }
        if (StringUtils.isNullOrWhitespaceOnly(str)) {
            if (l2 == null || l2.longValue() < 0) {
                throw new IllegalArgumentException("Cannot find any valid option specifying startup timestamp");
            }
            return l2.longValue();
        }
        if (StringUtils.isNullOrWhitespaceOnly(str2)) {
            throw new IllegalArgumentException(String.format("Time zone must be specified with table option '%s' when using option '%s'", VERVERICA_TIME_ZONE.key(), VERVERICA_OPTIONAL_START_TIME.key()));
        }
        try {
            long parseDateTimeString = parseDateTimeString("yyyy-MM-dd HH:mm:ss", str, str2);
            if (l2 != null && l2.longValue() >= 0) {
                LOG.warn("{}={} will override option {}={}", VERVERICA_OPTIONAL_START_TIME.key(), str, SCAN_STARTUP_TIMESTAMP_MILLIS.key(), l2);
            }
            return parseDateTimeString;
        } catch (ParseException e) {
            throw new IllegalArgumentException(String.format("Cannot parse date-time string '%s' provided in table option '%s'", str, VERVERICA_OPTIONAL_START_TIME.key()), e);
        }
    }

    private static long parseDateTimeString(String str, String str2, String str3) throws ParseException {
        return FastDateFormat.getInstance(str, TimeZone.getTimeZone(str3)).parse(str2).getTime();
    }

    private static void buildSpecificOffsets(ReadableConfig readableConfig, String str, Map<KafkaTopicPartition, Long> map) {
        parseSpecificOffsets((String) readableConfig.get(SCAN_STARTUP_SPECIFIC_OFFSETS), SCAN_STARTUP_SPECIFIC_OFFSETS.key()).forEach((num, l) -> {
            map.put(new KafkaTopicPartition(str, num.intValue()), l);
        });
    }

    public static Properties getKafkaProperties(Map<String, String> map) {
        Properties properties = new Properties();
        if (hasKafkaClientProperties(map)) {
            map.keySet().stream().filter(str -> {
                return str.startsWith(PROPERTIES_PREFIX);
            }).forEach(str2 -> {
                properties.put(str2.substring(PROPERTIES_PREFIX.length()), (String) map.get(str2));
            });
        }
        return properties;
    }

    public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(ReadableConfig readableConfig, ClassLoader classLoader) {
        return readableConfig.getOptional(SINK_PARTITIONER).flatMap(str -> {
            boolean z = -1;
            switch (str.hashCode()) {
                case -1662301013:
                    if (str.equals("round-robin")) {
                        z = 2;
                        break;
                    }
                    break;
                case 97445748:
                    if (str.equals("fixed")) {
                        z = false;
                        break;
                    }
                    break;
                case 1544803905:
                    if (str.equals("default")) {
                        z = true;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    return Optional.of(new FlinkFixedPartitioner());
                case true:
                case true:
                    return Optional.empty();
                default:
                    return Optional.of(initializePartitioner(str, classLoader));
            }
        });
    }

    public static Map<Integer, Long> parseSpecificOffsets(String str, String str2) {
        HashMap hashMap = new HashMap();
        String[] split = str.split(StatementKeywords.SEMICOLON);
        String format = String.format("Invalid properties '%s' should follow the format 'partition:0,offset:42;partition:1,offset:300', but is '%s'.", str2, str);
        if (split.length == 0) {
            throw new ValidationException(format);
        }
        for (String str3 : split) {
            if (null == str3 || str3.length() == 0 || !str3.contains(",")) {
                throw new ValidationException(format);
            }
            String[] split2 = str3.split(",");
            if (split2.length != 2 || !split2[0].startsWith("partition:") || !split2[1].startsWith("offset:")) {
                throw new ValidationException(format);
            }
            try {
                hashMap.put(Integer.valueOf(split2[0].substring(split2[0].indexOf(":") + 1)), Long.valueOf(split2[1].substring(split2[1].indexOf(":") + 1)));
            } catch (NumberFormatException e) {
                throw new ValidationException(format, e);
            }
        }
        return hashMap;
    }

    private static boolean hasKafkaClientProperties(Map<String, String> map) {
        return map.keySet().stream().anyMatch(str -> {
            return str.startsWith(PROPERTIES_PREFIX);
        });
    }

    private static <T> FlinkKafkaPartitioner<T> initializePartitioner(String str, ClassLoader classLoader) {
        try {
            if (FlinkKafkaPartitioner.class.isAssignableFrom(Class.forName(str, true, classLoader))) {
                return (FlinkKafkaPartitioner) InstantiationUtil.instantiate(str, FlinkKafkaPartitioner.class, classLoader);
            }
            throw new ValidationException(String.format("Sink partitioner class '%s' should extend from the required class %s", str, FlinkKafkaPartitioner.class.getName()));
        } catch (ClassNotFoundException | FlinkException e) {
            throw new ValidationException(String.format("Could not find and instantiate partitioner class '%s'", str), e);
        }
    }

    public static int[] createKeyFormatProjection(ReadableConfig readableConfig, DataType dataType) {
        LogicalType logicalType = dataType.getLogicalType();
        Preconditions.checkArgument(LogicalTypeChecks.hasRoot(logicalType, LogicalTypeRoot.ROW), "Row data type expected.");
        Optional optional = readableConfig.getOptional(KEY_FORMAT);
        Optional optional2 = readableConfig.getOptional(KEY_FIELDS);
        if (!optional.isPresent() && optional2.isPresent()) {
            throw new ValidationException(String.format("The option '%s' can only be declared if a key format is defined using '%s'.", KEY_FIELDS.key(), KEY_FORMAT.key()));
        }
        if (optional.isPresent() && (!optional2.isPresent() || ((List) optional2.get()).size() == 0)) {
            throw new ValidationException(String.format("A key format '%s' requires the declaration of one or more of key fields using '%s'.", KEY_FORMAT.key(), KEY_FIELDS.key()));
        }
        if (!optional.isPresent()) {
            return new int[0];
        }
        String str = (String) readableConfig.getOptional(KEY_FIELDS_PREFIX).orElse("");
        List list = (List) optional2.get();
        List fieldNames = LogicalTypeChecks.getFieldNames(logicalType);
        return list.stream().mapToInt(str2 -> {
            int indexOf = fieldNames.indexOf(str2);
            if (indexOf < 0) {
                throw new ValidationException(String.format("Could not find the field '%s' in the table schema for usage in the key format. A key field must be a regular, physical column. The following columns can be selected in the '%s' option:\n%s", str2, KEY_FIELDS.key(), fieldNames));
            }
            if (str2.startsWith(str)) {
                return indexOf;
            }
            throw new ValidationException(String.format("All fields in '%s' must be prefixed with '%s' when option '%s' is set but field '%s' is not prefixed.", KEY_FIELDS.key(), str, KEY_FIELDS_PREFIX.key(), str2));
        }).toArray();
    }

    public static int[] createValueFormatProjection(ReadableConfig readableConfig, DataType dataType) {
        LogicalType logicalType = dataType.getLogicalType();
        Preconditions.checkArgument(LogicalTypeChecks.hasRoot(logicalType, LogicalTypeRoot.ROW), "Row data type expected.");
        IntStream range = IntStream.range(0, LogicalTypeChecks.getFieldCount(logicalType));
        String str = (String) readableConfig.getOptional(KEY_FIELDS_PREFIX).orElse("");
        ValueFieldsStrategy valueFieldsStrategy = (ValueFieldsStrategy) readableConfig.get(VALUE_FIELDS_INCLUDE);
        if (valueFieldsStrategy == ValueFieldsStrategy.ALL) {
            if (str.length() > 0) {
                throw new ValidationException(String.format("A key prefix is not allowed when option '%s' is set to '%s'. Set it to '%s' instead to avoid field overlaps.", VALUE_FIELDS_INCLUDE.key(), ValueFieldsStrategy.ALL, ValueFieldsStrategy.EXCEPT_KEY));
            }
            return range.toArray();
        }
        if (valueFieldsStrategy != ValueFieldsStrategy.EXCEPT_KEY) {
            throw new TableException("Unknown value fields strategy:" + valueFieldsStrategy);
        }
        int[] createKeyFormatProjection = createKeyFormatProjection(readableConfig, dataType);
        return range.filter(i -> {
            return IntStream.of(createKeyFormatProjection).noneMatch(i -> {
                return i == i;
            });
        }).toArray();
    }

    public static DynamicTableFactory.Context autoCompleteSchemaRegistrySubject(DynamicTableFactory.Context context) {
        Map options = context.getCatalogTable().getOptions();
        Map<String, String> autoCompleteSchemaRegistrySubject = autoCompleteSchemaRegistrySubject((Map<String, String>) options);
        return autoCompleteSchemaRegistrySubject.size() > options.size() ? new FactoryUtil.DefaultDynamicTableContext(context.getObjectIdentifier(), context.getCatalogTable().copy(autoCompleteSchemaRegistrySubject), context.getConfiguration(), context.getClassLoader(), context.isTemporary()) : context;
    }

    private static Map<String, String> autoCompleteSchemaRegistrySubject(Map<String, String> map) {
        Configuration fromMap = Configuration.fromMap(map);
        validateSinkTopic(fromMap);
        Optional optional = fromMap.getOptional(VALUE_FORMAT);
        Optional optional2 = fromMap.getOptional(KEY_FORMAT);
        Optional optional3 = fromMap.getOptional(FactoryUtil.FORMAT);
        String str = (String) ((List) fromMap.get(TOPIC)).get(0);
        if (optional3.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(optional3.get())) {
            autoCompleteSubject(fromMap, (String) optional3.get(), str + "-value");
        } else if (optional.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(optional.get())) {
            autoCompleteSubject(fromMap, "value." + ((String) optional.get()), str + "-value");
        }
        if (optional2.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(optional2.get())) {
            autoCompleteSubject(fromMap, "key." + ((String) optional2.get()), str + "-key");
        }
        return fromMap.toMap();
    }

    private static void autoCompleteSubject(Configuration configuration, String str, String str2) {
        ConfigOption noDefaultValue = ConfigOptions.key(str + "." + SCHEMA_REGISTRY_SUBJECT.key()).stringType().noDefaultValue();
        if (configuration.getOptional(noDefaultValue).isPresent()) {
            return;
        }
        configuration.setString(noDefaultValue, str2);
    }

    public static Map<String, String> getTableOptionsFromTableConfig(DynamicTableFactory.Context context) {
        HashMap hashMap = new HashMap();
        Preconditions.checkState(context.getConfiguration() instanceof Configuration, "context.getConfiguration() should be an instance of Configuration");
        Map map = context.getConfiguration().toMap();
        map.keySet().forEach(str -> {
            if (str.startsWith("table.dynamic.option.")) {
                int length = "table.dynamic.option.".length();
                for (int i = 0; i < 3; i++) {
                    int seekNextCharacter = seekNextCharacter(str, '.', length);
                    if (seekNextCharacter < 0) {
                        return;
                    }
                    String substring = str.substring(length, seekNextCharacter);
                    String str = null;
                    switch (i) {
                        case 0:
                            str = context.getObjectIdentifier().getCatalogName();
                            break;
                        case 1:
                            str = context.getObjectIdentifier().getDatabaseName();
                            break;
                        case 2:
                            str = context.getObjectIdentifier().getObjectName();
                            break;
                    }
                    if (!substring.equals("*") && !substring.equals(str)) {
                        return;
                    }
                    length = seekNextCharacter + 1;
                }
                hashMap.put(str.substring(length), map.get(str));
            }
        });
        return hashMap;
    }

    private static int seekNextCharacter(String str, char c, int i) {
        for (int i2 = i; i2 < str.length(); i2++) {
            if (str.charAt(i2) == c) {
                return i2;
            }
        }
        return -1;
    }
}
