/*
 * Decompiled with CFR 0.152.
 */
package org.noear.solon.web.rx.integration;

import org.noear.solon.core.handle.Context;
import org.noear.solon.rx.Completable;
import org.noear.solon.rx.handle.RxContext;
import org.noear.solon.rx.handle.RxHandler;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

/**
 * {@link RxHandler} that subscribes to a reactive publisher and renders every
 * emitted item to the request {@link Context}.
 *
 * <p>When constructed with {@code isStreaming == true}, a single line-feed byte is
 * written and the context is flushed after each rendered item.
 */
public class RxHandlerImpl
implements RxHandler {
    /** Single line-feed byte ({@code '\n'}) written after each item in streaming mode. */
    private static final byte[] LINE_FEED = {'\n'};
    private final Flux<?> publisher;
    private final boolean isStreaming;

    /**
     * Creates a handler around the given publisher.
     *
     * @param publisher   source of items to render; used as-is when it already is a
     *                    {@link Flux}, otherwise adapted with {@link Flux#from(Publisher)}
     * @param isStreaming whether to write a line feed and flush after each item
     */
    public RxHandlerImpl(Publisher<?> publisher, boolean isStreaming) {
        if (publisher instanceof Flux) {
            this.publisher = (Flux<?>)publisher;
        } else {
            this.publisher = Flux.from(publisher);
        }
        this.isStreaming = isStreaming;
    }

    /**
     * Builds a {@link Completable} that, when run, starts async mode on the underlying
     * context (unless already started), subscribes to the publisher and renders each
     * item. The publisher's error or completion signal is forwarded to the emitter.
     *
     * @param rxCtx reactive context wrapping the request context to render into
     * @return a completable signalling the end (or failure) of the rendering
     */
    @Override
    public Completable handle(RxContext rxCtx) {
        return Completable.create(emitter -> {
            Context ctx = rxCtx.toContext();
            if (!ctx.asyncStarted()) {
                ctx.asyncStart();
            }
            // Pass the terminal callbacks to subscribe(...) itself rather than using
            // doOnError/doOnComplete followed by a bare subscribe(): a subscriber
            // without an error callback makes Reactor additionally report every error
            // as a dropped "error callback not implemented" failure.
            this.publisher.subscribe(
                    item -> this.renderItem(ctx, item),
                    emitter::onError,
                    emitter::onComplete);
        });
    }

    /**
     * Renders one item to the context; in streaming mode also writes a line feed and
     * flushes. Any failure is rethrown wrapped in a {@link RuntimeException} so it can
     * propagate out of the subscriber callback and reach the error handler.
     */
    private void renderItem(Context ctx, Object item) {
        try {
            ctx.render(item);
            if (this.isStreaming) {
                ctx.output(LINE_FEED);
                ctx.flush();
            }
        }
        catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }
}

