Spaces:
Runtime error
Runtime error
| # Copyright 2023 The HuggingFace Team. All rights reserved. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| # Adapted from https://github.com/huggingface/diffusers/blob/v0.16.1/src/diffusers/models/transformer_temporal.py | |
| from __future__ import annotations | |
| from copy import deepcopy | |
| from dataclasses import dataclass | |
| from typing import List, Literal, Optional | |
| import logging | |
| import torch | |
| from torch import nn | |
| from einops import rearrange, repeat | |
| from diffusers.configuration_utils import ConfigMixin, register_to_config | |
| from diffusers.utils import BaseOutput | |
| from diffusers.models.modeling_utils import ModelMixin | |
| from diffusers.models.transformer_temporal import ( | |
| TransformerTemporalModelOutput, | |
| TransformerTemporalModel as DiffusersTransformerTemporalModel, | |
| ) | |
| from diffusers.models.attention_processor import AttnProcessor | |
| from mmcm.utils.gpu_util import get_gpu_status | |
| from ..data.data_util import ( | |
| batch_concat_two_tensor_with_index, | |
| batch_index_fill, | |
| batch_index_select, | |
| concat_two_tensor, | |
| align_repeat_tensor_single_dim, | |
| ) | |
| from ..utils.attention_util import generate_sparse_causcal_attn_mask | |
| from .attention import BasicTransformerBlock | |
| from .attention_processor import ( | |
| BaseIPAttnProcessor, | |
| ) | |
| from . import Model_Register | |
| # https://github.com/facebookresearch/xformers/issues/845 | |
| # 输入bs*n_frames*w*h太高,xformers报错。因此将transformer_temporal的allow_xformers均关掉 | |
| # If bs*n_frames*w*h is too large, xformers raises an error, so allow_xformers is disabled in transformer_temporal. | |
| logger = logging.getLogger(__name__) | |
class TransformerTemporalModel(ModelMixin, ConfigMixin):
    """
    Transformer model for video-like data that attends along the frame axis.

    The input feature map is group-normalised, every spatial location is turned
    into a sequence over frames, a projected frame embedding is added, the
    sequence is passed through a stack of `BasicTransformerBlock`s, and the
    (zero-initialised) output projection is added back to the input as a
    residual.

    Parameters:
        num_attention_heads (`int`, *optional*, defaults to 16): The number of heads to use for multi-head attention.
        attention_head_dim (`int`, *optional*, defaults to 88): The number of channels in each head.
        in_channels (`int`, *optional*):
            The number of channels in the input and output. Although typed optional, it is
            passed directly to `GroupNorm` and `Linear`, so a value is required in practice.
        out_channels (`int`, *optional*): Accepted for interface compatibility; not used by this class.
        num_layers (`int`, *optional*, defaults to 1): The number of layers of Transformer blocks to use.
        femb_channels (`int`, *optional*):
            Channel size of the frame embedding `femb` given to `forward`. It is passed directly
            to `Linear`, so a value is required in practice.
        dropout (`float`, *optional*, defaults to 0.0): The dropout probability to use.
        norm_num_groups (`int`, *optional*, defaults to 32): Number of groups of the input `GroupNorm`.
        cross_attention_dim (`int`, *optional*): The number of encoder_hidden_states dimensions to use.
        attention_bias (`bool`, *optional*):
            Configure if the TransformerBlocks' attention should contain a bias parameter.
        sample_size (`int`, *optional*): Accepted for interface compatibility; not used by this class.
        activation_fn (`str`, *optional*, defaults to `"geglu"`): Activation function to be used in feed-forward.
        norm_elementwise_affine (`bool`, *optional*, defaults to `True`): Forwarded to every transformer block.
        double_self_attention (`bool`, *optional*):
            Configure if each TransformerBlock should contain two self-attention layers
        allow_xformers (`bool`, *optional*, defaults to `False`): Forwarded to every transformer block.
        only_cross_attention (`bool`, *optional*, defaults to `False`): Forwarded to every transformer block.
        keep_content_condition (`bool`, *optional*, defaults to `False`):
            If set, frames listed in `vision_conditon_frames_sample_index` receive no temporal
            update in `forward` (their residual input is returned unchanged).
        need_spatial_position_emb (`bool`, *optional*, defaults to `False`):
            If set, an additional `spatial_position_emb_proj` linear layer is created.
        need_temporal_weight (`bool`, *optional*, defaults to `True`):
            If set, the temporal update is scaled by the absolute value of a learnable scalar.
        self_attn_mask (`str`, *optional*): Stored on the instance; not read by this class.
        image_scale (`float`, *optional*, defaults to 1.0): Forwarded to every transformer block.
        processor (`AttnProcessor`, *optional*): Attention processor forwarded to every transformer block.
        remove_femb_non_linear (`bool`, *optional*, defaults to `False`):
            If set, the SiLU applied to `femb` before its projection is skipped.
    """

    # NOTE(review): `register_to_config` and `Model_Register` are imported at file
    # level, but no decorator is visible on this class or on `__init__` here --
    # confirm whether decorators were lost from this copy of the file.
    def __init__(
        self,
        num_attention_heads: int = 16,
        attention_head_dim: int = 88,
        in_channels: Optional[int] = None,
        out_channels: Optional[int] = None,
        num_layers: int = 1,
        femb_channels: Optional[int] = None,
        dropout: float = 0.0,
        norm_num_groups: int = 32,
        cross_attention_dim: Optional[int] = None,
        attention_bias: bool = False,
        sample_size: Optional[int] = None,
        activation_fn: str = "geglu",
        norm_elementwise_affine: bool = True,
        double_self_attention: bool = True,
        allow_xformers: bool = False,
        only_cross_attention: bool = False,
        keep_content_condition: bool = False,
        need_spatial_position_emb: bool = False,
        need_temporal_weight: bool = True,
        self_attn_mask: Optional[str] = None,
        # TODO: running parameters, need to be moved to forward
        image_scale: float = 1.0,
        processor: AttnProcessor | None = None,
        remove_femb_non_linear: bool = False,
    ):
        super().__init__()

        self.num_attention_heads = num_attention_heads
        self.attention_head_dim = attention_head_dim
        inner_dim = num_attention_heads * attention_head_dim
        self.inner_dim = inner_dim
        self.in_channels = in_channels

        # 1. Input normalisation and projection into the attention width.
        self.norm = torch.nn.GroupNorm(
            num_groups=norm_num_groups, num_channels=in_channels, eps=1e-6, affine=True
        )
        self.proj_in = nn.Linear(in_channels, inner_dim)

        # 2. Define temporal positional embedding
        self.frame_emb_proj = torch.nn.Linear(femb_channels, inner_dim)
        self.remove_femb_non_linear = remove_femb_non_linear
        if not remove_femb_non_linear:
            self.nonlinearity = nn.SiLU()

        # spatial_position_emb reuses the femb channel configuration
        self.need_spatial_position_emb = need_spatial_position_emb
        if need_spatial_position_emb:
            self.spatial_position_emb_proj = torch.nn.Linear(femb_channels, inner_dim)

        # 3. Define transformers blocks
        # TODO: bad implementation, need to be optimized
        self.need_ipadapter = False
        self.cross_attn_temporal_cond = False
        self.allow_xformers = allow_xformers
        if processor is not None and isinstance(processor, BaseIPAttnProcessor):
            self.cross_attn_temporal_cond = True
            self.allow_xformers = False
            # Processors whose class name contains "NonParam" do not enable the
            # IP-adapter flag.
            if "NonParam" not in processor.__class__.__name__:
                self.need_ipadapter = True

        # NOTE(review): the blocks receive the raw `allow_xformers` argument (not
        # `self.allow_xformers`, which may have been forced to False above) and
        # `cross_attn_temporal_cond=self.need_ipadapter` (not
        # `self.cross_attn_temporal_cond`). Kept as in the original -- confirm intent.
        self.transformer_blocks = nn.ModuleList(
            [
                BasicTransformerBlock(
                    inner_dim,
                    num_attention_heads,
                    attention_head_dim,
                    dropout=dropout,
                    cross_attention_dim=cross_attention_dim,
                    activation_fn=activation_fn,
                    attention_bias=attention_bias,
                    double_self_attention=double_self_attention,
                    norm_elementwise_affine=norm_elementwise_affine,
                    allow_xformers=allow_xformers,
                    only_cross_attention=only_cross_attention,
                    cross_attn_temporal_cond=self.need_ipadapter,
                    image_scale=image_scale,
                    processor=processor,
                )
                for _ in range(num_layers)
            ]
        )

        self.proj_out = nn.Linear(inner_dim, in_channels)

        self.need_temporal_weight = need_temporal_weight
        if need_temporal_weight:
            # Learnable scalar gate on the temporal update, initialised close to
            # zero (1e-5) rather than exactly zero.
            self.temporal_weight = nn.Parameter(torch.tensor([1e-5]))

        self.skip_temporal_layers = False  # Whether to skip temporal layer
        self.keep_content_condition = keep_content_condition
        self.self_attn_mask = self_attn_mask
        self.only_cross_attention = only_cross_attention
        self.double_self_attention = double_self_attention
        self.cross_attention_dim = cross_attention_dim
        self.image_scale = image_scale

        # Zero out the last layer's params so the whole module starts as an
        # identity mapping (output == residual input).
        nn.init.zeros_(self.proj_out.weight)
        nn.init.zeros_(self.proj_out.bias)

    def forward(
        self,
        hidden_states,
        femb,
        encoder_hidden_states=None,
        timestep=None,
        class_labels=None,
        num_frames=1,
        cross_attention_kwargs=None,
        sample_index: torch.LongTensor = None,
        vision_conditon_frames_sample_index: torch.LongTensor = None,
        spatial_position_emb: torch.Tensor = None,
        return_dict: bool = True,
    ):
        """
        Apply temporal attention to a batch of frame feature maps.

        Args:
            hidden_states (`torch.FloatTensor` of shape `(batch size * num_frames, channel, height, width)`):
                Input hidden_states; the leading axis is split into batch and frames using `num_frames`.
            femb (`torch.Tensor`):
                Frame embedding. It is (optionally) passed through SiLU, projected to the attention
                width, repeated along dim 0 to match the `(batch * height * width)` sequences and added
                to the projected hidden states. Presumably shaped `(batch, num_frames, femb_channels)`
                -- TODO confirm against the caller.
            encoder_hidden_states (`torch.Tensor`, *optional*):
                Conditional embeddings for cross attention layer. When the blocks use cross attention
                (`only_cross_attention` or not `double_self_attention`) and `cross_attention_dim` is
                set, it is repeated along dim 0 to match the number of sequences.
            timestep ( `torch.long`, *optional*):
                Forwarded to every transformer block.
            class_labels ( `torch.LongTensor`, *optional*):
                Forwarded to every transformer block.
            num_frames (`int`, *optional*, defaults to 1):
                Number of frames per sample, used to recover the batch size from the leading axis.
            cross_attention_kwargs (`dict`, *optional*):
                Forwarded to every transformer block.
            sample_index (`torch.LongTensor`, *optional*):
                Currently unused by this method.
            vision_conditon_frames_sample_index (`torch.LongTensor`, *optional*):
                Frame indices (along the frame axis) of the vision-condition frames. When given and
                `keep_content_condition` is set, the temporal update is zeroed for those frames.
            spatial_position_emb (`torch.Tensor`, *optional*):
                Currently unused by this method.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether or not to return a `TransformerTemporalModelOutput` instead of a plain tuple.

        Returns:
            `TransformerTemporalModelOutput` if `return_dict` is True, otherwise a `tuple` whose first
            element is the sample tensor, shaped like the input `hidden_states`.
        """
        if self.skip_temporal_layers is True:
            if not return_dict:
                return (hidden_states,)
            return TransformerTemporalModelOutput(sample=hidden_states)

        # 1. Input
        batch_frames, channel, height, width = hidden_states.shape
        batch_size = batch_frames // num_frames

        hidden_states = rearrange(
            hidden_states, "(b t) c h w -> b c t h w", b=batch_size
        )
        residual = hidden_states

        hidden_states = self.norm(hidden_states)
        # Each spatial location becomes an independent sequence over frames.
        hidden_states = rearrange(hidden_states, "b c t h w -> (b h w) t c")
        hidden_states = self.proj_in(hidden_states)

        # 2 Positional embedding
        # adapted from https://github.com/huggingface/diffusers/blob/v0.16.1/src/diffusers/models/resnet.py#L574
        if not self.remove_femb_non_linear:
            femb = self.nonlinearity(femb)
        femb = self.frame_emb_proj(femb)
        femb = align_repeat_tensor_single_dim(femb, hidden_states.shape[0], dim=0)
        hidden_states = hidden_states + femb

        # 3. Blocks
        uses_cross_attention = (
            self.only_cross_attention or not self.double_self_attention
        )
        if (
            uses_cross_attention
            and self.cross_attention_dim is not None
            and encoder_hidden_states is not None
        ):
            encoder_hidden_states = align_repeat_tensor_single_dim(
                encoder_hidden_states,
                hidden_states.shape[0],
                dim=0,
                n_src_base_length=batch_size,
            )

        for block in self.transformer_blocks:
            hidden_states = block(
                hidden_states,
                encoder_hidden_states=encoder_hidden_states,
                timestep=timestep,
                cross_attention_kwargs=cross_attention_kwargs,
                class_labels=class_labels,
            )

        # 4. Output
        hidden_states = self.proj_out(hidden_states)
        hidden_states = rearrange(
            hidden_states, "(b h w) t c -> b c t h w", b=batch_size, h=height, w=width
        ).contiguous()

        # Keep the frames corresponding to the condition untouched so the
        # previous content frames are preserved and consistency improves.
        if (
            vision_conditon_frames_sample_index is not None
            and self.keep_content_condition
        ):
            mask = torch.ones_like(hidden_states, device=hidden_states.device)
            mask = batch_index_fill(
                mask, dim=2, index=vision_conditon_frames_sample_index, value=0
            )
            update = mask * hidden_states
        else:
            # No mask exists on this path; the original code referenced an
            # undefined `mask` here when `need_temporal_weight` was False.
            update = hidden_states

        if self.need_temporal_weight:
            update = torch.abs(self.temporal_weight) * update

        output = residual + update
        output = rearrange(output, "b c t h w -> (b t) c h w")

        if not return_dict:
            return (output,)

        return TransformerTemporalModelOutput(sample=output)