add mecab keywords handler for japanese

This commit is contained in:
Dr. Kiji 2025-01-02 18:50:23 +09:00
parent 375aa38f5d
commit 81c5953fa5
7 changed files with 771 additions and 1 deletions

View File

@ -22,8 +22,10 @@ class Keyword:
match keyword_type:
case KeyWordType.JIEBA:
from core.rag.datasource.keyword.jieba.jieba import Jieba
return Jieba
case KeyWordType.MECAB:
from core.rag.datasource.keyword.mecab.mecab import MeCab
return MeCab
case _:
raise ValueError(f"Keyword store {keyword_type} is not supported.")

View File

@ -3,3 +3,4 @@ from enum import StrEnum
class KeyWordType(StrEnum):
JIEBA = "jieba"
MECAB = "mecab"

View File

@ -0,0 +1,273 @@
# MeCab Keyword Processor
A Japanese text keyword extraction module using MeCab morphological analyzer for the Dify RAG system.
## Overview
This module provides Japanese text keyword extraction capabilities using the MeCab morphological analyzer. It's designed to:
- Extract meaningful keywords from Japanese text
- Handle compound words and technical terms
- Support custom dictionaries
- Provide configurable scoring based on parts of speech
- Handle mixed Japanese-English text
## Components
### 1. MeCabKeywordTableHandler
The core component responsible for keyword extraction using MeCab:
```python
handler = MeCabKeywordTableHandler(
dictionary_path="/path/to/dict", # Optional custom dictionary
user_dictionary_path="/path/to/user_dict" # Optional user dictionary
)
keywords = handler.extract_keywords(text, max_keywords=10)
```
#### Features:
- **Part of Speech (POS) Weighting**:
```python
pos_weights = {
'名詞': 1.0, # Nouns
'動詞': 0.8, # Verbs
'形容詞': 0.6, # Adjectives
'副詞': 0.4, # Adverbs
'連体詞': 0.3, # Adnominal adjectives
'感動詞': 0.2, # Interjections
}
```
- **Special Term Handling**:
- Boosts scores for proper nouns (固有名詞)
- Boosts scores for technical terms (専門用語)
- Compound word detection (e.g., "機械学習", "自然言語処理")
- **Reading Normalization**:
- Handles different forms of the same word
- Normalizes compound terms using readings
### 2. Configuration (MeCabConfig)
Configurable settings for the processor:
```python
class MeCabConfig(BaseModel):
max_keywords_per_chunk: int = 10
min_keyword_length: int = 2
score_threshold: float = 0.3
storage_type: str = "database"
cache_timeout: int = 3600
dictionary_path: str = ""
user_dictionary_path: str = ""
pos_weights: dict = {...}
```
### 3. Stopwords
Comprehensive Japanese stopword list including:
- Particles (は, が, の, etc.)
- Auxiliary verbs (です, ます, etc.)
- Pronouns (これ, それ, etc.)
- Common words
- Numbers and punctuation
- Common English stopwords for mixed text
## Usage
### Basic Usage
```python
from core.rag.datasource.keyword.keyword_factory import Keyword
from models.dataset import Dataset
# Initialize
dataset = Dataset(...)
keyword_processor = Keyword(dataset) # Will use MeCab if KEYWORD_STORE = "mecab"
# Process text
documents = [
Document(
page_content="自然言語処理は人工知能の重要な分野です。",
metadata={"doc_id": "1", ...}
)
]
keyword_processor.create(documents)
# Search
results = keyword_processor.search("自然言語処理について")
```
### Custom Dictionary Usage
```python
# In your configuration:
KEYWORD_PROCESSOR_CONFIG = {
"dictionary_path": "/path/to/mecab/dict",
"user_dictionary_path": "/path/to/user.dic",
"pos_weights": {
"名詞": 1.2,
"動詞": 0.8,
# ... customize weights
}
}
```
## Features
### 1. Keyword Extraction
- **POS-based Scoring**:
- Weights different parts of speech
- Boosts important terms
- Configurable scoring thresholds
- **Compound Word Detection**:
```python
# Input text: "自然言語処理の研究"
# Detected compounds:
# - "自然言語"
# - "自然言語処理"
# - "言語処理"
```
- **Reading Normalization**:
```python
# Handles variations:
# - "データベース" (katakana)
# - "データベース" (with readings)
# Both normalize to same term
```
### 2. Storage
- **Flexible Storage Options**:
- Database storage
- File-based storage
- Redis-based locking for concurrency
- **Data Structure**:
```python
{
"__type__": "keyword_table",
"__data__": {
"index_id": "dataset_id",
"table": {
"keyword1": ["doc_id1", "doc_id2"],
"keyword2": ["doc_id2", "doc_id3"],
}
}
}
```
### 3. Error Handling
- Comprehensive error handling
- Custom exception classes
- Logging integration
- Graceful fallbacks
## Performance Considerations
1. **Memory Usage**:
- Efficient keyword table structure
- Batch processing support
- Caching mechanisms
2. **Concurrency**:
- Redis-based locking
- Transaction handling
- Safe concurrent access
3. **Optimization Tips**:
- Use appropriate batch sizes
- Configure caching timeouts
- Adjust scoring thresholds
## Dependencies
- MeCab and Python bindings:
```bash
# Ubuntu/Debian
apt-get install mecab mecab-ipadic-utf8 python3-mecab
# macOS
brew install mecab mecab-ipadic
pip install mecab-python3
```
## Best Practices
1. **Dictionary Management**:
- Keep dictionaries updated
- Use domain-specific user dictionaries
- Regular maintenance of custom terms
2. **Configuration Tuning**:
- Adjust POS weights for your use case
- Set appropriate thresholds
- Monitor and adjust batch sizes
3. **Error Handling**:
- Implement proper logging
- Monitor extraction quality
- Handle edge cases
## Testing
Example test cases:
```python
def test_basic_extraction():
text = "自然言語処理は人工知能の重要な分野です。"
keywords = handler.extract_keywords(text)
assert "自然言語処理" in keywords
assert "人工知能" in keywords
def test_compound_words():
text = "機械学習モデルを使った自然言語処理"
keywords = handler.extract_keywords(text)
assert "機械学習" in keywords
assert "自然言語処理" in keywords
def test_mixed_text():
text = "AIを使った自然言語処理のResearch"
keywords = handler.extract_keywords(text)
assert "AI" in keywords
assert "自然言語処理" in keywords
assert "Research" in keywords
```
## Common Issues and Solutions
1. **Dictionary Loading Failures**:
```python
try:
handler = MeCabKeywordTableHandler(dictionary_path=path)
except RuntimeError as e:
# Handle dictionary loading error
```
2. **Memory Usage**:
```python
# Use batch processing for large datasets
for batch in chunks(documents, size=100):
process_batch(batch)
```
3. **Concurrent Access**:
```python
with redis_client.lock(f"lock_{dataset_id}"):
# Safe concurrent operations
```

