OperationcodeCreatorAgent:ソース:flow:AIOperation.py

メインクラス

from flow.FlowSequentialBase import FlowSequentialBase
from flow.FlowIf import FlowIf
from flow.FlowFor import FlowFor
from flow.FlowSelect import FlowSelect
from flow.FlowWhile import FlowWhile
from flow.FlowProcessAIAgent import FlowProcessAIAgent
from flow.FlowBreak import FlowBreak
from flow.FlowReturn import FlowReturn
from flow.FlowContinue import FlowContinue
from flow.FlowClearMemory import FlowClearMemory
from flow.FlowAppendRespons import FlowAppendRespons
from flow.FlowClearResponsList import FlowClearResponsList
from flow.FlowPrintResponsList import FlowPrintResponsList
from flow.FlowPrint import FlowPrint


class AIOperation():
    """Build a flow tree from line-oriented operation code and run it.

    Operation code is a newline-separated string in which every line has the
    form ``command:arg1:arg2``.  ``analyze_operation_code`` turns each line
    into a ``Flow*`` node and attaches it to the container that is currently
    open.  Two parallel stacks track the parser position:

    * ``now_code_block_stack`` - containers being filled; the bottom entry is
      the root ``FlowSequentialBase`` (also kept as ``main_process``).
    * ``now_branching_stack`` - for each open container, the branch label new
      processes go to (``"True"``/``"False"`` for ``if``, the ``when`` value
      for ``select``) or ``None`` when the container has no branches.
    """

    def __init__(self):
        super().__init__()
        self.now_flow = None
        self.function_list = []
        self.now_code_block_stack = []
        self.now_code_block_stack.append(FlowSequentialBase())
        self.main_process = self.now_code_block_stack[-1]
        self.now_branching_stack = [None]

    def run(self, command="", pre_respons="", flow_data=None):
        """Run the root sequence and return whatever its ``run`` returns."""
        return self.main_process.run(command, pre_respons, flow_data)

    def append_process(self, process):
        """Append ``process`` to the innermost open container.

        When a branch label is active for that container, the label is
        passed as a second argument so the container can file the process
        under the right branch.  Nothing happens if no container is open.
        """
        if 0 < len(self.now_code_block_stack):
            branch = self.now_branching_stack[-1]
            if branch is None:
                self.now_code_block_stack[-1].append_process(process)
            else:
                # A branch (if/else or select/when) is currently selected.
                self.now_code_block_stack[-1].append_process(process, branch)

    def append_now_control_flow_statements_text(self, text):
        """Append trailing text to a statement that spans several lines.

        Only ``FlowProcessAIAgent`` is supported; extend for other node
        types if needed.
        """
        # NOTE(review): within this class only if/for/while/select results
        # are pushed on the stack, so this check looks like it can never
        # match - confirm whether the last appended process was intended.
        if isinstance(self.now_code_block_stack[-1], FlowProcessAIAgent):
            self.now_code_block_stack[-1].append_prompt(text)

    def _open_block(self, flow, branch=None):
        """Attach ``flow`` to the current container and make it current."""
        # NOTE(review): the new control node is attached without the active
        # branch label, unlike append_process - confirm how FlowIf/FlowSelect
        # place a node when no branch is given.
        control = self.now_code_block_stack[-1].append_process(flow)
        self.now_code_block_stack.append(control)
        # Always push a branch entry so both stacks stay the same depth.
        self.now_branching_stack.append(branch)

    def _close_block(self):
        """Leave the innermost container and drop its branch entry."""
        self.now_code_block_stack.pop()
        self.now_branching_stack.pop()

    def analyze_operation_code(self, code_dict):
        """Parse ``code_dict["operation"]`` and extend the flow tree.

        Recognised commands: ``sub``, ``if``/``else``/``endif``,
        ``for``/``endfor``, ``while``/``endwhile``,
        ``select``/``when``/``endselect``, ``process``, ``break``,
        ``continue``, ``return``, ``clear_memory``, ``append_respons``,
        ``clear_respons_list``, ``print_respons_list`` and ``print``.
        Any other line is treated as continuation text of the current
        statement.

        Args:
            code_dict: mapping that may contain the key ``"operation"``
                holding the operation code; without that key the call is a
                no-op.

        Raises:
            IndexError: a command that needs an argument has none, or a
                closing command has no matching opening command.
            ValueError: the ``for`` argument is not an integer.
        """
        if "operation" not in code_dict:
            return

        for line in code_dict["operation"].split("\n"):
            data = line.split(":")
            # Only the command itself is stripped; arguments are kept as-is.
            data[0] = data[0].strip()
            command = data[0]

            if "sub" == command:
                # Recognised but not implemented yet.
                pass
            elif "if" == command:
                self._open_block(FlowIf(data[1]), "True")
            elif "else" == command:
                self.now_branching_stack[-1] = "False"
            elif "endif" == command:
                self._close_block()
            elif "for" == command:
                # Loops have no branches: push None so processes inside the
                # loop are not tagged with an enclosing if/when label.
                self._open_block(FlowFor(int(data[1])))
            elif "endfor" == command:
                self._close_block()
            elif "while" == command:
                self._open_block(FlowWhile(data[1]))
            elif "endwhile" == command:
                self._close_block()
            elif "select" == command:
                # No branch is active until the first "when".
                self._open_block(FlowSelect(data[1]))
            elif "when" == command:
                self.now_code_block_stack[-1].append_branching(data[1])
                self.now_branching_stack[-1] = data[1]
            elif "endselect" == command:
                self._close_block()
            elif "process" == command:
                print("process",data)
                # Two arguments are read (data[1] and data[2]), so at least
                # three fields are required.
                if 2 < len(data):
                    self.append_process(
                        FlowProcessAIAgent(data[1], data[2]))
                else:
                    print("processの引数が足りません。data = ", data)
            elif "break" == command:
                # No need to unwind the stacks at parse time.
                print("append break",data)
                self.append_process(FlowBreak())
            elif "continue" == command:
                self.append_process(FlowContinue())
            elif "return" == command:
                self.append_process(FlowReturn())
            elif "clear_memory" == command:
                self.append_process(FlowClearMemory())
            elif "append_respons" == command:
                self.append_process(FlowAppendRespons())
            elif "clear_respons_list" == command:
                self.append_process(FlowClearResponsList())
            elif "print_respons_list" == command:
                self.append_process(FlowPrintResponsList())
            elif "print" == command:
                self.append_process(FlowPrint(data[1]))
            else:
                # NOTE(review): the split list, not the original line text,
                # is forwarded here - confirm what append_prompt expects.
                self.append_now_control_flow_statements_text(data)