AI Agent:Agents:AIAgent.py:ソースコード

# AIAgent.py
# Langchainの主要コンポーネントをインポート

from langchain_core.prompts import MessagesPlaceholder, ChatPromptTemplate
from langchain_core.runnables import RunnableConfig

# 会話履歴を管理するためのメモリクラスをインポート

from langgraph.checkpoint.memory import MemorySaver


#from langgraph.checkpoint import Checkpoint, CheckpointMetadata

from langchain_community.chat_message_histories import ChatMessageHistory # 修正済みであることを確認
# メッセージの型クラスをインポート (HumanMessage: ユーザーからのメッセージ, AIMessage: AIからのメッセージ)
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage # BaseMessage をインポート
# Streamlitと連携するためのコールバックハンドラをインポート (現在はコメントアウトされている)
# from langchain_community.callbacks import StreamlitCallbackHandler

# LLMプロバイダのインターフェースと具象クラスをインポート
from Agents.llms.LlmInterface import LLMInterface
from Agents.llms.GeminiLlm import GeminiLLM
from Agents.llms.OllamaLlm import OllamaLLM
from Agents.llms.LMStudioLlm import LMStudioLLM
from tools.program_called_command_list import Timekeeper
# Google Generative AIのエラークラス (GeminiLLM内で使用される想定)
# from  langchain_google_genai.chat_models import ChatGoogleGenerativeAIError

import time
# custom tools
# GUI関連のモジュールをインポート
#import GUI.ComparisonGui as gui

# GUIライブラリPySide6のQApplicationをインポート (イベント処理用)

import requests # 画像URL取得用

# LangChain Core Output Parsers and Agent components

from typing import Union, List, Optional, Tuple # List, Optional, Tuple を追加
from pathlib import Path # pathlib.Path をインポート
import base64 # 画像エンコード用
from tools.exception import InterruptedException as InterruptedException
######################################################




import inspect
from typing import Callable, Iterable
def find_missing_description(tools: Iterable[Callable]) -> List[Callable]:
    """
    与えられた関数リストから、docstring が無く、かつ
    `langchain` の `from_function` で `description` を明示していない関数を返す。

    Parameters
    ----------
    tools : Iterable[Callable]
        `langchain_core.tools.structured.from_function` に渡す予定の関数群

    Returns
    -------
    List[Callable]
        docstring も description も無い関数のリスト
    """
    missing = []

    for fn in tools:
        # 1) docstring があるか確認
        doc = inspect.getdoc(fn)  # None なら docstring が無い

        if doc is None:
            # ここで「description を渡さない」ケースを想定
            missing.append(fn)

    return missing

