/*
 * Decompiled with CFR 0.152.
 */
package io.modelcontextprotocol.client.transport;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.util.Assert;
import io.modelcontextprotocol.util.Utils;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.noear.solon.net.http.HttpUtilsBuilder;
import org.noear.solon.net.http.textstream.ServerSentEvent;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.SynchronousSink;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

public class WebRxSseClientTransport
implements McpClientTransport {
    private static final Logger logger = LoggerFactory.getLogger(WebRxSseClientTransport.class);
    private static final String MCP_PROTOCOL_VERSION = "2024-11-05";
    private static final String MESSAGE_EVENT_TYPE = "message";
    private static final String ENDPOINT_EVENT_TYPE = "endpoint";
    private static final String DEFAULT_SSE_ENDPOINT = "/sse";
    private final HttpUtilsBuilder webClientBuilder;
    protected ObjectMapper objectMapper;
    private Disposable inboundSubscription;
    private volatile boolean isClosing = false;
    protected final Sinks.One<String> messageEndpointSink = Sinks.one();
    private String sseEndpoint;
    private BiConsumer<Retry.RetrySignal, SynchronousSink<Object>> inboundRetryHandler = (retrySpec, sink) -> {
        if (this.isClosing) {
            logger.debug("SSE connection closed during shutdown");
            sink.error(retrySpec.failure());
            return;
        }
        if (retrySpec.failure() instanceof IOException) {
            logger.debug("Retrying SSE connection after IO error");
            sink.next(retrySpec);
            return;
        }
        logger.error("Fatal SSE error, not retrying: {}", (Object)retrySpec.failure().getMessage());
        sink.error(retrySpec.failure());
    };

    public WebRxSseClientTransport(HttpUtilsBuilder webClientBuilder) {
        this(webClientBuilder, new ObjectMapper());
    }

    public WebRxSseClientTransport(HttpUtilsBuilder webClientBuilder, ObjectMapper objectMapper) {
        this(webClientBuilder, objectMapper, DEFAULT_SSE_ENDPOINT);
    }

    public WebRxSseClientTransport(HttpUtilsBuilder webClientBuilder, ObjectMapper objectMapper, String sseEndpoint) {
        Assert.notNull(objectMapper, "ObjectMapper must not be null");
        Assert.notNull(webClientBuilder, "WebClient.Builder must not be null");
        Assert.hasText(sseEndpoint, "SSE endpoint must not be null or empty");
        this.objectMapper = objectMapper;
        this.webClientBuilder = webClientBuilder;
        this.sseEndpoint = sseEndpoint;
    }

    @Override
    public List<String> protocolVersions() {
        return Utils.asList(MCP_PROTOCOL_VERSION);
    }

    @Override
    public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler) {
        Flux<ServerSentEvent> events = this.eventStream();
        this.inboundSubscription = events.concatMap(event -> Mono.just((Object)event).handle((e, s) -> {
            if (ENDPOINT_EVENT_TYPE.equals(event.event())) {
                String messageEndpointUri = event.data();
                if (this.messageEndpointSink.tryEmitValue((Object)messageEndpointUri).isSuccess()) {
                    s.complete();
                } else {
                    s.error((Throwable)new RuntimeException("Failed to handle SSE endpoint event"));
                }
            } else if (MESSAGE_EVENT_TYPE.equals(event.event())) {
                try {
                    McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, event.data());
                    s.next((Object)message);
                }
                catch (IOException ioException) {
                    s.error((Throwable)ioException);
                }
            } else {
                logger.debug("Received unrecognized SSE event type: {}", event);
                s.complete();
            }
        }).transform(handler)).subscribe();
        return this.messageEndpointSink.asMono().then();
    }

    @Override
    public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
        return this.messageEndpointSink.asMono().flatMap(messageEndpointUri -> {
            if (this.isClosing) {
                return Mono.empty();
            }
            try {
                String jsonText = this.objectMapper.writeValueAsString((Object)message);
                return Mono.fromFuture((CompletableFuture)this.webClientBuilder.build(messageEndpointUri).contentType("application/json").header("MCP-Protocol-Version", MCP_PROTOCOL_VERSION).bodyOfJson(jsonText).execAsync("POST")).doOnSuccess(response -> logger.debug("Message sent successfully")).doOnError(error -> {
                    if (!this.isClosing) {
                        logger.error("Error sending message: {}", (Object)error.getMessage());
                    }
                });
            }
            catch (IOException e) {
                if (!this.isClosing) {
                    return Mono.error((Throwable)new RuntimeException("Failed to serialize message", e));
                }
                return Mono.empty();
            }
        }).then();
    }

    protected Flux<ServerSentEvent> eventStream() {
        return Flux.from((Publisher)this.webClientBuilder.build(this.sseEndpoint).accept("text/event-stream").header("MCP-Protocol-Version", MCP_PROTOCOL_VERSION).execAsSseStream("GET")).retryWhen(Retry.from(retrySignal -> retrySignal.handle(this.inboundRetryHandler)));
    }

    @Override
    public Mono<Void> closeGracefully() {
        return Mono.fromRunnable(() -> {
            this.isClosing = true;
            if (this.inboundSubscription != null) {
                this.inboundSubscription.dispose();
            }
        }).then().subscribeOn(Schedulers.boundedElastic());
    }

    @Override
    public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
        return (T)this.objectMapper.convertValue(data, typeRef);
    }

    public static Builder builder(HttpUtilsBuilder webClientBuilder) {
        return new Builder(webClientBuilder);
    }

    public static class Builder {
        private final HttpUtilsBuilder webClientBuilder;
        private String sseEndpoint = "/sse";
        private ObjectMapper objectMapper = new ObjectMapper();

        public Builder(HttpUtilsBuilder webClientBuilder) {
            Assert.notNull(webClientBuilder, "WebClient.Builder must not be null");
            this.webClientBuilder = webClientBuilder;
        }

        public Builder sseEndpoint(String sseEndpoint) {
            Assert.hasText(sseEndpoint, "sseEndpoint must not be empty");
            this.sseEndpoint = sseEndpoint;
            return this;
        }

        public Builder objectMapper(ObjectMapper objectMapper) {
            Assert.notNull(objectMapper, "objectMapper must not be null");
            this.objectMapper = objectMapper;
            return this;
        }

        public WebRxSseClientTransport build() {
            return new WebRxSseClientTransport(this.webClientBuilder, this.objectMapper, this.sseEndpoint);
        }
    }
}

