# OperationcodeCreatorAgent — source: tools/AIAgent.py




import streamlit as st
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.prompts import MessagesPlaceholder, ChatPromptTemplate
from langchain_core.runnables import RunnableConfig
from  langchain_google_genai.chat_models import ChatGoogleGenerativeAIError
from langchain.memory import ConversationBufferWindowMemory
from langchain_core.messages import HumanMessage, AIMessage
from langchain_community.callbacks import StreamlitCallbackHandler
# models

from langchain_google_genai import ChatGoogleGenerativeAI
import time
# custom tools


######################################################

class AIAgent():
    """Wrapper around a LangChain tool-calling agent backed by Google Gemini.

    Each instance owns an ``AgentExecutor`` built from a system prompt, a set
    of tools, and a conversation memory stored in ``st.session_state`` (either
    private per agent name, or shared across all agents).

    Rate limiting is CLASS-level: ``time_buffer``/``interval`` are shared by
    every instance so consecutive API calls are throttled globally.
    """

    # Shared throttle state: timestamp of the last call and the minimum
    # number of seconds required between successive LLM invocations.
    time_buffer = time.time()
    interval = 5.1

    def __init__(self, agent_name, system_prompt, tools, private_memory=False):
        """Build the LLM, attach memory, and create the agent executor.

        Args:
            agent_name: Name used to key private memory in session state.
            system_prompt: System prompt text; ``None`` is treated as "".
            tools: List of LangChain tools available to the agent.
            private_memory: If True, this agent gets its own memory instead
                of the shared one.
        """
        self.chat_history = []
        self.llm = ChatGoogleGenerativeAI(
            temperature=0,
            model="gemini-1.5-flash",
            # max_tokens=1000000
        )
        self.prompt = None
        self.name = agent_name
        self.private_memory = private_memory
        self.__create_memory()

        if system_prompt is None:
            system_prompt = ""
        # NOTE: the attribute keeps the original (misspelled) name
        # "sysytem_prompt" for backward compatibility with external callers.
        self.sysytem_prompt = system_prompt
        self.tools = tools
        print("self.sysytem_prompt", self.sysytem_prompt)
        self.agent = self.create_agent()
        self.update_system_prompt(self.sysytem_prompt)
        # NOTE: the original also set instance-level ``time_buffer``/``interval``
        # here, but ``__wait`` is a classmethod and only ever reads the class
        # attributes, so those instance assignments were dead code — removed.

    def __create_agent(self):
        """(Re)build the AgentExecutor from the current llm/tools/prompt."""
        agent = create_tool_calling_agent(self.llm, self.tools, self.prompt)
        self.agent = AgentExecutor(
            agent=agent,
            tools=self.tools,
            verbose=True,
            memory=self.chat_history,
        )
        return self.agent

    def __create_memory(self):
        """Attach a conversation memory kept in ``st.session_state``.

        Private memory is keyed by ``<name>memory`` (with memory_key
        ``<name>chat_history``); otherwise a single shared ``memory`` entry
        (memory_key ``chat_history``) is used by every agent.
        """
        if self.private_memory:
            key = self.name + 'memory'
            if key not in st.session_state:
                st.session_state[key] = ConversationBufferWindowMemory(
                    return_messages=True,
                    memory_key=self.name + "chat_history",
                    k=100,
                    max_token_limit=1000000,
                )
            self.chat_history = st.session_state[key]
        else:
            if "memory" not in st.session_state:
                st.session_state['memory'] = ConversationBufferWindowMemory(
                    return_messages=True,
                    memory_key="chat_history",
                    k=100,
                    max_token_limit=1000000,
                )
                print(st.session_state)
            self.chat_history = st.session_state['memory']

    def create_agent(self):
        """Build the chat prompt template and return a fresh AgentExecutor.

        The placeholder variable name must match the memory_key chosen in
        ``__create_memory`` (name-prefixed for private memory). A "system"
        message is only included when a system prompt exists (shared-memory
        case) or unconditionally (private-memory case, matching original
        behavior).
        """
        if self.private_memory:
            print("self.sysytem_prompt", self.sysytem_prompt)
            print("self.name", self.name)

            self.prompt = ChatPromptTemplate.from_messages([
                ("system", self.sysytem_prompt),
                MessagesPlaceholder(variable_name=self.name + "chat_history"),
                ("user", "{input}"),
                MessagesPlaceholder(variable_name="agent_scratchpad"),
            ])
        elif self.sysytem_prompt:
            self.prompt = ChatPromptTemplate.from_messages([
                ("system", self.sysytem_prompt),
                MessagesPlaceholder(variable_name="chat_history"),
                ("user", "{input}"),
                MessagesPlaceholder(variable_name="agent_scratchpad"),
            ])
        else:
            self.prompt = ChatPromptTemplate.from_messages([
                MessagesPlaceholder(variable_name="chat_history"),
                ("user", "{input}"),
                MessagesPlaceholder(variable_name="agent_scratchpad"),
            ])
        return self.__create_agent()

    def clear_memory(self):
        """Drop every message stored in the conversation memory."""
        print("self.chat_history.chat_memory.messages",
              self.chat_history.chat_memory.messages)
        self.chat_history.chat_memory.messages = []

    def update_temperature(self, temperature):
        """Recreate the LLM with a new temperature and rebuild the agent.

        Bug fix: the original assigned the new model to ``self.modllmel``
        (a typo), so ``__create_agent`` kept using the old LLM and the
        temperature change never took effect.
        """
        self.llm = ChatGoogleGenerativeAI(
            temperature=temperature,
            model="gemini-1.5-flash",
        )
        self.__create_agent()

    def update_system_prompt(self, prompt):
        """Replace the system prompt and rebuild prompt template + agent."""
        self.sysytem_prompt = prompt
        self.create_agent()

    def update_tools(self, tools):
        """Replace the tool set and rebuild the agent executor."""
        self.tools = tools
        self.__create_agent()

    @classmethod
    def __wait(cls):
        """Sleep so at least ``cls.interval`` seconds separate LLM calls.

        Shared across all instances; updates ``cls.time_buffer`` afterwards.
        """
        remaining = cls.interval - (time.time() - cls.time_buffer)
        print("AIAgent.time_buf", remaining)
        if remaining > 0:
            time.sleep(remaining)
        cls.time_buffer = time.time()

    def get_respons(self, prompt):
        """Run the agent on ``prompt`` and return its output string.

        Streams intermediate thoughts into a Streamlit container via
        ``StreamlitCallbackHandler``. Returns ``""`` when there is no agent
        or no "output" key in the response. On a Gemini API error, a
        user-facing Japanese error message is returned instead of raising.
        (Method name kept misspelled for backward compatibility.)
        """
        response = ""
        st_cb = StreamlitCallbackHandler(
            st.container(), expand_new_thoughts=True)

        if self.agent is not None:
            # Throttle: avoid hitting the API rate limit (class-level wait).
            self.__wait()

            try:
                print("self.sysytem_prompt", self.sysytem_prompt, "\r\nprompt\r\n" + prompt)
                # NOTE: the system prompt is also embedded in the prompt
                # template, so it is effectively sent twice — preserved as-is.
                response = self.agent.invoke(
                    {'input': self.sysytem_prompt + "\r\n" + prompt},
                    config=RunnableConfig({'callbacks': [st_cb]})
                )
                self.update_last_input(prompt)
            except ChatGoogleGenerativeAIError as e:
                print(f"エラーが発生しました: {e}")
                print("function_response:", response)

                if not isinstance(response, dict):
                    response = {}
                    response["output"] = f"エラーが発生しました: {e}\r\nこのまま続けられます。"
                print(type(response))

        if "output" in response:
            return response["output"]
        return ""

    def update_last_input(self, imput_prompt):
        """Rewrite the newest HumanMessage in memory to ``imput_prompt``.

        Used after ``invoke`` so memory stores the bare user prompt rather
        than the system-prompt-prefixed input that was actually sent.
        (Parameter name kept misspelled for keyword-call compatibility.)
        """
        for message in reversed(self.chat_history.chat_memory.messages):
            if isinstance(message, HumanMessage):
                message.content = imput_prompt
                break

    def modify_prompt(self, data):
        """Set the system prompt WITHOUT rebuilding the agent."""
        self.sysytem_prompt = data

    def get_history(self):
        """Return the underlying conversation memory object."""
        return self.chat_history

    def append_aimessage(self, message):
        """Append ``message`` to memory as an AIMessage."""
        self.chat_history.chat_memory.messages.append(AIMessage(message))