from typing import Any, Callable, List, Optional, Union

import torch
from transformers import GenerationConfig, PreTrainedTokenizer, PreTrainedTokenizerFast

from ..core import set_seed
from ..models import SUPPORTED_ARCHITECTURES, PreTrainedModelWrapper


class BestOfNSampler:
    r"""
    Best-of-n sampler: for each query, generate `sample_size` completions, score them with a
    user-supplied callable and keep the `n_candidates` highest-scoring texts.
    """

    def __init__(
        self,
        model: PreTrainedModelWrapper,
        tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast],
        queries_to_scores: Callable[[List[str]], List[float]],
        length_sampler: Any,
        sample_size: int = 4,
        seed: Optional[int] = None,
        n_candidates: int = 1,
        generation_config: Optional[GenerationConfig] = None,
    ) -> None:
        r"""
        Initialize the sampler for best-of-n generation

        Args:
            model (`PreTrainedModelWrapper`):
                The pretrained model to use for generation
            tokenizer (`PreTrainedTokenizer` or `PreTrainedTokenizerFast`):
                Tokenizer associated with the pretrained model
            queries_to_scores (`Callable[[List[str]], List[float]]`):
                Callable that takes a list of generated texts and returns the associated reward scores
            length_sampler (`Any`):
                Zero-argument callable; it is invoked once per query and its return value is passed
                as `max_new_tokens` to the model's `generate` method
            sample_size (`int`):
                Number of samples to generate for each query
            seed (`int`, *optional*):
                Random seed used to control generation. When given, `set_seed(seed)` is called once
                at construction time.
            n_candidates (`int`):
                Number of candidates to return for each query
            generation_config (`GenerationConfig`, *optional*):
                Generation config passed to the underlying model's `generate` method.
                See `GenerationConfig` (https://huggingface.co/docs/transformers/v4.29.1/en/main_classes/text_generation#transformers.GenerationConfig) for more details

        Raises:
            ValueError: If `tokenizer` is not a `PreTrainedTokenizer`/`PreTrainedTokenizerFast`, or
                `model` is not an instance of one of `SUPPORTED_ARCHITECTURES`.
        """
        if seed is not None:
            set_seed(seed)

        if not isinstance(tokenizer, (PreTrainedTokenizer, PreTrainedTokenizerFast)):
            raise ValueError(
                f"tokenizer must be a PreTrainedTokenizer or PreTrainedTokenizerFast, got {type(tokenizer)}"
            )
        if not isinstance(model, (SUPPORTED_ARCHITECTURES)):
            raise ValueError(
                f"model must be a PreTrainedModelWrapper, got {type(model)} - supported architectures are: {SUPPORTED_ARCHITECTURES}"
            )

        self.model = model
        self.tokenizer = tokenizer
        self.queries_to_scores = queries_to_scores
        self.length_sampler = length_sampler
        self.gen_config = generation_config
        self.sample_size = sample_size
        self.n_candidates = n_candidates

    @staticmethod
    def _as_query_list(
        tokenized_query: Union[List[int], torch.Tensor, List[torch.Tensor], List[List[int]]],
    ) -> List[torch.Tensor]:
        r"""
        Normalize every accepted input form into a list of `(1, seq_len)` tensors, one per query.

        Raises:
            TypeError: If `tokenized_query` is neither a 1-D/2-D tensor nor a list.
        """
        if isinstance(tokenized_query, torch.Tensor):
            if tokenized_query.ndim == 1:
                # A single query.
                return [tokenized_query.reshape((1, -1))]
            if tokenized_query.ndim == 2:
                # A batch of equally long queries, one per row.
                return [row.reshape((1, -1)) for row in tokenized_query]
            raise TypeError(
                f"tokenized_query tensor must be 1-D or 2-D, got a tensor with {tokenized_query.ndim} dimensions"
            )

        if isinstance(tokenized_query, list):
            if not tokenized_query:
                # Nothing to generate for; avoids an IndexError on the element probe below.
                return []
            first = tokenized_query[0]
            if isinstance(first, int):
                # A flat list of token ids is a single query.
                return [torch.tensor(tokenized_query).reshape((1, -1))]
            if isinstance(first, torch.Tensor):
                return [tensor.reshape((1, -1)) for tensor in tokenized_query]
            return [torch.tensor(query).reshape((1, -1)) for query in tokenized_query]

        raise TypeError(
            f"tokenized_query must be a torch.Tensor or a list, got {type(tokenized_query)}"
        )

    def generate(
        self,
        tokenized_query: Union[List[int], torch.Tensor, List[torch.Tensor], List[List[int]]],
        skip_special_tokens: bool = True,
        device: Optional[Union[str, torch.device]] = None,
        **generation_kwargs,
    ) -> List[List[str]]:
        r"""
        Generate the best of n samples for input queries

        Args:
            tokenized_query (`List[int]` or `torch.Tensor` or `List[torch.Tensor]` or `List[List[int]]`):
                represents either a single tokenized query (a single 1-D tensor or a list of integers) or a batch
                of tokenized queries (a 2-D tensor, a list of tensors or a list of lists of integers)
            skip_special_tokens (`bool`):
                Whether to remove the special tokens from the output
            device (`str` or `torch.device`, *optional*):
                The device the repeated query batch is moved to before generation. When `None`, the
                batch is left on its current device.
            **generation_kwargs (`dict`, *optional*):
                Additional keyword arguments passed along to the underlying model's `generate` method.
                This is used to override generation config

        Returns:
            List[List[str]]: A list of lists of generated texts; one inner list per query, holding the
            `n_candidates` highest-scoring texts in descending score order. An empty list input yields
            an empty result.

        Raises:
            TypeError: If `tokenized_query` is not one of the supported input forms.
        """
        result = []
        for query in self._as_query_list(tokenized_query):
            # One row per sample so a single `generate` call draws all candidates.
            batch = query.repeat((self.sample_size, 1))
            if device is not None:
                batch = batch.to(device)

            # NOTE: the output is deliberately NOT squeezed. Squeezing would drop the batch
            # dimension when `sample_size == 1`, and `batch_decode` would then decode each token
            # as if it were a separate sequence.
            generated = self.model.generate(
                batch,
                max_new_tokens=self.length_sampler(),
                generation_config=self.gen_config,
                **generation_kwargs,
            )
            texts = self.tokenizer.batch_decode(generated, skip_special_tokens=skip_special_tokens)

            scores = torch.tensor(self.queries_to_scores(texts))
            best_indices = scores.topk(self.n_candidates).indices.tolist()
            result.append([texts[i] for i in best_indices])

        return result