Agent Select:ソースコード:fllow_controller.py

import tools.command_list 
import os
import subprocess

import threading
import sys
import time
import msvcrt
####################################################
PYTHON_PATH = "your python path"
####################################################

def create_folder(folder_path):
    """指定されたフォルダが存在しない場合、フォルダを作成します。
  
    Args:
      folder_path: 作成するフォルダのパス。
    """
    folder_path = folder_path.replace("\\", "/")
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
    if folder_path.endswith("/"):
        return folder_path
    else:
        return folder_path + "/"

WORK_SPACE_DIR = create_folder(os.getcwd())
AGENT_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Agents")
CODE_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Code")
DELOVERABLES_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Deliverables")
TEMP_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Temp")





class  PythonFileExecuteData():
    """pythonファイル実行時の出力保存"""
    def __init__(self):
        self.clear()

    def clear(self):
        self.stdout = ""
        self.error = ""
        self.returncode = 0
        
g_pfed = PythonFileExecuteData()

def read_output(process):
    while True:
        line = process.stdout.readline()
        if line == b'':
            break

        try:
            print(line.decode('utf-8'), end='')
            g_pfed.stdout += line.decode('utf-8')
        except UnicodeDecodeError:
            if str is type(line):
                g_pfed.error += line

def read_error(process):
    while True:
        line = process.stderr.readline()
        if line == b'':
            break
        try:
            print(line.decode('utf-8'), end='')
            g_pfed.error += line.decode('utf-8')

        except UnicodeDecodeError:
            if str is type(line):
                g_pfed.error += line


def check_error_python_file_execute(python_file, input_data, normal_termination_time=5):
    """
    別プロセスで実行される関数。
    # 別のPythonコードを実行する
    # python_fileのコードを実行します。input_dataは引数です。
    Args:
        python_file: 実行したいパイソンファイル
        input_data: pythonファイルに渡す引数
    Returns:
        実行結果と標準出力。エラーが発生した場合は、エラーメッセージと標準出力を返します。
    """
    print("python_file_execute:")
    g_pfed.clear()
    process = subprocess.Popen([PYTHON_PATH, CODE_SPACE_DIR + python_file, input_data],
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE
                                )

    # 標準出力と標準エラー出力を別々のスレッドで読み込む
    output_thread = threading.Thread(target=read_output, args=(process,))
    error_thread = threading.Thread(target=read_error, args=(process,))
    output_thread.start()
    error_thread.start()
    sttime=time.time()
    # メインスレッドでキー入力待ち
    while True:
      # キー入力があるか確認
        try:

            if msvcrt.kbhit():
                # キー入力があれば、子プロセスにデータを送信
                input_data = msvcrt.getch()

                if input_data == b'\x1b':  # escキー
                    print("escキーが押されました")
                    process.terminate()
                    break

                # キー入力があったら、子プロセスにデータを送信
                if process.poll() is None:
                    if process.stdin is not None:
                        process.stdin.write(input_data)
                        process.stdin.flush()
                    else:
                        print("子プロセスは標準入力を受け取っていません")
                else:
                    print("子プロセスはすでに終了しています")
                    break
            else:
                time.sleep(0.01)
                if 0 < len(g_pfed.error):
                    print("エラーが発生しました。")
                    process.terminate()
                    break
            if (normal_termination_time < time.time() - sttime):
                process.terminate()
                print(str(normal_termination_time)+"秒正常に動作しました。")

                break
            
        except EOFError:
            break

    # スレッドを終了
    output_thread.join()
    error_thread.join()

    # 子プロセスの終了ステータスを取得
    returncode = process.wait()
    g_pfed.returncode = returncode
    print(" g_pfed.returncode", g_pfed.returncode)
    if returncode == 0:
        print("コマンドが正常に終了しました")
    else:
        traceback_list = g_pfed.error.splitlines()
        if 0< len(traceback_list):
            print(f"traceback_list: {traceback_list}")

            print("コマンドがエラーで終了しました")
        print("g_pfed.stdout, g_pfed.error, g_pfed.returncode",
              g_pfed.stdout, g_pfed.error, g_pfed.returncode)
    return g_pfed.stdout, g_pfed.error, g_pfed.returncode
