package com.alibaba.ververica.connectors.common.dim;

import com.alibaba.ververica.connectors.common.dim.cache.AllCache;
import com.alibaba.ververica.connectors.common.dim.cache.Cache;
import com.alibaba.ververica.connectors.common.dim.cache.CacheFactory;
import com.alibaba.ververica.connectors.common.dim.cache.CacheStrategy;
import com.alibaba.ververica.connectors.common.dim.reload.CacheAllReloadConf;
import com.alibaba.ververica.connectors.common.dim.reload.SerializableRunnable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.curator4.org.apache.curator.shaded.com.google.common.base.Preconditions;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.connector.source.KeyGroupPruner;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/ververica/connectors/common/dim/DimJoinFetcher.class */
/**
 * Base class for dimension-table lookup-join functions.
 *
 * <p>It resolves the lookup key columns against the dimension row type, builds cache keys from
 * input rows and from dimension rows, and manages the lifecycle of the {@link Cache} obtained
 * from {@link CacheFactory}. When the strategy reports {@code isAllCache()}, the cache is treated
 * as an {@link AllCache}: the first instance to flip its {@code isRegisteredTimer} flag schedules
 * the periodic reload, and the instance that brings its {@code counter} back to zero tears the
 * cache down.
 *
 * @param <V> type of the values stored in the cache
 */
public abstract class DimJoinFetcher<V> extends AbstractRichFunction {
    private static final Logger LOG = LoggerFactory.getLogger(DimJoinFetcher.class);

    /** Name of the dimension table; used in log messages and as part of the cache id. */
    protected final String sqlTableName;
    /** Row type of the dimension table. */
    protected final RowType dimRowType;
    /** Names of the dimension-table columns used as lookup keys. */
    protected final String[] lookupKeys;
    /** Strategy passed to {@link CacheFactory} and consulted for all-cache / binary-cache mode. */
    protected final CacheStrategy cacheStrategy;
    /** Task scheduled periodically in all-cache mode; set by {@link #setAllCacheReloadRunner}. */
    protected SerializableRunnable cacheReloadRunner;
    /** Reload settings (its {@code ttlMs} is the delay between reloads); set with the runner. */
    protected CacheAllReloadConf reloadConf;
    /** Optional partitioner; when present the subtask index is appended to the cache id. */
    protected KeyGroupPruner<RowData> cachePartitioner;
    protected transient CacheFactory<Object, V> cacheFactory;
    protected transient Cache<Object, V> cache;
    /** Executor running the reload task; only non-null on the instance that scheduled it. */
    protected transient ScheduledExecutorService reloadExecutor;
    /** The cache viewed as an {@link AllCache}; only assigned in all-cache mode. */
    protected transient AllCache<Object, V> allCacheHandler;
    private transient String cacheId;
    /** Positions of the key fields in the input (source) row: always {@code 0..n-1}. */
    private List<Integer> sourceKeys;
    /** Positions of the key columns inside {@link #dimRowType}. */
    private List<Integer> targetKeys;
    /** Logical types of the key columns, in lookup-key order. */
    private LogicalType[] keyTypes;
    private transient Projection<RowData, BinaryRowData> srcKeyProjection;
    private transient Projection<RowData, BinaryRowData> cacheKeyProjection;
    private RowDataSerializer cacheKeySer;
    protected RowDataSerializer cacheRowSer;

    /**
     * Creates a fetcher without a cache partitioner.
     *
     * @param sqlTableName name of the dimension table, must not be null
     * @param dimRowType row type of the dimension table, must not be null
     * @param lookupKeys names of the lookup key columns, must not be null
     * @param cacheStrategy cache strategy, must not be null
     * @throws IllegalArgumentException if any argument is null
     * @throws TableException if a lookup key is not a column of {@code dimRowType}
     */
    public DimJoinFetcher(String sqlTableName, RowType dimRowType, String[] lookupKeys, CacheStrategy cacheStrategy) {
        this(sqlTableName, dimRowType, lookupKeys, cacheStrategy, null);
    }

