package com.alibaba.ververica.connectors.hologres.jdbc;

import com.alibaba.hologres.client.Get;
import com.alibaba.hologres.client.HoloClient;
import com.alibaba.hologres.client.Put;
import com.alibaba.hologres.client.Scan;
import com.alibaba.hologres.client.exception.HoloClientException;
import com.alibaba.hologres.client.model.Record;
import com.alibaba.ververica.connectors.hologres.api.AbstractHologresReader;
import com.alibaba.ververica.connectors.hologres.api.HologresRecordConverter;
import com.alibaba.ververica.connectors.hologres.api.HologresTableSchema;
import com.alibaba.ververica.connectors.hologres.api.table.HologresRowDataConverter;
import com.alibaba.ververica.connectors.hologres.config.HologresConnectionParam;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/ververica/connectors/hologres/jdbc/HologresJDBCReader.class */
/**
 * Reader that looks up rows of a Hologres table through the {@link HoloClient} handed out by a
 * {@link HologresJDBCClientProvider}.
 *
 * <p>Point lookups ({@link #get}/{@link #asyncGet}) issue a {@link Get} on the primary key derived
 * from the input value; multi-row lookups ({@link #getMany}/{@link #asyncGetMany}) issue a
 * {@link Scan} with one equality filter per primary-key column. When the connection parameter
 * {@code insertIfNotExists} is set, a point lookup that finds nothing writes the key and reads it
 * back.
 *
 * @param <T> the caller-facing row type produced by the configured {@link HologresRecordConverter}
 */
public class HologresJDBCReader<T> extends AbstractHologresReader<T> {
    private static final Logger LOG = LoggerFactory.getLogger(HologresJDBCReader.class);

    /** Converts between the caller-facing row type and holo-client {@link Record}s. */
    private final HologresRecordConverter<T, Record> recordConverter;

    /** Created in {@link #open(RuntimeContext)}; {@code null} until then. */
    protected transient HologresJDBCClientProvider holoClientProvider;

    /** Whether a point lookup that misses should insert the key and read it back. */
    protected final boolean insertIfNotExists;

    /**
     * Creates a reader.
     *
     * @param strArr forwarded to the superclass constructor as its third argument
     *     (NOTE(review): presumably the lookup/primary-key column names - confirm against
     *     {@code AbstractHologresReader})
     * @param hologresConnectionParam connection settings; also supplies {@code insertIfNotExists}
     * @param tableSchema Flink schema of the table
     * @param hologresRecordConverter converter between {@code T} and {@link Record}
     */
    public HologresJDBCReader(String[] strArr, HologresConnectionParam hologresConnectionParam, TableSchema tableSchema, HologresRecordConverter<T, Record> hologresRecordConverter) {
        super(hologresConnectionParam, tableSchema, strArr);
        this.recordConverter = hologresRecordConverter;
        this.insertIfNotExists = hologresConnectionParam.isInsertIfNotExists();
    }

    /**
     * Factory for a reader that works on Flink {@link RowData}, wiring a
     * {@link HologresRowDataConverter} backed by the JDBC record writer and reader.
     */
    public static HologresJDBCReader<RowData> createTableReader(HologresConnectionParam hologresConnectionParam, TableSchema tableSchema, String[] strArr, HologresTableSchema hologresTableSchema) {
        return new HologresJDBCReader<>(
                strArr,
                hologresConnectionParam,
                tableSchema,
                new HologresRowDataConverter(
                        strArr,
                        tableSchema,
                        hologresConnectionParam,
                        new HologresJDBCRecordWriter(hologresConnectionParam),
                        new HologresJDBCRecordReader(tableSchema.getFieldNames(), hologresTableSchema),
                        hologresTableSchema));
    }

    /** Creates the client provider used by all subsequent lookups. */
    @Override // com.alibaba.ververica.connectors.hologres.api.AbstractHologresIOClient
    public void open(RuntimeContext runtimeContext) {
        LOG.info("Initiating connection to database [{}] / table[{}]", this.param.getJdbcOptions().getDatabase(), this.param.getTable());
        if (this.insertIfNotExists) {
            LOG.info("Hologres dim table will insert new record if primary key does not exist.");
        }
        this.holoClientProvider = new HologresJDBCClientProvider(this.param);
        LOG.info("Successfully initiated connection to database [{}] / table[{}]", this.param.getJdbcOptions().getDatabase(), this.param.getTable());
    }

    /** Closes the underlying client if {@link #open(RuntimeContext)} created one. */
    @Override // com.alibaba.ververica.connectors.hologres.api.AbstractHologresIOClient
    public void close() {
        LOG.info("Closing HologresLookUpFunction");
        if (this.holoClientProvider != null) {
            this.holoClientProvider.closeClient();
        }
    }

