This post reviews HW2, which mainly builds a basic neural-network library (initializers, modules, optimizers, and data utilities) on top of the automatic-differentiation framework.

Course homepage:

References:

Notes

  • This assignment involves a lot of fiddly details; if you get stuck on a problem, look at existing write-ups (blogs) early on.

Question 1

This part implements several weight-initialization methods.

Xavier uniform

Formula:

Initialize from the uniform distribution $\mathcal{U}(-a, a)$, where

$$a = \text{gain} \cdot \sqrt{\frac{6}{\text{fan\_in} + \text{fan\_out}}}$$

Code:

def xavier_uniform(fan_in, fan_out, gain=1.0, **kwargs):
    ### BEGIN YOUR SOLUTION
    a = gain * math.sqrt(6 / (fan_in + fan_out))

    return rand(fan_in, fan_out, low=-1, high=1) * a
    ### END YOUR SOLUTION

Xavier normal

Formula:

Initialize from the normal distribution $\mathcal{N}(0, \text{std}^2)$, where

$$\text{std} = \text{gain} \cdot \sqrt{\frac{2}{\text{fan\_in} + \text{fan\_out}}}$$

Code:

def xavier_normal(fan_in, fan_out, gain=1.0, **kwargs):
    ### BEGIN YOUR SOLUTION
    std = gain * math.sqrt(2 / (fan_in + fan_out))

    return randn(fan_in, fan_out) * std
    ### END YOUR SOLUTION

Kaiming uniform

Formula:

Initialize from the uniform distribution $\mathcal{U}(-\text{bound}, \text{bound})$, where (with $\text{gain} = \sqrt{2}$ for ReLU)

$$\text{bound} = \text{gain} \cdot \sqrt{\frac{3}{\text{fan\_in}}}$$

Code:

def kaiming_uniform(fan_in, fan_out, nonlinearity="relu", **kwargs):
    assert nonlinearity == "relu", "Only relu supported currently"
    ### BEGIN YOUR SOLUTION
    gain = math.sqrt(2)
    bound = gain * math.sqrt(3 / fan_in)

    return rand(fan_in, fan_out, low=-1, high=1) * bound
    ### END YOUR SOLUTION

Kaiming normal

Formula:

Initialize from the normal distribution $\mathcal{N}(0, \text{std}^2)$, where (with $\text{gain} = \sqrt{2}$ for ReLU)

$$\text{std} = \frac{\text{gain}}{\sqrt{\text{fan\_in}}}$$

Code:

def kaiming_normal(fan_in, fan_out, nonlinearity="relu", **kwargs):
    assert nonlinearity == "relu", "Only relu supported currently"
    ### BEGIN YOUR SOLUTION
    gain = math.sqrt(2)
    std = gain / math.sqrt(fan_in)

    return randn(fan_in, fan_out) * std
    ### END YOUR SOLUTION
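
A quick empirical sanity check of these constants (a numpy-only sketch, not part of the assignment): samples from $\mathcal{U}(-\text{bound}, \text{bound})$ with $\text{bound} = \sqrt{2}\cdot\sqrt{3/\text{fan\_in}}$ have standard deviation $\text{bound}/\sqrt{3} = \sqrt{2/\text{fan\_in}}$, which is exactly the Kaiming-normal std.

import numpy as np

fan_in, fan_out = 256, 128
bound = np.sqrt(2.0) * np.sqrt(3.0 / fan_in)
w = np.random.uniform(-bound, bound, size=(fan_in, fan_out))
# the empirical std should be close to the Kaiming-normal std sqrt(2 / fan_in)
print(w.std(), np.sqrt(2.0 / fan_in))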

Question 2

This part implements some basic network modules. Note that we have to broadcast manually here; a small illustration follows.
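
A minimal illustration of the manual-broadcast point (hypothetical shapes, assuming the hw2 init and ops helpers):

# needle's elementwise add expects operands of the same shape, so the
# (1, d) bias is expanded explicitly before the addition
X = init.rand(32, 10)                    # (n, d_in)
W = init.rand(10, 5)                     # (d_in, d_out)
b = init.zeros(1, 5)                     # (1, d_out)
Y = ops.matmul(X, W)                     # (n, d_out)
Y = Y + ops.broadcast_to(b, Y.shape)     # broadcast manually, then add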

Linear

Formula:

$$Y = XW + b$$

where $W$ has shape $(\text{in\_features}, \text{out\_features})$ and $b$ (shape $(1, \text{out\_features})$) is broadcast to the output shape.

Code:

class Linear(Module):
    def __init__(self, in_features, out_features, bias=True, device=None, dtype="float32"):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features

        ### BEGIN YOUR SOLUTION
        self.use_bias = bias
        w = init.kaiming_uniform(in_features, out_features)
        self.weight = Parameter(w, device=device, dtype=dtype)
        if self.use_bias:
            b = ops.reshape(init.kaiming_uniform(out_features, 1), (1, out_features))
            self.bias = Parameter(b, device=device, dtype=dtype)
        ### END YOUR SOLUTION

    def forward(self, X: Tensor) -> Tensor:
        ### BEGIN YOUR SOLUTION
        Y = ops.matmul(X, self.weight)
        if self.use_bias:
            # use Y.shape rather than a hard-coded 2-D shape, since X may have extra leading dims
            bias = ops.broadcast_to(self.bias, Y.shape)
            Y += bias

        return Y
        ### END YOUR SOLUTION

ReLU

Formula:

$$\mathrm{ReLU}(x) = \max(x, 0)$$

Code:

class ReLU(Module):
    def forward(self, x: Tensor) -> Tensor:
        ### BEGIN YOUR SOLUTION
        return ops.relu(x)
        ### END YOUR SOLUTION

Sequential

Code:

class Sequential(Module):
    def __init__(self, *modules):
        super().__init__()
        self.modules = modules

    def forward(self, x: Tensor) -> Tensor:
        ### BEGIN YOUR SOLUTION
        for module in self.modules:
            # print(x.shape)
            x = module(x)

        return x
        ### END YOUR SOLUTION

LogSumExp

Formula:

Forward (numerically stable, max-shifted form):

$$\text{LogSumExp}(z) = \log\Big(\sum_i \exp(z_i - \max z)\Big) + \max z$$

Backward:

First recover the original (unshifted) form:

$$\text{LogSumExp}(z) = \log \sum_i \exp(z_i)$$

Differentiating gives

$$\frac{\partial\, \text{LogSumExp}(z)}{\partial z_i} = \frac{\exp(z_i)}{\sum_j \exp(z_j)} = \frac{\exp(z_i - \max z)}{\sum_j \exp(z_j - \max z)}$$

So overall, the input gradient is the incoming gradient (reshaped and broadcast back to the input shape) multiplied elementwise by this softmax term.

Code:

class LogSumExp(TensorOp):
    def __init__(self, axes: Optional[tuple] = None):
        if isinstance(axes, int):
            axes = (axes, )
        self.axes = axes

    def compute(self, Z):
        ### BEGIN YOUR SOLUTION
        z = array_api.max(Z, axis=self.axes, keepdims=True)
        log_sum_exp = array_api.log(array_api.sum(array_api.exp(Z - z), axis=self.axes, keepdims=True)) + z
            
        new_shape = []
        if self.axes:
            l = len(Z.shape)
            for i, n in enumerate(Z.shape):
                if (i not in self.axes) and ((i - l) not in self.axes):
                    new_shape.append(n)
            log_sum_exp = log_sum_exp.reshape(new_shape).astype(Z.dtype)
        else:
            # for test
            log_sum_exp = float(log_sum_exp)

        return log_sum_exp
        ### END YOUR SOLUTION

    def gradient(self, out_grad, node):
        ### BEGIN YOUR SOLUTION
        input, = node.inputs
        z = array_api.max(input.numpy(), axis=self.axes, keepdims=True)
        e = array_api.exp(input.numpy() - z)
        e_sum = array_api.sum(e, axis=self.axes, keepdims=True)
        prob = e / e_sum
        new_shape = list(input.shape)
        # (a, b) -> (1, a, 1, b)
        if self.axes:
            for i in self.axes:
                new_shape[i] = 1
            grad = reshape(out_grad, new_shape)
        else:
            grad = out_grad
        
        return broadcast_to(grad, input.shape) * Tensor(prob, dtype=grad.dtype)
        ### END YOUR SOLUTION
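
As a quick numerical check of the derivation above (a small numpy-only sketch, independent of needle): the stabilized forward matches the naive one, and the gradient of logsumexp is the softmax of the input.

import numpy as np

Z = np.random.randn(3, 4)
m = Z.max(axis=1, keepdims=True)
stable = np.log(np.exp(Z - m).sum(axis=1, keepdims=True)) + m   # max-shifted form
naive = np.log(np.exp(Z).sum(axis=1, keepdims=True))            # original form
assert np.allclose(stable, naive)
# d logsumexp / dZ = softmax(Z)
softmax = np.exp(Z - m) / np.exp(Z - m).sum(axis=1, keepdims=True)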

SoftmaxLoss

Formula:

$$\ell_{\text{softmax}}(Z, y) = \frac{1}{N}\sum_{i=1}^{N}\Big(\log\sum_{j}\exp Z_{ij} - Z_{i, y_i}\Big)$$

Code:

class SoftmaxLoss(Module):
    def forward(self, logits: Tensor, y: Tensor):
        ### BEGIN YOUR SOLUTION
        n = 1.0 * logits.shape[0]
        m = logits.shape[-1]
        y_one_hot = init.one_hot(m, y)
        z_y = ops.summation(ops.multiply(logits, y_one_hot), axes=(-1,))

        return ops.divide_scalar(ops.summation(ops.logsumexp(logits, axes=(-1,)) - z_y), float(n))
        ### END YOUR SOLUTION

LayerNorm1d

Formula:

Normalize over the feature dimension:

$$y = w \circ \frac{x - \mathbf{E}[x]}{\sqrt{\mathbf{Var}[x] + \epsilon}} + b$$

where the mean and (biased) variance are taken over the last dimension of each sample.

Code:

class LayerNorm1d(Module):
    def __init__(self, dim, eps=1e-5, device=None, dtype="float32"):
        super().__init__()
        self.dim = dim
        self.eps = eps
        ### BEGIN YOUR SOLUTION
        w = init.ones(dim)
        self.weight = Parameter(w, device=device, dtype=dtype)
        b = init.zeros(dim)
        self.bias = Parameter(b, device=device, dtype=dtype)
        ### END YOUR SOLUTION
        
    def forward(self, x: Tensor) -> Tensor:
        ### BEGIN YOUR SOLUTION
        l = len(x.shape)
        n = x.shape[-2]
        d = x.shape[-1]
        # 1, ..., 1, d
        broadcast_shape = (1, d)
        for i in range(l - 2):
            broadcast_shape = (1, ) + broadcast_shape
        # ..., n, 1
        stat_shape = x.shape[:-2] + (n, 1)
        # mean
        # ..., d -> ...,
        x_mean = ops.summation(x, axes=-1) / d
        # ..., -> ..., d
        x_mean = ops.broadcast_to(ops.reshape(x_mean, stat_shape), x.shape)
        x_zero = x - x_mean
        # var
        # ..., d -> ...,
        x_var = ops.summation(ops.multiply(x_zero, x_zero), axes=-1) / d
        # ..., -> ..., d
        x_var = ops.broadcast_to(ops.reshape(x_var, stat_shape), x.shape)
        x_stan_var = ops.power_scalar(x_var + self.eps, 0.5)
        # normalize
        x_normalize = x_zero / x_stan_var
        # res
        weight = ops.broadcast_to(ops.reshape(self.weight, broadcast_shape), x.shape)
        bias = ops.broadcast_to(ops.reshape(self.bias, broadcast_shape), x.shape)
        res = x_normalize * weight + bias

        return res
        ### END YOUR SOLUTION

Flatten

Formula:

Flatten a tensor of shape $(B, X_0, X_1, \ldots)$ into shape $(B, X_0 X_1 \cdots)$, keeping the leading batch dimension.

Code:

class Flatten(Module):
    def forward(self, X):
        ### BEGIN YOUR SOLUTION
        l = len(X.shape)
        for i in range(l - 2):
            X_shape = X.shape
            d2 = X_shape[-2]
            d1 = X_shape[-1]
            new_shape = X_shape[:-2] + (d1 * d2,)
            X = ops.reshape(X, new_shape)
            
        return X
        ### END YOUR SOLUTION

BatchNorm1d

Formula:

Training (statistics are computed over the batch dimension):

$$\hat x = \frac{x - \mathbf{E}_{\text{batch}}[x]}{\sqrt{\mathbf{Var}_{\text{batch}}[x] + \epsilon}}, \qquad y = w \circ \hat x + b$$

Test:

Use the running averages of the mean and variance instead of the batch statistics:

$$\hat x = \frac{x - \hat\mu}{\sqrt{\hat\sigma^2 + \epsilon}}$$

The running averages are updated as

$$\hat\mu \leftarrow (1 - m)\,\hat\mu + m\,\mathbf{E}_{\text{batch}}[x], \qquad \hat\sigma^2 \leftarrow (1 - m)\,\hat\sigma^2 + m\,\mathbf{Var}_{\text{batch}}[x]$$

where $m$ is the momentum.

Code. Note that running_mean and running_var must not be initialized as Parameter here, otherwise the later tests will not pass:

class BatchNorm1d(Module):
    def __init__(self, dim, eps=1e-5, momentum=0.1, device=None, dtype="float32"):
        super().__init__()
        self.dim = dim
        self.eps = eps
        self.momentum = momentum
        ### BEGIN YOUR SOLUTION
        w = init.ones(dim)
        self.weight = Parameter(w, device=device, dtype=dtype)
        b = init.zeros(dim)
        self.bias = Parameter(b, device=device, dtype=dtype)
        # running statistics: kept as plain tensors, no gradients
        mean = init.zeros(dim)
        self.running_mean = mean
        var = init.ones(dim)
        self.running_var = var
        ### END YOUR SOLUTION

    def forward(self, x: Tensor) -> Tensor:
        ### BEGIN YOUR SOLUTION
        l = len(x.shape)
        n = x.shape[-2]
        d = x.shape[-1]
        # 1, ..., 1, d
        broadcast_shape = (1, d)
        for i in range(l - 2):
            broadcast_shape = (1, ) + broadcast_shape
        # ..., 1, d
        stat_shape = x.shape[:-2] + (1, d)
        # constant
        c = 1
        for i in range(l - 1):
            c *= x.shape[i]
        # mean
        if self.training:
            # ..., n, d -> ..., d
            x_mean = ops.summation(x, axes=-2) / n
            # ..., d -> ..., 1, d -> ..., n, d
            x_mean = ops.broadcast_to(ops.reshape(x_mean, stat_shape), x.shape)
            # moving average
            running_mean = ops.broadcast_to(ops.reshape(self.running_mean, broadcast_shape), x.shape).data
            running_mean = (1 - self.momentum) * running_mean.data  + self.momentum * x_mean.data
            self.running_mean = ops.summation(running_mean, axes=tuple(range(l - 1))).data / c
        else:
            x_mean = ops.broadcast_to(ops.reshape(self.running_mean, broadcast_shape), x.shape)
        x_zero = x - x_mean
        # var
        if self.training:
            # ..., n, d -> ..., d
            x_var = ops.summation(ops.multiply(x_zero, x_zero), axes=-2) / n
            # ..., d -> ..., 1, d -> ..., n, d
            x_var = ops.broadcast_to(ops.reshape(x_var, stat_shape), x.shape)
            # moving average
            running_var = ops.broadcast_to(ops.reshape(self.running_var, broadcast_shape), x.shape).data
            running_var = (1 - self.momentum) * running_var.data  + self.momentum * x_var.data
            self.running_var = ops.summation(running_var, axes=tuple(range(l - 1))).data / c
        else:
            x_var = ops.broadcast_to(ops.reshape(self.running_var, broadcast_shape), x.shape)
        x_stan_var = ops.power_scalar(x_var + self.eps, 0.5)
        # normalize
        x_normalize = x_zero / x_stan_var
        # res
        weight = ops.broadcast_to(ops.reshape(self.weight, broadcast_shape), x.shape)
        bias = ops.broadcast_to(ops.reshape(self.bias, broadcast_shape), x.shape)
        res = x_normalize * weight + bias

        return res
        ### END YOUR SOLUTION
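
To see why the running statistics must stay plain tensors: anything wrapped in Parameter is collected by Module.parameters() and would then be updated (and weight-decayed) by the optimizer. A small check, assuming hw2's parameters() collects only Parameter instances:

bn = BatchNorm1d(4)
# weight and bias only; running_mean / running_var are excluded,
# so SGD/Adam never touch them
assert len(bn.parameters()) == 2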

Dropout

Formula:

Training (with drop probability $p$):

$$z = \frac{x \circ \text{mask}}{1 - p}, \qquad \text{mask}_i \sim \text{Bernoulli}(1 - p)$$

Test: dropout is not applied, so $z = x$.

Code:

class Dropout(Module):
    def __init__(self, p = 0.5):
        super().__init__()
        self.p = p

    def forward(self, x: Tensor) -> Tensor:
        ### BEGIN YOUR SOLUTION
        if self.training:
            prob = init.randb(*x.shape, p=1 - self.p)
            res = ops.multiply(x, prob) / (1 - self.p)
        else:
            res = x
        
        return res
        ### END YOUR SOLUTION
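
A quick check of the inverted-dropout scaling (numpy-only sketch): dividing by $1 - p$ keeps the expected activation equal to the input, which is why nothing needs to be rescaled at test time.

import numpy as np

p = 0.5
x = np.ones(100000)
mask = (np.random.rand(*x.shape) >= p).astype(np.float32)
out = x * mask / (1 - p)
print(out.mean())  # close to 1.0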

Residual

Formula:

$$y = f(x) + x$$

Code:

class Residual(Module):
    def __init__(self, fn: Module):
        super().__init__()
        self.fn = fn

    def forward(self, x: Tensor) -> Tensor:
        ### BEGIN YOUR SOLUTION
        return self.fn(x) + x
        ### END YOUR SOLUTION

Question 3

This part implements the optimizers. Weight decay is applied uniformly by folding it into the gradient:

$$\tilde g = g + wd \cdot w$$

where:

  • $g$ is the original gradient;
  • $wd$ is weight_decay;
  • $w$ is the parameter being updated.

SGD

Formula:

$$u \leftarrow \beta\, u + (1 - \beta)\,\tilde g, \qquad \theta \leftarrow \theta - \alpha\, u$$

where $\beta$ is the momentum and $\alpha$ the learning rate.

Code:

class SGD(Optimizer):
    def __init__(self, params, lr=0.01, momentum=0.0, weight_decay=0.0):
        super().__init__(params)
        self.lr = lr
        self.momentum = momentum
        self.u = {}
        self.weight_decay = weight_decay
        # add
        for p in self.params:
            key = hash(p)
            self.u[key] = 0

    def step(self):
        ### BEGIN YOUR SOLUTION
        for p in self.params:
            if p.grad is None:
                continue
            key = hash(p)
            u = self.momentum * self.u[key] + (1 - self.momentum) * (p.grad.data + self.weight_decay * p.data)
            self.u[key] = u
            p.data = p.data - self.lr * u
        ### END YOUR SOLUTION

The if p.grad is None check in step handles the case where some model parameters are never given a gradient.

Adam

Formula:

$$m \leftarrow \beta_1 m + (1 - \beta_1)\,\tilde g, \qquad v \leftarrow \beta_2 v + (1 - \beta_2)\,\tilde g^2$$

$$\hat m = \frac{m}{1 - \beta_1^t}, \qquad \hat v = \frac{v}{1 - \beta_2^t}$$

$$\theta \leftarrow \theta - \alpha\,\frac{\hat m}{\sqrt{\hat v} + \epsilon}$$

Code:

class Adam(Optimizer):
    def __init__(
        self,
        params,
        lr=0.01,
        beta1=0.9,
        beta2=0.999,
        eps=1e-8,
        weight_decay=0.0,
    ):
        super().__init__(params)
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        self.eps = eps
        self.weight_decay = weight_decay
        self.t = 0

        self.m = {}
        self.v = {}
        # add
        for p in self.params:
            key = hash(p)
            self.m[key] = 0
            self.v[key] = 0

    def step(self):
        ### BEGIN YOUR SOLUTION
        self.t += 1
        for p in self.params:
            if p.grad is None:
                continue
            key = hash(p)
            grad = p.grad.data + self.weight_decay * p.data
            # first-moment (mean) and second-moment (uncentered variance) estimates
            m = self.beta1 * self.m[key] + (1 - self.beta1) * grad
            v = self.beta2 * self.v[key] + (1 - self.beta2) * grad * grad
            self.m[key] = m
            self.v[key] = v
            # bias correction
            m_hat = m / (1 - self.beta1 ** self.t)
            v_hat = v / (1 - self.beta2 ** self.t)
            p.data = p.data - self.lr * ops.divide(m_hat, ops.power_scalar(v_hat, 0.5) + self.eps)
        ### END YOUR SOLUTION

Question 4

This part implements the data-loading utilities: transforms, the MNIST dataset, and the dataloader.

RandomFlipHorizontal

Code:

class RandomFlipHorizontal(Transform):
    def __init__(self, p = 0.5):
        self.p = p

    def __call__(self, img):
        """
        Horizontally flip an image, specified as an H x W x C NDArray.
        Args:
            img: H x W x C NDArray of an image
        Returns:
            H x W x C ndarray corresponding to image flipped with probability self.p
        Note: use the provided code to provide randomness, for easier testing
        """
        flip_img = np.random.rand() < self.p
        ### BEGIN YOUR SOLUTION
        if flip_img:
            img = img[:, ::-1, :]
        
        return img
        ### END YOUR SOLUTION

RandomCrop

Explanation:

Let b be the cropped output and a the original image; right and down are the positive directions:

+---------+
|    b    |
|     +---------+
|     |   |     |
+-----|---+     |
      |       a |
      +---------+

Code:

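A minimal sketch of RandomCrop (the padding argument and the shift_x/shift_y line are assumed to come from the hw2 starter code): zero-pad the image on all four sides, then take an H x W window offset by the random shift.

class RandomCrop(Transform):
    def __init__(self, padding=3):
        self.padding = padding

    def __call__(self, img):
        """Zero-pad the image, then randomly crop an H x W window (sketch)."""
        shift_x, shift_y = np.random.randint(low=-self.padding, high=self.padding + 1, size=2)
        ### BEGIN YOUR SOLUTION
        H, W, C = img.shape
        # zero-pad by `padding` on every side
        padded = np.zeros((H + 2 * self.padding, W + 2 * self.padding, C), dtype=img.dtype)
        padded[self.padding:self.padding + H, self.padding:self.padding + W, :] = img
        # crop an H x W window shifted by (shift_x, shift_y) from the original origin
        x0 = self.padding + shift_x
        y0 = self.padding + shift_y
        return padded[x0:x0 + H, y0:y0 + W, :]
        ### END YOUR SOLUTION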

MNISTDataset

The main work is in __getitem__:

  • a single image must be returned with shape $(28, 28, 1)$;
  • a batch of $m$ images must be returned with shape $(m, 28, 28, 1)$;

Code:

class MNISTDataset(Dataset):
    def __init__(
        self,
        image_filename: str,
        label_filename: str,
        transforms: Optional[List] = None,
    ):
        ### BEGIN YOUR SOLUTION
        super().__init__(transforms)
        self.X, self.y = parse_mnist(image_filename, label_filename)
        ### END YOUR SOLUTION

    def __getitem__(self, index) -> object:
        ### BEGIN YOUR SOLUTION
        x = self.X[index]
        y = self.y[index]
        n = len(x.shape)
        if n == 1:
            # single-index case: one sample
            x = x.reshape(28, 28, -1)
            x = self.apply_transforms(x)
            x = x.reshape(28, 28, 1)
        else:
            # multi-index case: a slice/array of samples
            m = x.shape[0]
            x = x.reshape(m, 28, 28, -1)
            for i in range(m):
                x[i] = self.apply_transforms(x[i])

        return x, y
        ### END YOUR SOLUTION

    def __len__(self) -> int:
        ### BEGIN YOUR SOLUTION
        return self.X.shape[0]
        ### END YOUR SOLUTION

Dataloader

Key points:

  • when shuffle is set, re-shuffle the indices at the start of every epoch (inside __iter__) and split them into batches with np.array_split;
  • __next__ raises StopIteration once every batch has been consumed.

Code:

class DataLoader:
    r"""
    Data loader. Combines a dataset and a sampler, and provides an iterable over
    the given dataset.
    Args:
        dataset (Dataset): dataset from which to load the data.
        batch_size (int, optional): how many samples per batch to load
            (default: ``1``).
        shuffle (bool, optional): set to ``True`` to have the data reshuffled
            at every epoch (default: ``False``).
     """
    dataset: Dataset
    batch_size: Optional[int]

    def __init__(
        self,
        dataset: Dataset,
        batch_size: Optional[int] = 1,
        shuffle: bool = False,
    ):

        self.dataset = dataset
        self.shuffle = shuffle
        self.batch_size = batch_size
        # indexes = np.arange()
        self.n = len(dataset)
        if not self.shuffle:
            self.ordering = np.array_split(np.arange(self.n), 
                                           range(batch_size, self.n, batch_size))

    def __iter__(self):
        ### BEGIN YOUR SOLUTION
        self.index = 0
        if self.shuffle:
            indexes = np.arange(self.n)
            np.random.shuffle(indexes)
            self.ordering = np.array_split(indexes, 
                                           range(self.batch_size, self.n, self.batch_size))
        ### END YOUR SOLUTION
        return self

    def __next__(self):
        ### BEGIN YOUR SOLUTION
        if self.index == len(self.ordering):
            raise StopIteration

        res = [Tensor(x) for x in self.dataset[self.ordering[self.index]]]
        self.index += 1
        
        return tuple(res)
        ### END YOUR SOLUTION
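
Typical usage (a sketch; the file names are placeholder paths for the MNIST files under data/):

dataset = MNISTDataset("data/train-images-idx3-ubyte.gz",
                       "data/train-labels-idx1-ubyte.gz")
loader = DataLoader(dataset, batch_size=100, shuffle=True)
for X, y in loader:
    # X: Tensor of shape (batch_size, 28, 28, 1), y: Tensor of shape (batch_size,)
    pass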

Question 5

This part implements the model and the training loop.

ResidualBlock

Code:

def ResidualBlock(dim, hidden_dim, norm=nn.BatchNorm1d, drop_prob=0.1):
    ### BEGIN YOUR SOLUTION
    module = nn.Sequential(
        nn.Linear(dim, hidden_dim),
        norm(hidden_dim),
        nn.ReLU(),
        nn.Dropout(drop_prob),
        nn.Linear(hidden_dim, dim),
        norm(dim),
    )
    
    model = nn.Sequential(
        nn.Residual(module),
        nn.ReLU()
    )
    
    return model
    ### END YOUR SOLUTION

MLPResNet

Code:

def MLPResNet(dim, hidden_dim=100, num_blocks=3, num_classes=10, norm=nn.BatchNorm1d, drop_prob=0.1):
    ### BEGIN YOUR SOLUTION
    blocks = []
    blocks.append(nn.Linear(dim, hidden_dim))
    blocks.append(nn.ReLU())
    resi_dim = hidden_dim // 2
    for _ in range(num_blocks):
        blocks.append(ResidualBlock(hidden_dim, resi_dim, norm, drop_prob))
    blocks.append(nn.Linear(hidden_dim, num_classes))
    
    return nn.Sequential(*blocks)
    ### END YOUR SOLUTION
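
A quick shape check of the model (a sketch, assuming the usual import needle as ndl alongside numpy):

import numpy as np
import needle as ndl

model = MLPResNet(dim=28 * 28, hidden_dim=100, num_blocks=3, num_classes=10)
x = ndl.Tensor(np.random.randn(8, 28 * 28).astype("float32"))
logits = model(x)
assert logits.shape == (8, 10)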

Train Mnist