Python Programer Agent:ソースコード:fllow_controller.py

PYTHON_PATHにはあなたの環境でのpython.exeへのパスを通してください。

import tools.command_list 
import os
import subprocess

import threading
import sys
import time
import msvcrt
####################################################
PYTHON_PATH = "your python path"
####################################################

def create_folder(folder_path):
    """指定されたフォルダが存在しない場合、フォルダを作成します。
  
    Args:
      folder_path: 作成するフォルダのパス。
    """
    folder_path = folder_path.replace("\\", "/")
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
    if folder_path.endswith("/"):
        return folder_path
    else:
        return folder_path + "/"

WORK_SPACE_DIR = create_folder(os.getcwd())
AGENT_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Agents")
CODE_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Code")
DELOVERABLES_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Deliverables")
TEMP_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Temp")





class  PythonFileExecuteData():
    """pythonファイル実行時の出力保存"""
    def __init__(self):
        self.clear()

    def clear(self):
        self.stdout = ""
        self.error = ""
        self.returncode = 0
        
g_pfed = PythonFileExecuteData()

def read_output(process):
    while True:
        line = process.stdout.readline()
        if line == b'':
            break

        try:
            print(line.decode('utf-8'), end='')
            g_pfed.stdout += line.decode('utf-8')
        except UnicodeDecodeError:
            if str is type(line):
                g_pfed.error += line

def read_error(process):
    while True:
        line = process.stderr.readline()
        if line == b'':
            break
        try:
            print(line.decode('utf-8'), end='')
            g_pfed.error += line.decode('utf-8')

        except UnicodeDecodeError:
            if str is type(line):
                g_pfed.error += line


def check_error_python_file_execute(python_file, input_data, normal_termination_time=5):
    """
    別プロセスで実行される関数。
    # 別のPythonコードを実行する
    # python_fileのコードを実行します。input_dataは引数です。
    Args:
        python_file: 実行したいパイソンファイル
        input_data: pythonファイルに渡す引数
    Returns:
        実行結果と標準出力。エラーが発生した場合は、エラーメッセージと標準出力を返します。
    """
    print("python_file_execute:")
    g_pfed.clear()
    process = subprocess.Popen([PYTHON_PATH, CODE_SPACE_DIR + python_file, input_data],
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE
                                )

    # 標準出力と標準エラー出力を別々のスレッドで読み込む
    output_thread = threading.Thread(target=read_output, args=(process,))
    error_thread = threading.Thread(target=read_error, args=(process,))
    output_thread.start()
    error_thread.start()
    sttime=time.time()
    # メインスレッドでキー入力待ち
    while True:
      # キー入力があるか確認
        try:

            if msvcrt.kbhit():
                # キー入力があれば、子プロセスにデータを送信
                input_data = msvcrt.getch()

                if input_data == b'\x1b':  # escキー
                    print("escキーが押されました")
                    process.terminate()
                    break

                # キー入力があったら、子プロセスにデータを送信
                if process.poll() is None:
                    if process.stdin is not None:
                        process.stdin.write(input_data)
                        process.stdin.flush()
                    else:
                        print("子プロセスは標準入力を受け取っていません")
                else:
                    print("子プロセスはすでに終了しています")
                    break
            else:
                time.sleep(0.01)
                if 0 < len(g_pfed.error):
                    print("エラーが発生しました。")
                    process.terminate()
                    break
            if (normal_termination_time < time.time() - sttime):
                process.terminate()
                print(str(normal_termination_time)+"秒正常に動作しました。")

                break
            
        except EOFError:
            break

    # スレッドを終了
    output_thread.join()
    error_thread.join()

    # 子プロセスの終了ステータスを取得
    returncode = process.wait()
    g_pfed.returncode = returncode
    print(" g_pfed.returncode", g_pfed.returncode)
    if returncode == 0:
        print("コマンドが正常に終了しました")
    else:
        traceback_list = g_pfed.error.splitlines()
        print(f"traceback_list: {traceback_list}")

        print("コマンドがエラーで終了しました")
        print("g_pfed.stdout, g_pfed.error, g_pfed.returncode",
              g_pfed.stdout, g_pfed.error, g_pfed.returncode)
    return g_pfed.stdout, g_pfed.error, g_pfed.returncode
###################################################

class FllowControlleData():
    def __init__(self):
        self.code_error = False
        self.code = ""
        self.ext_list = []  # 拡張子

g_fcd = FllowControlleData()
# ファイルの実行とputhonエラーの報告


def get_file_list(folder_path):
  """指定されたフォルダ内のファイルリストを取得する関数。

  Args:
    folder_path: ファイルリストを取得したいフォルダのパス。

  Returns:
    フォルダ内のファイルリスト。
  """
  file_list = []
  folder_path = os.path.normpath(folder_path)  # パスを正規化
  for root, _, files in os.walk(folder_path):
    for file in files:
      file_list.append(os.path.join(root, file))
  return file_list


def get_program_code_from_response(r_str):
    # プログラムコードw抜き出す
    
    lines = r_str.split("\n")
    now_code = False
    code_dict = {}
    ctype = ""
    for line in lines:
        l_line = line.lower()
        l_line = line.rstrip()
        if l_line.startswith("```"):
            if False is now_code:
                now_code = True
                ctype = l_line[3:]
                if ctype not in code_dict:
                    code_dict[ctype] = ""
                print("ctype", ctype)
                continue
        if "```" == l_line:
            if now_code:
                now_code = False
                continue
        if now_code:
            code_dict[ctype] += line + "\n"
    return code_dict


