from langchain_core.tools import tool
from pydantic import BaseModel, Field
import hashlib
import requests
import json
from typing import Optional, List, Dict, Union
from datetime import datetime
#from opensearchpy import OpenSearch
from pdfminer.high_level import extract_text
from bs4 import BeautifulSoup
import base64
import io
from PIL import Image
import numpy as np
import time
import uuid
#from langchain_community.embeddings import OllamaEmbeddings
from langchain_ollama import OllamaEmbeddings
from langchain_openai import OpenAIEmbeddings
import unicodedata # 日本語判定用
import re
import ollama
import os
if "__main__" == __name__:
from embeding_multimodal import get_image_data_embedding, get_audio_data_embedding
from LocalSearchClient import LocalSearchClient
else:
from tools.embeding_multimodal import get_image_data_embedding, get_audio_data_embedding
from tools.LocalSearchClient import LocalSearchClient
from tools.program_called_command_list import wait_safety
def is_predominantly_japanese(text, threshold_ratio=0.3):
"""
テキストが主に日本語であるかどうかを判定します。
ひらがな、カタカナ、漢字の文字が指定された割合以上含まれていればTrueを返します。
"""
if not text:
return False
japanese_char_count = 0
total_significant_chars = 0 # 空白以外の文字数
for char in text:
if char.isspace():
continue
total_significant_chars += 1
# unicodedata.script は Python 3.8+ で利用可能
try:
script = unicodedata.script(char)
if script in ('Hiragana', 'Katakana', 'Han'):
japanese_char_count += 1
except AttributeError: # 古いPythonのためのフォールバック (より単純な判定)
if '\u3040' <= char <= '\u309F' or \
'\u30A0' <= char <= '\u30FF' or \
'\u4E00' <= char <= '\u9FFF':
japanese_char_count += 1
if total_significant_chars == 0:
return False
return (japanese_char_count / total_significant_chars) >= threshold_ratio
def get_token_count(response_text: str) -> int:
if is_predominantly_japanese(response_text):
return len(response_text) # 日本語の場合は文字数をトークン数とみなす
else:
return len(response_text.split()) # それ以外はスペースで分割した単語数をトークン数とみなす
def split_sentences(text):
# 簡易文分割
# 小数点を一時的に置換
text = re.sub(r'(\d)\.(\d)', r'\1<DECIMAL>\2', text)
# 文末候補: ピリオド + スペース + 大文字
text = re.sub(r'\.(\s+[A-Z])', r'.<SPLIT>\1', text)
# 分割
sentences = [s.strip() for s in text.split("<SPLIT>")]
# 小数点を戻す
sentences = [s.replace("<DECIMAL>", ".") for s in sentences]
return sentences
class SemanticIndexer:
def __init__(self, backend="ollama", ollama_url="http://localhost:11434", lmstudio_url="http://localhost:1234", index_name="base"):
self.backend = backend # ベクトル生成のバックエンド(ollama または lmstudio)
self.ollama_url = ollama_url
self.lmstudio_url = lmstudio_url
#self.client = os_client or OpenSearch(hosts=[{"host": "localhost", "port": 9200}])
self.client = LocalSearchClient(
#db_path="local.db",
index_dir="vector_indexes",
dim=768
)
#self.client = os_client or OpenSearch(
# hosts=[{"host": "localhost", "port": 9200}],
# use_ssl=True,
# http_auth=("admin", "admin"), # デフォルトのユーザー名とパスワード
#
# verify_certs=False,
# ssl_show_warn=False
#)
self.embedding_cache = {} # embeddingキャッシュ(text → vector)
self.text_embedder = OllamaEmbeddings(model="embeddinggemma:300m")
#self.text_embedder = OpenAIEmbeddings(model="text-embedding-ada-002",base_url="http://localhost:1234/v1", api_key="not-needed") # LM Studio OpenAI互換API用
self.page_max_token = 1500
# ベクトル生成(キャッシュ付き)
def _embed(self, text: str) -> List[float]:
key = hashlib.sha256(text.encode()).hexdigest()
if key in self.embedding_cache:
return self.embedding_cache[key]
url = f"{self.ollama_url}/api/embeddings" if self.backend == "ollama" else f"{self.lmstudio_url}/v1/embeddings"
payload = {"model": "embeddinggemma:300m", "prompt" if self.backend == "ollama" else "input": text}
response = requests.post(url, json=payload)
if "embedding" not in response.json():
print("no embedding\n", response.json())
embedding = response.json()["embedding"] if self.backend == "ollama" else response.json()["data"][0]["embedding"]
self.embedding_cache[key] = embedding
print("text embedding size",len(embedding))
return embedding
# イメージのembedding生成
def _embed_image(self, image_bytes: bytes) -> List[float]:
# 画像をベクトルに変換(ここでは簡易的な方法としてOpenCVを使用)
# 実際の実装では、画像特徴量抽出モデル(CLIPなど)を使用する
try:
img = Image.open(io.BytesIO(image_bytes))
img_array = np.array(img)
# 簡易的な特徴量として画像の平均値を使用(実際にはモデルを使用する)
features = get_image_data_embedding(img_array)
# 実際の実装では、この特徴量をさらにembeddingモデルで処理する
#print(features)
result=None
if len(features) < 768:
result= np.append(features[0],[0.0] * (768 - len(features[0])))
elif 768 < len(features[0]):
print("error image embedding size over.", len(features[0]))
return result # 768次元に合わせる
except Exception as e:
print(f"Image embedding failed: {e}")
return [0.0] * 768 # デフォルトベクトル
# 音声のembedding生成(簡易的な実装)
def _embed_audio(self, audio_bytes: bytes) -> List[float]:
try:
# 1. 入力データがすでにNumPy配列かどうかをチェック
if isinstance(audio_bytes, np.ndarray):
# NumPy配列の場合、直接処理する
# get_audio_data_embedding() が期待する形式(1次元浮動小数点配列)であることを確認
if audio_bytes.ndim != 1 or not np.issubdtype(audio_bytes.dtype, np.floating):
raise ValueError("Input NumPy array must be a 1D floating-point array.")
# 配列をそのまま渡す
return get_audio_data_embedding(audio_bytes)[0]
else:
# bytesの場合、NumPy配列に変換してから処理する
np_array = np.frombuffer(audio_bytes, dtype=np.float32)
return get_audio_data_embedding(np_array)[0]
except Exception as e:
print(f"Audio embedding failed: {e}")
return [0.0] * 768
# インデックス作成(拡張フィールド追加)
def create_index(self, name: str, description: str = "", dimension: int = 768):
print("create_index called\n")
if not self.client.indices.exists(index=name):
self.client.indices.create(index=name, body={
"settings": {"index": {"knn": True}},
"mappings": {
"properties": {
"text": {"type": "text"},
"embedding": {"type": "knn_vector", "dimension": dimension},
"embedding_state": {"type": "knn_vector", "dimension": dimension},
"embedding_purpose": {"type": "knn_vector", "dimension": dimension},
"embedding_result": {"type": "knn_vector", "dimension": dimension},
"image_embedding": {"type": "knn_vector", "dimension": dimension},
"audio_embedding": {"type": "knn_vector", "dimension": dimension},
"page": {"type": "integer"},
"section": {"type": "keyword"},
"state": {"type": "keyword"},
"purpose": {"type": "keyword"},
"result": {"type": "keyword"},
"previous_page_id": {"type": "keyword"},
"next_page_id": {"type": "keyword"},
"publication_date": {"type": "date"},
"created_at": {"type": "date"},
"updated_at": {"type": "date"},
"access_at": {"type": "date"},
"media_type": {"type": "keyword"}, # "text", "image", "audio"
"media_data": {"type": "binary"} # 元のメディアデータ(オプション)
}
}
})
print(self.client.embedding_fields)
if not self.client.indices.exists(index="index-metadata"):
self.client.indices.create(index="index-metadata", body={
"settings": {"index": {"knn": True}},
"mappings": {
"properties": {
"description": {"type": "text"}
}
}
})
self.client.index(index="index-metadata", id=name, body={"description": description})
# インデックス説明の更新
def update_index_description(self, name: str, description: str):
self.client.index(index="index-metadata", id=name, body={"description": description})
# インデックス一覧(説明付き)
def list_indices(self):
print("list_indices called\n")
indices = self.client.indices.get_alias().keys()
return [{
"index": i,
"description": self.client.get(index="index-metadata", id=i)["_source"].get("description", "")
} for i in indices if i != "index-metadata"]
# 指定されたindexが存在するか確認する。
def index_exists(self, index: str) -> bool:
return self.client.indices.exists(index=index)
# 単一データ登録(拡張フィールド対応)
def register(
self,
index: str,
text: str = "",
summary: Optional[str] = None,
state: Optional[str] = None,
purpose: Optional[str] = None,
result: Optional[str] = None,
image_bytes: Optional[bytes] = None,
audio_bytes: Optional[bytes] = None,
section: Optional[str] = None,
page: Optional[int] = None,
previous_page_id: Optional[str] = None,
next_page_id: Optional[str] = None,
publication_date: Optional[Union[str, datetime]] = None,
id: Optional[str] = None,
media_type: str = "text"
):
embedding = self._embed(summary) if summary else [0.0] * 768
embedding_state = self._embed(state) if state else [0.0] * 768
embedding_purpose = self._embed(purpose) if purpose else [0.0] * 768
embedding_result = self._embed(result) if result else [0.0] * 768
image_embedding = self._embed_image(image_bytes) if None is not image_bytes else [0.0] * 768
audio_embedding = self._embed_audio(audio_bytes) if None is not audio_bytes else [0.0] * 768
doc_id = id or self._generate_id()
body = {
"text": text,
"embedding": embedding,
"embedding_state": embedding_state,
"embedding_purpose": embedding_purpose,
"embedding_result": embedding_result,
"image_embedding": image_embedding,
"audio_embedding": audio_embedding,
"media_type": media_type,
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
"access_at": datetime.now().isoformat()
}
if state:
body["state"] = state
if purpose:
body["purpose"] = purpose
if result:
body["result"] = result
if page is not None:
body["page"] = page
if section:
body["section"] = section
if previous_page_id:
body["previous_page_id"] = previous_page_id
if next_page_id:
body["next_page_id"] = next_page_id
if publication_date:
if isinstance(publication_date, str):
body["publication_date"] = publication_date
else:
body["publication_date"] = publication_date.isoformat()
if None is not image_bytes:
body["media_data"] = base64.b64encode(image_bytes).decode('utf-8')
elif None is not audio_bytes:
body["media_data"] = base64.b64encode(audio_bytes).decode('utf-8')
self.client.index(index=index, id=doc_id, body=body)
def __text_page_split(self, text :str ):
# pages = text.split("\f") # 改ページで分割
# トークン数で分割に変更
if is_predominantly_japanese(text):
text = text.replacer('。」', '」<SPLIT>')
text = text.replacer('。)', ')<SPLIT>')
text = text.replacer('」', '」<SPLIT>')
text = text.replacer(')', ')<SPLIT>')
text = text.replacer(r'。', r'。<SPLIT>')
sentences = text.split("<SPLIT>") # 。で分割
else:
sentences = split_sentences(text)
pages=[]
page=""
page_tokens_count = 0
for sentence in sentences:
sentence = sentence.strip()
if not sentence:
continue
tokens_count = get_token_count(sentence)
if page_tokens_count + tokens_count <= self.page_max_token:
page += sentence
page_tokens_count += tokens_count
else:
pages.append(page)
page = sentence
page_tokens_count = tokens_count
pages.append(page)
return pages
def ingest_text(self, index: str,
text :str,
summary:Optional[str] = None,
state:Optional[str] = None,
purpose:Optional[str] = None,
result:Optional[str] = None,
publication_date: Optional[Union[str, datetime]] = None):
self.register(
index=index,
text=text,
summary=summary,
state=state,
purpose=purpose,
result=result,
previous_page_id=str(i) if 0 < i else None,
publication_date=publication_date
)
# PDFからテキスト抽出+登録(ページID追加)
def ingest_pdf(self, filepath: str,
index: str,
state:Optional[str] = None,
purpose:Optional[str] = None,
result:Optional[str] = None,
publication_date: Optional[Union[str, datetime]] = None):
text = extract_text(filepath)
pages = self.__text_page_split(text)
for i, page_text in enumerate(pages):
page_text = page_text.strip()
if page_text: # 空ページをスキップ
self.register(
index=index,
text=page_text,
summary=page_text,
state=state,
purpose=purpose,
result=result,
page=i + 1,
previous_page_id=str(i) if 0 < i else None,
next_page_id=str(i + 2) if i < len(pages) - 1 else None,
publication_date=publication_date
)
# HTMLからテキスト抽出+登録
def ingest_html(self, filepath: str, index: str):
with open(filepath, encoding="utf-8") as f:
soup = BeautifulSoup(f, "html.parser")
paragraphs = soup.find_all("p")
for i, p in enumerate(paragraphs):
self.register(index=index, text=p.get_text().strip(), section=f"段落{i + 1}")
# 画像ファイルを登録
def ingest_image(self, filepath: str,
index: str,
text: str = "",
state:Optional[str] = None,
purpose:Optional[str] = None,
result:Optional[str] = None,
section: str = "画像", page: int = 1):
with open(filepath, "rb") as f:
image_bytes = f.read()
#image = np.array(Image.open(io.BytesIO(image_bytes)).convert("RGB"))
self.register(
index=index,
image_bytes=image_bytes,
text=text,
summary=text,
state=state,
purpose=purpose,
result=result,
section=section,
page=page,
media_type="image"
)
# 画像データ
def ingest_image_bytes(self,
image_bytes: bytes,
index: str,
text: str = "",
section: str = "画像",
state:Optional[str] = None,
purpose:Optional[str] = None,
result:Optional[str] = None,
page: int = 1):
self.register(
index=index,
image_bytes=image_bytes,
text=text,
summary=text,
state=state,
purpose=purpose,
result=result,
section=section,
page=page,
media_type="image"
)
# 音声ファイルを登録
def ingest_audio(self, filepath: str,
index: str,
text: str = "",
section: str = "音声",
state:Optional[str] = None,
purpose:Optional[str] = None,
result:Optional[str] = None,
page: int = 1):
with open(filepath, "rb") as f:
audio_bytes = f.read()
self.register(
index=index,
audio_bytes=audio_bytes,
text=text,
summary=text,
page=page,
state=state,
purpose=purpose,
result=result,
media_type="audio"
)
# 音声データ
def ingest_audio_bytes(self, audio_bytes: bytes,
index: str,
text: str = "",
section: str = "音声",
state:Optional[str] = None,
purpose:Optional[str] = None,
result:Optional[str] = None,
page: int = 1):
self.register(
index=index,
audio_bytes=audio_bytes,
text=text,
summary=text,
section=section,
state=state,
purpose=purpose,
result=result,
page=page,
media_type="audio"
)
def inget_multi_modal(self, index: str,
text: str = "",
image_bytes: Optional[bytes] = None,
audio_bytes: Optional[bytes] = None,
section: Optional[str] = None,
state: Optional[str] = None,
purpose: Optional[str] = None,
result: Optional[str] = None,
page: Optional[int] = None):
self.register(
index=index,
text=text,
summary=text,
image_bytes=image_bytes,
audio_bytes=audio_bytes,
section=section,
state=state,
purpose=purpose,
result=result,
page=page,
media_type="multi_modal"
)
##########################
def _update_access_at(self, index, response ):
for hit in response["hits"]["hits"]:
doc_id = hit["_id"]
self.client.update(
index=index,
id=doc_id,
body={
"doc": {
"access_at": datetime.now().isoformat()
}
}
)
# キーワード検索
def search_keyword(self, index: str, query: str, top_k: int = 5):
"""
キーワード検索を行います。
"""
query = {
"query": {
"multi_match": {
"query": query,
"fields": ["text", "section"]
}
},
"size": top_k
}
response = self.client.search(index=index, body=query)
self._update_access_at(index, response)
return json.dumps(response, indent=4, ensure_ascii=False)
# ベクトル検索(テキスト)
def search_vector(self, index: str, query_text: str, top_k: int = 5):
"""
テキストのベクトル検索を行います。
"""
query_embedding = self._embed(query_text)
query = {
"size": top_k,
"query": {
"knn": {
"embedding": {
"vector": query_embedding,
"k": top_k
}
}
}
}
response = self.client.search(index=index, body=query)
self._update_access_at(index, response)
return json.dumps(response, indent=4, ensure_ascii=False)
def search_vector_state(self, index: str, query_text: str, top_k: int = 5):
"""
状態テキストのベクトル検索を行います。
"""
query_embedding = self._embed(query_text)
query = {
"size": top_k,
"query": {
"knn": {
"embedding_state": {
"vector": query_embedding,
"k": top_k
}
}
}
}
response = self.client.search(index=index, body=query)
self._update_access_at(index, response)
return json.dumps(response, indent=4, ensure_ascii=False)
def search_vector_purpose(self, index: str, query_text: str, top_k: int = 5):
"""
目的テキストのベクトル検索を行います。
"""
query_embedding = self._embed(query_text)
query = {
"size": top_k,
"query": {
"knn": {
"embedding_purpose": {
"vector": query_embedding,
"k": top_k
}
}
}
}
response = self.client.search(index=index, body=query)
self._update_access_at(index, response)
return json.dumps(response, indent=4, ensure_ascii=False)
def search_vector_result(self, index: str, query_text: str, top_k: int = 5):
"""
結果テキストのベクトル検索を行います。
"""
query_embedding = self._embed(query_text)
query = {
"size": top_k,
"query": {
"knn": {
"embedding_result": {
"vector": query_embedding,
"k": top_k
}
}
}
}
response = self.client.search(index=index, body=query)
self._update_access_at(index, response)
return json.dumps(response, indent=4, ensure_ascii=False)
# 画像検索
def search_image(self, index: str, image_bytes: bytes, top_k: int = 5):
"""
画像のベクトル検索を行います。
"""
image_embedding = self._embed_image(image_bytes)
query = {
"size": top_k,
"query": {
"knn": {
"image_embedding": {
"vector": image_embedding,
"k": top_k
}
}
}
}
response = self.client.search(index=index, body=query)
self._update_access_at(index, response)
return json.dumps(response, indent=4, ensure_ascii=False)
# 音声検索
def search_audio(self, index: str, audio_bytes: bytes, top_k: int = 5):
"""
音声のベクトル検索を行います。
"""
audio_embedding = self._embed_audio(audio_bytes)
query = {
"size": top_k,
"query": {
"knn": {
"audio_embedding": {
"vector": audio_embedding,
"k": top_k
}
}
}
}
response = self.client.search(index=index, body=query)
self._update_access_at(index, response)
return json.dumps(response, indent=4, ensure_ascii=False)
# ハイブリッド検索(キーワード + ベクトル)
def search_hybrid(self, index: str, query_text: str, top_k: int = 5):
"""
キーワード検索とベクトル検索のハイブリッド検索を行います。
"""
# ベクトル検索の結果
vector_results = self.search_vector(index, query_text, top_k * 2)
# キーワード検索の結果
keyword_results = self.search_keyword(index, query_text, top_k * 2)
# 結果をマージして重複を除去
combined = list(set(vector_results + keyword_results))
# 上位top_k件を返す
return combined[:top_k]
###################################################
# 複数データの一括登録
def bulk_register(self, index: str, items: List[Dict]):
"""
複数のドキュメントを一括登録します。
Args:
index: インデックス名
items: 一括登録するドキュメントのリスト。各要素は以下の形式を持ちます。
{
"_id": "ドキュメントID", # 省略可能(自動生成される)
"text": "テキスト内容",
"section": "セクション名", # 省略可能
"page": 1, # ページ番号(省略可能)
"embedding": [...] # ベクトル(省略可能、自動生成される場合)
}
Returns:
bulk操作の結果
"""
actions = []
for item in items:
# IDが指定されていない場合は自動生成
if "_id" not in item:
item["_id"] = self._generate_id()
# テキストからベクトルを自動生成(存在しない場合)
if "embedding" not in item and "text" in item:
item["embedding"] = self._embed(item["text"])
# Bulk操作のアクションを作成
action = {
"_op_type": "index",
"_index": index,
"_id": item["_id"],
"_source": item
}
actions.append(action)
# 一括登録実行
success, failed = self.client.bulk(self.client, actions)
return {"success": success, "failed": failed}
# データ削除
def delete(self, index: str, id: str):
"""
指定されたIDのドキュメントを削除します。
Args:
index: インデックス名
id: 削除するドキュメントのID
Returns:
削除操作の結果
"""
response = self.client.delete(index=index, id=id)
return response
# データ更新
def update(self, index: str, id: str, new_text: str, section: Optional[str] = None, page: Optional[int] = None):
"""
指定されたIDのドキュメントを更新します。
Args:
index: インデックス名
id: 更新するドキュメントのID
new_text: 新しいテキスト内容
section: 新しいセクション名(省略可能)
page: 新しいページ番号(省略可能)
Returns:
更新操作の結果
"""
# ベクトルを自動生成
embedding = self._embed(new_text)
# 更新するドキュメントのデータを作成
doc = {
"text": new_text,
"embedding": embedding,
"section": section,
"page": page,
"updated_at": datetime.now().isoformat(),
"access_at": datetime.now().isoformat()
}
# ドキュメントを更新
response = self.client.update(
index=index,
id=id,
doc=doc,
doc_as_upsert=True # ドキュメントが存在しない場合は新規作成
)
return response
# ID自動生成(簡易版)
def _generate_id(self):
"""
ドキュメントIDを自動生成します。
"""
return str(time.time()) + str(uuid.uuid4())
g_index = "base"
g_semantic_indexer = SemanticIndexer()
data=g_semantic_indexer.index_exists(g_index)
if not data:
g_semantic_indexer.create_index(g_index)
def _set_long_memory_index(index):
global g_index
g_index = index
def _get_long_memory_index():
global g_index
return g_index
class RegisterTextInput(BaseModel):
text: str = Field(..., description="登録するテキスト内容")
summary: Optional[str] = Field(None, description="テキストの要約、ベクトル検索で使う(省略可能)")
state: Optional[str] = Field(None, description="テキストを取得または作成した状態(省略可能)")
purpose: Optional[str] = Field(None, description="テキストを取得または作成した目的(省略可能)")
result: Optional[str] = Field(None, description="テキストを取得または作成した結果や作業の成否(省略可能)")
section: Optional[str] = Field(None, description="セクション名(省略可能)")
page: Optional[int] = Field(None, description="ページ番号(省略可能)")
@tool(args_schema = RegisterTextInput)
def register_text(text: str,
summary: Optional[str] = None,
state: Optional[str] = None,
purpose: Optional[str] = None,
result: Optional[str] = None,
section: Optional[str] = None, page: Optional[int] = None):
"""
長期記憶用のデータ書き込み、登録機能です。
テキストデータを書き込み、登録します。
Args:
text: 登録するテキスト内容
summary: テキストの要約,ベクトル検索で使う(省略可能)
state: テキストを取得または作成した状態(省略可能)
purpose: テキストを取得または作成した目的(省略可能)
result: テキストを取得または作成した結果や作業の成否(省略可能)
section: セクション名(省略可能)
page: ページ番号(省略可能)
"""
print("register_text called\n")
global g_index
wait_safety() # リクエストの制限を越えないために必要
g_semantic_indexer.register(index=g_index,
text=text,
summary=summary,
state=state,
purpose=purpose,
result=result,
section=section, page=page)
class RegisterPDFInput(BaseModel):
filepath: str = Field(..., description="PDFファイルのパス")
state: Optional[str] = Field(None, description="PDFファイルを取得または作成した状態、状況(省略可能)")
purpose: Optional[str] = Field(None, description="PDFファイルを取得または作成した目的または記録する目的。(省略可能)")
result: Optional[str] = Field(None, description="PDFファイルを取得または作成した結果や作業の成否(省略可能)")
publication_date: Optional[Union[str, datetime]] = Field(None, description="公開日(省略可能)")
@tool(args_schema = RegisterPDFInput)
def register_pdf(filepath: str,
state: Optional[str] = None,
purpose: Optional[str] = None,
result: Optional[str] = None,
publication_date: Optional[Union[str, datetime]] = None):
"""
長期記憶用のデータ書き込み、登録機能です。
PDFファイルからテキストを抽出し、登録します。
Args:
filepath: PDFファイルのパス
state: PDFファイルを取得または作成した状態、状況(省略可能)
purpose: PDFファイルを取得または作成した目的。または記録する目的。(省略可能)
result: PDFファイルを使用した作業の結果や作業の成否(省略可能)
publication_date: 公開日(省略可能)
"""
print("register_pdf called\n")
global g_index
wait_safety() # リクエストの制限を越えないために必要
if os.path.exists(filepath):
g_semantic_indexer.ingest_pdf(filepath=filepath, index=g_index,
state=state,
purpose=purpose,
result=result,
publication_date=publication_date)
return "No Error"
else:
print("file not found\n")
return "file not found"
class RegisterHTMLInput(BaseModel):
filepath: str = Field(..., description="HTMLファイルのパス")
@tool(args_schema = RegisterHTMLInput)
def register_html(filepath: str):
"""
長期記憶用のデータ書き込み、登録機能です。
HTMLファイルからテキストを抽出し、登録します。
Args:
filepath: HTMLファイルのパス
"""
print("register_html called\n")
global g_index
wait_safety() # リクエストの制限を越えないために必要
if os.path.exists(filepath):
g_semantic_indexer.ingest_html(filepath=filepath, index=g_index)
return "No Error"
else:
print("file not found\n")
return "file not found"
class RegisterImageInput(BaseModel):
filepath: str = Field(..., description="画像ファイルのパス")
text: Optional[str] = Field("", description="画像に関連するテキストの説明や、行った作業の内容(省略可能)")
state: Optional[str] = Field(None, description="画像を取得または作成した状態、状況(省略可能)")
purpose: Optional[str] = Field(None, description="画像を取得または作成した目的。または記録する目的。(省略可能)")
result: Optional[str] = Field(None, description="画像を使用した作業の結果や作業の成否(省略可能)")
section: Optional[str] = Field("image", description="セクション名(省略可能)")
page: Optional[int] = Field(1, description="ページ番号(省略可能)")
@tool(args_schema = RegisterImageInput)
def register_image(filepath: str,
text: Optional[str] = "",
state: Optional[str] = None,
purpose: Optional[str] = None,
result: Optional[str] = None,
section: Optional[str] = "image", page: Optional[str] = 1):
"""
長期記憶用のデータ書き込み、登録機能です。
画像ファイルを指定されたインデックスに登録します。
Args:
filepath: 画像ファイルのパス
text: 画像に関連するテキストの説明や、行った作業の内容(省略可能)
state: 画像を取得または作成した状態、状況(省略可能)
purpose: 画像を取得または作成した目的。または記録する目的。(省略可能)
result: 画像を使用した作業の結果や作業の成否(省略可能)
section: セクション名(省略可能)
page: ページ番号(省略可能)
"""
print("register_image called\n")
global g_index
wait_safety() # リクエストの制限を越えないために必要
if os.path.exists(filepath):
g_semantic_indexer.ingest_image(filepath=filepath, index=g_index, text=text,
state=state,
purpose=purpose,
result=result,
section=section, page=page)
return "No Error"
else:
print("file not found\n")
return "file not found"
class RegisterAudioInput(BaseModel):
filepath: str = Field(..., description="音声ファイルのパス")
text: Optional[str] = Field("", description="音声に関連するテキスト内容(省略可能)")
state: Optional[str] = Field(None, description="音声を取得または作成した状態、状況(省略可能)")
purpose: Optional[str] = Field(None, description="音声を取得または作成した目的。または記録する目的。(省略可能)")
result: Optional[str] = Field(None, description="音声を使用した作業の結果や作業の成否(省略可能)")
section: Optional[str] = Field("audio", description="セクション名(省略可能)")
page: Optional[int] = Field(1, description="ページ番号(省略可能)")
@tool(args_schema = RegisterAudioInput)
def register_audio(filepath: str,
text: Optional[str] = "",
state: Optional[str] = None,
purpose: Optional[str] = None,
result: Optional[str] = None,
section: Optional[str] = "audio", page: int = 1):
"""
長期記憶用のデータ書き込み、登録機能です。
音声ファイルを登録します。
Args:
index: インデックス名
filepath: 音声ファイルのパス
text: 音声に関連するテキスト内容(省略可能)
state: 音声を取得または作成した状態、状況(省略可能)
purpose: 音声を取得または作成した目的。または記録する目的。(省略可能)
result: 音声を使用した作業の結果や作業の成否(省略可能)
section: セクション名(省略可能)
page: ページ番号(省略可能)
"""
print("register_audio called\n")
global g_index
wait_safety() # リクエストの制限を越えないために必要
if os.path.exists(filepath):
g_semantic_indexer.ingest_audio(filepath=filepath, index=g_index, text=text,
state=state,
purpose=purpose,
result=result,
section=section, page=page)
return "No Error"
else:
print("file not found\n")
return "file not found"
class UpdateTextInput(BaseModel):
id: str = Field(..., description="更新するドキュメントのID")
new_text: str = Field(..., description="新しいテキスト内容")
summary: Optional[str] = Field(None, description="テキストの要約、ベクトル検索で使う(省略可能)")
section: Optional[str] = Field(None, description="新しいセクション名(省略可能)")
page: Optional[int] = Field(None, description="新しいページ番号(省略可能)")
@tool(args_schema = UpdateTextInput)
def update_text(id: str, new_text: str, summary: Optional[str] = None, section: Optional[str] = None, page: Optional[int] = None):
"""
長期記憶用のデータ更新、登録機能です。
指定されたIDのドキュメントのテキストを更新します。
Args:
index: インデックス名
id: 更新するドキュメントのID
new_text: 新しいテキスト内容
summary: テキストの要約、ベクトル検索で使う(省略可能)
section: 新しいセクション名(省略可能)
page: 新しいページ番号(省略可能)
"""
print("update_text called\n")
global g_index
wait_safety() # リクエストの制限を越えないために必要
g_semantic_indexer.update(index=g_index, id=id, new_text=new_text, section=section, page=page)
class DeleteDataInput(BaseModel):
id: str = Field(..., description="削除するドキュメントのID")
@tool(args_schema = DeleteDataInput)
def delete_data(id: str):
"""
長期記憶用のデータ削除機能です。
指定されたIDのドキュメントを削除します。
Args:
index: インデックス名
id: 削除するドキュメントのID
"""
print("delete_data called\n")
global g_index
wait_safety() # リクエストの制限を越えないために必要
g_semantic_indexer.delete(index=g_index, id=id)
class SearchKeywordInput(BaseModel):
query: str = Field(..., description="検索クエリ")
top_k: Optional[int] = Field(5, description="取得する上位件数")
@tool(args_schema = SearchKeywordInput)
def serch_keyword( query: str, top_k: int = 5):
"""
長期記憶用のデータ検索、読み込み機能です。
キーワード検索を行います。
Args:
query: 検索クエリ
top_k: 取得する上位件数
Returns:
検索結果のリスト
"""
print("serch_keyword called\n", top_k)
global g_index
wait_safety() # リクエストの制限を越えないために必要
return g_semantic_indexer.search_keyword(index=g_index, query=query, top_k=top_k)
class SearchVectorInput(BaseModel):
query_text: str = Field(..., description="検索クエリテキスト。内部で自動的にベクトル化され検索に利用される。")
top_k: Optional[int] = Field(5, description="取得する上位件数")
@tool(args_schema = SearchVectorInput)
def search_vector( query_text: str, top_k: int = 5):
"""
長期記憶用のデータ検索、読み込み機能です。
テキストのベクトル検索を行います。
Args:
query_text: 検索クエリテキスト。内部で自動的にベクトル化され検索に利用される。
top_k: 取得する上位件数
Returns:
検索結果のリスト
"""
print("search_vector called\n", top_k)
global g_index
wait_safety() # リクエストの制限を越えないために必要
return g_semantic_indexer.search_vector(index=g_index, query_text=query_text, top_k=top_k)
@tool(args_schema = SearchVectorInput)
def search_vector_state( query_text: str, top_k: int = 5):
"""
長期記憶用のデータ検索、読み込み機能です。
検索クエリテキストは現在の状態を表すテキストを入れてください。
状態テキストの内容の近いものを検索します。
Args:
query_text: 検索クエリテキスト。内部で自動的にベクトル化され検索に利用される。
top_k: 取得する上位件数
Returns:
検索結果のリスト
"""
print("search_vector_state called\n",query_text, top_k)
global g_index
wait_safety() # リクエストの制限を越えないために必要
return g_semantic_indexer.search_vector_state(index=g_index, query_text=query_text, top_k=top_k)
@tool(args_schema = SearchVectorInput)
def search_vector_purpose( query_text: str, top_k: int = 5):
"""
長期記憶用のデータ検索、読み込み機能です。
検索クエリテキストは現在の目的テキストを入れてください。
目的テキストの内容の近いものを検索します。
Args:
query_text: 検索クエリテキスト。内部で自動的にベクトル化され検索に利用される。
top_k: 取得する上位件数
Returns:
検索結果のリスト
"""
print("search_vector_purpose called\n", top_k)
global g_index
wait_safety() # リクエストの制限を越えないために必要
return g_semantic_indexer.search_vector_purpose(index=g_index, query_text=query_text, top_k=top_k)
@tool(args_schema = SearchVectorInput)
def search_vector_result( query_text: str, top_k: int = 5):
"""
長期記憶用のデータ検索、読み込み機能です。
結果テキストの内容の近いものを検索します。
Args:
query_text: 検索クエリテキスト
top_k: 取得する上位件数
Returns:
検索結果のリスト
"""
print("search_vector_result called\n", top_k)
global g_index
wait_safety() # リクエストの制限を越えないために必要
return g_semantic_indexer.search_vector_result(index=g_index, query_text=query_text, top_k=top_k)
class SearchHybridInput(BaseModel):
query_text: str = Field(..., description="検索クエリテキスト。内部で自動的にベクトル化され検索に利用される。")
top_k: Optional[int] = Field(5, description="取得する上位件数")
@tool(args_schema = SearchHybridInput)
def search_hybrid( query_text: str, top_k: int = 5):
"""
長期記憶用のデータ検索、読み込み機能です。
キーワード検索と意味検索のハイブリッド検索を行います。
Args:
query_text: 検索クエリテキスト。内部で自動的にベクトル化され検索に利用される。
top_k: 取得する上位件数
Returns:
検索結果のリスト
"""
print("search_hybrid called\n")
global g_index
wait_safety() # リクエストの制限を越えないために必要
return g_semantic_indexer.search_hybrid(index=g_index, query_text=query_text, top_k=top_k)
class SearchImageFileInput(BaseModel):
file_name: str = Field(..., description="検索クエリ画像のファイル名")
top_k: Optional[int] = Field(5, description="取得する上位件数")
@tool(args_schema = SearchImageFileInput)
def serch_image_file(file_name: str, top_k: int = 5):
"""
長期記憶用のデータ検索、読み込み機能です。
指定されたファイル画像のベクトル検索を行います。
:param file: 検索クエリ画像のファイル名
:type file: str
:param top_k: 取得する上位件数
:type top_k: int
"""
print("serch_image_file called\n", top_k)
global g_index
wait_safety() # リクエストの制限を越えないために必要
if not os.path.exists(file_name):
return "ValueError: file not found"
with open(file_name, "rb") as f:
image_bytes = f.read()
return g_semantic_indexer.search_image(index=g_index, image_bytes=image_bytes, top_k=top_k)
class SearchImageInput(BaseModel):
image_bytes: bytes = Field(..., description="検索クエリ画像のバイトデータ")
top_k: Optional[int] = Field(5, description="取得する上位件数")
@tool(args_schema = SearchImageInput)
def serch_image( image_bytes: bytes, top_k: int = 5):
"""
長期記憶用のデータ検索、読み込み機能です。
画像のベクトル検索を行います。
Args:
image_bytes: 検索クエリ画像のバイトデータ
top_k: 取得する上位件数
Returns:
検索結果のリスト
"""
print("serch_image called\n", top_k)
global g_index
wait_safety() # リクエストの制限を越えないために必要
return g_semantic_indexer.search_image(index=g_index, image_bytes=image_bytes, top_k=top_k)
class SearchAudioInput(BaseModel):
audio_bytes: bytes = Field(..., description="検索クエリ音声のバイトデータ")
top_k: Optional[int] = Field(5, description="取得する上位件数")
@tool(args_schema = SearchAudioInput)
def search_audio(audio_bytes: bytes, top_k: int = 5):
"""
長期記憶用のデータ検索、読み込み機能です。
音声のベクトル検索を行います。
Args:
audio_bytes: 検索クエリ音声のバイトデータ
top_k: 取得する上位件数
Returns:
検索結果のリスト
"""
print("serch_audio called\n" ,top_k)
global g_index
wait_safety() # リクエストの制限を越えないために必要
return g_semantic_indexer.search_audio(index=g_index, audio_bytes=audio_bytes, top_k=top_k)
def get_text_memory_register_tool():
return [register_text]
def get_text_memory_search_tools():
return [serch_keyword,
search_vector,
search_vector_state,
search_vector_purpose,
search_vector_result,
search_hybrid]
def get_text_memory_tools_list():
return [register_text,
update_text,
serch_keyword,
search_vector,
search_vector_state,
search_vector_purpose,
search_vector_result,
search_hybrid]
def get_text_memory_edit_tools():
return [update_text,
delete_data,
serch_keyword,
search_vector,
search_vector_state,
search_vector_purpose,
search_vector_result,
search_hybrid]
def get_tools_list():
return [
register_text,
register_pdf,
#register_html,
register_image,
register_audio,
# update_text,
# delete_data,
serch_keyword,
search_vector,
search_vector_state,
search_vector_purpose,
search_vector_result,
search_hybrid,
serch_image_file,
serch_image,
search_audio
]
def get_register_tools_list():
return [
register_text,
register_pdf,
register_image,
register_audio,
]