AGIの入り口の前:ソースコード:command_list.py

PYTHON_PATH = “user python path”
はpython実行のパス。コマンドでも行けるかもしれないが試してはいない。

@toolがついてるのが呼び出し対象となる関数。
ついていないのは、元からロジックで呼び出すために作られたのもあるが、AIが呼んでくれなかったり、呼んだらエラーばかり出すためあきらめたのもある。
g_time_keeperは、geminの無料版のリクエスト数の制限に対応するため、スリープを入れるためのもの。

check_error_python_file_executeがpythonファイルを実行してエラーを確認するプログラム。5秒問題なく動けば、OKとしている。


from langchain_core.tools import tool
from langchain_core.pydantic_v1 import (BaseModel, Field)


import subprocess
import os, sys, io
import subprocess
import threading
import sys
import time
import msvcrt
####################################################
PYTHON_PATH = "user python path"
####################################################
class ToolsDataBase():
    def __init__(self):
        self.ai_agent_dict = {}
        self.task_end_flag = {}
        self.save_text_file_name = None
g_tdb = ToolsDataBase()

class Timekeeper():
    def __init__(self):
        self.time_buffer = time.time()
        self.interval = 4.1

    def wait(self):
        time_buf = self.interval - (time.time()-self.time_buffer)
        print("time_buf",time_buf)
        if 0 < time_buf:
            time.sleep(time_buf)
        self.time_buffer = time.time()
g_time_keeper = Timekeeper()

def create_folder(folder_path):
    """指定されたフォルダが存在しない場合、フォルダを作成します。
  
    Args:
      folder_path: 作成するフォルダのパス。
    """
    folder_path = folder_path.replace("\\", "/")
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
    if folder_path.endswith("/"):
        return folder_path
    else:
        return folder_path + "/"

WORK_SPACE_DIR = create_folder(os.getcwd())
AGENT_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Agents")
CODE_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Code")
DELOVERABLES_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Deliverables")
TEMP_SPACE_DIR = create_folder(WORK_SPACE_DIR + "Temp")


def set_agent_space(space):
    AGENT_SPACE_DIR = create_folder(space)


def set_code_space(space):
    CODE_SPACE_DIR = create_folder(space)


def set_deliverables_space(space):
    DELOVERABLES_SPACE_DIR = create_folder(space)


def set_temp_space(space):
    TEMP_SPACE_DIR = create_folder(space)


def set_work_space(space):
    WORK_SPACE_DIR = create_folder(space)
    WORK_SPACE_DIR = WORK_SPACE_DIR.replace("\\", "/")
    
    set_agent_space(WORK_SPACE_DIR+"Agents")
    set_code_space(WORK_SPACE_DIR+"Code")
    set_deliverables_space(WORK_SPACE_DIR+"Deliverables")
    set_temp_space(WORK_SPACE_DIR+"Deliverables")
####################################################


class PythonCode(BaseModel):
    code: str = Field()

@tool(args_schema=PythonCode)
def execute_python_code(code):
    """
    Pythonコードを実行し、標準出力を取得します。

    Args:
      code: 実行するPythonコード。
    Returns:
      実行結果と標準出力。エラーが発生した場合は、エラーメッセージと標準出力を返します。
    """
    print("execute_python_code:")
    try:
        # 標準出力のキャプチャ
        stdout_buffer = io.StringIO()
        old_stdout = sys.stdout
        sys.stdout = stdout_buffer

        # コードを実行します。
        exec(code)

        # 標準出力を取得します。
        stdout_output = stdout_buffer.getvalue()

        # 標準出力を元に戻します。
        sys.stdout = old_stdout
        g_time_keeper.wait()  # googleのレスポンス規制に対応
        return "コードが正常に実行されました。", stdout_output
    except Exception as e:
        # エラーが発生した場合、エラーメッセージと標準出力を返します。

        return f"エラーが発生しました: {e}", stdout_buffer.getvalue()

########################################################
class PipCommand(BaseModel):
    module_name: str = Field()
    command: str = Field()

@tool(args_schema=PipCommand)
def run_pip_command(command, module_name):
    """
    pipコマンドを実行する関数。
    Args:
        command: 実行するpip のコマンド 例: install など。
        module_name: コマンドの対称としたいモジュールの名p

    Returns:
        実行結果と標準出力。エラーが発生した場合は、エラーメッセージと標準出力を返します。
    """
    print("run_pip_command:")
    command = ['py', '-3.10', '-m', 'pip', command, module_name]
    process = subprocess.run(command, capture_output=True, text=True)
    g_time_keeper.wait()  # googleのレスポンス規制に対応
    return process.stdout

