package com.alibaba.pairec.linucb;

import com.alibaba.pairec.io.Hologres;
import com.alibaba.pairec.linucb.Event;
import com.alibaba.ververica.connectors.datahub.source.DatahubSourceFunction;
import com.alibaba.ververica.connectors.hologres.api.HologresTableSchema;
import com.alibaba.ververica.connectors.hologres.config.HologresConnectionParam;
import com.alibaba.ververica.connectors.hologres.rpc.HologresRpcWriter;
import com.alibaba.ververica.connectors.hologres.sink.HologresSinkFunction;
import com.aliyun.datahub.client.model.RecordEntry;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Locale;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/alibaba/pairec/linucb/Driver.class */
public class Driver {
    private static final Logger logger = Logger.getLogger(Driver.class);

    public static void main(String[] strArr) throws Exception {
        String lowerCase = ParameterTool.fromArgs(strArr).getRequired("datasource").toLowerCase(Locale.ROOT);
        boolean z = -1;
        switch (lowerCase.hashCode()) {
            case 101807910:
                if (lowerCase.equals("kafka")) {
                    z = true;
                    break;
                }
                break;
            case 1443204363:
                if (lowerCase.equals("datahub")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                runFromDatahub();
                return;
            case true:
                runFromKafka();
                return;
            default:
                throw new RuntimeException("Unsupported datasource");
        }
    }

    public static void runFromDatahub() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool fromMap = ParameterTool.fromMap(GlobalConfiguration.loadConfiguration().toMap());
        executionEnvironment.getConfig().setGlobalJobParameters(fromMap);
        DatahubSourceFunction datahubSourceFunction = new DatahubSourceFunction(fromMap.get("datahub.endpoint"), fromMap.get("datahub.project"), fromMap.get("datahub.topic"), fromMap.get("datahub.sub.id"), fromMap.get("datahub.access.id"), fromMap.get("datahub.access.key"), fromMap.getLong("datahub.start.in.ms", System.currentTimeMillis()), Long.MAX_VALUE);
        long minWindowSize = Utils.getMinWindowSize(fromMap);
        int i = fromMap.getInt("log.parser.parallelism", executionEnvironment.getConfig().getParallelism());
        logger.info("min.window.size=" + minWindowSize);
        logger.info("log.parser.parallelism=" + i);
        String str = fromMap.get("linucb.algo", "disjoint");
        logger.info("linucb.algo=" + str);
        sinkToHologres(executionEnvironment, fromMap, executionEnvironment.addSource(datahubSourceFunction).setParallelism(i).flatMap(new RichFlatMapFunction<RecordEntry, Event>() { // from class: com.alibaba.pairec.linucb.Driver.1
            public void open(Configuration configuration) {
                ParameterTool fromMap2 = ParameterTool.fromMap(getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap());
                Config.getInstance(fromMap2);
                Hologres.getInstance(fromMap2);
            }

            public void flatMap(RecordEntry recordEntry, Collector<Event> collector) {
                Event build = new Event.Builder(recordEntry).build();
                if (null != build) {
                    collector.collect(build);
                }
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((RecordEntry) obj, (Collector<Event>) collector);
            }
        }).setParallelism(i).filter(event -> {
            return event != null && event.isValid();
        }).setParallelism(i).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(minWindowSize)).withTimestampAssigner((event2, j) -> {
            return event2.getEventTime();
        })).setParallelism(i).keyBy((v0) -> {
            return v0.getArmId();
        }).process(str.equalsIgnoreCase("disjoint") ? new DisjointLearner() : new HybridLearner()));
    }

    private static void sinkToHologres(StreamExecutionEnvironment streamExecutionEnvironment, ParameterTool parameterTool, DataStream<RowData> dataStream) throws Exception {
        TableSchema build = new TableSchema.Builder().field("arm_id", DataTypes.BIGINT().notNull()).field("version", DataTypes.BIGINT()).field("invert_matrix_a", DataTypes.ARRAY(DataTypes.INT())).field("vector_b", DataTypes.ARRAY(DataTypes.INT())).field("matrix_b", DataTypes.ARRAY(DataTypes.INT())).primaryKey(new String[]{"arm_id"}).build();
        HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(parameterTool.getConfiguration());
        logger.info(hologresConnectionParam);
        dataStream.addSink(new HologresSinkFunction(hologresConnectionParam, HologresRpcWriter.createTableWriter(hologresConnectionParam, build, HologresTableSchema.get(hologresConnectionParam)))).name("Holo Sink");
        streamExecutionEnvironment.execute("linucb");
    }

    public static void runFromKafka() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool fromMap = ParameterTool.fromMap(GlobalConfiguration.loadConfiguration().toMap());
        executionEnvironment.getConfig().setGlobalJobParameters(fromMap);
        String str = fromMap.get("kafka.topic");
        String str2 = fromMap.get("kafka.bootstrap.servers");
        String str3 = fromMap.get("kafka.group.id");
        System.out.println("kafka.topic=" + str);
        System.out.println("kafka.bootstrap.servers=" + str2);
        System.out.println("kafka.group.id=" + str3);
        KafkaSource build = KafkaSource.builder().setBootstrapServers(str2).setTopics(str).setGroupId(str3).setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();
        String str4 = fromMap.get("linucb.algo", "disjoint");
        KeyedProcessFunction disjointLearner = str4.equalsIgnoreCase("disjoint") ? new DisjointLearner() : new HybridLearner();
        long minWindowSize = Utils.getMinWindowSize(fromMap);
        int i = fromMap.getInt("log.parser.parallelism", executionEnvironment.getConfig().getParallelism());
        System.out.println("linucb.algo=" + str4);
        System.out.println("min.window.size=" + minWindowSize);
        System.out.println("log.parser.parallelism=" + i);
        sinkToHologres(executionEnvironment, fromMap, executionEnvironment.fromSource(build, WatermarkStrategy.noWatermarks(), "Kafka Source").map(new RichMapFunction<String, Event>() { // from class: com.alibaba.pairec.linucb.Driver.2
            public Event map(String str5) {
                return new Event.Builder(str5).build();
            }

            public void open(Configuration configuration) {
                Config.getInstance(ParameterTool.fromMap(getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap()));
            }
        }).setParallelism(i).filter(event -> {
            return event != null && event.isValid();
        }).setParallelism(i).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(minWindowSize)).withTimestampAssigner((event2, j) -> {
            return event2.getEventTime();
        })).setParallelism(i).keyBy((v0) -> {
            return v0.getArmId();
        }).process(disjointLearner));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -2059297693:
                if (implMethodName.equals("lambda$runFromKafka$a25ae85e$1")) {
                    z = false;
                    break;
                }
                break;
            case 15036633:
                if (implMethodName.equals("lambda$runFromDatahub$70d0efa6$1")) {
                    z = 4;
                    break;
                }
                break;
            case 1032691166:
                if (implMethodName.equals("lambda$runFromKafka$70d0efa6$1")) {
                    z = true;
                    break;
                }
                break;
            case 1218015070:
                if (implMethodName.equals("lambda$runFromDatahub$a25ae85e$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1948910305:
                if (implMethodName.equals("getArmId")) {
                    z = 3;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/SerializableTimestampAssigner") && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;J)J") && serializedLambda.getImplClass().equals("com/alibaba/pairec/linucb/Driver") && serializedLambda.getImplMethodSignature().equals("(Lcom/alibaba/pairec/linucb/Event;J)J")) {
                    return (event2, j) -> {
                        return event2.getEventTime();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/FilterFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("filter") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("com/alibaba/pairec/linucb/Driver") && serializedLambda.getImplMethodSignature().equals("(Lcom/alibaba/pairec/linucb/Event;)Z")) {
                    return event -> {
                        return event != null && event.isValid();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/eventtime/SerializableTimestampAssigner") && serializedLambda.getFunctionalInterfaceMethodName().equals("extractTimestamp") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;J)J") && serializedLambda.getImplClass().equals("com/alibaba/pairec/linucb/Driver") && serializedLambda.getImplMethodSignature().equals("(Lcom/alibaba/pairec/linucb/Event;J)J")) {
                    return (event22, j2) -> {
                        return event22.getEventTime();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/alibaba/pairec/linucb/AbstractEvent") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.getArmId();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/alibaba/pairec/linucb/AbstractEvent") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.getArmId();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/FilterFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("filter") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("com/alibaba/pairec/linucb/Driver") && serializedLambda.getImplMethodSignature().equals("(Lcom/alibaba/pairec/linucb/Event;)Z")) {
                    return event3 -> {
                        return event3 != null && event3.isValid();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