    /**
     * Creates a fetcher and resolves every lookup key to its column index and type.
     *
     * @param sqlTableName name of the dimension table, must not be null
     * @param dimRowType row type of the dimension table, must not be null
     * @param lookupKeys names of the lookup key columns, must not be null
     * @param cacheStrategy cache strategy, must not be null
     * @param keyGroupPruner optional cache partitioner, may be null
     * @throws IllegalArgumentException if a mandatory argument is null
     * @throws TableException if a lookup key is not a column of {@code dimRowType}
     */
    protected DimJoinFetcher(
            String sqlTableName,
            RowType dimRowType,
            String[] lookupKeys,
            CacheStrategy cacheStrategy,
            KeyGroupPruner<RowData> keyGroupPruner) {
        Preconditions.checkArgument(null != sqlTableName, "sqlTableName cannot be null!");
        Preconditions.checkArgument(null != lookupKeys, "lookupKeys cannot be null!");
        Preconditions.checkArgument(null != dimRowType, "dimRowType cannot be null!");
        Preconditions.checkArgument(null != cacheStrategy, "cacheStrategy cannot be null!");
        this.dimRowType = dimRowType;
        this.lookupKeys = lookupKeys;
        this.sqlTableName = sqlTableName;
        this.cacheStrategy = cacheStrategy;
        this.cachePartitioner = keyGroupPruner;
        this.sourceKeys = new ArrayList<>(lookupKeys.length);
        this.targetKeys = new ArrayList<>(lookupKeys.length);
        this.cacheRowSer = new RowDataSerializer(dimRowType);
        this.keyTypes = new LogicalType[lookupKeys.length];
        String[] fieldNames = dimRowType.getFieldNames().toArray(new String[0]);
        for (int i = 0; i < lookupKeys.length; i++) {
            // The i-th lookup key is read from position i of the source row ...
            this.sourceKeys.add(i);
            // ... and from its declared column position in the dimension row.
            int columnIndex = getColumnIndex(lookupKeys[i], fieldNames);
            if (columnIndex < 0) {
                throw new TableException("Column: " + lookupKeys[i] + " doesn't exists.");
            }
            this.targetKeys.add(columnIndex);
            this.keyTypes[i] = dimRowType.getTypeAt(columnIndex);
        }
        this.cacheKeySer = new RowDataSerializer(this.keyTypes);
    }

    /**
     * Registers the task used to (re)load the cache in all-cache mode. The call is ignored when
     * the strategy is not an all-cache strategy.
     *
     * @param serializableRunnable reload task
     * @param cacheAllReloadConf reload configuration; its {@code timeRangeBlackList} must be set
     * @throws NullPointerException in all-cache mode, if an argument or the black list is null
     */
    public void setAllCacheReloadRunner(SerializableRunnable serializableRunnable, CacheAllReloadConf cacheAllReloadConf) {
        if (!this.cacheStrategy.isAllCache()) {
            return;
        }
        Objects.requireNonNull(serializableRunnable, "cache reload runner cannot be null");
        Objects.requireNonNull(cacheAllReloadConf, "cache reload conf cannot be null");
        Objects.requireNonNull(cacheAllReloadConf.timeRangeBlackList, "timeRangeBlackList cannot be null");
        this.cacheReloadRunner = serializableRunnable;
        this.reloadConf = cacheAllReloadConf;
    }

    /** Opens the connection to the dimension store; called at the start of {@link #open}. */
    public abstract void openConnection(Configuration configuration);

    /** Closes the connection to the dimension store; called from {@link #close}. */
    public abstract void closeConnection();

    /** Tells the cache factory whether a key maps to one row ({@code true}) or to many. */
    public abstract boolean hasPrimaryKey();

