from langchain_core.tools import tool
from langchain_core.pydantic_v1 import (BaseModel, Field)
import os
from flow.flow_controller import python_error_check_main
import tools.command_list
####################################################
PYTHON_PATH = "user python path"
####################################################
class ToolsDataBase():
def __init__(self):
self.python_program_dict = {}
g_tdb = ToolsDataBase()
def create_folder(folder_path):
"""指定されたフォルダが存在しない場合、フォルダを作成します。
Args:
folder_path: 作成するフォルダのパス。
"""
folder_path = folder_path.replace("\\", "/")
if not os.path.exists(folder_path):
os.makedirs(folder_path)
if folder_path.endswith("/"):
return folder_path
else:
return folder_path + "/"
WORK_SPACE_DIR = create_folder(os.getcwd())
AGENT_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Agents")
CODE_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Code")
DELOVERABLES_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Deliverables")
TEMP_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Temp")
def set_agent_space(space):
AGENT_SPACE_DIR = create_folder(space)
def set_code_space(space):
CODE_SPACE_DIR = create_folder(space)
def set_deliverables_space(space):
DELOVERABLES_SPACE_DIR = create_folder(space)
def set_temp_space(space):
TEMP_SPACE_DIR = create_folder(space)
def set_work_space(space):
WORK_SPACE_DIR = create_folder(space)
WORK_SPACE_DIR = WORK_SPACE_DIR.replace("\\", "/")
set_agent_space(WORK_SPACE_DIR+"Agents")
set_code_space(WORK_SPACE_DIR+"Code")
set_deliverables_space(WORK_SPACE_DIR+"Deliverables")
set_temp_space(WORK_SPACE_DIR+"Deliverables")
####################################################
def save_text(file_name, data):
file_name = file_name.replace("../", "")
file_name = file_name.replace("..\\", "")
data = data.replace("\\r", "\r")
data = data.replace("\\n", "\n")
try:
with open(WORK_SPACE_DIR+file_name, 'wb') as f_out:
f_out.write(data.encode())
except PermissionError:
print(f"ファイルへの書き込み権限がありません: {file_name}")
except OSError as e:
print(f"ファイル書き込み中にエラーが発生しました: {e}")
except Exception as e:
print(f"ファイル書き込み中にエラーが発生しました: {e}")
def load_text(file_name):
file_name = file_name.replace("../", "")
file_name = file_name.replace("..\\", "")
full_path = WORK_SPACE_DIR+file_name
if os.path.exists(full_path):
try:
with open(full_path, 'rb') as f_in:
data = f_in.read()
return data.decode('utf-8')
except Exception as e:
print(f"ファイル読み込み中にエラーが発生しました: {e}")
else:
print("ファイルが見つかりませんでした")
return None
##########################################################
def __load_python_file(fname):
if not fname.endswith(".py"):
fname += ".py"
return __load_text("Code/"+fname)
def get_python_code(fname):
if fname in g_tdb.python_program_dict.keys():
return g_tdb.python_program_dict[fname]
return ""
def load_python_program_list():
data = __load_text("Code/python_program_function_list.list")
if None is data:
return
lines = data.split("\n")
for line in lines:
l_line = line.rstrip()
data_list = l_line.split(":")
if None is not data_list:
if 1 < len(data_list):
g_tdb.python_program_dict[data_list[0]] =\
[data_list[1], __load_python_file(data_list[0])]
def __save_python_file(fname, data):
if not fname.endswith(".py"):
fname += ".py"
__save_text("Code/"+fname, data)
def __save_python_list(fname, data):
if not fname.endswith(".list"):
fname += ".list"
__save_text("Code/"+fname, data)
def __get_python_program_list():
data=""
for key in g_tdb.python_program_dict.keys():
data += key + ":" + g_tdb.python_program_dict[key][0]
data += "\n"
return data
def get_python_program_list():
return __get_python_program_list()
class PythonProgramFunctionData(BaseModel):
function_name: str = Field()
explanation: str = Field()
program: str = Field()
@tool(args_schema=PythonProgramFunctionData)
def append_python_program_function(function_name=None, explanation="", program="") -> str:
"""
pythonプログラムで書かれた機能追加します。
既に存在している場合何もしません。
Args:
function_name:機能の名前。半角英数のみ利用できます。:
explanation:機能が何をする機能か簡易な説明。:
program:機能を実現するためのpythonのプログラム
return:
プログラムを実行しての標準出力とエラーがある時はエラーの内容がテキストで出力されます。
"""
text = python_error_check_main(program)
if tools.flow_controller.g_fcd.code_error:
text += "\r\nエラーを修正してください"
else:
append_python_program_function_direct(function_name, explanation, program)
tools.command_list.g_time_keeper.wait()
return text
def append_python_program_function_direct(function_name=None, explanation="", program=""):
"""
pythonプログラムで書かれた機能追加します。
既に存在している場合何もしません。
Args:
function_name:機能の名前。半角英数のみ利用できます。:
explanation:機能が何をする機能か簡易な説明。:
program:機能を実現するためのpythonのプログラム
"""
print("append_ai_agent:", function_name)
if function_name not in g_tdb.python_program_dict:
g_tdb.python_program_dict[function_name] = [explanation,program]
__save_python_list("python_program_function_list.list", __get_python_program_list())
__save_python_file(function_name+".py", program)
#def get_append_python_program_function_parameter(text):
# if 0 <= text.find('.append_python_program_function'):
# start = text.find('(') + 1
# # 括弧の対応関係を考慮して、最後の ')' を探す
# count = 1
# end = start
# while count > 0:
# end += 1
# if text[end] == '(':
# count += 1
# elif text[end] == ')':
# count -= 1
# function_name = text[start:end]
#
# start = text.find('explanation=') + len('explanation=')
# end = text.find(',', start)
# explanation = text[start:end].strip('\'')
#
# start = text.find('function_name=') + len('function_name=')
# end = text.find(',', start)
# function_name = text[start:end].strip('\'')
#
# start = text.find('program=') + len('program=')
# # 括弧の対応関係を考慮して、最後の ')' を探す
# count = 1
# end = start
# # プログラム文字列内のシングルクォートを考慮してカウント
# quote_count = 0
# while count > 0:
# end += 1
# if text[end] == '\'':
# quote_count += 1
# if quote_count % 2 == 0: # 偶数回出現したらカウントをリセット
# quote_count = 0
# elif text[end] == '(':
# count += 1
# elif text[end] == ')':
# count -= 1
# # ループの終了条件を追加
# if end >= len(text):
# break
# program = text[start:end].strip('\'')
# return function_name, explanation, program
# return None, None, None
#