import html |
|
import math |
|
import re |
|
|
|
import ftfy |
|
|
import numpy as np |
|
import torch |
|
import torch.nn as nn |
|
import torch.nn.functional as F |
|
import transformers |
|
from timm.models.vision_transformer import Mlp |
|
from transformers import AutoTokenizer, CLIPTextModel, CLIPTokenizer, T5EncoderModel |
|
|
|
from videosys.modules.embed import get_1d_sincos_pos_embed_from_grid, get_2d_sincos_pos_embed_from_grid |
|
|
|
transformers.logging.set_verbosity_error() |
|
|
|
|
|
|
class AbstractEncoder(nn.Module): |
|
def __init__(self): |
|
super().__init__() |
|
|
|
def encode(self, *args, **kwargs): |
|
raise NotImplementedError |
|
|
|
|
|
class FrozenCLIPEmbedder(AbstractEncoder): |
|
"""Uses the CLIP transformer encoder for text (from Hugging Face)""" |
|
|
|
    def __init__(self, path="openai/clip-vit-large-patch14", device="cuda", max_length=77):
|
super().__init__() |
|
self.tokenizer = CLIPTokenizer.from_pretrained(path) |
|
self.transformer = CLIPTextModel.from_pretrained(path) |
|
self.device = device |
|
self.max_length = max_length |
|
self._freeze() |
|
|
|
def _freeze(self): |
|
self.transformer = self.transformer.eval() |
|
for param in self.parameters(): |
|
param.requires_grad = False |
|
|
|
def forward(self, text): |
|
batch_encoding = self.tokenizer( |
|
text, |
|
truncation=True, |
|
max_length=self.max_length, |
|
return_length=True, |
|
return_overflowing_tokens=False, |
|
padding="max_length", |
|
return_tensors="pt", |
|
) |
|
tokens = batch_encoding["input_ids"].to(self.device) |
|
outputs = self.transformer(input_ids=tokens) |
|
|
|
z = outputs.last_hidden_state |
|
pooled_z = outputs.pooler_output |
|
return z, pooled_z |
|
|
|
def encode(self, text): |
|
return self(text) |
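
# Usage sketch for FrozenCLIPEmbedder (assumes a CUDA device and downloadable
# CLIP weights; hidden size shown for the ViT-L/14 text encoder):
#
#     embedder = FrozenCLIPEmbedder().to("cuda")
#     z, pooled_z = embedder.encode(["a corgi surfing a wave"])
#     # z: (1, 77, 768) per-token features; pooled_z: (1, 768) pooled feature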
|
|
|
|
|
class TextEmbedder(nn.Module): |
|
""" |
|
Embeds text prompt into vector representations. Also handles text dropout for classifier-free guidance. |
|
""" |
|
|
|
def __init__(self, path, hidden_size, dropout_prob=0.1): |
|
super().__init__() |
|
self.text_encoder = FrozenCLIPEmbedder(path=path) |
|
self.dropout_prob = dropout_prob |
|
|
|
output_dim = self.text_encoder.transformer.config.hidden_size |
|
self.output_projection = nn.Linear(output_dim, hidden_size) |
|
|
|
def token_drop(self, text_prompts, force_drop_ids=None): |
|
""" |
|
Drops text to enable classifier-free guidance. |
|
""" |
|
        if force_drop_ids is None:
            drop_ids = np.random.uniform(0, 1, len(text_prompts)) < self.dropout_prob
        else:
            drop_ids = force_drop_ids == 1
        labels = list(np.where(drop_ids, "", text_prompts))
|
|
|
return labels |
|
|
|
def forward(self, text_prompts, train, force_drop_ids=None): |
|
use_dropout = self.dropout_prob > 0 |
|
if (train and use_dropout) or (force_drop_ids is not None): |
|
text_prompts = self.token_drop(text_prompts, force_drop_ids) |
|
        _, pooled_embeddings = self.text_encoder(text_prompts)  # token-level output is unused here
|
|
|
text_embeddings = self.output_projection(pooled_embeddings) |
|
return text_embeddings |
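
# During training, a fraction `dropout_prob` of prompts is replaced with the
# empty string, which acts as the null condition for classifier-free guidance.
# Sketch (hypothetical hidden size; assumes a CUDA device):
#
#     embedder = TextEmbedder(path="openai/clip-vit-large-patch14", hidden_size=1152).to("cuda")
#     vec = embedder(["a red panda", "a blue car"], train=True)  # (2, 1152)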
|
|
|
|
|
class CaptionEmbedder(nn.Module): |
|
""" |
|
copied from https://github.com/hpcaitech/Open-Sora |
|
|
|
Embeds class labels into vector representations. Also handles label dropout for classifier-free guidance. |
|
""" |
|
|
|
    # timm's Mlp calls act_layer(), so pass a factory rather than a module instance
    def __init__(self, in_channels, hidden_size, uncond_prob, act_layer=lambda: nn.GELU(approximate="tanh"), token_num=120):
|
super().__init__() |
|
|
|
self.y_proj = Mlp( |
|
in_features=in_channels, hidden_features=hidden_size, out_features=hidden_size, act_layer=act_layer, drop=0 |
|
) |
|
self.register_buffer("y_embedding", nn.Parameter(torch.randn(token_num, in_channels) / in_channels**0.5)) |
|
self.uncond_prob = uncond_prob |
|
|
|
def token_drop(self, caption, force_drop_ids=None): |
|
""" |
|
Drops labels to enable classifier-free guidance. |
|
""" |
|
if force_drop_ids is None: |
|
            drop_ids = torch.rand(caption.shape[0], device=caption.device) < self.uncond_prob
|
else: |
|
drop_ids = force_drop_ids == 1 |
|
caption = torch.where(drop_ids[:, None, None, None], self.y_embedding, caption) |
|
return caption |
|
|
|
def forward(self, caption, train, force_drop_ids=None): |
|
if train: |
|
assert caption.shape[2:] == self.y_embedding.shape |
|
use_dropout = self.uncond_prob > 0 |
|
if (train and use_dropout) or (force_drop_ids is not None): |
|
caption = self.token_drop(caption, force_drop_ids) |
|
caption = self.y_proj(caption) |
|
return caption |
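
# CaptionEmbedder sketch (hypothetical sizes; `caption` is laid out as
# (B, 1, token_num, in_channels), matching the T5 encoder output below):
#
#     embedder = CaptionEmbedder(in_channels=4096, hidden_size=1152, uncond_prob=0.1)
#     y = torch.randn(2, 1, 120, 4096)
#     out = embedder(y, train=True)  # (2, 1, 120, 1152); ~10% of captions are
#     # swapped for the learned unconditional embedding `y_embedding`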
|
|
|
|
|
class T5Embedder: |
|
available_models = ["DeepFloyd/t5-v1_1-xxl"] |
|
|
|
def __init__( |
|
self, |
|
device, |
|
from_pretrained=None, |
|
*, |
|
cache_dir=None, |
|
hf_token=None, |
|
use_text_preprocessing=True, |
|
t5_model_kwargs=None, |
|
torch_dtype=None, |
|
use_offload_folder=None, |
|
model_max_length=120, |
|
local_files_only=False, |
|
): |
|
self.device = torch.device(device) |
|
self.torch_dtype = torch_dtype or torch.bfloat16 |
|
self.cache_dir = cache_dir |
|
|
|
if t5_model_kwargs is None: |
|
t5_model_kwargs = { |
|
"low_cpu_mem_usage": True, |
|
"torch_dtype": self.torch_dtype, |
|
} |
|
|
|
if use_offload_folder is not None: |
|
t5_model_kwargs["offload_folder"] = use_offload_folder |
|
            t5_model_kwargs["device_map"] = {
                "shared": self.device,
                "encoder.embed_tokens": self.device,
                # keep the first half of the 24 encoder blocks on-device and
                # offload the second half (plus the final norm) to disk
                **{f"encoder.block.{i}": self.device for i in range(12)},
                **{f"encoder.block.{i}": "disk" for i in range(12, 24)},
                "encoder.final_layer_norm": "disk",
                "encoder.dropout": "disk",
            }
|
else: |
|
t5_model_kwargs["device_map"] = { |
|
"shared": self.device, |
|
"encoder": self.device, |
|
} |
|
|
|
self.use_text_preprocessing = use_text_preprocessing |
|
self.hf_token = hf_token |
|
|
|
assert from_pretrained in self.available_models |
|
self.tokenizer = AutoTokenizer.from_pretrained( |
|
from_pretrained, |
|
cache_dir=cache_dir, |
|
local_files_only=local_files_only, |
|
) |
|
self.model = T5EncoderModel.from_pretrained( |
|
from_pretrained, |
|
cache_dir=cache_dir, |
|
local_files_only=local_files_only, |
|
**t5_model_kwargs, |
|
).eval() |
|
self.model_max_length = model_max_length |
|
|
|
def get_text_embeddings(self, texts): |
|
text_tokens_and_mask = self.tokenizer( |
|
texts, |
|
max_length=self.model_max_length, |
|
padding="max_length", |
|
truncation=True, |
|
return_attention_mask=True, |
|
add_special_tokens=True, |
|
return_tensors="pt", |
|
) |
|
|
|
input_ids = text_tokens_and_mask["input_ids"].to(self.device) |
|
attention_mask = text_tokens_and_mask["attention_mask"].to(self.device) |
|
with torch.no_grad(): |
|
text_encoder_embs = self.model( |
|
input_ids=input_ids, |
|
attention_mask=attention_mask, |
|
)["last_hidden_state"].detach() |
|
return text_encoder_embs, attention_mask |
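
# Usage sketch for T5Embedder (assumes the DeepFloyd/t5-v1_1-xxl weights are
# available; the encoder is multi-billion-parameter, so bfloat16 and/or disk
# offload via `use_offload_folder` may be needed on smaller GPUs):
#
#     t5 = T5Embedder(device="cuda", from_pretrained="DeepFloyd/t5-v1_1-xxl")
#     embs, mask = t5.get_text_embeddings(["a panda eating bamboo"])
#     # embs: (1, 120, 4096) token features; mask: (1, 120) attention mask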
|
|
|
|
|
class T5Encoder: |
|
def __init__( |
|
self, |
|
from_pretrained="DeepFloyd/t5-v1_1-xxl", |
|
model_max_length=120, |
|
device="cuda", |
|
dtype=torch.float, |
|
shardformer=False, |
|
): |
|
assert from_pretrained is not None, "Please specify the path to the T5 model" |
|
|
|
self.t5 = T5Embedder( |
|
device=device, |
|
torch_dtype=dtype, |
|
from_pretrained=from_pretrained, |
|
model_max_length=model_max_length, |
|
) |
|
self.t5.model.to(dtype=dtype) |
|
self.y_embedder = None |
|
|
|
self.model_max_length = model_max_length |
|
self.output_dim = self.t5.model.config.d_model |
|
|
|
if shardformer: |
|
self.shardformer_t5() |
|
|
|
def shardformer_t5(self): |
|
from colossalai.shardformer import ShardConfig, ShardFormer |
|
|
|
from videosys.core.shardformer.t5.policy import T5EncoderPolicy |
|
from videosys.utils.utils import requires_grad |
|
|
|
shard_config = ShardConfig( |
|
tensor_parallel_process_group=None, |
|
pipeline_stage_manager=None, |
|
enable_tensor_parallelism=False, |
|
enable_fused_normalization=False, |
|
enable_flash_attention=False, |
|
enable_jit_fused=True, |
|
enable_sequence_parallelism=False, |
|
enable_sequence_overlap=False, |
|
) |
|
shard_former = ShardFormer(shard_config=shard_config) |
|
optim_model, _ = shard_former.optimize(self.t5.model, policy=T5EncoderPolicy()) |
|
self.t5.model = optim_model.half() |
|
|
|
|
requires_grad(self.t5.model, False) |
|
|
|
def encode(self, text): |
|
caption_embs, emb_masks = self.t5.get_text_embeddings(text) |
|
caption_embs = caption_embs[:, None] |
|
return dict(y=caption_embs, mask=emb_masks) |
|
|
|
def null(self, n): |
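        # `y_embedder` must be attached externally (e.g. the diffusion model's
        # CaptionEmbedder) before `null` can build unconditional embeddings.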
|
null_y = self.y_embedder.y_embedding[None].repeat(n, 1, 1)[:, None] |
|
return null_y |
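
# Usage sketch for T5Encoder (`model` is hypothetical; its caption embedder is
# borrowed so that `null` can build unconditional batches):
#
#     encoder = T5Encoder(from_pretrained="DeepFloyd/t5-v1_1-xxl", device="cuda")
#     cond = encoder.encode(["a panda eating bamboo"])
#     # cond["y"]: (1, 1, 120, 4096), cond["mask"]: (1, 120)
#     encoder.y_embedder = model.y_embedder
#     null_y = encoder.null(n=1)  # (1, 1, 120, in_channels)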
|
|
|
|
|
def basic_clean(text): |
|
text = ftfy.fix_text(text) |
|
    text = html.unescape(html.unescape(text))  # twice, to catch double-escaped entities
|
return text.strip() |
|
|
|
|
|
BAD_PUNCT_REGEX = re.compile(r"[#®•©™&@·º½¾¿¡§~\)\(\]\[\}\{\|\\\/\*]{1,}")
|
|
|
|
|
def clean_caption(caption): |
|
import urllib.parse as ul |
|
|
|
from bs4 import BeautifulSoup |
|
|
|
caption = str(caption) |
|
caption = ul.unquote_plus(caption) |
|
caption = caption.strip().lower() |
|
caption = re.sub("<person>", "person", caption) |
|
|
|
caption = re.sub( |
|
r"\b((?:https?:(?:\/{1,3}|[a-zA-Z0-9%])|[a-zA-Z0-9.\-]+[.](?:com|co|ru|net|org|edu|gov|it)[\w/-]*\b\/?(?!@)))", |
|
"", |
|
caption, |
|
) |
|
caption = re.sub( |
|
r"\b((?:www:(?:\/{1,3}|[a-zA-Z0-9%])|[a-zA-Z0-9.\-]+[.](?:com|co|ru|net|org|edu|gov|it)[\w/-]*\b\/?(?!@)))", |
|
"", |
|
caption, |
|
) |
|
|
|
caption = BeautifulSoup(caption, features="html.parser").text |
|
|
|
|
|
caption = re.sub(r"@[\w\d]+\b", "", caption) |
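
    # strip characters from CJK-related unicode blocks:
    # 31C0-31EF CJK Strokes
    # 31F0-31FF Katakana Phonetic Extensions
    # 3200-32FF Enclosed CJK Letters and Months
    # 3300-33FF CJK Compatibility
    # 3400-4DBF CJK Unified Ideographs Extension A
    # 4DC0-4DFF Yijing Hexagram Symbols
    # 4E00-9FFF CJK Unified Ideographs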
|
|
caption = re.sub(r"[\u31c0-\u31ef]+", "", caption) |
|
caption = re.sub(r"[\u31f0-\u31ff]+", "", caption) |
|
caption = re.sub(r"[\u3200-\u32ff]+", "", caption) |
|
caption = re.sub(r"[\u3300-\u33ff]+", "", caption) |
|
caption = re.sub(r"[\u3400-\u4dbf]+", "", caption) |
|
caption = re.sub(r"[\u4dc0-\u4dff]+", "", caption) |
|
caption = re.sub(r"[\u4e00-\u9fff]+", "", caption) |
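
    # normalize all dash variants to "-"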
|
|
caption = re.sub( |
|
r"[\u002D\u058A\u05BE\u1400\u1806\u2010-\u2015\u2E17\u2E1A\u2E3A\u2E3B\u2E40\u301C\u3030\u30A0\uFE31\uFE32\uFE58\uFE63\uFF0D]+", |
|
"-", |
|
caption, |
|
) |
|
|
|
|
|
caption = re.sub(r"[`´«»“”¨]", '"', caption) |
|
caption = re.sub(r"[‘’]", "'", caption) |
|
|
|
|
|
    caption = re.sub(r"&quot;?", "", caption)
|
|
|
    caption = re.sub(r"&amp", "", caption)
|
|
|
|
|
caption = re.sub(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}", " ", caption) |
|
|
|
|
|
caption = re.sub(r"\d:\d\d\s+$", "", caption) |
|
|
|
|
|
caption = re.sub(r"\\n", " ", caption) |
|
|
|
|
|
caption = re.sub(r"#\d{1,3}\b", "", caption) |
|
|
|
caption = re.sub(r"#\d{5,}\b", "", caption) |
|
|
|
caption = re.sub(r"\b\d{6,}\b", "", caption) |
|
|
|
caption = re.sub(r"[\S]+\.(?:png|jpg|jpeg|bmp|webp|eps|pdf|apk|mp4)", "", caption) |
|
|
|
|
|
caption = re.sub(r"[\"\']{2,}", r'"', caption) |
|
caption = re.sub(r"[\.]{2,}", r" ", caption) |
|
|
|
caption = re.sub(BAD_PUNCT_REGEX, r" ", caption) |
|
caption = re.sub(r"\s+\.\s+", r" ", caption) |
|
|
|
|
|
regex2 = re.compile(r"(?:\-|\_)") |
|
if len(re.findall(regex2, caption)) > 3: |
|
caption = re.sub(regex2, " ", caption) |
|
|
|
caption = basic_clean(caption) |
|
|
|
caption = re.sub(r"\b[a-zA-Z]{1,3}\d{3,15}\b", "", caption) |
|
caption = re.sub(r"\b[a-zA-Z]+\d+[a-zA-Z]+\b", "", caption) |
|
caption = re.sub(r"\b\d+[a-zA-Z]+\d+\b", "", caption) |
|
|
|
caption = re.sub(r"(worldwide\s+)?(free\s+)?shipping", "", caption) |
|
caption = re.sub(r"(free\s)?download(\sfree)?", "", caption) |
|
caption = re.sub(r"\bclick\b\s(?:for|on)\s\w+", "", caption) |
|
caption = re.sub(r"\b(?:png|jpg|jpeg|bmp|webp|eps|pdf|apk|mp4)(\simage[s]?)?", "", caption) |
|
caption = re.sub(r"\bpage\s+\d+\b", "", caption) |
|
|
|
caption = re.sub(r"\b\d*[a-zA-Z]+\d+[a-zA-Z]+\d+[a-zA-Z\d]*\b", r" ", caption) |
|
|
|
caption = re.sub(r"\b\d+\.?\d*[xх×]\d+\.?\d*\b", "", caption) |
|
|
|
caption = re.sub(r"\b\s+\:\s+", r": ", caption) |
|
caption = re.sub(r"(\D[,\./])\b", r"\1 ", caption) |
|
caption = re.sub(r"\s+", " ", caption) |
|
|
|
    caption = caption.strip()
|
|
|
caption = re.sub(r"^[\"\']([\w\W]+)[\"\']$", r"\1", caption) |
|
caption = re.sub(r"^[\'\_,\-\:;]", r"", caption) |
|
caption = re.sub(r"[\'\_,\-\:\-\+]$", r"", caption) |
|
caption = re.sub(r"^\.\S+$", "", caption) |
|
|
|
return caption.strip() |
|
|
|
|
|
def text_preprocessing(text, use_text_preprocessing: bool = True): |
|
if use_text_preprocessing: |
|
|
|
        # clean_caption is applied twice on purpose: the second pass catches
        # patterns that only surface after the first.
        text = clean_caption(text)
        text = clean_caption(text)
|
return text |
|
else: |
|
return text.lower().strip() |
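
# Example (a rough sketch; the exact output depends on the regex passes above):
#
#     text_preprocessing("Check out https://www.example.com/sale NOW!")
#     # -> "check out now!"  (URL stripped, lowercased, whitespace collapsed)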
|
|
|
|
|
class TimestepEmbedder(nn.Module): |
|
""" |
|
Embeds scalar timesteps into vector representations. |
|
""" |
|
|
|
def __init__(self, hidden_size, frequency_embedding_size=256): |
|
super().__init__() |
|
self.mlp = nn.Sequential( |
|
nn.Linear(frequency_embedding_size, hidden_size, bias=True), |
|
nn.SiLU(), |
|
nn.Linear(hidden_size, hidden_size, bias=True), |
|
) |
|
self.frequency_embedding_size = frequency_embedding_size |
|
|
|
@staticmethod |
|
def timestep_embedding(t, dim, max_period=10000): |
|
""" |
|
Create sinusoidal timestep embeddings. |
|
:param t: a 1-D Tensor of N indices, one per batch element. |
|
These may be fractional. |
|
:param dim: the dimension of the output. |
|
:param max_period: controls the minimum frequency of the embeddings. |
|
:return: an (N, D) Tensor of positional embeddings. |
|
""" |
|
|
|
half = dim // 2 |
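        # freqs[j] = max_period ** (-j / half) for j in [0, half), so the output
        # is [cos(t * freqs), sin(t * freqs)] concatenated along the last dim.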
|
freqs = torch.exp(-math.log(max_period) * torch.arange(start=0, end=half, dtype=torch.float32) / half) |
|
freqs = freqs.to(device=t.device) |
|
args = t[:, None].float() * freqs[None] |
|
embedding = torch.cat([torch.cos(args), torch.sin(args)], dim=-1) |
|
if dim % 2: |
|
embedding = torch.cat([embedding, torch.zeros_like(embedding[:, :1])], dim=-1) |
|
return embedding |
|
|
|
def forward(self, t, dtype): |
|
t_freq = self.timestep_embedding(t, self.frequency_embedding_size) |
|
if t_freq.dtype != dtype: |
|
t_freq = t_freq.to(dtype) |
|
t_emb = self.mlp(t_freq) |
|
return t_emb |
|
|
|
|
|
|
def get_2d_sincos_pos_embed(embed_dim, grid_size, cls_token=False, extra_tokens=0, scale=1.0, base_size=None): |
|
""" |
|
grid_size: int of the grid height and width |
|
return: |
|
pos_embed: [grid_size*grid_size, embed_dim] or [1+grid_size*grid_size, embed_dim] (w/ or w/o cls_token) |
|
""" |
|
if not isinstance(grid_size, tuple): |
|
grid_size = (grid_size, grid_size) |
|
|
|
grid_h = np.arange(grid_size[0], dtype=np.float32) / scale |
|
grid_w = np.arange(grid_size[1], dtype=np.float32) / scale |
|
if base_size is not None: |
|
grid_h *= base_size / grid_size[0] |
|
grid_w *= base_size / grid_size[1] |
|
grid = np.meshgrid(grid_w, grid_h) |
|
grid = np.stack(grid, axis=0) |
|
|
|
grid = grid.reshape([2, 1, grid_size[1], grid_size[0]]) |
|
pos_embed = get_2d_sincos_pos_embed_from_grid(embed_dim, grid) |
|
if cls_token and extra_tokens > 0: |
|
pos_embed = np.concatenate([np.zeros([extra_tokens, embed_dim]), pos_embed], axis=0) |
|
return pos_embed |
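
# Example: positional embeddings for a 16x16 patch grid with 768-dim tokens
# (a minimal sketch):
#
#     pos = get_2d_sincos_pos_embed(768, 16)  # (256, 768) numpy array
#     pos = torch.from_numpy(pos).float().unsqueeze(0)  # add a batch dim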
|
|
|
|
|
def get_1d_sincos_pos_embed(embed_dim, length, scale=1.0): |
|
pos = np.arange(0, length)[..., None] / scale |
|
return get_1d_sincos_pos_embed_from_grid(embed_dim, pos) |
|
|
|
|
|
|
class PatchEmbed3D(nn.Module): |
|
"""Video to Patch Embedding. |
|
|
|
Args: |
|
patch_size (int): Patch token size. Default: (2,4,4). |
|
in_chans (int): Number of input video channels. Default: 3. |
|
embed_dim (int): Number of linear projection output channels. Default: 96. |
|
norm_layer (nn.Module, optional): Normalization layer. Default: None |
|
""" |
|
|
|
def __init__( |
|
self, |
|
patch_size=(2, 4, 4), |
|
in_chans=3, |
|
embed_dim=96, |
|
norm_layer=None, |
|
flatten=True, |
|
): |
|
super().__init__() |
|
self.patch_size = patch_size |
|
self.flatten = flatten |
|
|
|
self.in_chans = in_chans |
|
self.embed_dim = embed_dim |
|
|
|
self.proj = nn.Conv3d(in_chans, embed_dim, kernel_size=patch_size, stride=patch_size) |
|
if norm_layer is not None: |
|
self.norm = norm_layer(embed_dim) |
|
else: |
|
self.norm = None |
|
|
|
def forward(self, x): |
|
"""Forward function.""" |
|
|
|
_, _, D, H, W = x.size() |
|
if W % self.patch_size[2] != 0: |
|
x = F.pad(x, (0, self.patch_size[2] - W % self.patch_size[2])) |
|
if H % self.patch_size[1] != 0: |
|
x = F.pad(x, (0, 0, 0, self.patch_size[1] - H % self.patch_size[1])) |
|
if D % self.patch_size[0] != 0: |
|
x = F.pad(x, (0, 0, 0, 0, 0, self.patch_size[0] - D % self.patch_size[0])) |
|
|
|
x = self.proj(x) |
|
if self.norm is not None: |
|
D, Wh, Ww = x.size(2), x.size(3), x.size(4) |
|
x = x.flatten(2).transpose(1, 2) |
|
x = self.norm(x) |
|
x = x.transpose(1, 2).view(-1, self.embed_dim, D, Wh, Ww) |
|
if self.flatten: |
|
x = x.flatten(2).transpose(1, 2) |
|
return x |
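

if __name__ == "__main__":
    # Minimal CPU shape checks for PatchEmbed3D and TimestepEmbedder
    # (a sketch with random inputs, not part of the library API):
    patcher = PatchEmbed3D(patch_size=(2, 4, 4), in_chans=3, embed_dim=96)
    video = torch.randn(1, 3, 16, 224, 224)  # (B, C, D, H, W)
    tokens = patcher(video)
    print(tokens.shape)  # (1, 8 * 56 * 56, 96) = (1, 25088, 96)

    t_embedder = TimestepEmbedder(hidden_size=96)
    t_emb = t_embedder(torch.tensor([0.0, 500.0, 999.0]), dtype=torch.float32)
    print(t_emb.shape)  # (3, 96)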
|
|