package com.alibaba.ververica.connectors.hologres.rpc;

import com.alibaba.blink.store.client.BlinkStore;
import com.alibaba.blink.store.client.Cell;
import com.alibaba.blink.store.client.StoreConstants;
import com.alibaba.blink.store.client.Table;
import com.alibaba.blink.store.client.WriteOptions;
import com.alibaba.blink.store.client.WriteRequestPackager;
import com.alibaba.blink.store.core.configuration.Configuration;
import com.alibaba.blink.store.core.rpc.RpcException;
import com.alibaba.ververica.connectors.common.MetricUtils;
import com.alibaba.ververica.connectors.common.metrics.SimpleGauge;
import com.alibaba.ververica.connectors.hologres.api.AbstractHologresWriter;
import com.alibaba.ververica.connectors.hologres.api.HologresRecordConverter;
import com.alibaba.ververica.connectors.hologres.api.HologresTableSchema;
import com.alibaba.ververica.connectors.hologres.api.table.HologresRowDataConverter;
import com.alibaba.ververica.connectors.hologres.config.HologresConnectionParam;
import com.alibaba.ververica.connectors.hologres.utils.HologresUtils;
import java.io.IOException;
import java.util.List;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/ververica/connectors/hologres/rpc/HologresRpcWriter.class */
/**
 * Hologres writer that sends records through the BlinkStore RPC client.
 *
 * <p>Incoming records are converted to a {@code List<Cell>} by the supplied
 * {@link HologresRecordConverter} and submitted asynchronously via a
 * {@link WriteRequestPackager}. {@link #flush()} waits for the submitted requests to finish.
 *
 * @param <T> type of the records accepted by this writer
 */
public class HologresRpcWriter<T> extends AbstractHologresWriter<T> {
    private static final Logger LOG = LoggerFactory.getLogger(HologresRpcWriter.class);

    /** Store client; created in {@link #open(RuntimeContext)} and released in {@link #close()}. */
    private BlinkStore blinkStore;

    /** Packager used to submit upserts/deletes; {@code null} until {@link #open(RuntimeContext)} succeeds. */
    private WriteRequestPackager writeRequestPackager;

    /** Gauge receiving the elapsed time of each completed request; may be {@code null} (checked before use). */
    private SimpleGauge currentSendTime;

    /** Converts a record of type {@code T} into the cell list handed to the packager. */
    private final HologresRecordConverter<T, List<Cell>> recordConverter;

    /**
     * Creates a writer that uses the given converter to turn records into cells.
     *
     * @param hologresConnectionParam connection and write settings, passed to the superclass
     * @param tableSchema Flink table schema, passed to the superclass
     * @param hologresRecordConverter converter applied to every record before it is written
     */
    public HologresRpcWriter(HologresConnectionParam hologresConnectionParam, TableSchema tableSchema, HologresRecordConverter<T, List<Cell>> hologresRecordConverter) {
        super(hologresConnectionParam, tableSchema);
        this.recordConverter = hologresRecordConverter;
    }

    /**
     * Builds a writer for {@link RowData} records backed by a {@link HologresRowDataConverter}
     * configured with the RPC record writer and reader.
     *
     * @param hologresConnectionParam connection and write settings
     * @param tableSchema Flink table schema
     * @param hologresTableSchema Hologres-side table schema handed to the converter
     * @return a new writer instance; {@link #open(RuntimeContext)} must still be called before use
     */
    public static HologresRpcWriter<RowData> createTableWriter(HologresConnectionParam hologresConnectionParam, TableSchema tableSchema, HologresTableSchema hologresTableSchema) {
        return new HologresRpcWriter<>(hologresConnectionParam, tableSchema, new HologresRowDataConverter(tableSchema, hologresConnectionParam, new HologresRpcRecordWriter(), new HologresRpcRecordReader(), hologresTableSchema));
    }

