## Notes

• This assignment is full of fiddly details; if you hit a problem you cannot resolve, check the blog promptly.

## Question 1

### Xavier uniform

```python
def xavier_uniform(fan_in, fan_out, gain=1.0, **kwargs):
    a = gain * math.sqrt(6 / (fan_in + fan_out))

    # scale a uniform(-1, 1) sample to uniform(-a, a); forward device/dtype kwargs
    return rand(fan_in, fan_out, low=-1, high=1, **kwargs) * a
### END YOUR SOLUTION
```

### Xavier normal

```python
def xavier_normal(fan_in, fan_out, gain=1.0, **kwargs):
    std = gain * math.sqrt(2 / (fan_in + fan_out))

    return randn(fan_in, fan_out, **kwargs) * std
### END YOUR SOLUTION
```

### Kaiming uniform

```python
def kaiming_uniform(fan_in, fan_out, nonlinearity="relu", **kwargs):
    assert nonlinearity == "relu", "Only relu supported currently"
    gain = math.sqrt(2)
    bound = gain * math.sqrt(3 / fan_in)

    return rand(fan_in, fan_out, low=-1, high=1, **kwargs) * bound
### END YOUR SOLUTION
```

### Kaiming normal

```python
def kaiming_normal(fan_in, fan_out, nonlinearity="relu", **kwargs):
    assert nonlinearity == "relu", "Only relu supported currently"
    gain = math.sqrt(2)
    std = gain / math.sqrt(fan_in)

    return randn(fan_in, fan_out, **kwargs) * std
### END YOUR SOLUTION
```
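
For reference, the four initializers reduce to the closed forms below (these match the code above; the Kaiming variants hard-code the ReLU gain $\sqrt{2}$):

$$
\begin{aligned}
\text{Xavier uniform:}\quad & U(-a, a), & a &= \text{gain}\cdot\sqrt{\tfrac{6}{\text{fan\_in}+\text{fan\_out}}} \\
\text{Xavier normal:}\quad & \mathcal{N}(0, \sigma^2), & \sigma &= \text{gain}\cdot\sqrt{\tfrac{2}{\text{fan\_in}+\text{fan\_out}}} \\
\text{Kaiming uniform:}\quad & U(-b, b), & b &= \sqrt{2}\cdot\sqrt{\tfrac{3}{\text{fan\_in}}} \\
\text{Kaiming normal:}\quad & \mathcal{N}(0, \sigma^2), & \sigma &= \tfrac{\sqrt{2}}{\sqrt{\text{fan\_in}}}
\end{aligned}
$$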

## Question 2

### Linear

```python
class Linear(Module):
    def __init__(self, in_features, out_features, bias=True, device=None, dtype="float32"):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features

        self.use_bias = bias
        w = init.kaiming_uniform(in_features, out_features)
        self.weight = Parameter(w, device=device, dtype=dtype)
        if self.use_bias:
            b = ops.reshape(init.kaiming_uniform(out_features, 1), (1, out_features))
            self.bias = Parameter(b, device=device, dtype=dtype)

    def forward(self, X: Tensor) -> Tensor:
        Y = ops.matmul(X, self.weight)
        if self.use_bias:
            # broadcast against Y.shape, not X.shape: the input may have extra leading dims
            Y += ops.broadcast_to(self.bias, Y.shape)

        return Y
### END YOUR SOLUTION
```
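
A minimal smoke test, assuming the usual hw2 package layout (`needle` imported as `ndl`); only the shapes are being checked here:

```python
import numpy as np
import needle as ndl
import needle.nn as nn

# a (2, 4) batch through Linear(4, 3) should give (2, 3)
lin = nn.Linear(4, 3)
x = ndl.Tensor(np.ones((2, 4), dtype="float32"))
print(lin(x).shape)  # (2, 3)
```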

### ReLU

```python
class ReLU(Module):
    def forward(self, x: Tensor) -> Tensor:
        return ops.relu(x)
### END YOUR SOLUTION
```

### Sequential

```python
class Sequential(Module):
    def __init__(self, *modules):
        super().__init__()
        self.modules = modules

    def forward(self, x: Tensor) -> Tensor:
        # thread the input through each submodule in order
        for module in self.modules:
            x = module(x)

        return x
### END YOUR SOLUTION
```

### LogSumExp

```python
class LogSumExp(TensorOp):
    def __init__(self, axes: Optional[tuple] = None):
        if isinstance(axes, int):
            axes = (axes,)
        self.axes = axes

    def compute(self, Z):
        # subtract the max for numerical stability
        z = array_api.max(Z, axis=self.axes, keepdims=True)
        log_sum_exp = array_api.log(array_api.sum(array_api.exp(Z - z), axis=self.axes, keepdims=True)) + z

        new_shape = []
        if self.axes:
            # drop the reduced axes from the output shape (axes may be negative)
            l = len(Z.shape)
            for i, n in enumerate(Z.shape):
                if (i not in self.axes) and ((i - l) not in self.axes):
                    new_shape.append(n)
            log_sum_exp = log_sum_exp.reshape(new_shape).astype(Z.dtype)
        else:
            # full reduction: return a Python float (the tests expect a scalar)
            log_sum_exp = float(log_sum_exp)

        return log_sum_exp

    def gradient(self, out_grad, node):
        input, = node.inputs
        # recompute the softmax of the input along the reduced axes
        z = array_api.max(input.numpy(), axis=self.axes, keepdims=True)
        e = array_api.exp(input.numpy() - z)
        e_sum = array_api.sum(e, axis=self.axes, keepdims=True)
        prob = e / e_sum
        # (a, b) -> (1, a, 1, b): reduced axes become size 1 so out_grad broadcasts
        new_shape = list(input.shape)
        if self.axes:
            for i in self.axes:
                new_shape[i] = 1
        else:
            new_shape = [1] * len(input.shape)
        grad = ops.broadcast_to(ops.reshape(out_grad, tuple(new_shape)), input.shape)

        return grad * Tensor(prob, dtype=grad.dtype)
### END YOUR SOLUTION
```
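
The backward pass is just the softmax of the input: differentiating the stabilized form gives

$$
\frac{\partial}{\partial z_i}\log\sum_j e^{z_j} = \frac{e^{z_i}}{\sum_j e^{z_j}} = \mathrm{softmax}(z)_i,
$$

which is why `gradient` rebuilds `prob` and broadcasts `out_grad` back over the reduced axes.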

### SoftmaxLoss

```python
class SoftmaxLoss(Module):
    def forward(self, logits: Tensor, y: Tensor):
        n = logits.shape[0]
        m = logits.shape[-1]
        y_one_hot = init.one_hot(m, y)
        # z_y: the logit of the true class for each example
        z_y = ops.summation(ops.multiply(logits, y_one_hot), axes=(-1,))

        return ops.divide_scalar(ops.summation(ops.logsumexp(logits, axes=(-1,)) - z_y), float(n))
### END YOUR SOLUTION
```
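
Written out, this is the average cross-entropy over a batch of $n$ examples with $m$ classes:

$$
\ell(Z, y) = \frac{1}{n}\sum_{i=1}^{n}\Big(\log\sum_{j=1}^{m} e^{z_{ij}} - z_{i, y_i}\Big)
$$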

### LayerNorm1d

```python
class LayerNorm1d(Module):
    def __init__(self, dim, eps=1e-5, device=None, dtype="float32"):
        super().__init__()
        self.dim = dim
        self.eps = eps
        w = init.ones(dim)
        self.weight = Parameter(w, device=device, dtype=dtype)
        b = init.zeros(dim)
        self.bias = Parameter(b, device=device, dtype=dtype)

    def forward(self, x: Tensor) -> Tensor:
        l = len(x.shape)
        d = x.shape[-1]
        # reshape weight/bias to (1, ..., 1, d) so they broadcast over leading dims
        w_shape = (1,) * (l - 1) + (d,)
        weight = ops.broadcast_to(ops.reshape(self.weight, w_shape), x.shape)
        bias = ops.broadcast_to(ops.reshape(self.bias, w_shape), x.shape)
        # per-example statistics keep shape (..., 1)
        stat_shape = x.shape[:-1] + (1,)
        # mean: (..., d) -> (...,) -> (..., 1) -> (..., d)
        x_mean = ops.broadcast_to(ops.reshape(ops.summation(x, axes=-1) / d, stat_shape), x.shape)
        x_zero = x - x_mean
        # var: (..., d) -> (...,) -> (..., 1) -> (..., d)
        x_var = ops.broadcast_to(ops.reshape(ops.summation(ops.multiply(x_zero, x_zero), axes=-1) / d, stat_shape), x.shape)
        x_std = ops.power_scalar(x_var + self.eps, 0.5)
        # normalize, then apply the affine transform
        x_normalize = x_zero / x_std
        res = x_normalize * weight + bias

        return res
### END YOUR SOLUTION
```
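
Per example, the layer computes (with $w, b \in \mathbb{R}^d$ broadcast across all leading dims):

$$
y = w \odot \frac{x - \mathbb{E}[x]}{\sqrt{\mathrm{Var}[x] + \epsilon}} + b
$$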

### Flatten

```python
class Flatten(Module):
    def forward(self, X):
        l = len(X.shape)
        # repeatedly merge the last two axes until only (batch, features) remains
        for i in range(l - 2):
            X_shape = X.shape
            d2 = X_shape[-2]
            d1 = X_shape[-1]
            new_shape = X_shape[:-2] + (d1 * d2,)
            X = ops.reshape(X, new_shape)

        return X
### END YOUR SOLUTION
```

### BatchNorm1d

training: normalize with the statistics of the current batch, and update the running mean/variance with a moving average;

test: normalize with the stored running statistics, without updating them.
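
With momentum $m$ (0.1 by default), the moving averages the code maintains are:

$$
\hat{\mu} \leftarrow (1 - m)\,\hat{\mu} + m\,\mu_{\text{batch}}, \qquad
\hat{\sigma}^2 \leftarrow (1 - m)\,\hat{\sigma}^2 + m\,\sigma^2_{\text{batch}}
$$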

```python
class BatchNorm1d(Module):
    def __init__(self, dim, eps=1e-5, momentum=0.1, device=None, dtype="float32"):
        super().__init__()
        self.dim = dim
        self.eps = eps
        self.momentum = momentum
        w = init.ones(dim)
        self.weight = Parameter(w, device=device, dtype=dtype)
        b = init.zeros(dim)
        self.bias = Parameter(b, device=device, dtype=dtype)
        # running statistics: plain tensors, no gradient is taken through them
        self.running_mean = init.zeros(dim)
        self.running_var = init.ones(dim)

    def forward(self, x: Tensor) -> Tensor:
        l = len(x.shape)
        d = x.shape[-1]
        # reshape weight/bias to (1, ..., 1, d) so they broadcast over batch dims
        w_shape = (1,) * (l - 1) + (d,)
        weight = ops.broadcast_to(ops.reshape(self.weight, w_shape), x.shape)
        bias = ops.broadcast_to(ops.reshape(self.bias, w_shape), x.shape)
        # c: number of elements averaged per feature (all dims except the last)
        c = 1
        for i in range(l - 1):
            c *= x.shape[i]
        batch_axes = tuple(range(l - 1))
        if self.training:
            # batch statistics: (..., n, d) -> (d,)
            x_mean = ops.summation(x, axes=batch_axes) / c
            x_zero = x - ops.broadcast_to(ops.reshape(x_mean, w_shape), x.shape)
            x_var = ops.summation(ops.multiply(x_zero, x_zero), axes=batch_axes) / c
            # moving averages, detached so no gradient flows through them
            self.running_mean = (1 - self.momentum) * self.running_mean.data + self.momentum * x_mean.data
            self.running_var = (1 - self.momentum) * self.running_var.data + self.momentum * x_var.data
        else:
            # test time: use the stored running statistics
            x_mean = self.running_mean.data
            x_zero = x - ops.broadcast_to(ops.reshape(x_mean, w_shape), x.shape)
            x_var = self.running_var.data
        x_std = ops.power_scalar(ops.broadcast_to(ops.reshape(x_var, w_shape), x.shape) + self.eps, 0.5)
        # normalize, then apply the affine transform
        x_normalize = x_zero / x_std
        res = x_normalize * weight + bias

        return res
### END YOUR SOLUTION
```

### Dropout

training: zero each element independently with probability $p$ and scale the survivors by $1/(1-p)$, so the expected activation is unchanged;

test: dropout is not applied.
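
The $1/(1-p)$ rescaling is exactly what keeps the expectation fixed: with mask entries $m_i \sim \mathrm{Bernoulli}(1-p)$,

$$
\mathbb{E}\Big[\frac{m_i x_i}{1-p}\Big] = \frac{(1-p)\,x_i}{1-p} = x_i
$$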

```python
class Dropout(Module):
    def __init__(self, p=0.5):
        super().__init__()
        self.p = p

    def forward(self, x: Tensor) -> Tensor:
        if self.training:
            # binary mask: each entry is 1 with probability 1 - p
            prob = init.randb(*x.shape, p=1 - self.p)
            res = ops.multiply(x, prob) / (1 - self.p)
        else:
            res = x

        return res
### END YOUR SOLUTION
```

### Residual

```python
class Residual(Module):
    def __init__(self, fn: Module):
        super().__init__()
        self.fn = fn

    def forward(self, x: Tensor) -> Tensor:
        return self.fn(x) + x
### END YOUR SOLUTION
```

## Question 3

• $g$ is the raw gradient;
• $wd$ is the weight_decay coefficient;
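
With this notation, the momentum update implemented below is (note the $(1-\beta)$ damping on the gradient term, and that weight decay is folded into the gradient):

$$
u_{t+1} = \beta u_t + (1-\beta)\,(g + wd \cdot \theta_t), \qquad
\theta_{t+1} = \theta_t - \eta\, u_{t+1}
$$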

### SGD

```python
class SGD(Optimizer):
    def __init__(self, params, lr=0.01, momentum=0.0, weight_decay=0.0):
        super().__init__(params)
        self.lr = lr
        self.momentum = momentum
        self.u = {}
        self.weight_decay = weight_decay
        for p in self.params:
            key = hash(p)
            self.u[key] = 0

    def step(self):
        for p in self.params:
            # skip parameters that received no gradient
            if p.grad is None:
                continue
            key = hash(p)
            u = self.momentum * self.u[key] + (1 - self.momentum) * (p.grad.data + self.weight_decay * p.data)
            self.u[key] = u
            p.data = p.data - self.lr * u
### END YOUR SOLUTION
```

### Adam

```python
class Adam(Optimizer):
    def __init__(
        self,
        params,
        lr=0.01,
        beta1=0.9,
        beta2=0.999,
        eps=1e-8,
        weight_decay=0.0,
    ):
        super().__init__(params)
        self.lr = lr
        self.beta1 = beta1
        self.beta2 = beta2
        self.eps = eps
        self.weight_decay = weight_decay
        self.t = 0

        self.m = {}
        self.v = {}
        for p in self.params:
            key = hash(p)
            self.m[key] = 0
            self.v[key] = 0

    def step(self):
        self.t += 1
        for p in self.params:
            # skip parameters that received no gradient
            if p.grad is None:
                continue
            key = hash(p)
            grad = p.grad.data + self.weight_decay * p.data
            # first and second moment estimates
            m = self.beta1 * self.m[key] + (1 - self.beta1) * grad
            v = self.beta2 * self.v[key] + (1 - self.beta2) * grad * grad
            self.m[key] = m
            self.v[key] = v
            # bias correction
            m_hat = m / (1 - self.beta1 ** self.t)
            v_hat = v / (1 - self.beta2 ** self.t)
            p.data = p.data - self.lr * ops.divide(m_hat, ops.power_scalar(v_hat, 0.5) + self.eps)
### END YOUR SOLUTION
```
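
For reference, the update `step` performs, with weight decay folded into the gradient and bias-corrected moments:

$$
\begin{aligned}
\tilde{g} &= g + wd\cdot\theta_t \\
m_t &= \beta_1 m_{t-1} + (1-\beta_1)\,\tilde{g}, \qquad
v_t = \beta_2 v_{t-1} + (1-\beta_2)\,\tilde{g}^2 \\
\hat{m}_t &= \frac{m_t}{1-\beta_1^t}, \qquad
\hat{v}_t = \frac{v_t}{1-\beta_2^t} \\
\theta_{t+1} &= \theta_t - \eta\,\frac{\hat{m}_t}{\sqrt{\hat{v}_t} + \epsilon}
\end{aligned}
$$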

## Question 4

### RandomFlipHorizontal

```python
class RandomFlipHorizontal(Transform):
    def __init__(self, p=0.5):
        self.p = p

    def __call__(self, img):
        """
        Horizontally flip an image, specified as an H x W x C NDArray.
        Args:
            img: H x W x C NDArray of an image
        Returns:
            H x W x C ndarray corresponding to image flipped with probability self.p
        Note: use the provided code to provide randomness, for easier testing
        """
        flip_img = np.random.rand() < self.p
        if flip_img:
            # reverse the W (column) axis
            img = img[:, ::-1, :]

        return img
### END YOUR SOLUTION
```

### RandomCrop

```
----------
|    b   |
|        |
|    ---------
|    |   |   |
|    |   |   |
-----|----   |
     |   a   |
     ---------
```

Here `b` is the original image and `a` is the randomly shifted crop window; the part of `a` that falls outside `b` is zero padding.

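The code for this transform was lost to a copy-paste of the flip transform above. Below is a sketch following the hw2 skeleton: the `shift_x, shift_y` sampling is the randomness the handout provides, while the padded buffer `img_pad` and its slicing are my reconstruction.

```python
class RandomCrop(Transform):
    def __init__(self, padding=3):
        self.padding = padding

    def __call__(self, img):
        """Zero-pad an H x W x C image and randomly crop it back to H x W x C."""
        shift_x, shift_y = np.random.randint(low=-self.padding, high=self.padding + 1, size=(2,))
        H, W, C = img.shape
        # embed the image in a zero buffer padded on every side,
        # then cut an H x W window offset by (shift_x, shift_y)
        img_pad = np.zeros((H + 2 * self.padding, W + 2 * self.padding, C), dtype=img.dtype)
        img_pad[self.padding:self.padding + H, self.padding:self.padding + W, :] = img
        x0 = self.padding + shift_x
        y0 = self.padding + shift_y

        return img_pad[x0:x0 + H, y0:y0 + W, :]
```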

### MNISTDataset

• Ensure a single image is returned with shape $(28, 28, 1)$;
• Ensure a batch of images is returned with shape $(m, 28, 28, 1)$;

```python
class MNISTDataset(Dataset):
    def __init__(
        self,
        image_filename: str,
        label_filename: str,
        transforms: Optional[List] = None,
    ):
        super().__init__(transforms)
        self.X, self.y = parse_mnist(image_filename, label_filename)

    def __getitem__(self, index) -> object:
        x = self.X[index]
        y = self.y[index]
        n = len(x.shape)
        if n == 1:
            # single-index case: one flat image
            x = x.reshape(28, 28, -1)
            x = self.apply_transforms(x)
            x = x.reshape(28, 28, 1)
        else:
            # multi-index case: a batch of images
            m = x.shape[0]
            x = x.reshape(m, 28, 28, -1)
            for i in range(m):
                x[i] = self.apply_transforms(x[i])

        return x, y

    def __len__(self) -> int:
        return self.X.shape[0]
### END YOUR SOLUTION
```

### DataLoader

```python
class DataLoader:
    r"""
    Data loader. Combines a dataset and a sampler, and provides an iterable over
    the given dataset.
    Args:
        dataset (Dataset): dataset from which to load the data.
        batch_size (int, optional): how many samples per batch to load
            (default: 1).
        shuffle (bool, optional): set to True to have the data reshuffled
            at every epoch (default: False).
    """
    dataset: Dataset
    batch_size: Optional[int]

    def __init__(
        self,
        dataset: Dataset,
        batch_size: Optional[int] = 1,
        shuffle: bool = False,
    ):
        self.dataset = dataset
        self.shuffle = shuffle
        self.batch_size = batch_size
        self.n = len(dataset)
        # without shuffling the batch order is fixed, so compute it once
        if not self.shuffle:
            self.ordering = np.array_split(np.arange(self.n),
                                           range(batch_size, self.n, batch_size))

    def __iter__(self):
        self.index = 0
        # reshuffle at the start of every epoch
        if self.shuffle:
            indexes = np.arange(self.n)
            np.random.shuffle(indexes)
            self.ordering = np.array_split(indexes,
                                           range(self.batch_size, self.n, self.batch_size))
        return self

    def __next__(self):
        if self.index == len(self.ordering):
            raise StopIteration

        res = [Tensor(x) for x in self.dataset[self.ordering[self.index]]]
        self.index += 1

        return tuple(res)
### END YOUR SOLUTION
```
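
A minimal usage sketch; the file paths are hypothetical placeholders for the hw2 MNIST files:

```python
# hypothetical paths; substitute the actual hw2 data files
train_dataset = MNISTDataset("data/train-images-idx3-ubyte.gz",
                             "data/train-labels-idx1-ubyte.gz",
                             transforms=[RandomFlipHorizontal(0.5)])
train_loader = DataLoader(train_dataset, batch_size=100, shuffle=True)

for X, y in train_loader:
    print(X.shape, y.shape)  # (100, 28, 28, 1) and (100,) for a full batch
    break
```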

## Question 5

### ResidualBlock

```python
def ResidualBlock(dim, hidden_dim, norm=nn.BatchNorm1d, drop_prob=0.1):
    module = nn.Sequential(
        nn.Linear(dim, hidden_dim),
        norm(hidden_dim),
        nn.ReLU(),
        nn.Dropout(drop_prob),
        nn.Linear(hidden_dim, dim),
        norm(dim),
    )

    model = nn.Sequential(
        nn.Residual(module),
        nn.ReLU()
    )

    return model
### END YOUR SOLUTION
```

### MLPResNet

```python
def MLPResNet(dim, hidden_dim=100, num_blocks=3, num_classes=10, norm=nn.BatchNorm1d, drop_prob=0.1):
    # Linear -> ReLU -> num_blocks residual blocks -> classification head;
    # per the handout, each block uses hidden_dim // 2 internally
    model = nn.Sequential(
        nn.Linear(dim, hidden_dim),
        nn.ReLU(),
        *[ResidualBlock(hidden_dim, hidden_dim // 2, norm=norm, drop_prob=drop_prob)
          for _ in range(num_blocks)],
        nn.Linear(hidden_dim, num_classes),
    )

    return model
### END YOUR SOLUTION
```
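
A quick shape check for the assembled model ($784 = 28 \times 28$ flattened MNIST pixels; the batch size is arbitrary):

```python
import numpy as np
import needle as ndl

model = MLPResNet(784)
x = ndl.Tensor(np.random.randn(8, 784).astype("float32"))
print(model(x).shape)  # (8, 10)
```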