    /**
     * Opens the connection, prepares the key projections (binary cache mode only), obtains the
     * cache and, in all-cache mode, blocks until the cache reports that it is loaded.
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        openConnection(configuration);
        if (this.cacheStrategy.isBinaryCacheEnabled()) {
            this.srcKeyProjection = genProjection(this.keyTypes, toIntArray(this.sourceKeys));
            LogicalType[] dimFieldTypes = this.dimRowType.getFields().stream()
                    .map(RowType.RowField::getType)
                    .toArray(LogicalType[]::new);
            this.cacheKeyProjection = genProjection(dimFieldTypes, toIntArray(this.targetKeys));
        }
        this.cacheId = this.sqlTableName + ": " + Arrays.toString(this.lookupKeys);
        if (this.cachePartitioner != null) {
            FunctionContext functionContext = new FunctionContext(getRuntimeContext());
            this.cachePartitioner.open(functionContext);
            // A partitioned cache is kept per subtask, so the subtask index is part of its id.
            this.cacheId += "-" + functionContext.getIndexOfThisSubtask();
        }
        final boolean one2one = hasPrimaryKey();
        // Spelling of the label is kept as-is: it is emitted in logs and reload-thread names.
        String cacheLabel = one2one ? "one2oneCahce" : "one2manyCache";
        LOG.info("table " + this.sqlTableName + " preparing " + cacheLabel);
        this.cacheFactory = CacheFactory.getInstance();
        this.cache = this.cacheFactory.getCache(this.cacheId, this.cacheStrategy, one2one, this.cacheKeySer, this.cacheRowSer);
        LOG.info("table " + this.sqlTableName + ", strategy:" + this.cacheStrategy);
        if (this.cacheStrategy.isAllCache()) {
            startAllCache(cacheLabel);
        }
    }

    /**
     * Registers this instance on the all-cache handler, schedules the reload task if no other
     * instance has done so yet, and waits until the handler reports the cache as loaded.
     */
    private void startAllCache(String cacheLabel) throws Exception {
        // Raw cast kept from the original: the generic relation between Cache and AllCache is
        // declared outside this file.
        this.allCacheHandler = (AllCache) this.cache;
        this.allCacheHandler.setCachePartitioner(this.cachePartitioner);
        int workerNo = this.allCacheHandler.counter.incrementAndGet();
        LOG.info("the {}th started lookup join worker on {}.", workerNo, this.sqlTableName);
        // Only the instance that wins this CAS schedules the periodic reload.
        if (this.allCacheHandler.isRegisteredTimer.compareAndSet(false, true)) {
            LOG.info("Subtask-{} hold the reloading schedule future for table: {}", workerNo, this.sqlTableName);
            this.allCacheHandler.setScheduledFuture(scheduleCacheLoaderRunner(cacheLabel + "-reload-" + workerNo));
        }
        // Poll until the first load finished; a failed load surfaces as an exception here.
        while (!this.allCacheHandler.isLoadedOrThrowException()) {
            Thread.sleep(10L);
        }
    }

