[WIP] before final test

Dr. Kiji 2025-01-02 20:11:44 +09:00
parent 75dd8677b9
commit 4f5a4e7194
2 changed files with 287 additions and 167 deletions


@@ -1,6 +1,6 @@
# MeCab Keyword Processor for Dify
A Japanese text keyword extraction module for Dify's RAG system, powered by the MeCab morphological analyzer.
## Overview
@@ -85,189 +85,255 @@ Comprehensive Japanese stopword list including:
```python
from core.rag.datasource.keyword.keyword_factory import Keyword
from core.rag.models.document import Document
from models.dataset import Dataset

# Initialize with KEYWORD_STORE = "mecab" in config
dataset = Dataset(...)
keyword_processor = Keyword(dataset)

# Process documents
documents = [
    Document(
        page_content="自然言語処理は人工知能の重要な分野です。",
        metadata={"doc_id": "1"}
    )
]
keyword_processor.create(documents)

# Search
results = keyword_processor.search("自然言語処理")
```
## Configuration
### Basic Settings
```python
# In your environment configuration:
KEYWORD_STORE = "mecab"
KEYWORD_DATA_SOURCE_TYPE = "database"  # or other supported storage types
```
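In a standard Dify deployment these are environment variables; the equivalent `.env` entries would be (assuming an env-file based setup):
```bash
KEYWORD_STORE=mecab
KEYWORD_DATA_SOURCE_TYPE=database
```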
### Advanced Settings
```python
# MeCab-specific configuration
MECAB_CONFIG = {
    "max_keywords_per_chunk": 10,
    "score_threshold": 0.3,
    "dictionary_path": "/path/to/dict",  # Optional
    "user_dictionary_path": "/path/to/user_dict",  # Optional
    "pos_weights": {
        "名詞": 1.0,  # Nouns
        "動詞": 0.8,  # Verbs
        "形容詞": 0.6,  # Adjectives
    },
}
```
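As a rough illustration of how these settings interact (a simplified sketch, not the handler's actual scoring code; the `(surface, pos)` pairs are assumed to come from MeCab tokenization):

```python
def select_keywords(tokens: list[tuple[str, str]], config: dict) -> list[str]:
    """Pick keywords from (surface, pos) pairs using the settings above."""
    scores: dict[str, float] = {}
    for surface, pos in tokens:
        weight = config["pos_weights"].get(pos, 0.0)
        if weight > 0:
            # Keep the highest weight seen for each surface form
            scores[surface] = max(scores.get(surface, 0.0), weight)
    ranked = sorted(scores.items(), key=lambda kv: kv[1], reverse=True)
    selected = [term for term, score in ranked if score >= config["score_threshold"]]
    return selected[: config["max_keywords_per_chunk"]]
```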
## Key Features
### 1. Intelligent Keyword Extraction
- Part-of-speech based scoring
- Compound word detection
- Technical term recognition
- Reading normalization for variations
### 2. Storage Options
- Database storage (default)
- File-based storage
- Concurrent access support via Redis locking

The keyword index is stored as a mapping from each keyword to the set of document IDs that contain it:
```python
{
    "__type__": "keyword_table",
    "__data__": {
        "index_id": "dataset_id",
        "table": {
            "keyword1": ["doc_id1", "doc_id2"],
            "keyword2": ["doc_id2", "doc_id3"]
        }
    }
}
```
### 3. Error Handling
- Comprehensive exception handling
- Detailed logging
- Graceful fallbacks
## Performance Considerations
1. **Memory Usage**:
   - Efficient keyword table structure
   - Batch processing support
   - Caching mechanisms
2. **Concurrency**:
   - Redis-based locking (see the sketch after this list)
   - Transaction handling
   - Safe concurrent access
3. **Optimization Tips**:
   - Use appropriate batch sizes
   - Configure caching timeouts
   - Adjust scoring thresholds
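For reference, this is roughly what the processor does internally around index writes (lock name and timeout taken from the handler below; callers never need to take the lock themselves):

```python
from extensions.ext_redis import redis_client  # Dify's shared Redis client

# Writes to the keyword table are serialized per dataset:
lock_name = "keyword_indexing_lock_{}".format(dataset.id)
with redis_client.lock(lock_name, timeout=600):
    ...  # read, modify, and save the keyword table
```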
## Dependencies
- MeCab and Python bindings:
```bash
# Ubuntu/Debian
apt-get install mecab mecab-ipadic-utf8 python3-mecab

# macOS
brew install mecab mecab-ipadic
pip install mecab-python3
```
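To confirm the bindings are installed correctly, a quick smoke test using the standard mecab-python3 API:

```python
import MeCab

tagger = MeCab.Tagger()
print(tagger.parse("自然言語処理"))  # one analyzed morpheme per line
```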
## Best Practices
1. **Performance**
   - Use batch processing for large datasets
   - Configure appropriate cache timeouts
   - Monitor memory usage
2. **Customization**
   - Update dictionaries regularly
   - Adjust POS weights for your use case
   - Set appropriate thresholds
   - Monitor and adjust batch sizes
3. **Error Handling**
   - Implement proper logging
   - Monitor extraction quality
   - Handle edge cases
   - Handle dictionary loading errors
   - Manage concurrent access
## Example Usage
### Basic Keyword Extraction
```python
# Index a document; keywords are extracted and stored as part of create()
text = "自然言語処理は人工知能の重要な分野です。"
keyword_processor.create([
    Document(page_content=text, metadata={"doc_id": "1"})
])
```
### Custom Dictionary
```python
# Use a custom dictionary
config = MeCabConfig(
    dictionary_path="/path/to/dict",
    user_dictionary_path="/path/to/user.dic"
)
```
### Batch Processing
```python
# Process multiple documents
documents = [
    Document(page_content=text1, metadata={"doc_id": "1"}),
    Document(page_content=text2, metadata={"doc_id": "2"})
]
keyword_processor.create(documents)
```
## Integration with Dify
The MeCab processor integrates seamlessly with Dify's existing keyword system:
1. Implements the `BaseKeyword` interface (see the sketch after this list)
2. Works with the keyword factory system
3. Supports all standard operations:
   - Document indexing
   - Keyword extraction
   - Search functionality
   - Index management
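For reference, the interface as implemented in this commit (signatures taken from the module below; bodies elided, and the `Document` import path is an assumption):

```python
from typing import Any

from core.rag.datasource.keyword.keyword_base import BaseKeyword
from core.rag.models.document import Document  # assumed import path


class MeCab(BaseKeyword):
    def create(self, texts: list[Document], **kwargs: Any) -> BaseKeyword: ...
    def add_texts(self, texts: list[Document], **kwargs: Any) -> None: ...
    def text_exists(self, id: str) -> bool: ...
    def delete_by_ids(self, ids: list[str]) -> None: ...
    def delete(self) -> None: ...
    def search(self, query: str, **kwargs: Any) -> list[Document]: ...
```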
## Common Issues
1. **Dictionary Loading**
```python
try:
    keyword_processor.create(documents)
except KeywordProcessorError as e:
    logger.error("Dictionary loading failed: %s", str(e))
```
2. **Memory Management**
```python
# Process in batches
batch_size = 100
for i in range(0, len(documents), batch_size):
    batch = documents[i:i + batch_size]
    keyword_processor.create(batch)
```
3. **Concurrent Access**
```python
# Handled automatically via Redis locks
keyword_processor.create(documents)  # Safe for concurrent use
```
For more details, refer to the [Dify Documentation](https://docs.dify.ai).
## Text Processing Examples
### Compound Words
The MeCab processor intelligently handles compound words in Japanese text:
```python
text = "人工知能と機械学習の研究を行っています。"
keywords = keyword_processor.create([
Document(page_content=text, metadata={"doc_id": "1"})
])
# Extracted keywords include:
# - "人工知能" (artificial intelligence - compound)
# - "機械学習" (machine learning - compound)
# - "研究" (research - single)
```
Complex technical terms are properly recognized:
```python
text = "自然言語処理における深層学習の応用"
# Extracts:
# - "自然言語処理" (natural language processing)
# - "深層学習" (deep learning)
# - "応用" (application)
```
### Stopwords Handling
Common particles and auxiliary words are automatically filtered:
```python
text = "私はデータベースの設計をしています。"
# Ignores:
# - "は" (particle)
# - "の" (particle)
# - "を" (particle)
# - "います" (auxiliary verb)
# Extracts:
# - "データベース" (database)
# - "設計" (design)
```
Mixed language text is also handled appropriately:
```python
text = "AIシステムのパフォーマンスを改善する。"
# Ignores:
# - "の" (particle)
# - "を" (particle)
# - "する" (auxiliary verb)
# Extracts:
# - "AI" (kept as is)
# - "システム" (system)
# - "パフォーマンス" (performance)
# - "改善" (improvement)
```
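A minimal sketch of how this POS-based filtering can be expressed (illustrative only; the module also applies its bundled stopword list):

```python
# IPAdic top-level POS tags: 助詞 = particle, 助動詞 = auxiliary verb, 記号 = symbol
STOP_POS_PREFIXES = ("助詞", "助動詞", "記号")


def is_stop_pos(pos: str) -> bool:
    """Return True for parts of speech that should never become keywords."""
    return pos.startswith(STOP_POS_PREFIXES)
```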
### Reading Variations
The processor normalizes different written forms of the same word, for example full-width and half-width katakana:
```python
text1 = "データベース設計"  # full-width katakana
text2 = "ﾃﾞｰﾀﾍﾞｰｽ設計"  # half-width katakana variant
# Both normalize to the same keywords:
# - "データベース"
# - "設計"
```
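One standard way to implement this folding (an assumed mechanism for illustration; the commit does not show the exact normalization code) is Unicode NFKC:

```python
import unicodedata


def normalize_term(term: str) -> str:
    # NFKC folds half-width katakana (ﾃﾞｰﾀﾍﾞｰｽ) into full-width (データベース)
    return unicodedata.normalize("NFKC", term)
```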
### Technical Term Boosting
Technical terms receive higher scores in keyword extraction:
```python
text = "機械学習モデルを用いた自然言語処理の研究"
# Prioritizes technical terms:
# High score:
# - "機械学習" (machine learning)
# - "自然言語処理" (natural language processing)
# Lower score:
# - "研究" (research)
# - "モデル" (model)
```


@@ -2,7 +2,7 @@ import json
import logging
import os
from collections import defaultdict
from typing import Any, Optional
from core.rag.datasource.keyword.keyword_base import BaseKeyword
from core.rag.datasource.keyword.mecab.config import MeCabConfig
@@ -18,21 +18,25 @@ logger = logging.getLogger(__name__)
class KeywordProcessorError(Exception):
    """Base error for keyword processing."""
    pass


class KeywordExtractionError(KeywordProcessorError):
    """Error during keyword extraction."""
    pass


class KeywordStorageError(KeywordProcessorError):
    """Error during storage operations."""
    pass


class SetEncoder(json.JSONEncoder):
    """JSON encoder that handles sets."""

    def default(self, obj):
        if isinstance(obj, set):
            return list(obj)
@@ -52,8 +56,7 @@ class MeCab(BaseKeyword):
        """Initialize MeCab handler with configuration."""
        try:
            self._keyword_handler = MeCabKeywordTableHandler(
                dictionary_path=self._config.dictionary_path, user_dictionary_path=self._config.user_dictionary_path
            )
            if self._config.pos_weights:
                self._keyword_handler.pos_weights = self._config.pos_weights
@@ -62,8 +65,21 @@ class MeCab(BaseKeyword):
            logger.exception("Failed to initialize MeCab handler")
            raise KeywordProcessorError("MeCab initialization failed: {}".format(str(e)))

    def create(self, texts: list[Document], **kwargs: Any) -> BaseKeyword:
        """Create keyword index for documents.

        Args:
            texts: List of documents to index
            **kwargs: Additional arguments

        Returns:
            BaseKeyword: Self for method chaining

        Raises:
            KeywordProcessorError: If indexing fails
            KeywordExtractionError: If keyword extraction fails
            KeywordStorageError: If storage operations fail
        """
        if not texts:
            return self
@@ -105,8 +121,17 @@ class MeCab(BaseKeyword):
        return self

    def add_texts(self, texts: list[Document], **kwargs: Any) -> None:
        """Add new texts to existing index.

        Args:
            texts: List of documents to add
            **kwargs: Additional arguments including optional keywords_list

        Raises:
            KeywordProcessorError: If indexing fails
            KeywordStorageError: If storage operations fail
        """
        if not texts:
            return
@@ -156,17 +181,38 @@ class MeCab(BaseKeyword):
            raise

    def text_exists(self, id: str) -> bool:
        """Check if text exists in index.

        Args:
            id: Document ID to check

        Returns:
            bool: True if text exists, False otherwise

        Raises:
            KeywordProcessorError: If check fails
        """
        if not id:
            return False

        try:
            keyword_table = self._get_dataset_keyword_table()
            if keyword_table is None:
                return False
            return id in set.union(*keyword_table.values()) if keyword_table else False
        except Exception as e:
            logger.exception("Failed to check text existence")
            raise KeywordProcessorError("Failed to check text existence: {}".format(str(e)))

    def delete_by_ids(self, ids: list[str]) -> None:
        """Delete texts by IDs.

        Args:
            ids: List of document IDs to delete

        Raises:
            KeywordStorageError: If deletion fails
        """
        if not ids:
            return
@@ -182,7 +228,11 @@ class MeCab(BaseKeyword):
            raise KeywordStorageError("Failed to delete documents: {}".format(str(e)))

    def delete(self) -> None:
        """Delete entire index.

        Raises:
            KeywordStorageError: If deletion fails
        """
        lock_name = "keyword_indexing_lock_{}".format(self.dataset.id)
        try:
            with redis_client.lock(lock_name, timeout=600):
@@ -197,8 +247,19 @@ class MeCab(BaseKeyword):
            logger.exception("Failed to delete index")
            raise KeywordStorageError("Failed to delete index: {}".format(str(e)))

    def search(self, query: str, **kwargs: Any) -> list[Document]:
        """Search documents using keywords.

        Args:
            query: Search query string
            **kwargs: Additional arguments including optional top_k

        Returns:
            list[Document]: List of matching documents

        Raises:
            KeywordProcessorError: If search fails
        """
        if not query:
            return []
@@ -214,10 +275,7 @@ class MeCab(BaseKeyword):
            for chunk_index in sorted_chunk_indices:
                segment = (
                    db.session.query(DocumentSegment)
                    .filter(DocumentSegment.dataset_id == self.dataset.id, DocumentSegment.index_node_id == chunk_index)
                    .first()
                )
@@ -239,7 +297,7 @@ class MeCab(BaseKeyword):
            logger.exception("Failed to search documents")
            raise KeywordProcessorError("Search failed: {}".format(str(e)))

    def _get_dataset_keyword_table(self) -> Optional[dict[str, set[str]]]:
        """Get keyword table from storage."""
        try:
            dataset_keyword_table = self.dataset.dataset_keyword_table
@@ -273,7 +331,7 @@ class MeCab(BaseKeyword):
            logger.exception("Failed to get keyword table")
            raise KeywordStorageError("Failed to get keyword table: {}".format(str(e)))

    def _save_dataset_keyword_table(self, keyword_table: dict[str, set[str]]) -> None:
        """Save keyword table to storage."""
        if keyword_table is None:
            raise ValueError("Keyword table cannot be None")
@@ -303,8 +361,8 @@ class MeCab(BaseKeyword):
            raise KeywordStorageError("Failed to save keyword table: {}".format(str(e)))

    def _add_text_to_keyword_table(
        self, keyword_table: dict[str, set[str]], id: str, keywords: list[str]
    ) -> dict[str, set[str]]:
        """Add text keywords to table."""
        if not id or not keywords:
            return keyword_table
@@ -315,9 +373,7 @@ class MeCab(BaseKeyword):
                keyword_table[keyword].add(id)
        return keyword_table

    def _delete_ids_from_keyword_table(self, keyword_table: dict[str, set[str]], ids: list[str]) -> dict[str, set[str]]:
        """Delete IDs from keyword table."""
        if not keyword_table or not ids:
            return keyword_table
@@ -336,9 +392,7 @@ class MeCab(BaseKeyword):
        return keyword_table

    def _retrieve_ids_by_query(self, keyword_table: dict[str, set[str]], query: str, k: int = 4) -> list[str]:
        """Retrieve document IDs by query."""
        if not query or not keyword_table:
            return []
@@ -366,9 +420,9 @@ class MeCab(BaseKeyword):
            logger.exception("Failed to retrieve IDs by query")
            raise KeywordExtractionError("Failed to retrieve IDs: {}".format(str(e)))

    def _update_segment_keywords(self, dataset_id: str, node_id: str, keywords: list[str]) -> None:
        """Update segment keywords in database."""
        if not dataset_id or not node_id or not keywords:
            return

        try:
@@ -386,7 +440,7 @@ class MeCab(BaseKeyword):
            logger.exception("Failed to update segment keywords")
            raise KeywordStorageError("Failed to update segment keywords: {}".format(str(e)))

    def create_segment_keywords(self, node_id: str, keywords: list[str]) -> None:
        """Create keywords for a single segment.

        Args:
@@ -405,7 +459,7 @@ class MeCab(BaseKeyword):
            logger.exception("Failed to create segment keywords")
            raise KeywordProcessorError("Failed to create segment keywords: {}".format(str(e)))

    def multi_create_segment_keywords(self, pre_segment_data_list: list[dict[str, Any]]) -> None:
        """Create keywords for multiple segments in batch."""
        if not pre_segment_data_list:
            return
@@ -443,7 +497,7 @@ class MeCab(BaseKeyword):
            logger.exception("Failed to create multiple segment keywords")
            raise KeywordProcessorError("Failed to create multiple segment keywords: {}".format(str(e)))

    def update_segment_keywords_index(self, node_id: str, keywords: list[str]) -> None:
        """Update keywords index for a segment.

        Args: