AI Agent:flow:AIOperation.py:ソースコード

# flow/AIOperation.py
from flow.FlowSequentialBase import FlowSequentialBase
from flow.FlowIf import FlowIf
from flow.FlowFor import FlowFor
from flow.FlowSelect import FlowSelect
from flow.FlowWhile import FlowWhile
from flow.FlowProcessAIAgent import FlowProcessAIAgent
from flow.FlowBreak import FlowBreak
from flow.FlowReturn import FlowReturn
from flow.FlowContinue import FlowContinue
from flow.FlowClearMemory import FlowClearMemory
from flow.FlowAppendResponse import FlowAppendResponse
from flow.FlowClearResponseList import FlowClearResponseList
from flow.FlowPrintResponseList import FlowPrintResponseList
from flow.FlowPrint import FlowPrint


class AIOperation:
    """
    AIOperation はテキストベースと JSON 形式の両方で「操作コード」を解析し、
    `Flow*` オブジェクトを組み立てて実行できるクラスです。

    既存の `analyze_operation_code` (テキスト形式) と同じように動作しますが、
    JSON 形式に対応するための `analyze_operation_code_json` を追加しました。
    """

    def __init__(self):
        super().__init__()
        self.now_flow = None
        self.function_list = []
        self.now_code_block_stack = []
        # ルートは FlowSequentialBase で初期化
        self.now_code_block_stack.append(FlowSequentialBase())
        self.main_process = self.now_code_block_stack[-1]
        # 分岐(if, select)用スタック
        self.now_branching_stack = [None]

    # ------------------------------------------------------------------
    # テキスト形式用
    # ------------------------------------------------------------------
    def run(self, command="", pre_respons="", flow_data=None):
        return self.main_process.run(command, pre_respons, flow_data)

    def append_process(self, process):
        if 0 < len(self.now_code_block_stack):
            self.now_code_block_stack[-1].append_process(
                process, self.now_branching_stack[-1]
            )
        return process

    def append_now_control_flow_statements_text(self, text):
        """
        FlowProcessAIAgent 以外は必要なら拡張する。
        """
        if isinstance(self.now_code_block_stack[-1], FlowProcessAIAgent):
            self.now_code_block_stack[-1].append_prompt(text)

    def analyze_operation_code(self, code_dict):
        if "operation" not in code_dict.keys():
            # print("code_dict.keys()", code_dict.keys())
            return
        code = code_dict["operation"].split("\n")

        # print("code", code)
        for line in code:
            data = line.split("#")
            data = data[0].split(":")
            print("data", data)
            # print("line",line)
            if len(data) == 0:
                continue

            keyword  = data[0].strip()
            if "sub" == keyword:
                pass
            elif "if" == keyword:
                control = self.append_process(FlowIf(data[1]))
                self.now_code_block_stack.append(control)
                self.now_branching_stack.append("True")
                pass
            elif "else" == keyword:
                self.now_branching_stack[-1] = "False"
                pass
            elif "endif" == keyword:
                self.now_code_block_stack.pop()
                self.now_branching_stack.pop()
                pass
            elif "for" == keyword:
                control = self.append_process(FlowFor(int(data[1])))
                self.now_code_block_stack.append(control)
                pass
            elif "endfor" == keyword:
                self.now_code_block_stack.pop()
                pass
            elif "while" == keyword:
                control = self.append_process(FlowWhile(data[1]))
                self.now_code_block_stack.append(control)
                pass
            elif "endwhile" == keyword:
                self.now_code_block_stack.pop()
                pass
            elif "select" == keyword:
                control = self.append_process(FlowSelect(data[1]))
                self.now_code_block_stack.append(control)
                self.now_branching_stack.append(None)
                pass
            elif "when" == keyword:
                self.now_code_block_stack[-1].append_branching(data[1])
                self.now_branching_stack.pop()
                self.now_branching_stack.append(data[1])
                pass
            elif "endselect" == keyword:
                self.now_code_block_stack.pop()
                self.now_branching_stack.pop()
                pass
            elif "process" == keyword:
                print("process", data)
                if 3 <= len(data):
                    self.append_process(
                        FlowProcessAIAgent(data[1], data[2]))
                else:
                    print("processの引数が足りません。data = ", data)
                pass
            elif "break" == keyword:
                # 解析段階ではスタックを減らす必要はない
                print("append break", data)
                self.append_process(FlowBreak())
                pass
            elif "continue" == keyword:
                self.append_process(FlowContinue())
                pass
            elif "return" == keyword:
                self.append_process(FlowReturn())
                pass
            elif "clear_memory" == keyword:
                self.append_process(FlowClearMemory())
                pass
            elif "append_respons" == keyword:
                self.append_process(FlowAppendRespons())
                pass
            elif "clear_response_list" == keyword:
                self.append_process(FlowClearResponseList())
                pass
            elif "print_response_list" == keyword:
                self.append_process(FlowPrintResponseList())
                pass
            elif "print" == keyword:
                self.append_process(FlowPrint(data[1]))
                pass
            else:
                self.append_now_control_flow_statements_text(line)
                pass
    # ------------------------------------------------------------------
    # JSON 形式用
    # ------------------------------------------------------------------
    def analyze_operation_code_json(self, code_dict):
        """
        JSON 形式(OperationcodeCreator 構文)を解析し、Flow オブジェクトを
        スタックに積んでいきます。再帰的に処理することで、
        if / for / while / select などの入れ子構造に対応します。

        Parameters
        ----------
        code_dict : dict
            JSON のパース結果。少なくとも `steps` キーを持つこと。
        """
        # ステップのルートを処理
        steps = code_dict.get("steps", [])
        self._process_steps(steps, self.main_process)

    def _process_steps(self, steps, parent_flow):
        """
        再帰的にステップを処理するヘルパー。

        Parameters
        ----------
        steps : list[dict]
            ステップのリスト
        parent_flow : FlowSequentialBase
            親フロー(通常は main_process か Flow* の body 部分)
        """
        # 現在のフローをスタックに入れる
        self.now_code_block_stack.append(parent_flow)

        for step in steps:
            stype = step.get("type")
            if "if" == stype:
                condition = step.get("condition", "")
                flow_if = FlowIf(condition)
                self.append_process(flow_if)
                # then ブロック
                then_steps = step.get("then", [])
                self._process_steps(then_steps, flow_if)
                # else ブロック(省略可)
                else_steps = step.get("else", [])
                if else_steps:
                    # else は FlowIf の `else` ブロックとして扱う
                    # 既存の FlowIf は else を持つ想定です
                    flow_else = flow_if.create_else_steps(else_steps, self)
                    self._process_steps(else_steps, flow_else)
            elif "for" == stype:
                count = int(step.get("count", 0))
                flow_for = FlowFor(count)
                self.append_process(flow_for)
                body_steps = step.get("body", [])
                self._process_steps(body_steps, flow_for)
            elif "while" == stype:
                condition = step.get("condition", "")
                flow_while = FlowWhile(condition)
                self.append_process(flow_while)
                body_steps = step.get("body", [])
                self._process_steps(body_steps, flow_while)
            elif "select" == stype:
                condition = step.get("condition", "")
                flow_select = FlowSelect(condition)
                self.append_process(flow_select)
                cases = step.get("cases", [])
                for case in cases:
                    when = case.get("when")
                    steps_case = case.get("steps", [])
                    flow_case = flow_select.create_case(when)
                    self._process_steps(steps_case, flow_case)

            elif "process" == stype:
                agent = step.get("agent", "")
                prompt = step.get("prompt", "")
                self.append_process(FlowProcessAIAgent(agent, prompt))
            elif "break" == stype:
                self.append_process(FlowBreak())
            elif "continue" == stype:
                self.append_process(FlowContinue())
            elif "return" == stype:
                self.append_process(FlowReturn())
            elif "clear_memory" == stype:
                self.append_process(FlowClearMemory())
            elif "append_respons" == stype:
                self.append_process(FlowAppendRespons())
            elif "clear_respons_list" == stype:
                self.append_process(FlowClearResponseList())
            elif "print_respons_list" == stype:
                self.append_process(FlowPrintResponseList())
            elif "print" == stype:
                text = step.get("text", "")
                self.append_process(FlowPrint(text))
            else:
                # 予期しないタイプは無視またはログに残す
                print(f"[AIOperation] 未対応 type: {stype}")

        # 現在のフローをスタックから外す
        self.now_code_block_stack.pop()