######################################################
class AIAgent():
    time_buffer = time.time()
    interval = 5.1
    # インデントレベルを管理するためのクラス変数
    indentation = 0  # indent

    # 共有メモリ用のChatMessageHistoryインスタンスをクラス変数として定義
    _shared_chat_message_history = ChatMessageHistory(messages=[])

    DEFAULT_LLM_TYPE = "gemini" # クラス変数としてデフォルトLLMタイプを定義
    DEFAULT_MODEL_ID = "gemini-2.5-flash" # クラス変数としてデフォルトモデルIDを定義
    #DEFAULT_LLM_TYPE = "ollama" # クラス変数としてデフォルトLLMタイプを定義
    #DEFAULT_MODEL_ID = "gemma3:12b" # クラス変数としてデフォルトモデルIDを定義
    callbacks=[]
    def __init__(self,
                 agent_name: str,
                 system_prompt: str,
                 tools: list,
                 private_memory: bool = False,
                 llm_model: Optional[str] = None,
                 **llm_provider_kwargs # LLMプロバイダ固有の引数をここで受け取る
                 ):
        """
        AIAgentクラスのコンストラクタ。

        Args:
            agent_name (str): エージェントの名前。
            system_prompt (str): エージェントに与えるシステムプロンプト。
            tools (list): エージェントが使用できるツールのリスト。
            private_memory (bool, optional): Trueの場合、エージェント固有のメモリを使用。Falseの場合、共有メモリを使用。デフォルトはFalse。
            llm_model (str, optional): 使用するLLMモデルの識別子。例: "gemini:gemini-2.5-flash", "ollama:llama3",
                                       指定がない場合はクラスのデフォルトLLMが使用されます。
        """
        self.prompt = None
        self.name = agent_name
        self.is_append_sysyte_promprt = False #ユーザープロンプトの前にシステムプロンプトを追加。
        self.private_memory = private_memory
        #self.memory_key="chat_history"
        # llm_model 文字列からLLMタイプとモデルIDをパース
        parsed_llm_type, parsed_model_id = AIAgent._parse_llm_model_string(llm_model)
        print("missing tool-----------\r\n",find_missing_description(tools))
        # LLMプロバイダのインスタンスを作成
        # デフォルト温度はllm_provider_kwargsで上書き可能
        if "temperature" not in llm_provider_kwargs:
            llm_provider_kwargs["temperature"] = 0 # AIAgentのデフォルト温度
        self.llm_provider: LLMInterface = self._create_llm_provider(
            parsed_llm_type,
            parsed_model_id, **llm_provider_kwargs # kwargsを渡す
        )
        if self.llm_provider:
            # LLMプロバイダの画像サポート状況をAIAgentインスタンスにも反映
            self.supports_images = self.llm_provider.supports_images
            self.memory = self.llm_provider.get_memory()

            if self.supports_images:
                print(f"AIAgent '{self.name}': LLMプロバイダ '{type(self.llm_provider).__name__}' (モデル: {self.llm_provider.model_name}) は画像対応です。")
            else:
                print(f"AIAgent '{self.name}': LLMプロバイダ '{type(self.llm_provider).__name__}' (モデル: {self.llm_provider.model_name}) は画像非対応、または判定不能です。")
        else: # LLMプロバイダ作成失敗
            self.supports_images = False # LLMプロバイダがない場合は画像非対応
            raise ValueError(f"AIAgent '{self.name}': LLMプロバイダの作成に失敗しました。")

        # 会話メモリを初期化または取得
        self.llm_provider.is_private_memory(private_memory)
        self.llm_provider.set_name(self.name)
      
        if None is system_prompt:
            system_prompt = ""
        self.sysytem_prompt = system_prompt
        self.llm_provider.set_system_prompt(system_prompt)

        if self.llm_provider.supports_tools:
            self.tools = tools
            self.llm_provider.update_tools(tools)
        else:
            self.tools = []
        # AgentExecutor は llm_provider 内部で必要に応じて作成されるため、AIAgentでは直接保持しない
        # self.agent = None 
        self.update_system_prompt(self.sysytem_prompt)

        # 時間管理を行います。連続してよびだいっすぎないようにします。
        # APIのレート制限などを考慮し、連続呼び出しを防ぐための時間管理
        self.time_buffer = time.time()
        # LLMプロバイダの種類に基づいて self.interval を設定
        if isinstance(self.llm_provider, GeminiLLM):
            self.interval = 4.1
            Timekeeper.set_interval(4.1)
            print(f"AIAgent '{self.name}': GeminiLLM detected, interval set to {self.interval}")
        elif isinstance(self.llm_provider, OllamaLLM):
            self.interval = 0.0
            Timekeeper.set_interval(0.0)
            print(f"AIAgent '{self.name}': OllamaLLM detected, interval set to {self.interval}")
        else:
            # デフォルトのinterval値 (他のLLMプロバイダや不明な場合)
            self.interval = 4.1 # 例えばGeminiと同じにするか、別のデフォルト値を設定
            Timekeeper.set_interval(4.1)
            print(f"AIAgent '{self.name}': LLM provider type {type(self.llm_provider).__name__}, interval set to default {self.interval}")

    @classmethod
    def _parse_llm_model_string(cls, llm_model_str: Optional[str]) -> Tuple[str, Optional[str]]:
        """llm_model文字列を解析し、LLMタイプとモデルIDを返す。"""
        if not llm_model_str:
            return cls.DEFAULT_LLM_TYPE, cls.DEFAULT_MODEL_ID

        parts = llm_model_str.split(":", 1)
        if len(parts) == 2: # "type:model_id" 形式
            llm_type = parts[0].lower()
            model_id = parts[1]
            # 既知のLLMタイプか確認
            known_llm_types = ["gemini", "ollama","lmstudio"] # 'heron' を追加
            if llm_type not in known_llm_types:
                # "model_id:something_else" のような形式で、最初の部分がモデルIDだった場合を考慮
                # ただし、この場合は曖昧なので、基本は "type:model_id" を期待
                print(f"警告: 不明なLLMタイプ '{llm_type}' が llm_model '{llm_model_str}' で指定されました。デフォルトタイプ '{cls.DEFAULT_LLM_TYPE}' を使用し、'{llm_model_str}' をモデルIDとして試みます。")
                return cls.DEFAULT_LLM_TYPE, llm_model_str # llm_model_str全体をモデルIDとみなす
            return llm_type, model_id
        else: # "model_id" のみ、または "type" のみ
            # 既知のLLMタイプ名かどうかで判断
            potential_type = llm_model_str.lower()
            known_llm_types = ["gemini", "ollama","lmstudio"] # 'heron' を追加
            if potential_type in known_llm_types:
                # タイプ名のみが指定されたとみなし、モデルは各プロバイダのデフォルト
                return potential_type, None
            else:
                # モデルIDのみが指定されたとみなし、タイプはクラスのデフォルト
                return cls.DEFAULT_LLM_TYPE, llm_model_str

    @classmethod
    def set_callbacks(cls, callbacks):
        cls.callbacks = callbacks
    @classmethod
    def get_callbacks(cls):
        return cls.callbacks



    def _create_llm_provider(self, llm_type: str, model_id: Optional[str], **kwargs) -> Optional[LLMInterface]:
        # kwargs に temperature を含める
        if "temperature" not in kwargs:
            kwargs["temperature"] = 0 # デフォルト温度
        
        if llm_type == "gemini" or llm_type == "google":
            return GeminiLLM(model_identifier=model_id or self.DEFAULT_MODEL_ID, **kwargs)
        elif llm_type == "ollama":
            # Ollamaのデフォルトモデルを指定 (例: gemma3:12b-it-qat)
            return OllamaLLM(model_identifier=model_id or "gemma3:12b-it-qat", **kwargs)
        elif llm_type == "lmstudio":
            return LMStudioLLM(model_identifier=model_id or "gemma3:12b-it-qat", **kwargs)

       # If llm_type was not 'phi4' and also not 'heron', it will fall through here.
        return None
    
    def get_name(self):
        """エージェントの名前を取得します。"""
        return self.name

    def clear_memory(self):
        self.llm_provider.clear_memory()
