layer_utils.py

#

Build RNN and LSTM Neural Networks from Scratch

#

This module defines RNN and LSTM layers, which extend traditional feedforward neural networks with an internal memory state, making them especially suited to modeling sequential data (e.g. language modeling, sentiment analysis, time-series analysis).

#

Imports

import numpy as np
#

Module Level Functions

#

A numerically stable version of the logistic sigmoid function.

def sigmoid(x):
#
    pos_mask = (x >= 0)
    neg_mask = (x < 0)
    z = np.zeros_like(x)
    z[pos_mask] = np.exp(-x[pos_mask])
    z[neg_mask] = np.exp(x[neg_mask])
    top = np.ones_like(x)
    top[neg_mask] = z[neg_mask]
    return top / (1 + z)
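#

A quick sanity check of the stable formulation (input values are illustrative): neither branch ever exponentiates a large positive number, so the function stays finite where the naive `1 / (1 + np.exp(-x))` would overflow.

x = np.array([-1000.0, -1.0, 0.0, 1.0, 1000.0])
print(sigmoid(x))   # approximately [0.  0.2689  0.5  0.7311  1.]
# the naive form would overflow in np.exp(1000) and emit a RuntimeWarning:
# naive = 1.0 / (1.0 + np.exp(-x))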
#

NN Classes

#

Base RNN object for composing layers in series and collecting their parameters

class RNN(object):
#
Parameters
----------
: `*args` (layer objects): Layer instances to compose, each exposing `params` and `grads` dicts

Attributes
----------
: params (dict): Store weights and bias by layer name
: grads (dict): Store gradients

Stores
----------
: meta (tuple): Localized internal memory storing the values calculated during the forward pass, for use when calculating backpropagation
#
    def __init__(self, *args):
        self.params = {}
        self.grads = {}
        self.layers = []
        self.paramName2Indices = {}
        self.layer_names = {}
#

process the parameters layer by layer

        layer_cnt = 0
        for layer in args:
            for n, v in layer.params.items():
                if v is None:
                    continue
                self.params[n] = v
                self.paramName2Indices[n] = layer_cnt
            for n, v in layer.grads.items():
                self.grads[n] = v
            if layer.name in self.layer_names:
                raise ValueError("Existing name {}!".format(layer.name))
            self.layer_names[layer.name] = True
            self.layers.append(layer)
            layer_cnt += 1
#

load the given parameter values into the owning layer by name

    def assign(self, name, val):
#
        layer_cnt = self.paramName2Indices[name]
        self.layers[layer_cnt].params[name] = val
#

load the given gradient values into the owning layer by name

    def assign_grads(self, name, val):
#
        layer_cnt = self.paramName2Indices[name]
        self.layers[layer_cnt].grads[name] = val
#

return the parameters by name

    def get_params(self, name):
#
        return self.params[name]
#

return the gradients by name

    def get_grads(self, name):
#
        return self.grads[name]
#

Collect the parameters of every layer

    def gather_params(self):
#
        for layer in self.layers:
            for n, v in layer.params.items():
                self.params[n] = v
#

Collect the gradients of every layer

    def gather_grads(self):
#
        for layer in self.layers:
            for n, v in layer.grads.items():
                self.grads[n] = v
#

Load a pretrained model

    def load(self, pretrained):
#
        for layer in self.layers:
            if not hasattr(layer, "params"):
                continue
            for n, v in layer.params.items():
                if n in pretrained:
                    layer.params[n] = pretrained[n].copy()
                    print("Loading Params: {} Shape: {}".format(n, layer.params[n].shape))
#
class VanillaRNN(object):
#

Basic Recurrent Neural Network

    def __init__(self, input_dim, h_dim, init_scale=0.02, name='vanilla_rnn'):
#
    Parameters
    ----------
    : name (str): Name of current layer used for retrieving stored values
    : input_dim (int): Input dimension
    : h_dim (int): Hidden state dimension

    Stores:
    ----------
    : meta: Values needed for the backward pass.
        self.name = name
        self.wx_name = name + "_wx"
        self.wh_name = name + "_wh"
        self.b_name = name + "_b"
        self.input_dim = input_dim
        self.h_dim = h_dim
#

store the weights and biases for each layer by name

        self.params = {}
#

initialize weights with random values

        self.params[self.wx_name] = init_scale * np.random.randn(input_dim, h_dim)
        self.params[self.wh_name] = init_scale * np.random.randn(h_dim, h_dim)
        self.params[self.b_name] = np.zeros(h_dim)
#

store the gradients by name

        self.grads = {}
        self.grads[self.wx_name] = None
        self.grads[self.wh_name] = None
        self.grads[self.b_name] = None
        self.meta = None
#

RNN Forward Pass

    def step_forward(self, x, prev_h):
#
    Calculate the hidden state of a single time step.

    Notes
    ----------
    - Tanh activation function
    - Next hidden state is a function of:
  1. the previous hidden state
  2. current input
  3. bias
    Parameters
    ----------
    : x (ndarray): Input feature matrix (N, D)
    : prev_h (ndarray): Previous hidden state (N, H)
    
    Returns
    ----------
    : next_h (ndarray): Next hidden state (N, H)
    : meta (tuple): Stored variables for the backward pass
    
        next_h, meta = None, None
        assert np.prod(x.shape[1:]) == self.input_dim, "Expected input dimension {}, but got {}".format(
            self.input_dim, np.prod(x.shape[1:]))
#
  1. Combine the weighted input, the weighted previous hidden state, and the bias
  2. Apply the tanh activation function
        next_h = np.tanh(
            np.matmul(x, self.params[self.wx_name]) +
            np.matmul(prev_h, self.params[self.wh_name]) +
            self.params[self.b_name])
        meta = (x, prev_h, next_h)
        return next_h, meta
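#

A small shape check for a single step (toy sizes and seed, to be run once the class below is fully defined): the next hidden state has the same shape as `prev_h`, and `meta` carries exactly the three arrays `step_backward` needs.

np.random.seed(0)
rnn = VanillaRNN(input_dim=3, h_dim=5)
x_t = np.random.randn(4, 3)          # N=4 samples, D=3 features
h_prev = np.zeros((4, 5))            # N=4, H=5
h_next, meta = rnn.step_forward(x_t, h_prev)
print(h_next.shape)                  # (4, 5)
print([m.shape for m in meta])       # [(4, 3), (4, 5), (4, 5)]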
#

Backward pass of a single time step

    def step_backward(self, dnext_h, meta):
#
    Parameters
    ----------
    : dnext_h: gradient w.r.t. next hidden state
    : meta: variables stored from forward pass

    Returns
    ----------
    : dx: gradients of input feature (N, D)
    : dprev_h: gradients of previous hidden state (N, H)
    : dWx: gradients w.r.t. input-to-hidden weights (D, H)
    : dWh: gradients w.r.t. hidden-to-hidden weights (H, H)
    : db: gradients w.r.t. bias (H,)
        dx, dprev_h, dWx, dWh, db = None, None, None, None, None

        x, prev_h, h = meta
        dtan = 1 - h ** 2
        dnext_tan = dnext_h * dtan
        dx = np.matmul(dnext_tan, self.params[self.wx_name].T)
        dprev_h = np.matmul(dnext_tan, self.params[self.wh_name].T)
        dWx = np.matmul(x.T, dnext_tan)
        dWh = np.matmul(prev_h.T, dnext_tan)
        db = dnext_tan.sum(axis=0)
#

store the computed gradients

        self.grads[self.wx_name] = dWx
        self.grads[self.wh_name] = dWh
        self.grads[self.b_name] = db
        return dx, dprev_h, dWx, dWh, db
#

Implement the forward pass using the `step_forward` method

    def forward(self, x, h0):
#
    Parameters
    ----------
    : x: input feature for the entire timeseries (N, T, D)
    : h0: initial hidden state (N, H)

    Returns
    ----------
    : h: hidden state activations for the entire timeseries (N, T, H)
        h = None  # N,T,H
        self.meta = []
        N, T, D = x.shape
        H = h0.shape[1]
        h = np.zeros((N, T, H))
        prev_h = h0
        for t in np.arange(T):
            prev_h, meta = self.step_forward(x[:, t, :], prev_h)
            h[:, t, :] = prev_h
            self.meta.append(meta)

        return h
#

Implement the backward pass using the `step_backward` method

    def backward(self, dh):
#

    Parameters
    ----------
    : dh (ndarray): Upstream gradients of the hidden states for the entire timeseries, of shape (N, T, H)

    Returns
    ----------
    : dx: gradient of inputs (N, T, D)
    : dh0: gradient w.r.t. initial hidden state (N, H)
        dx, dh0 = None, None
        self.grads[self.wx_name] = None
        self.grads[self.wh_name] = None
        self.grads[self.b_name] = None

        N, T, H = dh.shape
#

initial backpass from output

        dxt, dprev_h, dWx, dWh, db = self.step_backward(dh[:, T - 1, :], self.meta[-1])
        D = dxt.shape[-1]
        dx = np.zeros((N, T, D))
        dx[:, T - 1, :] = dxt

        for t in np.arange(T - 2, -1, -1):
            dx[:, t, :], dprev_h, dWx_t, dWh_t, db_t = self.step_backward(dh[:, t, :] + dprev_h, self.meta[t])
            dWx += dWx_t
            dWh += dWh_t
            db += db_t
        dh0 = dprev_h
#

gradient of input-to-hidden weights (D, H)

        self.grads[self.wx_name] = dWx
#

gradient of hidden-to-hidden weights (H, H)

        self.grads[self.wh_name] = dWh
#

gradient of biases (H,)

        self.grads[self.b_name] = db

        self.meta = []
        return dx, dh0
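#

A sketch of a full forward/backward pass with a finite-difference spot check on one entry of the input-to-hidden weights (toy shapes; the seed and tolerance are illustrative). The upstream gradient `dh` plays the role of dL/dh for a pretend scalar loss L = sum(h * dh).

np.random.seed(1)
N, T, D, H = 2, 4, 3, 5
rnn = VanillaRNN(input_dim=D, h_dim=H)
x = np.random.randn(N, T, D)
h0 = np.random.randn(N, H)

h = rnn.forward(x, h0)                     # (N, T, H)
dh = np.random.randn(*h.shape)             # pretend upstream gradient
dx, dh0 = rnn.backward(dh)

# finite-difference check on one entry of the input-to-hidden weights
eps = 1e-5
w = rnn.params[rnn.wx_name]
w[0, 0] += eps
loss_plus = np.sum(rnn.forward(x, h0) * dh)
w[0, 0] -= 2 * eps
loss_minus = np.sum(rnn.forward(x, h0) * dh)
w[0, 0] += eps                             # restore the weight
numeric = (loss_plus - loss_minus) / (2 * eps)
print(np.isclose(numeric, rnn.grads[rnn.wx_name][0, 0], rtol=1e-4))   # expected: True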
#
class LSTM(object):
#

Long short-term memory

    def __init__(self, input_dim, h_dim, init_scale=0.02, name='lstm'):
#
    Parameters
    ----------
    : name: the name of the current layer
    : input_dim: input dimension
    : h_dim: hidden state dimension
        self.name = name
        self.wx_name = name + "_wx"
        self.wh_name = name + "_wh"
        self.b_name = name + "_b"
        self.input_dim = input_dim
        self.h_dim = h_dim
        self.params = {}
        self.grads = {}
        self.params[self.wx_name] = init_scale * np.random.randn(input_dim, 4 * h_dim)
        self.params[self.wh_name] = init_scale * np.random.randn(h_dim, 4 * h_dim)
        self.params[self.b_name] = np.zeros(4 * h_dim)
        self.grads[self.wx_name] = None
        self.grads[self.wh_name] = None
        self.grads[self.b_name] = None
        self.meta = None
#

LSTM Forward Step

    def step_forward(self, x, prev_h, prev_c):
#
    Parameters
    ----------
    : x: input feature (N, D)
    : prev_h: hidden state from the previous timestep (N, H)
    : prev_c: cell state from the previous timestep (N, H)

    Returns
    ----------
    : next_h: next hidden state (N, H)
    : next_c: next cell state (N, H)
    : meta: values stored for the backward pass

    Notes
    ----------
    Use the numerically stable `sigmoid` function for the gates
        next_h, next_c, meta = None, None, None
        H = prev_h.shape[-1]

        h = (np.dot(x, self.params[self.wx_name]) +
             np.dot(prev_h, self.params[self.wh_name]) +
             self.params[self.b_name])
        i = sigmoid(h[:, 0:H])               # input gate
        f = sigmoid(h[:, H:(2 * H)])         # forget gate
        o = sigmoid(h[:, (2 * H):(3 * H)])   # output gate
        g = np.tanh(h[:, (3 * H):(4 * H)])   # candidate cell values
        next_c = f * prev_c + i * g
        next_h = o * np.tanh(next_c)
        h = np.concatenate((i, f, o, g), axis=1)
        meta = (x, prev_h, prev_c, next_h, next_c, i, f, o, g, h)

        return next_h, next_c, meta
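#

A small sketch of the gate layout (toy sizes, to be run once the class is complete): the (N, 4H) pre-activation is split column-wise into the input, forget, and output gates and the candidate cell values, in that order, and with a zero initial state the update reduces to `i * g`. The indices below follow the `meta` tuple defined above.

np.random.seed(2)
lstm = LSTM(input_dim=3, h_dim=4)
x_t = np.random.randn(2, 3)
h_prev = np.zeros((2, 4))
c_prev = np.zeros((2, 4))
h_next, c_next, meta = lstm.step_forward(x_t, h_prev, c_prev)
print(h_next.shape, c_next.shape)      # (2, 4) (2, 4)
# with zero initial state the forget gate has nothing to keep,
# so next_c reduces to i * g and next_h to o * tanh(i * g)
i, o, g = meta[5], meta[7], meta[8]
print(np.allclose(c_next, i * g), np.allclose(h_next, o * np.tanh(i * g)))   # True True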
#

LSTM Backward Pass (Backpropagation)

    def step_backward(self, dnext_h, dnext_c, meta):
#
    Parameters
    ----------
    : dnext_h: gradient w.r.t. next hidden state (N, H)
    : dnext_c: gradient w.r.t. next cell state (N, H)
    : meta: variables stored during the forward pass

    Returns
    ----------
    : dx: gradients of input feature (N, D)
    : dprev_h: gradients of previous hidden state (N, H)
    : dprev_c: gradients of previous cell state (N, H)
    : dWx: gradients w.r.t. input-to-hidden weights (D, 4H)
    : dWh: gradients w.r.t. hidden-to-hidden weights (H, 4H)
    : db: gradients w.r.t. bias (4H,)
        dx, dprev_h, dprev_c, dWx, dWh, db = None, None, None, None, None, None

        x, prev_h, prev_c, next_h, next_c, i, f, o, g, h = meta
        N, H = dnext_h.shape
#

Calculate derivative of sigmoid

        def dsig(x):
#
        Parameters
        ----------
        : x = sigmoid(input)
            return x * (1 - x)
#

Calculate derivative of tanh

        def dtanh(x):
#
        Parameters
        ----------
        : x = np.tanh(input)
            return 1 - x ** 2

        # total gradient into the cell state: the direct upstream term plus the path through next_h
        dnext_c = dnext_c + dnext_h * o * dtanh(np.tanh(next_c))

        di = dsig(i) * dnext_c * g
        df = dsig(f) * dnext_c * prev_c
        do = dsig(o) * dnext_h * np.tanh(next_c)
        dg = dtanh(g) * dnext_c * i
        d = np.concatenate((di, df, do, dg), axis=1)

        dx = np.matmul(d, self.params[self.wx_name].T)
        dprev_h = np.matmul(d, self.params[self.wh_name].T)
        dprev_c = dnext_c * f

        dWx = np.dot(x.T, d)
        dWh = np.dot(prev_h.T, d)
        db = np.sum(d, axis=0)

        return dx, dprev_h, dprev_c, dWx, dWh, db
#

Implement the forward pass for an LSTM over an entire timeseries

    def forward(self, x, h0):
#
    Notes
    -----------
    - Assume an input sequence composed of T vectors, each of dimension D.
    - Hidden state has size H
    - The initial hidden state is passed as input, but the initial cell state is set to zero
    - Each minibatch contains N sequences
    - After running the LSTM forward, we return the hidden states for all timesteps
    - The cell state is not returned; it is an internal variable to the LSTM and is not accessed from outside

    Parameters
    ----------
    : x: Input data of shape (N, T, D)
    : h0: Initial hidden state of shape (N, H)

    Uses the layer's stored parameters:
    : Wx: Weights for input-to-hidden connections, of shape (D, 4H)
    : Wh: Weights for hidden-to-hidden connections, of shape (H, 4H)
    : b: Biases of shape (4H,)

    Returns
    ----------
    : h: Hidden states for all timesteps of all sequences, of shape (N, T, H)

    Stores:
    ----------
    : meta: Values needed for the backward pass.
        h = None
        self.meta = []
        N, T, D = x.shape
        H = h0.shape[1]
        prev_h = h0
#

Initialize cell state with zeros

        prev_c = np.zeros(h0.shape)
        h = np.zeros((N, T, H))
        meta = []
        for t in np.arange(T):
            xt = x[:, t, :]
            next_h, next_c, meta_t = self.step_forward(xt, prev_h, prev_c)
            meta.append(meta_t)
            h[:, t, :] = next_h
            prev_h, prev_c = next_h, next_c
        self.meta = meta
        return h
#

Backward pass for an LSTM over an entire sequence of data

    def backward(self, dh):
#
    Parameters
    ----------
    : dh: Upstream gradients of hidden states, of shape (N, T, H)

    Returns
    ----------
    : dx: Gradient of input data of shape (N, T, D)
    : dh0: Gradient of initial hidden state of shape (N, H)

    Stores
    ----------
    : dWx: Gradient of input-to-hidden weight matrix of shape (D, 4H)
    : dWh: Gradient of hidden-to-hidden weight matrix of shape (H, 4H)
    : db: Gradient of biases, of shape (4H,)

    Notes
    ----------
    - The gradient of each hidden state comes from two sources: the upstream gradient dh[:, t, :] and the gradient flowing back from timestep t + 1
    - The initial gradient of the cell state is zero
        dx, dh0 = None, None
        N, T, H = dh.shape
        D = self.meta[0][0].shape[1]

        dnext_c = np.zeros((N, H))
        dnext_h = np.zeros((N, H))
        dx = np.zeros((N, T, D))
        dWx = np.zeros((D, 4 * H))
        dWh = np.zeros((H, 4 * H))
        db = np.zeros(4 * H)

        for t in np.arange(T - 1, -1, -1):
            dxt, dnext_h, dnext_c, dWxt, dWht, dbt = self.step_backward(dnext_h + dh[:, t, :], dnext_c, self.meta[t])
            dx[:, t, :] = dxt
            dWx += dWxt
            dWh += dWht
            db += dbt
        dh0 = dnext_h

        self.grads[self.wx_name] = dWx
        self.grads[self.wh_name] = dWh
        self.grads[self.b_name] = db
        self.meta = []
        return dx, dh0
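#

A shape walkthrough over a whole mini-batch of sequences (toy sizes and seed): the cell state stays internal to the layer, and the accumulated weight gradients land in `grads` under the layer's parameter names.

np.random.seed(4)
N, T, D, H = 2, 5, 3, 4
lstm = LSTM(input_dim=D, h_dim=H)
x = np.random.randn(N, T, D)
h0 = np.zeros((N, H))

h = lstm.forward(x, h0)
dh = np.random.randn(N, T, H)          # pretend upstream gradient
dx, dh0 = lstm.backward(dh)

print(h.shape, dx.shape, dh0.shape)    # (2, 5, 4) (2, 5, 3) (2, 4)
print(lstm.grads[lstm.wx_name].shape, lstm.grads[lstm.wh_name].shape, lstm.grads[lstm.b_name].shape)
# (3, 16) (4, 16) (16,)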
#
class word_embedding(object):
#

Word Embedding Network

    def __init__(self, voc_dim, vec_dim, name="we"):
#
    Parameters
    ----------
    : name: the name of the current layer
    : voc_dim: vocabulary size (number of words V)
    : vec_dim: embedding vector dimension D

    Stores
    ----------
    : meta: forward-pass values needed for backpropagation
        self.name = name
        self.w_name = name + "_w"
        self.voc_dim = voc_dim
        self.vec_dim = vec_dim
        self.params = {}
        self.grads = {}
        self.params[self.w_name] = np.random.randn(voc_dim, vec_dim)
        self.grads[self.w_name] = None
        self.meta = None
#

Implement the forward pass for word embeddings

    def forward(self, x):
#
    Notes
    ----------
    - Assume a vocabulary of V words, assigning each to a vector of dimension D

    Parameters
    ----------
    : x: Integer array of shape (N, T) giving indices of words.
  • Minibatches of size N where each sequence has length T
  • Each element idx of x must be in the range 0 <= idx < V
    Returns
    ----------
    : out: Array of shape (N, T, D) giving word vectors for all input words.
    
    Stores
    ----------
    : meta: Values needed for the backward pass
    
        out, self.meta = None, None
        N, T = x.shape
        V = self.params[self.w_name].shape[0]
        out = self.params[self.w_name][x]
        self.meta = [x, out]

        return out
#

Backward pass for word embeddings

    def backward(self, dout):
#
    Notes
    ----------
    - Only the gradient of the embedding matrix is returned, because we cannot back-propagate into the integer word indices

    Parameters
    ----------
    : dout (ndarray): Upstream gradients of shape (N, T, D)

    Returns
    ----------
    : dW (ndarray): Gradient of word embedding matrix, of shape (V, D).
        self.grads[self.w_name] = None
        x, out = self.meta
        dW = np.zeros_like(self.params[self.w_name])
        np.add.at(dW, x, dout)
        self.grads[self.w_name] = dW
        return dW
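#

A small sketch of why `np.add.at` is used (toy vocabulary): when a word index repeats within the batch, its rows of upstream gradient must be accumulated rather than overwritten, and words that never appear receive zero gradient.

np.random.seed(5)
emb = word_embedding(voc_dim=6, vec_dim=3)
idx = np.array([[1, 2, 1]])            # word 1 appears twice
vecs = emb.forward(idx)                # (1, 3, 3)
dout = np.ones_like(vecs)
dW = emb.backward(dout)
print(dW[1])                           # [2. 2. 2.]  both occurrences accumulated
print(dW[2])                           # [1. 1. 1.]
print(dW[0])                           # [0. 0. 0.]  unused words get zero gradient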
#
class temporal_fc(object):
#

Temporal Fully Connected layer

    def __init__(self, input_dim, output_dim, init_scale=0.02, name='t_fc'):
#

Transform hidden-state vectors into scores for each word in the vocabulary using an affine function

    Parameters
    ----------
    : name: the name of the current layer
    : input_dim: input dimension
    : output_dim: output dimension

    Stores
    ----------
    : meta: local memory state for calculating backpropagation
        self.name = name
        self.w_name = name + "_w"
        self.b_name = name + "_b"
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.params = {}
        self.grads = {}
        self.params[self.w_name] = init_scale * np.random.randn(input_dim, output_dim)
        self.params[self.b_name] = np.zeros(output_dim)
        self.grads[self.w_name] = None
        self.grads[self.b_name] = None
        self.meta = None
#

Forward pass for a temporal fc layer

    def forward(self, x):
#
    Transform each input vector into a new vector of dimension M using an affine function

    Parameters
    ----------
    : x (ndarray): input data of shape (N, T, D)
  • D-dimensional vectors arranged into a mini-batch of N timeseries, each of length T.
  • Each vector is transformed into a new vector of dimension M using an affine function

    Uses the layer's stored parameters:
    : w: Weights of shape (D, M)
    : b: Biases of shape (M,)
    Returns
    ----------
    : out: Output data of shape (N, T, M)
    
    Stores
    ----------
    : meta (list): Values needed for the backward pass
    
        N, T, D = x.shape
        M = self.params[self.b_name].shape[0]
        out = x.reshape(N * T, D).dot(self.params[self.w_name]).reshape(N, T, M) + self.params[self.b_name]
        self.meta = [x, out]
        return out
#

Backward pass for temporal fc layer

    def backward(self, dout):
#
    Parameters
    ----------
    : dout: Upstream gradients of shape (N, T, M)

    Returns
    ----------
    : dx: Gradient of input, of shape (N, T, D)

    Stores
    ----------
    : dw: Gradient of weights, of shape (D, M)
    : db: Gradient of biases, of shape (M,)
        x, out = self.meta
        N, T, D = x.shape
        M = self.params[self.b_name].shape[0]

        # fold the batch and time dimensions together, then apply the usual affine backward rules
        dx = dout.reshape(N * T, M).dot(self.params[self.w_name].T).reshape(N, T, D)
        self.grads[self.w_name] = dout.reshape(N * T, M).T.dot(x.reshape(N * T, D)).T  # (D, M)
        self.grads[self.b_name] = dout.sum(axis=(0, 1))

        return dx
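#

A quick check that the affine map is applied timestep-wise (toy sizes): the same (D, M) weights act on every timestep, and the gradients come back in the matching shapes.

np.random.seed(6)
fc = temporal_fc(input_dim=4, output_dim=7)
x = np.random.randn(2, 3, 4)           # N=2, T=3, D=4
out = fc.forward(x)
print(out.shape)                       # (2, 3, 7)
# the first timestep equals a plain affine transform of x[:, 0, :]
w, b = fc.params[fc.w_name], fc.params[fc.b_name]
print(np.allclose(out[:, 0, :], x[:, 0, :].dot(w) + b))   # True
dx = fc.backward(np.ones_like(out))
print(dx.shape, fc.grads[fc.w_name].shape, fc.grads[fc.b_name].shape)   # (2, 3, 4) (4, 7) (7,)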
#
class temporal_softmax_CE_loss(object):
#

Temporal Softmax with Cross-Entropy Loss

    def __init__(self, dim_average=True):
#
    Parameters
    ----------
    : dim_average (bool): whether to average the loss over the batch size N

    Attributes
    ----------
    : dLoss: softmax probabilities cached from the forward pass, used to compute the gradient
    : label: Ground-truth labels for the classification task
        self.dim_average = dim_average  # if True, average the loss over the batch size N
        self.dLoss = None
        self.label = None
#
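
Compute the masked softmax cross-entropy loss over all timesteps, averaging over the batch size when `dim_average` is set
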
    def forward(self, feat, label, mask):
        loss = None
        N, T, V = feat.shape

        # flatten the batch and time dimensions so each row is one prediction over the vocabulary
        feat_flat = feat.reshape(N * T, V)
        label_flat = label.reshape(N * T)
        mask_flat = mask.reshape(N * T)

        # numerically stable softmax over the vocabulary dimension
        probs = np.exp(feat_flat - np.max(feat_flat, axis=1, keepdims=True))
        probs /= np.sum(probs, axis=1, keepdims=True)

        # masked negative log-likelihood of the ground-truth words
        loss = -np.sum(mask_flat * np.log(probs[np.arange(N * T), label_flat]))
        if self.dim_average:
            loss /= N

        self.dLoss = probs.copy()
        self.label = label
        self.mask = mask

        return loss
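#

A tiny worked example (illustrative numbers): with uniform scores every word has probability 1/V, and masked positions contribute nothing to the loss.

loss_fn = temporal_softmax_CE_loss(dim_average=True)
feat = np.zeros((1, 2, 4))             # N=1, T=2, V=4: uniform scores
label = np.array([[2, 0]])
mask = np.array([[1.0, 0.0]])          # second timestep is padding
loss = loss_fn.forward(feat, label, mask)
print(np.isclose(loss, np.log(4)))     # True: one unmasked position, -log(1/4), averaged over N=1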
#
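
Backward pass: gradient of the loss with respect to the vocabulary scores
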
    def backward(self):
        N, T = self.label.shape
        dLoss = self.dLoss
        if dLoss is None:
            raise ValueError("forward must be called before backward for this module!")
        # softmax + cross-entropy gradient: subtract 1 at the ground-truth word for each position
        dLoss[np.arange(dLoss.shape[0]), self.label.reshape(N * T)] -= 1.0
        if self.dim_average:
            dLoss /= N
        dLoss *= self.mask.reshape(N * T)[:, None]
        self.dLoss = dLoss

        return dLoss.reshape(N, T, -1)
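#

Putting the layers together, a minimal end-to-end sketch (toy vocabulary, dimensions, and seed; the variable names below are illustrative): embed integer word indices, run the LSTM over the sequence, project the hidden states to vocabulary scores, and back-propagate the masked loss through every layer.

np.random.seed(7)
N, T, V, D, H = 2, 4, 10, 6, 8
emb = word_embedding(V, D)
lstm = LSTM(D, H)
fc = temporal_fc(H, V)
loss_fn = temporal_softmax_CE_loss()

words = np.random.randint(0, V, size=(N, T))
labels = np.random.randint(0, V, size=(N, T))
mask = np.ones((N, T))

# forward: indices -> vectors -> hidden states -> vocabulary scores -> loss
scores = fc.forward(lstm.forward(emb.forward(words), np.zeros((N, H))))
loss = loss_fn.forward(scores, labels, mask)

# backward: push the loss gradient back through every layer
dscores = loss_fn.backward()
dvecs, dh0 = lstm.backward(fc.backward(dscores))
dW_embed = emb.backward(dvecs)
print(loss, dW_embed.shape)            # scalar loss, (10, 6)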