###################################################

class FllowControlleData():
    def __init__(self):
        self.code_error = False
        self.code = ""
        self.ext_list = []  # 拡張子

g_fcd = FllowControlleData()
# ファイルの実行とputhonエラーの報告


def get_file_list(folder_path):
  """指定されたフォルダ内のファイルリストを取得する関数。

  Args:
    folder_path: ファイルリストを取得したいフォルダのパス。

  Returns:
    フォルダ内のファイルリスト。
  """
  file_list = []
  folder_path = os.path.normpath(folder_path)  # パスを正規化
  for root, _, files in os.walk(folder_path):
    for file in files:
      file_list.append(os.path.join(root, file))
  return file_list


def get_program_code_from_response(r_str):
    # プログラムコードw抜き出す
    
    lines = r_str.split("\n")
    now_code = False
    code_dict = {}
    ctype = ""
    for line in lines:
        l_line = line.lower()
        l_line = line.rstrip()
        if l_line.startswith("```"):
            if False is now_code:
                now_code = True
                ctype = l_line[3:]
                if ctype not in code_dict:
                    code_dict[ctype] = ""
                print("ctype", ctype)
                continue
        if "```" == l_line:
            if now_code:
                now_code = False
                continue
        if now_code:
            code_dict[ctype] += line + "\n"
    return code_dict


def get_lengest_program_code_from_response(r_str):
    # プログラムコードw抜き出す
    
    lines = r_str.split("\n")
    now_code = False
    code_dict = {}
    code = ""
    ctype = ""
    for line in lines:
        l_line = line.lower()
        l_line = line.rstrip()
        if l_line.startswith("```"):
            if False is now_code:
                now_code = True
                code = ""
                ctype = l_line[3:]
                if ctype not in code_dict:
                    code_dict[ctype] = ""
                print("ctype", ctype)
                continue
        if "```" == l_line:
            if now_code:
                if len(code_dict[ctype]) < len(code):
                    code_dict[ctype] = code
                now_code = False
                continue
        if now_code:
            code += line + "\n"
    return code_dict


def save_program_code_to_temp(code_dict):

    for key, value in code_dict.items():
        if "" != key:
            if "python" == key:
                ext = ".py"
            elif "javascript" == key:
                ext = ".js"
            elif "c++" == key:
                ext = ".cpp"
            elif "c++" == key:
                ext = ".cpp"
            elif "ruby" == key:
                ext = ".rb"
            elif "perl" == key:
                ext = ".pl"
            elif "r" == key:
                ext = ".rd"
            else:
                ext = "." + key
            # コードの種類を設置        
            g_fcd.ext_list.append(ext)
            print("key", key)
            tools.command_list.save_program_file("temp"+ext, value)


def python_error_check(r_str):
    g_fcd.ext_list = []
    code = get_lengest_program_code_from_response(r_str)
    print("code", code)
    if g_fcd.code == code:
        return "ソースコードが前から変わっていません"
    g_fcd.code = code
    save_program_code_to_temp(code)
    print("g_fcd.ext_list", g_fcd.ext_list)
    if ".py" in g_fcd.ext_list:
        output, error, returncode = check_error_python_file_execute("temp.py", "")
        print("output, error", output, error, returncode)
        if "" == output and "" == error:
            g_fcd.code_error = False
            return ""
        elif "" == error:
            g_fcd.code_error = False
            return "実行した結果。標準出力の内容は\r\n" + output + "\r\n"
        elif "" == output:
            g_fcd.code_error = True

            return "実行した結果。エラーが発生しました。\r\nエラーの内容は\r\n" + error + "\r\n"
        else:
            g_fcd.code_error = True        
            return "実行した結果。エラーが発生しました。\r\nエラーの内容は\r\n" + error + "\r\n標準出力の内容は\r\n" + output + "\r\n"
    else:
        g_fcd.code_error = False
        return ""

def get_code_error():
    return g_fcd.code_error

AIによる説明

Python ファイル実行とエラーチェック機能のリファレンスマニュアル

