from flow.FlowSequentialBase import FlowSequentialBase
from flow.FlowIf import FlowIf
from flow.FlowFor import FlowFor
from flow.FlowSelect import FlowSelect
from flow.FlowWhile import FlowWhile
from flow.FlowProcessAIAgent import FlowProcessAIAgent
from flow.FlowBreak import FlowBreak
from flow.FlowReturn import FlowReturn
from flow.FlowContinue import FlowContinue
from flow.FlowClearMemory import FlowClearMemory
from flow.FlowAppendRespons import FlowAppendRespons
from flow.FlowClearResponsList import FlowClearResponsList
from flow.FlowPrintResponsList import FlowPrintResponsList
from flow.FlowPrint import FlowPrint
class AIOperation:
    """Parse a line-based operation script into a tree of flow objects and run it.

    The script is read by ``analyze_operation_code``. Each line has the form
    ``keyword:arg1:arg2`` and anything after ``#`` is dropped as a comment.
    Block-opening keywords (``if``, ``for``, ``while``, ``select``) push the
    new flow object onto ``now_code_block_stack``; the matching ``end...``
    keyword pops it again. Every other recognized keyword appends a single
    flow object to the block currently on top of the stack.

    Attributes set in ``__init__``:
        now_flow: initialized to ``None``; not used elsewhere in this class.
        function_list: initialized to an empty list; not used elsewhere in
            this class.
        now_code_block_stack: stack of open blocks; the bottom entry is the
            root ``FlowSequentialBase``.
        main_process: the root ``FlowSequentialBase`` that ``run`` executes.
        now_branching_stack: stack of branch keys; ``None`` on top means
            "no branch selected".
    """

    def __init__(self):
        super().__init__()
        self.now_flow = None
        self.function_list = []
        self.now_code_block_stack = []
        self.now_code_block_stack.append(FlowSequentialBase())
        # The root block stays at the bottom of the stack and is what run() executes.
        self.main_process = self.now_code_block_stack[-1]
        self.now_branching_stack = [None]

    def run(self, command="", pre_respons="", flow_data=None):
        """Run the root block and return whatever its ``run`` returns."""
        return self.main_process.run(command, pre_respons, flow_data)

    def append_process(self, process):
        """Append ``process`` to the block on top of the stack and return it.

        When the top of ``now_branching_stack`` is not ``None`` the branch key
        is forwarded as a second argument, so the block can file the process
        under that branch. If the block stack is empty nothing is appended,
        but ``process`` is still returned.
        """
        if self.now_code_block_stack:
            current_block = self.now_code_block_stack[-1]
            branch = self.now_branching_stack[-1]
            if branch is None:
                current_block.append_process(process)
            else:
                # A branch (if/else or select/when) is currently selected.
                current_block.append_process(process, branch)
        return process

    def append_now_control_flow_statements_text(self, text):
        """Append trailing text to the current statement when it spans several lines.

        Only ``FlowProcessAIAgent`` is handled; extend for other flow types
        if needed.

        NOTE(review): nothing in this class pushes a ``FlowProcessAIAgent``
        onto ``now_code_block_stack`` (``process`` lines are only appended to
        the current block), so this branch looks unreachable from
        ``analyze_operation_code`` -- confirm whether the most recently
        appended process was the intended target.
        """
        current_block = self.now_code_block_stack[-1]
        if isinstance(current_block, FlowProcessAIAgent):
            current_block.append_prompt(text)

    def analyze_operation_code(self, code_dict):
        """Parse ``code_dict["operation"]`` and build the flow tree.

        Args:
            code_dict: mapping that may contain an ``"operation"`` entry
                holding the script text. If the key is absent the call is a
                no-op.

        Returns:
            None. The parsed flow objects are attached to the blocks on
            ``now_code_block_stack``.

        Raises:
            IndexError: for ``if``/``for``/``while``/``select``/``when``/
                ``print`` lines that lack their argument, or for an ``end...``
                keyword with nothing left to pop.
            ValueError: when the ``for`` argument is not an integer literal.
        """
        if "operation" not in code_dict:
            return

        for line in code_dict["operation"].split("\n"):
            # Drop the trailing "# comment", then split "keyword:arg1:arg2".
            data = line.split("#")[0].split(":")
            print("data", data)
            # str.split always yields at least one element, so data[0] exists.
            data[0] = data[0].strip()
            keyword = data[0]

            if keyword == "sub":
                # Recognized but intentionally ignored.
                pass
            elif keyword == "if":
                control = self.append_process(FlowIf(data[1]))
                self.now_code_block_stack.append(control)
                # Statements up to "else" belong to the "True" branch.
                self.now_branching_stack.append("True")
            elif keyword == "else":
                self.now_branching_stack[-1] = "False"
            elif keyword == "endif":
                self.now_code_block_stack.pop()
                self.now_branching_stack.pop()
            elif keyword == "for":
                # NOTE(review): for/while do not push onto the branching
                # stack, so an enclosing branch key is still forwarded to the
                # loop's append_process -- confirm this is intended.
                control = self.append_process(FlowFor(int(data[1])))
                self.now_code_block_stack.append(control)
            elif keyword == "endfor":
                self.now_code_block_stack.pop()
            elif keyword == "while":
                control = self.append_process(FlowWhile(data[1]))
                self.now_code_block_stack.append(control)
            elif keyword == "endwhile":
                self.now_code_block_stack.pop()
            elif keyword == "select":
                control = self.append_process(FlowSelect(data[1]))
                self.now_code_block_stack.append(control)
                # No branch is active until the first "when".
                self.now_branching_stack.append(None)
            elif keyword == "when":
                self.now_code_block_stack[-1].append_branching(data[1])
                # Replace the active branch key with the new one.
                self.now_branching_stack.pop()
                self.now_branching_stack.append(data[1])
            elif keyword == "endselect":
                self.now_code_block_stack.pop()
                self.now_branching_stack.pop()
            elif keyword == "process":
                print("process", data)
                # Both data[1] and data[2] are read, so at least three
                # fields are required.
                if 2 < len(data):
                    self.append_process(
                        FlowProcessAIAgent(data[1], data[2]))
                else:
                    print("processの引数が足りません。data = ", data)
            elif keyword == "break":
                # The stacks need not shrink during the parsing stage.
                print("append break", data)
                self.append_process(FlowBreak())
            elif keyword == "continue":
                self.append_process(FlowContinue())
            elif keyword == "return":
                self.append_process(FlowReturn())
            elif keyword == "clear_memory":
                self.append_process(FlowClearMemory())
            elif keyword == "append_respons":
                self.append_process(FlowAppendRespons())
            elif keyword == "clear_respons_list":
                self.append_process(FlowClearResponsList())
            elif keyword == "print_respons_list":
                self.append_process(FlowPrintResponsList())
            elif keyword == "print":
                self.append_process(FlowPrint(data[1]))
            else:
                # Unrecognized line: treat it as continuation text of the
                # current statement (the raw line, comment included).
                self.append_now_control_flow_statements_text(line)