###################################################



class  PythonFileExecuteData():
    """pythonファイル実行時の出力保存"""
    def __init__(self):
        self.clear()

    def clear(self):
        self.stdout = ""
        self.error = ""
        self.returncode = 0

g_pfed = PythonFileExecuteData()


def _find_line_numbers_with_string(text, string):
    """
    文章の中から、特定の文字列がある行番号を返すプログラムです。

    Args:
      text: 文章
      string: 特定の文字列

    Returns:
      特定の文字列がある行番号のリスト
    """
    lines = text.splitlines()
    result = []
    for i, line in enumerate(lines):
        if string in line:
            print("len(line.rstrip())", len(line.rstrip()))
            print("len(string.rstrip())", len(string.rstrip()))
            
            result.append(i + 1)  # 行番号は1から始まるので、1を加算
    return result


def read_output(process):
    while True:
        line = process.stdout.readline()
        if line == b'':
            break

        try:
            print(line.decode('utf-8'), end='')
            g_pfed.stdout += line.decode('utf-8')
        except UnicodeDecodeError:
            if str is type(line):
                g_pfed.error += line

def read_error(process):
    while True:
        line = process.stderr.readline()
        if line == b'':
            break
        try:
            print(line.decode('utf-8'), end='')
            g_pfed.error += line.decode('utf-8')

        except UnicodeDecodeError:
            if str is type(line):
                g_pfed.error += line


def check_error_python_file_execute(python_file, input_data, normal_termination_time=5):
    """
    別プロセスで実行される関数。
    # 別のPythonコードを実行する
    # python_fileのコードを実行します。input_dataは引数です。
    Args:
        python_file: 実行したいパイソンファイル
        input_data: pythonファイルに渡す引数
    Returns:
        実行結果と標準出力。エラーが発生した場合は、エラーメッセージと標準出力を返します。
    """
    print("python_file_execute:")
    g_pfed.clear()
    process = subprocess.Popen([PYTHON_PATH, CODE_SPACE_DIR + python_file, input_data],
                                stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE
                                )

    # 標準出力と標準エラー出力を別々のスレッドで読み込む
    output_thread = threading.Thread(target=read_output, args=(process,))
    error_thread = threading.Thread(target=read_error, args=(process,))
    output_thread.start()
    error_thread.start()
    sttime=time.time()
    # メインスレッドでキー入力待ち
    while True:
      # キー入力があるか確認
        try:

            if msvcrt.kbhit():
                # キー入力があれば、子プロセスにデータを送信
                input_data = msvcrt.getch()

                if input_data == b'\x1b':  # escキー
                    print("escキーが押されました")
                    process.terminate()
                    break

                # キー入力があったら、子プロセスにデータを送信
                if process.poll() is None:
                    if process.stdin is not None:
                        process.stdin.write(input_data)
                        process.stdin.flush()
                    else:
                        print("子プロセスは標準入力を受け取っていません")
                else:
                    print("子プロセスはすでに終了しています")
                    break
            else:
                time.sleep(0.01)
                if 0 < len(g_pfed.error):
                    print("エラーが発生しました。")
                    process.terminate()
                    break
            if (normal_termination_time < time.time() - sttime):
                process.terminate()
                print(str(normal_termination_time)+"秒正常に動作しました。")

                break
            
        except EOFError:
            break

    # スレッドを終了
    output_thread.join()
    error_thread.join()

    # 子プロセスの終了ステータスを取得
    returncode = process.wait()
    g_pfed.returncode = returncode
    print(" g_pfed.returncode", g_pfed.returncode)
    if returncode == 0:
        print("コマンドが正常に終了しました")
    else:
        # traceback_list = traceback.format_exc().splitlines()
        traceback_list = g_pfed.error.splitlines()
        # traceback_list = traceback.format_exc(process.stderr).splitlines()
        print(f"traceback_list: {traceback_list}")

        if 1 < len(traceback_list):

            number = _find_line_numbers_with_string(
                load_python_file(python_file),
                traceback_list[-2])
            print(f"行番号: {number}")

        print("コマンドがエラーで終了しました")
        print("g_pfed.stdout, g_pfed.error, g_pfed.returncode",
              g_pfed.stdout, g_pfed.error, g_pfed.returncode)
    return g_pfed.stdout, g_pfed.error, g_pfed.returncode