#

    def update_temperature(self, temperature):
        """
        LLMのtemperature(出力のランダム性)を更新します。

        Args:
            temperature (float): 新しいtemperatureの値。
        """
        if self.llm_provider:
            current_llm_type = ""
            # isinstance を使って型を判定
            if isinstance(self.llm_provider, GeminiLLM): current_llm_type = "gemini"
            elif isinstance(self.llm_provider, OllamaLLM): current_llm_type = "ollama"
            else:
                print(f"警告: 不明なLLMプロバイダタイプ ({type(self.llm_provider)}) のため、温度を更新できません。")
                return

            # 既存のkwargsを維持しつつtemperatureのみ更新
            current_provider_kwargs = self.llm_provider.__dict__ # 簡単な方法だが、より厳密な管理が望ましい場合もある
            current_provider_kwargs["temperature"] = temperature
            self.llm_provider = self._create_llm_provider(
                current_llm_type,
                self.llm_provider.model_name,
                **current_provider_kwargs)
            if self.llm_provider: # 更新成功
                self.supports_images = self.llm_provider.supports_images # 画像サポート状況も更新
                if self.supports_images:
                    print(f"AIAgent '{self.name}': 温度更新後、LLMプロバイダは画像対応です。")
            # else: LLMプロバイダ更新失敗時は何もしない(古いプロバイダが残る)
        else:
            print("警告: LLMプロバイダが初期化されていないため、温度を更新できません。")
        # self.__create_character() # AgentExecutorを直接持たないので不要
        self.llm_provider.create_agent_executer()

    def update_tools(self, tools):
        """
        エージェントが使用するツールを更新します。

        Args:
            tools (list): 新しいツールのリスト。
        """
        self.tools = tools
        if self.llm_provider and hasattr(self.llm_provider, 'update_tools'):
            self.llm_provider.update_tools(tools) # LLMプロバイダにツールの変更を通知
        # self.__create_character() # AgentExecutorを直接持たないので不要 (LLMプロバイダが内部で処理)
        self.llm_provider.create_agent_executer()

    def get_response(self, prompt: str, fiel_paths: Optional[List[str]] = None, callbacks=[], sub_prompts: str = "") -> str:
        #response = {}
        if 0 == len(callbacks):

            if None is not AIAgent.callbacks:
                callbacks = AIAgent.callbacks
        
        # LLMプロバイダが初期化されているか確認
        if self.llm_provider is None:
            print(f"エラー: AIAgent '{self.name}' のLLMプロバイダが初期化されていません。")
            return "LLMが正しく初期化されていません。"
        # カスタムコールバックのインスタンスを作成
        # エージェントを実行
        # APIの連続呼び出しを防ぐための待機処理
        self.__wait()

        try:
            #print(f"AIAgent ({self.name}) System Prompt: {self.sysytem_prompt}\nUser Prompt: {prompt}")
            #if image_paths and self.supports_images: # AIAgentインスタンスのsupports_imagesを参照
            #     print(f"Images: {[Path(p).name for p in image_paths]}") # Display only file names

            # 現在のインデントレベルに基づいてインデント文字列を作成 (ComparisonGuiからは直接呼ばれない想定)
            # indentation = ">" * self.indentation # Added
            # GUIにノードを追加 (エージェント名とインデント)
            # gui.append_node(indentation + self.name, "") # ComparisonGui側でnew_streaming_nodeが呼ばれるため不要

            # LLMプロバイダのget_responseを呼び出す
            # システムプロンプト、ツール、コールバックはLLMプロバイダ側で処理される想定
            # 履歴はAIAgentが管理し、LLMプロバイダに渡す
            #history_messages: List[BaseMessage] = []

            #if self.memory and isinstance(self.memory, MemorySaver) and self.memory.chat_memory:
            #    history_messages = self.memory.chat_memory.messages
            if self.is_append_sysyte_promprt:
                response_text = self.llm_provider.get_response(
                    prompt=sub_prompts+"\n" + self.sysytem_prompt + "\n" + prompt, # ユーザープロンプトのみ渡す
                #    chat_history=history_messages,
                    file_paths=fiel_paths,
                    callbacks=callbacks
                )
                response = {"output": response_text} # AgentExecutorの出力形式に合わせる

                self.update_last_input_text(prompt)
            else:
                response_text = self.llm_provider.get_response(
                    prompt=prompt, # ユーザープロンプトのみ渡す
                    file_paths=fiel_paths,
                    callbacks=callbacks
                )
                response = {"output": response_text} # AgentExecutorの出力形式に合わせる


            output_text = ""
            if isinstance(response, dict) and "output" in response:
                if isinstance(response["output"], str):
                    output_text = response["output"]
                else:
                    print(f"警告: response['output'] は文字列ではありません。型: {type(response['output'])}。文字列に変換します。")
                    output_text = str(response["output"])
            elif isinstance(response, str):
                print("警告: 応答はプレーンな文字列でした。")
                output_text = response
                response = {"output": output_text} # 一貫性のために辞書でラップ
            else:
                error_msg = f"エラー: 予期しない応答形式です。Type: {type(response)}, Content: {str(response)[:500]}"
                print(error_msg)
                output_text = error_msg
                response = {"output": output_text} # 応答が辞書であることを保証
            # ストリーミングの場合はコールバックが逐次 append_text を呼ぶので、
            # ここで set_last_node_text を呼ぶ必要はない。
            # gui.set_last_node_text(output_text) 
            
        except InterruptedException as ie: # 中断例外をキャッチ
            print(f"AIAgent ({self.name}): Operation interrupted - {ie}")
            response = {"output": response_text} # AgentExecutorの出力形式に合わせる

