package org.apache.pinot.broker.broker.helix;

import com.google.common.collect.ImmutableList;
import com.yammer.metrics.core.MetricsRegistry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.broker.BrokerAdminApiApplication;
import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
import org.apache.pinot.broker.requesthandler.SingleConnectionBrokerRequestHandler;
import org.apache.pinot.broker.routing.RoutingManager;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.function.FunctionRegistry;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.MetricsHelper;
import org.apache.pinot.common.utils.NetUtil;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.helix.TableCache;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.services.ServiceStartable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/broker/broker/helix/HelixBrokerStarter.class */
public class HelixBrokerStarter implements ServiceStartable {
    private static final Logger LOGGER = LoggerFactory.getLogger(HelixBrokerStarter.class);
    private final PinotConfiguration _brokerConf;
    private final String _clusterName;
    private final String _zkServers;
    private final String _brokerId;
    private final List<ClusterChangeHandler> _externalViewChangeHandlers;
    private final List<ClusterChangeHandler> _instanceConfigChangeHandlers;
    private final List<ClusterChangeHandler> _liveInstanceChangeHandlers;
    private HelixManager _spectatorHelixManager;
    private HelixAdmin _helixAdmin;
    private ZkHelixPropertyStore<ZNRecord> _propertyStore;
    private HelixDataAccessor _helixDataAccessor;
    private MetricsRegistry _metricsRegistry;
    private BrokerMetrics _brokerMetrics;
    private RoutingManager _routingManager;
    private AccessControlFactory _accessControlFactory;
    private BrokerRequestHandler _brokerRequestHandler;
    private BrokerAdminApiApplication _brokerAdminApplication;
    private ClusterChangeMediator _clusterChangeMediator;
    private HelixManager _participantHelixManager;

    public HelixBrokerStarter(PinotConfiguration pinotConfiguration, String str, String str2) throws Exception {
        this(pinotConfiguration, str, str2, null);
    }

    public HelixBrokerStarter(PinotConfiguration pinotConfiguration, String str, String str2, @Nullable String str3) throws Exception {
        this._externalViewChangeHandlers = new ArrayList();
        this._instanceConfigChangeHandlers = new ArrayList();
        this._liveInstanceChangeHandlers = new ArrayList();
        this._brokerConf = pinotConfiguration;
        setupHelixSystemProperties();
        this._clusterName = str;
        this._zkServers = str2.replaceAll("\\s+", "");
        if (str3 == null) {
            str3 = this._brokerConf.getProperty("pinot.set.instance.id.to.hostname", false) ? NetUtil.getHostnameOrAddress() : NetUtil.getHostAddress();
        }
        this._brokerId = this._brokerConf.getProperty("instanceId", "Broker_" + str3 + "_" + this._brokerConf.getProperty("pinot.broker.client.queryPort", 8099));
        this._brokerConf.addProperty("pinot.broker.id", this._brokerId);
    }

    private void setupHelixSystemProperties() {
        System.setProperty("helixmanager.flappingTimeWindow", this._brokerConf.getProperty("pinot.broker.flapping.timeWindowMs", "1"));
    }

    public void addExternalViewChangeHandler(ClusterChangeHandler clusterChangeHandler) {
        this._externalViewChangeHandlers.add(clusterChangeHandler);
    }

    public void addInstanceConfigChangeHandler(ClusterChangeHandler clusterChangeHandler) {
        this._instanceConfigChangeHandlers.add(clusterChangeHandler);
    }

    public void addLiveInstanceChangeHandler(ClusterChangeHandler clusterChangeHandler) {
        this._liveInstanceChangeHandlers.add(clusterChangeHandler);
    }

    public ServiceRole getServiceRole() {
        return ServiceRole.BROKER;
    }

    public String getInstanceId() {
        return this._brokerId;
    }

    public PinotConfiguration getConfig() {
        return this._brokerConf;
    }