###################################################

def __save_text(file_name, data):
    file_name = file_name.replace("../", "")
    file_name = file_name.replace("..\\", "")
    data = data.replace("\\r", "\r")
    data = data.replace("\\n", "\n")
    try:
        with open(WORK_SPACE_DIR+file_name, 'wb') as f_out:
            f_out.write(data.encode())
    except PermissionError:
        print(f"ファイルへの書き込み権限がありません: {file_name}")
    except OSError as e:
        print(f"ファイル書き込み中にエラーが発生しました: {e}")
    except Exception as e:
        print(f"ファイル書き込み中にエラーが発生しました: {e}")

def __load_text(file_name):
    file_name = file_name.replace("../", "")
    file_name = file_name.replace("..\\", "")
    full_path = WORK_SPACE_DIR+file_name
    if os.path.exists(full_path):
        try:
            with open(full_path, 'rb') as f_in:
                data = f_in.read()
            return data.decode('utf-8')

        except Exception as e:
            print(f"ファイル読み込み中にエラーが発生しました: {e}")
        
    else:
        print("ファイルが見つかりませんでした")
    return None
###########################################################
class TextFileSave(BaseModel):
    file_name: str = Field()
    data: str = Field()
class TextFileLoad(BaseModel):
    data: str = Field()

def save_program_file(file_name, data):
    """
    pythonコードを保存します。
    Args:
        data:pythonコード
    """
    print("save_python_file:")
    __save_text("Code/" + file_name, data)


def load_python_file(file_name):
    """
    pythonコードを読み込みます。
    Returns:
        読み込んだpythonコードを返します
    """
    print("load_python_file:")
    result = __load_text("Code/" + file_name)
    return result
#####################################
class FixFileData(BaseModel):
    data: str = Field()

@tool(args_schema=FixFileData)
def save_task_manager_progless_file(data):
    """
    エージェントが全体作業計画書の進捗管理用のデータを保存します。
    詳細作業計画書とは違います。
    Args:
        data:作業計画書の進捗管理用のデータ
    """
    print("save_task_manager_progless_file:")
    __save_text("progress.txt", data)
    g_time_keeper.wait()  # googleのレスポンス規制に対応

@tool
def load_task_manager_progless_file():
    """
    エージェントが全体作業計画書の進捗管理用のデータを読み込みます。
    詳細作業計画書とは違います。
    Returns:
        読み込んだ 作業計画書の進捗管理用のデータを返します
    """
    print("load_task_manager_progless_file:")
    result = __load_text("progress.txt")
    g_time_keeper.wait()
    return result


@tool(args_schema=TextFileSave)
def save_deliverables(file_name, data):
    """
    成果物を保存します。
    Args:
        file_name:ファイル名:
        data:保存するテキストデータ:
    """
    print("save_deliverables:", file_name)
    __save_text("deliverables/"+file_name, data)
    g_time_keeper.wait()  # googleのレスポンス規制に対応


@tool(args_schema=TextFileSave)
def save_text(file_name, data):
    """
    テキストファイルをセーブします
    Args:
        file_name:ファイル名:
        data:保存するテキストデータ:

    """
    print("save_text:", file_name)
    __save_text("Temp/" + file_name, data)
    g_tdb.save_text_file_name = file_name
    g_time_keeper.wait()  # googleのレスポンス規制に対応



@tool(args_schema=TextFileLoad)
def load_text(file_name):
    """
    テキストファイルをロードします
    Args:
        file_name:ファイル名:
    Returns:
        読み込んだ テキストのデータを返します

    """
    print("load_text:", file_name)
    result = __load_text("Temp/" + file_name)
    g_time_keeper.wait()
    return result

