package io.github.chengliu.nacosconsuladapter.utils;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.client.naming.NacosNamingService;
import io.github.chengliu.nacosconsuladapter.listeners.ServiceChangeListener;
import io.github.chengliu.nacosconsuladapter.model.Result;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.StampedLock;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;

/* loaded from: input_file:io/github/chengliu/nacosconsuladapter/utils/NacosServiceCenter.class */
/**
 * Caches the set of known service names, keeps one version counter per service and
 * publishes a {@link Result} on a shared {@link EmitterProcessor} every time a
 * service is reported as changed.
 *
 * <p>Readers never block: {@link #getServiceNames()} only <em>tries</em> to take the
 * read lock and falls back to a backup snapshot while a writer is swapping the set.
 * Writers are serialized by {@code writeServiceLock}; a writer that cannot take that
 * lock immediately gives up.
 */
public class NacosServiceCenter {
    private static final Logger log = LoggerFactory.getLogger(NacosServiceCenter.class);

    /** Seed given to a service's counter the first time it is incremented. */
    private static final long DEFAULT_VERSION = 1L;

    private final NacosNamingService nacosNamingService;
    /** Guards the swap of {@link #services}; readers only ever use tryReadLock. */
    private final StampedLock stampedLock = new StampedLock();
    /** Ensures at most one thread runs {@link #setServiceNames(List)} at a time. */
    private final ReentrantLock writeServiceLock = new ReentrantLock();
    private final Map<String, AtomicLong> serviceVersionMap = new ConcurrentHashMap<>();
    private final EmitterProcessor<Result<String>> emitterProcessor = EmitterProcessor.create(3, true);

    // Both snapshots are read without holding a lock on the fallback path of
    // getServiceNames(), hence volatile. The sets themselves are replaced, never mutated, here.
    private volatile Set<String> services = null;
    /** Previous snapshot, non-null only while a writer is replacing {@link #services}. */
    private volatile Set<String> backServices = null;

    /** Subscription created by {@link #initSetNames(List)}; {@code null} until then. */
    volatile Disposable allChangeConsumer = null;

    /**
     * @param nacosNamingService naming service used to subscribe to per-service change events
     */
    public NacosServiceCenter(NacosNamingService nacosNamingService) {
        this.nacosNamingService = nacosNamingService;
    }

    /**
     * Returns the cached service names without ever blocking.
     *
     * <p>If the read lock is available the current set is returned. Otherwise a writer
     * is in progress and the backup snapshot is returned; should the writer have
     * finished (and cleared the backup) in the meantime, the current set is returned
     * instead of {@code null}.
     *
     * @return the cached set (the internal instance, not a copy), or {@code null} if
     *         {@link #initSetNames(List)} has not been called yet
     */
    public Set<String> getServiceNames() {
        final long stamp = this.stampedLock.tryReadLock();
        // StampedLock signals "not available" with exactly zero.
        if (stamp == 0L) {
            log.debug("未抢占到读锁，返回备份数据。");
            final Set<String> backup = this.backServices;
            // The writer clears the backup only after releasing the write lock, so a
            // null backup means `services` is already consistent again.
            return backup != null ? backup : this.services;
        }
        try {
            log.debug("抢占到读锁，返回真实数据");
            return this.services;
        } finally {
            this.stampedLock.unlockRead(stamp);
        }
    }

    /**
     * Installs the initial set of service names, subscribes to every one of them and
     * attaches a logging consumer to the change stream.
     *
     * @param list initial service names; must not be {@code null}
     */
    public void initSetNames(List<String> list) {
        log.debug("first set services’s name");
        final Set<String> initial = new HashSet<>(list);
        this.services = initial;
        subscribe(initial);
        this.allChangeConsumer = this.emitterProcessor.subscribe(result ->
                log.debug("consume service change {}:{}.", result.getData(), result.getChangeIndex()));
    }

