Dr. Kiji 2025-01-02 19:14:16 +09:00
parent 81c5953fa5
commit 77030d7581
5 changed files with 295 additions and 180 deletions

View File

@@ -22,9 +22,11 @@ class Keyword:
match keyword_type:
case KeyWordType.JIEBA:
from core.rag.datasource.keyword.jieba.jieba import Jieba
return Jieba
case KeyWordType.MECAB:
from core.rag.datasource.keyword.mecab.mecab import MeCab
return MeCab
case _:
raise ValueError(f"Keyword store {keyword_type} is not supported.")
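For context, a hedged sketch of the enum this dispatch switches on; the member names come from the hunk above, while the string values and the setting that selects them are assumptions, since KeyWordType itself is defined outside this diff.

from enum import Enum

class KeyWordType(str, Enum):
    JIEBA = "jieba"   # value assumed
    MECAB = "mecab"   # value assumed

With a deployment setting such as KEYWORD_STORE=mecab (name assumed), the factory above would lazily import and return the MeCab processor class instead of Jieba.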

core/rag/datasource/keyword/mecab/config.py View File

@@ -1,19 +1,21 @@
from pydantic import BaseModel
class MeCabConfig(BaseModel):
"""Configuration for MeCab keyword processor."""
max_keywords_per_chunk: int = 10
min_keyword_length: int = 2
score_threshold: float = 0.3
storage_type: str = "database"
cache_timeout: int = 3600
# MeCab specific settings
dictionary_path: str = "" # Optional custom dictionary path
user_dictionary_path: str = "" # Optional user dictionary path
pos_weights: dict = {
'名詞': 1.0, # Nouns
'動詞': 0.8, # Verbs
'形容詞': 0.6, # Adjectives
'副詞': 0.4, # Adverbs
}
"名詞": 1.0, # Nouns
"動詞": 0.8, # Verbs
"形容詞": 0.6, # Adjectives
"副詞": 0.4, # Adverbs
}
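For orientation, a minimal hedged sketch of constructing this config with overrides; the field names are taken from the class above, and the dictionary path is a placeholder rather than anything shipped by this commit. Note that mecab.py below instantiates MeCabConfig() with its defaults, so custom values would have to be wired in separately.

from core.rag.datasource.keyword.mecab.config import MeCabConfig

# Override a few defaults; pydantic validates the field types on construction.
config = MeCabConfig(
    max_keywords_per_chunk=15,
    dictionary_path="/usr/lib/mecab/dic/ipadic",  # placeholder path
    pos_weights={"名詞": 1.2, "動詞": 0.8},        # favour nouns even more heavily
)
print(config.score_threshold)  # 0.3, untouched defaults remain in place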

core/rag/datasource/keyword/mecab/mecab.py View File

@@ -1,38 +1,41 @@
import json
import logging
from typing import Any, Optional
from collections import defaultdict
from typing import Any, Optional
from core.rag.datasource.keyword.keyword_base import BaseKeyword
from core.rag.datasource.keyword.mecab.mecab_keyword_table_handler import MeCabKeywordTableHandler
from core.rag.datasource.keyword.mecab.config import MeCabConfig
from core.rag.datasource.keyword.mecab.mecab_keyword_table_handler import MeCabKeywordTableHandler
from core.rag.models.document import Document
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from extensions.ext_storage import storage
from models.dataset import Dataset, DatasetKeywordTable, DocumentSegment
from models.dataset import Dataset, DocumentSegment
logger = logging.getLogger(__name__)
class KeywordProcessorError(Exception):
"""Base error for keyword processing."""
pass
class KeywordExtractionError(KeywordProcessorError):
"""Error during keyword extraction."""
pass
class KeywordStorageError(KeywordProcessorError):
"""Error during storage operations."""
pass
class SetEncoder(json.JSONEncoder):
"""JSON encoder that handles sets."""
def default(self, obj):
if isinstance(obj, set):
return list(obj)
@@ -41,19 +44,18 @@ class SetEncoder(json.JSONEncoder):
class MeCab(BaseKeyword):
"""Japanese keyword processor using MeCab morphological analyzer."""
def __init__(self, dataset: Dataset):
super().__init__(dataset)
self._config = MeCabConfig()
self._keyword_handler = None
self._init_handler()
def _init_handler(self):
"""Initialize MeCab handler with configuration."""
try:
self._keyword_handler = MeCabKeywordTableHandler(
dictionary_path=self._config.dictionary_path,
user_dictionary_path=self._config.user_dictionary_path
dictionary_path=self._config.dictionary_path, user_dictionary_path=self._config.user_dictionary_path
)
if self._config.pos_weights:
self._keyword_handler.pos_weights = self._config.pos_weights
@@ -61,75 +63,60 @@ class MeCab(BaseKeyword):
except Exception as e:
logger.error(f"Failed to initialize MeCab handler: {str(e)}")
raise KeywordProcessorError(f"MeCab initialization failed: {str(e)}")
def create(self, texts: list[Document], **kwargs) -> BaseKeyword:
"""Create keyword index for documents."""
lock_name = f"keyword_indexing_lock_{self.dataset.id}"
with redis_client.lock(lock_name, timeout=600):
keyword_table = self._get_dataset_keyword_table()
for text in texts:
keywords = self._keyword_handler.extract_keywords(
text.page_content,
self._config.max_keywords_per_chunk
text.page_content, self._config.max_keywords_per_chunk
)
if text.metadata is not None:
self._update_segment_keywords(
self.dataset.id,
text.metadata["doc_id"],
list(keywords)
)
self._update_segment_keywords(self.dataset.id, text.metadata["doc_id"], list(keywords))
keyword_table = self._add_text_to_keyword_table(
keyword_table or {},
text.metadata["doc_id"],
list(keywords)
keyword_table or {}, text.metadata["doc_id"], list(keywords)
)
self._save_dataset_keyword_table(keyword_table)
return self
def add_texts(self, texts: list[Document], **kwargs):
"""Add new texts to existing index."""
lock_name = f"keyword_indexing_lock_{self.dataset.id}"
with redis_client.lock(lock_name, timeout=600):
keyword_table = self._get_dataset_keyword_table()
keywords_list = kwargs.get("keywords_list")
for i, text in enumerate(texts):
if keywords_list:
keywords = keywords_list[i]
if not keywords:
keywords = self._keyword_handler.extract_keywords(
text.page_content,
self._config.max_keywords_per_chunk
text.page_content, self._config.max_keywords_per_chunk
)
else:
keywords = self._keyword_handler.extract_keywords(
text.page_content,
self._config.max_keywords_per_chunk
text.page_content, self._config.max_keywords_per_chunk
)
if text.metadata is not None:
self._update_segment_keywords(
self.dataset.id,
text.metadata["doc_id"],
list(keywords)
)
self._update_segment_keywords(self.dataset.id, text.metadata["doc_id"], list(keywords))
keyword_table = self._add_text_to_keyword_table(
keyword_table or {},
text.metadata["doc_id"],
list(keywords)
keyword_table or {}, text.metadata["doc_id"], list(keywords)
)
self._save_dataset_keyword_table(keyword_table)
def text_exists(self, id: str) -> bool:
"""Check if text exists in index."""
keyword_table = self._get_dataset_keyword_table()
if keyword_table is None:
return False
return id in set.union(*keyword_table.values()) if keyword_table else False
def delete_by_ids(self, ids: list[str]) -> None:
"""Delete texts by IDs."""
lock_name = f"keyword_indexing_lock_{self.dataset.id}"
@@ -138,7 +125,7 @@ class MeCab(BaseKeyword):
if keyword_table is not None:
keyword_table = self._delete_ids_from_keyword_table(keyword_table, ids)
self._save_dataset_keyword_table(keyword_table)
def delete(self) -> None:
"""Delete entire index."""
lock_name = f"keyword_indexing_lock_{self.dataset.id}"
@@ -150,29 +137,22 @@ class MeCab(BaseKeyword):
if dataset_keyword_table.data_source_type != "database":
file_key = f"keyword_files/{self.dataset.tenant_id}/{self.dataset.id}.txt"
storage.delete(file_key)
def search(self, query: str, **kwargs: Any) -> list[Document]:
"""Search documents using keywords."""
keyword_table = self._get_dataset_keyword_table()
k = kwargs.get("top_k", 4)
sorted_chunk_indices = self._retrieve_ids_by_query(
keyword_table or {},
query,
k
)
sorted_chunk_indices = self._retrieve_ids_by_query(keyword_table or {}, query, k)
documents = []
for chunk_index in sorted_chunk_indices:
segment = (
db.session.query(DocumentSegment)
.filter(
DocumentSegment.dataset_id == self.dataset.id,
DocumentSegment.index_node_id == chunk_index
)
.filter(DocumentSegment.dataset_id == self.dataset.id, DocumentSegment.index_node_id == chunk_index)
.first()
)
if segment:
documents.append(
Document(
@@ -185,9 +165,9 @@ class MeCab(BaseKeyword):
},
)
)
return documents
def _get_dataset_keyword_table(self) -> Optional[dict]:
"""Get keyword table from storage."""
dataset_keyword_table = self.dataset.dataset_keyword_table
@@ -196,21 +176,17 @@ class MeCab(BaseKeyword):
if keyword_table_dict:
return dict(keyword_table_dict["__data__"]["table"])
return {}
def _save_dataset_keyword_table(self, keyword_table):
"""Save keyword table to storage."""
table_dict = {
"__type__": "keyword_table",
"__data__": {
"index_id": self.dataset.id,
"summary": None,
"table": keyword_table
}
"__data__": {"index_id": self.dataset.id, "summary": None, "table": keyword_table},
}
dataset_keyword_table = self.dataset.dataset_keyword_table
data_source_type = dataset_keyword_table.data_source_type
if data_source_type == "database":
dataset_keyword_table.keyword_table = json.dumps(table_dict, cls=SetEncoder)
db.session.commit()
@@ -218,11 +194,8 @@ class MeCab(BaseKeyword):
file_key = f"keyword_files/{self.dataset.tenant_id}/{self.dataset.id}.txt"
if storage.exists(file_key):
storage.delete(file_key)
storage.save(
file_key,
json.dumps(table_dict, cls=SetEncoder).encode("utf-8")
)
storage.save(file_key, json.dumps(table_dict, cls=SetEncoder).encode("utf-8"))
def _add_text_to_keyword_table(self, keyword_table: dict, id: str, keywords: list[str]) -> dict:
"""Add text keywords to table."""
for keyword in keywords:
@@ -230,58 +203,48 @@ class MeCab(BaseKeyword):
keyword_table[keyword] = set()
keyword_table[keyword].add(id)
return keyword_table
def _delete_ids_from_keyword_table(self, keyword_table: dict, ids: list[str]) -> dict:
"""Delete IDs from keyword table."""
node_idxs_to_delete = set(ids)
keywords_to_delete = set()
for keyword, node_idxs in keyword_table.items():
if node_idxs_to_delete.intersection(node_idxs):
keyword_table[keyword] = node_idxs.difference(node_idxs_to_delete)
if not keyword_table[keyword]:
keywords_to_delete.add(keyword)
for keyword in keywords_to_delete:
del keyword_table[keyword]
return keyword_table
def _retrieve_ids_by_query(self, keyword_table: dict, query: str, k: int = 4):
"""Retrieve document IDs by query."""
keywords = self._keyword_handler.extract_keywords(query)
# Score documents based on matching keywords
chunk_indices_count = defaultdict(int)
keywords_list = [
keyword for keyword in keywords
if keyword in set(keyword_table.keys())
]
keywords_list = [keyword for keyword in keywords if keyword in set(keyword_table.keys())]
for keyword in keywords_list:
for node_id in keyword_table[keyword]:
chunk_indices_count[node_id] += 1
sorted_chunk_indices = sorted(
chunk_indices_count.keys(),
key=lambda x: chunk_indices_count[x],
reverse=True
)
sorted_chunk_indices = sorted(chunk_indices_count.keys(), key=lambda x: chunk_indices_count[x], reverse=True)
return sorted_chunk_indices[:k]
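To make the storage layout concrete: the keyword table kept by this class is an inverted index mapping each keyword to the set of segment index_node_ids that contain it, and _retrieve_ids_by_query simply ranks segments by how many query keywords they match. A small self-contained illustration with made-up IDs follows.

from collections import defaultdict

# Toy keyword table in the same shape this class persists: keyword -> set of node ids
keyword_table = {
    "機械学習": {"seg-1", "seg-3"},
    "形態素解析": {"seg-2", "seg-3"},
    "画像": {"seg-4"},
}
query_keywords = {"機械学習", "形態素解析"}  # what extract_keywords() might return for a query

chunk_indices_count: dict[str, int] = defaultdict(int)
for keyword in query_keywords & keyword_table.keys():
    for node_id in keyword_table[keyword]:
        chunk_indices_count[node_id] += 1

ranked = sorted(chunk_indices_count, key=lambda x: chunk_indices_count[x], reverse=True)
print(ranked)  # "seg-3" first (matches both keywords), then "seg-1" and "seg-2" in either order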
def _update_segment_keywords(self, dataset_id: str, node_id: str, keywords: list[str]):
"""Update segment keywords in database."""
document_segment = (
db.session.query(DocumentSegment)
.filter(
DocumentSegment.dataset_id == dataset_id,
DocumentSegment.index_node_id == node_id
)
.filter(DocumentSegment.dataset_id == dataset_id, DocumentSegment.index_node_id == node_id)
.first()
)
if document_segment:
document_segment.keywords = keywords
db.session.add(document_segment)
db.session.commit()
db.session.commit()
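Taken together, a hedged end-to-end sketch of this processor's lifecycle: index two segments, then search. It assumes an application context with the database and Redis extensions available, an existing models.dataset.Dataset row bound to `dataset`, and that Document accepts page_content and metadata keyword arguments, as its use above suggests; those pieces are outside this diff.

from core.rag.datasource.keyword.mecab.mecab import MeCab
from core.rag.models.document import Document

processor = MeCab(dataset)  # `dataset` obtained elsewhere in the application

docs = [
    Document(page_content="機械学習は人工知能の一分野です。", metadata={"doc_id": "seg-1"}),
    Document(page_content="自然言語処理では形態素解析を行います。", metadata={"doc_id": "seg-2"}),
]
processor.create(docs)  # extracts keywords per segment and persists the keyword table

hits = processor.search("形態素解析とは", top_k=2)  # keyword-overlap retrieval
for doc in hits:
    print(doc.page_content)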

core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py View File

@@ -1,16 +1,17 @@
import re
from typing import Optional, Set
import MeCab
from collections import defaultdict
from typing import Optional, Set
import MeCab
from core.rag.datasource.keyword.mecab.stopwords import STOPWORDS
class MeCabKeywordTableHandler:
"""Japanese keyword extraction using MeCab morphological analyzer."""
def __init__(self, dictionary_path: str = "", user_dictionary_path: str = ""):
"""Initialize MeCab tokenizer.
Args:
dictionary_path: Path to custom system dictionary
user_dictionary_path: Path to user dictionary
@@ -22,109 +23,102 @@ class MeCabKeywordTableHandler:
mecab_args.append(f"-d {dictionary_path}")
if user_dictionary_path:
mecab_args.append(f"-u {user_dictionary_path}")
self.tagger = MeCab.Tagger(" ".join(mecab_args))
self.tagger.parse('') # Force initialization to catch dictionary errors
self.tagger.parse("") # Force initialization to catch dictionary errors
except RuntimeError as e:
raise RuntimeError(f"Failed to initialize MeCab: {str(e)}")
# POS weights for scoring
self.pos_weights = {
'名詞': 1.0, # Nouns
'動詞': 0.8, # Verbs
'形容詞': 0.6, # Adjectives
'副詞': 0.4, # Adverbs
'連体詞': 0.3, # Adnominal adjectives
'感動詞': 0.2, # Interjections
"名詞": 1.0, # Nouns
"動詞": 0.8, # Verbs
"形容詞": 0.6, # Adjectives
"副詞": 0.4, # Adverbs
"連体詞": 0.3, # Adnominal adjectives
"感動詞": 0.2, # Interjections
}
self.min_score = 0.3
def extract_keywords(self, text: str, max_keywords_per_chunk: Optional[int] = 10) -> Set[str]:
"""Extract keywords from Japanese text using MeCab.
Args:
text: Input text to extract keywords from
max_keywords_per_chunk: Maximum number of keywords to extract
Returns:
Set of extracted keywords
"""
if not text or not text.strip():
return set()
try:
# Parse text with MeCab
self.tagger.parse('') # Clear tagger state
self.tagger.parse("") # Clear tagger state
node = self.tagger.parseToNode(text)
# Calculate term frequencies and scores
term_scores = defaultdict(float)
while node:
features = node.feature.split(',')
features = node.feature.split(",")
if len(features) > 0:
pos = features[0] # Part of speech
pos_subtype = features[1] if len(features) > 1 else ''
pos_subtype = features[1] if len(features) > 1 else ""
base_form = features[6] if len(features) > 6 else node.surface
# Score the term based on its POS
if pos in self.pos_weights and base_form not in STOPWORDS:
score = self.pos_weights[pos]
# Boost proper nouns and technical terms
if pos == '名詞' and pos_subtype in ['固有名詞', '専門用語']:
if pos == "名詞" and pos_subtype in ["固有名詞", "専門用語"]:
score *= 1.5
if len(base_form) > 1: # Filter out single characters
term_scores[base_form] += score
node = node.next
# Get top scoring terms
sorted_terms = sorted(
term_scores.items(),
key=lambda x: x[1],
reverse=True
)
sorted_terms = sorted(term_scores.items(), key=lambda x: x[1], reverse=True)
# Filter by minimum score and take top N
keywords = {
term for term, score in sorted_terms
if score >= self.min_score
}
keywords = {term for term, score in sorted_terms if score >= self.min_score}
if max_keywords_per_chunk:
keywords = set(list(keywords)[:max_keywords_per_chunk])
# Expand with compound terms
expanded_keywords = self._expand_tokens_with_compounds(keywords, text)
return expanded_keywords
except Exception as e:
raise RuntimeError(f"Failed to extract keywords: {str(e)}")
def _expand_tokens_with_compounds(self, keywords: Set[str], text: str) -> Set[str]:
"""Expand keywords with compound terms.
This method looks for adjacent keywords in the original text to capture
compound terms like '機械学習' (machine learning) or '自然言語処理' (natural language processing).
"""
results = set(keywords)
try:
# Parse again to find compounds
node = self.tagger.parseToNode(text)
compound = []
compound_readings = [] # For handling different forms of the same compound
while node:
features = node.feature.split(',')
features = node.feature.split(",")
if len(features) > 6:
base_form = features[6]
reading = features[7] if len(features) > 7 else None
else:
base_form = node.surface
reading = None
if base_form in keywords:
compound.append(base_form)
if reading:
@@ -132,21 +126,21 @@ class MeCabKeywordTableHandler:
else:
if len(compound) > 1:
# Add the compound term
compound_term = ''.join(compound)
compound_term = "".join(compound)
if len(compound_term) > 1:
results.add(compound_term)
# If readings are available, add normalized form
if compound_readings:
normalized_term = ''.join(compound_readings)
normalized_term = "".join(compound_readings)
if normalized_term != compound_term:
results.add(normalized_term)
compound = []
compound_readings = []
node = node.next
return results
except Exception as e:
# If compound expansion fails, return original keywords
return keywords
return keywords
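The handler can also be exercised on its own, outside the indexing pipeline. A minimal hedged sketch, assuming a system MeCab installation whose dictionary uses the IPA-style feature layout (POS, POS subtype, ..., base form, reading) that the feature-index arithmetic above relies on.

from core.rag.datasource.keyword.mecab.mecab_keyword_table_handler import MeCabKeywordTableHandler

handler = MeCabKeywordTableHandler()   # default system dictionary
handler.pos_weights["名詞"] = 1.2      # optional: weight nouns a little more heavily

keywords = handler.extract_keywords("自然言語処理は機械学習の重要な応用分野です。", 5)
print(keywords)
# The exact terms depend on the installed dictionary; with IPAdic the output is
# noun-heavy, for example {"自然言語処理", "機械学習", "応用", "分野", ...}.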

core/rag/datasource/keyword/mecab/stopwords.py View File

@@ -1,36 +1,190 @@
STOPWORDS = {
# Japanese particles and basic stopwords
"", "", "", "", "", "", "", "", "から", "より", "まで", "によって",
"あそこ", "あっ", "あの", "あのかた", "あの人", "あり", "あります", "ある", "あれ",
"", "いう", "います", "いる", "", "うち", "", "", "および", "おり", "おります",
"", "かつて", "から", "", "", "ここ", "こちら", "こと", "この", "これ", "これら",
"", "さらに", "", "しかし", "する", "", "", "せる", "そこ", "そして", "その",
"その他", "その後", "それ", "それぞれ", "それで", "", "ただし", "たち", "ため", "たり",
"", "だっ", "だれ", "", "", "", "でき", "できる", "です", "では", "でも", "",
"という", "といった", "とき", "ところ", "として", "とともに", "とも", "と共に", "どこ",
"どの", "", "ない", "なお", "なかっ", "ながら", "なく", "なっ", "など", "なに", "なら",
"なり", "なる", "なん", "", "において", "における", "について", "にて", "によって", "により",
"による", "に対して", "に対する", "に関する", "", "ので", "のみ", "", "", "", "ほか",
"ほとんど", "ほど", "ます", "また", "または", "まで", "", "もの", "ものの", "", "よう",
"より", "", "られ", "られる", "", "れる", "", "", "", "及び", "", "彼女",
"我々", "特に", "", "私達", "貴方", "貴方方",
# Japanese auxiliary verbs
"です", "ます", "でした", "ました", "である", "", "", "だった",
# Japanese pronouns
"これ", "それ", "あれ", "この", "その", "あの", "ここ", "そこ", "あそこ",
# Japanese common words
"いる", "ある", "なる", "する", "できる", "おる", "いく", "くる",
# Numbers
"", "", "", "", "", "", "", "", "", "",
"1", "2", "3", "4", "5", "6", "7", "8", "9", "0",
# Punctuation
"", "", "", "", "", "", "", "", "", "",
# Common English stopwords (for mixed text)
"the", "is", "at", "which", "on", "in", "and", "or", "a", "an",
}
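As the handler above shows, STOPWORDS is consulted once per token (base_form not in STOPWORDS), so keeping it a flat set gives constant-time lookups. A quick hedged sanity check:

from core.rag.datasource.keyword.mecab.stopwords import STOPWORDS

print("ます" in STOPWORDS)    # True: auxiliary verbs are filtered out of keyword candidates
print("形態素" in STOPWORDS)  # False: content nouns survive filtering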