@tool(args_schema=TextFileSave)
def save_text_temp_file(file_name, data):
    """
    エージェントが一時的に作業用のデータを一時作業(Temp)フォルダ保存します。
    エージェントが各エージェントが作業の保存に使用します。
    エージェントが全体作業報告書にファイル名を記載してください。
    エージェントが作業名_詳細作業布告書を保存ずるのに使います。
    エージェントが作業名_詳細作業布告書にファイル名を記載してください。
    エージェントが動作確認のために一時的にpythonデータを保存したりするのに使います。

    Args:
        file_name:ファイル名:
        data:保存するデータ:

    """
    print("save_text_temp_file:", file_name)
    __save_text("Temp/"+file_name, data)
    g_time_keeper.wait()  # googleのレスポンス規制に対応

@tool(args_schema=TextFileLoad)
def load_text_temp_file(file_name):
    """
    エージェントが一時的に作業用のデータを保存します。
    エージェントが全体作業報告書に書かれたファイルを読み込むのに使います。
    エージェントが作業名_詳細作業布告書を読み込むのに使います。
    エージェントが作業名_詳細作業布告書に書かれたファイルを読み込むのに使います。
    エージェントが動作確認のために一時的に保存したpythonデータを読み込むのに使います。
    Args:
        file_name:ファイル名:
    Returns:
        読み込んだ 一時作業データを返します

    """
    print("load_text_temp_file:", file_name)
    result = __load_text("Temp/"+file_name)
    g_time_keeper.wait()
    return result


##########################################################


def __save_agent_file(file_name, data=""):
    __save_text("Agents/"+file_name, data)

@tool(args_schema=TextFileSave)
def save_agent_file(file_name, data=""):
    """
    AIエージェント用のプロントデータを保存します。
    Args:
        file_name:保存ファイル名:
        data:プロンプトテキスト
    """
    print("save_agent_file:", file_name)
    __save_agent_file(file_name, data)
    g_time_keeper.wait()  # googleのレスポンス規制に対応

def __load_agent_file(file_name):
    result = __load_text("Agents/" + file_name)
    return result


@tool(args_schema=TextFileLoad)
def load_agent_file(file_name):
    """
    agentプロンプトを読み込みます。
    Args:
        file_name:agentファイル名:
    Returns:
        str(
        読み込んだagentプロンプトを返します
        )
    """
    print("load_agent_file:", file_name)
    result = __load_agent_file(file_name)
    return result
###

def get_ai_agent(agent_name):
    """
    指定されたai_agentオブジェクトを返します
    Arg:
        agent_name:Ai_agentの名前
    Returns:
        AIAgentオブジェクト
    """
    print("get_ai_agent", agent_name)
    if agent_name in g_tdb.ai_agent_dict:
        return g_tdb.ai_agent_dict[agent_name][1]
    else:
        return None

@tool(args_schema=TextFileSave)
def modify_ai_agent_prompt(file_name, data):
    """
    AIエージェントオブジェクトのプロンプトを修正します。
    Args:
        file_name:エージェント名:
        data:プロンプトテキスト
    
    """
    print("modify_ai_agent_prompt:", file_name)
    if file_name in g_tdb.ai_agent_dict:
        __save_agent_file(file_name+".txt", data)
        g_time_keeper.wait()  # googleのレスポンス規制に対応

#####################################################
def get_tool_list():
    return [web_search, # web検索をします。
            get_web_text_links, # webページのテキストデータをリンク付きで得ます。
            execute_python_code, # python コードを実行します。
            run_pip_command,    # pip を実行します
            save_text,
            load_text,
            modify_ai_agent_prompt,  # AIエージェントのプロンプトを修正します。
            get_agent_name_list,  # AIエージェントの名前のリストを取得します。
            append_ai_agent,  #
            modify_ai_agent_explanation,  #
            check_same_code,  #  コードが同じかどうか確認
            specify_agent  #
            ]

def load_ai_agent_name_list():

    print("load_ai_agent_name_list:")
    data = __load_agent_file("ai_agent_name_list.list")
    data_list = data.split("\r\n")
    g_tdb.ai_agent_dict = {}
    list_buf = []
    for data in data_list:
        list_buf = data.split(":")
        # 説明文にがあった場合の処理
        if 2 <= len(list_buf):
            buf = ""
            for i in range(1, len(list_buf)):
                buf += list_buf[i]
                # list_bufのリストの最後の時は:を追加しない
                if i != len(list_buf) - 1:
                    buf += ":"
            if "User" == list_buf[0]:
                g_tdb.ai_agent_dict[list_buf[0]] = [buf, None]
                
            else:
                prompt = __load_agent_file(list_buf[0]+".txt")
                g_tdb.ai_agent_dict[list_buf[0]] = [buf,prompt]

