next / trl /extras /best_of_n_sampler.py
BiXie's picture
Upload 204 files
252711e verified
from typing import Any, Callable, List, Optional, Union
import torch
from transformers import GenerationConfig, PreTrainedTokenizer, PreTrainedTokenizerFast
from ..core import set_seed
from ..models import SUPPORTED_ARCHITECTURES, PreTrainedModelWrapper
class BestOfNSampler:
    r"""
    Best-of-n sampler: for every query, generate `sample_size` completions with the model,
    score the decoded texts with `queries_to_scores` and keep the `n_candidates` best ones.
    """

    def __init__(
        self,
        model: PreTrainedModelWrapper,
        tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast],
        queries_to_scores: Callable[[List[str]], List[float]],
        length_sampler: Any,
        sample_size: int = 4,
        seed: Optional[int] = None,
        n_candidates: int = 1,
        generation_config: Optional[GenerationConfig] = None,
    ) -> None:
        r"""
        Initialize the sampler for best-of-n generation

        Args:
            model (`PreTrainedModelWrapper`):
                The pretrained model to use for generation
            tokenizer (`PreTrainedTokenizer` or `PreTrainedTokenizerFast`):
                Tokenizer associated with the pretrained model
            queries_to_scores (`Callable[[List[str]], List[float]]`):
                Callable that takes a list of generated texts and returns the associated reward scores
            length_sampler (`Any`):
                Zero-argument callable; it is called once per query and its return value is passed
                to the model's `generate` method as `max_new_tokens`
            sample_size (`int`):
                Number of samples to generate for each query
            seed (`int`, *optional*):
                Random seed used to control generation
            n_candidates (`int`):
                Number of candidates to return for each query; cannot exceed `sample_size`
            generation_config (`GenerationConfig`, *optional*):
                Generation config passed to the underlying model's `generate` method.
                See `GenerationConfig` (https://huggingface.co/docs/transformers/v4.29.1/en/main_classes/text_generation#transformers.GenerationConfig) for more details

        Raises:
            ValueError: If `tokenizer` or `model` has an unsupported type, if `sample_size` is
                smaller than 1, or if `n_candidates` is larger than `sample_size`.
        """
        if not isinstance(tokenizer, (PreTrainedTokenizer, PreTrainedTokenizerFast)):
            raise ValueError(
                f"tokenizer must be a PreTrainedTokenizer or PreTrainedTokenizerFast, got {type(tokenizer)}"
            )
        if not isinstance(model, SUPPORTED_ARCHITECTURES):
            raise ValueError(
                f"model must be a PreTrainedModelWrapper, got {type(model)} - supported architectures are: {SUPPORTED_ARCHITECTURES}"
            )
        if sample_size < 1:
            raise ValueError(f"sample_size must be at least 1, got {sample_size}")
        # `generate` selects candidates with `topk`, which cannot return more entries than
        # there are scored samples; fail here with a clear message instead of deep inside torch.
        if n_candidates > sample_size:
            raise ValueError(
                f"n_candidates ({n_candidates}) cannot be larger than sample_size ({sample_size})"
            )

        # Seed only after validation so that a failing constructor has no side effects.
        if seed is not None:
            set_seed(seed)

        self.model = model
        self.tokenizer = tokenizer
        self.queries_to_scores = queries_to_scores
        self.length_sampler = length_sampler
        self.gen_config = generation_config
        self.sample_size = sample_size
        self.n_candidates = n_candidates

    @staticmethod
    def _split_queries(tokenized_query: Any) -> List[torch.Tensor]:
        r"""
        Normalize every supported input format into a list of `(1, seq_len)` tensors, one per query.
        """
        if isinstance(tokenized_query, torch.Tensor):
            if tokenized_query.ndim == 1:
                # A single tokenized query.
                return [tokenized_query.reshape((1, -1))]
            if tokenized_query.ndim == 2:
                # A batch of queries: every row is used as-is as one query.
                return [row.reshape((1, -1)) for row in tokenized_query]
            raise ValueError(
                f"tokenized_query tensor must have 1 or 2 dimensions, got {tokenized_query.ndim}"
            )

        if isinstance(tokenized_query, (list, tuple)):
            if not tokenized_query:
                # Empty batch: nothing to generate.
                return []
            # The format of the whole sequence is decided from its first element.
            first = tokenized_query[0]
            if isinstance(first, int):
                # A flat list of token ids is a single query.
                return [torch.tensor(tokenized_query).reshape((1, -1))]
            if isinstance(first, torch.Tensor):
                return [tensor.reshape((1, -1)) for tensor in tokenized_query]
            return [torch.tensor(query).reshape((1, -1)) for query in tokenized_query]

        raise TypeError(
            "tokenized_query must be a torch.Tensor, a list of token ids, a list of tensors "
            f"or a list of lists of token ids, got {type(tokenized_query)}"
        )

    def generate(
        self,
        tokenized_query: Union[List[int], torch.Tensor, List[torch.Tensor], List[List[int]]],
        skip_special_tokens: bool = True,
        device: Optional[Union[str, torch.device]] = None,
        **generation_kwargs,
    ) -> List[List[str]]:
        r"""
        Generate the best of n samples for input queries

        Args:
            tokenized_query (`List[int]` or `torch.Tensor` or `List[torch.Tensor]` or `List[List[int]]`):
                represents either a single tokenized query (a 1-D tensor or a list of integers) or a batch of
                tokenized queries (a 2-D tensor with one query per row, a list of tensors or a list of lists
                of integers)
            skip_special_tokens (`bool`):
                Whether to remove the special tokens from the output
            device (`str` or `torch.device`, *optional*):
                The device the input ids are moved to before being passed to the model
            **generation_kwargs (`dict`, *optional*):
                Additional keyword arguments passed along to the underlying model's `generate` method.
                This is used to override generation config

        Returns:
            List[List[str]]: For each query, the `n_candidates` highest-scoring generated texts,
            best first

        Raises:
            TypeError: If `tokenized_query` is not one of the supported input types.
            ValueError: If `tokenized_query` is a tensor with neither 1 nor 2 dimensions.
        """
        result = []
        for query in self._split_queries(tokenized_query):
            # One identical row per requested sample.
            batch = query.repeat((self.sample_size, 1))
            if device is not None:
                batch = batch.to(device)

            generated = self.model.generate(
                batch,
                max_new_tokens=self.length_sampler(),
                generation_config=self.gen_config,
                **generation_kwargs,
            )
            # Keep a strict (n_samples, seq_len) layout. A plain `squeeze()` would also drop the
            # batch dimension when `sample_size == 1`, and `batch_decode` would then treat every
            # single token as a separate sample.
            generated = generated.reshape(-1, generated.shape[-1])

            texts = self.tokenizer.batch_decode(generated, skip_special_tokens=skip_special_tokens)
            scores = torch.tensor(self.queries_to_scores(texts))
            best_indices = scores.topk(self.n_candidates).indices.tolist()
            result.append([texts[i] for i in best_indices])

        return result