package com.alibaba.ververica.connectors.hologres.source;

import com.alibaba.ververica.connectors.common.dim.AsyncLookupFunctionWrapper;
import com.alibaba.ververica.connectors.common.dim.LookupFunctionWrapper;
import com.alibaba.ververica.connectors.common.dim.cache.CacheConfig;
import com.alibaba.ververica.connectors.common.source.SourceUtils;
import com.alibaba.ververica.connectors.common.table.VervericaTableOptions;
import com.alibaba.ververica.connectors.hologres.api.AbstractHologresReader;
import com.alibaba.ververica.connectors.hologres.api.HologresTableSchema;
import com.alibaba.ververica.connectors.hologres.binlog.HologresBinlogConfigs;
import com.alibaba.ververica.connectors.hologres.binlog.HolohubClientProvider;
import com.alibaba.ververica.connectors.hologres.binlog.RetryUtil;
import com.alibaba.ververica.connectors.hologres.binlog.RowDataRecordConverter;
import com.alibaba.ververica.connectors.hologres.binlog.source.HologresBinlogSource;
import com.alibaba.ververica.connectors.hologres.config.HologresConfigs;
import com.alibaba.ververica.connectors.hologres.config.HologresConnectionParam;
import com.alibaba.ververica.connectors.hologres.config.JDBCOptions;
import com.alibaba.ververica.connectors.hologres.jdbc.HologresJDBCConfigs;
import com.alibaba.ververica.connectors.hologres.jdbc.HologresJDBCReader;
import com.alibaba.ververica.connectors.hologres.rpc.HologresRpcReader;
import com.alibaba.ververica.connectors.hologres.source.lookup.HologresAsyncLookupFunction;
import com.alibaba.ververica.connectors.hologres.source.lookup.HologresLookupFunction;
import com.alibaba.ververica.connectors.hologres.source.scan.bulkread.HologresBulkreadInputFormat;
import com.alibaba.ververica.connectors.hologres.utils.HologresUtils;
import com.alibaba.ververica.connectors.hologres.utils.JDBCUtils;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import shaded.com.aliyun.datahub.client.DatahubClient;

/* loaded from: input_file:com/alibaba/ververica/connectors/hologres/source/HologresTableSource.class */
/**
 * Flink table source for a Hologres table that can act both as a lookup (dimension) source and
 * as a scan source.
 *
 * <p>Lookups are served by either a JDBC or an RPC reader, chosen through
 * {@code HologresUtils.shouldUseRpc}. Scans are served by a bulk-read input format, or by a
 * binlog source when the {@code HologresBinlogConfigs.OPTIONAL_BINLOG} option is enabled.
 */
public class HologresTableSource implements DynamicTableSource, LookupTableSource, ScanTableSource {
    /** Pattern used to parse the configured start time of a binlog scan. */
    private static final String START_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";

    private String tableName;
    private TableSchema tableSchema;
    private CacheConfig cacheConfig;
    private HologresConnectionParam connectionParam;
    private ReadableConfig config;
    private JDBCOptions jdbcOptions;
    /** True only when both the binlog CDC option and the binlog option are enabled. */
    private boolean cdcMode;

    /**
     * Creates the table source.
     *
     * @param str name of the table, used in the summary string and handed to the lookup functions
     * @param tableSchema Flink schema of the table
     * @param cacheConfig cache configuration whose cache strategy is handed to the lookup functions
     * @param hologresConnectionParam connection parameters used to resolve the Hologres schema and readers
     * @param jDBCOptions JDBC options used by the scan paths
     * @param readableConfig connector options
     */
    public HologresTableSource(String str, TableSchema tableSchema, CacheConfig cacheConfig, HologresConnectionParam hologresConnectionParam, JDBCOptions jDBCOptions, ReadableConfig readableConfig) {
        this.tableName = str;
        this.tableSchema = tableSchema;
        this.cacheConfig = cacheConfig;
        this.connectionParam = hologresConnectionParam;
        this.jdbcOptions = jDBCOptions;
        this.config = readableConfig;
        // The binlog option is only consulted when the CDC option is switched on.
        boolean cdcRequested = (Boolean) readableConfig.get(HologresBinlogConfigs.BINLOG_CDC_MODE);
        this.cdcMode = cdcRequested && (Boolean) readableConfig.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
    }

    /**
     * Returns a new source built from the same constructor arguments as this one.
     *
     * <p>The referenced objects (including the JDBC options) are shared, not cloned.
     */
    public DynamicTableSource copy() {
        return new HologresTableSource(this.tableName, this.tableSchema, this.cacheConfig, this.connectionParam, this.jdbcOptions, this.config);
    }

    /** Returns {@code "Hologres-"} followed by the table name. */
    public String asSummaryString() {
        return "Hologres-" + this.tableName;
    }

    /**
     * Tells whether the given lookup keys are exactly the primary key columns of the table.
     *
     * <p>Both sides are compared as sets, so order and duplicates do not matter.
     *
     * @param strArr names of the columns used as lookup keys
     * @param hologresTableSchema Hologres schema providing the primary key columns
     * @return {@code true} when the two sets of column names are equal
     */
    @VisibleForTesting
    public static boolean primaryKeyFullProvided(String[] strArr, HologresTableSchema hologresTableSchema) {
        Set<String> lookupColumns = Arrays.stream(strArr).collect(Collectors.toSet());
        Set<?> primaryKeyColumns = Arrays.stream(hologresTableSchema.get().getPrimaryKeys()).collect(Collectors.toSet());
        return lookupColumns.equals(primaryKeyColumns);
    }

