package org.apache.flink.streaming.connectors.kafka.internals.metrics;

import com.alibaba.ververica.connectors.common.metrics.SinkMetricNames;
import com.alibaba.ververica.connectors.common.metrics.SourceMetricNames;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricRecorder.class */
public class KafkaMetricRecorder {
    public static final String SOURCE_METRIC_GROUP = "source";
    public static final String SINK_METRIC_GROUP = "sink";
    public static final long INVALID = -1;
    private final OperatorMetricGroup operatorMetricGroup;
    private Meter numBytesInPerSecond;
    private Meter numRecordsInPerSecond;
    private Gauge<Long> pendingRecords;
    private Meter numBytesOutPerSecond;
    private Meter numRecordsOutPerSecond;
    private Counter numRecordsOutErrors;
    private final SimpleGauge<Long> currentFetchEventTimeLag = new SimpleGauge<>(-1L);
    private final SimpleGauge<Long> currentEmitEventTimeLag = new SimpleGauge<>(-1L);
    private final CurrentTimeLagGauge watermarkLag = new CurrentTimeLagGauge();
    private final CurrentTimeLagGauge sourceIdleTime = new CurrentTimeLagGauge();
    private final SimpleGauge<Long> currentSendTime = new SimpleGauge<>(-1L);

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricRecorder$CurrentTimeLagGauge.class */
    public static class CurrentTimeLagGauge implements Gauge<Long> {
        public Long timestamp = -1L;

        public void update(Long l) {
            this.timestamp = l;
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public Long m3815getValue() {
            if (this.timestamp.longValue() == -1) {
                return -1L;
            }
            return Long.valueOf(System.currentTimeMillis() - this.timestamp.longValue());
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/metrics/KafkaMetricRecorder$SimpleGauge.class */
    public static class SimpleGauge<T> implements Gauge<T> {
        private T value;

        public SimpleGauge(T t) {
            this.value = t;
        }

        public void update(T t) {
            this.value = t;
        }

        public T getValue() {
            return this.value;
        }
    }

    public KafkaMetricRecorder(OperatorMetricGroup operatorMetricGroup) {
        this.operatorMetricGroup = operatorMetricGroup;
    }

    public void registerSourceMetrics() {
        Counter counter = this.operatorMetricGroup.counter(SourceMetricNames.NUM_BYTES_IN);
        this.numBytesInPerSecond = this.operatorMetricGroup.meter(SourceMetricNames.NUM_BYTES_IN_RATE, new MeterView(counter));
        MetricGroup addGroup = this.operatorMetricGroup.addGroup("source");
        addGroup.counter(SourceMetricNames.NUM_BYTES_IN, counter);
        addGroup.meter(SourceMetricNames.NUM_BYTES_IN_RATE, this.numBytesInPerSecond);
        Counter numRecordsInCounter = this.operatorMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
        this.numRecordsInPerSecond = this.operatorMetricGroup.getIOMetricGroup().getNumRecordsInRateMeter();
        addGroup.counter(SourceMetricNames.NUM_RECORDS_IN, numRecordsInCounter);
        addGroup.meter(SourceMetricNames.NUM_RECORDS_IN_RATE, this.numRecordsInPerSecond);
        this.operatorMetricGroup.gauge(SourceMetricNames.CURRENT_FETCH_EVENT_TIME_LAG, this.currentFetchEventTimeLag);
        this.operatorMetricGroup.gauge(SourceMetricNames.CURRENT_EMIT_EVENT_TIME_LAG, this.currentEmitEventTimeLag);
        this.operatorMetricGroup.gauge(SourceMetricNames.WATERMARK_LAG, this.watermarkLag);
        this.operatorMetricGroup.gauge(SourceMetricNames.SOURCE_IDLE_TIME, this.sourceIdleTime);
    }

    public void registerSourcePendingRecords(Gauge<Long> gauge) {
        this.pendingRecords = gauge;
        this.operatorMetricGroup.gauge(SourceMetricNames.PENDING_RECORDS, gauge);
    }

    public Meter getNumBytesInPerSecond() {
        return this.numBytesInPerSecond;
    }

    public Meter getNumRecordsInPerSecond() {
        return this.numRecordsInPerSecond;
    }

    public SimpleGauge<Long> getCurrentFetchEventTimeLag() {
        return this.currentFetchEventTimeLag;
    }

    public SimpleGauge<Long> getCurrentEmitEventTimeLag() {
        return this.currentEmitEventTimeLag;
    }

    public CurrentTimeLagGauge getWatermarkLag() {
        return this.watermarkLag;
    }

    public CurrentTimeLagGauge getSourceIdleTime() {
        return this.sourceIdleTime;
    }

    public Gauge<Long> getPendingRecords() {
        return this.pendingRecords;
    }

    public void registerSinkMetrics() {
        Counter counter = this.operatorMetricGroup.counter(SinkMetricNames.NUM_BYTES_OUT);
        this.numBytesOutPerSecond = this.operatorMetricGroup.meter(SinkMetricNames.NUM_BYTES_OUT_RATE, new MeterView(counter));
        MetricGroup addGroup = this.operatorMetricGroup.addGroup("sink");
        addGroup.counter(SinkMetricNames.NUM_BYTES_OUT, counter);
        addGroup.meter(SinkMetricNames.NUM_BYTES_OUT_RATE, this.numBytesOutPerSecond);
        Counter numRecordsOutCounter = this.operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
        this.numRecordsOutPerSecond = this.operatorMetricGroup.getIOMetricGroup().getNumRecordsOutRate();
        addGroup.counter(SinkMetricNames.NUM_RECORDS_OUT, numRecordsOutCounter);
        addGroup.meter(SinkMetricNames.NUM_RECORDS_OUT_RATE, this.numRecordsOutPerSecond);
        this.numRecordsOutErrors = this.operatorMetricGroup.counter(SinkMetricNames.NUM_RECORDS_OUT_ERRORS);
        this.operatorMetricGroup.gauge(SinkMetricNames.CURRENT_SEND_TIME, this.currentSendTime);
    }

    public Meter getNumBytesOutPerSecond() {
        return this.numBytesOutPerSecond;
    }

    public Meter getNumRecordsOutPerSecond() {
        return this.numRecordsOutPerSecond;
    }

    public Counter getNumRecordsOutErrors() {
        return this.numRecordsOutErrors;
    }

    public SimpleGauge<Long> getCurrentSendTime() {
        return this.currentSendTime;
    }
}