    /**
     * Obtains the store client and table, builds the write options from the connection
     * parameters, creates the write request packager and registers the send-time gauge.
     *
     * @param runtimeContext Flink runtime context used to build the configuration and register metrics
     * @throws IOException if any step fails; the original exception is kept as the cause
     */
    @Override // com.alibaba.ververica.connectors.hologres.api.AbstractHologresIOClient
    public void open(RuntimeContext runtimeContext) throws IOException {
        try {
            Configuration conf = HologresUtils.getConf(runtimeContext, this.param);
            // NOTE(review): presumably keeps write requests ordered on the wire; confirm against StoreConstants docs.
            conf.setBoolean(StoreConstants.STORE_WRITE_REQUEST_IN_ORDER, true);
            this.blinkStore = BlinkStore.get(conf, this.param.getDatabase());
            Table table = this.blinkStore.getTable(this.param.getTable());

            WriteOptions.WriteRequestPackageOptions packageOptions = new WriteOptions.WriteRequestPackageOptions(
                    WriteOptions.WriteRequestPackageOptions.SplitStrategy.LENGTH,
                    this.param.getMaxConcurrencyPerShard(),
                    this.param.getSplitDataSize(),
                    this.param.getSplitLength());
            WriteOptions writeOptions = new WriteOptions.Builder()
                    .withDisableWal(false)
                    .withSync(true)
                    .withUpsertType(this.param.getUpsertType())
                    .withDisableBinlog(this.param.isDisableBinlog())
                    .withPartitionTableRouter(this.param.isEnablePartitionRouter())
                    .withIgnoreNull(this.param.isIgnoreNullWhenUpdate())
                    .withWriteRequestPackageOptions(packageOptions)
                    .build();

            this.writeRequestPackager = table.createWriteRequestPackager(Lists.newArrayList(this.fieldNames), writeOptions);
            this.currentSendTime = MetricUtils.registerCurrentSendTime(runtimeContext);
            LOG.info("WriteRequestPackageOptions is : {}", writeOptions.getWriteRequestPackageOptions());
            LOG.info("Array delimiter is : {}", this.param.getJdbcOptions().getDelimiter());
            LOG.info("Using mutate type: {}", this.param.getUpsertType());
        } catch (Exception e) {
            LOG.error("Exception while creating connection to Hologres.", e);
            throw new IOException("Cannot create connection to Hologres, endpoint: " + this.param.getEndpoint() + ", database: " + this.param.getDatabase() + ", tableName: " + this.param.getTable(), e);
        }
    }

    /**
     * Converts the record and submits it as an asynchronous upsert.
     *
     * @param t record to upsert
     * @return always {@code 0}
     * @throws IOException if submitting the request raises an {@link RpcException}
     */
    @Override // com.alibaba.ververica.connectors.hologres.api.AbstractHologresWriter
    public long writeAddRecord(T t) throws IOException {
        final long startMillis = System.currentTimeMillis();
        try {
            this.writeRequestPackager.asyncUpsert(this.recordConverter.convertFrom(t)).handle((ignoredResult, failure) -> {
                onRequestCompleted(startMillis, failure);
                return null;
            });
            return 0L;
        } catch (RpcException e) {
            throw new IOException(e);
        }
    }

    /**
     * Converts the record and submits it as an asynchronous delete.
     *
     * @param t record to delete
     * @return always {@code 0}
     * @throws IOException if submitting the request raises an {@link RpcException}
     */
    @Override // com.alibaba.ververica.connectors.hologres.api.AbstractHologresWriter
    public long writeDeleteRecord(T t) throws IOException {
        final long startMillis = System.currentTimeMillis();
        try {
            this.writeRequestPackager.asyncDelete(this.recordConverter.convertFrom(t)).handle((ignoredResult, failure) -> {
                onRequestCompleted(startMillis, failure);
                return null;
            });
            return 0L;
        } catch (RpcException e) {
            throw new IOException(e);
        }
    }

    /**
     * Waits for the requests submitted through the packager to finish.
     * Does nothing when the packager was never created (writer not opened, or open failed).
     *
     * @throws IOException if waiting raises an {@link RpcException}
     */
    @Override // com.alibaba.ververica.connectors.hologres.api.AbstractHologresWriter
    public void flush() throws IOException {
        if (this.writeRequestPackager == null) {
            // Nothing can have been submitted without a packager; avoid an NPE that would mask the open() failure.
            return;
        }
        try {
            this.writeRequestPackager.waitRequestToFinish();
        } catch (RpcException e) {
            throw new IOException(e);
        }
    }

    /**
     * Flushes outstanding requests and then closes the store client.
     * The store client is closed even when the flush fails.
     *
     * @throws IOException if the flush or the store close fails
     */
    @Override // com.alibaba.ververica.connectors.hologres.api.AbstractHologresIOClient
    public void close() throws IOException {
        try {
            flush();
        } finally {
            if (this.blinkStore != null) {
                // Clear the field first so a repeated close() does not close the store twice.
                BlinkStore store = this.blinkStore;
                this.blinkStore = null;
                store.close();
            }
        }
    }

    /**
     * Completion callback shared by upserts and deletes: logs a failed request and
     * reports the elapsed time to the send-time gauge when one is registered.
     */
    private void onRequestCompleted(long startMillis, Throwable failure) {
        if (failure != null) {
            // Only logged here; whether flush() also surfaces this failure depends on the packager.
            LOG.error("Asynchronous write request to Hologres table {} failed.", this.param.getTable(), failure);
        }
        if (this.currentSendTime != null) {
            this.currentSendTime.report(System.currentTimeMillis() - startMillis);
        }
    }
}
