import tools.command_list
import os
import subprocess
import threading
import sys
import time
import msvcrt
####################################################
PYTHON_PATH = "your python path"
####################################################
def create_folder(folder_path):
"""指定されたフォルダが存在しない場合、フォルダを作成します。
Args:
folder_path: 作成するフォルダのパス。
"""
folder_path = folder_path.replace("\\", "/")
if not os.path.exists(folder_path):
os.makedirs(folder_path)
if folder_path.endswith("/"):
return folder_path
else:
return folder_path + "/"
WORK_SPACE_DIR = create_folder(os.getcwd())
AGENT_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Agents")
CODE_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Code")
DELOVERABLES_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Deliverables")
TEMP_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Temp")
class PythonFileExecuteData():
"""pythonファイル実行時の出力保存"""
def __init__(self):
self.clear()
def clear(self):
self.stdout = ""
self.error = ""
self.returncode = 0
g_pfed = PythonFileExecuteData()
def read_output(process):
while True:
line = process.stdout.readline()
if line == b'':
break
try:
print(line.decode('utf-8'), end='')
g_pfed.stdout += line.decode('utf-8')
except UnicodeDecodeError:
if str is type(line):
g_pfed.error += line
def read_error(process):
while True:
line = process.stderr.readline()
if line == b'':
break
try:
print(line.decode('utf-8'), end='')
g_pfed.error += line.decode('utf-8')
except UnicodeDecodeError:
if str is type(line):
g_pfed.error += line
def check_error_python_file_execute(python_file, input_data, normal_termination_time=5):
"""
別プロセスで実行される関数。
# 別のPythonコードを実行する
# python_fileのコードを実行します。input_dataは引数です。
Args:
python_file: 実行したいパイソンファイル
input_data: pythonファイルに渡す引数
Returns:
実行結果と標準出力。エラーが発生した場合は、エラーメッセージと標準出力を返します。
"""
print("python_file_execute:")
g_pfed.clear()
process = subprocess.Popen([PYTHON_PATH, CODE_SPACE_DIR + python_file, input_data],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE
)
# 標準出力と標準エラー出力を別々のスレッドで読み込む
output_thread = threading.Thread(target=read_output, args=(process,))
error_thread = threading.Thread(target=read_error, args=(process,))
output_thread.start()
error_thread.start()
sttime=time.time()
# メインスレッドでキー入力待ち
while True:
# キー入力があるか確認
try:
if msvcrt.kbhit():
# キー入力があれば、子プロセスにデータを送信
input_data = msvcrt.getch()
if input_data == b'\x1b': # escキー
print("escキーが押されました")
process.terminate()
break
# キー入力があったら、子プロセスにデータを送信
if process.poll() is None:
if process.stdin is not None:
process.stdin.write(input_data)
process.stdin.flush()
else:
print("子プロセスは標準入力を受け取っていません")
else:
print("子プロセスはすでに終了しています")
break
else:
time.sleep(0.01)
if 0 < len(g_pfed.error):
print("エラーが発生しました。")
process.terminate()
break
if (normal_termination_time < time.time() - sttime):
process.terminate()
print(str(normal_termination_time)+"秒正常に動作しました。")
break
except EOFError:
break
# スレッドを終了
output_thread.join()
error_thread.join()
# 子プロセスの終了ステータスを取得
returncode = process.wait()
g_pfed.returncode = returncode
print(" g_pfed.returncode", g_pfed.returncode)
if returncode == 0:
print("コマンドが正常に終了しました")
else:
traceback_list = g_pfed.error.splitlines()
if 0< len(traceback_list):
print(f"traceback_list: {traceback_list}")
print("コマンドがエラーで終了しました")
print("g_pfed.stdout, g_pfed.error, g_pfed.returncode",
g_pfed.stdout, g_pfed.error, g_pfed.returncode)
return g_pfed.stdout, g_pfed.error, g_pfed.returncode
###################################################
class FllowControlleData():
def __init__(self):
self.code_error = False
self.code = ""
self.ext_list = [] # 拡張子
g_fcd = FllowControlleData()
# ファイルの実行とputhonエラーの報告
def get_file_list(folder_path):
"""指定されたフォルダ内のファイルリストを取得する関数。
Args:
folder_path: ファイルリストを取得したいフォルダのパス。
Returns:
フォルダ内のファイルリスト。
"""
file_list = []
folder_path = os.path.normpath(folder_path) # パスを正規化
for root, _, files in os.walk(folder_path):
for file in files:
file_list.append(os.path.join(root, file))
return file_list
def get_program_code_from_response(r_str):
# プログラムコードw抜き出す
lines = r_str.split("\n")
now_code = False
code_dict = {}
ctype = ""
for line in lines:
l_line = line.lower()
l_line = line.rstrip()
if l_line.startswith("```"):
if False is now_code:
now_code = True
ctype = l_line[3:]
if ctype not in code_dict:
code_dict[ctype] = ""
print("ctype", ctype)
continue
if "```" == l_line:
if now_code:
now_code = False
continue
if now_code:
code_dict[ctype] += line + "\n"
return code_dict
def get_lengest_program_code_from_response(r_str):
# プログラムコードw抜き出す
lines = r_str.split("\n")
now_code = False
code_dict = {}
code = ""
ctype = ""
for line in lines:
l_line = line.lower()
l_line = line.rstrip()
if l_line.startswith("```"):
if False is now_code:
now_code = True
code = ""
ctype = l_line[3:]
if ctype not in code_dict:
code_dict[ctype] = ""
print("ctype", ctype)
continue
if "```" == l_line:
if now_code:
if len(code_dict[ctype]) < len(code):
code_dict[ctype] = code
now_code = False
continue
if now_code:
code += line + "\n"
return code_dict
def save_program_code_to_temp(code_dict):
for key, value in code_dict.items():
if "" != key:
if "python" == key:
ext = ".py"
elif "javascript" == key:
ext = ".js"
elif "c++" == key:
ext = ".cpp"
elif "c++" == key:
ext = ".cpp"
elif "ruby" == key:
ext = ".rb"
elif "perl" == key:
ext = ".pl"
elif "r" == key:
ext = ".rd"
else:
ext = "." + key
# コードの種類を設置
g_fcd.ext_list.append(ext)
print("key", key)
tools.command_list.save_program_file("temp"+ext, value)
def python_error_check(r_str):
g_fcd.ext_list = []
code = get_lengest_program_code_from_response(r_str)
print("code", code)
if g_fcd.code == code:
return "ソースコードが前から変わっていません"
g_fcd.code = code
save_program_code_to_temp(code)
print("g_fcd.ext_list", g_fcd.ext_list)
if ".py" in g_fcd.ext_list:
output, error, returncode = check_error_python_file_execute("temp.py", "")
print("output, error", output, error, returncode)
if "" == output and "" == error:
g_fcd.code_error = False
return ""
elif "" == error:
g_fcd.code_error = False
return "実行した結果。標準出力の内容は\r\n" + output + "\r\n"
elif "" == output:
g_fcd.code_error = True
return "実行した結果。エラーが発生しました。\r\nエラーの内容は\r\n" + error + "\r\n"
else:
g_fcd.code_error = True
return "実行した結果。エラーが発生しました。\r\nエラーの内容は\r\n" + error + "\r\n標準出力の内容は\r\n" + output + "\r\n"
else:
g_fcd.code_error = False
return ""
def get_code_error():
return g_fcd.code_error
AIによる説明
Python ファイル実行とエラーチェック機能のリファレンスマニュアル
このコードは、Python ファイルを実行し、実行結果やエラーを処理する機能を提供します。Streamlit などのフレームワークと連携して、ユーザーが Python コードを実行し、その結果を確認できるように設計されています。
ファイル構成
tools/command_list.py
: ファイル保存などのコマンドを実行するためのモジュール。main.py
: Python ファイル実行とエラーチェックのメインロジックを含むファイル。
クラスと関数
1. PythonFileExecuteData
クラス
- Python ファイル実行時の出力やエラー情報を保存するためのクラスです。
- メソッド:
__init__(self)
: 初期化時に出力、エラー、リターンコードをクリアします。clear(self)
: 出力、エラー、リターンコードをクリアします。
2. check_error_python_file_execute(python_file, input_data, normal_termination_time=5)
関数
- 指定された Python ファイルを実行し、実行結果とエラー情報を返します。
- 引数:
python_file
: 実行する Python ファイル名。input_data
: Python ファイルに渡す引数。normal_termination_time
: 正常終了とみなす時間(秒)。
- 返り値:
(stdout, stderr, returncode)
: 標準出力、標準エラー出力、リターンコードのタプル。
3. FllowControlleData
クラス
- Python ファイル実行時のエラー状態やコード情報を保存するためのクラスです。
- メソッド:
__init__(self)
: 初期化時にエラー状態、コード、拡張子リストをクリアします。
4. get_file_list(folder_path)
関数
- 指定されたフォルダ内のファイルリストを取得します。
- 引数:
folder_path
: ファイルリストを取得するフォルダのパス。
- 返り値:
- フォルダ内のファイルリスト。
5. get_program_code_from_response(r_str)
関数
- テキストからプログラムコードを抜き出します。
- 引数:
r_str
: テキストデータ。
- 返り値:
- プログラムコードの辞書。キーは言語の種類(例: “python”、”javascript”)、値はコードです。
6. get_lengest_program_code_from_response(r_str)
関数
- テキストから最も長いプログラムコードを抜き出します。
- 引数:
r_str
: テキストデータ。
- 返り値:
- プログラムコードの辞書。キーは言語の種類(例: “python”、”javascript”)、値はコードです。
7. save_program_code_to_temp(code_dict)
関数
- プログラムコードを一時ファイルとして保存します。
- 引数:
code_dict
: プログラムコードの辞書。
8. python_error_check(r_str)
関数
- Python コードを実行し、エラーが発生した場合にエラーメッセージを返します。
- 引数:
r_str
: テキストデータ。
- 返り値:
- エラーメッセージ。
9. get_code_error()
関数
- Python コードの実行結果がエラーかどうかを返します。
- 返り値:
True
: エラーが発生した場合。False
: エラーが発生しなかった場合。
使用方法
check_error_python_file_execute()
関数を使用して、Python ファイルを実行します。python_error_check()
関数を使用して、実行結果にエラーが含まれているかどうかを確認します。get_code_error()
関数を使用して、エラーが発生したかどうかを確認します。
例
# Python ファイルを実行
stdout, stderr, returncode = check_error_python_file_execute("my_script.py", "input_data")
# エラーメッセージを取得
error_message = python_error_check(stdout)
# エラーが発生したかどうかを確認
is_error = get_code_error()
# 結果を表示
print(f"標準出力: {stdout}")
print(f"標準エラー出力: {stderr}")
print(f"リターンコード: {returncode}")
print(f"エラーメッセージ: {error_message}")
print(f"エラーが発生しましたか: {is_error}")
注意点
- このコードは、Windows 環境でのみ動作します。
- Python ファイルを実行する前に、
PYTHON_PATH
変数を適切な Python インタープリターのパスに設定する必要があります。 normal_termination_time
パラメーターは、正常終了とみなす時間を設定します。この値は、実行する Python ファイルの処理時間に応じて調整する必要があります。
まとめ
このコードは、Python ファイルを実行し、実行結果やエラーを処理する機能を提供します。Streamlit などのフレームワークと連携して、ユーザーが Python コードを実行し、その結果を確認できるように設計されています。