EmotionalBehaviorAgentの入り口:ソースコード:EmotionalBehaviorAgent.py


from tools.AIAgent import AIAgent
from tools.command_list import load_ai_agent_name_list
from tools.fllow_controller import get_python_line_start_sharp_comment_from_respons

from tools.emotional_command_list import get_python_code
from tools.emotional_command_list import load_python_program_list
from tools.emotional_command_list import get_python_program_list
from tools.emotional_command_list import append_python_program_function_direct
from tools.command_list import run_pip_command
import tools.command_list 
import streamlit as st


DO_PYTHON_PROGRAM = 0
MAKE_PYTHON_PROGRAM = 1
DEEP_THINK = 2

class EmotionalBehaviorAgent():
    def __init__(self, max_check=3):
        name = "Emotional"
        self.agent_dict = load_ai_agent_name_list("EmotionalBehavior")
        print( self.agent_dict)
        self.agent = AIAgent(name,
                             "",
                             [],
                             False)
        self.max_check = max_check
        self.temperature = 0.5
        self.pre_emotional_respons=""
        self.latest_respons =""
        self.pre_select=DEEP_THINK
        load_python_program_list()
    def __get_agent_prompt(self, name):
        return self.agent_dict[name][1]

    # AIAgentと関数的互換性確保のために必要
    def get_respons(self, command):
        return self.Emotional(command, "Emotional")
    
    def clear_memory(self):
        self.agent.clear_memory()

    def update_temperature(self, temperature):
        self.temperature = temperature
        self.agent.update_temperature(temperature)

    def update_tools(self, tools):
        self.agent.update_tools(tools)
################################################
    def Emotional(self, command, emotional_name):
        program_list_prompt=get_python_program_list()
        if "" == program_list_prompt:
            pre_promput = "現在扱える機能はありません。\r\n"
        else:
            pre_promput = "現在扱える機能は\r\n"+program_list_prompt
        # 感情
        self.agent.update_system_prompt(pre_promput+self.__get_agent_prompt(emotional_name))
        self.update_temperature(self.temperature)
        self.update_tools([])
        # コールバック関数の設定 (エージェントの動作の可視化用)
        tools.command_list.g_time_keeper.wait()
        command+=self.latest_respons
        respons = self.agent.get_respons(command)
#        if "" == self.pre_emotional_respons:
#            respons = self.agent.get_respons(command)
#        else:
#            respons = self.agent.get_respons("前回の出力内容は\r\n"+self.pre_emotional_respons+"\r\nこれと重複がないようにしてください。")

        # 分岐
        self.agent.update_system_prompt(pre_promput+self.__get_agent_prompt("Switch"))
        # コールバック関数の設定 (エージェントの動作の可視化用)
        tools.command_list.g_time_keeper.wait()
        respons = self.agent.get_respons("")

        select, program = self.check_use_or_make_pyhon_program(respons)
        print("select",select)
        if DO_PYTHON_PROGRAM == select:
            # 選択
            self.agent.update_system_prompt(pre_promput+self.__get_agent_prompt("FunctionSelecter"))
            # コールバック関数の設定 (エージェントの動作の可視化用)
            tools.command_list.g_time_keeper.wait()
            respons = self.agent.get_respons("")
            func_name_list = get_python_line_start_sharp_comment_from_respons(respons)
            code = get_python_code(func_name_list[0])
            self.agent.update_system_prompt(pre_promput+self.__get_agent_prompt("PythonProgramExecuter"))
            tools.command_list.g_time_keeper.wait()
            respons = self.agent.get_respons(code)
            
            error, respons = self.respons_programe_check(respons)