def __get_agent_name_list():
    result = ''
    for key, value in g_tdb.ai_agent_dict.items():
        result += key + ':' + value[0] + '\r\n'
    g_time_keeper.wait()  # googleのレスポンス規制に対応
    return result

@tool
def get_agent_name_list():
    """
        今使えるエージェントの説明を列挙します。
        作業を渡すべきエージェントを確認sるのに使います。
        Returns:
        str(
        エージェント名1:エージェントの説明
        エージェント名2:エージェントの説明
        ......
        というテキストを返します
        )
    """    
    print("get_agent_name_list:")
    return __get_agent_name_list()


class AgentListData(BaseModel):
    agent_name: str = Field()
    explanation: str = Field()
    system_prompt: str = Field()

@tool(args_schema=AgentListData)
def append_ai_agent(agent_name=None, explanation="", system_prompt=""):
    """
        新しく作成したエージェントの情報を設定します。
        既に存在している場合何もしません。
        今あるエージェントのプトンプとを修正したい場合は
        modify_ai_agent_prompt
        を使用してください。
    Args:
        agent_name:エージェント名:
        explanation:何をするエージェントか簡易な説明。:
        system_prompt:Aiエージェントのシステムプロンプト
    
    """
    print("append_ai_agent:", agent_name)
    
    if agent_name not in g_tdb.ai_agent_dict:
        
        g_tdb.ai_agent_dict[agent_name] = [explanation,system_prompt]
        __save_agent_file("ai_agent_name_list.list", __get_agent_name_list())
        __save_agent_file(agent_name+".txt", system_prompt)
    g_time_keeper.wait()  # googleのレスポンス規制に対応

@tool(args_schema=TextFileSave)
def modify_ai_agent_explanation(file_name, data):
    """
    AIエージェントリストの書かれている説明を修正します。
    Args:
        file_name:エージェント名:
        data:説明テキスト
    
    """
    print("modify_ai_agent_explanation:", file_name)
    if file_name in g_tdb.ai_agent_dict:
        g_tdb.ai_agent_dict[file_name][0] = data
        __save_agent_file("ai_agent_name_list.list", __get_agent_name_list())
    g_time_keeper.wait()  # googleのレスポンス規制に対応

#################################################


class CheckCodeData(BaseModel):
    pre_code: str = Field()
    now_code: str = Field()

@tool(args_schema=CheckCodeData)
def check_same_code(pre_code, now_code):
    """
    以前のコードと同じかどうかを比較する機能。
    同じ場合はシステムプロンプトや指示を変える必要がある。
    ひとつ前だけでなく2,3回前のコードも確認して堂々巡りになってないか確認するのにも使う。
    比較対象は、プログラムだけでなくテキストデータであればこれで比較できる。
    Args:
        pre_code:エージェント名:
        now_code:説明テキスト
    Returns:
        同じときTrue
        異なっているときFalse
        を返す。
    """
    print("check_same_code:")
    if pre_code == now_code:
        g_time_keeper.wait()  # googleのレスポンス規制に対応
        return True
    g_time_keeper.wait()  # googleのレスポンス規制に対応
    return False
##################################################


class SetTaskEndData(BaseModel):
    next_agent: str = Field()
@tool(args_schema=SetTaskEndData)
def specify_agent(next_agent):
    """
    next_agentに必ず次に作業をしてほしいエージェントをget_agent_name_listで得られたアルファベットの名前を指定してください。
    Args:
        next_agent:これまでの作業の経緯から、次に作業を渡したい相手のエージェント名を入れます。
    
    """
    print("specify_agent:",  next_agent)
    g_tdb.task_end_flag["RoleAssignmentOfficer"] = next_agent
    g_time_keeper.wait()  # googleのレスポンス規制に対応


def get_next_agent():
    """
    プログラムでのロジック制御の際に呼び出します。
    Returns:
        次に作業を渡すエージェントを返します。
    """
    if "RoleAssignmentOfficer" in g_tdb.task_end_flag:
        next_agent = g_tdb.task_end_flag["RoleAssignmentOfficer"]
        g_tdb.task_end_flag["RoleAssignmentOfficer"] = None
    else:
        next_agent = None
    return next_agent




#############################################################
from duckduckgo_search import DDGS
class WebSerchQuery(BaseModel):
    keyword: str = Field()