View File

@ -0,0 +1,19 @@
from pydantic import BaseModel
class MeCabConfig(BaseModel):
"""Configuration for MeCab keyword processor."""
max_keywords_per_chunk: int = 10
min_keyword_length: int = 2
score_threshold: float = 0.3
storage_type: str = "database"
cache_timeout: int = 3600
# MeCab specific settings
dictionary_path: str = "" # Optional custom dictionary path
user_dictionary_path: str = "" # Optional user dictionary path
pos_weights: dict = {
'名詞': 1.0, # Nouns
'動詞': 0.8, # Verbs
'形容詞': 0.6, # Adjectives
'副詞': 0.4, # Adverbs
}

View File

@ -0,0 +1,287 @@
import json
import logging
from typing import Any, Optional
from collections import defaultdict
from core.rag.datasource.keyword.keyword_base import BaseKeyword
from core.rag.datasource.keyword.mecab.mecab_keyword_table_handler import MeCabKeywordTableHandler
from core.rag.datasource.keyword.mecab.config import MeCabConfig
from core.rag.models.document import Document
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from extensions.ext_storage import storage
from models.dataset import Dataset, DatasetKeywordTable, DocumentSegment
logger = logging.getLogger(__name__)
class KeywordProcessorError(Exception):
"""Base error for keyword processing."""
pass
class KeywordExtractionError(KeywordProcessorError):
"""Error during keyword extraction."""
pass
class KeywordStorageError(KeywordProcessorError):
"""Error during storage operations."""
pass
class SetEncoder(json.JSONEncoder):
"""JSON encoder that handles sets."""
def default(self, obj):
if isinstance(obj, set):
return list(obj)
return super().default(obj)
class MeCab(BaseKeyword):
"""Japanese keyword processor using MeCab morphological analyzer."""
def __init__(self, dataset: Dataset):
super().__init__(dataset)
self._config = MeCabConfig()
self._keyword_handler = None
self._init_handler()
def _init_handler(self):
"""Initialize MeCab handler with configuration."""
try:
self._keyword_handler = MeCabKeywordTableHandler(
dictionary_path=self._config.dictionary_path,
user_dictionary_path=self._config.user_dictionary_path
)
if self._config.pos_weights:
self._keyword_handler.pos_weights = self._config.pos_weights
self._keyword_handler.min_score = self._config.score_threshold
except Exception as e:
logger.error(f"Failed to initialize MeCab handler: {str(e)}")
raise KeywordProcessorError(f"MeCab initialization failed: {str(e)}")
def create(self, texts: list[Document], **kwargs) -> BaseKeyword:
"""Create keyword index for documents."""
lock_name = f"keyword_indexing_lock_{self.dataset.id}"
with redis_client.lock(lock_name, timeout=600):
keyword_table = self._get_dataset_keyword_table()
for text in texts:
keywords = self._keyword_handler.extract_keywords(
text.page_content,
self._config.max_keywords_per_chunk
)
if text.metadata is not None:
self._update_segment_keywords(
self.dataset.id,
text.metadata["doc_id"],
list(keywords)
)
keyword_table = self._add_text_to_keyword_table(
keyword_table or {},
text.metadata["doc_id"],
list(keywords)
)
self._save_dataset_keyword_table(keyword_table)
return self
def add_texts(self, texts: list[Document], **kwargs):
"""Add new texts to existing index."""
lock_name = f"keyword_indexing_lock_{self.dataset.id}"
with redis_client.lock(lock_name, timeout=600):
keyword_table = self._get_dataset_keyword_table()
keywords_list = kwargs.get("keywords_list")
for i, text in enumerate(texts):
if keywords_list:
keywords = keywords_list[i]
if not keywords:
keywords = self._keyword_handler.extract_keywords(
text.page_content,
self._config.max_keywords_per_chunk
)
else:
keywords = self._keyword_handler.extract_keywords(
text.page_content,
self._config.max_keywords_per_chunk
)
if text.metadata is not None:
self._update_segment_keywords(
self.dataset.id,
text.metadata["doc_id"],
list(keywords)
)
keyword_table = self._add_text_to_keyword_table(
keyword_table or {},
text.metadata["doc_id"],
list(keywords)
)
self._save_dataset_keyword_table(keyword_table)
def text_exists(self, id: str) -> bool:
"""Check if text exists in index."""
keyword_table = self._get_dataset_keyword_table()
if keyword_table is None:
return False
return id in set.union(*keyword_table.values()) if keyword_table else False
def delete_by_ids(self, ids: list[str]) -> None:
"""Delete texts by IDs."""
lock_name = f"keyword_indexing_lock_{self.dataset.id}"
with redis_client.lock(lock_name, timeout=600):
keyword_table = self._get_dataset_keyword_table()
if keyword_table is not None:
keyword_table = self._delete_ids_from_keyword_table(keyword_table, ids)
self._save_dataset_keyword_table(keyword_table)
def delete(self) -> None:
"""Delete entire index."""
lock_name = f"keyword_indexing_lock_{self.dataset.id}"
with redis_client.lock(lock_name, timeout=600):
dataset_keyword_table = self.dataset.dataset_keyword_table
if dataset_keyword_table:
db.session.delete(dataset_keyword_table)
db.session.commit()
if dataset_keyword_table.data_source_type != "database":
file_key = f"keyword_files/{self.dataset.tenant_id}/{self.dataset.id}.txt"
storage.delete(file_key)
def search(self, query: str, **kwargs: Any) -> list[Document]:
"""Search documents using keywords."""
keyword_table = self._get_dataset_keyword_table()
k = kwargs.get("top_k", 4)
sorted_chunk_indices = self._retrieve_ids_by_query(
keyword_table or {},
query,
k
)
documents = []
for chunk_index in sorted_chunk_indices:
segment = (
db.session.query(DocumentSegment)
.filter(
DocumentSegment.dataset_id == self.dataset.id,
DocumentSegment.index_node_id == chunk_index
)
.first()
)
if segment:
documents.append(
Document(
page_content=segment.content,
metadata={
"doc_id": chunk_index,
"doc_hash": segment.index_node_hash,
"document_id": segment.document_id,
"dataset_id": segment.dataset_id,
},
)
)
return documents
def _get_dataset_keyword_table(self) -> Optional[dict]:
"""Get keyword table from storage."""
dataset_keyword_table = self.dataset.dataset_keyword_table
if dataset_keyword_table:
keyword_table_dict = dataset_keyword_table.keyword_table_dict
if keyword_table_dict:
return dict(keyword_table_dict["__data__"]["table"])
return {}
def _save_dataset_keyword_table(self, keyword_table):
"""Save keyword table to storage."""
table_dict = {
"__type__": "keyword_table",
"__data__": {
"index_id": self.dataset.id,
"summary": None,
"table": keyword_table
}
}
dataset_keyword_table = self.dataset.dataset_keyword_table
data_source_type = dataset_keyword_table.data_source_type
if data_source_type == "database":
dataset_keyword_table.keyword_table = json.dumps(table_dict, cls=SetEncoder)
db.session.commit()
else:
file_key = f"keyword_files/{self.dataset.tenant_id}/{self.dataset.id}.txt"
if storage.exists(file_key):
storage.delete(file_key)
storage.save(
file_key,
json.dumps(table_dict, cls=SetEncoder).encode("utf-8")
)
def _add_text_to_keyword_table(self, keyword_table: dict, id: str, keywords: list[str]) -> dict:
"""Add text keywords to table."""
for keyword in keywords:
if keyword not in keyword_table:
keyword_table[keyword] = set()
keyword_table[keyword].add(id)
return keyword_table
def _delete_ids_from_keyword_table(self, keyword_table: dict, ids: list[str]) -> dict:
"""Delete IDs from keyword table."""
node_idxs_to_delete = set(ids)
keywords_to_delete = set()
for keyword, node_idxs in keyword_table.items():
if node_idxs_to_delete.intersection(node_idxs):
keyword_table[keyword] = node_idxs.difference(node_idxs_to_delete)
if not keyword_table[keyword]:
keywords_to_delete.add(keyword)
for keyword in keywords_to_delete:
del keyword_table[keyword]
return keyword_table
def _retrieve_ids_by_query(self, keyword_table: dict, query: str, k: int = 4):
"""Retrieve document IDs by query."""
keywords = self._keyword_handler.extract_keywords(query)
# Score documents based on matching keywords
chunk_indices_count = defaultdict(int)
keywords_list = [
keyword for keyword in keywords
if keyword in set(keyword_table.keys())
]
for keyword in keywords_list:
for node_id in keyword_table[keyword]:
chunk_indices_count[node_id] += 1
sorted_chunk_indices = sorted(
chunk_indices_count.keys(),
key=lambda x: chunk_indices_count[x],
reverse=True
)
return sorted_chunk_indices[:k]
def _update_segment_keywords(self, dataset_id: str, node_id: str, keywords: list[str]):
"""Update segment keywords in database."""
document_segment = (
db.session.query(DocumentSegment)
.filter(
DocumentSegment.dataset_id == dataset_id,
DocumentSegment.index_node_id == node_id
)
.first()
)
if document_segment:
document_segment.keywords = keywords
db.session.add(document_segment)
db.session.commit()

