/*
 * Decompiled with CFR 0.152.
 */
package com.geoway.ns.ai.chat.service.impl;

import cn.hutool.core.thread.ThreadUtil;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.geoway.ns.ai.base.chat.client.AiChatClient;
import com.geoway.ns.ai.base.chat.message.AIMessageType;
import com.geoway.ns.ai.base.chat.message.AiMessage;
import com.geoway.ns.ai.base.tool.AiTool;
import com.geoway.ns.ai.base.tool.AiToolContext;
import com.geoway.ns.ai.chat.dto.AiChatDTO;
import com.geoway.ns.ai.chat.entity.AiChatHistory;
import com.geoway.ns.ai.chat.mapper.AiChatHistoryMapper;
import com.geoway.ns.ai.chat.service.AiChatHistoryService;
import com.geoway.ns.ai.chat.util.AiChatClientUtil;
import com.geoway.ns.ai.chat.util.AiChatToolUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;

/**
 * {@link AiChatHistoryService} implementation that persists each chat turn as
 * {@link AiChatHistory} rows and streams the model output to the caller as
 * server-sent events.
 *
 * <p>One turn is stored as two rows sharing the same {@code pid}: a question row
 * (type {@code 1}) and a response row (type {@code 2}). The response row is created
 * with status {@code 0} and is updated to {@code 1} when the chat call returns
 * normally or to {@code -1} when it throws.
 */