#            timeout = self.__get_python_program_timeout(respons)
#            if None is timeout:
#                timeout = 5
#            self.respons_programe_check(respons, timeout)
            self.latest_respons = respons
            pass
        elif MAKE_PYTHON_PROGRAM == select:
            # アイデアだし
            self.agent.update_system_prompt(pre_promput+self.__get_agent_prompt("ProgramIdeaGeneration"))
            # コールバック関数の設定 (エージェントの動作の可視化用)
            tools.command_list.g_time_keeper.wait()
            respons = self.agent.get_respons("まだ機能として作られていない機能のアイデアを書いてください。")

            # 批評
            self.agent.update_system_prompt(pre_promput+self.__get_agent_prompt("ProgramCriticism"))
            # コールバック関数の設定 (エージェントの動作の可視化用)
            tools.command_list.g_time_keeper.wait()
            respons = self.agent.get_respons("")
            # 修正
            self.agent.update_system_prompt(pre_promput+self.__get_agent_prompt("Fixes"))
            # コールバック関数の設定 (エージェントの動作の可視化用)
            tools.command_list.g_time_keeper.wait()
            respons = self.agent.get_respons("")

            # まとめ
            self.agent.update_system_prompt(pre_promput+self.__get_agent_prompt("ProgramSummary"))
            # コールバック関数の設定 (エージェントの動作の可視化用)
            tools.command_list.g_time_keeper.wait()
            respons = self.agent.get_respons("")

            # 作成

            self.agent.update_system_prompt(pre_promput+self.__get_agent_prompt("PythonProgramer"))
            self.update_tools([run_pip_command])
            # コールバック関数の設定 (エージェントの動作の可視化用)
            tools.command_list.g_time_keeper.wait()
            respons = self.agent.get_respons("まだ機能として作られていない機能のプログラムを書いてください。")
            error, respons = self.respons_programe_check(respons)
            self.latest_respons = respons
            # 追加
#            self.agent.update_system_prompt(pre_promput+self.__get_agent_prompt("AddPythonProgramer"))
#            self.update_tools([append_python_program_function])
#            self.update_temperature(0)
            # コールバック関数の設定 (エージェントの動作の可視化用)
#            tools.command_list.g_time_keeper.wait()
#            respons = self.agent.get_respons("")
            self.save_python_functions(respons)
        elif DEEP_THINK == select:
            # アイデアだし
            self.agent.update_system_prompt(pre_promput+self.__get_agent_prompt("IdeaGeneration"))
            # コールバック関数の設定 (エージェントの動作の可視化用)
            tools.command_list.g_time_keeper.wait()
            respons = self.agent.get_respons("")

            # 批評
            self.agent.update_system_prompt(pre_promput+self.__get_agent_prompt("Criticism"))
            # コールバック関数の設定 (エージェントの動作の可視化用)
            tools.command_list.g_time_keeper.wait()
            respons = self.agent.get_respons("")
            # 修正
            self.agent.update_system_prompt(pre_promput+self.__get_agent_prompt("Fixes"))
            # コールバック関数の設定 (エージェントの動作の可視化用)
            tools.command_list.g_time_keeper.wait()
            respons = self.agent.get_respons("")

            # まとめ
            self.agent.update_system_prompt(pre_promput+self.__get_agent_prompt("Summary"))
            # コールバック関数の設定 (エージェントの動作の可視化用)
            tools.command_list.g_time_keeper.wait()
            respons = self.agent.get_respons("")
            self.latest_respons = ""

            pass
        return respons
    def save_python_functions(self, respons):
        """レスポンスに"""
        code_list = tools.fllow_controller.get_program_code_list_from_response(respons)
        if "python" in code_list:
            #複数のコードがある場合書くコードについてエラーチェックをして保存する。
            if list == type(code_list["python"]):
                for code in code_list["python"]:
#                    error_respons, respons_text = self.respons_programe_check(code)