View File

@ -0,0 +1,152 @@
import re
from typing import Optional, Set
import MeCab
from collections import defaultdict
from core.rag.datasource.keyword.mecab.stopwords import STOPWORDS
class MeCabKeywordTableHandler:
"""Japanese keyword extraction using MeCab morphological analyzer."""
def __init__(self, dictionary_path: str = "", user_dictionary_path: str = ""):
"""Initialize MeCab tokenizer.
Args:
dictionary_path: Path to custom system dictionary
user_dictionary_path: Path to user dictionary
"""
try:
# Build MeCab argument string
mecab_args = ["-Ochasen"] # Use ChaSen format for detailed POS info
if dictionary_path:
mecab_args.append(f"-d {dictionary_path}")
if user_dictionary_path:
mecab_args.append(f"-u {user_dictionary_path}")
self.tagger = MeCab.Tagger(" ".join(mecab_args))
self.tagger.parse('') # Force initialization to catch dictionary errors
except RuntimeError as e:
raise RuntimeError(f"Failed to initialize MeCab: {str(e)}")
# POS weights for scoring
self.pos_weights = {
'名詞': 1.0, # Nouns
'動詞': 0.8, # Verbs
'形容詞': 0.6, # Adjectives
'副詞': 0.4, # Adverbs
'連体詞': 0.3, # Adnominal adjectives
'感動詞': 0.2, # Interjections
}
self.min_score = 0.3
def extract_keywords(self, text: str, max_keywords_per_chunk: Optional[int] = 10) -> Set[str]:
"""Extract keywords from Japanese text using MeCab.
Args:
text: Input text to extract keywords from
max_keywords_per_chunk: Maximum number of keywords to extract
Returns:
Set of extracted keywords
"""
if not text or not text.strip():
return set()
try:
# Parse text with MeCab
self.tagger.parse('') # Clear tagger state
node = self.tagger.parseToNode(text)
# Calculate term frequencies and scores
term_scores = defaultdict(float)
while node:
features = node.feature.split(',')
if len(features) > 0:
pos = features[0] # Part of speech
pos_subtype = features[1] if len(features) > 1 else ''
base_form = features[6] if len(features) > 6 else node.surface
# Score the term based on its POS
if pos in self.pos_weights and base_form not in STOPWORDS:
score = self.pos_weights[pos]
# Boost proper nouns and technical terms
if pos == '名詞' and pos_subtype in ['固有名詞', '専門用語']:
score *= 1.5
if len(base_form) > 1: # Filter out single characters
term_scores[base_form] += score
node = node.next
# Get top scoring terms
sorted_terms = sorted(
term_scores.items(),
key=lambda x: x[1],
reverse=True
)
# Filter by minimum score and take top N
keywords = {
term for term, score in sorted_terms
if score >= self.min_score
}
if max_keywords_per_chunk:
keywords = set(list(keywords)[:max_keywords_per_chunk])
# Expand with compound terms
expanded_keywords = self._expand_tokens_with_compounds(keywords, text)
return expanded_keywords
except Exception as e:
raise RuntimeError(f"Failed to extract keywords: {str(e)}")
def _expand_tokens_with_compounds(self, keywords: Set[str], text: str) -> Set[str]:
"""Expand keywords with compound terms.
This method looks for adjacent keywords in the original text to capture
compound terms like '機械学習' (machine learning) or '自然言語処理' (natural language processing).
"""
results = set(keywords)
try:
# Parse again to find compounds
node = self.tagger.parseToNode(text)
compound = []
compound_readings = [] # For handling different forms of the same compound
while node:
features = node.feature.split(',')
if len(features) > 6:
base_form = features[6]
reading = features[7] if len(features) > 7 else None
else:
base_form = node.surface
reading = None
if base_form in keywords:
compound.append(base_form)
if reading:
compound_readings.append(reading)
else:
if len(compound) > 1:
# Add the compound term
compound_term = ''.join(compound)
if len(compound_term) > 1:
results.add(compound_term)
# If readings are available, add normalized form
if compound_readings:
normalized_term = ''.join(compound_readings)
if normalized_term != compound_term:
results.add(normalized_term)
compound = []
compound_readings = []
node = node.next
return results
except Exception as e:
# If compound expansion fails, return original keywords
return keywords

