OperationcodeCreatorAgent:ソース:flow:flow_controller.py

import tools.command_list 
import os
import subprocess

import threading
import time
import msvcrt
####################################################
PYTHON_PATH = "your python path"
####################################################

def create_folder(folder_path):
    """指定されたフォルダが存在しない場合、フォルダを作成します。
  
    Args:
      folder_path: 作成するフォルダのパス。
    """
    folder_path = folder_path.replace("\\", "/")
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
    if folder_path.endswith("/"):
        return folder_path
    else:
        return folder_path + "/"

WORK_SPACE_DIR = create_folder(os.getcwd())
AGENT_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Agents")
CODE_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Code")
DELOVERABLES_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Deliverables")
TEMP_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Temp")





class  PythonFileExecuteData():
    """pythonファイル実行時の出力保存"""
    def __init__(self):
        self.clear()

    def clear(self):
        self.stdout = ""
        self.error = ""
        self.returncode = 0
        
g_pfed = PythonFileExecuteData()

def read_output(process):
    while True:
        line = process.stdout.readline()
        if line == b'':
            break

        try:
            print(line.decode('utf-8'), end='')
            g_pfed.stdout += line.decode('utf-8')
        except UnicodeDecodeError:
            if str is type(line):
                g_pfed.error += line

def read_error(process):
    while True:
        line = process.stderr.readline()
        if line == b'':
            break
        try:
            print(line.decode('utf-8'), end='')
            g_pfed.error += line.decode('utf-8')

        except UnicodeDecodeError:
            if str is type(line):
                g_pfed.error += line


def check_error_python_file_execute(python_file, input_data, normal_termination_time=5):
    """
    別プロセスで実行される関数。
    # 別のPythonコードを実行する
    # python_fileのコードを実行します。input_dataは引数です。
    Args:
        python_file: 実行したいパイソンファイル
        input_data: pythonファイルに渡す引数
    Returns:
        実行結果と標準出力。エラーが発生した場合は、エラーメッセージと標準出力を返します。
    """
    print("python_file_execute:")
    g_pfed.clear()
    process = subprocess.Popen([PYTHON_PATH, CODE_SPACE_DIR + python_file, input_data],
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE
                                )

    # 標準出力と標準エラー出力を別々のスレッドで読み込む
    output_thread = threading.Thread(target=read_output, args=(process,))
    error_thread = threading.Thread(target=read_error, args=(process,))
    output_thread.start()
    error_thread.start()
    sttime=time.time()
    # メインスレッドでキー入力待ち
    while True:
      # キー入力があるか確認
        try:

            if msvcrt.kbhit():
                # キー入力があれば、子プロセスにデータを送信
                input_data = msvcrt.getch()

                if input_data == b'\x1b':  # escキー
                    print("escキーが押されました")
                    process.terminate()
                    break

                # キー入力があったら、子プロセスにデータを送信
                if process.poll() is None:
                    if process.stdin is not None:
                        process.stdin.write(input_data)
                        process.stdin.flush()
                    else:
                        print("子プロセスは標準入力を受け取っていません")
                else:
                    print("子プロセスはすでに終了しています")
                    break
            else:
                time.sleep(0.01)
                if 0 < len(g_pfed.error):
                    print("エラーが発生しました。")
                    process.terminate()
                    break
            if (normal_termination_time < time.time() - sttime):
                process.terminate()
                print(str(normal_termination_time)+"秒正常に動作しました。")

                break
            
        except EOFError:
            break

    # スレッドを終了
    output_thread.join()
    error_thread.join()

    # 子プロセスの終了ステータスを取得
    returncode = process.wait()
    g_pfed.returncode = returncode
    print(" g_pfed.returncode", g_pfed.returncode)
    if returncode == 0:
        print("コマンドが正常に終了しました")
    else:
        traceback_list = g_pfed.error.splitlines()
        if 0< len(traceback_list):
            print(f"traceback_list: {traceback_list}")

            print("コマンドがエラーで終了しました")
        print("g_pfed.stdout, g_pfed.error, g_pfed.returncode",
              g_pfed.stdout, g_pfed.error, g_pfed.returncode)
    return g_pfed.stdout, g_pfed.error, g_pfed.returncode
###################################################

class FllowControlleData():
    def __init__(self):
        self.code_error = False
        self.code = ""
        self.ext_list = []  # 拡張子

g_fcd = FllowControlleData()
# ファイルの実行とputhonエラーの報告


def get_file_list(folder_path):
  """指定されたフォルダ内のファイルリストを取得する関数。

  Args:
    folder_path: ファイルリストを取得したいフォルダのパス。

  Returns:
    フォルダ内のファイルリスト。
  """
  file_list = []
  folder_path = os.path.normpath(folder_path)  # パスを正規化
  for root, _, files in os.walk(folder_path):
    for file in files:
      file_list.append(os.path.join(root, file))
  return file_list


