|
from typing import Any, Callable, List, Optional, Union
|
|
|
|
import torch
|
|
from transformers import GenerationConfig, PreTrainedTokenizer, PreTrainedTokenizerFast
|
|
|
|
from ..core import set_seed
|
|
from ..models import SUPPORTED_ARCHITECTURES, PreTrainedModelWrapper
|
|
|
|
|
|
class BestOfNSampler(object):
|
|
def __init__(
|
|
self,
|
|
model: PreTrainedModelWrapper,
|
|
tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast],
|
|
queries_to_scores: Callable[[List[str]], List[float]],
|
|
length_sampler: Any,
|
|
sample_size: int = 4,
|
|
seed: Optional[int] = None,
|
|
n_candidates: int = 1,
|
|
generation_config: Optional[GenerationConfig] = None,
|
|
) -> None:
|
|
r"""
|
|
Initialize the sampler for best-of-n generation
|
|
|
|
Args:
|
|
model (`PreTrainedModelWrapper`):
|
|
The pretrained model to use for generation
|
|
tokenizer (`PreTrainedTokenizer` or `PreTrainedTokenizerFast`):
|
|
Tokenizer associated with the pretrained model
|
|
queries_to_scores (`Callable[[List[str]], List[float]]`):
|
|
Callable that takes a list of generated texts and returns the associated reward scores
|
|
length_sampler (`Any`):
|
|
Sampler used to sample the length of the generated text
|
|
sample_size (`int`):
|
|
Number of samples to generate for each query
|
|
seed (`int`, *optional*):
|
|
Random seed used to control generation
|
|
n_candidates (`int`):
|
|
Number of candidates to return for each query
|
|
generation_config (`GenerationConfig`, *optional*):
|
|
Generation config passed to the underlying model's `generate` method.
|
|
See `GenerationConfig` (https://huggingface.co/docs/transformers/v4.29.1/en/main_classes/text_generation#transformers.GenerationConfig) for more details
|
|
"""
|
|
if seed is not None:
|
|
set_seed(seed)
|
|
|
|
if not isinstance(tokenizer, (PreTrainedTokenizer, PreTrainedTokenizerFast)):
|
|
raise ValueError(f"tokenizer must be a PreTrainedTokenizer or PreTrainedTokenizerFast, got {type(tokenizer)}")
|
|
if not isinstance(model, (SUPPORTED_ARCHITECTURES)):
|
|
raise ValueError(f"model must be a PreTrainedModelWrapper, got {type(model)} - supported architectures are: {SUPPORTED_ARCHITECTURES}")
|
|
|
|
self.model = model
|
|
self.tokenizer = tokenizer
|
|
|
|
self.queries_to_scores = queries_to_scores
|
|
self.length_sampler = length_sampler
|
|
self.gen_config = generation_config
|
|
self.sample_size = sample_size
|
|
self.n_candidates = n_candidates
|
|
|
|
def generate(
|
|
self,
|
|
tokenized_query: Union[List[int], torch.Tensor, List[torch.Tensor], List[List[int]]],
|
|
skip_special_tokens: bool = True,
|
|
device: Optional[Union[str, torch.device]] = None,
|
|
**generation_kwargs,
|
|
) -> List[List[str]]:
|
|
r"""
|
|
Generate the best of n samples for input queries
|
|
|
|
Args:
|
|
tokenized_query (`List[int]` or `torch.Tensor` or `List[torch.Tensor]` or `List[int]`):
|
|
represents either a single tokenized query (a single tensor or a list of integers) or a batch of tokenized queries (a list of tensors or a list of lists of integers)
|
|
skip_special_tokens (`bool`):
|
|
Whether to remove the special tokens from the output
|
|
device (`str` or `torch.device`, *optional*):
|
|
The device on which the model will be loaded
|
|
**generation_kwargs (`dict`, *optional*):
|
|
Additional keyword arguments passed along to the underlying model's `generate` method.
|
|
This is used to override generation config
|
|
|
|
Returns:
|
|
List[List[str]]: A list of lists of generated texts
|
|
"""
|
|
queries = None
|
|
|
|
if isinstance(tokenized_query, torch.Tensor) and tokenized_query.ndim == 1:
|
|
queries = tokenized_query.unsqueeze(0)
|
|
elif isinstance(tokenized_query, List):
|
|
element_type = type(tokenized_query[0])
|
|
if element_type == int:
|
|
queries = torch.tensor(tokenized_query).unsqueeze(0)
|
|
elif element_type == torch.Tensor:
|
|
queries = [tensor.reshape((1, -1)) for tensor in tokenized_query]
|
|
else:
|
|
queries = [torch.tensor(query).reshape((1, -1)) for query in tokenized_query]
|
|
|
|
result = []
|
|
|
|
for query in queries:
|
|
queries = query.repeat((self.sample_size, 1))
|
|
output = self.model.generate(
|
|
queries.to(device),
|
|
max_new_tokens=self.length_sampler(),
|
|
generation_config=self.gen_config,
|
|
**generation_kwargs,
|
|
).squeeze()
|
|
output = self.tokenizer.batch_decode(output, skip_special_tokens=skip_special_tokens)
|
|
scores = torch.tensor(self.queries_to_scores(output))
|
|
output = [output[i] for i in scores.topk(self.n_candidates).indices]
|
|
result.append(output)
|
|
|
|
return result
|
|
|