@tool(args_schema=WebSerchQuery)
def web_search(keyword):
    """
    DuckDuckGoでウェブ検索を実行します。
    Args:
      query: 検索キーワード:キーワードはクエリと書かれることがあります。

    Returns:
    str(
      検索結果のテキスト
      タイトル:....
      URL:https://......
      説明:....
      --------------------
      タイトル:....
      URL:https://......
      説明:....
      --------------------
      タイトル:....
      URL:https://......
      説明:....
      --------------------
      ........
      )
    """

    try:
        print("web_search", keyword)
        serched_data = DDGS().text(keyword,
                                   region='wt-wt',
                                   safesearch='off',
                                   timelimit=None,
                                   max_results=10)
        
        if serched_data:
            result = ""
            for data in serched_data:
                result += f"タイトル: {data['title']}\r\n"
                result += f"URL: {data['href']}\r\n"
                result += f"説明: {data['body']}\r\n"
                result += "-" * 20 + "\r\n"
            g_time_keeper.wait()  # googleのレスポンス規制に対応

            return result +"\r\n" +"作業を続けてください。"
        else:
            g_time_keeper.wait()  # googleのレスポンス規制に対応
            return "検索結果が見つかりませんでした。"
        
    except Exception as e:
        print(f"エラーが発生しました: {e}")
        g_time_keeper.wait()  # googleのレスポンス規制に対応
        return f"エラーが発生しました: {e}"


import requests


class WebURL(BaseModel):
    url: str = Field()


import requests
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
import os


@tool(args_schema=WebURL)
def check_url(url: str) -> bool:
  """指定されたURLが正常に読み込めるかどうかを判定します。

  Args:
    url: チェックするURL

  Returns:
    正常に読み込める場合はTrue、そうでない場合はFalse
  """
  try:
    response = requests.get(url)
    g_time_keeper.wait()  # googleのレスポンス規制に対応
    print("get_web_text_links", url)

    return response.status_code == 200
  except requests.exceptions.RequestException:
    g_time_keeper.wait()  # googleのレスポンス規制に対応
    print("get_web_text_links", url)
    return False

@tool(args_schema=WebURL)
def get_web_text_links(url):
    """
    この機能は以下のような文章でしようされるます。
        ・指定されたurlをクリックしテキストデータを得ます。
        ・指定されたURLのテキストデータとテキストに設定されたリンクを得ます。
        ・ホームページにアクセスして
        ・リンク先にアクセスして
        ・リンクをクリックして
        urlは,ホームページアドレス、リンク、ページへのリンク、ページのurlなどと呼ばれることがあります。
        ホームページは、サイト、webページ、webサイト,ページなどと呼ばれることがあります。
    Args:
      url: WebページのURL:urlはホームページアドレス、リンク、ページへのリンク、ページのurlなどと呼ばれることがあります。
    Returns:
      str(
      ページ内にあるテキストとリンクデータを以下のようなフォーマットで得ます。
      テキスト......
      テキスト<リンクurl>テキスト.....
      テキスト<リンクurl>.....
      テキスト......
      テキスト......
      .....
      #エラーの時は空になります。
      )
    """

  # Chromeのオプションを設定
    options = Options()
    options.add_argument('--incognito')
    options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36')

  # Chromeドライバを起動
    driver = webdriver.Chrome(options=options)

  # 指定されたURLにアクセス
    print("step01")
    try:
        driver.get(url)
    except Exception as e:
        print(f"Error accessing URL: {url}, {e}")
        print("step012")
        return "nodatanodata\rnodata:<nodata>"
    print("step02")

  # テキストデータを取得
    try:
        text_data = driver.find_element(By.TAG_NAME, "body").text
    except Exception as e:
        print(f"Error getting text data: {url}, {e}")
        return "リンク切れです"
    print("step03")

    text_data = text_data.replace("<br>", "\n")
    text_data = text_data.replace("<br/>", "\n")
    text_data_list = text_data.split("\n")
      # リンクとそのテキストを取得
    try:
        links = driver.find_elements(By.TAG_NAME, "a")
    except Exception as e:
        print(f"Error getting links: {url}, {e}")
        return "nodata:<nodata>"
    print("step041")

    ht_list=[]
    for k in range(len(links)):
        link = links[k]
        if None is link:
            continue
        # テキストが短すぎ利場合ははじく。欠点はページ番号のリンクが取れない。
        if "" == link.text or len(link.text) <= 2:
            continue
        # link.get_attribute("href")が1回しかできないのでリストに入れておく
        try:
            ht_list.append([link.get_attribute("href"),
                           link.text])
        except Exception as e:
            print(f"Error getting link attribute: {url}, {e}")
            continue

    for i in range(len(text_data_list)):
        if None is text_data_list[i]:
            continue
        if "" == text_data_list[i] or len(text_data_list[i]) <= 2:
            continue
        for ht in ht_list:

            href = ht[0]
            text = ht[1]
            if text in text_data_list[i]:
                try:
                    text_data_list[i] = text_data_list[i].replace(text, f"{text}: <{href}>")
                except Exception as e:
                    print(f"Error replacing text: {url}, {e}")
                    continue
                break

    result = ""
    for text in text_data_list:
        result += text + "\n" + "作業を続けてください。"


  # ブラウザを閉じる
    try:
        driver.quit()
    except Exception as e:
        print(f"Error closing browser: {url}, {e}")
        return result
    g_time_keeper.wait()  # googleのレスポンス規制に対応
    print("get_web_text_links", url)
    return result

