OperationcodeCreatorAgent:ソース:tools:program_called_command_list.py


from langchain_core.tools import tool
from langchain_core.pydantic_v1 import (BaseModel, Field)

import os
from flow.flow_controller import python_error_check_main
import tools.command_list
####################################################
PYTHON_PATH = "user python path"
####################################################
class ToolsDataBase():
    def __init__(self):
        self.python_program_dict = {}

g_tdb = ToolsDataBase()




def create_folder(folder_path):
    """指定されたフォルダが存在しない場合、フォルダを作成します。
  
    Args:
      folder_path: 作成するフォルダのパス。
    """
    folder_path = folder_path.replace("\\", "/")
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
    if folder_path.endswith("/"):
        return folder_path
    else:
        return folder_path + "/"

WORK_SPACE_DIR = create_folder(os.getcwd())
AGENT_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Agents")
CODE_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Code")
DELOVERABLES_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Deliverables")
TEMP_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Temp")


def set_agent_space(space):
    AGENT_SPACE_DIR = create_folder(space)


def set_code_space(space):
    CODE_SPACE_DIR = create_folder(space)


def set_deliverables_space(space):
    DELOVERABLES_SPACE_DIR = create_folder(space)


def set_temp_space(space):
    TEMP_SPACE_DIR = create_folder(space)


def set_work_space(space):
    WORK_SPACE_DIR = create_folder(space)
    WORK_SPACE_DIR = WORK_SPACE_DIR.replace("\\", "/")
    
    set_agent_space(WORK_SPACE_DIR+"Agents")
    set_code_space(WORK_SPACE_DIR+"Code")
    set_deliverables_space(WORK_SPACE_DIR+"Deliverables")
    set_temp_space(WORK_SPACE_DIR+"Deliverables")
####################################################


def save_text(file_name, data):
    file_name = file_name.replace("../", "")
    file_name = file_name.replace("..\\", "")
    data = data.replace("\\r", "\r")
    data = data.replace("\\n", "\n")
    try:
        with open(WORK_SPACE_DIR+file_name, 'wb') as f_out:
            f_out.write(data.encode())
    except PermissionError:
        print(f"ファイルへの書き込み権限がありません: {file_name}")
    except OSError as e:
        print(f"ファイル書き込み中にエラーが発生しました: {e}")
    except Exception as e:
        print(f"ファイル書き込み中にエラーが発生しました: {e}")

def load_text(file_name):
    file_name = file_name.replace("../", "")
    file_name = file_name.replace("..\\", "")
    full_path = WORK_SPACE_DIR+file_name
    if os.path.exists(full_path):
        try:
            with open(full_path, 'rb') as f_in:
                data = f_in.read()
            return data.decode('utf-8')

        except Exception as e:
            print(f"ファイル読み込み中にエラーが発生しました: {e}")
        
    else:
        print("ファイルが見つかりませんでした")
    return None

##########################################################
def __load_python_file(fname):
    if not fname.endswith(".py"):
        fname += ".py"
    return __load_text("Code/"+fname)

def get_python_code(fname):
    if fname in g_tdb.python_program_dict.keys():
        return g_tdb.python_program_dict[fname]
    return ""
def load_python_program_list():
    data = __load_text("Code/python_program_function_list.list")
    if None is data:
        return 
    lines = data.split("\n")
    for line in lines:
        l_line = line.rstrip()
        data_list = l_line.split(":")
        if None is not data_list:
            if 1 < len(data_list):
                g_tdb.python_program_dict[data_list[0]] =\
                    [data_list[1], __load_python_file(data_list[0])]
                
def __save_python_file(fname, data):
    if not fname.endswith(".py"):
        fname += ".py"

    __save_text("Code/"+fname, data)

                
def __save_python_list(fname, data):
    if not fname.endswith(".list"):
        fname += ".list"

    __save_text("Code/"+fname, data)

def __get_python_program_list():
    data=""
    for key in g_tdb.python_program_dict.keys():

        data += key + ":" + g_tdb.python_program_dict[key][0]
        data += "\n"
    return data
def get_python_program_list():
    return __get_python_program_list()

class PythonProgramFunctionData(BaseModel):
    function_name: str = Field()
    explanation: str = Field()
    program: str = Field()
    
@tool(args_schema=PythonProgramFunctionData)
def append_python_program_function(function_name=None, explanation="", program="") -> str:
    """
        pythonプログラムで書かれた機能追加します。
        既に存在している場合何もしません。
    Args:
        function_name:機能の名前。半角英数のみ利用できます。:
        explanation:機能が何をする機能か簡易な説明。:
        program:機能を実現するためのpythonのプログラム
    return:
        プログラムを実行しての標準出力とエラーがある時はエラーの内容がテキストで出力されます。
    """
    text = python_error_check_main(program)
    if tools.flow_controller.g_fcd.code_error:
        text += "\r\nエラーを修正してください"
    else:
        append_python_program_function_direct(function_name, explanation, program)
    tools.command_list.g_time_keeper.wait()
    return text

def append_python_program_function_direct(function_name=None, explanation="", program=""):
    """
        pythonプログラムで書かれた機能追加します。
        既に存在している場合何もしません。
    Args:
        function_name:機能の名前。半角英数のみ利用できます。:
        explanation:機能が何をする機能か簡易な説明。:
        program:機能を実現するためのpythonのプログラム
    
    """
    print("append_ai_agent:", function_name)
    
    if function_name not in g_tdb.python_program_dict:
        
        g_tdb.python_program_dict[function_name] = [explanation,program]
        __save_python_list("python_program_function_list.list", __get_python_program_list())
        __save_python_file(function_name+".py", program)


#def get_append_python_program_function_parameter(text):
#    if 0 <= text.find('.append_python_program_function'):
#        start = text.find('(') + 1
#        # 括弧の対応関係を考慮して、最後の ')' を探す
#        count = 1
#        end = start
#        while count > 0:
#            end += 1
#            if text[end] == '(':
#                count += 1
#            elif text[end] == ')':
#                count -= 1
#        function_name = text[start:end]
#
#        start = text.find('explanation=') + len('explanation=')
#        end = text.find(',', start)
#        explanation = text[start:end].strip('\'')
#
#        start = text.find('function_name=') + len('function_name=')
#        end = text.find(',', start)
#        function_name = text[start:end].strip('\'')
#
#        start = text.find('program=') + len('program=')
#        # 括弧の対応関係を考慮して、最後の ')' を探す
#        count = 1
#        end = start
#        # プログラム文字列内のシングルクォートを考慮してカウント
#        quote_count = 0
#        while count > 0:
#            end += 1
#            if text[end] == '\'':
#                quote_count += 1
#                if quote_count % 2 == 0:  # 偶数回出現したらカウントをリセット
#                    quote_count = 0
#            elif text[end] == '(':
#                count += 1
#            elif text[end] == ')':
#                count -= 1
#            # ループの終了条件を追加
#            if end >= len(text):
#                break
#        program = text[start:end].strip('\'')
#        return function_name, explanation, program
#    return None, None, None
#