#                    code_dict = tools.fllow_controller.get_lengest_program_code_from_response(respons_text)
#                    function_name, explanation, program = self.get_append_python_program_function_parameter(code_dict["python"])
                    function_name, explanation, program = self.get_append_python_program_function_parameter(code)
                    if None is not function_name:
                        append_python_program_function_direct(function_name, explanation, program)
    def get_append_python_program_function_parameter(self, program):
        count = 0
        code_list = program.split("\n")
        name = None
        explanation= None
        for line in code_list:
            l_line = line.rstrip()
            if 0 < len(l_line):
                if "#" == l_line[0]:
                    l_line = l_line[1:]
                    l_line = l_line.strip()
                    if 0 == count:
                        name = l_line
                    if 1 == count:
                        explanation = l_line
                        break
                    count += 1
        return name,explanation,program
    def check_use_or_make_pyhon_program(self, respons):
        """ レスポンスの分岐を判定。"""
        code = tools.fllow_controller.get_lengest_program_code_from_response(respons)
        if "python" in code:
            program = code["python"]
        else:
            return DEEP_THINK, ""
        if list is not type(program):
            program = program.split("\n")
        for line in program:
            l_line = line.rstrip()
            if 0 < len(l_line):
                if "#" == l_line[0]:
                    l_line = l_line[1:]
                    l_line = l_line.strip()
                    if "pythonプログラムの実行" == l_line:
                        print("step 01",l_line)
                        return DO_PYTHON_PROGRAM, code["python"]
                    elif "pythonプログラムの作成" == l_line:
                        print("step 02",l_line)
                        return MAKE_PYTHON_PROGRAM, ""
                    print("step 03",l_line)
                    return DEEP_THINK, ""
        print("step 04")
        return DEEP_THINK, ""


    def __get_python_program_timeout(self, respons):
        """ pythonプログラムの実行の時に実行すること。
        timeoutを取得
        """
        code = tools.fllow_controller.get_lengest_program_code_from_response(respons)
        if "python" in code:
            program = code["python"]
        else:
            return None
        if list is not type(program):
            program = program.split("\n")
        count = 0
        for line in program:
            l_line = line.rstrip()

            if 0 < len(l_line):
                if "#" == l_line[0]:
                    l_line = l_line[1:]
                    l_line = l_line.strip()
                    data = l_line.split("=")
                    if 1 < len(data):
                        if "timeout" == data[0]:
                            return float(data[1])
            count += 1      
            if 4 < count:
                return None
        return None
    
#########################################    
    def __programe_check_sub(self, respons_text, error_respons, count):
        if False is tools.fllow_controller.get_code_error():
             error_respons = ""
             return False, error_respons, respons_text, count
        if self.max_check <= count:
            error_respons = "3回以上やり直してもエラーを修正しきれませんでした。"
            return False, error_respons, respons_text, count
        print(error_respons)
        tools.command_list.g_time_keeper.wait()
        if 0<=error_respons.find("ソースコードが前から変わっていません。"):
            self.update_temperature(1.0)
        respons_text = self.agent.get_respons(error_respons+"\r\n直してください。必要ならrun_pip_commandを使用してください。")
        count += 1
        return True, error_respons, respons_text, count

    def programe_check(self, check_program, timeout=5):
        # pythonで エラーが出るようなら自動で作りなおしを依頼。
        
        count = 0
        respons_text = check_program
        loop_flag = True
        error_respons = ""
        while loop_flag:
            error_respons = tools.fllow_controller.python_error_check_main(respons_text, error_respons, timeout)
            loop_flag, error_respons, respons_text, count = self.__programe_check_sub(respons_text, error_respons, count)
            count += 1
        return error_respons, respons_text
    
    def respons_programe_check(self, check_prompt, timeout=5):
        # pythonで エラーが出るようなら自動で作りなおしを依頼。
        
        count = 0
        respons_text = check_prompt
        loop_flag = True
        error_respons = ""
        while loop_flag:
            error_respons = tools.fllow_controller.python_error_check(respons_text, error_respons,timeout)
            loop_flag, error_respons, respons_text, count = self.__programe_check_sub(respons_text, error_respons, count)
            count += 1
        return error_respons, respons_text
#################################
    def think_agent(self, prompt):
        tools.command_list.g_time_keeper.wait()
        # AIエージェントの思考
        with st.chat_message("PythonProgramer"):
            # エージェントを実行
            response = self.agent.get_respons(prompt)
            st.write(response)
        return response