AIにより解説


このコードは、LangChainを使用して、AIエージェントを構築するためのツールセットを提供しています。

主な機能:

  • エージェント管理:
    • append_ai_agent: 新しいAIエージェントを追加します。
    • modify_ai_agent_prompt: エージェントのプロンプトを修正します。
    • modify_ai_agent_explanation: エージェントの説明を修正します。
    • get_agent_name_list: 利用可能なエージェントの名前と説明を取得します。
  • ファイル操作:
    • save_text: テキストファイルを保存します。
    • load_text: テキストファイルを読み込みます。
    • save_text_temp_file: 一時作業用のテキストファイルを保存します。
    • load_text_temp_file: 一時作業用のテキストファイルを読み込みます。
    • save_deliverables: 成果物を保存します。
    • save_agent_file: AIエージェント用のプロンプトデータを保存します。
    • load_agent_file: AIエージェント用のプロンプトデータを読み込みます。
    • save_task_manager_progless_file: 作業計画書の進捗管理用のデータを保存します。
    • load_task_manager_progless_file: 作業計画書の進捗管理用のデータを読み込みます。
  • コード実行:
    • execute_python_code: Pythonコードを実行します。
    • run_pip_command: pipコマンドを実行します。
    • check_same_code: 以前のコードと同じかどうかを比較します。
  • ウェブ検索:
    • web_search: DuckDuckGoでウェブ検索を実行します。
    • get_web_text_links: 指定されたURLのテキストデータとリンクを取得します。
    • check_url: 指定されたURLが正常に読み込めるかどうかを判定します。
  • タスク管理:
    • specify_agent: 次の作業を行うエージェントを指定します。
    • get_next_agent: 次の作業を行うエージェントを取得します。

コードの構造:

  • ツール定義: 各機能は、LangChainの@toolデコレータを使用してツールとして定義されています。
  • データモデル: 各ツールは、BaseModelを継承したデータモデルを使用して、入力パラメータを定義しています。
  • ファイル操作: ファイル操作は、__save_text__load_text関数を使用して行われています。
  • エージェント管理: エージェント情報は、g_tdb.ai_agent_dict辞書に格納されています。
  • タスク管理: タスクの終了フラグは、g_tdb.task_end_flag辞書に格納されています。

使用方法:

  1. ツールをインポート: 必要なツールをインポートします。
  2. ツールを使用: ツールを呼び出して、必要なパラメータを渡します。
  3. 結果を取得: ツールは、実行結果を返します。

例:

from tools import web_search

# DuckDuckGoで"Python"を検索
result = web_search(keyword="Python")

# 検索結果を出力
print(result)

注意:

  • このコードは、LangChainの@toolデコレータを使用しています。LangChainは、AIエージェントを構築するためのフレームワークです。
  • このコードは、DuckDuckGo APIを使用しています。DuckDuckGo APIを使用するには、APIキーが必要です。
  • このコードは、Python 3.6以降で動作します。

参考資料:

このコードは、AIエージェントを構築するための基本的なツールセットを提供しています。このコードをベースに、独自のAIエージェントを構築することができます。