package com.aliyun.openservices.eas.discovery.core;

import com.aliyun.openservices.eas.discovery.utils.CollectionUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/aliyun/openservices/eas/discovery/core/EventDispatcher.class */
public class EventDispatcher {
    private static ExecutorService executor;
    private static BlockingQueue<Service> mailbox = new LinkedBlockingQueue();
    private static ConcurrentMap<String, List<Listener>> observerMap = new ConcurrentHashMap();

    /* loaded from: input_file:com/aliyun/openservices/eas/discovery/core/EventDispatcher$Notifier.class */
    private static class Notifier implements Runnable {
        private Notifier() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                Service service = null;
                try {
                    service = (Service) EventDispatcher.mailbox.poll(5L, TimeUnit.MINUTES);
                } catch (Exception e) {
                }
                if (service != null) {
                    try {
                        List<Listener> list = (List) EventDispatcher.observerMap.get(service.getKey());
                        if (!CollectionUtils.isEmpty(list)) {
                            for (Listener listener : list) {
                                List<Endpoint> unmodifiableList = Collections.unmodifiableList(service.getEndpoints());
                                if (!CollectionUtils.isEmpty(unmodifiableList)) {
                                    listener.onChange(unmodifiableList);
                                    DiscoveryClient.LOG.info("NOTIFY", "finish notifying Listener, dom: " + service.getKey() + ", endpoints size: " + unmodifiableList.size());
                                }
                            }
                        }
                    } catch (Exception e2) {
                        DiscoveryClient.LOG.error("NA", "notify error for dom: " + service.getName() + ", clusters: " + service.getClusters(), e2);
                    }
                }
            }
        }
    }

    public static void addListener(String str, String str2, Listener listener) {
        addListener(str, str2, "", listener);
    }

    public static void addListener(String str, String str2, String str3, Listener listener) {
        List<Listener> synchronizedList = Collections.synchronizedList(new ArrayList());
        synchronizedList.add(listener);
        List<Listener> putIfAbsent = observerMap.putIfAbsent(Service.getKey(str, str2), synchronizedList);
        if (putIfAbsent != null) {
            putIfAbsent.add(listener);
        }
        changed(HostReactor.getService(str, str2, str3));
    }

    public static void removeListener(String str, String str2, Listener listener) {
        List<Listener> list = observerMap.get(Service.getKey(str, str2));
        if (list != null) {
            Iterator<Listener> it = list.iterator();
            while (it.hasNext()) {
                if (it.next().equals(listener)) {
                    it.remove();
                }
            }
        }
    }

    public static void changed(Service service) {
        if (service == null) {
            return;
        }
        mailbox.add(service);
    }

    public static void setExecutor(ExecutorService executorService) {
        ExecutorService executorService2 = executor;
        executor = executorService;
        executorService2.shutdown();
    }

    static {
        executor = null;
        executor = Executors.newSingleThreadExecutor(new ThreadFactory() { // from class: com.aliyun.openservices.eas.discovery.core.EventDispatcher.1
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                Thread thread = new Thread(runnable, "DISCOVERY-CLIENT-LISTENER");
                thread.setDaemon(true);
                return thread;
            }
        });
        executor.execute(new Notifier());
    }
}