    /**
     * Starts a single daemon thread that runs the reload task immediately and then again
     * {@code reloadConf.ttlMs} milliseconds after each run completes.
     *
     * @param threadName name given to the reload thread
     * @return the future of the periodic task
     * @throws NullPointerException if {@link #setAllCacheReloadRunner} was not called beforehand
     */
    private ScheduledFuture<?> scheduleCacheLoaderRunner(String threadName) {
        // Validate before creating the executor so a misconfiguration does not leak a thread pool.
        Objects.requireNonNull(
                this.cacheReloadRunner,
                "cacheReloadRunner is not set, setAllCacheReloadRunner() must be called before open()");
        Objects.requireNonNull(
                this.reloadConf,
                "reloadConf is not set, setAllCacheReloadRunner() must be called before open()");
        this.reloadExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat(threadName).setDaemon(true).build());
        // The runnable is handed over directly; wrapping it in a never-started Thread adds nothing.
        return this.reloadExecutor.scheduleWithFixedDelay(
                this.cacheReloadRunner, 0L, this.reloadConf.ttlMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Releases the cache and closes the connection. In all-cache mode the cache is only torn
     * down by the instance that decrements the handler's counter to zero.
     */
    @Override
    public void close() throws Exception {
        // allCacheHandler stays null when open() failed before the cache was obtained; guard so
        // that the connection is still closed in that case.
        if (this.cacheStrategy.isAllCache()
                && this.allCacheHandler != null
                && this.allCacheHandler.counter.decrementAndGet() == 0) {
            LOG.info("start to cancel reloading thread...");
            ScheduledFuture<?> scheduledFuture = this.allCacheHandler.getScheduledFuture();
            if (scheduledFuture != null) {
                LOG.info("start to cancel reloading thread for table: {}.", this.sqlTableName);
                scheduledFuture.cancel(true);
            }
            // NOTE(review): reloadExecutor is only set on the instance that scheduled the reload.
            // If a different instance is the last one to close, the executor created by the
            // scheduling instance is not shut down here - confirm whether that is intended.
            if (null != this.reloadExecutor && !this.reloadExecutor.isShutdown()) {
                this.reloadExecutor.shutdownNow();
                this.reloadExecutor = null;
            }
            this.allCacheHandler.isRegisteredTimer.compareAndSet(true, false);
            removeCache();
        }
        LOG.info("start to close connection...");
        closeConnection();
        if (!this.cacheStrategy.isAllCache()) {
            removeCache();
        }
        super.close();
    }

    /** Removes this fetcher's cache from the factory, if a factory was obtained in open(). */
    private void removeCache() {
        LOG.info("start to release cache of table: {}...", this.sqlTableName);
        if (this.cacheFactory != null) {
            LOG.info("table " + this.sqlTableName + " cache removing...");
            this.cacheFactory.removeCache(this.cacheId);
            LOG.info("table " + this.sqlTableName + " cache removed");
        }
    }

    /**
     * Builds the lookup key from an input row whose fields {@code 0..n-1} are the lookup keys.
     *
     * @param rowData input row holding the key fields
     * @return the key as described in {@link #getKey(RowData, List, LogicalType[], Projection, boolean)}
     */
    public Object getSourceKey(RowData rowData) {
        return getKey(rowData, this.sourceKeys, this.keyTypes, this.srcKeyProjection, false);
    }

    /**
     * Builds the cache key from a dimension row, reading the key columns at their positions in
     * the dimension row type. A projected key is copied before it is returned.
     *
     * @param rowData dimension row
     * @return the key as described in {@link #getKey(RowData, List, LogicalType[], Projection, boolean)}
     */
    protected Object prepareCacheKey(RowData rowData) {
        return getKey(rowData, this.targetKeys, this.keyTypes, this.cacheKeyProjection, true);
    }

    /**
     * Extracts a key from {@code rowData}.
     *
     * <ul>
     *   <li>One key field: the field value itself (or {@code null}) is returned.
     *   <li>Several key fields and a projection: the projected row is returned, copied when
     *       {@code copyProjected} is set.
     *   <li>Several key fields and no projection: a {@link GenericRowData} holding the field
     *       values, or {@code null} as soon as one of them is null.
     * </ul>
     *
     * @param rowData row to read from
     * @param keyIndexes positions of the key fields in {@code rowData}
     * @param types logical types of the key fields, parallel to {@code keyIndexes}
     * @param projection optional projection producing a binary key, may be null
     * @param copyProjected whether the projected row must be copied; presumably needed when the
     *     key is retained, since the projection may reuse its output row - TODO confirm
     * @return the key, possibly {@code null}
     */
    public Object getKey(
            RowData rowData,
            List<Integer> keyIndexes,
            LogicalType[] types,
            Projection<RowData, BinaryRowData> projection,
            boolean copyProjected) {
        final int keyCount = keyIndexes.size();
        if (keyCount == 1) {
            return safeGet(rowData, keyIndexes.get(0), types[0]);
        }
        if (projection != null) {
            BinaryRowData projected = projection.apply(rowData);
            return copyProjected ? projected.copy() : projected;
        }
        GenericRowData key = new GenericRowData(keyCount);
        for (int i = 0; i < keyCount; i++) {
            Object field = safeGet(rowData, keyIndexes.get(i), types[i]);
            if (field == null) {
                // A composite key with a null component is reported as "no key".
                return null;
            }
            key.setField(i, field);
        }
        return key;
    }

    /** Same as the five-argument variant, without a projection and without copying. */
    public Object getKey(RowData rowData, List<Integer> keyIndexes, LogicalType[] types) {
        return getKey(rowData, keyIndexes, types, null, false);
    }

    /** Returns the all-cache handler, or {@code null} when it has not been set by open(). */
    @VisibleForTesting
    public AllCache<Object, V> getAllCacheHandler() {
        return this.allCacheHandler;
    }

    /**
     * Generates and instantiates a projection that turns the selected fields into a binary row.
     *
     * @param inputTypes logical types of the input row fields
     * @param projectedIndexes positions of the fields to project
     * @return a new projection instance loaded with the context class loader
     */
    @SuppressWarnings("unchecked") // the generated class is handed back as a raw Projection
    protected Projection<RowData, BinaryRowData> genProjection(LogicalType[] inputTypes, int[] projectedIndexes) {
        return (Projection) ProjectionCodeGenerator
                .toBinaryRow(new TableConfig(), inputTypes, projectedIndexes)
                .newInstance(Thread.currentThread().getContextClassLoader());
    }

    /**
     * Reads field {@code pos} of {@code rowData} with a getter created for {@code logicalType}.
     *
     * @return the field value, or {@code null} when the row is null or the field is null
     */
    protected Object safeGet(RowData rowData, int pos, LogicalType logicalType) {
        if (rowData == null || rowData.isNullAt(pos)) {
            return null;
        }
        return RowData.createFieldGetter(logicalType, pos).getFieldOrNull(rowData);
    }

    /**
     * Finds the position of {@code columnName} in {@code columnNames} (exact, case-sensitive).
     *
     * @return the index of the first match, or {@code -1} when the name is absent
     */
    protected int getColumnIndex(String columnName, String[] columnNames) {
        for (int i = 0; i < columnNames.length; i++) {
            if (columnName.equals(columnNames[i])) {
                return i;
            }
        }
        return -1;
    }

    /** Unboxes a list of indexes into a primitive array. */
    private static int[] toIntArray(List<Integer> indexes) {
        return indexes.stream().mapToInt(Integer::intValue).toArray();
    }
}
