Deep Learning Systems HW4
这里回顾HW4,这次的主要内容是利用needle实现cnn和rnn。
课程主页:
参考资料:
- https://www.cs.toronto.edu/~kriz/cifar.html
- https://forum.dlsyscourse.org/t/test-ndarray-backend-with-test-optim-py-hw2/2821
- https://hideyukiinada.github.io/cnn_backprop_strides2.html
- https://deeplearning.cs.cmu.edu/F21/document/recitation/Recitation5/CNN_Backprop_Recitation_5_F21.pdf
- https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html
Part 1: ND Backend
注意点:
- 需要将之前numpy部分删去;
- broadcast_to的特殊处理;
- summation的多维版本支持;
- 变换形状之前需要compact;
代码:
class PowerScalar(TensorOp):
    """Element-wise power of a tensor by a fixed scalar exponent."""

    def __init__(self, scalar: int):
        self.scalar = scalar

    def compute(self, a: NDArray) -> NDArray:
        """Forward pass: delegate to the array backend's ``power``."""
        exponent = self.scalar
        return array_api.power(a, exponent)

    def gradient(self, out_grad, node):
        """Backward pass: d(x^n)/dx = n * x^(n-1), scaled by ``out_grad``."""
        (x,) = node.inputs
        local_grad = self.scalar * array_api.power(x, self.scalar - 1)
        return out_grad * local_grad


def power_scalar(a, scalar):
    """Functional wrapper: apply ``PowerScalar(scalar)`` to ``a``."""
    op = PowerScalar(scalar)
    return op(a)
class EWiseDiv(TensorOp):
    """Element-wise division of two nodes."""

    def compute(self, a, b):
        """Forward pass: ``a / b``."""
        quotient = a / b
        return quotient

    def gradient(self, out_grad, node):
        """Backward pass for ``a / b``.

        d/da = 1 / b and d/db = -a / b^2, each multiplied by ``out_grad``.
        """
        numerator, denominator = node.inputs
        grad_numerator = out_grad / denominator
        grad_denominator = -out_grad * numerator / denominator / denominator
        return grad_numerator, grad_denominator


def divide(a, b):
    """Functional wrapper: element-wise ``a / b``."""
    op = EWiseDiv()
    return op(a, b)
class DivScalar(TensorOp):
    """Divide a tensor by a fixed scalar."""

    def __init__(self, scalar):
        self.scalar = scalar

    def compute(self, a):
        """Forward pass: ``a / scalar`` (division is done by the array type itself)."""
        divisor = self.scalar
        return a / divisor

    def gradient(self, out_grad, node):
        """Backward pass: the derivative of ``a / c`` w.r.t. ``a`` is ``1 / c``."""
        divisor = self.scalar
        return out_grad / divisor


def divide_scalar(a, scalar):
    """Functional wrapper: apply ``DivScalar(scalar)`` to ``a``."""
    op = DivScalar(scalar)
    return op(a)
class Transpose(TensorOp):
    """Swap two axes of a tensor (the last two when no axes are given)."""

    def __init__(self, axes: Optional[tuple] = None):
        self.axes = (axes,) if isinstance(axes, int) else axes

    def _axis_pair(self):
        """Return the pair of axes to swap; defaults to the last two."""
        if not self.axes:
            return -1, -2
        return self.axes[0], self.axes[1]

    def compute(self, a):
        """Forward pass: build the identity permutation, swap the pair, permute."""
        order = list(range(len(a.shape)))
        first, second = self._axis_pair()
        order[first], order[second] = order[second], order[first]
        return array_api.permute(a, tuple(order))

    def gradient(self, out_grad, node):
        """Backward pass: swapping the same two axes again undoes the transpose."""
        return transpose(out_grad, axes=self._axis_pair())


def transpose(a, axes=None):
    """Functional wrapper: apply ``Transpose(axes)`` to ``a``."""
    op = Transpose(axes)
    return op(a)
class Reshape(TensorOp):
    """Reshape a tensor to a fixed target shape."""

    def __init__(self, shape):
        self.shape = shape

    def compute(self, a):
        """Forward pass: delegate to the array backend's ``reshape``."""
        target = self.shape
        return array_api.reshape(a, target)

    def gradient(self, out_grad, node):
        """Backward pass: reshape the incoming gradient back to the input's shape."""
        (x,) = node.inputs
        original_shape = x.shape
        return reshape(out_grad, original_shape)


def reshape(a, shape):
    """Functional wrapper: apply ``Reshape(shape)`` to ``a``."""
    op = Reshape(shape)
    return op(a)
##### batch size = 1特殊处理
class BroadcastTo(TensorOp):
    """Broadcast a tensor to a fixed target shape."""

    def __init__(self, shape):
        self.shape = shape

    def compute(self, a):
        """Forward pass; returns ``a`` untouched when it already has the target shape."""
        if a.shape != self.shape:
            return array_api.broadcast_to(a, self.shape)
        return a

    def gradient(self, out_grad, node):
        """Backward pass: sum ``out_grad`` over every broadcast axis.

        An axis counts as broadcast when it was prepended to the input's shape
        or when its size differs between the input and the target shape.
        """
        (x,) = node.inputs
        in_shape = x.shape
        if in_shape == self.shape:
            return out_grad

        target_rank = len(self.shape)
        rank_gap = target_rank - len(in_shape)
        # Left-pad the input shape with 1s so both shapes have the same rank.
        padded = [1] * rank_gap + list(in_shape)
        # Axes are collected from last to first, as in the original loop.
        reduce_axes = [
            axis
            for axis in reversed(range(target_rank))
            if padded[axis] != self.shape[axis] or axis < rank_gap
        ]
        summed = summation(out_grad, axes=tuple(reduce_axes))
        return reshape(summed, in_shape)


def broadcast_to(a, shape):
    """Functional wrapper: apply ``BroadcastTo(shape)`` to ``a``."""
    op = BroadcastTo(shape)
    return op(a)
class Summation(TensorOp):
    """Sum a tensor over one axis, several axes, or all axes (``axes=None``)."""

    def __init__(self, axes: Optional[tuple] = None):
        if isinstance(axes, int):
            axes = (axes,)
        self.axes = axes

    def compute(self, a):
        """Forward pass.

        The backend ``sum`` is called once per axis, so multiple axes are
        reduced one at a time, from the highest index to the lowest, so that
        removing one axis does not shift the indices still to be reduced.
        """
        if self.axes is None:
            return array_api.sum(a, None)

        axes = self.axes
        if not isinstance(axes, (tuple, list)):
            axes = (axes,)
        ndim = len(a.shape)
        # Map negative integer axes to their non-negative equivalents.
        normalized = [
            axis + ndim if isinstance(axis, int) and axis < 0 else axis
            for axis in axes
        ]
        for axis in sorted(normalized, reverse=True):
            a = array_api.sum(a, axis)
        return a

    def gradient(self, out_grad, node):
        """Backward pass: re-insert the reduced axes as size 1, then broadcast."""
        (x,) = node.inputs
        in_shape = x.shape
        ndim = len(in_shape)
        if self.axes is None:
            # Every axis was reduced: iterate over axis *indices*, not sizes.
            reduced_axes = range(ndim)
            kept_shape = []
        else:
            reduced_axes = self.axes
            kept_shape = list(out_grad.shape)
        # Insert in ascending order so each position is valid when inserted.
        for axis in sorted(ax if ax >= 0 else ax + ndim for ax in reduced_axes):
            kept_shape.insert(axis, 1)
        return broadcast_to(reshape(out_grad, kept_shape), in_shape)


def summation(a, axes=None):
    """Functional wrapper: apply ``Summation(axes)`` to ``a``."""
    return Summation(axes)(a)
class MatMul(TensorOp):
    """Matrix multiplication, including batched operands."""

    def compute(self, a, b):
        """Forward pass: delegate to the array backend's ``matmul``."""
        return array_api.matmul(a, b)

    def gradient(self, out_grad, node):
        """Backward pass for ``lhs @ rhs``.

        grad_lhs = out_grad @ rhs^T and grad_rhs = lhs^T @ out_grad. When an
        operand has fewer dimensions than ``out_grad`` (it was broadcast over
        leading batch axes), its gradient is summed over those leading axes.
        """
        # lhs: (A, a, b), rhs: (B, b, c), out_grad: (C, a, c)
        lhs, rhs = node.inputs
        # (C, a, b)
        lhs_grad = matmul(out_grad, transpose(rhs, axes=(-1, -2)))
        # (C, b, c)
        rhs_grad = matmul(transpose(lhs, axes=(-1, -2)), out_grad)
        # Reduce the extra leading batch axes so each gradient matches its input.
        out_rank = len(out_grad.shape)
        lhs_rank = len(lhs.shape)
        rhs_rank = len(rhs.shape)
        if out_rank > lhs_rank:
            lhs_grad = summation(lhs_grad, axes=tuple(range(out_rank - lhs_rank)))
        if out_rank > rhs_rank:
            rhs_grad = summation(rhs_grad, axes=tuple(range(out_rank - rhs_rank)))
        return lhs_grad, rhs_grad


def matmul(a, b):
    """Functional wrapper: ``a @ b``."""
    return MatMul()(a, b)
class Negate(TensorOp):
    """Element-wise numerical negation."""

    def compute(self, a):
        """Forward pass: ``-a``."""
        negated = -a
        return negated

    def gradient(self, out_grad, node):
        """Backward pass: the derivative of ``-x`` is ``-1``."""
        negated_grad = -out_grad
        return negated_grad


def negate(a):
    """Functional wrapper: apply ``Negate`` to ``a``."""
    op = Negate()
    return op(a)
class Log(TensorOp):
    """Element-wise logarithm (as implemented by the array backend's ``log``)."""

    def compute(self, a):
        """Forward pass: delegate to ``array_api.log``."""
        result = array_api.log(a)
        return result

    def gradient(self, out_grad, node):
        """Backward pass: the derivative of ``log(x)`` is ``1 / x``."""
        (x,) = node.inputs
        return out_grad / x


def log(a):
    """Functional wrapper: apply ``Log`` to ``a``."""
    op = Log()
    return op(a)
class Exp(TensorOp):
    """Element-wise exponential."""

    def compute(self, a):
        """Forward pass: delegate to ``array_api.exp``."""
        result = array_api.exp(a)
        return result

    def gradient(self, out_grad, node):
        """Backward pass: the derivative of ``exp(x)`` is ``exp(x)``."""
        (x,) = node.inputs
        local_grad = exp(x)
        return out_grad * local_grad


def exp(a):
    """Functional wrapper: apply ``Exp`` to ``a``."""
    op = Exp()
    return op(a)
class ReLU(TensorOp):
    """Element-wise rectified linear unit, ``max(x, 0)``."""

    def compute(self, a):
        """Forward pass: delegate to ``array_api.maximum`` with a zero floor."""
        rectified = array_api.maximum(a, 0)
        return rectified

    def gradient(self, out_grad, node):
        """Backward pass: pass the gradient only where the ReLU output is positive."""
        (x,) = node.inputs
        activated = relu(x).realize_cached_data()
        # The mask is wrapped as a constant Tensor matching out_grad's device/dtype.
        mask = Tensor(activated > 0, device=out_grad.device, dtype=out_grad.dtype)
        return out_grad * mask


def relu(a):
    """Functional wrapper: apply ``ReLU`` to ``a``."""
    op = ReLU()
    return op(a)
class LogSumExp(TensorOp):
    """Numerically stable ``log(sum(exp(Z)))`` over the given axes."""

    def __init__(self, axes: Optional[tuple] = None):
        self.axes = (axes,) if isinstance(axes, int) else axes

    def compute(self, Z):
        """Forward pass.

        The per-slice maximum is subtracted before exponentiating and added
        back after the log. When axes are given, the reduced axes are dropped
        from the result's shape.
        """
        peak = array_api.max(Z, axis=self.axes, keepdims=True)
        shifted = Z - array_api.broadcast_to(peak, Z.shape)
        total = array_api.sum(array_api.exp(shifted), axis=self.axes, keepdims=True)
        result = array_api.log(total) + peak
        if self.axes:
            rank = len(Z.shape)
            # Keep only the axes not reduced (checking both positive and
            # negative spellings of each axis index).
            kept_shape = [
                size
                for pos, size in enumerate(Z.shape)
                if pos not in self.axes and (pos - rank) not in self.axes
            ]
            result = result.reshape(kept_shape)
        return result

    def gradient(self, out_grad, node):
        """Backward pass: ``out_grad`` times softmax of the input over ``axes``."""
        (x,) = node.inputs
        raw = x.realize_cached_data()
        peak = array_api.max(raw, axis=self.axes, keepdims=True)
        peak = array_api.broadcast_to(peak, x.shape)
        weights = array_api.exp(raw - peak)
        norm = array_api.sum(weights, axis=self.axes, keepdims=True)
        norm = array_api.broadcast_to(norm, x.shape)
        softmax = weights / norm

        if self.axes:
            # Re-insert the reduced axes as size 1, e.g. (a, b) -> (1, a, 1, b).
            keep_shape = list(x.shape)
            for axis in self.axes:
                keep_shape[axis] = 1
            grad = reshape(out_grad, keep_shape)
        else:
            grad = out_grad
        return broadcast_to(grad, x.shape) * Tensor(
            softmax, dtype=grad.dtype, device=grad.device
        )


def logsumexp(a, axes=None):
    """Functional wrapper: apply ``LogSumExp(axes)`` to ``a``."""
    op = LogSumExp(axes=axes)
    return op(a)
class Tanh(TensorOp):
    """Element-wise hyperbolic tangent."""

    def compute(self, a):
        """Forward pass: delegate to ``array_api.tanh``."""
        result = array_api.tanh(a)
        return result

    def gradient(self, out_grad, node):
        """Backward pass: the derivative of ``tanh(x)`` is ``1 - tanh(x)^2``."""
        (x,) = node.inputs
        activated = tanh(x)
        return out_grad * (1 - activated * activated)


def tanh(a):
    """Functional wrapper: apply ``Tanh`` to ``a``."""
    op = Tanh()
    return op(a)
def getitem(x, axises):
    """Index into ``x`` repeatedly, once per entry of ``axises``.

    At each step the current value is passed through ``make_tuple`` and then
    indexed with the next entry.
    """
    selected = x
    for key in axises:
        selected = make_tuple(selected)[key]
    return selected
class Stack(TensorOp):
    def __init__(self, axis: int):
        """
        Concatenates a sequence of arrays along a new dimension.
        Parameters:
        axis - dimension to concatenate along
        All arrays need to be of the same size.
        """
        self.axis = axis

    def compute(self, args):
        """Forward pass: allocate the stacked array and copy each input into its slot."""
        count = len(args)
        first = args[0]
        item_shape = list(first.shape)

        out_shape = list(first.shape)
        out_shape.insert(self.axis, count)
        stacked = array_api.empty(out_shape, dtype=first.dtype, device=first.device)

        # Full slices for every input axis, plus a slot for the new axis that
        # is overwritten with the item position inside the loop.
        index = [slice(0, size) for size in item_shape]
        index.insert(self.axis, 0)

        # Each input is reshaped to carry a size-1 axis at the stacking position.
        item_shape.insert(self.axis, 1)

        for pos in range(count):
            index[self.axis] = pos
            stacked[tuple(index)] = array_api.reshape(args[pos], item_shape)
        return stacked

    def gradient(self, out_grad, node):
        """Backward pass: split the gradient along the stacking axis."""
        return split(out_grad, self.axis)


def stack(args, axis):
    """Functional wrapper: stack ``args`` along a new axis ``axis``."""
    packed = make_tuple(*args)
    return Stack(axis)(packed)
class Split(TensorTupleOp):
    def __init__(self, axis: int):
        """
        Splits a tensor along an axis into a tuple of tensors.
        (The "inverse" of Stack)
        Parameters:
        axis - dimension to split
        """
        self.axis = axis

    def compute(self, A):
        """Forward pass: e.g. (5, 3, 5) split on axis 1 -> three arrays of shape (5, 5)."""
        count = A.shape[self.axis]

        # Full slices on every axis; the split axis is replaced per piece below.
        index = [slice(0, size) for size in A.shape]

        # Shape of each piece: the input shape with the split axis removed.
        piece_shape = list(A.shape)
        piece_shape.pop(self.axis)

        pieces = []
        for pos in range(count):
            index[self.axis] = pos
            chunk = array_api.array(A[tuple(index)], dtype=A.dtype, device=A.device)
            pieces.append(array_api.reshape(chunk, piece_shape))
        return pieces

    def gradient(self, out_grad, node):
        """Backward pass: stack the per-piece gradients back along the split axis."""
        return stack(out_grad, self.axis)


def split(a, axis):
    """Functional wrapper: split ``a`` along ``axis``."""
    op = Split(axis)
    return op(a)
Part 2: CIFAR-10 dataset
注意点:
- 需要参考cifar10官网;
- 最后的结果需要除以255;
代码:
class CIFAR10Dataset(Dataset):
    def __init__(
        self,
        base_folder: str,
        train: bool,
        p: Optional[float] = 0.5,
        transforms: Optional[List] = None
    ):
        """
        Parameters:
        base_folder - cifar-10-batches-py folder filepath
        train - bool, if True load training dataset, else load test dataset
        p - accepted for interface compatibility; not used by this class
        transforms - optional list of transforms, forwarded to the base class
        Divide pixel values by 255. so that images are in 0-1 range.
        Attributes:
        X - numpy array of images
        y - numpy array of labels
        """
        super().__init__(transforms)
        if train:
            files = [f"data_batch_{i}" for i in range(1, 6)]
        else:
            files = ["test_batch"]

        images = []
        labels = []
        for file in files:
            batch_data, batch_labels = self.unpickle(os.path.join(base_folder, file))
            images.append(batch_data)
            labels.append(batch_labels)

        # Scale raw pixel values into the [0, 1] range.
        X = np.concatenate(images) / 255.0
        y = np.concatenate(labels)
        self.n = X.shape[0]
        # Each flat row becomes (channels, 32, 32); the channel count is inferred.
        self.X = X.reshape(self.n, -1, 32, 32)
        self.y = y

    def unpickle(self, file):
        """Load one batch file and return its ``(data, labels)`` entries.

        NOTE: ``pickle.load`` can execute arbitrary code; only use this on
        trusted dataset files.
        """
        with open(file, 'rb') as fo:
            batch = pickle.load(fo, encoding='bytes')
        return batch[b'data'], batch[b'labels']

    def __getitem__(self, index) -> object:
        """
        Returns the image, label at given index
        Image should be of shape (3, 32, 32)
        """
        image = self.apply_transforms(self.X[index])
        label = self.y[index]
        return image, label

    def __len__(self) -> int:
        """
        Returns the total number of examples in the dataset
        """
        return self.n
Part 3: Convolutional neural network
Padding ndarrays
代码:
def pad(self, axes):
    """
    Pad this ndarray by zeros by the specified amount in `axes`,
    which lists for _all_ axes the left and right padding amount, e.g.,
    axes = ( (0, 0), (1, 1), (0, 0)) pads the middle axis with a 0 on the left and right side.
    """
    old_shape = self._shape
    new_shape = [
        int(old_shape[i] + left + right)
        for i, (left, right) in enumerate(axes)
    ]
    padded = self.device.full(new_shape, 0, self.dtype)

    # Offset, in elements, of the first non-padding entry inside `padded`.
    offset = 0
    for i, (left, _right) in enumerate(axes):
        offset += int(padded.strides[i] * left)

    # Copy from a compacted source: the raw handle is passed without this
    # array's own strides or offset, so a non-compact view would otherwise be
    # read in the wrong order.
    # NOTE(review): presumably ewise_setitem treats its first argument as a
    # dense row-major buffer of shape `old_shape` -- confirm against the backend.
    source = self.compact()
    self.device.ewise_setitem(
        source._handle,
        padded._handle,
        old_shape,
        padded._strides,
        offset,
    )
    return padded
Flipping ndarrays & FlipOp
注意点:
- 通过变换stride以及offset实现功能;
代码:
def flip(self, axes):
    """
    Flip this ndarray along the specified axes.
    Note: compact() before returning.

    `axes` may be a single int or an iterable of ints; negative axes count
    from the end and duplicates are ignored.
    """
    if isinstance(axes, int):
        axes = (axes,)
    ndim = len(self._strides)
    # Normalise so negative axes get both the stride flip and the offset
    # adjustment, and so a repeated axis is flipped only once.
    flip_axes = {axis % ndim for axis in axes}

    new_strides = list(self._strides)
    # Start at the last element along each flipped axis and walk backwards.
    # NOTE(review): the offset is computed from zero; if `self` can be a view
    # with a non-zero offset it may need to be added here -- confirm.
    new_offset = 0
    for axis in flip_axes:
        new_strides[axis] = -self._strides[axis]
        new_offset += self._strides[axis] * (self._shape[axis] - 1)

    flipped = NDArray.make(
        shape=self._shape,
        strides=tuple(new_strides),
        device=self._device,
        handle=self._handle,
        offset=new_offset,
    )
    return flipped.compact()
class Flip(TensorOp):
    """Reverse a tensor along the given axes."""

    def __init__(self, axes: Optional[tuple] = None):
        self.axes = axes

    def compute(self, a):
        """Forward pass: delegate to ``array_api.flip``."""
        flipped = array_api.flip(a, self.axes)
        return flipped

    def gradient(self, out_grad, node):
        """Backward pass: flip the gradient along the same axes."""
        flipped_grad = flip(out_grad, self.axes)
        return flipped_grad


def flip(a, axes):
    """Functional wrapper: apply ``Flip(axes)`` to ``a``."""
    op = Flip(axes)
    return op(a)
Dilation
注意点:
- 重点是计算stride;
代码:
class Dilate(TensorOp):
    """Insert ``dilation`` zeros after every element along the given axes."""

    def __init__(self, axes: tuple, dilation: int):
        self.axes = axes
        self.dilation = dilation

    def compute(self, a):
        """Forward pass: write ``a`` into a strided view of a zero-filled array."""
        factor = 1 + self.dilation
        out_shape = []
        index = []
        for axis, size in enumerate(a.shape):
            if axis in self.axes:
                # Dilated axis: grows by `factor`; originals land every `factor` steps.
                grown = size * factor
                out_shape.append(grown)
                index.append(slice(0, grown, factor))
            else:
                out_shape.append(size)
                index.append(slice(size))
        dilated = array_api.full(out_shape, 0, dtype=a.dtype, device=a.device)
        dilated[tuple(index)] = a
        return dilated

    def gradient(self, out_grad, node):
        """Backward pass: undo the dilation on the gradient."""
        return undilate(out_grad, self.axes, self.dilation)


def dilate(a, axes, dilation):
    """Functional wrapper: apply ``Dilate(axes, dilation)`` to ``a``."""
    op = Dilate(axes, dilation)
    return op(a)
class UnDilate(TensorOp):
    """Inverse of ``Dilate``: keep every ``(1 + dilation)``-th element along ``axes``."""

    def __init__(self, axes: tuple, dilation: int):
        self.axes = axes
        self.dilation = dilation

    def compute(self, a):
        """Forward pass: a strided slice of ``a``.

        No output buffer is allocated; the result is whatever indexing ``a``
        with the strided slices returns.
        """
        step = 1 + self.dilation
        index = tuple(
            slice(0, size, step) if axis in self.axes else slice(size)
            for axis, size in enumerate(a.shape)
        )
        return a[index]

    def gradient(self, out_grad, node):
        """Backward pass: dilate the gradient with the same axes and dilation."""
        return dilate(out_grad, self.axes, self.dilation)


def undilate(a, axes, dilation):
    """Functional wrapper: apply ``UnDilate(axes, dilation)`` to ``a``."""
    return UnDilate(axes, dilation)(a)
Convolution forward
注意点:
利用老师提供的如下公式即可:
for i in range(K):
for j in range(K):
out += Z[:,i:i+H-K+1,j:j+W-K+1,:] @ weight[i,j]
修改的地方是stride,代码如下:
def compute(self, A, B):
    """Forward convolution.

    ``A`` is unpacked as (N, H, W, C_in) after padding and ``B`` as
    (K, K, C_in, C_out). For each kernel offset (i, j) the strided window of
    the padded input is flattened to a matrix and multiplied by the
    (C_in, C_out) kernel slice; the K*K products are accumulated.
    """
    pad = self.padding
    stride = self.stride
    A_pad = array_api.pad(A, ((0, 0), (pad, pad), (pad, pad), (0, 0)))
    N, H, W, C_in = A_pad.shape
    K, _, _, C_out = B.shape
    H_out = (H - K) // stride + 1
    W_out = (W - K) // stride + 1
    out = array_api.full(
        shape=(N, H_out, W_out, C_out),
        fill_value=0,
        dtype=A.dtype,
        device=A.device
    )
    rows = N * H_out * W_out
    all_batches = slice(N)
    all_in_channels = slice(C_in)
    all_out_channels = slice(C_out)
    for i in range(K):
        # The step here is the convolution stride (not to be confused with dilation).
        h_window = slice(i, i + H_out * stride, stride)
        for j in range(K):
            w_window = slice(j, j + W_out * stride, stride)
            patch = A_pad[(all_batches, h_window, w_window, all_in_channels)]
            patch_matrix = array_api.reshape(patch, (rows, C_in))
            kernel = B[slice(i, i + 1, 1), slice(j, j + 1, 1), all_in_channels, all_out_channels]
            kernel_matrix = array_api.reshape(kernel, (C_in, C_out))
            product = array_api.matmul(patch_matrix, kernel_matrix)
            out += array_api.reshape(product, (N, H_out, W_out, C_out))
    return out
Convolution backward
反传要困难不少,主要是计算维度,参考了如下资料:
主要两个步骤:
- 第一步:确定维度;
- 第二步:确定形状;
首先是维度。
假设输入的维度分别为:
- $A: (b,h_1,w_1,d), B:(k,k,d,e)$;
输出的维度为:
- $O: (b,h_2,w_2,e)$
现在反传的输入维度为:
- $dO: (b,h_2,w_2,e)$
要得到$dA: (b, h_1, w_1, d)$的维度,应该计算$dO\star B.transpose(-1, -2)$,注意实际实现的时候需要对$B$ flip;
要得到$dB:(k, k, d, e)$,需要利用$dO$和$A$,需要调整$b$的位置,通过卷积消掉这个维度:
$(e, h_2, w_2, b), (h_1, w_1, b, d)\to (e, k,k, d)\to (k, k, d, e)$;
其次是形状,注意到这里$h=w$,所以后续讨论只使用$h$:
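注意到前向计算的输出大小满足(其中$p$为padding,$s$为stride):
$$h_2 = \left\lfloor \frac{h_1 + 2p - k}{s} \right\rfloor + 1$$
所以为了得到$dA$,第一步是dilate(在$dO$的相邻元素之间插入$s-1$个0):
$$h_2' = s \cdot h_2$$
然后通过下式子求解$p_2$:
$$h_2' + 2p_2 - k + 1 = h_1 \Rightarrow p_2 = \frac{h_1 + k - h_2' - 1}{2}$$
这样就可以得到计算卷积时的padding。
为了得到$dB$,第一步依然是dilate:
$$h_2' = s \cdot h_2$$
然后变换维度(与下面代码中的实现一致,以$A$作为输入、$dO$作为卷积核):
$$A: (b, h_1, w_1, d) \to (d, h_1, w_1, b), \quad dO: (b, h_2', w_2', e) \to (h_2', w_2', b, e)$$
我们希望满足如下条件:
$$h_1 + 2p_3 - h_2' + 1 = k \Rightarrow p_3 = \frac{k + h_2' - h_1 - 1}{2}$$
这样就可以得到计算卷积时的padding。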
还有一些flip以及维度的细节,参考代码:
def gradient(self, out_grad, node):
    """Backward convolution; returns ``(grad_A, grad_B)``.

    Both gradients are expressed as stride-1 convolutions over the dilated
    output gradient; the paddings are chosen so the results have the shapes
    of the input and of the kernel respectively.
    """
    # out_grad: b h w c2, dilated by (stride - 1) along the two spatial axes.
    grad_dilated = dilate(out_grad, (1, 2), self.stride - 1)
    # A: b h w c1, B: k k c1 c2
    A, B = node.inputs
    A_data = A.realize_cached_data()
    B_data = B.realize_cached_data()
    h = A_data.shape[1]
    k = B_data.shape[0]
    # b h w c1 -> c1 h w b
    A_data = array_api.permute(A_data, (3, 1, 2, 0))
    # Flip the kernel spatially, then swap channels: k k c1 c2 -> k k c2 c1.
    B_data = array_api.flip(B_data, (0, 1))
    B_data = array_api.permute(B_data, (0, 1, 3, 2))
    # Spatial size of the dilated output gradient.
    dilated_size = ((h + 2 * self.padding - k) // self.stride + 1) * self.stride
    pad_for_grad_A = (h + k - dilated_size - 1) // 2
    pad_for_grad_B = (k + dilated_size - h - 1) // 2
    # (b h w c2) conv (k k c2 c1) -> b h w c1
    flipped_kernel = Tensor(B_data, dtype=out_grad.dtype, device=out_grad.device)
    grad_A = conv(grad_dilated, flipped_kernel, stride=1, padding=pad_for_grad_A)
    # b h w c2 -> w h b c2 -> h w b c2, used as the kernel:
    # (c1 h w b) conv (h w b c2) -> c1 k k c2
    permuted_input = Tensor(A_data, dtype=out_grad.dtype, device=out_grad.device)
    grad_as_kernel = grad_dilated.transpose((0, 2)).transpose((0, 1))
    grad_B = conv(permuted_input, grad_as_kernel, stride=1, padding=pad_for_grad_B)
    # c1 k k c2 -> k k c1 c2 (via two axis swaps)
    grad_B = grad_B.transpose((0, 2)).transpose((0, 1))
    return grad_A, grad_B
nn.Conv
kaiming_uniform代码:
def kaiming_uniform(fan_in, fan_out, nonlinearity="relu", **kwargs):
    """Kaiming-uniform initialisation for ReLU networks.

    Samples from U(-bound, bound) with bound = sqrt(2) * sqrt(3 / fan_in).
    The result has shape ``kwargs["shape"]`` when given, else
    ``(fan_in, fan_out)``.

    NOTE(review): keyword arguments other than ``shape`` are accepted but not
    forwarded to ``rand`` -- confirm whether device/dtype should be passed on.
    """
    assert nonlinearity == "relu", "Only relu supported currently"
    gain = math.sqrt(2)
    bound = gain * math.sqrt(3 / fan_in)
    shape = kwargs.get("shape")
    # Identity test: `== None` misbehaves for array-like shapes.
    if shape is None:
        shape = (fan_in, fan_out)
    # Draw from U(-1, 1) and scale to U(-bound, bound).
    return rand(*shape, low=-1, high=1) * bound
conv代码:
class Conv(Module):
    """
    Multi-channel 2D convolutional layer
    IMPORTANT: Accepts inputs in NCHW format, outputs also in NCHW format
    Only supports padding=same
    No grouped convolution or dilation
    Only supports square kernels
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, bias=True, device=None, dtype="float32"):
        super().__init__()
        # Only square kernels / equal strides: keep the first tuple entry.
        if isinstance(kernel_size, tuple):
            kernel_size = kernel_size[0]
        if isinstance(stride, tuple):
            stride = stride[0]
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = self.kernel_size // 2

        # Kernel weights, stored as (K, K, C_in, C_out).
        kernel_area = self.kernel_size ** 2
        fan_in = self.in_channels * kernel_area
        fan_out = self.out_channels * kernel_area
        weight_shape = (self.kernel_size, self.kernel_size, self.in_channels, self.out_channels)
        self.weight = Parameter(
            init.kaiming_uniform(fan_in, fan_out, shape=weight_shape),
            device=device,
            dtype=dtype,
        )

        # Optional bias: one value per output channel.
        self.use_bias = bias
        if self.use_bias:
            bound = 1.0 / (self.in_channels * self.kernel_size ** 2) ** 0.5
            flat_bias = ops.reshape(init.uniform(self.out_channels, 1, bound), (self.out_channels,))
            self.bias = Parameter(flat_bias, device=device, dtype=dtype)

    def forward(self, x: Tensor) -> Tensor:
        """Convert NCHW to NHWC, convolve, add the bias, convert back to NCHW."""
        # NCHW -> NHCW -> NHWC
        nhwc = ops.transpose(x, (2, 1))
        nhwc = ops.transpose(nhwc, (3, 2))
        output = ops.conv(nhwc, self.weight, self.stride, self.padding)
        if self.use_bias:
            # (C,) -> (1, 1, 1, C) -> broadcast to the NHWC output shape
            bias = ops.reshape(self.bias, (1, 1, 1, self.out_channels))
            output += ops.broadcast_to(bias, output.shape)
        # NHWC -> NHCW -> NCHW
        output = ops.transpose(output, (3, 2))
        return ops.transpose(output, (2, 1))
ResNet9
代码:
class ConvBN(ndl.nn.Module):
    """Conv -> BatchNorm2d -> ReLU block.

    ``a``/``b`` are the input/output channel counts; ``k`` and ``s`` are
    passed to ``nn.Conv`` as kernel size and stride.
    """

    def __init__(self, a, b, k, s, device=None, dtype="float32"):
        super().__init__()
        layers = (
            nn.Conv(a, b, k, s, device=device, dtype=dtype),
            nn.BatchNorm2d(b, device=device, dtype=dtype),
            nn.ReLU(),
        )
        self.module = nn.Sequential(*layers)

    def forward(self, x):
        """Run the input through the conv/batch-norm/ReLU sequence."""
        out = self.module(x)
        return out
class ResNet9(ndl.nn.Module):
    """Two strided conv stages, each followed by a residual stage, then an MLP head."""

    def __init__(self, device=None, dtype="float32"):
        super().__init__()

        def conv_bn(c_in, c_out, kernel, stride):
            # One Conv -> BatchNorm2d -> ReLU triple, built in that order.
            return (
                nn.Conv(c_in, c_out, kernel, stride, device=device, dtype=dtype),
                nn.BatchNorm2d(c_out, device=device, dtype=dtype),
                nn.ReLU(),
            )

        self.ConvBN0 = nn.Sequential(
            *conv_bn(3, 16, 7, 4),
            *conv_bn(16, 32, 3, 2),
        )
        self.ConvBN1 = nn.Residual(
            nn.Sequential(
                *conv_bn(32, 32, 3, 1),
                *conv_bn(32, 32, 3, 1),
            )
        )
        self.ConvBN2 = nn.Sequential(
            *conv_bn(32, 64, 3, 2),
            *conv_bn(64, 128, 3, 2),
        )
        self.ConvBN3 = nn.Residual(
            nn.Sequential(
                *conv_bn(128, 128, 3, 1),
                *conv_bn(128, 128, 3, 1),
            )
        )
        self.Linear0 = nn.Sequential(
            nn.Flatten(),
            nn.Linear(128, 128, device=device, dtype=dtype),
            nn.ReLU(),
            nn.Linear(128, 10, device=device, dtype=dtype)
        )

    def forward(self, x):
        """Apply the four conv stages and the linear head in order."""
        stages = (self.ConvBN0, self.ConvBN1, self.ConvBN2, self.ConvBN3, self.Linear0)
        for stage in stages:
            x = stage(x)
        return x
Part 4: Recurrent neural network
注意点:
- RNN第一层的input_size;
- 多层rnn的遍历顺序为先遍历层,然后遍历时间;
代码:
class RNNCell(Module):
    def __init__(self, input_size, hidden_size, bias=True, nonlinearity='tanh', device=None, dtype="float32"):
        """
        Applies an RNN cell with tanh or ReLU nonlinearity.
        Parameters:
        input_size: The number of expected features in the input X
        hidden_size: The number of features in the hidden state h
        bias: If False, then the layer does not use bias weights
        nonlinearity: The non-linearity to use. Can be either 'tanh' or 'relu'.
        Variables:
        W_ih: The learnable input-hidden weights of shape (input_size, hidden_size).
        W_hh: The learnable hidden-hidden weights of shape (hidden_size, hidden_size).
        bias_ih: The learnable input-hidden bias of shape (hidden_size,).
        bias_hh: The learnable hidden-hidden bias of shape (hidden_size,).
        Weights and biases are initialized from U(-sqrt(k), sqrt(k)) where k = 1/hidden_size
        """
        super().__init__()
        self.use_bias = bias
        self.input_size = input_size
        self.hidden_size = hidden_size
        bound = math.sqrt(1 / hidden_size)
        self.W_ih = Parameter(init.uniform(input_size, hidden_size, bound), device=device, dtype=dtype)
        self.W_hh = Parameter(init.uniform(hidden_size, hidden_size, bound), device=device, dtype=dtype)
        if self.use_bias:
            # Biases are created with a leading size-1 axis; forward() reshapes
            # them to (1, hidden_size) before broadcasting.
            self.bias_ih = Parameter(init.uniform(1, hidden_size, bound), device=device, dtype=dtype)
            self.bias_hh = Parameter(init.uniform(1, hidden_size, bound), device=device, dtype=dtype)
        # Any value other than "tanh" selects ReLU.
        if nonlinearity == "tanh":
            self.f = Tanh()
        else:
            self.f = ReLU()

    def forward(self, X, h=None):
        """
        Inputs:
        X of shape (bs, input_size): Tensor containing input features
        h of shape (bs, hidden_size): Tensor containing the initial hidden state
        for each element in the batch. Defaults to zero if not provided.
        Outputs:
        h' of shape (bs, hidden_size): Tensor containing the next hidden state
        for each element in the batch.
        """
        bs = X.shape[0]
        # Identity test: `== None` would invoke any `__eq__` defined on Tensor.
        if h is None:
            h = Tensor(init.zeros(bs, self.hidden_size), device=X.device, dtype=X.dtype)
        pre_activation = ops.matmul(X, self.W_ih) + ops.matmul(h, self.W_hh)
        if self.use_bias:
            bias_shape = (1, self.hidden_size)
            pre_activation += ops.broadcast_to(ops.reshape(self.bias_ih, bias_shape), pre_activation.shape) + \
                ops.broadcast_to(ops.reshape(self.bias_hh, bias_shape), pre_activation.shape)
        return self.f(pre_activation)
class RNN(Module):
    def __init__(self, input_size, hidden_size, num_layers=1, bias=True, nonlinearity='tanh', device=None, dtype="float32"):
        """
        Applies a multi-layer RNN with tanh or ReLU non-linearity to an input sequence.
        Parameters:
        input_size - The number of expected features in the input x
        hidden_size - The number of features in the hidden state h
        num_layers - Number of recurrent layers.
        nonlinearity - The non-linearity to use. Can be either 'tanh' or 'relu'.
        bias - If False, then the layer does not use bias weights.
        Variables:
        rnn_cells[k].W_ih: The learnable input-hidden weights of the k-th layer,
        of shape (input_size, hidden_size) for k=0. Otherwise the shape is
        (hidden_size, hidden_size).
        rnn_cells[k].W_hh: The learnable hidden-hidden weights of the k-th layer,
        of shape (hidden_size, hidden_size).
        rnn_cells[k].bias_ih: The learnable input-hidden bias of the k-th layer,
        of shape (hidden_size,).
        rnn_cells[k].bias_hh: The learnable hidden-hidden bias of the k-th layer,
        of shape (hidden_size,).
        """
        super().__init__()
        self.num_layers = num_layers
        self.input_size = input_size
        self.hidden_size = hidden_size
        # Only the first layer sees the raw input; deeper layers consume the
        # hidden states of the layer below.
        self.rnn_cells = [
            RNNCell(
                input_size if layer == 0 else hidden_size,
                hidden_size, bias, nonlinearity, device, dtype,
            )
            for layer in range(num_layers)
        ]

    def forward(self, X, h0=None):
        """
        Inputs:
        X of shape (seq_len, bs, input_size) containing the features of the input sequence.
        h_0 of shape (num_layers, bs, hidden_size) containing the initial
        hidden state for each element in the batch. Defaults to zeros if not provided.
        Outputs
        output of shape (seq_len, bs, hidden_size) containing the output features
        (h_t) from the last layer of the RNN, for each t.
        h_n of shape (num_layers, bs, hidden_size) containing the final hidden state for each element in the batch.
        """
        seq_len = X.shape[0]
        bs = X.shape[1]
        # Identity test: `== None` would invoke any `__eq__` defined on Tensor.
        if h0 is None:
            h0 = Tensor(init.zeros(self.num_layers, bs, self.hidden_size), device=X.device, dtype=X.dtype)
        # (layers, b, e) -> one (b, e) initial state per layer
        initial_states = ops.split(h0, axis=0)
        # (seq, b, e) -> one (b, e) input per time step
        layer_inputs = ops.split(X, axis=0)
        final_states = []
        # Outer loop over layers, inner loop over time: each layer processes
        # the whole sequence produced by the layer below it.
        for layer in range(self.num_layers):
            state = initial_states[layer]
            layer_outputs = []
            for t in range(seq_len):
                state = self.rnn_cells[layer](layer_inputs[t], state)
                layer_outputs.append(state)
            final_states.append(state)
            layer_inputs = layer_outputs
        return ops.stack(layer_inputs, 0), ops.stack(final_states, 0)
Part 5: Long short-term memory network
注意点:
- LSTM第一层的input_size;
- 多层rnn的遍历顺序为先遍历层,然后遍历时间;
代码:
class Sigmoid(Module):
    """Logistic sigmoid, computed as ``exp(-log(1 + exp(-x)))``."""

    def __init__(self):
        super().__init__()

    def forward(self, x: Tensor) -> Tensor:
        """Return ``1 / (1 + exp(-x))`` built only from exp/log/add_scalar ops."""
        one_plus_exp = ops.add_scalar(ops.exp(-x), 1.0)
        log_denominator = ops.log(one_plus_exp)
        return ops.exp(-log_denominator)
class LSTMCell(Module):
    def __init__(self, input_size, hidden_size, bias=True, device=None, dtype="float32"):
        """
        A long short-term memory (LSTM) cell.
        Parameters:
        input_size - The number of expected features in the input X
        hidden_size - The number of features in the hidden state h
        bias - If False, then the layer does not use bias weights
        Variables:
        W_ih - The learnable input-hidden weights, of shape (input_size, 4*hidden_size).
        W_hh - The learnable hidden-hidden weights, of shape (hidden_size, 4*hidden_size).
        bias_ih - The learnable input-hidden bias, of shape (4*hidden_size,).
        bias_hh - The learnable hidden-hidden bias, of shape (4*hidden_size,).
        Weights and biases are initialized from U(-sqrt(k), sqrt(k)) where k = 1/hidden_size
        """
        super().__init__()
        self.use_bias = bias
        self.input_size = input_size
        self.hidden_size = hidden_size
        bound = math.sqrt(1 / hidden_size)
        gate_width = 4 * hidden_size
        self.W_ih = Parameter(init.uniform(input_size, gate_width, bound), device=device, dtype=dtype)
        self.W_hh = Parameter(init.uniform(hidden_size, gate_width, bound), device=device, dtype=dtype)
        if self.use_bias:
            # Biases are created with a leading size-1 axis; forward() reshapes
            # them to (1, 4*hidden_size) before broadcasting.
            self.bias_ih = Parameter(init.uniform(1, gate_width, bound), device=device, dtype=dtype)
            self.bias_hh = Parameter(init.uniform(1, gate_width, bound), device=device, dtype=dtype)
        self.sigma = Sigmoid()
        self.f = Tanh()

    def forward(self, X, h=None):
        """
        Inputs: X, h
        X of shape (batch, input_size): Tensor containing input features
        h, tuple of (h0, c0), with
        h0 of shape (bs, hidden_size): Tensor containing the initial hidden state
        for each element in the batch. Defaults to zero if not provided.
        c0 of shape (bs, hidden_size): Tensor containing the initial cell state
        for each element in the batch. Defaults to zero if not provided.
        Outputs: (h', c')
        h' of shape (bs, hidden_size): Tensor containing the next hidden state for each
        element in the batch.
        c' of shape (bs, hidden_size): Tensor containing the next cell state for each
        element in the batch.
        """
        bs = X.shape[0]
        # Identity test: `== None` on a tuple of Tensors is not a reliable check.
        if h is None:
            h0 = Tensor(init.zeros(bs, self.hidden_size), device=X.device, dtype=X.dtype)
            c0 = Tensor(init.zeros(bs, self.hidden_size), device=X.device, dtype=X.dtype)
        else:
            h0, c0 = h
        # (bs, 4 * hidden_size): pre-activations of all four gates at once.
        gates = ops.matmul(X, self.W_ih) + ops.matmul(h0, self.W_hh)
        if self.use_bias:
            bias_shape = (1, 4 * self.hidden_size)
            gates += ops.broadcast_to(ops.reshape(self.bias_ih, bias_shape), gates.shape) + \
                ops.broadcast_to(ops.reshape(self.bias_hh, bias_shape), gates.shape)
        # (bs, 4, hidden_size), then split into the four gate pre-activations.
        gates = ops.reshape(gates, (bs, 4, self.hidden_size))
        i, f, g, o = ops.split(gates, 1)
        i = self.sigma(i)
        f = self.sigma(f)
        g = self.f(g)
        o = self.sigma(o)
        c_next = f * c0 + i * g
        h_next = o * self.f(c_next)
        return h_next, c_next
class LSTM(Module):
    def __init__(self, input_size, hidden_size, num_layers=1, bias=True, device=None, dtype="float32"):
        """
        Applies a multi-layer long short-term memory (LSTM) RNN to an input sequence.
        Parameters:
        input_size - The number of expected features in the input x
        hidden_size - The number of features in the hidden state h
        num_layers - Number of recurrent layers.
        bias - If False, then the layer does not use bias weights.
        Variables:
        lstm_cells[k].W_ih: The learnable input-hidden weights of the k-th layer,
        of shape (input_size, 4*hidden_size) for k=0. Otherwise the shape is
        (hidden_size, 4*hidden_size).
        lstm_cells[k].W_hh: The learnable hidden-hidden weights of the k-th layer,
        of shape (hidden_size, 4*hidden_size).
        lstm_cells[k].bias_ih: The learnable input-hidden bias of the k-th layer,
        of shape (4*hidden_size,).
        lstm_cells[k].bias_hh: The learnable hidden-hidden bias of the k-th layer,
        of shape (4*hidden_size,).
        """
        super().__init__()
        self.num_layers = num_layers
        self.input_size = input_size
        self.hidden_size = hidden_size
        # Only the first layer sees the raw input; deeper layers consume the
        # hidden states of the layer below.
        self.lstm_cells = [
            LSTMCell(
                input_size if layer == 0 else hidden_size,
                hidden_size, bias, device, dtype,
            )
            for layer in range(num_layers)
        ]

    def forward(self, X, h=None):
        """
        Inputs: X, h
        X of shape (seq_len, bs, input_size) containing the features of the input sequence.
        h, tuple of (h0, c0) with
        h_0 of shape (num_layers, bs, hidden_size) containing the initial
        hidden state for each element in the batch. Defaults to zeros if not provided.
        c0 of shape (num_layers, bs, hidden_size) containing the initial
        hidden cell state for each element in the batch. Defaults to zeros if not provided.
        Outputs: (output, (h_n, c_n))
        output of shape (seq_len, bs, hidden_size) containing the output features
        (h_t) from the last layer of the LSTM, for each t.
        tuple of (h_n, c_n) with
        h_n of shape (num_layers, bs, hidden_size) containing the final hidden state for each element in the batch.
        c_n of shape (num_layers, bs, hidden_size) containing the final hidden cell state for each element in the batch.
        """
        seq_len = X.shape[0]
        bs = X.shape[1]
        # Identity test: `== None` on a tuple of Tensors is not a reliable check.
        if h is None:
            h0 = Tensor(init.zeros(self.num_layers, bs, self.hidden_size), device=X.device, dtype=X.dtype)
            c0 = Tensor(init.zeros(self.num_layers, bs, self.hidden_size), device=X.device, dtype=X.dtype)
        else:
            h0, c0 = h
        # (layers, b, e) -> one (b, e) initial state per layer
        initial_h = ops.split(h0, axis=0)
        initial_c = ops.split(c0, axis=0)
        # (seq, b, e) -> one (b, e) input per time step
        layer_inputs = ops.split(X, axis=0)
        final_h = []
        final_c = []
        # Outer loop over layers, inner loop over time: each layer processes
        # the whole sequence produced by the layer below it.
        for layer in range(self.num_layers):
            h_state = initial_h[layer]
            c_state = initial_c[layer]
            layer_outputs = []
            for t in range(seq_len):
                h_state, c_state = self.lstm_cells[layer](layer_inputs[t], (h_state, c_state))
                layer_outputs.append(h_state)
            final_h.append(h_state)
            final_c.append(c_state)
            layer_inputs = layer_outputs
        return ops.stack(layer_inputs, 0), (ops.stack(final_h, 0), ops.stack(final_c, 0))
Part 6: Penn Treebank dataset
注意点:
- 每句话最后要添加 `<eos>`;`batchify` 时注意维度;
代码:
class Corpus(object):
    """
    Creates corpus from train, and test txt files.
    """
    # Marker token appended after every line to denote the end of a sentence.
    eos = "<eos>"

    def __init__(self, base_dir, max_lines=None):
        """
        Build the vocabulary and token-id lists for `train.txt` and `test.txt`.

        Input:
        base_dir - directory containing 'train.txt' and 'test.txt'
        max_lines - maximum number of lines to read from each file (None reads all)
        """
        self.dictionary = Dictionary()
        self.train = self.tokenize(os.path.join(base_dir, 'train.txt'), max_lines)
        self.test = self.tokenize(os.path.join(base_dir, 'test.txt'), max_lines)

    def tokenize(self, path, max_lines=None):
        """
        Input:
        path - path to text file
        max_lines - maximum number of lines to read in

        Tokenizes a text file, first adding each word in the file to the dictionary,
        and then tokenizing the text file to a list of IDs. When adding words to the
        dictionary (and tokenizing the file content) '<eos>' should be appended to
        the end of each line in order to properly account for the end of the sentence.

        Output:
        ids: List of ids
        """
        ### BEGIN YOUR SOLUTION
        ids = []
        # Explicit encoding keeps tokenization independent of the platform locale.
        with open(path, encoding="utf-8") as f:
            # Iterate the file lazily so a small `max_lines` does not force the
            # whole file into memory first.
            for line_no, line in enumerate(f):
                # With max_lines=None this never matches, so every line is read.
                if line_no == max_lines:
                    break
                for word in line.split():
                    # add_word's return value is used as the word's id.
                    ids.append(self.dictionary.add_word(word))
                # add <eos>
                ids.append(self.dictionary.add_word(Corpus.eos))
        return ids
        ### END YOUR SOLUTION
def batchify(data, batch_size, device, dtype):
    """
    Starting from sequential data, batchify arranges the dataset into columns.
    For instance, with the alphabet as the sequence and batch size 4, we'd get
    ┌ a g m s ┐
    │ b h n t │
    │ c i o u │
    │ d j p v │
    │ e k q w │
    └ f l r x ┘.
    These columns are treated as independent by the model, which means that the
    dependence of e. g. 'g' on 'f' cannot be learned, but allows more efficient
    batch processing.
    If the data cannot be evenly divided by the batch size, trim off the remainder.

    Inputs:
    data - sequence of token ids
    batch_size - number of columns in the result
    device - unused here; the result is a plain numpy array
    dtype - numpy dtype for the returned array; None lets numpy infer it

    Returns the data as a numpy array of shape (nbatch, batch_size).
    """
    ### BEGIN YOUR SOLUTION
    nbatch = len(data) // batch_size
    # Drop the tail that would not fill a complete row of `batch_size` items.
    trimmed = np.array(data[:nbatch * batch_size], dtype=dtype)
    # Each row of the (batch_size, nbatch) view is one contiguous run of the
    # sequence; transposing turns those runs into columns, e.g.
    # [1, 2, 3, 4, 5, 6] -> [[1, 2], [3, 4], [5, 6]] -> [[1, 3, 5], [2, 4, 6]]
    return trimmed.reshape(batch_size, nbatch).T
    ### END YOUR SOLUTION
def get_batch(batches, i, bptt, device=None, dtype=None):
    """
    get_batch subdivides the source data into chunks of length bptt.
    If source is equal to the example output of the batchify function, with
    a bptt-limit of 2, we'd get the following two Variables for i = 0:
    ┌ a g m s ┐ ┌ b h n t ┐
    └ b h n t ┘ └ c i o u ┘
    Note that despite the name of the function, the subdivision of data is not
    done along the batch dimension (i.e. dimension 1), since that was handled
    by the batchify function. The chunks are along dimension 0, corresponding
    to the seq_len dimension in the LSTM or RNN.

    Inputs:
    batches - numpy array returned from batchify function
    i - index
    bptt - Sequence length

    Returns:
    data - Tensor of shape (bptt, bs) with cached data as NDArray
    target - Tensor of shape (bptt*bs,) with cached data as NDArray
    """
    ### BEGIN YOUR SOLUTION
    inputs = batches[i: i + bptt]
    # Targets are the same window shifted down by one row.
    labels = batches[i + 1: i + 1 + bptt]
    # Near the end of the data the two windows can differ in length; cut both
    # to the shorter one so every input row keeps a matching target row.
    rows = min(inputs.shape[0], labels.shape[0])
    data = Tensor(inputs[:rows], device=device, dtype=dtype)
    target = Tensor(labels[:rows].flatten(), device=device, dtype=dtype)
    return data, target
    ### END YOUR SOLUTION
Part 7: Training a word-level language model
代码:
nn.py:
class Embedding(Module):
    def __init__(self, num_embeddings, embedding_dim, device=None, dtype="float32"):
        """
        Maps one-hot word vectors from a dictionary of fixed size to embeddings.

        Parameters:
        num_embeddings (int) - Size of the dictionary
        embedding_dim (int) - The size of each embedding vector

        Variables:
        weight - The learnable weights of shape (num_embeddings, embedding_dim)
            initialized from N(0, 1).
        """
        super().__init__()
        ### BEGIN YOUR SOLUTION
        self.device = device
        self.dtype = dtype
        self.num_embeddings = num_embeddings
        self.embedding_dim = embedding_dim
        self.weight = Parameter(
            init.randn(num_embeddings, embedding_dim),
            device=device,
            dtype=dtype,
        )
        ### END YOUR SOLUTION

    def forward(self, x: Tensor) -> Tensor:
        """
        Maps word indices to one-hot vectors, and projects to embedding vectors

        Input:
        x of shape (seq_len, bs)

        Output:
        output of shape (seq_len, bs, embedding_dim)
        """
        ### BEGIN YOUR SOLUTION
        # (seq_len, bs) indices -> (seq_len, bs, num_embeddings) one-hot rows
        one_hot = init.one_hot(self.num_embeddings, x, device=x.device, dtype=x.dtype)
        seq_len, bs, vocab = one_hot.shape
        # Flatten the two leading axes so the lookup is a single 2-D matmul.
        flat = ops.reshape(one_hot, (seq_len * bs, vocab))
        projected = ops.matmul(flat, self.weight)
        # Restore the (seq_len, bs, embedding_dim) layout.
        return ops.reshape(projected, (seq_len, bs, self.embedding_dim))
        ### END YOUR SOLUTION
models.py:
class LanguageModel(nn.Module):
    def __init__(self, embedding_size, output_size, hidden_size, num_layers=1,
                 seq_model='rnn', device=None, dtype="float32"):
        """
        Consists of an embedding layer, a sequence model (either RNN or LSTM), and a
        linear layer.

        Parameters:
        output_size: Size of dictionary
        embedding_size: Size of embeddings
        hidden_size: The number of features in the hidden state of LSTM or RNN
        seq_model: 'rnn' or 'lstm', whether to use RNN or LSTM
        num_layers: Number of layers in RNN or LSTM
        """
        super(LanguageModel, self).__init__()
        ### BEGIN YOUR SOLUTION
        self.embedding = nn.Embedding(output_size, embedding_size, device=device, dtype=dtype)
        # Any value other than "rnn" selects the LSTM.
        seq_cls = nn.RNN if seq_model == "rnn" else nn.LSTM
        self.seq_model = seq_cls(embedding_size, hidden_size, num_layers, device=device, dtype=dtype)
        self.out_proj = nn.Linear(hidden_size, output_size, device=device, dtype=dtype)
        ### END YOUR SOLUTION

    def forward(self, x, h=None):
        """
        Given sequence (and the previous hidden state if given), returns the output of
        the final linear layer for each position (along with the last hidden state
        from the sequence model).

        Inputs:
        x of shape (seq_len, bs)
        h of shape (num_layers, bs, hidden_size) if using RNN,
            else h is tuple of (h0, c0), each of shape (num_layers, bs, hidden_size)

        Returns (out, h)
        out of shape (seq_len*bs, output_size)
        h of shape (num_layers, bs, hidden_size) if using RNN,
            else h is tuple of (h_n, c_n), each of shape (num_layers, bs, hidden_size)
        """
        ### BEGIN YOUR SOLUTION
        seq_len, bs = x.shape
        # (seq_len, bs) -> (seq_len, bs, embedding_size)
        embedded = self.embedding(x)
        # (seq_len, bs, embedding_size) -> (seq_len, bs, hidden_size)
        features, h = self.seq_model(embedded, h)
        hidden = features.shape[-1]
        # Merge time and batch axes so the linear layer sees a 2-D input.
        flat = ndl.ops.reshape(features, (seq_len * bs, hidden))
        # (seq_len * bs, hidden_size) -> (seq_len * bs, output_size)
        return self.out_proj(flat), h
        ### END YOUR SOLUTION
simple_training.py:
### PTB training ###
def epoch_general_ptb(data, model, seq_len=40, loss_fn=nn.SoftmaxLoss, opt=None,
                      clip=None, device=None, dtype="float32"):
    """
    Iterates over the data. If optimizer is not None, sets the
    model to train mode, and for each batch updates the model parameters.
    If optimizer is None, sets the model to eval mode, and simply computes
    the loss/accuracy.

    Args:
        data: data of shape (nbatch, batch_size) given from batchify function
        model: LanguageModel instance
        seq_len: i.e. bptt, sequence length
        loss_fn: nn.Module class; it is instantiated once per call
        opt: Optimizer instance (optional)
        clip: max norm of gradients (optional)

    Returns:
        avg_acc: average accuracy over dataset
        avg_loss: average loss over dataset
    """
    np.random.seed(4)
    ### BEGIN YOUR SOLUTION
    if opt:
        model.train()
    else:
        model.eval()
    criterion = loss_fn()
    weighted_losses = []
    correct = 0
    seen = 0
    nrows = data.shape[0]
    # NOTE(review): `clip` is accepted but no gradient clipping is applied, and
    # the hidden state returned by the model is not fed into the next chunk.
    # Stop at nrows - 1: the last row has no following row to act as its
    # target, so starting a chunk there would produce an empty batch.
    for start in range(0, nrows - 1, seq_len):
        if opt:
            opt.reset_grad()
        # x: (chunk_len, bs), y: (chunk_len * bs,)
        x, y = ndl.data.get_batch(data, start, seq_len, device=device, dtype=dtype)
        n_targets = y.shape[0]
        logits, _ = model(x)
        loss = criterion(logits, y)
        if opt:
            loss.backward()
            opt.step()
        seen += n_targets
        # Weight each chunk's loss by its target count so the shorter final
        # chunk does not skew the dataset-level mean.
        weighted_losses.append(loss.numpy().item() * n_targets)
        correct += np.sum(logits.numpy().argmax(axis=1) == y.numpy())
    return correct / seen, np.sum(weighted_losses) / seen
    ### END YOUR SOLUTION
def train_ptb(model, data, seq_len=40, n_epochs=1, optimizer=ndl.optim.SGD,
              lr=4.0, weight_decay=0.0, loss_fn=nn.SoftmaxLoss, clip=None,
              device=None, dtype="float32"):
    """
    Performs {n_epochs} epochs of training.

    Args:
        model: LanguageModel instance
        data: data of shape (nbatch, batch_size) given from batchify function
        seq_len: i.e. bptt, sequence length
        n_epochs: number of epochs (int)
        optimizer: Optimizer class
        lr: learning rate (float)
        weight_decay: weight decay (float)
        loss_fn: nn.Module class
        clip: max norm of gradients (optional)

    Returns:
        avg_acc: average accuracy over dataset from last epoch of training
        avg_loss: average loss over dataset from last epoch of training
    """
    np.random.seed(4)
    ### BEGIN YOUR SOLUTION
    opt = optimizer(model.parameters(), lr=lr, weight_decay=weight_decay)
    # Settings shared by every epoch; only the model's parameters change.
    epoch_kwargs = dict(
        data=data,
        model=model,
        seq_len=seq_len,
        loss_fn=loss_fn,
        opt=opt,
        clip=clip,
        device=device,
        dtype=dtype,
    )
    for _ in range(n_epochs):
        # Only the metrics of the most recent epoch are kept.
        avg_acc, avg_loss = epoch_general_ptb(**epoch_kwargs)
    return avg_acc, avg_loss
    ### END YOUR SOLUTION
def evaluate_ptb(model, data, seq_len=40, loss_fn=nn.SoftmaxLoss,
                 device=None, dtype="float32"):
    """
    Computes the test accuracy and loss of the model.

    Args:
        model: LanguageModel instance
        data: data of shape (nbatch, batch_size) given from batchify function
        seq_len: i.e. bptt, sequence length
        loss_fn: nn.Module class

    Returns:
        avg_acc: average accuracy over dataset
        avg_loss: average loss over dataset
    """
    np.random.seed(4)
    ### BEGIN YOUR SOLUTION
    # Passing opt=None puts epoch_general_ptb on its evaluation path.
    metrics = epoch_general_ptb(
        data=data,
        model=model,
        seq_len=seq_len,
        loss_fn=loss_fn,
        opt=None,
        clip=None,
        device=device,
        dtype=dtype,
    )
    acc, loss = metrics
    return acc, loss
    ### END YOUR SOLUTION
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Doraemonzzz!
评论
ValineLivere