#            response["output"] = f"Operation cancelled by user." # 中断メッセージ
            response["interrupted"] = True # 中断フラグ
            raise
        except Exception as e: # より一般的なエラーを捕捉
            print(f"エラーが発生しました (Exception): {e}")
            import traceback
            traceback.print_exc()
            if not isinstance(response, dict) or "output" not in response:
                response = {}
            response["output"] = f"エラーが発生しました (Exception): {e}\nこのまま続けられます。"

        # PySide6のイベントループを処理し、GUIの更新を即座に反映
#        if q_app:
#            q_app.processEvents()
 
        # 履歴更新は中断されていない場合のみ行うか、中断されてもプロンプトは残すか検討
#        if not response.get("interrupted"): # ★この行が重要★
#            encoded_image_urls_for_history = []
#            if self.supports_images and image_paths: # 画像があり、モデルがサポートする場合のみ
#                for img_path_hist in image_paths:
#                    try:
#                        encoded_image_urls_for_history.append(self._encode_image_to_data_url(img_path_hist))
#                    except Exception as e_hist_img:
#                        print(f"履歴用の画像エンコード中にエラー: {Path(img_path_hist).name}, {e_hist_img}")
#            self.update_last_input(prompt, encoded_image_urls_for_history if encoded_image_urls_for_history else None)
#            # AIの応答も履歴に追加 (中断されていない場合)
#            if "output" in response and isinstance(response["output"], str):
#                 self.append_ai_message(response["output"])


        return response.get("output", "") # outputキーがない場合は空文字を返す

