# flow/AIOperation.py
from flow.FlowSequentialBase import FlowSequentialBase
from flow.FlowIf import FlowIf
from flow.FlowFor import FlowFor
from flow.FlowSelect import FlowSelect
from flow.FlowWhile import FlowWhile
from flow.FlowProcessAIAgent import FlowProcessAIAgent
from flow.FlowBreak import FlowBreak
from flow.FlowReturn import FlowReturn
from flow.FlowContinue import FlowContinue
from flow.FlowClearMemory import FlowClearMemory
from flow.FlowAppendResponse import FlowAppendResponse
from flow.FlowClearResponseList import FlowClearResponseList
from flow.FlowPrintResponseList import FlowPrintResponseList
from flow.FlowPrint import FlowPrint
class AIOperation:
"""
AIOperation はテキストベースと JSON 形式の両方で「操作コード」を解析し、
`Flow*` オブジェクトを組み立てて実行できるクラスです。
既存の `analyze_operation_code` (テキスト形式) と同じように動作しますが、
JSON 形式に対応するための `analyze_operation_code_json` を追加しました。
"""
def __init__(self):
super().__init__()
self.now_flow = None
self.function_list = []
self.now_code_block_stack = []
# ルートは FlowSequentialBase で初期化
self.now_code_block_stack.append(FlowSequentialBase())
self.main_process = self.now_code_block_stack[-1]
# 分岐(if, select)用スタック
self.now_branching_stack = [None]
# ------------------------------------------------------------------
# テキスト形式用
# ------------------------------------------------------------------
def run(self, command="", pre_respons="", flow_data=None):
return self.main_process.run(command, pre_respons, flow_data)
def append_process(self, process):
if 0 < len(self.now_code_block_stack):
self.now_code_block_stack[-1].append_process(
process, self.now_branching_stack[-1]
)
return process
def append_now_control_flow_statements_text(self, text):
"""
FlowProcessAIAgent 以外は必要なら拡張する。
"""
if isinstance(self.now_code_block_stack[-1], FlowProcessAIAgent):
self.now_code_block_stack[-1].append_prompt(text)
def analyze_operation_code(self, code_dict):
if "operation" not in code_dict.keys():
# print("code_dict.keys()", code_dict.keys())
return
code = code_dict["operation"].split("\n")
# print("code", code)
for line in code:
data = line.split("#")
data = data[0].split(":")
print("data", data)
# print("line",line)
if len(data) == 0:
continue
keyword = data[0].strip()
if "sub" == keyword:
pass
elif "if" == keyword:
control = self.append_process(FlowIf(data[1]))
self.now_code_block_stack.append(control)
self.now_branching_stack.append("True")
pass
elif "else" == keyword:
self.now_branching_stack[-1] = "False"
pass
elif "endif" == keyword:
self.now_code_block_stack.pop()
self.now_branching_stack.pop()
pass
elif "for" == keyword:
control = self.append_process(FlowFor(int(data[1])))
self.now_code_block_stack.append(control)
pass
elif "endfor" == keyword:
self.now_code_block_stack.pop()
pass
elif "while" == keyword:
control = self.append_process(FlowWhile(data[1]))
self.now_code_block_stack.append(control)
pass
elif "endwhile" == keyword:
self.now_code_block_stack.pop()
pass
elif "select" == keyword:
control = self.append_process(FlowSelect(data[1]))
self.now_code_block_stack.append(control)
self.now_branching_stack.append(None)
pass
elif "when" == keyword:
self.now_code_block_stack[-1].append_branching(data[1])
self.now_branching_stack.pop()
self.now_branching_stack.append(data[1])
pass
elif "endselect" == keyword:
self.now_code_block_stack.pop()
self.now_branching_stack.pop()
pass
elif "process" == keyword:
print("process", data)
if 3 <= len(data):
self.append_process(
FlowProcessAIAgent(data[1], data[2]))
else:
print("processの引数が足りません。data = ", data)
pass
elif "break" == keyword:
# 解析段階ではスタックを減らす必要はない
print("append break", data)
self.append_process(FlowBreak())
pass
elif "continue" == keyword:
self.append_process(FlowContinue())
pass
elif "return" == keyword:
self.append_process(FlowReturn())
pass
elif "clear_memory" == keyword:
self.append_process(FlowClearMemory())
pass
elif "append_respons" == keyword:
self.append_process(FlowAppendRespons())
pass
elif "clear_response_list" == keyword:
self.append_process(FlowClearResponseList())
pass
elif "print_response_list" == keyword:
self.append_process(FlowPrintResponseList())
pass
elif "print" == keyword:
self.append_process(FlowPrint(data[1]))
pass
else:
self.append_now_control_flow_statements_text(line)
pass
# ------------------------------------------------------------------
# JSON 形式用
# ------------------------------------------------------------------
def analyze_operation_code_json(self, code_dict):
"""
JSON 形式(OperationcodeCreator 構文)を解析し、Flow オブジェクトを
スタックに積んでいきます。再帰的に処理することで、
if / for / while / select などの入れ子構造に対応します。
Parameters
----------
code_dict : dict
JSON のパース結果。少なくとも `steps` キーを持つこと。
"""
# ステップのルートを処理
steps = code_dict.get("steps", [])
self._process_steps(steps, self.main_process)
def _process_steps(self, steps, parent_flow):
"""
再帰的にステップを処理するヘルパー。
Parameters
----------
steps : list[dict]
ステップのリスト
parent_flow : FlowSequentialBase
親フロー(通常は main_process か Flow* の body 部分)
"""
# 現在のフローをスタックに入れる
self.now_code_block_stack.append(parent_flow)
for step in steps:
stype = step.get("type")
if "if" == stype:
condition = step.get("condition", "")
flow_if = FlowIf(condition)
self.append_process(flow_if)
# then ブロック
then_steps = step.get("then", [])
self._process_steps(then_steps, flow_if)
# else ブロック(省略可)
else_steps = step.get("else", [])
if else_steps:
# else は FlowIf の `else` ブロックとして扱う
# 既存の FlowIf は else を持つ想定です
flow_else = flow_if.create_else_steps(else_steps, self)
self._process_steps(else_steps, flow_else)
elif "for" == stype:
count = int(step.get("count", 0))
flow_for = FlowFor(count)
self.append_process(flow_for)
body_steps = step.get("body", [])
self._process_steps(body_steps, flow_for)
elif "while" == stype:
condition = step.get("condition", "")
flow_while = FlowWhile(condition)
self.append_process(flow_while)
body_steps = step.get("body", [])
self._process_steps(body_steps, flow_while)
elif "select" == stype:
condition = step.get("condition", "")
flow_select = FlowSelect(condition)
self.append_process(flow_select)
cases = step.get("cases", [])
for case in cases:
when = case.get("when")
steps_case = case.get("steps", [])
flow_case = flow_select.create_case(when)
self._process_steps(steps_case, flow_case)
elif "process" == stype:
agent = step.get("agent", "")
prompt = step.get("prompt", "")
self.append_process(FlowProcessAIAgent(agent, prompt))
elif "break" == stype:
self.append_process(FlowBreak())
elif "continue" == stype:
self.append_process(FlowContinue())
elif "return" == stype:
self.append_process(FlowReturn())
elif "clear_memory" == stype:
self.append_process(FlowClearMemory())
elif "append_respons" == stype:
self.append_process(FlowAppendRespons())
elif "clear_respons_list" == stype:
self.append_process(FlowClearResponseList())
elif "print_respons_list" == stype:
self.append_process(FlowPrintResponseList())
elif "print" == stype:
text = step.get("text", "")
self.append_process(FlowPrint(text))
else:
# 予期しないタイプは無視またはログに残す
print(f"[AIOperation] 未対応 type: {stype}")
# 現在のフローをスタックから外す
self.now_code_block_stack.pop()