View File

@ -0,0 +1,36 @@
STOPWORDS = {
# Japanese particles and basic stopwords
"", "", "", "", "", "", "", "", "から", "より", "まで", "によって",
"あそこ", "あっ", "あの", "あのかた", "あの人", "あり", "あります", "ある", "あれ",
"", "いう", "います", "いる", "", "うち", "", "", "および", "おり", "おります",
"", "かつて", "から", "", "", "ここ", "こちら", "こと", "この", "これ", "これら",
"", "さらに", "", "しかし", "する", "", "", "せる", "そこ", "そして", "その",
"その他", "その後", "それ", "それぞれ", "それで", "", "ただし", "たち", "ため", "たり",
"", "だっ", "だれ", "", "", "", "でき", "できる", "です", "では", "でも", "",
"という", "といった", "とき", "ところ", "として", "とともに", "とも", "と共に", "どこ",
"どの", "", "ない", "なお", "なかっ", "ながら", "なく", "なっ", "など", "なに", "なら",
"なり", "なる", "なん", "", "において", "における", "について", "にて", "によって", "により",
"による", "に対して", "に対する", "に関する", "", "ので", "のみ", "", "", "", "ほか",
"ほとんど", "ほど", "ます", "また", "または", "まで", "", "もの", "ものの", "", "よう",
"より", "", "られ", "られる", "", "れる", "", "", "", "及び", "", "彼女",
"我々", "特に", "", "私達", "貴方", "貴方方",
# Japanese auxiliary verbs
"です", "ます", "でした", "ました", "である", "", "", "だった",
# Japanese pronouns
"これ", "それ", "あれ", "この", "その", "あの", "ここ", "そこ", "あそこ",
# Japanese common words
"いる", "ある", "なる", "する", "できる", "おる", "いく", "くる",
# Numbers
"", "", "", "", "", "", "", "", "", "",
"1", "2", "3", "4", "5", "6", "7", "8", "9", "0",
# Punctuation
"", "", "", "", "", "", "", "", "", "",
# Common English stopwords (for mixed text)
"the", "is", "at", "which", "on", "in", "and", "or", "a", "an",
}