/*
 * Decompiled with CFR 0.152.
 */
package io.modelcontextprotocol.client.transport;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.spec.DefaultMcpTransportSession;
import io.modelcontextprotocol.spec.DefaultMcpTransportStream;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpTransportException;
import io.modelcontextprotocol.spec.McpTransportSession;
import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
import io.modelcontextprotocol.spec.McpTransportStream;
import io.modelcontextprotocol.util.Assert;
import io.modelcontextprotocol.util.Utils;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.noear.solon.net.http.HttpResponse;
import org.noear.solon.net.http.HttpResponseException;
import org.noear.solon.net.http.HttpUtilsBuilder;
import org.noear.solon.net.http.textstream.ServerSentEvent;
import org.noear.solon.net.http.textstream.TextStreamUtil;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class WebRxStreamableHttpTransport
implements McpClientTransport {
    private static final String MISSING_SESSION_ID = "[missing_session_id]";
    private static final Logger logger = LoggerFactory.getLogger(WebRxStreamableHttpTransport.class);
    private static final String MCP_PROTOCOL_VERSION = "2025-03-26";
    private static final String DEFAULT_ENDPOINT = "/mcp";
    private static final String MESSAGE_EVENT_TYPE = "message";
    private final ObjectMapper objectMapper;
    private final HttpUtilsBuilder webClientBuilder;
    private final String endpoint;
    private final boolean openConnectionOnStartup;
    private final boolean resumableStreams;
    private final AtomicReference<DefaultMcpTransportSession> activeSession = new AtomicReference();
    private final AtomicReference<Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>>> handler = new AtomicReference();
    private final AtomicReference<Consumer<Throwable>> exceptionHandler = new AtomicReference();

    private WebRxStreamableHttpTransport(ObjectMapper objectMapper, HttpUtilsBuilder webClientBuilder, String endpoint, boolean resumableStreams, boolean openConnectionOnStartup) {
        this.objectMapper = objectMapper;
        this.webClientBuilder = webClientBuilder;
        this.endpoint = endpoint;
        this.resumableStreams = resumableStreams;
        this.openConnectionOnStartup = openConnectionOnStartup;
        this.activeSession.set(this.createTransportSession());
    }

    @Override
    public List<String> protocolVersions() {
        return Utils.asList("2024-11-05", MCP_PROTOCOL_VERSION);
    }

    public static Builder builder(HttpUtilsBuilder webClientBuilder) {
        return new Builder(webClientBuilder);
    }

    @Override
    public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler) {
        return Mono.deferContextual(ctx -> {
            this.handler.set(handler);
            if (this.openConnectionOnStartup) {
                logger.debug("Eagerly opening connection on startup");
                return this.reconnect(null).then();
            }
            return Mono.empty();
        });
    }

    private DefaultMcpTransportSession createTransportSession() {
        Function<String, Publisher<Void>> onClose = sessionId -> {
            if (sessionId == null) {
                return Mono.empty();
            }
            return Mono.fromFuture((CompletableFuture)this.webClientBuilder.build(this.endpoint).header("mcp-session-id", sessionId).header("MCP-Protocol-Version", MCP_PROTOCOL_VERSION).execAsync("DELETE")).onErrorComplete(e -> {
                logger.warn("Got error when closing transport", e);
                return true;
            }).then();
        };
        return new DefaultMcpTransportSession(onClose);
    }

    @Override
    public void setExceptionHandler(Consumer<Throwable> handler) {
        logger.debug("Exception handler registered");
        this.exceptionHandler.set(handler);
    }

    private void handleException(Throwable t) {
        Consumer<Throwable> handler;
        logger.debug("Handling exception for session {}", (Object)WebRxStreamableHttpTransport.sessionIdOrPlaceholder(this.activeSession.get()), (Object)t);
        if (t instanceof McpTransportSessionNotFoundException) {
            McpTransportSession invalidSession = this.activeSession.getAndSet(this.createTransportSession());
            logger.warn("Server does not recognize session {}. Invalidating.", invalidSession.sessionId());
            invalidSession.close();
        }
        if ((handler = this.exceptionHandler.get()) != null) {
            handler.accept(t);
        }
    }

    @Override
    public Mono<Void> closeGracefully() {
        return Mono.defer(() -> {
            logger.debug("Graceful close triggered");
            DefaultMcpTransportSession currentSession = this.activeSession.getAndSet(this.createTransportSession());
            if (currentSession != null) {
                return currentSession.closeGracefully();
            }
            return Mono.empty();
        });
    }

    private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
        return Mono.deferContextual(ctx -> {
            if (stream != null) {
                logger.debug("Reconnecting stream {} with lastId {}", (Object)stream.streamId(), stream.lastId());
            } else {
                logger.debug("Reconnecting with no prior stream");
            }
            AtomicReference<Disposable> disposableRef = new AtomicReference<Disposable>();
            McpTransportSession transportSession = this.activeSession.get();
            Disposable connection = Mono.fromFuture((CompletableFuture)this.webClientBuilder.build(this.endpoint).accept("text/event-stream").header("MCP-Protocol-Version", MCP_PROTOCOL_VERSION).fill(http -> {
                transportSession.sessionId().ifPresent(id -> http.header("mcp-session-id", id));
                if (stream != null) {
                    stream.lastId().ifPresent(id -> http.header("Last-Event-ID", id));
                }
            }).execAsync("GET")).flatMapMany(response -> {
                if (WebRxStreamableHttpTransport.isEventStream(response)) {
                    logger.debug("Established SSE stream via GET");
                    return this.eventStream(stream, (HttpResponse)response);
                }
                if (WebRxStreamableHttpTransport.isNotAllowed(response)) {
                    logger.debug("The server does not support SSE streams, using request-response mode.");
                    return Flux.empty();
                }
                if (WebRxStreamableHttpTransport.isNotFound(response)) {
                    if (transportSession.sessionId().isPresent()) {
                        String sessionIdRepresentation = WebRxStreamableHttpTransport.sessionIdOrPlaceholder(transportSession);
                        return WebRxStreamableHttpTransport.mcpSessionNotFoundError(sessionIdRepresentation);
                    }
                    return this.extractError((HttpResponse)response, MISSING_SESSION_ID);
                }
                return Flux.error((Throwable)response.createError()).doOnError(e -> logger.info("Opening an SSE stream failed. This can be safely ignored.", e));
            }).flatMap(jsonrpcMessage -> this.handler.get().apply((Mono<McpSchema.JSONRPCMessage>)Mono.just((Object)jsonrpcMessage))).onErrorComplete(t -> {
                this.handleException((Throwable)t);
                return true;
            }).doFinally(s -> {
                Disposable ref = disposableRef.getAndSet(null);
                if (ref != null) {
                    transportSession.removeConnection(ref);
                }
            }).contextWrite(ctx).subscribe();
            disposableRef.set(connection);
            transportSession.addConnection(connection);
            return Mono.just((Object)connection);
        });
    }

    @Override
    public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
        return Mono.create(sink -> {
            logger.debug("Sending message {}", (Object)message);
            AtomicReference<Disposable> disposableRef = new AtomicReference<Disposable>();
            McpTransportSession transportSession = this.activeSession.get();
            String messageJsonStr = null;
            try {
                messageJsonStr = this.objectMapper.writeValueAsString((Object)message);
            }
            catch (Exception e) {
                sink.error((Throwable)e);
                return;
            }
            Disposable connection = Mono.fromFuture((CompletableFuture)this.webClientBuilder.build(this.endpoint).accept("application/json, text/event-stream").header("MCP-Protocol-Version", MCP_PROTOCOL_VERSION).fill(http -> transportSession.sessionId().ifPresent(id -> http.header("mcp-session-id", id))).bodyOfJson(messageJsonStr).execAsync("POST")).flatMapMany(response -> {
                if (transportSession.markInitialized(response.header("mcp-session-id"))) {
                    this.reconnect(null).contextWrite(sink.contextView()).subscribe();
                }
                String sessionRepresentation = WebRxStreamableHttpTransport.sessionIdOrPlaceholder(transportSession);
                if (response.code() >= 200 && response.code() < 300) {
                    String contentType = response.contentType();
                    if (contentType == null || contentType.length() == 0) {
                        logger.trace("Message was successfully sent via POST for session {}", (Object)sessionRepresentation);
                        sink.success();
                        return Flux.empty();
                    }
                    if (contentType.startsWith("text/event-stream")) {
                        logger.debug("Established SSE stream via POST");
                        sink.success();
                        return this.newEventStream((HttpResponse)response, sessionRepresentation);
                    }
                    if (contentType.startsWith("application/json")) {
                        logger.trace("Received response to POST for session {}", (Object)sessionRepresentation);
                        sink.success();
                        return this.directResponseFlux(message, (HttpResponse)response);
                    }
                    logger.warn("Unknown media type {} returned for POST in session {}", (Object)contentType, (Object)sessionRepresentation);
                    return Flux.error((Throwable)new RuntimeException("Unknown media type returned: " + contentType));
                }
                if (WebRxStreamableHttpTransport.isNotFound(response) && !sessionRepresentation.equals(MISSING_SESSION_ID)) {
                    return WebRxStreamableHttpTransport.mcpSessionNotFoundError(sessionRepresentation);
                }
                return this.extractError((HttpResponse)response, sessionRepresentation);
            }).flatMap(jsonRpcMessage -> this.handler.get().apply((Mono<McpSchema.JSONRPCMessage>)Mono.just((Object)jsonRpcMessage))).onErrorComplete(t -> {
                this.handleException((Throwable)t);
                sink.error(t);
                return true;
            }).doFinally(s -> {
                Disposable ref = disposableRef.getAndSet(null);
                if (ref != null) {
                    transportSession.removeConnection(ref);
                }
            }).contextWrite(sink.contextView()).subscribe();
            disposableRef.set(connection);
            transportSession.addConnection(connection);
        });
    }

    private static Flux<McpSchema.JSONRPCMessage> mcpSessionNotFoundError(String sessionRepresentation) {
        logger.warn("Session {} was not found on the MCP server", (Object)sessionRepresentation);
        return Flux.error((Throwable)new McpTransportSessionNotFoundException(sessionRepresentation));
    }

    private Flux<McpSchema.JSONRPCMessage> extractError(HttpResponse response, String sessionRepresentation) {
        return Flux.defer(() -> {
            try {
                RuntimeException toPropagate;
                HttpResponseException e = response.createError();
                byte[] body = e.bodyBytes();
                if (body == null || body.length == 0) {
                    toPropagate = new RuntimeException("Sending request failed(" + e.getMessage() + ")", (Throwable)e);
                } else {
                    try {
                        McpSchema.JSONRPCResponse.JSONRPCError jsonRpcError = null;
                        McpSchema.JSONRPCResponse jsonRpcResponse = (McpSchema.JSONRPCResponse)this.objectMapper.readValue(body, McpSchema.JSONRPCResponse.class);
                        jsonRpcError = jsonRpcResponse.getError();
                        toPropagate = jsonRpcError != null ? new McpError(jsonRpcError) : new McpTransportException("Can't parse the jsonResponse " + jsonRpcResponse);
                    }
                    catch (IOException ex) {
                        toPropagate = new McpTransportException("Sending request failed, " + e.getMessage(), (Throwable)e);
                        logger.debug("Received content together with {} HTTP code response: {}", (Object)e.code(), (Object)body);
                    }
                }
                if (e.code() == 400) {
                    if (!sessionRepresentation.equals(MISSING_SESSION_ID)) {
                        return Flux.error((Throwable)new McpTransportSessionNotFoundException(sessionRepresentation, toPropagate));
                    }
                    return Flux.error((Throwable)new McpTransportException("Received 400 BAD REQUEST for session " + sessionRepresentation + ". " + toPropagate.getMessage(), toPropagate));
                }
                return Flux.error((Throwable)toPropagate);
            }
            catch (Exception ex) {
                return Flux.error((Throwable)ex);
            }
        });
    }

    private Flux<McpSchema.JSONRPCMessage> eventStream(McpTransportStream<Disposable> stream, HttpResponse response) {
        DefaultMcpTransportStream sessionStream = stream != null ? stream : new DefaultMcpTransportStream(this.resumableStreams, this::reconnect);
        logger.debug("Connected stream {}", (Object)sessionStream.streamId());
        Flux idWithMessages = Flux.from((Publisher)TextStreamUtil.parseSseStream((HttpResponse)response)).map(this::parse);
        return Flux.from(sessionStream.consumeSseStream((Publisher<Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>>>)idWithMessages));
    }

    private static boolean isNotFound(HttpResponse response) {
        return response.code() == 404;
    }

    private static boolean isNotAllowed(HttpResponse response) {
        return response.code() == 405;
    }

    private static boolean isEventStream(HttpResponse response) {
        String ct;
        return response.code() >= 200 && response.code() <= 300 && (ct = response.contentType()) != null && ct.startsWith("text/event-stream");
    }

    private static String sessionIdOrPlaceholder(McpTransportSession<?> transportSession) {
        return transportSession.sessionId().orElse(MISSING_SESSION_ID);
    }

    private Flux<McpSchema.JSONRPCMessage> directResponseFlux(McpSchema.JSONRPCMessage sentMessage, HttpResponse response) {
        return Flux.create(s -> {
            try {
                String responseMessage = response.bodyAsString();
                if (sentMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(responseMessage)) {
                    logger.warn("Notification: {} received non-compliant response: {}", (Object)sentMessage, (Object)responseMessage);
                    s.complete();
                } else {
                    McpSchema.JSONRPCMessage jsonRpcResponse = McpSchema.deserializeJsonRpcMessage(this.objectMapper, responseMessage);
                    s.next((Object)jsonRpcResponse);
                }
            }
            catch (IOException e) {
                s.error((Throwable)new McpTransportException(e));
            }
        });
    }

    private Flux<McpSchema.JSONRPCMessage> newEventStream(HttpResponse response, String sessionRepresentation) {
        DefaultMcpTransportStream<Disposable> sessionStream = new DefaultMcpTransportStream<Disposable>(this.resumableStreams, this::reconnect);
        logger.trace("Sent POST and opened a stream ({}) for session {}", (Object)sessionStream.streamId(), (Object)sessionRepresentation);
        return this.eventStream(sessionStream, response);
    }

    @Override
    public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
        return (T)this.objectMapper.convertValue(data, typeRef);
    }

    private Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> parse(ServerSentEvent event) {
        if (MESSAGE_EVENT_TYPE.equals(event.event())) {
            try {
                McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, event.data());
                return Tuples.of(Optional.ofNullable(event.id()), Utils.asList(message));
            }
            catch (IOException ioException) {
                throw new McpTransportException("Error parsing JSON-RPC message: " + event.data(), ioException);
            }
        }
        logger.debug("Received SSE event with type: {}", (Object)event);
        return Tuples.of(Optional.empty(), Utils.asList(new McpSchema.JSONRPCMessage[0]));
    }

    public static class Builder {
        private ObjectMapper objectMapper;
        private HttpUtilsBuilder webClientBuilder;
        private String endpoint = "/mcp";
        private boolean resumableStreams = true;
        private boolean openConnectionOnStartup = false;

        private Builder(HttpUtilsBuilder webClientBuilder) {
            Assert.notNull(webClientBuilder, "HttpUtilsBuilder must not be null");
            this.webClientBuilder = webClientBuilder;
        }

        public Builder objectMapper(ObjectMapper objectMapper) {
            Assert.notNull(objectMapper, "ObjectMapper must not be null");
            this.objectMapper = objectMapper;
            return this;
        }

        public Builder webClientBuilder(HttpUtilsBuilder webClientBuilder) {
            Assert.notNull(webClientBuilder, "HttpUtilsBuilder must not be null");
            this.webClientBuilder = webClientBuilder;
            return this;
        }

        public Builder endpoint(String endpoint) {
            Assert.hasText(endpoint, "endpoint must be a non-empty String");
            this.endpoint = endpoint;
            return this;
        }

        public Builder resumableStreams(boolean resumableStreams) {
            this.resumableStreams = resumableStreams;
            return this;
        }

        public Builder openConnectionOnStartup(boolean openConnectionOnStartup) {
            this.openConnectionOnStartup = openConnectionOnStartup;
            return this;
        }

        public WebRxStreamableHttpTransport build() {
            ObjectMapper objectMapper = this.objectMapper != null ? this.objectMapper : new ObjectMapper();
            return new WebRxStreamableHttpTransport(objectMapper, this.webClientBuilder, this.endpoint, this.resumableStreams, this.openConnectionOnStartup);
        }
    }
}

