package org.apache.pinot.tools.streams;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.MessageHandler;
import javax.websocket.Session;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.glassfish.tyrus.client.ClientManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/tools/streams/MeetupRsvpStream.class */
public class MeetupRsvpStream {
    private static final Logger LOGGER = LoggerFactory.getLogger(MeetupRsvpStream.class);
    private StreamDataProducer producer;
    private boolean keepPublishing;
    private boolean _partitionByKey;
    private ClientManager client;

    public MeetupRsvpStream() throws Exception {
        this(false);
    }

    public MeetupRsvpStream(boolean z) throws Exception {
        this.keepPublishing = true;
        Properties properties = new Properties();
        properties.put("metadata.broker.list", KafkaStarterUtils.DEFAULT_KAFKA_BROKER);
        properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
        properties.put("request.required.acks", "1");
        this._partitionByKey = z;
        this.producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties);
    }

    public void stopPublishing() {
        this.keepPublishing = false;
        this.producer.close();
        this.client.shutdown();
    }

    public void run() {
        try {
            ClientEndpointConfig build = ClientEndpointConfig.Builder.create().build();
            this.client = ClientManager.createClient();
            this.client.connectToServer(new Endpoint() { // from class: org.apache.pinot.tools.streams.MeetupRsvpStream.1
                public void onOpen(Session session, EndpointConfig endpointConfig) {
                    try {
                        session.addMessageHandler(new MessageHandler.Whole<String>() { // from class: org.apache.pinot.tools.streams.MeetupRsvpStream.1.1
                            public void onMessage(String str) {
                                try {
                                    JsonNode stringToJsonNode = JsonUtils.stringToJsonNode(str);
                                    ObjectNode newObjectNode = JsonUtils.newObjectNode();
                                    JsonNode jsonNode = stringToJsonNode.get("venue");
                                    if (jsonNode != null) {
                                        newObjectNode.set("venue_name", jsonNode.get("venue_name"));
                                    }
                                    JsonNode jsonNode2 = stringToJsonNode.get("event");
                                    if (jsonNode2 != null) {
                                        newObjectNode.set("event_name", jsonNode2.get("event_name"));
                                        newObjectNode.set("event_id", jsonNode2.get("event_id"));
                                        newObjectNode.set("event_time", jsonNode2.get("time"));
                                    }
                                    JsonNode jsonNode3 = stringToJsonNode.get("group");
                                    if (jsonNode3 != null) {
                                        newObjectNode.set("group_city", jsonNode3.get("group_city"));
                                        newObjectNode.set("group_country", jsonNode3.get("group_country"));
                                        newObjectNode.set("group_id", jsonNode3.get("group_id"));
                                        newObjectNode.set("group_name", jsonNode3.get("group_name"));
                                        newObjectNode.set("group_lat", jsonNode3.get("group_lat"));
                                        newObjectNode.set("group_lon", jsonNode3.get("group_lon"));
                                    }
                                    newObjectNode.set("mtime", stringToJsonNode.get("mtime"));
                                    newObjectNode.put("rsvp_count", 1);
                                    if (MeetupRsvpStream.this.keepPublishing) {
                                        if (MeetupRsvpStream.this._partitionByKey) {
                                            MeetupRsvpStream.this.producer.produce("meetupRSVPEvents", jsonNode2.get("event_id").toString().getBytes(StandardCharsets.UTF_8), newObjectNode.toString().getBytes(StandardCharsets.UTF_8));
                                        } else {
                                            MeetupRsvpStream.this.producer.produce("meetupRSVPEvents", newObjectNode.toString().getBytes(StandardCharsets.UTF_8));
                                        }
                                    }
                                } catch (Exception e) {
                                    MeetupRsvpStream.LOGGER.error("error processing raw event ", e);
                                }
                            }
                        });
                        session.getBasicRemote().sendText("");
                    } catch (IOException e) {
                        MeetupRsvpStream.LOGGER.error("found an event where data did not have all the fields, don't care about for quickstart", e);
                    }
                }
            }, build, new URI("wss://stream.meetup.com/2/rsvps"));
        } catch (Exception e) {
            LOGGER.error("encountered an error running the meetupRSVPEvents stream", e);
        }
    }
}
