# Copyright (c) Meta Platforms, Inc. and affiliates. All Rights Reserved

# pyre-unsafe

import math
from typing import Dict, List, Union

import torch
import torch.nn as nn
import torch.nn.functional as F

try:
    from timm.layers import DropPath
except ModuleNotFoundError:
    # Compatibility with older timm versions, where DropPath lived in timm.models.layers.
    from timm.models.layers import DropPath

from .model_misc import get_clones, LayerNorm2d


class SimpleMaskDownSampler(nn.Module):
    """
    Progressively downsample a mask by total_stride, downsampling by stride at each step.
    Note that LayerNorm is applied per *token*, as in ViT.

    With each downsampling step (by a factor of stride**2), the channel capacity
    increases by the same factor. At the end, we linearly project to embed_dim channels.
    """

    def __init__(
        self,
        embed_dim=256,
        kernel_size=4,
        stride=4,
        padding=0,
        total_stride=16,
        activation=nn.GELU,
        # Option to interpolate the input mask before downsampling with convs; in that
        # case, total_stride is assumed to apply after interpolation. If set to the
        # input resolution or None, no interpolation is performed. We default to None
        # to be safe (for older configs or when not explicitly set).
        interpol_size=None,
    ):
        super().__init__()
        num_layers = int(math.log2(total_stride) // math.log2(stride))
        assert stride**num_layers == total_stride
        self.encoder = nn.Sequential()
        mask_in_chans, mask_out_chans = 1, 1
        for _ in range(num_layers):
            mask_out_chans = mask_in_chans * (stride**2)
            self.encoder.append(
                nn.Conv2d(
                    mask_in_chans,
                    mask_out_chans,
                    kernel_size=kernel_size,
                    stride=stride,
                    padding=padding,
                )
            )
            self.encoder.append(LayerNorm2d(mask_out_chans))
            self.encoder.append(activation())
            mask_in_chans = mask_out_chans
        self.encoder.append(nn.Conv2d(mask_out_chans, embed_dim, kernel_size=1))

        self.interpol_size = interpol_size
        if self.interpol_size is not None:
            assert isinstance(self.interpol_size, (list, tuple)), (
                f"Unsupported type {type(self.interpol_size)}; should be a list or tuple."
            )
            self.interpol_size = list(interpol_size)
            assert len(self.interpol_size) == 2

    def forward(self, x: torch.Tensor):
        if self.interpol_size is not None and self.interpol_size != list(x.shape[-2:]):
            x = F.interpolate(
                x.float(),
                size=self.interpol_size,
                align_corners=False,
                mode="bilinear",
                antialias=True,
            )
        return self.encoder(x)
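
# A minimal usage sketch (illustrative, not part of the original module): with the
# defaults (kernel_size=4, stride=4, total_stride=16), a (N, 1, H, W) mask is
# downsampled by two conv stages, giving an (N, embed_dim, H/16, W/16) feature map.
# Shapes below assume H = W = 1024:
#
#   downsampler = SimpleMaskDownSampler(embed_dim=256)
#   masks = torch.rand(2, 1, 1024, 1024)
#   feats = downsampler(masks)  # -> torch.Size([2, 256, 64, 64])
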

# Lightly adapted from ConvNeXt (https://github.com/facebookresearch/ConvNeXt)
class CXBlock(nn.Module):
    r"""ConvNeXt Block. There are two equivalent implementations:
    (1) DwConv -> LayerNorm (channels_first) -> 1x1 Conv -> GELU -> 1x1 Conv; all in (N, C, H, W)
    (2) DwConv -> Permute to (N, H, W, C); LayerNorm (channels_last) -> Linear -> GELU -> Linear; Permute back
    We use (2), as we find it slightly faster in PyTorch. Note that unlike the original
    ConvNeXt block, this variant applies LayerNorm2d in channels-first layout right
    after the depthwise conv, before permuting to channels-last for the linear layers.

    Args:
        dim (int): Number of input channels.
        kernel_size (int): Kernel size of the depthwise conv. Default: 7
        padding (int): Padding of the depthwise conv. Default: 3
        drop_path (float): Stochastic depth rate. Default: 0.0
        layer_scale_init_value (float): Init value for Layer Scale. Default: 1e-6.
        use_dwconv (bool): If True, make the spatial conv depthwise. Default: True
    """

    def __init__(
        self,
        dim,
        kernel_size=7,
        padding=3,
        drop_path=0.0,
        layer_scale_init_value=1e-6,
        use_dwconv=True,
    ):
        super().__init__()
        self.dwconv = nn.Conv2d(
            dim,
            dim,
            kernel_size=kernel_size,
            padding=padding,
            groups=dim if use_dwconv else 1,
        )  # depthwise conv
        self.norm = LayerNorm2d(dim, eps=1e-6)
        self.pwconv1 = nn.Linear(
            dim, 4 * dim
        )  # pointwise/1x1 convs, implemented with linear layers
        self.act = nn.GELU()
        self.pwconv2 = nn.Linear(4 * dim, dim)
        self.gamma = (
            nn.Parameter(layer_scale_init_value * torch.ones(dim), requires_grad=True)
            if layer_scale_init_value > 0
            else None
        )
        self.drop_path = DropPath(drop_path) if drop_path > 0.0 else nn.Identity()

    def forward(self, x):
        shortcut = x
        x = self.dwconv(x)
        x = self.norm(x)
        x = x.permute(0, 2, 3, 1)  # (N, C, H, W) -> (N, H, W, C)
        x = self.pwconv1(x)
        x = self.act(x)
        x = self.pwconv2(x)
        if self.gamma is not None:
            x = self.gamma * x
        x = x.permute(0, 3, 1, 2)  # (N, H, W, C) -> (N, C, H, W)
        x = shortcut + self.drop_path(x)
        return x
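
# A minimal usage sketch (illustrative only): CXBlock is shape-preserving, so it can
# be stacked freely. With layer_scale_init_value > 0, the residual branch is scaled
# near zero at init, so a freshly initialized block is close to the identity.
#
#   block = CXBlock(dim=256, kernel_size=7, padding=3)
#   x = torch.rand(2, 256, 64, 64)
#   y = block(x)  # -> same shape, torch.Size([2, 256, 64, 64])
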

class SimpleFuser(nn.Module):
    def __init__(self, layer, num_layers, dim=None, input_projection=False):
        super().__init__()
        self.proj = nn.Identity()
        self.layers = get_clones(layer, num_layers)
        if input_projection:
            assert dim is not None
            self.proj = nn.Conv2d(dim, dim, kernel_size=1)

    def forward(self, x):
        # normally x: (N, C, H, W)
        x = self.proj(x)
        for layer in self.layers:
            x = layer(x)
        return x
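
# A minimal usage sketch (illustrative only): the fuser clones the template layer
# num_layers times (via get_clones, so each clone has its own parameters) and applies
# the clones sequentially after an optional 1x1 input projection.
#
#   fuser = SimpleFuser(CXBlock(dim=256), num_layers=2, dim=256, input_projection=True)
#   x = torch.rand(2, 256, 64, 64)
#   y = fuser(x)  # -> torch.Size([2, 256, 64, 64])
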

class SimpleMaskEncoder(nn.Module):
    def __init__(
        self,
        out_dim,
        mask_downsampler,
        fuser,
        position_encoding,
        in_dim=256,  # in_dim of pix_feats
    ):
        super().__init__()
        self.mask_downsampler = mask_downsampler
        self.pix_feat_proj = nn.Conv2d(in_dim, in_dim, kernel_size=1)
        self.fuser = fuser
        self.position_encoding = position_encoding
        self.out_proj = nn.Identity()
        if out_dim != in_dim:
            self.out_proj = nn.Conv2d(in_dim, out_dim, kernel_size=1)

    def forward(
        self,
        pix_feat: torch.Tensor,
        masks: torch.Tensor,
        skip_mask_sigmoid: bool = False,
    ) -> Dict[str, Union[torch.Tensor, List[torch.Tensor]]]:
        ## Process masks
        # Apply sigmoid so that there is less domain shift from GT masks, which are boolean.
        if not skip_mask_sigmoid:
            masks = torch.sigmoid(masks)
        masks = self.mask_downsampler(masks)

        ## Fuse pix_feats and downsampled masks
        # In case the visual features are on a different device (e.g. CPU), move them
        # to the device of the masks.
        pix_feat = pix_feat.to(masks.device)

        x = self.pix_feat_proj(pix_feat)
        x = x + masks
        x = self.fuser(x)
        x = self.out_proj(x)

        pos = self.position_encoding(x).to(x.dtype)
        return {"vision_features": x, "vision_pos_enc": [pos]}