def get_lengest_program_code_from_response(r_str):
    # プログラムコードw抜き出す
    
    lines = r_str.split("\n")
    now_code = False
    code_dict = {}
    code = ""
    ctype = ""
    for line in lines:
        l_line = line.lower()
        l_line = line.rstrip()
        if l_line.startswith("```"):
            if False is now_code:
                now_code = True
                code = ""
                ctype = l_line[3:]
                if ctype not in code_dict:
                    code_dict[ctype] = ""
                print("ctype", ctype)
                continue
        if "```" == l_line:
            if now_code:
                if len(code_dict[ctype]) < len(code):
                    code_dict[ctype] = code
                now_code = False
                continue
        if now_code:
            code += line + "\n"
    return code_dict


def save_program_code_to_temp(code_dict):

    for key, value in code_dict.items():
        if "" != key:
            if "python" == key:
                ext = ".py"
            elif "javascript" == key:
                ext = ".js"
            elif "c++" == key:
                ext = ".cpp"
            elif "c++" == key:
                ext = ".cpp"
            elif "ruby" == key:
                ext = ".rb"
            elif "perl" == key:
                ext = ".pl"
            elif "r" == key:
                ext = ".rd"
            else:
                ext = "." + key
            # コードの種類を設置        
            g_fcd.ext_list.append(ext)
            print("key", key)
            tools.command_list.save_program_file("temp"+ext, value)


def python_error_check(r_str):
    g_fcd.ext_list = []
    code = get_lengest_program_code_from_response(r_str)
    print("code", code)
    if g_fcd.code == code:
        return "ソースコードが前から変わっていません"
    g_fcd.code = code
    save_program_code_to_temp(code)
    print("g_fcd.ext_list", g_fcd.ext_list)
    if ".py" in g_fcd.ext_list:
        print("python_error_check step 01")
        output, error, returncode = check_error_python_file_execute("temp.py", "")
        print("output, error", output, error, returncode)
        if "" == output and "" == error:
            g_fcd.code_error = False
            return ""
        elif "" == error:
            g_fcd.code_error = False
            return "実行した結果。標準出力の内容は\r\n" + output + "\r\n"
        elif "" == output:
            g_fcd.code_error = True

            return "実行した結果。エラーが発生しました。\r\nエラーの内容は\r\n" + error + "\r\n"
        else:
            g_fcd.code_error = True        
            return "実行した結果。エラーが発生しました。\r\nエラーの内容は\r\n" + error + "\r\n標準出力の内容は\r\n" + output + "\r\n"
    else:
        g_fcd.code_error = False
        return ""

def get_code_error():
    return g_fcd.code_error

以下AIによる説明

このコードは、Python ファイルを実行し、その結果を分析してエラーを検出する機能を提供するようです。

コードの機能

  • ファイルの読み込みと実行:
    • check_error_python_file_execute 関数は、指定された Python ファイルを実行し、標準出力、標準エラー出力、および終了コードを返します。
    • subprocess.Popen を使用して、別プロセスで Python ファイルを実行します。
    • threading を使用して、標準出力と標準エラー出力を別々のスレッドで読み込みます。
    • msvcrt を使用して、ユーザーからのキー入力を受け取ります。
    • エスケープキーが押された場合、プロセスを終了します。
    • 標準エラー出力にエラーメッセージが含まれている場合、プロセスを終了します。
    • 指定された時間内にプロセスが正常に終了しない場合、プロセスを終了します。
  • エラーの検出:
    • python_error_check 関数は、応答からプログラムコードを抽出し、Python ファイルとして保存します。
    • check_error_python_file_execute 関数を使用して、保存された Python ファイルを実行します。
    • 実行結果に基づいて、エラーが発生したかどうかを判断します。
    • エラーが発生した場合、エラーメッセージと標準出力を返します。
  • コードの保存:
    • save_program_code_to_temp 関数は、応答から抽出したプログラムコードを一時ファイルとして保存します。
  • その他の機能:
    • get_file_list 関数は、指定されたフォルダ内のファイルリストを取得します。
    • get_program_code_from_response 関数は、応答からプログラムコードを抽出します。
    • get_lengest_program_code_from_response 関数は、応答から最も長いプログラムコードを抽出します。

コードの使用方法

  1. PYTHON_PATH 変数を、Python インタープリターのパスに設定します。
  2. check_error_python_file_execute 関数を使用して、Python ファイルを実行します。
  3. python_error_check 関数を使用して、実行結果を分析し、エラーを検出します。

コードの改善点

  • エラーメッセージをより詳細にする。
  • 複数のプログラミング言語をサポートする。
  • コードのフォーマットを改善する。

コードの例

# Python ファイルを実行する
output, error, returncode = check_error_python_file_execute("my_script.py", "")

# エラーを検出する
error_message = python_error_check(output)

# エラーメッセージを出力する
print(error_message)

参考資料

このコードは、Python ファイルを実行し、エラーを検出するための基本的な機能を提供します。より高度な機能が必要な場合は、コードを拡張する必要があります。