def get_program_code_from_response(r_str):
    # プログラムコードw抜き出す
    
    lines = r_str.split("\n")
    now_code = False
    code_dict = {}
    ctype = ""
    for line in lines:
        l_line = line.rstrip()
        l_line = l_line.lower()
        if l_line.startswith("```"):
            if False is now_code:
                now_code = True
                ctype = l_line[3:]
                if ctype not in code_dict:
                    code_dict[ctype] = ""
                print("ctype", ctype)
                continue
        if "```" == l_line:
            if now_code:
                now_code = False
                continue
        if now_code:
            code_dict[ctype] += line + "\n"
    return code_dict

def get_program_code_list_from_response(r_str):
    # プログラムコードw抜き出す
    
    lines = r_str.split("\n")
    now_code = False
    code_dict = {}
    code = ""
    ctype = ""
    for line in lines:
        l_line = line.rstrip()
        l_line = l_line.lower()
        if l_line.startswith("```"):
            if False is now_code:
                now_code = True
                code = ""
                ctype = l_line[3:]
                if ctype not in code_dict:
                    code_dict[ctype] = []
                print("ctype", ctype)
                continue
        if "```" == l_line:
            if now_code:
                code_dict[ctype].append(code)
                now_code = False
                continue
        if now_code:
            code += line + "\n"
    return code_dict


def get_lengest_program_code_from_response(r_str):
    # プログラムコードw抜き出す

    lines = r_str.split("\n")
    now_code = False
    code_dict = {}
    code = ""
    ctype = ""
    for line in lines:
        l_line = line.rstrip()
        l_line = l_line.lower()
        if l_line.startswith("```"):
            if False is now_code:
                now_code = True
                code = ""
                ctype = l_line[3:]
                print("get_lengest_program_code_from_response:ctype", ctype, l_line)
                if ctype not in code_dict:
                    code_dict[ctype] = ""
                print("ctype", ctype)
                continue
        if "```" == l_line:
            if now_code:
                if len(code_dict[ctype]) < len(code):
                    code_dict[ctype] = code
                now_code = False
                continue
        if now_code:
            code += line + "\n"
    return code_dict


def save_program_code_to_temp(code_dict):

    for key, value in code_dict.items():
        if "" != key:
            if "python" == key:
                ext = ".py"
            elif "javascript" == key:
                ext = ".js"
            elif "c++" == key:
                ext = ".cpp"
            elif "c++" == key:
                ext = ".cpp"
            elif "ruby" == key:
                ext = ".rb"
            elif "perl" == key:
                ext = ".pl"
            elif "r" == key:
                ext = ".rd"
            else:
                ext = "." + key
            # コードの種類を設置        
            g_fcd.ext_list.append(ext)
            print("key", key)
            tools.command_list.save_program_file("temp"+ext, value)


def python_error_check(r_str, error_respons="", timeout=5):
    
    code_dict = get_lengest_program_code_from_response(r_str)
    return python_error_check_main(code_dict)

def python_error_check_main(code, pre_error="", timeout=5):
    """pythonプログラムを実行してその出力を得る。"""
    g_fcd.ext_list = []
    code_dict = {}
    if dict is not type(code):
        code_dict["python"] = code
    else:
        code_dict = code
    print("code", code_dict)
    if g_fcd.code == code_dict:
        return "ソースコードが前から変わっていません。前回のエラーは\r\n" + pre_error
    g_fcd.code = code_dict
    save_program_code_to_temp(code_dict)
    
    print("g_fcd.ext_list", g_fcd.ext_list)
    if ".py" in g_fcd.ext_list:
        output, error, returncode = check_error_python_file_execute("temp.py",
                                                                    "",
                                                                    timeout)
        print("output, error", output, error, returncode)
        if "" == output and "" == error:
            g_fcd.code_error = False
            return ""
        elif "" == error:
            g_fcd.code_error = False
            return "実行した結果。標準出力の内容は\r\n" + output + "\r\n"
        elif "" == output:
            g_fcd.code_error = True

            return "実行した結果。エラーが発生しました。\r\nエラーの内容は\r\n" + error + "\r\n"
        else:
            g_fcd.code_error = True        
            return "実行した結果。エラーが発生しました。\r\nエラーの内容は\r\n" + error + "\r\n標準出力の内容は\r\n" + output + "\r\n"
    else:
        g_fcd.code_error = False
        return ""


def get_code_error():
    return g_fcd.code_error


def get_python_line_start_sharp_comment_from_respons(respons):
    """#で始まる行のコメントをリスト化する。"""
    result = []
    code_dict = get_lengest_program_code_from_response(respons)
    if "python" in code_dict:
        code = code_dict["python"]
        line_list = code.split("\n")
        if 0 < len(line_list):
            for line in line_list:
                l_line = line.strip()
                if 0 < len(l_line):
                    if "#" == l_line[0]:
                        result.append(l_line[1:])
    return result