PYTHON_PATHにはあなたの環境でのpython.exeへのパスを通してください。
import tools.command_list
import os
import subprocess
import threading
import sys
import time
import msvcrt
####################################################
PYTHON_PATH = "your python path"
####################################################
def create_folder(folder_path):
"""指定されたフォルダが存在しない場合、フォルダを作成します。
Args:
folder_path: 作成するフォルダのパス。
"""
folder_path = folder_path.replace("\\", "/")
if not os.path.exists(folder_path):
os.makedirs(folder_path)
if folder_path.endswith("/"):
return folder_path
else:
return folder_path + "/"
WORK_SPACE_DIR = create_folder(os.getcwd())
AGENT_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Agents")
CODE_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Code")
DELOVERABLES_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Deliverables")
TEMP_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Temp")
class PythonFileExecuteData():
"""pythonファイル実行時の出力保存"""
def __init__(self):
self.clear()
def clear(self):
self.stdout = ""
self.error = ""
self.returncode = 0
g_pfed = PythonFileExecuteData()
def read_output(process):
while True:
line = process.stdout.readline()
if line == b'':
break
try:
print(line.decode('utf-8'), end='')
g_pfed.stdout += line.decode('utf-8')
except UnicodeDecodeError:
if str is type(line):
g_pfed.error += line
def read_error(process):
while True:
line = process.stderr.readline()
if line == b'':
break
try:
print(line.decode('utf-8'), end='')
g_pfed.error += line.decode('utf-8')
except UnicodeDecodeError:
if str is type(line):
g_pfed.error += line
def check_error_python_file_execute(python_file, input_data, normal_termination_time=5):
"""
別プロセスで実行される関数。
# 別のPythonコードを実行する
# python_fileのコードを実行します。input_dataは引数です。
Args:
python_file: 実行したいパイソンファイル
input_data: pythonファイルに渡す引数
Returns:
実行結果と標準出力。エラーが発生した場合は、エラーメッセージと標準出力を返します。
"""
print("python_file_execute:")
g_pfed.clear()
process = subprocess.Popen([PYTHON_PATH, CODE_SPACE_DIR + python_file, input_data],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE
)
# 標準出力と標準エラー出力を別々のスレッドで読み込む
output_thread = threading.Thread(target=read_output, args=(process,))
error_thread = threading.Thread(target=read_error, args=(process,))
output_thread.start()
error_thread.start()
sttime=time.time()
# メインスレッドでキー入力待ち
while True:
# キー入力があるか確認
try:
if msvcrt.kbhit():
# キー入力があれば、子プロセスにデータを送信
input_data = msvcrt.getch()
if input_data == b'\x1b': # escキー
print("escキーが押されました")
process.terminate()
break
# キー入力があったら、子プロセスにデータを送信
if process.poll() is None:
if process.stdin is not None:
process.stdin.write(input_data)
process.stdin.flush()
else:
print("子プロセスは標準入力を受け取っていません")
else:
print("子プロセスはすでに終了しています")
break
else:
time.sleep(0.01)
if 0 < len(g_pfed.error):
print("エラーが発生しました。")
process.terminate()
break
if (normal_termination_time < time.time() - sttime):
process.terminate()
print(str(normal_termination_time)+"秒正常に動作しました。")
break
except EOFError:
break
# スレッドを終了
output_thread.join()
error_thread.join()
# 子プロセスの終了ステータスを取得
returncode = process.wait()
g_pfed.returncode = returncode
print(" g_pfed.returncode", g_pfed.returncode)
if returncode == 0:
print("コマンドが正常に終了しました")
else:
traceback_list = g_pfed.error.splitlines()
print(f"traceback_list: {traceback_list}")
print("コマンドがエラーで終了しました")
print("g_pfed.stdout, g_pfed.error, g_pfed.returncode",
g_pfed.stdout, g_pfed.error, g_pfed.returncode)
return g_pfed.stdout, g_pfed.error, g_pfed.returncode
###################################################
class FllowControlleData():
def __init__(self):
self.code_error = False
self.code = ""
self.ext_list = [] # 拡張子
g_fcd = FllowControlleData()
# ファイルの実行とputhonエラーの報告
def get_file_list(folder_path):
"""指定されたフォルダ内のファイルリストを取得する関数。
Args:
folder_path: ファイルリストを取得したいフォルダのパス。
Returns:
フォルダ内のファイルリスト。
"""
file_list = []
folder_path = os.path.normpath(folder_path) # パスを正規化
for root, _, files in os.walk(folder_path):
for file in files:
file_list.append(os.path.join(root, file))
return file_list
def get_program_code_from_response(r_str):
# プログラムコードw抜き出す
lines = r_str.split("\n")
now_code = False
code_dict = {}
ctype = ""
for line in lines:
l_line = line.lower()
l_line = line.rstrip()
if l_line.startswith("```"):
if False is now_code:
now_code = True
ctype = l_line[3:]
if ctype not in code_dict:
code_dict[ctype] = ""
print("ctype", ctype)
continue
if "```" == l_line:
if now_code:
now_code = False
continue
if now_code:
code_dict[ctype] += line + "\n"
return code_dict
def get_lengest_program_code_from_response(r_str):
# プログラムコードw抜き出す
lines = r_str.split("\n")
now_code = False
code_dict = {}
code = ""
ctype = ""
for line in lines:
l_line = line.lower()
l_line = line.rstrip()
if l_line.startswith("```"):
if False is now_code:
now_code = True
code = ""
ctype = l_line[3:]
if ctype not in code_dict:
code_dict[ctype] = ""
print("ctype", ctype)
continue
if "```" == l_line:
if now_code:
if len(code_dict[ctype]) < len(code):
code_dict[ctype] = code
now_code = False
continue
if now_code:
code += line + "\n"
return code_dict
def save_program_code_to_temp(code_dict):
for key, value in code_dict.items():
if "" != key:
if "python" == key:
ext = ".py"
elif "javascript" == key:
ext = ".js"
elif "c++" == key:
ext = ".cpp"
elif "c++" == key:
ext = ".cpp"
elif "ruby" == key:
ext = ".rb"
elif "perl" == key:
ext = ".pl"
elif "r" == key:
ext = ".rd"
else:
ext = "." + key
# コードの種類を設置
g_fcd.ext_list.append(ext)
print("key", key)
tools.command_list.save_program_file("temp"+ext, value)
def python_error_check(r_str):
g_fcd.ext_list = []
code = get_lengest_program_code_from_response(r_str)
print("code", code)
if g_fcd.code == code:
return "ソースコードが前から変わっていません"
g_fcd.code = code
save_program_code_to_temp(code)
print("g_fcd.ext_list", g_fcd.ext_list)
if ".py" in g_fcd.ext_list:
print("python_error_check step 01")
output, error, returncode = check_error_python_file_execute("temp.py", "")
print("output, error", output, error, returncode)
if "" == output and "" == error:
g_fcd.code_error = False
return ""
elif "" == error:
g_fcd.code_error = False
return "実行した結果。標準出力の内容は\r\n" + output + "\r\n"
elif "" == output:
g_fcd.code_error = True
return "実行した結果。エラーが発生しました。\r\nエラーの内容は\r\n" + error + "\r\n"
else:
g_fcd.code_error = True
return "実行した結果。エラーが発生しました。\r\nエラーの内容は\r\n" + error + "\r\n標準出力の内容は\r\n" + output + "\r\n"
else:
g_fcd.code_error = False
return ""
def get_code_error():
return g_fcd.code_error
以下AIによる説明
このコードは、Python ファイルを実行し、その結果を分析してエラーを検出する機能を提供するようです。
コードの機能
- ファイルの読み込みと実行:
check_error_python_file_execute
関数は、指定された Python ファイルを実行し、標準出力、標準エラー出力、および終了コードを返します。subprocess.Popen
を使用して、別プロセスで Python ファイルを実行します。threading
を使用して、標準出力と標準エラー出力を別々のスレッドで読み込みます。msvcrt
を使用して、ユーザーからのキー入力を受け取ります。- エスケープキーが押された場合、プロセスを終了します。
- 標準エラー出力にエラーメッセージが含まれている場合、プロセスを終了します。
- 指定された時間内にプロセスが正常に終了しない場合、プロセスを終了します。
- エラーの検出:
python_error_check
関数は、応答からプログラムコードを抽出し、Python ファイルとして保存します。check_error_python_file_execute
関数を使用して、保存された Python ファイルを実行します。- 実行結果に基づいて、エラーが発生したかどうかを判断します。
- エラーが発生した場合、エラーメッセージと標準出力を返します。
- コードの保存:
save_program_code_to_temp
関数は、応答から抽出したプログラムコードを一時ファイルとして保存します。
- その他の機能:
get_file_list
関数は、指定されたフォルダ内のファイルリストを取得します。get_program_code_from_response
関数は、応答からプログラムコードを抽出します。get_lengest_program_code_from_response
関数は、応答から最も長いプログラムコードを抽出します。
コードの使用方法
PYTHON_PATH
変数を、Python インタープリターのパスに設定します。check_error_python_file_execute
関数を使用して、Python ファイルを実行します。python_error_check
関数を使用して、実行結果を分析し、エラーを検出します。
コードの改善点
- エラーメッセージをより詳細にする。
- 複数のプログラミング言語をサポートする。
- コードのフォーマットを改善する。
コードの例
# Python ファイルを実行する
output, error, returncode = check_error_python_file_execute("my_script.py", "")
# エラーを検出する
error_message = python_error_check(output)
# エラーメッセージを出力する
print(error_message)
参考資料
このコードは、Python ファイルを実行し、エラーを検出するための基本的な機能を提供します。より高度な機能が必要な場合は、コードを拡張する必要があります。