このコードは、Python ファイルを実行し、実行結果やエラーを処理する機能を提供します。Streamlit などのフレームワークと連携して、ユーザーが Python コードを実行し、その結果を確認できるように設計されています。

ファイル構成

  • tools/command_list.py: ファイル保存などのコマンドを実行するためのモジュール。
  • main.py: Python ファイル実行とエラーチェックのメインロジックを含むファイル。

クラスと関数

1. PythonFileExecuteData クラス

  • Python ファイル実行時の出力やエラー情報を保存するためのクラスです。
  • メソッド:
    • __init__(self): 初期化時に出力、エラー、リターンコードをクリアします。
    • clear(self): 出力、エラー、リターンコードをクリアします。

2. check_error_python_file_execute(python_file, input_data, normal_termination_time=5) 関数

  • 指定された Python ファイルを実行し、実行結果とエラー情報を返します。
  • 引数:
    • python_file: 実行する Python ファイル名。
    • input_data: Python ファイルに渡す引数。
    • normal_termination_time: 正常終了とみなす時間(秒)。
  • 返り値:
    • (stdout, stderr, returncode): 標準出力、標準エラー出力、リターンコードのタプル。

3. FllowControlleData クラス

  • Python ファイル実行時のエラー状態やコード情報を保存するためのクラスです。
  • メソッド:
    • __init__(self): 初期化時にエラー状態、コード、拡張子リストをクリアします。

4. get_file_list(folder_path) 関数

  • 指定されたフォルダ内のファイルリストを取得します。
  • 引数:
    • folder_path: ファイルリストを取得するフォルダのパス。
  • 返り値:
    • フォルダ内のファイルリスト。

5. get_program_code_from_response(r_str) 関数

  • テキストからプログラムコードを抜き出します。
  • 引数:
    • r_str: テキストデータ。
  • 返り値:
    • プログラムコードの辞書。キーは言語の種類(例: “python”、”javascript”)、値はコードです。

6. get_lengest_program_code_from_response(r_str) 関数

  • テキストから最も長いプログラムコードを抜き出します。
  • 引数:
    • r_str: テキストデータ。
  • 返り値:
    • プログラムコードの辞書。キーは言語の種類(例: “python”、”javascript”)、値はコードです。

7. save_program_code_to_temp(code_dict) 関数

  • プログラムコードを一時ファイルとして保存します。
  • 引数:
    • code_dict: プログラムコードの辞書。

8. python_error_check(r_str) 関数

  • Python コードを実行し、エラーが発生した場合にエラーメッセージを返します。
  • 引数:
    • r_str: テキストデータ。
  • 返り値:
    • エラーメッセージ。

9. get_code_error() 関数

  • Python コードの実行結果がエラーかどうかを返します。
  • 返り値:
    • True: エラーが発生した場合。
    • False: エラーが発生しなかった場合。

使用方法

  1. check_error_python_file_execute() 関数を使用して、Python ファイルを実行します。
  2. python_error_check() 関数を使用して、実行結果にエラーが含まれているかどうかを確認します。
  3. get_code_error() 関数を使用して、エラーが発生したかどうかを確認します。

# Python ファイルを実行
stdout, stderr, returncode = check_error_python_file_execute("my_script.py", "input_data")

# エラーメッセージを取得
error_message = python_error_check(stdout)

# エラーが発生したかどうかを確認
is_error = get_code_error()

# 結果を表示
print(f"標準出力: {stdout}")
print(f"標準エラー出力: {stderr}")
print(f"リターンコード: {returncode}")
print(f"エラーメッセージ: {error_message}")
print(f"エラーが発生しましたか: {is_error}")

注意点

  • このコードは、Windows 環境でのみ動作します。
  • Python ファイルを実行する前に、PYTHON_PATH 変数を適切な Python インタープリターのパスに設定する必要があります。
  • normal_termination_time パラメーターは、正常終了とみなす時間を設定します。この値は、実行する Python ファイルの処理時間に応じて調整する必要があります。

まとめ

このコードは、Python ファイルを実行し、実行結果やエラーを処理する機能を提供します。Streamlit などのフレームワークと連携して、ユーザーが Python コードを実行し、その結果を確認できるように設計されています。