    /**
     * Asynchronously looks up the single row whose primary key is derived from {@code t}.
     *
     * @param t value from which the primary key is extracted
     * @return a future completed with the converted row, with {@code null} when no row exists and
     *     {@code insertIfNotExists} is off, or exceptionally when the lookup, the optional insert,
     *     or the conversion fails
     */
    @Override // com.alibaba.ververica.connectors.hologres.api.AbstractHologresReader
    public CompletableFuture<T> asyncGet(T t) {
        CompletableFuture<T> result = new CompletableFuture<>();
        Record primaryKey = this.recordConverter.convertToPrimaryKey(t);
        try {
            // The Get receives a copy, so the key record stays available for the insert fallback.
            // NOTE(review): "m651clone" looks like a decompiler-renamed clone(); kept as-is - confirm.
            this.holoClientProvider.getClient().get(new Get(primaryKey.m651clone())).handleAsync((record, error) -> {
                // Every path must complete `result`; otherwise a blocking get() never returns.
                try {
                    if (error != null) {
                        result.completeExceptionally(error);
                    } else if (record != null) {
                        result.complete(this.recordConverter.convertTo(record));
                    } else if (this.insertIfNotExists) {
                        result.complete(insertNewPrimaryKey(primaryKey));
                    } else {
                        result.complete(null);
                    }
                } catch (Throwable failure) {
                    result.completeExceptionally(failure);
                }
                return null;
            });
        } catch (HoloClientException e) {
            result.completeExceptionally(e);
        }
        return result;
    }

    /**
     * Blocking variant of {@link #asyncGet}.
     *
     * @throws IOException if the lookup fails or the calling thread is interrupted while waiting
     */
    @Override // com.alibaba.ververica.connectors.hologres.api.AbstractHologresReader
    public T get(T t) throws IOException {
        try {
            return asyncGet(t).get();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (ExecutionException e) {
            throw new IOException(e);
        }
    }

    /**
     * Asynchronously scans all rows matching the primary-key values derived from {@code t}.
     *
     * @param t value from which the filter values are extracted
     * @return a future completed with the converted rows (possibly empty), or exceptionally when
     *     the scan, the iteration, or the conversion fails
     * @throws IOException if the scan cannot be submitted
     */
    @Override // com.alibaba.ververica.connectors.hologres.api.AbstractHologresReader
    public CompletableFuture<List<T>> asyncGetMany(T t) throws IOException {
        CompletableFuture<List<T>> result = new CompletableFuture<>();
        Record primaryKey = this.recordConverter.convertToPrimaryKey(t);
        Scan.Builder scanBuilder = new Scan.Builder(primaryKey.getSchema()).withSelectedColumns(this.fieldNames);
        for (String keyColumn : this.primaryKeys) {
            scanBuilder.addEqualFilter(keyColumn, primaryKey.getObject(keyColumn));
        }
        try {
            this.holoClientProvider.getClient().asyncScan(scanBuilder.build()).handleAsync((scanner, error) -> {
                if (error != null) {
                    // The scanner is null on failure; propagate the cause instead of dereferencing it.
                    result.completeExceptionally(error);
                    return null;
                }
                try {
                    List<T> rows = new ArrayList<>();
                    while (scanner.next()) {
                        rows.add(this.recordConverter.convertTo(scanner.getRecord()));
                    }
                    result.complete(rows);
                } catch (Throwable failure) {
                    // Stop at the first failure and make sure the future is always completed.
                    result.completeExceptionally(failure);
                }
                return null;
            });
            return result;
        } catch (HoloClientException e) {
            throw new IOException(e);
        }
    }

    /**
     * Blocking variant of {@link #asyncGetMany}.
     *
     * @throws IOException if the scan fails or the calling thread is interrupted while waiting
     */
    @Override // com.alibaba.ververica.connectors.hologres.api.AbstractHologresReader
    public List<T> getMany(T t) throws IOException {
        try {
            return asyncGetMany(t).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (ExecutionException e) {
            throw new IOException(e);
        }
    }

    /**
     * Writes {@code record} to the table, flushes, and reads the row back by the same record.
     *
     * <p>This call blocks on the read-back.
     *
     * @param record the primary-key record to insert
     * @return the converted row as read back after the insert
     * @throws IOException if the write or read fails, the row cannot be read back, or the thread
     *     is interrupted while waiting
     */
    protected T insertNewPrimaryKey(Record record) throws IOException {
        Put put = new Put(record);
        try {
            HoloClient client = this.holoClientProvider.getClient();
            client.put(put);
            // Flush so the row is written before the read-back below.
            client.flush();
            Record inserted = client.get(new Get(record)).get();
            if (inserted == null) {
                throw new IOException("Could not get value for " + record);
            }
            return this.recordConverter.convertTo(inserted);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (HoloClientException | ExecutionException e) {
            throw new IOException(e);
        }
    }
}