@Service
public class AiChatHistoryServiceImpl
extends ServiceImpl<AiChatHistoryMapper, AiChatHistory>
implements AiChatHistoryService {
    private static final Logger log = LoggerFactory.getLogger(AiChatHistoryServiceImpl.class);

    /** Number of most recent history rows loaded as context when AI tools are available. */
    private static final int TOOL_CHAT_HISTORY_LIMIT = 2;

    /** Number of most recent history rows loaded as context for a plain streaming chat. */
    private static final int STREAM_CHAT_HISTORY_LIMIT = 22;

    /** Role of the first event of a stream; its content is the question and response rows. */
    private static final String ROLE_FRONT = "front";

    /** Role of the last event of a stream, emitted right before completion. */
    private static final String ROLE_FINISH = "finish";

    /**
     * Runs one chat call against the client resolved from {@code chatDTO}.
     *
     * <p>When {@link AiChatToolUtil#getAiTools} yields at least one tool, the tool-enabled
     * {@code chat} call is used with the {@value #TOOL_CHAT_HISTORY_LIMIT} most recent
     * history rows; otherwise {@code chatStream} is used with the
     * {@value #STREAM_CHAT_HISTORY_LIMIT} most recent rows.
     *
     * @param pid             identifier shared by all history rows of the conversation
     * @param chatDTO         request carrying the client/tool selection and extra info
     * @param messageConsumer callback handed to the chat client to receive its messages
     */
    protected void chat(String pid, AiChatDTO chatDTO, Consumer<AiMessage> messageConsumer) {
        AiChatClient chatClient = AiChatClientUtil.getChatClient(chatDTO);
        List<AiTool> aiTools = AiChatToolUtil.getAiTools(chatDTO);
        if (aiTools != null && !aiTools.isEmpty()) {
            AiToolContext aiToolContext = new AiToolContext(chatDTO.getExtraInfo());
            List<AiMessage> messages = this.queryRecentMessages(pid, TOOL_CHAT_HISTORY_LIMIT);
            chatClient.chat(messages, aiToolContext, aiTools, messageConsumer);
        } else {
            List<AiMessage> messages = this.queryRecentMessages(pid, STREAM_CHAT_HISTORY_LIMIT);
            chatClient.chatStream(messages, messageConsumer);
        }
    }

    /**
     * Saves the question and an empty response row, then returns a stream of the chat output.
     *
     * <p>On subscription the stream emits, in order: a {@code "front"} event whose content
     * is the two history rows, every message from the chat client flagged as returnable
     * (tagged with the response row id as session id), an error message if the chat call
     * threw, and finally a {@code "finish"} event followed by completion. The response row
     * is updated with the messages flagged for storage before the finish event is sent.
     *
     * <p>Note that both rows are saved when this method is called, not when the returned
     * flux is subscribed to.
     *
     * @param pid     identifier shared by all history rows of the conversation
     * @param chatDTO request carrying the user message and chat configuration
     * @return the server-sent event stream for this chat turn
     */
    @Override
    public Flux<ServerSentEvent<AiMessage>> chatStream(String pid, AiChatDTO chatDTO) {
        AiChatHistory question = this.createQuestionHistory(pid, chatDTO);
        this.save(question);
        AiChatHistory response = this.createResponseHistory(pid);
        this.save(response);
        return Flux.<AiMessage>create(sink -> {
            List<AiChatHistory> histories = new ArrayList<>();
            histories.add(question);
            histories.add(response);
            sink.next(this.newFrameMessage(ROLE_FRONT, histories));
            // The chat call runs on a separate thread handed to ThreadUtil.
            ThreadUtil.execute(() -> {
                List<AiMessage> aiMessages = new ArrayList<>();
                try {
                    this.chat(pid, chatDTO, msg -> {
                        if (msg.getIsReturn()) {
                            msg.setSessionId(response.getId());
                            sink.next(msg);
                        }
                        if (msg.getIsStore()) {
                            aiMessages.add(msg);
                        }
                    });
                    response.setMessages(aiMessages);
                    response.setStatus(1);
                } catch (Exception exception) {
                    log.error("\u667a\u80fd\u68c0\u7d22", exception);
                    AiMessage errorMessage = this.newErrorMessage(response, exception);
                    aiMessages.add(errorMessage);
                    response.setMessages(aiMessages);
                    response.setStatus(-1);
                    sink.next(errorMessage);
                } finally {
                    try {
                        this.saveOrUpdate(response);
                    } catch (Exception persistFailure) {
                        // A persistence failure must not leave the client waiting on an
                        // event stream that never terminates.
                        log.error("Failed to persist AI chat response history, id={}",
                                response.getId(), persistFailure);
                    } finally {
                        // A separate message object is used so the already emitted
                        // "front" event is never mutated while it may still be buffered.
                        sink.next(this.newFrameMessage(ROLE_FINISH, histories));
                        sink.complete();
                    }
                }
            });
        })
        .map(message -> ServerSentEvent.<AiMessage>builder().data(message).build())
        .onErrorMap(error -> {
            log.error("AI chat event stream failed, pid={}", pid, error);
            return new RuntimeException(error.getMessage(), error);
        });
    }

    /**
     * Builds a stream framing message ("front" / "finish") carrying the history rows.
     *
     * @param role      role value of the framing message
     * @param histories question and response rows placed in the message content
     * @return the new framing message
     */
    private AiMessage newFrameMessage(String role, List<AiChatHistory> histories) {
        AiMessage frame = new AiMessage();
        frame.setRole(role);
        frame.setContent(histories);
        return frame;
    }

    /**
     * Builds the assistant-role message (state {@code -1}) reporting a failed chat call.
     *
     * @param response  response row whose id is used as the session id
     * @param exception failure whose message becomes the message content
     * @return the new error message
     */
    private AiMessage newErrorMessage(AiChatHistory response, Exception exception) {
        AiMessage errorMessage = new AiMessage();
        errorMessage.setSessionId(response.getId());
        errorMessage.setContent(exception.getMessage());
        errorMessage.setRole(AIMessageType.Assistant.getDesc());
        errorMessage.setState(-1);
        return errorMessage;
    }

    /**
     * Loads the messages of the most recent history rows of a conversation.
     *
     * <p>Rows are selected newest-first by create time, capped at {@code limit}, and then
     * reversed so the returned messages run from oldest row to newest row. Rows without
     * messages contribute nothing.
     *
     * @param pid   identifier shared by all history rows of the conversation
     * @param limit maximum number of history rows (not messages) to load
     * @return the flattened messages, possibly empty, never {@code null}
     */
    private List<AiMessage> queryRecentMessages(String pid, int limit) {
        List<AiChatHistory> histories = new ArrayList<>(this.list(
                Wrappers.lambdaQuery(AiChatHistory.class)
                        .eq(AiChatHistory::getPid, pid)
                        .orderByDesc(AiChatHistory::getCreateTime)
                        .last(" limit " + limit + " ")));
        Collections.reverse(histories);
        List<AiMessage> result = new ArrayList<>();
        for (AiChatHistory history : histories) {
            List<AiMessage> messages = history.getMessages();
            if (messages != null) {
                result.addAll(messages);
            }
        }
        return result;
    }

    /**
     * Creates the unsaved question row (type {@code 1}, status {@code 1}) holding the user
     * message taken from {@code chatDTO}.
     *
     * @param pid     identifier shared by all history rows of the conversation
     * @param chatDTO request whose message becomes the user-role message
     * @return the new question row
     */
    private AiChatHistory createQuestionHistory(String pid, AiChatDTO chatDTO) {
        AiMessage userMessage = new AiMessage();
        userMessage.setContent(chatDTO.getMessage());
        userMessage.setRole(AIMessageType.User.getDesc());

        AiChatHistory question = new AiChatHistory();
        question.setPid(pid);
        question.setCreateTime(new Date());
        question.setType(1);
        question.setStatus(1);
        question.getMessages().add(userMessage);
        return question;
    }

    /**
     * Creates the unsaved response row (type {@code 2}, status {@code 0}) with an empty
     * message list.
     *
     * @param pid identifier shared by all history rows of the conversation
     * @return the new response row
     */
    private AiChatHistory createResponseHistory(String pid) {
        AiChatHistory response = new AiChatHistory();
        response.setPid(pid);
        response.setCreateTime(new Date());
        response.setType(2);
        response.setStatus(0);
        response.setMessages(new ArrayList<>());
        return response;
    }
}