########################################################


    def get_chat_history(self):
        # self.memory is MemorySaver()
        return self.memory

    def update_system_prompt(self, prompt):
        """
        システムプロンプトを更新し、エージェントのキャラクターを再作成します。

        Args:
            prompt (str): 新しいシステムプロンプト。
        """
        self.sysytem_prompt = prompt
        if self.llm_provider and hasattr(self.llm_provider, 'update_system_prompt'):
            self.llm_provider.update_system_prompt(prompt) # LLMプロバイダに通知
        # self._create_prompt_template() # AgentExecutorを直接持たないので不要
        # self.__create_character() # AgentExecutorを直接持たないので不要
        self.llm_provider.create_agent_executer()
        
    @classmethod
    def __wait(cls):
        """
        クラスメソッド。APIの連続呼び出しを防ぐために、指定された間隔(interval)だけ待機します。
        """
        time_buf = cls.interval - (time.time()-cls.time_buffer)
        print("AIAgent.time_buf", time_buf)
        if 0 < time_buf:
            time.sleep(time_buf)
        cls.time_buffer = time.time()

    def append_message(self, huma_message, ai_message):
        """
        ユーザーメッセージとAIメッセージを会話履歴に追加します。

        Args:
            huma_message (str): 追加するユーザーメッセージ。
            ai_message (str): 追加するAIメッセージ。
        """
        self.llm_provider.append_message(huma_message, ai_message)

    def append_human_message(self, message):
        """
        ユーザーメッセージを会話履歴に追加します。

        Args:
            message (str): 追加するユーザーメッセージ。
        """
        self.llm_provider.append_human_message(message)

    def append_ai_message(self, message):
        """
        AIメッセージを会話履歴に追加します。

        Args:
            message (str): 追加するAIメッセージ。
        """
        self.llm_provider.append_ai_message(message)


    def get_history(self):
        """会話履歴オブジェクトを取得します。"""
        return self.memory

    def append_aimessage(self, message):
        """AIメッセージを会話履歴に追加します。append_ai_message と同じ機能です。"""
        self.llm_provider.append_ai_message(message)

    @classmethod
    def increment_indentation(cls):
        """クラス変数 indentation をインクリメントします。GUI表示のインデント調整用。"""
        cls.indentation += 1

    @classmethod
    def decrement_indentation(cls):
        """クラス変数 indentation をデクリメントします。GUI表示のインデント調整用。0未満にはなりません。"""
        cls.indentation -= 1
        if cls.indentation < 0:
            cls.indentation = 0

    def update_last_input(self, imput_prompt_text: str, image_data_urls: Optional[List[str]] = None):
        """
        会話履歴内の最後のユーザー入力(HumanMessage)を更新します。
        AgentExecutorがシステムプロンプトとユーザープロンプトを結合して履歴に保存する場合があるため、
        ユーザープロンプトのみが履歴に残るように修正します。

        Args:
            imput_prompt (str): 更新するユーザー入力の文字列。
        """
        self.llm_provider.update_input(imput_prompt_text, image_data_urls) #この関数は現在未実装

    def update_last_input_text(self, imput_prompt_text: str, image_data_urls: Optional[List[str]] = None):
        """
        会話履歴内の最後のユーザー入力(HumanMessage)を更新します。
        AgentExecutorがシステムプロンプトとユーザープロンプトを結合して履歴に保存する場合があるため、
        ユーザープロンプトのみが履歴に残るように修正します。

        Args:
            imput_prompt (str): 更新するユーザー入力の文字列。
        """
        self.llm_provider.update_input_text(imput_prompt_text, image_data_urls)

    def _create_prompt_template(self):
        """
        エージェントのプロンプトテンプレートを作成します。
        (このメソッドはAgentExecutorをAIAgentが直接持つ場合のものです。設計変更後は不要になる可能性があります)
        private_memoryフラグとシステムプロンプトの有無によって、プロンプトの構造が変わります。
        """
        messages = []
        history_placeholder_name = ""

        if self.private_memory:
            messages.append(("system", self.sysytem_prompt))
            history_placeholder_name = self.name + "_chat_history" # アンダースコア区切り
            messages.append(MessagesPlaceholder(variable_name=history_placeholder_name))
            messages.append(("user", "{input}"))
            if self.tools: # ツールがある場合のみ agent_scratchpad を追加
                messages.append(MessagesPlaceholder(variable_name="agent_scratchpad"))
        else:
            if len(self.sysytem_prompt) > 0:
                messages.append(("system", self.sysytem_prompt))
            messages.append(MessagesPlaceholder(variable_name="chat_history"))
            messages.append(("user", "{input}"))
            if self.tools: # ツールがある場合のみ agent_scratchpad を追加
                messages.append(MessagesPlaceholder(variable_name="agent_scratchpad"))
        self.prompt = ChatPromptTemplate.from_messages(messages)

    

    @classmethod
    def set_default_llm_model(cls, llm_model: str):
        """
        デフォルトのLLMモデルを設定します。

        Args:
            llm_model (str): 新しいデフォルトLLMモデルの識別子。
        """
        cls.DEFAULT_LLM_TYPE, cls.DEFAULT_MODEL_ID = cls._parse_llm_model_string(llm_model)