package com.alibaba.pairec.linucb;

import com.alibaba.pairec.io.Compress;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;

/* loaded from: input_file:com/alibaba/pairec/linucb/DisjointLearner.class */
public class DisjointLearner extends AbstractLearner {
    private transient ValueState<DisjointArmEJML> armState;
    static final /* synthetic */ boolean $assertionsDisabled;

    @Override // com.alibaba.pairec.linucb.AbstractLearner
    public void open(Configuration configuration) {
        super.open(configuration);
        RuntimeContext runtimeContext = getRuntimeContext();
        StateTtlConfig build = StateTtlConfig.newBuilder(Time.hours(1L)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).cleanupIncrementally(1000, true).cleanupInRocksdbCompactFilter(1000L).build();
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("armBuilder", DisjointArmEJML.class);
        valueStateDescriptor.enableTimeToLive(build);
        this.armState = runtimeContext.getState(valueStateDescriptor);
    }

    public void onTimer(long j, KeyedProcessFunction<Long, Event, RowData>.OnTimerContext onTimerContext, Collector<RowData> collector) throws Exception {
        long longValue = ((Long) onTimerContext.getCurrentKey()).longValue();
        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        DisjointArmEJML disjointArmEJML = (DisjointArmEJML) this.armState.value();
        Iterator it = this.events.iterator();
        while (it.hasNext()) {
            Event event = (Event) ((Map.Entry) it.next()).getValue();
            if (!$assertionsDisabled && longValue != event.getArmId()) {
                throw new AssertionError();
            }
            long longValue2 = this.durationMsec.getOrDefault(event.getEventType(), Long.valueOf(this.defaultDuration)).longValue();
            if (j - event.getEventTime() >= Long.min(this.durationMsec.getOrDefault("check_feature", Long.valueOf(longValue2)).longValue(), longValue2)) {
                double[] feature = event.getFeature();
                if (null == feature || feature.length == 0 || feature.length > 10000) {
                    it.remove();
                    this.filterCounter.inc();
                    long count = this.filterCounter.getCount();
                    if (count % 1000 == 0) {
                        logger.info(String.format("[%d] filter empty feature events:%d%n", Integer.valueOf(indexOfThisSubtask), Long.valueOf(count)));
                    }
                } else if (j - event.getEventTime() >= longValue2) {
                    if (null == disjointArmEJML) {
                        disjointArmEJML = new DisjointArmEJML(feature.length);
                    } else if (disjointArmEJML.getFeatureLen() != feature.length) {
                        logger.info(String.format("[%d] Feature length change from %d to %d%n", Integer.valueOf(indexOfThisSubtask), Integer.valueOf(disjointArmEJML.getFeatureLen()), Integer.valueOf(feature.length)));
                        disjointArmEJML = new DisjointArmEJML(feature.length);
                    }
                    disjointArmEJML.learn(feature, event.getReward());
                    it.remove();
                    this.learnCounter.inc();
                    long count2 = this.learnCounter.getCount();
                    if (count2 % 100 == 0) {
                        logger.info(String.format("[%d] learned events:%d%n", Integer.valueOf(indexOfThisSubtask), Long.valueOf(count2)));
                    }
                }
            }
        }
        if (null != disjointArmEJML) {
            this.armState.update(disjointArmEJML);
            int[] convertToIntArray = Compress.convertToIntArray(Compress.compressArray(disjointArmEJML.getInvertMatrixA()));
            int[] convertToIntArray2 = Compress.convertToIntArray(Compress.compressArray(disjointArmEJML.getVectorB()));
            GenericRowData genericRowData = new GenericRowData(5);
            genericRowData.setField(0, Long.valueOf(longValue));
            genericRowData.setField(1, Long.valueOf(System.currentTimeMillis()));
            genericRowData.setField(2, new GenericArrayData(convertToIntArray));
            genericRowData.setField(3, new GenericArrayData(convertToIntArray2));
            collector.collect(genericRowData);
            this.emitCounter.inc();
            long count3 = this.emitCounter.getCount();
            if (count3 % 100 == 0) {
                logger.info(String.format("[%d] emit %d arms:%n", Integer.valueOf(indexOfThisSubtask), Long.valueOf(count3)));
            }
        }
    }

    static {
        $assertionsDisabled = !DisjointLearner.class.desiredAssertionStatus();
    }
}
