bulk_embeddings / utils.py
nbroad's picture
nbroad HF staff
Update utils.py
4408779
raw
history blame contribute delete
No virus
10.7 kB
import os
from pathlib import Path
from typing import Union, Dict, List
import torch
import datasets
from datasets import load_dataset, Dataset
from transformers import AutoTokenizer, PreTrainedTokenizer
from huggingface_hub import Repository, create_repo, HfApi
from optimum.onnxruntime import (
AutoOptimizationConfig,
ORTModelForFeatureExtraction,
ORTOptimizer,
)
os.environ["TOKENIZERS_PARALLELISM"] = "false"
opt_configs = {
"O2": AutoOptimizationConfig.O2(),
"O3": AutoOptimizationConfig.O3(),
"O4": AutoOptimizationConfig.O4(),
}
def get_batch_size(device_name: str, model_name: str, opt_level: str):
"""
TODO: run actual tests
T4 has 16GB
A10 has 24GB
Args:
device_name (`str`):
The name of the GPU device in use.
model_name (`str`):
The name of the model in use.
opt_level (`str`):
The optimization level in use.
Returns:
`int`:
The batch size to use.
"""
if "small" in model_name:
bs = 128
elif "base" in model_name:
bs = 64
elif "large" in model_name:
bs = 32
else:
bs = 16
if "A10" in device_name:
bs *= 2
if opt_level == "O4":
bs *= 2
return bs
def mean_pooling(last_hidden_state: torch.Tensor, attention_mask: torch.Tensor):
"""
Mean pool the token embeddings.
Args:
last_hidden_state (`tuple`):
The output of the model.
attention_mask (`torch.Tensor`):
The attention mask.
Returns:
`torch.Tensor`:
The mean pooled embeddings.
"""
input_mask_expanded = (
attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
)
return torch.sum(last_hidden_state * input_mask_expanded, 1) / torch.clamp(
input_mask_expanded.sum(1), min=1e-9
)
def load_hf_dataset(ds_name: str, ds_config: str = None, ds_split: str = "train"):
"""
Load a dataset from the HuggingFace Hub. Will be streaming so
as to not load the whole dataset to local storage.
Args:
ds_name (`str`):
The name of the dataset to load.
ds_config (`str`, *optional*, Defaults to `None`):
The configuration of the dataset to load.
ds_split (`str`, *optional*, Defaults to `"train"`):
The split of the dataset to load.
Returns:
ds (`datasets.IterableDataset`):
The loaded dataset.
"""
if ds_config == "":
ds_config = None
ds = load_dataset(ds_name, ds_config, split=ds_split, streaming=True)
return ds
def get_model_and_tokenizer(model_name: str, optimization_level: str):
"""
Load the model and tokenizer from the HuggingFace Hub.
If the model is not already optimized, optimize it and save it to the local directory.
Args:
model_name (`str`):
The name of the model to load.
optimization_level (`str`):
The optimization level to use. Should be one of `"O2"`, `"O3"`, or `"O4"`.
Returns:
model (`ORTModelForFeatureExtraction`):
The optimized model.
tokenizer (`PreTrainedTokenizer`):
The tokenizer.
"""
optimized_model_name = "model_optimized.onnx"
model_dir = Path(model_name.replace("/", "_"))
if not (model_dir / optimized_model_name).exists():
tokenizer = AutoTokenizer.from_pretrained(model_name)
tokenizer.save_pretrained(model_dir)
model = ORTModelForFeatureExtraction.from_pretrained(model_name, export=True)
model.save_pretrained(model_dir)
optimizer = ORTOptimizer.from_pretrained(model)
optimization_config = opt_configs[optimization_level]
optimizer.optimize(save_dir=model_dir, optimization_config=optimization_config)
else:
tokenizer = AutoTokenizer.from_pretrained(model_dir)
return (
ORTModelForFeatureExtraction.from_pretrained(
model_dir,
file_name=optimized_model_name,
provider="CUDAExecutionProvider",
),
tokenizer,
)
def tokenize(
examples: Dict[str, List[str]],
tokenizer: PreTrainedTokenizer,
column_name: str = "text",
padding: Union[bool, str] = True,
max_length: int = 512,
):
"""
Tokenize the examples using the tokenizer.
Args:
examples (`Dict[str, List[str]]`):
examples to tokenize
tokenizer (`PreTrainedTokenizer`):
tokenizer to use
column_name (`str`, *optional*, defaults to `text`):
column name to use for tokenization. Defaults to `text`
padding (`bool`, *optional*, defaults to `True`):
whether to pad the examples. Defaults to `True`
Use `"max_length"` if using `O4` optimization level
If `True`, the batch will be padded to the longest in the batch.
max_length (`int`, *optional*, Defaults to `512`):
max length to use for the model. Defaults to `512`.
Any sequences longer will be truncated.
If padding is `"max_length"`, the padding will be added until the sequence
is of length `max_length`.
Returns:
`Dict[str, List[List[int]]]`:
tokenized examples
"""
# TODO: add lengths, sort by length, use dynamic padding
# TODO: option for controlling length for models that can go shorter/longer than 512
return tokenizer(
examples[column_name], truncation=True, padding=padding, max_length=max_length
)
@torch.inference_mode()
def batch_embed(
ds: datasets.IterableDataset,
model: ORTModelForFeatureExtraction,
tokenizer: PreTrainedTokenizer,
model_name: str,
column_name: str,
new_dataset_id: str,
opt_level: str,
upload_batch_size: int = 10_000,
map_batch_size: int = 2000,
# progress,
):
"""
Run the model on the dataset and upload the embeddings to the hub.
Args:
ds (`datasets.Dataset`):
dataset to embed. From `load_hf_dataset`
model (`ORTModelForFeatureExtraction`):
model to use for embedding. From `get_model_and_tokenizer`
tokenizer (`AutoTokenizer`):
tokenizer to use for embedding. From `get_model_and_tokenizer`
model_name (`str`):
name of the model to use. Used to determine batch size.
column_name (`str`):
column name to use for embedding. Default option in gradio app is `text`
new_dataset_id (`str`):
id of the new dataset to create. Should include username or organization.
e.g. nbroad/new-embeddings
opt_level (`str`):
optimization level to use. Should be one of `O2`, `O3`, `O4`
See here for more details on optimization levels:
https://huggingface.co/docs/optimum/onnxruntime/usage_guides/optimization#optimization-configuration
upload_batch_size (`int`, *optional*, defaults to `10_000`):
number of embeddings to upload at once. Defaults to 10,000.
map_batch_size (`int`, *optional*, defaults to `2000`):
number of examples to tokenize at once. Defaults to 2000.
"""
api = HfApi(
token=os.environ["HF_TOKEN"],
)
repo = init_git_repo(new_dataset_id)
iterator = iter(
ds.map(
tokenize,
batched=True,
batch_size=map_batch_size,
fn_kwargs={
"tokenizer": tokenizer,
"column_name": column_name,
"padding": "max_length" if opt_level == "O4" else True,
},
remove_columns=ds.column_names,
)
)
# progress.tqdm(iterator)
embeds = []
last_count = 0
current_count = 0
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
inference_bs = get_batch_size(torch.cuda.get_device_name(0), model_name, opt_level)
loop = True
while loop:
batch = [next(iterator, None) for _ in range(inference_bs)]
# batch will have None values when iterator runs out
if batch[-1] is None:
batch = [x for x in batch if x is not None]
loop = False
ids = torch.tensor([b["input_ids"] for b in batch], device=device)
mask = torch.tensor([b["attention_mask"] for b in batch], device=device)
t_ids = torch.zeros_like(ids)
outputs = model(input_ids=ids, attention_mask=mask, token_type_ids=t_ids)
embeds.extend(mean_pooling(outputs[0], mask).cpu().tolist())
current_count += len(batch)
if len(embeds) > upload_batch_size:
push_to_repo(repo, last_count, current_count, embeds)
embeds = []
last_count = current_count
if len(embeds) > 0:
push_to_repo(repo, last_count, current_count, embeds)
return
def init_git_repo(repo_id: str):
"""
Initialize a git repo for the new dataset.
Args:
repo_id (`str`):
id of the new dataset to create. Should include username or organization.
e.g. nbroad/new-embeddings
"""
local_dir = repo_id.replace("/", "_")
create_repo(
repo_id,
token=os.environ["HF_TOKEN"],
private=True,
exist_ok=True,
)
try:
repo = Repository(
local_dir=local_dir,
clone_from=repo_id,
repo_type="dataset",
token=os.environ["HF_TOKEN"],
skip_lfs_files=True,
)
except Exception as e:
print(e)
repo = None
if repo is not None:
repo.git_pull()
return repo
def push_to_repo(
repo: str, last_count: int, current_count: int, embeds: List[List[float]]
):
"""
Push embeddings to the repo.
Args:
repo (`huggingface_hub.Repository`):
repo to push to
last_count (`int`):
last count of embeddings.
This is the number of embeddings that have already been pushed.
current_count (`int`):
current count of embeddings.
This is the number of embeddings that have been pushed after this batch.
embeds (`List[List[float]]`):
list of embeddings to push to the repo
"""
temp_ds = Dataset.from_dict({"embeddings": embeds})
data_dir = Path(repo.local_dir) / "data"
data_dir.mkdir(exist_ok=True, parents=True)
temp_ds.to_parquet(
str(data_dir / f"embeddings_{last_count}_{current_count}.parquet")
)
repo.push_to_hub(
commit_message=f"Embedded examples {last_count} thru {current_count}",
blocking=False,
auto_lfs_prune=True,
)