(feat): Add small model lid.176.ftz to library resources, for offline use #5

Merged: 14 commits, Sep 29, 2024. Showing changes from all commits.
README.md: 10 changes (9 additions & 1 deletion)
@@ -11,6 +11,8 @@ Facebook. This package is 80x faster than traditional methods and offers 95% acc
 
 It supports Python versions 3.9 to 3.12.
 
+Offline usage is supported.
+
 This project builds upon [zafercavdar/fasttext-langdetect](https://github.com/zafercavdar/fasttext-langdetect#benchmark)
 with enhancements in packaging.

@@ -51,18 +53,24 @@ model.
 > will be predicted as Japanese).
 
 ```python
 
 from fast_langdetect import detect, detect_multilingual
 
 # Single language detection
 print(detect("Hello, world!"))
 # Output: {'lang': 'en', 'score': 0.12450417876243591}
 
+# `use_strict_mode` determines whether model loading should enforce strict conditions before using fallback options.
+# If `use_strict_mode` is set to True, only the selected model is loaded, never the fallback model.
+print(detect("Hello, world!", low_memory=False, use_strict_mode=True))
+
 # How to deal with multiline text
 multiline_text = """
 Hello, world!
 This is a multiline text.
 But we need to remove `\n` characters, or it will raise a ValueError.
 """
-multiline_text = multiline_text.replace("\n", "")
+multiline_text = multiline_text.replace("\n", "")  # NOTE: it's important to remove `\n` characters
 print(detect(multiline_text))
 # Output: {'lang': 'en', 'score': 0.8509423136711121}
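The offline path added above can be exercised directly. A minimal sketch (assuming the bundled `lid.176.ftz` ships inside the package as this PR intends; the exact score will vary):

```python
from fast_langdetect import detect

# With this change, low_memory=True resolves to the small model bundled in the
# package resources, so no network access is needed; use_strict_mode=True keeps
# the loader from falling back to any other model.
print(detect("Bonjour le monde", low_memory=True, use_strict_mode=True))
# e.g. {'lang': 'fr', 'score': 0.9...}
```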
feature_test/__init__.py: 8 changes (4 additions & 4 deletions)
@@ -1,13 +1,13 @@
 # -*- coding: utf-8 -*-
 # @Time : 2024/1/18 11:41 AM
 # @Author : sudoskys
 
 
 from fast_langdetect import detect, detect_multilingual, detect_language
 
 # Test Traditional Chinese, Simplified Chinese, Japanese, English, Korean, French, German, and Spanish
-print(detect_multilingual("Hello, world!你好世界!Привет, мир!",low_memory=False))
-print(detect_multilingual("Hello, world!你好世界!Привет, мир!"))
+print(detect_multilingual("Hello, world!你好世界!Привет, мир!", low_memory=False))
+print(
+    detect_multilingual("Hello, world!你好世界!Привет, мир!", low_memory=True, use_strict_mode=True)
+)
 # [{'lang': 'ja', 'score': 0.32009604573249817}, {'lang': 'uk', 'score': 0.27781224250793457}, {'lang': 'zh', 'score': 0.17542070150375366}, {'lang': 'sr', 'score': 0.08751443773508072}, {'lang': 'bg', 'score': 0.05222449079155922}]
 print(detect("hello world"))
 print(detect("你好世界"))
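For callers that only want a single best guess from mixed-script input like the sample above, a small illustrative helper (`top_language` and its `floor` threshold are hypothetical, not part of the package API):

```python
from fast_langdetect import detect_multilingual


def top_language(text: str, floor: float = 0.3):
    # Results come back sorted by score, so the first entry is the best guess;
    # newlines are replaced because predict() handles one line at a time.
    results = detect_multilingual(text.replace("\n", " "), low_memory=True, k=5)
    if not results:
        return None
    best = results[0]
    return best["lang"] if best["score"] >= floor else None


print(top_language("Hello, world!你好世界!Привет, мир!"))  # e.g. 'ja'
```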
pyproject.toml: 2 changes (1 addition & 1 deletion)
@@ -1,6 +1,6 @@
 [project]
 name = "fast-langdetect"
-version = "0.2.1"
+version = "0.2.2"
 description = "Quickly detect text language and segment language"
 authors = [
     { name = "sudoskys", email = "[email protected]" },
src/fast_langdetect/__init__.py: 3 changes (2 additions & 1 deletion)
@@ -1,3 +1,4 @@
 # -*- coding: utf-8 -*-
 
-from .ft_detect import detect, detect_language, detect_langs, detect_multilingual  # noqa: F401
+from .ft_detect import detect, detect_language, detect_langs, detect_multilingual  # noqa: F401
+
src/fast_langdetect/ft_detect/infer.py: 226 changes (145 additions & 81 deletions)
@@ -5,130 +5,194 @@
 # @Software: PyCharm
 import logging
 import os
+from enum import Enum
 from pathlib import Path
-from typing import Dict, Union, List
+from typing import Dict, Union, List, Optional, Any
 
 import fasttext
 from robust_downloader import download
 
 logger = logging.getLogger(__name__)
-MODELS = {"low_mem": None, "high_mem": None}
-FTLANG_CACHE = os.getenv("FTLANG_CACHE", "/tmp/fasttext-langdetect")
+CACHE_DIRECTORY = os.getenv("FTLANG_CACHE", "/tmp/fasttext-langdetect")
+LOCAL_SMALL_MODEL_PATH = Path(__file__).parent / "resources" / "lid.176.ftz"
 
+# Suppress FastText output if possible
 try:
     # silences warnings as the package does not properly use the python 'warnings' package
     # see https://github.com/facebookresearch/fastText/issues/1056
     fasttext.FastText.eprint = lambda *args, **kwargs: None
 except Exception:
     pass
 
 
+class ModelType(Enum):
+    LOW_MEMORY = "low_mem"
+    HIGH_MEMORY = "high_mem"
+
+
+class ModelCache:
+    def __init__(self):
+        self._models = {}
+
+    def get_model(self, model_type: ModelType) -> Optional["fasttext.FastText._FastText"]:
+        return self._models.get(model_type)
+
+    def set_model(self, model_type: ModelType, model: "fasttext.FastText._FastText"):
+        self._models[model_type] = model
+
+
+_model_cache = ModelCache()
+
+
 class DetectError(Exception):
+    """Custom exception for language detection errors."""
     pass
 
 
-def get_model_map(low_memory=False):
+def load_model(low_memory: bool = False,
+               download_proxy: Optional[str] = None,
+               use_strict_mode: bool = False) -> "fasttext.FastText._FastText":
     """
-    Getting model map
-    :param low_memory:
-    :return:
+    Load the FastText model based on memory preference.
+
+    :param low_memory: Indicates whether to load the smaller, memory-efficient model
+    :param download_proxy: Proxy to use for downloading the large model if necessary
+    :param use_strict_mode: If enabled, strictly load the large model or raise an error if it fails
+    :return: Loaded FastText model
+    :raises DetectError: If the model cannot be loaded
     """
-    if low_memory:
-        return "low_mem", FTLANG_CACHE, "lid.176.ftz", "https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.ftz"
-    else:
-        return "high_mem", FTLANG_CACHE, "lid.176.bin", "https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin"
+    model_type = ModelType.LOW_MEMORY if low_memory else ModelType.HIGH_MEMORY
+
+    # If the model is already loaded, return it
+    cached_model = _model_cache.get_model(model_type)
+    if cached_model:
+        return cached_model
 
-def get_model_loaded(
-        low_memory: bool = False,
-        download_proxy: str = None
-):
-    """
-    Getting model loaded
-    :param low_memory:
-    :param download_proxy:
-    :return:
-    """
-    mode, cache, name, url = get_model_map(low_memory)
-    loaded = MODELS.get(mode, None)
-    if loaded:
-        return loaded
-    model_path = os.path.join(cache, name)
-    if Path(model_path).exists():
-        if Path(model_path).is_dir():
-            raise Exception(f"{model_path} is a directory")
+    def load_local_small_model():
+        """Try to load the local small model."""
+        try:
+            _loaded_model = fasttext.load_model(str(LOCAL_SMALL_MODEL_PATH))
+            _model_cache.set_model(ModelType.LOW_MEMORY, _loaded_model)
+            return _loaded_model
+        except Exception as e:
+            logger.error(f"Failed to load the local small model '{LOCAL_SMALL_MODEL_PATH}': {e}")
+            raise DetectError("Unable to load low-memory model from local resources.")
 
+    def load_large_model():
+        """Try to load the large model."""
         try:
-            loaded_model = fasttext.load_model(model_path)
-            MODELS[mode] = loaded_model
+            loaded_model = fasttext.load_model(str(model_path))
+            _model_cache.set_model(ModelType.HIGH_MEMORY, loaded_model)
             return loaded_model
         except Exception as e:
-            logger.error(f"Error loading model {model_path}: {e}")
-            download(url=url, folder=cache, filename=name, proxy=download_proxy)
-            raise e
-        else:
+            logger.error(f"Failed to load the large model '{model_path}': {e}")
+            return None
 
+    if low_memory:
+        # Attempt to load the local small model
+        return load_local_small_model()
+
+    # Path for the large model
+    large_model_name = "lid.176.bin"
+    model_path = Path(CACHE_DIRECTORY) / large_model_name
+
+    # If the large model is already present, load it
+    if model_path.exists():
+        # The model path must not be a directory
+        if model_path.is_dir():
+            try:
+                model_path.rmdir()
+            except Exception as e:
+                logger.error(f"Failed to remove the directory '{model_path}': {e}")
+                raise DetectError(f"Unexpected directory found at large model file path '{model_path}': {e}")
+        # Attempt to load the large model
+        loaded_model = load_large_model()
+        if loaded_model:
+            return loaded_model
+
+    # If the large model is not present, attempt to download (only if necessary)
+    model_url = "https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin"
+    try:
+        logger.info(f"Downloading large model from {model_url} to {model_path}")
+        download(
+            url=model_url,
+            folder=CACHE_DIRECTORY,
+            filename=large_model_name,
+            proxy=download_proxy,
+            retry_max=3,
+            timeout=20
+        )
+        # Try loading the model again after download
+        loaded_model = load_large_model()
+        if loaded_model:
+            return loaded_model
+    except Exception as e:
+        logger.error(f"Failed to download the large model: {e}")
 
-    download(url=url, folder=cache, filename=name, proxy=download_proxy, retry_max=3, timeout=20)
-    loaded_model = fasttext.load_model(model_path)
-    MODELS[mode] = loaded_model
-    return loaded_model
+    # Handle fallback logic for strict and non-strict modes
+    if use_strict_mode:
+        raise DetectError("Strict mode enabled: unable to download or load the large model.")
+    else:
+        logger.info("Attempting to fall back to the local small model.")
+        return load_local_small_model()
 
 
 def detect(text: str, *,
            low_memory: bool = True,
-           model_download_proxy: str = None
+           model_download_proxy: Optional[str] = None,
+           use_strict_mode: bool = False
            ) -> Dict[str, Union[str, float]]:
     """
-    Detect language of text
+    Detect the language of a text using FastText.
+    This function expects a single line of text; words are split on whitespace (space, newline, tab, vertical tab) and the control characters carriage return, form feed, and null.
 
-    :param text: Text for language detection
-    :param low_memory: Whether to use low memory mode
-    :param model_download_proxy: model download proxy
-    :return: {"lang": "en", "score": 0.99}
-    :raise ValueError: predict processes one line at a time (remove '\n')
+    If the model is not supervised, this function will throw a ValueError.
+    :param text: The text for language detection
+    :param low_memory: Whether to use the memory-efficient model
+    :param model_download_proxy: Download proxy for the model if needed
+    :param use_strict_mode: If enabled, strictly load the large model or raise an error if it fails
+    :return: A dictionary with the detected language and confidence score
+    :raises DetectError: If detection fails
     """
-    model = get_model_loaded(low_memory=low_memory, download_proxy=model_download_proxy)
+    model = load_model(low_memory=low_memory, download_proxy=model_download_proxy, use_strict_mode=use_strict_mode)
     labels, scores = model.predict(text)
-    label = labels[0].replace("__label__", '')
-    score = min(float(scores[0]), 1.0)
+    language_label = labels[0].replace("__label__", '')
+    confidence_score = min(float(scores[0]), 1.0)
     return {
-        "lang": label,
-        "score": score,
+        "lang": language_label,
+        "score": confidence_score,
     }
 
 
 def detect_multilingual(text: str, *,
                         low_memory: bool = True,
-                        model_download_proxy: str = None,
+                        model_download_proxy: Optional[str] = None,
                         k: int = 5,
                         threshold: float = 0.0,
-                        on_unicode_error: str = "strict"
-                        ) -> List[dict]:
+                        on_unicode_error: str = "strict",
+                        use_strict_mode: bool = False
+                        ) -> List[Dict[str, Any]]:
     """
-    Given a string, get a list of labels and a list of corresponding probabilities.
-    k controls the number of returned labels. A choice of 5 will return the 5 most probable labels.
-    By default this returns only the most likely label and probability. threshold filters the returned labels by a threshold on probability. A choice of 0.5 will return labels with at least 0.5 probability.
-    k and threshold will be applied together to determine the returned labels.
-
-    NOTE: This function assumes it is given a single line of text. We split words on whitespace (space, newline, tab, vertical tab) and the control characters carriage return, form feed and the null character.
-
-    :param text: Text for language detection
-    :param low_memory: Whether to use low memory mode
-    :param model_download_proxy: model download proxy
-    :param k: Predict top k languages
-    :param threshold: Threshold for prediction
-    :param on_unicode_error: Error handling
-    :return:
+    Detect multiple potential languages and their probabilities in a given text.
+    k controls the number of returned labels: a choice of 5 will return the 5 most probable labels. threshold filters the returned labels by probability: a choice of 0.5 will return only labels with at least 0.5 probability. k and threshold are applied together to determine the returned labels.
+    This function expects a single line of text; words are split on whitespace (space, newline, tab, vertical tab) and the control characters carriage return, form feed, and null.
+    If the model is not supervised, this function will throw a ValueError.
+
+    :param text: The text for language detection
+    :param low_memory: Whether to use the memory-efficient model
+    :param model_download_proxy: Proxy for downloading the model
+    :param k: Number of top language predictions to return
+    :param threshold: Minimum score threshold for predictions
+    :param on_unicode_error: Error handling for Unicode errors
+    :param use_strict_mode: If enabled, strictly load the large model or raise an error if it fails
+    :return: A list of dictionaries, each containing a language and its confidence score
+    :raises DetectError: If detection fails
     """
-    model = get_model_loaded(low_memory=low_memory, download_proxy=model_download_proxy)
-    labels, scores = model.predict(text=text, k=k, threshold=threshold, on_unicode_error=on_unicode_error)
-    detect_result = []
+    model = load_model(low_memory=low_memory, download_proxy=model_download_proxy, use_strict_mode=use_strict_mode)
+    labels, scores = model.predict(text, k=k, threshold=threshold, on_unicode_error=on_unicode_error)
+    results = []
     for label, score in zip(labels, scores):
-        label = label.replace("__label__", '')
-        score = min(float(score), 1.0)
-        detect_result.append({
-            "lang": label,
-            "score": score,
+        language_label = label.replace("__label__", '')
+        confidence_score = min(float(score), 1.0)
+        results.append({
+            "lang": language_label,
+            "score": confidence_score,
         })
-    return sorted(detect_result, key=lambda i: i['score'], reverse=True)
+    return sorted(results, key=lambda x: x['score'], reverse=True)
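Taken together, `load_model` now resolves in a fixed order: in-process cache, then the bundled small model when `low_memory=True`, otherwise the cached large model on disk, then a download, and finally either a strict-mode `DetectError` or a fallback to the small model. A rough sketch of steering this through the environment (the cache path below is an arbitrary assumption; any writable directory works):

```python
import os

# CACHE_DIRECTORY reads FTLANG_CACHE once at import time, so set it before
# importing the package (default: /tmp/fasttext-langdetect).
os.environ["FTLANG_CACHE"] = os.path.expanduser("~/.cache/fasttext-langdetect")

from fast_langdetect import detect

try:
    # Strict mode: use the large lid.176.bin or fail; no silent fallback.
    print(detect("Hello, world!", low_memory=False, use_strict_mode=True))
except Exception as err:  # DetectError is not re-exported at the top level
    print(f"Large model unavailable: {err}")
```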
src/fast_langdetect/ft_detect/resources/NOTICE.MD: 11 changes (11 additions & 0 deletions)
@@ -0,0 +1,11 @@
+# License Notice
+
+## File: `fast_langdetect/ft_detect/resources/lid.176.ftz`
+
+The model is distributed under
+the [Creative Commons Attribution-Share-Alike License 3.0](https://creativecommons.org/licenses/by-sa/3.0/).
+
+## References
+
+https://fasttext.cc/docs/en/language-identification.html
+https://creativecommons.org/licenses/by-sa/3.0/
src/fast_langdetect/ft_detect/resources/lid.176.ftz: binary file added (not shown)
tests/test_detect.py: 22 changes (15 additions & 7 deletions)
@@ -6,18 +6,26 @@
 
 
 def test_muti_detect():
-    from fast_langdetect.ft_detect import detect_multilingual
-    result = detect_multilingual("hello world", low_memory=True)
+    from fast_langdetect import detect_multilingual
+    result = detect_multilingual("hello world", low_memory=True, use_strict_mode=True)
     assert result[0].get("lang") == "en", "ft_detect error"
 
 
+def test_large():
+    from fast_langdetect import detect_multilingual
+    result = detect_multilingual("hello world", low_memory=True, use_strict_mode=True)
+    assert result[0].get("lang") == "en", "ft_detect error"
+    result = detect_multilingual("你好世界", low_memory=False, use_strict_mode=True)
+    assert result[0].get("lang") == "zh", "ft_detect error"
+
+
 def test_detect():
     from fast_langdetect import detect
-    assert detect("hello world")["lang"] == "en", "ft_detect error"
-    assert detect("你好世界")["lang"] == "zh", "ft_detect error"
-    assert detect("こんにちは世界")["lang"] == "ja", "ft_detect error"
-    assert detect("안녕하세요 세계")["lang"] == "ko", "ft_detect error"
-    assert detect("Bonjour le monde")["lang"] == "fr", "ft_detect error"
+    assert detect("hello world", low_memory=False, use_strict_mode=True)["lang"] == "en", "ft_detect error"
+    assert detect("你好世界", low_memory=True, use_strict_mode=True)["lang"] == "zh", "ft_detect error"
+    assert detect("こんにちは世界", low_memory=False, use_strict_mode=True)["lang"] == "ja", "ft_detect error"
+    assert detect("안녕하세요 세계", low_memory=True, use_strict_mode=True)["lang"] == "ko", "ft_detect error"
+    assert detect("Bonjour le monde", low_memory=False, use_strict_mode=True)["lang"] == "fr", "ft_detect error"
 
 
 def test_detect_totally():
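One case the updated suite does not pin down is that the low-memory path now stays fully offline. A hedged sketch of such a test (hypothetical, not part of this PR; it monkeypatches the module-level `download` that `infer.py` imports from `robust_downloader`):

```python
def test_offline_small_model(monkeypatch):
    # Block the downloader, then confirm the bundled lid.176.ftz still serves
    # predictions on the low-memory path without touching the network.
    import fast_langdetect.ft_detect.infer as infer

    def _no_download(*args, **kwargs):
        raise AssertionError("network download attempted in offline mode")

    monkeypatch.setattr(infer, "download", _no_download)
    result = infer.detect("hello world", low_memory=True)
    assert result["lang"] == "en"
```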