    /**
     * Builds the lookup runtime provider for a dimension-table join.
     *
     * <p>Each lookup key must address a top-level column. In RPC mode the lookup keys must cover
     * the full primary key and the insert-if-not-exists option must be off.
     *
     * @param lookupContext planner context carrying the lookup key indexes
     * @return an async provider when {@code OPTIONAL_ASYNC} is enabled, otherwise a synchronous one
     * @throws IllegalArgumentException if a lookup key is nested
     * @throws UnsupportedOperationException if RPC mode is combined with an unsupported setup
     */
    public LookupTableSource.LookupRuntimeProvider getLookupRuntimeProvider(LookupTableSource.LookupContext lookupContext) {
        int[][] keyPaths = lookupContext.getKeys();
        String[] lookupFields = new String[keyPaths.length];
        int position = 0;
        for (int[] keyPath : keyPaths) {
            Preconditions.checkArgument(keyPath.length == 1, "Do not support nested lookup keys");
            lookupFields[position++] = this.tableSchema.getFieldNames()[keyPath[0]];
        }

        HologresTableSchema holoSchema = HologresTableSchema.get(this.connectionParam);
        boolean fullPrimaryKey = primaryKeyFullProvided(lookupFields, holoSchema);

        AbstractHologresReader reader;
        if (HologresUtils.shouldUseRpc(this.config)) {
            boolean insertIfNotExists = (Boolean) this.config.get(HologresJDBCConfigs.INSERT_IF_NOT_EXISTS);
            if (insertIfNotExists) {
                throw new UnsupportedOperationException("Hologres rpc mode dimension table does not support insertIfNotExists.");
            }
            if (!fullPrimaryKey) {
                throw new UnsupportedOperationException("Hologres rpc mode dimension table does not support one to many join.");
            }
            reader = HologresRpcReader.createTableReader(this.connectionParam, this.tableSchema, lookupFields, holoSchema);
        } else {
            reader = HologresJDBCReader.createTableReader(this.connectionParam, this.tableSchema, lookupFields, holoSchema);
        }

        boolean asyncLookup = (Boolean) this.config.get(HologresConfigs.HologresDimOptions.OPTIONAL_ASYNC);
        if (asyncLookup) {
            HologresAsyncLookupFunction asyncFunction = new HologresAsyncLookupFunction(this.tableName, this.tableSchema, lookupFields, this.cacheConfig.getCacheStrategy(), reader, fullPrimaryKey);
            return AsyncTableFunctionProvider.of(new AsyncLookupFunctionWrapper(asyncFunction));
        }
        HologresLookupFunction syncFunction = new HologresLookupFunction(this.tableName, this.tableSchema, lookupFields, this.cacheConfig.getCacheStrategy(), reader, fullPrimaryKey);
        return TableFunctionProvider.of(new LookupFunctionWrapper(syncFunction));
    }

    /**
     * Returns the changelog mode of the scan.
     *
     * @return insert-only unless CDC mode is active, in which case insert, update-before,
     *     update-after and delete rows are all declared
     */
    public ChangelogMode getChangelogMode() {
        if (!this.cdcMode) {
            return ChangelogMode.insertOnly();
        }
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                .addContainedKind(RowKind.UPDATE_BEFORE)
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE)
                .build();
    }

    /**
     * Builds the scan runtime provider.
     *
     * <p>Without the binlog option a bulk-read input format over a simple select statement is
     * returned. With it, the start time is parsed, the holohub endpoint is stored on the shared
     * JDBC options, and a binlog source is returned.
     *
     * @param scanContext planner context (not used here)
     * @return the provider matching the configured scan mode
     * @throws RuntimeException if the configured start time cannot be parsed
     */
    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext scanContext) {
        boolean binlogEnabled = (Boolean) this.config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
        if (!binlogEnabled) {
            String selectStatement = JDBCUtils.getSimpleSelectFromStatement(this.jdbcOptions.getTable(), this.tableSchema.getFieldNames());
            return InputFormatProvider.of(new HologresBulkreadInputFormat(this.jdbcOptions, this.tableSchema, selectStatement));
        }

        // Parse the start time before touching the JDBC options, so that a malformed value fails first.
        long startTime = parseStartTime((String) this.config.get(VervericaTableOptions.OPTIONAL_START_TIME));

        this.jdbcOptions.setHolohubEndpoint(JDBCUtils.getHolohubEndpoint(this.jdbcOptions));
        DatahubClient holohubClient = (DatahubClient) HolohubClientProvider.newHolohubClientProvider(this.jdbcOptions).getClient();
        RowDataRecordConverter recordConverter = new RowDataRecordConverter(
                this.jdbcOptions.getTable(),
                this.tableSchema,
                RetryUtil.getTopicWithRetry(holohubClient, this.jdbcOptions.getDatabase(), this.jdbcOptions.getBinlogTableName()),
                this.cdcMode);
        return SourceProvider.of(new HologresBinlogSource(this.tableSchema, this.config, this.jdbcOptions, recordConverter, startTime));
    }

    /**
     * Converts the configured start time into the long value expected by the binlog source.
     *
     * @param startTimeText configured start time, possibly null or blank
     * @return 0 when no start time is configured, otherwise the parsed value
     * @throws RuntimeException wrapping the {@link ParseException} when the text is malformed
     */
    private static long parseStartTime(String startTimeText) {
        if (StringUtils.isNullOrWhitespaceOnly(startTimeText)) {
            return 0L;
        }
        try {
            return SourceUtils.parseDateString(START_TIME_FORMAT, startTimeText).longValue();
        } catch (ParseException e) {
            throw new RuntimeException(String.format("Incorrect datetime format: %s, pls use ISO-8601 complete date plus hours, minutes and seconds format:%s", startTimeText, START_TIME_FORMAT), e);
        }
    }
}
