diff --git a/api/core/rag/datasource/keyword/keyword_factory.py b/api/core/rag/datasource/keyword/keyword_factory.py
index f1a6ade91f..5e30c873a7 100644
--- a/api/core/rag/datasource/keyword/keyword_factory.py
+++ b/api/core/rag/datasource/keyword/keyword_factory.py
@@ -22,8 +22,11 @@ class Keyword:
         match keyword_type:
             case KeyWordType.JIEBA:
                 from core.rag.datasource.keyword.jieba.jieba import Jieba
                 return Jieba
+            case KeyWordType.MECAB:
+                from core.rag.datasource.keyword.mecab.mecab import MeCab
+                return MeCab
             case _:
                 raise ValueError(f"Keyword store {keyword_type} is not supported.")
diff --git a/api/core/rag/datasource/keyword/keyword_type.py b/api/core/rag/datasource/keyword/keyword_type.py
index d845c7111d..a4fb63b794 100644
--- a/api/core/rag/datasource/keyword/keyword_type.py
+++ b/api/core/rag/datasource/keyword/keyword_type.py
@@ -3,3 +3,4 @@ from enum import StrEnum
 
 class KeyWordType(StrEnum):
     JIEBA = "jieba"
+    MECAB = "mecab"
diff --git a/api/core/rag/datasource/keyword/mecab/README.md b/api/core/rag/datasource/keyword/mecab/README.md
new file mode 100644
index 0000000000..f589c96d44
--- /dev/null
+++ b/api/core/rag/datasource/keyword/mecab/README.md
@@ -0,0 +1,273 @@
+# MeCab Keyword Processor
+
+A Japanese keyword extraction module for the Dify RAG system, built on the MeCab morphological analyzer.
+
+## Overview
+
+This module provides keyword extraction for Japanese text using the MeCab morphological analyzer. It is designed to:
+
+- Extract meaningful keywords from Japanese text
+- Handle compound words and technical terms
+- Support custom dictionaries
+- Provide configurable scoring based on parts of speech
+- Handle mixed Japanese-English text
+
+## Components
+
+### 1. MeCabKeywordTableHandler
+
+The core component responsible for keyword extraction using MeCab:
+
+```python
+handler = MeCabKeywordTableHandler(
+    dictionary_path="/path/to/dict",  # Optional custom dictionary
+    user_dictionary_path="/path/to/user_dict"  # Optional user dictionary
+)
+keywords = handler.extract_keywords(text, max_keywords_per_chunk=10)
+```
+
+#### Features:
+
+- **Part of Speech (POS) Weighting**:
+
+  ```python
+  pos_weights = {
+      '名詞': 1.0,   # Nouns
+      '動詞': 0.8,   # Verbs
+      '形容詞': 0.6,  # Adjectives
+      '副詞': 0.4,   # Adverbs
+      '連体詞': 0.3,  # Adnominal adjectives
+      '感動詞': 0.2,  # Interjections
+  }
+  ```
+
+- **Special Term Handling**:
+  - Boosts scores for proper nouns (固有名詞)
+  - Boosts scores for technical terms (専門用語)
+  - Compound word detection (e.g., "機械学習", "自然言語処理")
+
+- **Reading Normalization**:
+  - Handles different surface forms of the same word
+  - Normalizes compound terms using their readings
+
+### 2. Configuration (MeCabConfig)
+
+Configurable settings for the processor:
+
+```python
+class MeCabConfig(BaseModel):
+    max_keywords_per_chunk: int = 10
+    min_keyword_length: int = 2
+    score_threshold: float = 0.3
+    storage_type: str = "database"
+    cache_timeout: int = 3600
+    dictionary_path: str = ""
+    user_dictionary_path: str = ""
+    pos_weights: dict = {...}
+```
+
+### 3. Stopwords
+
+A comprehensive Japanese stopword list including:
+
+- Particles (は, が, の, etc.)
+- Auxiliary verbs (です, ます, etc.)
+- Pronouns (これ, それ, etc.)
+- Common words
+- Numbers and punctuation
+- Common English stopwords for mixed text
+
+## Usage
+
+### Basic Usage
+
+```python
+from core.rag.datasource.keyword.keyword_factory import Keyword
+from models.dataset import Dataset
+
+# Initialize
+dataset = Dataset(...)
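+# NOTE: Dataset(...) is a placeholder for an existing dataset record; the processor
+# that Keyword() wraps is selected by the KEYWORD_STORE config value (default: "jieba").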
+keyword_processor = Keyword(dataset) # Will use MeCab if KEYWORD_STORE = "mecab" + +# Process text +documents = [ + Document( + page_content="自然言語処理は人工知能の重要な分野です。", + metadata={"doc_id": "1", ...} + ) +] +keyword_processor.create(documents) + +# Search +results = keyword_processor.search("自然言語処理について") +``` + +### Custom Dictionary Usage + +```python +# In your configuration: +KEYWORD_PROCESSOR_CONFIG = { + "dictionary_path": "/path/to/mecab/dict", + "user_dictionary_path": "/path/to/user.dic", + "pos_weights": { + "名詞": 1.2, + "動詞": 0.8, + # ... customize weights + } +} +``` + +## Features + +### 1. Keyword Extraction + +- **POS-based Scoring**: + - Weights different parts of speech + - Boosts important terms + - Configurable scoring thresholds + +- **Compound Word Detection**: + + ```python + # Input text: "自然言語処理の研究" + # Detected compounds: + # - "自然言語" + # - "自然言語処理" + # - "言語処理" + ``` + +- **Reading Normalization**: + + ```python + # Handles variations: + # - "データベース" (katakana) + # - "データベース" (with readings) + # Both normalize to same term + ``` + +### 2. Storage + +- **Flexible Storage Options**: + - Database storage + - File-based storage + - Redis-based locking for concurrency + +- **Data Structure**: + + ```python + { + "__type__": "keyword_table", + "__data__": { + "index_id": "dataset_id", + "table": { + "keyword1": ["doc_id1", "doc_id2"], + "keyword2": ["doc_id2", "doc_id3"], + } + } + } + ``` + +### 3. Error Handling + +- Comprehensive error handling +- Custom exception classes +- Logging integration +- Graceful fallbacks + +## Performance Considerations + +1. **Memory Usage**: + - Efficient keyword table structure + - Batch processing support + - Caching mechanisms + +2. **Concurrency**: + - Redis-based locking + - Transaction handling + - Safe concurrent access + +3. **Optimization Tips**: + - Use appropriate batch sizes + - Configure caching timeouts + - Adjust scoring thresholds + +## Dependencies + +- MeCab and Python bindings: + + ```bash + # Ubuntu/Debian + apt-get install mecab mecab-ipadic-utf8 python3-mecab + + # macOS + brew install mecab mecab-ipadic + pip install mecab-python3 + ``` + +## Best Practices + +1. **Dictionary Management**: + - Keep dictionaries updated + - Use domain-specific user dictionaries + - Regular maintenance of custom terms + +2. **Configuration Tuning**: + - Adjust POS weights for your use case + - Set appropriate thresholds + - Monitor and adjust batch sizes + +3. **Error Handling**: + - Implement proper logging + - Monitor extraction quality + - Handle edge cases + +## Testing + +Example test cases: + +```python +def test_basic_extraction(): + text = "自然言語処理は人工知能の重要な分野です。" + keywords = handler.extract_keywords(text) + assert "自然言語処理" in keywords + assert "人工知能" in keywords + +def test_compound_words(): + text = "機械学習モデルを使った自然言語処理" + keywords = handler.extract_keywords(text) + assert "機械学習" in keywords + assert "自然言語処理" in keywords + +def test_mixed_text(): + text = "AIを使った自然言語処理のResearch" + keywords = handler.extract_keywords(text) + assert "AI" in keywords + assert "自然言語処理" in keywords + assert "Research" in keywords +``` + +## Common Issues and Solutions + +1. **Dictionary Loading Failures**: + + ```python + try: + handler = MeCabKeywordTableHandler(dictionary_path=path) + except RuntimeError as e: + # Handle dictionary loading error + ``` + +2. **Memory Usage**: + + ```python + # Use batch processing for large datasets + for batch in chunks(documents, size=100): + process_batch(batch) + ``` + +3. 
**Concurrent Access**: + + ```python + with redis_client.lock(f"lock_{dataset_id}"): + # Safe concurrent operations + ``` diff --git a/api/core/rag/datasource/keyword/mecab/config.py b/api/core/rag/datasource/keyword/mecab/config.py new file mode 100644 index 0000000000..0abfc24a3d --- /dev/null +++ b/api/core/rag/datasource/keyword/mecab/config.py @@ -0,0 +1,19 @@ +from pydantic import BaseModel + +class MeCabConfig(BaseModel): + """Configuration for MeCab keyword processor.""" + max_keywords_per_chunk: int = 10 + min_keyword_length: int = 2 + score_threshold: float = 0.3 + storage_type: str = "database" + cache_timeout: int = 3600 + + # MeCab specific settings + dictionary_path: str = "" # Optional custom dictionary path + user_dictionary_path: str = "" # Optional user dictionary path + pos_weights: dict = { + '名詞': 1.0, # Nouns + '動詞': 0.8, # Verbs + '形容詞': 0.6, # Adjectives + '副詞': 0.4, # Adverbs + } diff --git a/api/core/rag/datasource/keyword/mecab/mecab.py b/api/core/rag/datasource/keyword/mecab/mecab.py new file mode 100644 index 0000000000..f40e3c229e --- /dev/null +++ b/api/core/rag/datasource/keyword/mecab/mecab.py @@ -0,0 +1,287 @@ +import json +import logging +from typing import Any, Optional +from collections import defaultdict + +from core.rag.datasource.keyword.keyword_base import BaseKeyword +from core.rag.datasource.keyword.mecab.mecab_keyword_table_handler import MeCabKeywordTableHandler +from core.rag.datasource.keyword.mecab.config import MeCabConfig +from core.rag.models.document import Document +from extensions.ext_database import db +from extensions.ext_redis import redis_client +from extensions.ext_storage import storage +from models.dataset import Dataset, DatasetKeywordTable, DocumentSegment + + +logger = logging.getLogger(__name__) + + +class KeywordProcessorError(Exception): + """Base error for keyword processing.""" + pass + + +class KeywordExtractionError(KeywordProcessorError): + """Error during keyword extraction.""" + pass + + +class KeywordStorageError(KeywordProcessorError): + """Error during storage operations.""" + pass + + +class SetEncoder(json.JSONEncoder): + """JSON encoder that handles sets.""" + def default(self, obj): + if isinstance(obj, set): + return list(obj) + return super().default(obj) + + +class MeCab(BaseKeyword): + """Japanese keyword processor using MeCab morphological analyzer.""" + + def __init__(self, dataset: Dataset): + super().__init__(dataset) + self._config = MeCabConfig() + self._keyword_handler = None + self._init_handler() + + def _init_handler(self): + """Initialize MeCab handler with configuration.""" + try: + self._keyword_handler = MeCabKeywordTableHandler( + dictionary_path=self._config.dictionary_path, + user_dictionary_path=self._config.user_dictionary_path + ) + if self._config.pos_weights: + self._keyword_handler.pos_weights = self._config.pos_weights + self._keyword_handler.min_score = self._config.score_threshold + except Exception as e: + logger.error(f"Failed to initialize MeCab handler: {str(e)}") + raise KeywordProcessorError(f"MeCab initialization failed: {str(e)}") + + def create(self, texts: list[Document], **kwargs) -> BaseKeyword: + """Create keyword index for documents.""" + lock_name = f"keyword_indexing_lock_{self.dataset.id}" + with redis_client.lock(lock_name, timeout=600): + keyword_table = self._get_dataset_keyword_table() + + for text in texts: + keywords = self._keyword_handler.extract_keywords( + text.page_content, + self._config.max_keywords_per_chunk + ) + if text.metadata is not None: + 
self._update_segment_keywords( + self.dataset.id, + text.metadata["doc_id"], + list(keywords) + ) + keyword_table = self._add_text_to_keyword_table( + keyword_table or {}, + text.metadata["doc_id"], + list(keywords) + ) + + self._save_dataset_keyword_table(keyword_table) + return self + + def add_texts(self, texts: list[Document], **kwargs): + """Add new texts to existing index.""" + lock_name = f"keyword_indexing_lock_{self.dataset.id}" + with redis_client.lock(lock_name, timeout=600): + keyword_table = self._get_dataset_keyword_table() + keywords_list = kwargs.get("keywords_list") + + for i, text in enumerate(texts): + if keywords_list: + keywords = keywords_list[i] + if not keywords: + keywords = self._keyword_handler.extract_keywords( + text.page_content, + self._config.max_keywords_per_chunk + ) + else: + keywords = self._keyword_handler.extract_keywords( + text.page_content, + self._config.max_keywords_per_chunk + ) + + if text.metadata is not None: + self._update_segment_keywords( + self.dataset.id, + text.metadata["doc_id"], + list(keywords) + ) + keyword_table = self._add_text_to_keyword_table( + keyword_table or {}, + text.metadata["doc_id"], + list(keywords) + ) + + self._save_dataset_keyword_table(keyword_table) + + def text_exists(self, id: str) -> bool: + """Check if text exists in index.""" + keyword_table = self._get_dataset_keyword_table() + if keyword_table is None: + return False + return id in set.union(*keyword_table.values()) if keyword_table else False + + def delete_by_ids(self, ids: list[str]) -> None: + """Delete texts by IDs.""" + lock_name = f"keyword_indexing_lock_{self.dataset.id}" + with redis_client.lock(lock_name, timeout=600): + keyword_table = self._get_dataset_keyword_table() + if keyword_table is not None: + keyword_table = self._delete_ids_from_keyword_table(keyword_table, ids) + self._save_dataset_keyword_table(keyword_table) + + def delete(self) -> None: + """Delete entire index.""" + lock_name = f"keyword_indexing_lock_{self.dataset.id}" + with redis_client.lock(lock_name, timeout=600): + dataset_keyword_table = self.dataset.dataset_keyword_table + if dataset_keyword_table: + db.session.delete(dataset_keyword_table) + db.session.commit() + if dataset_keyword_table.data_source_type != "database": + file_key = f"keyword_files/{self.dataset.tenant_id}/{self.dataset.id}.txt" + storage.delete(file_key) + + def search(self, query: str, **kwargs: Any) -> list[Document]: + """Search documents using keywords.""" + keyword_table = self._get_dataset_keyword_table() + k = kwargs.get("top_k", 4) + + sorted_chunk_indices = self._retrieve_ids_by_query( + keyword_table or {}, + query, + k + ) + + documents = [] + for chunk_index in sorted_chunk_indices: + segment = ( + db.session.query(DocumentSegment) + .filter( + DocumentSegment.dataset_id == self.dataset.id, + DocumentSegment.index_node_id == chunk_index + ) + .first() + ) + + if segment: + documents.append( + Document( + page_content=segment.content, + metadata={ + "doc_id": chunk_index, + "doc_hash": segment.index_node_hash, + "document_id": segment.document_id, + "dataset_id": segment.dataset_id, + }, + ) + ) + + return documents + + def _get_dataset_keyword_table(self) -> Optional[dict]: + """Get keyword table from storage.""" + dataset_keyword_table = self.dataset.dataset_keyword_table + if dataset_keyword_table: + keyword_table_dict = dataset_keyword_table.keyword_table_dict + if keyword_table_dict: + return dict(keyword_table_dict["__data__"]["table"]) + return {} + + def _save_dataset_keyword_table(self, 
keyword_table): + """Save keyword table to storage.""" + table_dict = { + "__type__": "keyword_table", + "__data__": { + "index_id": self.dataset.id, + "summary": None, + "table": keyword_table + } + } + + dataset_keyword_table = self.dataset.dataset_keyword_table + data_source_type = dataset_keyword_table.data_source_type + + if data_source_type == "database": + dataset_keyword_table.keyword_table = json.dumps(table_dict, cls=SetEncoder) + db.session.commit() + else: + file_key = f"keyword_files/{self.dataset.tenant_id}/{self.dataset.id}.txt" + if storage.exists(file_key): + storage.delete(file_key) + storage.save( + file_key, + json.dumps(table_dict, cls=SetEncoder).encode("utf-8") + ) + + def _add_text_to_keyword_table(self, keyword_table: dict, id: str, keywords: list[str]) -> dict: + """Add text keywords to table.""" + for keyword in keywords: + if keyword not in keyword_table: + keyword_table[keyword] = set() + keyword_table[keyword].add(id) + return keyword_table + + def _delete_ids_from_keyword_table(self, keyword_table: dict, ids: list[str]) -> dict: + """Delete IDs from keyword table.""" + node_idxs_to_delete = set(ids) + keywords_to_delete = set() + + for keyword, node_idxs in keyword_table.items(): + if node_idxs_to_delete.intersection(node_idxs): + keyword_table[keyword] = node_idxs.difference(node_idxs_to_delete) + if not keyword_table[keyword]: + keywords_to_delete.add(keyword) + + for keyword in keywords_to_delete: + del keyword_table[keyword] + + return keyword_table + + def _retrieve_ids_by_query(self, keyword_table: dict, query: str, k: int = 4): + """Retrieve document IDs by query.""" + keywords = self._keyword_handler.extract_keywords(query) + + # Score documents based on matching keywords + chunk_indices_count = defaultdict(int) + keywords_list = [ + keyword for keyword in keywords + if keyword in set(keyword_table.keys()) + ] + + for keyword in keywords_list: + for node_id in keyword_table[keyword]: + chunk_indices_count[node_id] += 1 + + sorted_chunk_indices = sorted( + chunk_indices_count.keys(), + key=lambda x: chunk_indices_count[x], + reverse=True + ) + + return sorted_chunk_indices[:k] + + def _update_segment_keywords(self, dataset_id: str, node_id: str, keywords: list[str]): + """Update segment keywords in database.""" + document_segment = ( + db.session.query(DocumentSegment) + .filter( + DocumentSegment.dataset_id == dataset_id, + DocumentSegment.index_node_id == node_id + ) + .first() + ) + + if document_segment: + document_segment.keywords = keywords + db.session.add(document_segment) + db.session.commit() diff --git a/api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py b/api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py new file mode 100644 index 0000000000..0eaf230300 --- /dev/null +++ b/api/core/rag/datasource/keyword/mecab/mecab_keyword_table_handler.py @@ -0,0 +1,152 @@ +import re +from typing import Optional, Set +import MeCab +from collections import defaultdict + +from core.rag.datasource.keyword.mecab.stopwords import STOPWORDS + +class MeCabKeywordTableHandler: + """Japanese keyword extraction using MeCab morphological analyzer.""" + + def __init__(self, dictionary_path: str = "", user_dictionary_path: str = ""): + """Initialize MeCab tokenizer. 
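+
+        Assumes the mecab-python3 binding and a MeCab system dictionary (e.g. IPAdic)
+        are installed; both paths are optional overrides of that default setup.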
+
+        Args:
+            dictionary_path: Path to custom system dictionary
+            user_dictionary_path: Path to user dictionary
+        """
+        try:
+            # Build MeCab argument string
+            mecab_args = ["-Ochasen"]  # Use ChaSen format for detailed POS info
+            if dictionary_path:
+                mecab_args.append(f"-d {dictionary_path}")
+            if user_dictionary_path:
+                mecab_args.append(f"-u {user_dictionary_path}")
+
+            self.tagger = MeCab.Tagger(" ".join(mecab_args))
+            self.tagger.parse('')  # Force initialization to catch dictionary errors
+
+        except RuntimeError as e:
+            raise RuntimeError(f"Failed to initialize MeCab: {str(e)}")
+
+        # POS weights for scoring
+        self.pos_weights = {
+            '名詞': 1.0,   # Nouns
+            '動詞': 0.8,   # Verbs
+            '形容詞': 0.6,  # Adjectives
+            '副詞': 0.4,   # Adverbs
+            '連体詞': 0.3,  # Adnominal adjectives
+            '感動詞': 0.2,  # Interjections
+        }
+        self.min_score = 0.3
+
+    def extract_keywords(self, text: str, max_keywords_per_chunk: Optional[int] = 10) -> Set[str]:
+        """Extract keywords from Japanese text using MeCab.
+
+        Args:
+            text: Input text to extract keywords from
+            max_keywords_per_chunk: Maximum number of keywords to extract
+
+        Returns:
+            Set of extracted keywords
+        """
+        if not text or not text.strip():
+            return set()
+
+        try:
+            # Parse text with MeCab
+            self.tagger.parse('')  # Clear tagger state
+            node = self.tagger.parseToNode(text)
+
+            # Calculate term frequencies and scores
+            term_scores = defaultdict(float)
+            while node:
+                features = node.feature.split(',')
+                if len(features) > 0:
+                    pos = features[0]  # Part of speech
+                    pos_subtype = features[1] if len(features) > 1 else ''
+                    base_form = features[6] if len(features) > 6 else node.surface
+
+                    # Score the term based on its POS
+                    if pos in self.pos_weights and base_form not in STOPWORDS:
+                        score = self.pos_weights[pos]
+                        # Boost proper nouns and technical terms
+                        if pos == '名詞' and pos_subtype in ['固有名詞', '専門用語']:
+                            score *= 1.5
+                        if len(base_form) > 1:  # Filter out single characters
+                            term_scores[base_form] += score
+
+                node = node.next
+
+            # Get top scoring terms
+            sorted_terms = sorted(
+                term_scores.items(),
+                key=lambda x: x[1],
+                reverse=True
+            )
+
+            # Filter by minimum score and keep the top N highest-scoring terms
+            top_terms = [
+                term for term, score in sorted_terms
+                if score >= self.min_score
+            ]
+            if max_keywords_per_chunk:
+                top_terms = top_terms[:max_keywords_per_chunk]
+            keywords = set(top_terms)
+
+            # Expand with compound terms
+            expanded_keywords = self._expand_tokens_with_compounds(keywords, text)
+
+            return expanded_keywords
+
+        except Exception as e:
+            raise RuntimeError(f"Failed to extract keywords: {str(e)}")
+
+    def _expand_tokens_with_compounds(self, keywords: Set[str], text: str) -> Set[str]:
+        """Expand keywords with compound terms.
+
+        This method looks for adjacent keywords in the original text to capture
+        compound terms like '機械学習' (machine learning) or '自然言語処理' (natural language processing).
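+
+        For example, if the adjacent tokens 「機械」 and 「学習」 are both extracted as
+        keywords, the joined compound 「機械学習」 is added to the result set as well.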
+ """ + results = set(keywords) + + try: + # Parse again to find compounds + node = self.tagger.parseToNode(text) + compound = [] + compound_readings = [] # For handling different forms of the same compound + + while node: + features = node.feature.split(',') + if len(features) > 6: + base_form = features[6] + reading = features[7] if len(features) > 7 else None + else: + base_form = node.surface + reading = None + + if base_form in keywords: + compound.append(base_form) + if reading: + compound_readings.append(reading) + else: + if len(compound) > 1: + # Add the compound term + compound_term = ''.join(compound) + if len(compound_term) > 1: + results.add(compound_term) + # If readings are available, add normalized form + if compound_readings: + normalized_term = ''.join(compound_readings) + if normalized_term != compound_term: + results.add(normalized_term) + compound = [] + compound_readings = [] + + node = node.next + + return results + + except Exception as e: + # If compound expansion fails, return original keywords + return keywords diff --git a/api/core/rag/datasource/keyword/mecab/stopwords.py b/api/core/rag/datasource/keyword/mecab/stopwords.py new file mode 100644 index 0000000000..13802ac947 --- /dev/null +++ b/api/core/rag/datasource/keyword/mecab/stopwords.py @@ -0,0 +1,36 @@ +STOPWORDS = { + # Japanese particles and basic stopwords + "は", "が", "の", "に", "を", "で", "へ", "と", "から", "より", "まで", "によって", + "あそこ", "あっ", "あの", "あのかた", "あの人", "あり", "あります", "ある", "あれ", + "い", "いう", "います", "いる", "う", "うち", "え", "お", "および", "おり", "おります", + "か", "かつて", "から", "が", "き", "ここ", "こちら", "こと", "この", "これ", "これら", + "さ", "さらに", "し", "しかし", "する", "ず", "せ", "せる", "そこ", "そして", "その", + "その他", "その後", "それ", "それぞれ", "それで", "た", "ただし", "たち", "ため", "たり", + "だ", "だっ", "だれ", "つ", "て", "で", "でき", "できる", "です", "では", "でも", "と", + "という", "といった", "とき", "ところ", "として", "とともに", "とも", "と共に", "どこ", + "どの", "な", "ない", "なお", "なかっ", "ながら", "なく", "なっ", "など", "なに", "なら", + "なり", "なる", "なん", "に", "において", "における", "について", "にて", "によって", "により", + "による", "に対して", "に対する", "に関する", "の", "ので", "のみ", "は", "ば", "へ", "ほか", + "ほとんど", "ほど", "ます", "また", "または", "まで", "も", "もの", "ものの", "や", "よう", + "より", "ら", "られ", "られる", "れ", "れる", "を", "ん", "何", "及び", "彼", "彼女", + "我々", "特に", "私", "私達", "貴方", "貴方方", + + # Japanese auxiliary verbs + "です", "ます", "でした", "ました", "である", "だ", "な", "だった", + + # Japanese pronouns + "これ", "それ", "あれ", "この", "その", "あの", "ここ", "そこ", "あそこ", + + # Japanese common words + "いる", "ある", "なる", "する", "できる", "おる", "いく", "くる", + + # Numbers + "一", "二", "三", "四", "五", "六", "七", "八", "九", "十", + "1", "2", "3", "4", "5", "6", "7", "8", "9", "0", + + # Punctuation + "、", "。", "「", "」", "『", "』", "(", ")", "[", "]", + + # Common English stopwords (for mixed text) + "the", "is", "at", "which", "on", "in", "and", "or", "a", "an", +}