    public void start() throws Exception {
        LOGGER.info("Starting Pinot broker");
        Utils.logVersions();
        LOGGER.info("Connecting spectator Helix manager");
        this._spectatorHelixManager = HelixManagerFactory.getZKHelixManager(this._clusterName, this._brokerId, InstanceType.SPECTATOR, this._zkServers);
        this._spectatorHelixManager.connect();
        this._helixAdmin = this._spectatorHelixManager.getClusterManagmentTool();
        this._propertyStore = this._spectatorHelixManager.getHelixPropertyStore();
        this._helixDataAccessor = this._spectatorHelixManager.getHelixDataAccessor();
        Map config = this._helixAdmin.getConfig(new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(this._clusterName).build(), Arrays.asList("enable.case.insensitive", "enable.case.insensitive.pql", "pinot.broker.enable.query.limit.override", "default.hyperloglog.log2m", "enable.distinct.count.bitmap.override"));
        boolean z = Boolean.parseBoolean((String) config.get("enable.case.insensitive")) || Boolean.parseBoolean((String) config.get("enable.case.insensitive.pql"));
        String str = (String) config.get("default.hyperloglog.log2m");
        if (str != null) {
            try {
                this._brokerConf.setProperty("default.hyperloglog.log2m", Integer.valueOf(Integer.parseInt(str)));
            } catch (NumberFormatException e) {
                LOGGER.warn("Invalid config of '{}': '{}', using: {} as the default log2m for HyperLogLog", new Object[]{"default.hyperloglog.log2m", str, 8});
            }
        }
        if (Boolean.parseBoolean((String) config.get("pinot.broker.enable.query.limit.override"))) {
            this._brokerConf.setProperty("pinot.broker.enable.query.limit.override", true);
        }
        if (Boolean.parseBoolean((String) config.get("enable.distinct.count.bitmap.override"))) {
            this._brokerConf.setProperty("enable.distinct.count.bitmap.override", true);
        }
        LOGGER.info("Setting up broker request handler");
        this._metricsRegistry = new MetricsRegistry();
        MetricsHelper.initializeMetrics(this._brokerConf.subset("pinot.broker.metrics"));
        MetricsHelper.registerMetricsRegistry(this._metricsRegistry);
        this._brokerMetrics = new BrokerMetrics(this._brokerConf.getProperty("pinot.broker.metrics.prefix", "pinot.broker."), this._metricsRegistry, this._brokerConf.getProperty("pinot.broker.enableTableLevelMetrics", true), this._brokerConf.getProperty("pinot.broker.allowedTablesForEmittingMetrics", Collections.emptyList()));
        this._brokerMetrics.initializeGlobalMeters();
        this._routingManager = new RoutingManager(this._brokerMetrics);
        this._routingManager.init(this._spectatorHelixManager);
        this._accessControlFactory = AccessControlFactory.loadFactory(this._brokerConf.subset("pinot.broker.access.control"));
        HelixExternalViewBasedQueryQuotaManager helixExternalViewBasedQueryQuotaManager = new HelixExternalViewBasedQueryQuotaManager(this._brokerMetrics, this._brokerId);
        helixExternalViewBasedQueryQuotaManager.init(this._spectatorHelixManager);
        FunctionRegistry.init();
        this._brokerRequestHandler = new SingleConnectionBrokerRequestHandler(this._brokerConf, this._routingManager, this._accessControlFactory, helixExternalViewBasedQueryQuotaManager, new TableCache(this._propertyStore, z), this._brokerMetrics);
        int property = this._brokerConf.getProperty("pinot.broker.client.queryPort", 8099);
        LOGGER.info("Starting broker admin application on port: {}", Integer.valueOf(property));
        this._brokerAdminApplication = new BrokerAdminApiApplication(this._routingManager, this._brokerRequestHandler, this._brokerMetrics);
        this._brokerAdminApplication.start(property);
        LOGGER.info("Initializing cluster change mediator");
        Iterator<ClusterChangeHandler> it = this._externalViewChangeHandlers.iterator();
        while (it.hasNext()) {
            it.next().init(this._spectatorHelixManager);
        }
        this._externalViewChangeHandlers.add(this._routingManager);
        this._externalViewChangeHandlers.add(helixExternalViewBasedQueryQuotaManager);
        Iterator<ClusterChangeHandler> it2 = this._instanceConfigChangeHandlers.iterator();
        while (it2.hasNext()) {
            it2.next().init(this._spectatorHelixManager);
        }
        this._instanceConfigChangeHandlers.add(this._routingManager);
        this._instanceConfigChangeHandlers.add(helixExternalViewBasedQueryQuotaManager);
        Iterator<ClusterChangeHandler> it3 = this._liveInstanceChangeHandlers.iterator();
        while (it3.hasNext()) {
            it3.next().init(this._spectatorHelixManager);
        }
        HashMap hashMap = new HashMap();
        hashMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, this._externalViewChangeHandlers);
        hashMap.put(HelixConstants.ChangeType.INSTANCE_CONFIG, this._instanceConfigChangeHandlers);
        if (!this._liveInstanceChangeHandlers.isEmpty()) {
            hashMap.put(HelixConstants.ChangeType.LIVE_INSTANCE, this._liveInstanceChangeHandlers);
        }
        this._clusterChangeMediator = new ClusterChangeMediator(hashMap, this._brokerMetrics);
        this._clusterChangeMediator.start();
        this._spectatorHelixManager.addExternalViewChangeListener(this._clusterChangeMediator);
        this._spectatorHelixManager.addInstanceConfigChangeListener(this._clusterChangeMediator);
        if (!this._liveInstanceChangeHandlers.isEmpty()) {
            this._spectatorHelixManager.addLiveInstanceChangeListener(this._clusterChangeMediator);
        }
        LOGGER.info("Connecting participant Helix manager");
        this._participantHelixManager = HelixManagerFactory.getZKHelixManager(this._clusterName, this._brokerId, InstanceType.PARTICIPANT, this._zkServers);
        this._participantHelixManager.getStateMachineEngine().registerStateModelFactory(BrokerResourceOnlineOfflineStateModelFactory.getStateModelDef(), new BrokerResourceOnlineOfflineStateModelFactory(this._propertyStore, this._helixDataAccessor, this._routingManager, helixExternalViewBasedQueryQuotaManager));
        this._participantHelixManager.getMessagingService().registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), new BrokerUserDefinedMessageHandlerFactory(this._routingManager, helixExternalViewBasedQueryQuotaManager));
        this._participantHelixManager.connect();
        addInstanceTagIfNeeded();
        this._brokerMetrics.addCallbackGauge("helix.connected", () -> {
            return Long.valueOf(this._participantHelixManager.isConnected() ? 1L : 0L);
        });
        this._participantHelixManager.addPreConnectCallback(() -> {
            this._brokerMetrics.addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L);
        });
        registerServiceStatusHandler();
        LOGGER.info("Finish starting Pinot broker");
    }

    private void registerServiceStatusHandler() {
        ArrayList arrayList = new ArrayList(1);
        IdealState resourceIdealState = this._helixAdmin.getResourceIdealState(this._clusterName, "brokerResource");
        if (resourceIdealState != null && resourceIdealState.isEnabled()) {
            Iterator it = resourceIdealState.getPartitionSet().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                } else if (resourceIdealState.getInstanceSet((String) it.next()).contains(this._brokerId)) {
                    arrayList.add("brokerResource");
                    break;
                }
            }
        }
        double property = this._brokerConf.getProperty("pinot.broker.startup.minResourcePercent", 100.0d);
        LOGGER.info("Registering service status handler");
        ServiceStatus.setServiceStatusCallback(this._brokerId, new ServiceStatus.MultipleCallbackServiceStatusCallback(ImmutableList.of(new ServiceStatus.IdealStateAndCurrentStateMatchServiceStatusCallback(this._participantHelixManager, this._clusterName, this._brokerId, arrayList, property), new ServiceStatus.IdealStateAndExternalViewMatchServiceStatusCallback(this._participantHelixManager, this._clusterName, this._brokerId, arrayList, property))));
    }

    private void addInstanceTagIfNeeded() {
        List tags = this._helixDataAccessor.getProperty(this._helixDataAccessor.keyBuilder().instanceConfig(this._brokerId)).getTags();
        if (tags == null || tags.isEmpty()) {
            if (ZKMetadataProvider.getClusterTenantIsolationEnabled(this._propertyStore)) {
                this._helixAdmin.addInstanceTag(this._clusterName, this._brokerId, TagNameUtils.getBrokerTagForTenant((String) null));
            } else {
                this._helixAdmin.addInstanceTag(this._clusterName, this._brokerId, "broker_untagged");
            }
        }
    }

    public void stop() {
        LOGGER.info("Shutting down Pinot broker");
        LOGGER.info("Disconnecting participant Helix manager");
        this._participantHelixManager.disconnect();
        LOGGER.info("Stopping cluster change mediator");
        this._clusterChangeMediator.stop();
        long property = this._brokerConf.getProperty("pinot.broker.delayShutdownTimeMs", 10000L);
        LOGGER.info("Wait for {}ms before shutting down request handler to finish the pending queries", Long.valueOf(property));
        try {
            Thread.sleep(property);
        } catch (Exception e) {
            LOGGER.error("Caught exception while waiting for shutdown delay of {}ms", Long.valueOf(property), e);
        }
        LOGGER.info("Shutting down request handler and broker admin application");
        this._brokerRequestHandler.shutDown();
        this._brokerAdminApplication.stop();
        LOGGER.info("Disconnecting spectator Helix manager");
        this._spectatorHelixManager.disconnect();
        LOGGER.info("Deregistering service status handler");
        ServiceStatus.removeServiceStatusCallback(this._brokerId);
        LOGGER.info("Shutdown Broker Metrics Registry");
        this._metricsRegistry.shutdown();
        LOGGER.info("Finish shutting down Pinot broker for {}", this._brokerId);
    }

    public HelixManager getSpectatorHelixManager() {
        return this._spectatorHelixManager;
    }

    public MetricsRegistry getMetricsRegistry() {
        return this._metricsRegistry;
    }

    public BrokerMetrics getBrokerMetrics() {
        return this._brokerMetrics;
    }

    public RoutingManager getRoutingManager() {
        return this._routingManager;
    }

    public AccessControlFactory getAccessControlFactory() {
        return this._accessControlFactory;
    }

    public BrokerRequestHandler getBrokerRequestHandler() {
        return this._brokerRequestHandler;
    }

    public static HelixBrokerStarter getDefault() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("pinot.broker.client.queryPort", 5001);
        hashMap.put("pinot.broker.timeoutMs", 60000L);
        return new HelixBrokerStarter(new PinotConfiguration(hashMap), "quickstart", "localhost:2122");
    }

    public static void main(String[] strArr) throws Exception {
        getDefault().start();
    }
}