    /**
     * Returns the shared change stream restricted to events of one service.
     *
     * @param str service name to filter on
     * @return a view of the change stream containing only events whose data equals {@code str}
     */
    public Flux<Result<String>> getChangeHotSource(String str) {
        return this.emitterProcessor.filter(result -> result.getData().equals(str));
    }

    /**
     * Replaces the cached service names if they differ from {@code list} and subscribes
     * to every name that was not present before. Returns immediately without doing
     * anything when another thread is already updating.
     *
     * @param list the new complete list of service names
     * @throws NullPointerException if {@code list} is {@code null} or
     *         {@link #initSetNames(List)} has not been called yet
     */
    public void setServiceNames(List<String> list) {
        if (!this.writeServiceLock.tryLock()) {
            log.debug("未抢占到writeServiceLock");
            return;
        }
        try {
            final Set<String> previous = this.services;
            final Set<String> incoming = new HashSet<>(list);
            // Set equality is the same test as mutual containsAll, without the
            // quadratic List.containsAll scan.
            if (previous.equals(incoming)) {
                log.debug("服务未发生变化无需进行修改");
                return;
            }
            // Publish the backup before taking the write lock so readers that fail
            // tryReadLock always have something to return.
            this.backServices = previous;
            final long stamp = this.stampedLock.writeLock();
            try {
                log.debug("服务发生变化，修改缓存服务名称");
                this.services = incoming;
                subscribe(incoming.stream()
                        .filter(name -> !previous.contains(name))
                        .collect(Collectors.toSet()));
            } finally {
                this.stampedLock.unlockWrite(stamp);
            }
            this.backServices = null;
        } finally {
            this.writeServiceLock.unlock();
        }
    }

    /**
     * Returns the current version counter of a service.
     *
     * @param str service name
     * @return the counter value, or {@code 0} if the service has never been incremented
     */
    public Long getServiceVersion(String str) {
        final AtomicLong counter = this.serviceVersionMap.get(str);
        final Long version = counter == null ? 0L : counter.get();
        log.debug("{} version is {}", str, version);
        return version;
    }

    /** Subscribes to each name in {@code set} with a fresh {@link ServiceChangeListener}. */
    private void subscribe(Set<String> set) {
        set.forEach(name -> subscribe(name, new ServiceChangeListener(name, this)));
    }

    /**
     * Registers {@code eventListener} for {@code str}. A {@link NacosException} is
     * logged and not rethrown, so one failing service does not abort the others.
     */
    private void subscribe(String str, EventListener eventListener) {
        try {
            this.nacosNamingService.subscribe(str, eventListener);
            log.debug("subscribe new service: {}", str);
        } catch (NacosException e) {
            // Trailing throwable argument makes SLF4J log the stack trace as well.
            log.error("{} subscribe nacos fail.fail message:{}", str, e.getErrMsg(), e);
        }
    }

    /**
     * Bumps the version of {@code str} and emits a change event carrying the new version.
     *
     * @param str name of the service that changed
     */
    public void publish(String str) {
        this.emitterProcessor.onNext(new Result<>(str, incrementServiceVersion(str).longValue()));
    }

    /**
     * Atomically increments the version counter of a service, creating it on first use.
     *
     * @param str service name
     * @return the incremented value; the counter is seeded with {@code DEFAULT_VERSION},
     *         so the first call for a service returns {@code DEFAULT_VERSION + 1}
     */
    public Long incrementServiceVersion(String str) {
        log.debug("increment {} version.", str);
        return this.serviceVersionMap
                .computeIfAbsent(str, key -> new AtomicLong(DEFAULT_VERSION))
                .incrementAndGet();
    }

    /** Disposes the logging consumer, if {@link #initSetNames(List)} ever created one. */
    @PreDestroy
    public void destroy() {
        log.debug("destroy allChangeConsumer");
        final Disposable consumer = this.allChangeConsumer;
        if (consumer != null) {
            consumer.dispose();
        }
    }
}
