package com.alibaba.pairec.linucb;

import java.util.Map;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/alibaba/pairec/linucb/AbstractLearner.class */
public class AbstractLearner extends KeyedProcessFunction<Long, Event, RowData> {
    protected static final Logger logger = Logger.getLogger(AbstractLearner.class);
    protected boolean isHybrid;
    protected Map<String, Long> durationMsec;
    protected long defaultDuration;
    protected long lateEventDuration;
    protected transient MapState<String, Event> events;
    protected transient Counter receiveCounter;
    protected transient Counter lateCounter;
    protected transient Counter learnCounter;
    protected transient Counter filterCounter;
    protected transient Counter emitCounter;

    public void open(Configuration configuration) {
        RuntimeContext runtimeContext = getRuntimeContext();
        ParameterTool fromMap = ParameterTool.fromMap(runtimeContext.getExecutionConfig().getGlobalJobParameters().toMap());
        this.isHybrid = !fromMap.get("linucb.algo", "disjoint").equalsIgnoreCase("disjoint");
        this.durationMsec = Utils.getEventWaitDuration(fromMap);
        this.defaultDuration = this.durationMsec.getOrDefault("default", Long.valueOf(Time.minutes(5L).toMilliseconds())).longValue();
        this.lateEventDuration = this.durationMsec.getOrDefault("late_event", Long.valueOf(Time.minutes(30L).toMilliseconds())).longValue();
        StateTtlConfig build = StateTtlConfig.newBuilder(Time.minutes(30L)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).cleanupIncrementally(1000, true).cleanupInRocksdbCompactFilter(1000L).build();
        MapStateDescriptor mapStateDescriptor = new MapStateDescriptor("events", String.class, Event.class);
        mapStateDescriptor.enableTimeToLive(build);
        this.events = runtimeContext.getMapState(mapStateDescriptor);
        this.receiveCounter = runtimeContext.getMetricGroup().counter("RECEIVE_COUNTER");
        this.lateCounter = runtimeContext.getMetricGroup().counter("LATE_COUNTER");
        this.learnCounter = runtimeContext.getMetricGroup().counter("LEARN_COUNTER");
        this.filterCounter = runtimeContext.getMetricGroup().counter("FILTER_COUNTER");
        this.emitCounter = runtimeContext.getMetricGroup().counter("EMIT_ARM_COUNTER");
    }

    public void processElement(Event event, KeyedProcessFunction<Long, Event, RowData>.Context context, Collector<RowData> collector) throws Exception {
        TimerService timerService = context.timerService();
        long currentWatermark = timerService.currentWatermark();
        if (event.getEventTime() + this.lateEventDuration <= currentWatermark) {
            return;
        }
        String key = event.getKey();
        Event event2 = (Event) this.events.get(key);
        if (event2 != null) {
            if (event2.merge(event)) {
                this.events.put(key, event2);
                return;
            }
            return;
        }
        this.events.put(key, event);
        long eventTime = event.getEventTime();
        long longValue = this.durationMsec.getOrDefault(event.getEventType(), Long.valueOf(this.defaultDuration)).longValue();
        long j = (eventTime - (eventTime % longValue)) + (2 * longValue);
        if (j <= currentWatermark) {
            j += (((currentWatermark - j) / longValue) + 1) * longValue;
            this.lateCounter.inc();
            if (this.lateCounter.getCount() % 100 == 0) {
                logger.info("late:" + event + "_watermark:" + currentWatermark);
            }
        }
        if (this.isHybrid) {
            j += Time.minutes(getRuntimeContext().getIndexOfThisSubtask()).toMilliseconds();
        }
        this.receiveCounter.inc();
        long count = this.receiveCounter.getCount();
        if (count % 1000 == 0) {
            logger.info(String.format("[%d] receive merged events:%d%n", Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()), Long.valueOf(count)));
            logger.info("register:" + j + "---" + event + "_watermark:" + currentWatermark);
        }
        timerService.registerEventTimeTimer(j);
        if (this.durationMsec.containsKey("check_feature")) {
            timerService.registerEventTimeTimer(eventTime + this.durationMsec.get("check_feature").longValue());
        }
    }

    public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedProcessFunction.Context context, Collector collector) throws Exception {
        processElement((Event) obj, (KeyedProcessFunction<Long, Event, RowData>.Context) context, (Collector<RowData>) collector);
    }
}
