from langchain_core.tools import tool
from pydantic import BaseModel, Field
import subprocess
import sys, io
import threading
import time
import msvcrt
from duckduckgo_search import DDGS
import requests
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from tools.program_called_command_list import save_text as __save_text
from tools.program_called_command_list import load_text as __load_text
from tools.program_called_command_list import load_agent_file as __load_agent_file
from tools.program_called_command_list import get_main_agent_name_list as __get_agent_name_list
from tools.program_called_command_list import append_python_program_function_direct
from tools.program_called_command_list import load_python_file
from flow.flow_controller import python_error_check_main
import tools.program_called_command_list
from tools.program_called_command_list import wait_safety
####################################################
# PYTHON_PATH = "user python path"
PYTHON_PATH = tools.program_called_command_list.PYTHON_PATH
CODE_SPACE_DIR = tools.program_called_command_list.CODE_SPACE_DIR
####################################################
class ToolsDataBase():
def __init__(self):
self.root_ai_agent_dict = {}
self.ai_agent_dict = {}
self.task_end_flag = {}
self.save_text_file_name = None
g_tdb = ToolsDataBase()
####################################################
class PythonCode(BaseModel):
code: str = Field()
@tool(args_schema=PythonCode)
def execute_python_code(code) -> str:
"""
Pythonコードを実行し、標準出力を取得します。
Args:
code: 実行するPythonコード。
Returns:
実行結果と標準出力。エラーが発生した場合は、エラーメッセージと標準出力を返します。
"""
print("execute_python_code:")
wait_safety() # リクエストの制限を越えないために必要
try:
# 標準出力のキャプチャ
stdout_buffer = io.StringIO()
old_stdout = sys.stdout
sys.stdout = stdout_buffer
# コードを実行します。
exec(code)
# 標準出力を取得します。
stdout_output = stdout_buffer.getvalue()
# 標準出力を元に戻します。
sys.stdout = old_stdout
return "コードが正常に実行されました。\r\n", stdout_output
except Exception as e:
# エラーが発生した場合、エラーメッセージと標準出力を返します。
import traceback
error_message = f"コードの実行中にエラーが発生しました: {e}\n"
error_message += traceback.format_exc() # トレースバックを含める
return error_message, stdout_buffer.getvalue()
########################################################
class PipCommand(BaseModel):
module_name: str = Field()
command: str = Field()
@tool(args_schema=PipCommand)
def run_pip_command(command: str, module_name: str) -> str:
"""
pipコマンドを実行する関数。
Args:
command: 実行するpip のコマンド 例: install など。
module_name: コマンドの対称としたいモジュールの名p
Returns:
実行結果と標準出力。エラーが発生した場合は、エラーメッセージと標準出力を返します。
"""
print("run_pip_command:")
wait_safety() # リクエストの制限を越えないために必要
try:
# Pythonのバージョンを自動検出する。shebang行で指定されているバージョンを使うのが理想的だが、ここではシステムのデフォルトを使う。
command = ['py', '-3.10', '-m', 'pip', command, module_name]
# check=Trueでエラーを例外として扱う
process = subprocess.run(command, capture_output=True,
text=True, check=True)
return process.stdout
except subprocess.CalledProcessError as e:
return f"pipコマンドの実行に失敗しました: 戻り値={e.returncode},\
標準出力={e.stdout}, 標準エラー出力={e.stderr}"
except FileNotFoundError:
return "pipコマンドが見つかりません。pipがインストールされていることを確認してください。"
except Exception as e:
return f"予期せぬエラーが発生しました: {e}"
###################################################
class PythonFileExecuteData():
"""pythonファイル実行時の出力保存"""
def __init__(self):
self.clear()
def clear(self):
self.stdout = ""
self.error = ""
self.returncode = 0
g_pfed = PythonFileExecuteData()
def _find_line_numbers_with_string(text, string):
"""
文章の中から、特定の文字列がある行番号を返すプログラムです。
Args:
text: 文章
string: 特定の文字列
Returns:
特定の文字列がある行番号のリスト
"""
lines = text.splitlines()
result = []
for i, line in enumerate(lines):
if string in line:
print("len(line.rstrip())", len(line.rstrip()))
print("len(string.rstrip())", len(string.rstrip()))
result.append(i + 1) # 行番号は1から始まるので、1を加算
return result
class PythonProgramFunctionData(BaseModel):
function_name: str = Field()
explanation: str = Field()
program: str = Field()
@tool(args_schema=PythonProgramFunctionData)
def append_python_program_function(function_name=None,
explanation="", program="") -> str:
"""
pythonプログラムで書かれた機能追加します。
既に存在している場合何もしません。
Args:
function_name:機能の名前。半角英数のみ利用できます。:
explanation:機能が何をする機能か簡易な説明。:
program:機能を実現するためのpythonのプログラム
return:
プログラムを実行しての標準出力とエラーがある時はエラーの内容がテキストで出力されます。
"""
text = python_error_check_main(program)
if tools.flow_controller.g_fcd.code_error:
text += "\r\nエラーを修正してください"
else:
append_python_program_function_direct(function_name,
explanation, program)
tools.command_list.g_time_keeper.wait()
wait_safety() # リクエストの制限を越えないために必要
return text
def read_output(process):
while True:
line = process.stdout.readline()
if line == b'':
break
try:
print(line.decode('utf-8'), end='')
g_pfed.stdout += line.decode('utf-8')
except UnicodeDecodeError:
if str is type(line):
g_pfed.error += line
def read_error(process):
while True:
line = process.stderr.readline()
if line == b'':
break
try:
print(line.decode('utf-8'), end='')
g_pfed.error += line.decode('utf-8')
except UnicodeDecodeError:
if str is type(line):
g_pfed.error += line
class PythonFileExeCute(BaseModel):
python_file: str = Field()
input_data: str = Field()
normal_termination_time: float = Field()
@tool(args_schema=PythonFileExeCute)
def python_file_execute(python_file, input_data, normal_termination_time=5):
"""
別プロセスで実行される関数。
# 別のPythonプログラムを実行する
# python_fileのプログラムを実行します。input_dataは引数です。
Args:
python_file: 実行したいpyrhonファイル
input_data: pythonファイルに渡す引数
normal_termination_time: 強制終了するまでの時間
Returns:
str(
実行結果と標準出力。エラーが発生した場合は、エラーメッセージと標準出力を返します。
)
"""
global PYTHON_PATH
global CODE_SPACE_DIR
print("python_file_execute:")
g_pfed.clear()
process = subprocess.Popen([PYTHON_PATH, CODE_SPACE_DIR + python_file,
input_data],
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
stdin=subprocess.PIPE
)
# 標準出力と標準エラー出力を別々のスレッドで読み込む
output_thread = threading.Thread(target=read_output, args=(process,))
error_thread = threading.Thread(target=read_error, args=(process,))
output_thread.start()
error_thread.start()
sttime = time.time()
# メインスレッドでキー入力待ち
while True:
# キー入力があるか確認
try:
if msvcrt.kbhit():
# キー入力があれば、子プロセスにデータを送信
input_data = msvcrt.getch()
if input_data == b'\x1b': # escキー
print("escキーが押されました")
process.terminate()
break
# キー入力があったら、子プロセスにデータを送信
if process.poll() is None:
if process.stdin is not None:
process.stdin.write(input_data)
process.stdin.flush()
else:
print("子プロセスは標準入力を受け取っていません")
else:
print("子プロセスはすでに終了しています")
break
else:
time.sleep(0.01)
if 0 < len(g_pfed.error):
print("エラーが発生しました。")
process.terminate()
break
if (normal_termination_time < time.time() - sttime):
process.terminate()
print(str(normal_termination_time)+"秒正常に動作しました。")
break
except EOFError:
break
# スレッドを終了
output_thread.join()
error_thread.join()
# 子プロセスの終了ステータスを取得
returncode = process.wait()
g_pfed.returncode = returncode
print(" g_pfed.returncode", g_pfed.returncode)
if returncode == 0:
print("コマンドが正常に終了しました")
else:
# traceback_list = traceback.format_exc().splitlines()
traceback_list = g_pfed.error.splitlines()
# traceback_list = traceback.format_exc(process.stderr).splitlines()
print(f"traceback_list: {traceback_list}")
if 1 < len(traceback_list):
number = _find_line_numbers_with_string(
load_python_file(python_file),
traceback_list[-2])
print(f"行番号: {number}")
print("コマンドがエラーで終了しました")
print("g_pfed.stdout, g_pfed.error, g_pfed.returncode",
g_pfed.stdout, g_pfed.error, g_pfed.returncode)
wait_safety() # リクエストの制限を越えないために必要
return g_pfed.stdout, g_pfed.error, g_pfed.returncode
###################################################
###########################################################
class TextFileSave(BaseModel):
file_name: str = Field()
data: str = Field()
class TextFileLoad(BaseModel):
data: str = Field()
#####################################
class FixFileData(BaseModel):
data: str = Field()
@tool(args_schema=FixFileData)
def save_task_manager_progless_file(data):
"""
エージェントが全体作業計画書の進捗管理用のデータを保存します。
詳細作業計画書とは違います。
Args:
data:作業計画書の進捗管理用のデータ
"""
print("save_task_manager_progless_file:")
__save_text("progress.txt", data)
wait_safety() # リクエストの制限を越えないために必要
@tool
def load_task_manager_progless_file():
"""
エージェントが全体作業計画書の進捗管理用のデータを読み込みます。
詳細作業計画書とは違います。
Returns:
読み込んだ 作業計画書の進捗管理用のデータを返します
"""
print("load_task_manager_progless_file:")
result = __load_text("progress.txt")
wait_safety() # リクエストの制限を越えないために必要
return result
@tool(args_schema=TextFileSave)
def save_deliverables(file_name, data):
"""
成果物を保存します。
Args:
file_name:ファイル名:
data:保存するテキストデータ:
"""
print("save_deliverables:", file_name)
__save_text("deliverables/"+file_name, data)
wait_safety() # リクエストの制限を越えないために必要
@tool(args_schema=TextFileSave)
def save_text(file_name, data):
"""
テキストファイルをセーブします
Args:
file_name:ファイル名:
data:保存するテキストデータ:
"""
print("save_text:", file_name)
__save_text("Temp/" + file_name, data)
g_tdb.save_text_file_name = file_name
wait_safety() # リクエストの制限を越えないために必要
@tool(args_schema=TextFileLoad)
def load_text(file_name):
"""
テキストファイルをロードします
Args:
file_name:ファイル名:
Returns:
読み込んだ テキストのデータを返します
"""
print("load_text:", file_name)
result = __load_text("Temp/" + file_name)
wait_safety() # リクエストの制限を越えないために必要
return result
@tool(args_schema=TextFileSave)
def save_text_temp_file(file_name, data):
"""
エージェントが一時的に作業用のデータを一時作業(Temp)フォルダ保存します。
エージェントが各エージェントが作業の保存に使用します。
エージェントが全体作業報告書にファイル名を記載してください。
エージェントが作業名_詳細作業布告書を保存ずるのに使います。
エージェントが作業名_詳細作業布告書にファイル名を記載してください。
エージェントが動作確認のために一時的にpythonデータを保存したりするのに使います。
Args:
file_name:ファイル名:
data:保存するデータ:
"""
print("save_text_temp_file:", file_name)
__save_text("Temp/"+file_name, data)
wait_safety() # リクエストの制限を越えないために必要
@tool(args_schema=TextFileLoad)
def load_text_temp_file(file_name):
"""
エージェントが一時的に作業用のデータを保存します。
エージェントが全体作業報告書に書かれたファイルを読み込むのに使います。
エージェントが作業名_詳細作業布告書を読み込むのに使います。
エージェントが作業名_詳細作業布告書に書かれたファイルを読み込むのに使います。
エージェントが動作確認のために一時的に保存したpythonデータを読み込むのに使います。
Args:
file_name:ファイル名:
Returns:
str(読み込んだ 一時作業データを返します)
"""
print("load_text_temp_file:", file_name)
result = __load_text("Temp/"+file_name)
wait_safety() # リクエストの制限を越えないために必要
return result
##########################################################
def __save_agent_file(file_name, data=""):
__save_text("Agents/"+file_name, data)
@tool(args_schema=TextFileSave)
def save_agent_file(file_name, data=""):
"""
AIエージェント用のプロントデータを保存します。
Args:
file_name:保存ファイル名:
data:プロンプトテキスト
"""
print("save_agent_file:", file_name)
__save_agent_file(file_name, data)
wait_safety() # リクエストの制限を越えないために必要
@tool(args_schema=TextFileLoad)
def load_agent_file(file_name):
"""
agentプロンプトを読み込みます。
Args:
file_name:agentファイル名:
Returns:
str(
読み込んだagentプロンプトを返します
)
"""
print("load_agent_file:", file_name)
result = __load_agent_file(file_name)
wait_safety() # リクエストの制限を越えないために必要
return result
###
def get_ai_agent(agent_name):
"""
指定されたai_agentオブジェクトを返します
Arg:
agent_name:Ai_agentの名前
Returns:
AIAgentオブジェクト
"""
print("get_ai_agent", agent_name)
if agent_name in g_tdb.ai_agent_dict:
return g_tdb.ai_agent_dict[agent_name][1]
else:
return None
@tool(args_schema=TextFileSave)
def modify_ai_agent_prompt(file_name, data):
"""
AIエージェントオブジェクトのプロンプトを修正します。
Args:
file_name:エージェント名:
data:プロンプトテキスト
"""
print("modify_ai_agent_prompt:", file_name)
if file_name in g_tdb.ai_agent_dict:
__save_agent_file(file_name+".txt", data)
wait_safety() # リクエストの制限を越えないために必要
#####################################################
def get_tool_list():
return [web_search, # web検索をします。
get_web_text_links, # webページのテキストデータをリンク付きで得ます。
execute_python_code, # python コードを実行します。
run_pip_command, # pip を実行します
save_task_manager_progless_file,
load_task_manager_progless_file,
save_text,
load_text,
modify_ai_agent_prompt, # AIエージェントのプロンプトを修正します。
get_agent_name_list, # AIエージェントの名前のリストを取得します。
append_ai_agent, #
modify_ai_agent_explanation, #
check_same_code, # コードが同じかどうか確認
specify_agent #
]
@tool
def get_agent_name_list():
"""
今使えるエージェントの説明を列挙します。
作業を渡すべきエージェントを確認sるのに使います。
Returns:
str(
エージェント名1:エージェントの説明
エージェント名2:エージェントの説明
......
というテキストを返します
)
"""
print("get_agent_name_list:")
wait_safety() # リクエストの制限を越えないために必要
return __get_agent_name_list()
class AgentListData(BaseModel):
agent_name: str = Field()
explanation: str = Field()
system_prompt: str = Field()
@tool(args_schema=AgentListData)
def append_ai_agent(agent_name=None, explanation="", system_prompt=""):
"""
新しく作成したエージェントの情報を設定します。
既に存在している場合何もしません。
今あるエージェントのプトンプとを修正したい場合は
modify_ai_agent_prompt
を使用してください。
Args:
agent_name:エージェント名:
explanation:何をするエージェントか簡易な説明。:
system_prompt:Aiエージェントのシステムプロンプト
"""
print("append_ai_agent:", agent_name)
if agent_name not in g_tdb.ai_agent_dict:
g_tdb.ai_agent_dict[agent_name] = [explanation, system_prompt]
__save_agent_file("ai_agent_name_list.list", __get_agent_name_list())
__save_agent_file(agent_name+".txt", system_prompt)
wait_safety() # リクエストの制限を越えないために必要
@tool(args_schema=TextFileSave)
def modify_ai_agent_explanation(file_name, data):
"""
AIエージェントリストの書かれている説明を修正します。
Args:
file_name:エージェント名:
data:説明テキスト
"""
print("modify_ai_agent_explanation:", file_name)
if file_name in g_tdb.ai_agent_dict:
g_tdb.ai_agent_dict[file_name][0] = data
__save_agent_file("ai_agent_name_list.list", __get_agent_name_list())
wait_safety() # リクエストの制限を越えないために必要
#################################################
class CheckCodeData(BaseModel):
pre_code: str = Field()
now_code: str = Field()
@tool(args_schema=CheckCodeData)
def check_same_code(pre_code, now_code):
"""
以前のコードと同じかどうかを比較する機能。
同じ場合はシステムプロンプトや指示を変える必要がある。
ひとつ前だけでなく2,3回前のコードも確認して堂々巡りになってないか確認するのにも使う。
比較対象は、プログラムだけでなくテキストデータであればこれで比較できる。
Args:
pre_code:エージェント名:
now_code:説明テキスト
Returns:
同じときTrue
異なっているときFalse
を返す。
"""
print("check_same_code:")
wait_safety() # リクエストの制限を越えないために必要
if pre_code == now_code:
return True
return False
##################################################
class SetTaskEndData(BaseModel):
next_agent: str = Field()
@tool(args_schema=SetTaskEndData)
def specify_agent(next_agent):
"""
next_agentに必ず次に作業をしてほしいエージェントをget_agent_name_listで得られたアルファベットの名前を指定してください。
Args:
next_agent:これまでの作業の経緯から、次に作業を渡したい相手のエージェント名を入れます。
"""
print("specify_agent:", next_agent)
g_tdb.task_end_flag["RoleAssignmentOfficer"] = next_agent
wait_safety() # リクエストの制限を越えないために必要
def get_next_agent():
"""
プログラムでのロジック制御の際に呼び出します。
Returns:
次に作業を渡すエージェントを返します。
"""
if "RoleAssignmentOfficer" in g_tdb.task_end_flag:
next_agent = g_tdb.task_end_flag["RoleAssignmentOfficer"]
g_tdb.task_end_flag["RoleAssignmentOfficer"] = None
else:
next_agent = None
return next_agent
#############################################################
class WebSerchQuery(BaseModel):
keyword: str = Field()
@tool(args_schema=WebSerchQuery)
def web_search(keyword):
"""
DuckDuckGoでウェブ検索を実行します。
Args:
query: 検索キーワード:キーワードはクエリと書かれることがあります。
Returns:
str(
検索結果のテキスト
タイトル:....
URL:https://......
説明:....
--------------------
タイトル:....
URL:https://......
説明:....
--------------------
タイトル:....
URL:https://......
説明:....
--------------------
........
)
"""
wait_safety() # リクエストの制限を越えないために必要
try:
print("web_search", keyword)
serched_data = DDGS().text(keyword,
region='wt-wt',
safesearch='off',
timelimit=None,
max_results=10)
if serched_data:
result = ""
for data in serched_data:
result += f"タイトル: {data['title']}\r\n"
result += f"URL: {data['href']}\r\n"
result += f"説明: {data['body']}\r\n"
result += "-" * 20 + "\r\n"
return result + "\r\n" + "作業を続けてください。"
else:
return "検索結果が見つかりませんでした。"
except Exception as e:
print(f"エラーが発生しました: {e}")
return f"エラーが発生しました: {e}"
class WebURL(BaseModel):
url: str = Field()
@tool(args_schema=WebURL)
def check_url(url: str) -> bool:
"""指定されたURLが正常に読み込めるかどうかを判定します。
Args:
url: チェックするURL
Returns:
正常に読み込める場合はTrue、そうでない場合はFalse
"""
wait_safety() # リクエストの制限を越えないために必要
try:
response = requests.get(url)
print("get_web_text_links", url)
return response.status_code == 200
except requests.exceptions.RequestException:
print("get_web_text_links", url)
return False
@tool(args_schema=WebURL)
def get_web_text_links(url):
"""
この機能は以下のような文章でしようされるます。
・指定されたurlをクリックしテキストデータを得ます。
・指定されたURLのテキストデータとテキストに設定されたリンクを得ます。
・ホームページにアクセスして
・リンク先にアクセスして
・リンクをクリックして
urlは,ホームページアドレス、リンク、ページへのリンク、ページのurlなどと呼ばれることがあります。
ホームページは、サイト、webページ、webサイト,ページなどと呼ばれることがあります。
Args:
url: WebページのURL:urlはホームページアドレス、リンク、ページへのリンク、ページのurlなどと呼ばれることがあります。
Returns:
str(
ページ内にあるテキストとリンクデータを以下のようなフォーマットで得ます。
テキスト......
テキスト<リンクurl>テキスト.....
テキスト<リンクurl>.....
テキスト......
テキスト......
.....
#エラーの時は空になります。
)
"""
# Chromeのオプションを設定
options = Options()
options.add_argument('--incognito')
options.add_argument(
'--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64)\
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36')
# Chromeドライバを起動
driver = webdriver.Chrome(options=options)
wait_safety() # リクエストの制限を越えないために必要
# 指定されたURLにアクセス
try:
driver.get(url)
except Exception as e:
print(f"Error accessing URL: {url}, {e}")
return "nodatanodata\rnodata:<nodata>"
print("step02")
# テキストデータを取得
try:
text_data = driver.find_element(By.TAG_NAME, "body").text
except Exception as e:
print(f"Error getting text data: {url}, {e}")
return "リンク切れです"
print("step03")
text_data = text_data.replace("<br>", "\n")
text_data = text_data.replace("<br/>", "\n")
text_data_list = text_data.split("\n")
# リンクとそのテキストを取得
try:
links = driver.find_elements(By.TAG_NAME, "a")
except Exception as e:
print(f"Error getting links: {url}, {e}")
return "nodata:<nodata>"
ht_list = []
for k in range(len(links)):
link = links[k]
if None is link:
continue
# テキストが短すぎ利場合ははじく。欠点はページ番号のリンクが取れない。
if "" == link.text or len(link.text) <= 2:
continue
# link.get_attribute("href")が1回しかできないのでリストに入れておく
try:
ht_list.append([link.get_attribute("href"),
link.text])
except Exception as e:
print(f"Error getting link attribute: {url}, {e}")
continue
for i in range(len(text_data_list)):
if None is text_data_list[i]:
continue
if "" == text_data_list[i] or len(text_data_list[i]) <= 2:
continue
for ht in ht_list:
href = ht[0]
text = ht[1]
if text in text_data_list[i]:
try:
text_data_list[i] = text_data_list[i].replace(
text, f"{text}: <{href}>")
except Exception as e:
print(f"Error replacing text: {url}, {e}")
continue
break
result = ""
for text in text_data_list:
result += text + "\n" + "作業を続けてください。"
# ブラウザを閉じる
try:
driver.quit()
except Exception as e:
print(f"Error closing browser: {url}, {e}")
return result
print("get_web_text_links", url)
return result