Variational autoencoder
import numpy as np
# for a gpu version
# import cupy as np
from matplotlib import pyplot as plt
%matplotlib inline
# we use the lightweight deep learning library dp
# you have to put dp.py in the same folder as this noebook
# you can download it from
# https://gitlab.com/deep.TEACHING/educational-materials/-/blob/master/notebooks/differentiable-programming/dp.py
# if you have downloaded the complete deep teaching git repo you find it in the folder ../differentiable-programming/
import dp # differentiable programming module
from dp import Node
#data_path = "/Users/christian/data/mnist_csv/"
data_path = "/home/chris/Downloads/"
# get the data from
# https://pjreddie.com/projects/mnist-in-csv/
try:
train_data_mnist
except NameError:
train_data_mnist = np.loadtxt(data_path + "mnist_train.csv", delimiter=",")
test_data_mnist = np.loadtxt(data_path + "mnist_test.csv", delimiter=",")
original_dim = 784 # Number of pixels in MNIST images.
latent_dim = 5 # dimensionality of the latent code z.
intermediate_dim = 128 # Size of the hidden layer.
epochs = 30
image_dim = int(np.sqrt(original_dim))
image_dim
def get_xy_mnist(data_mnist=train_data_mnist):
x_train = train_data_mnist[:,1:]
x_train = x_train / 255.
#x_train = x_train.reshape((-1, image_dim, image_dim))
y_train = train_data_mnist[:,0]
return x_train, y_train
x_train_, y_train_ = get_xy_mnist()
x_test_, y_test = get_xy_mnist(test_data_mnist)
nb_classes = 10
m = y_train_.shape[0]
y_train = np.zeros((m, nb_classes), dtype=int)
y_train[np.arange(m), y_train_.astype(int)] = 1
batch_size = 2**7
batch_size
def random_batch(x_train=x_train, y_train=y_train, batch_size=batch_size):
n = x_train.shape[0]
indices = np.random.randint(0,n, size=batch_size)
return x_train[indices], y_train[indices]
Exercise: Implement the loss function
Loss
Recap: We use the (negative) Variational Lower Bound as loss:
$ \mathcal L^{(i)} =\text{ELBO}^{(i)} = \mathbb E_{q(z\mid x^{(i)}; \phi)}[\log p(x^{(i)} \mid z^{(i)}; \theta)] - D_{KL}[q(z^{(i)} \mid x^{(i)}; \phi) \mid \mid p(z^{(i)})] $
Note on the second term
Here$ q(z^{(i)} \mid x^{(i)}; \phi) $ and$ p(z^{(i)}) $ are Gaussian:
-
prior:$ p(z^{(i)})=\mathcal N(0,\mathbb 1) $ -$ \mathbb 1 $ diagonal matrix with$ 1 $s. -$ q(z^{(i)} \mid x^{(i)}; \phi) = \mathcal N\left(\mu(x^{(i)}),\Sigma^2(x^{(i)})\right) $
-
the diagonal matrix$ \Sigma^2(x^{(i)}) $ has entries$ \sigma^2_j(x^{(i)}) $.
The encoder NN computes the means$ \mu_j(x^{(i)}) $ and stds$ \sigma_j(x^{(i)}) $ of the normal distribution from which the$ z $-components are sampled.
For such Gaussians the Kullback-Leiber Divergence can be computed analytically:
$ D_{KL}[q(z^{(i)} \mid x^{(i)}; \phi) \mid \mid p(z^{(i)})] = \sum_j \left( - \log \sigma_j ( x^{(i)}) + \frac{\sigma_j^2 ( x^{(i)})+ \mu_j^2 ( x^{(i)})-1}{2} \right) $ -$ j $ is the index for the dimensions of$ z $
Note on the first term$ \mathbb E_{q(z^{(i)}\mid x^{(i)}, \phi)} [\log p(x|z,\theta)] $
The encoder NN computes the means$ \mu_j(x^{(i)}) $ and stds$ \sigma_j(x^{(i)}) $ of the normal distribution from which$ z $ is sampled. This samples are used in the MC Estimator for the expectation. For each data point in a mini batch a$ z $ is sampled. The decoder NN takes the$ z $s for reconstruct$ x $. For each$ x $ the decoder NN computes a probability$ \pi $ (value between 0 and 1) for each pixel.$ \pi $ is the probability that the pixel is 1, i.e.$ p(x=1|z,\theta)=v $.$ \pi $ is a deterministic function of$ z $ and$ \theta $:$ \pi(z,\theta) $
In the data generating process from$ \pi $ is sampled then the realized pixel have value 0 or 1.
The data image pixel value$ x \in \{0,1\} $ determines if we should take$ \log p(x=1|z,\theta)=\log \pi $ or$ \log p(x=0|z,\theta)=\log (1-\pi) $ for the loss. This selection (in the cross entropy formula) is done with the trick of multiplying$ x $ resp.$ (1-x) $.
class VAE(dp.Model):
def __init__(self, original_dim=original_dim,
intermediate_dim=intermediate_dim,
latent_dim=latent_dim):
super(VAE, self).__init__()
# encoder param
self.fc1 = self.ReLu_Layer(original_dim, intermediate_dim, "fc1")
self.fc21 = self.Linear_Layer(intermediate_dim, latent_dim, "fc21")
self.fc22 = self.Linear_Layer(intermediate_dim, latent_dim, "fc22")
# decoder param
self.fc3 = self.ReLu_Layer(latent_dim, intermediate_dim, "fc3")
self.fc4 = self.Linear_Layer(intermediate_dim, original_dim, "fc4")
def encoder(self, x):
h1 = self.fc1(x)
return self.fc21(h1), self.fc22(h1)
def decoder(self, z):
h3 = self.fc3(z)
return self.fc4(h3).sigmoid()
# Sampling from the distribution q(z | x) = N(loc, scale)
# with reparametrization trick.
def sampling(mu, logvar):
# as exercise
raise NotImplementedError()
def elbo_binomial(x, x_decoded_mean, z_mean, z_logvar):
# as exercise
raise NotImplementedError()
def complete_result(self, x, y=None):
assert isinstance(x, Node)
x_reconstructed, z, mu, logvar = self.encoder_decoder(x)
loss = VAE.elbo_binomial(x, x_reconstructed, mu, logvar)
return loss, x_reconstructed, z, mu, logvar
def loss(self, x, y=None):
loss, x_reconstructed, z, mu, logvar = self.complete_result(x)
return loss
def encoder_decoder(self, x=None):
assert isinstance(x, Node)
mu, logvar = self.encoder(x)
z = VAE.sampling(mu, logvar)
x_reconstructed = self.decoder(z)
return x_reconstructed, z, mu, logvar
model = VAE()
optimizer = dp.optimizer.RMS_Prop(model, x_train, y_train, {"alpha":0.001, "beta2":0.9}, batch_size=batch_size)
optimizer.train(steps=2000, print_each=100)
iteration loss
1 754.6810172775514
100 189.01065687527876
200 163.41005855498807
300 152.7318255216338
400 153.5977037412187
500 152.2803389768308
600 138.98738415334526
700 142.7251023159316
800 144.5236833344694
900 141.99811778687825
1000 132.20779062264782
1100 134.96110252706168
1200 137.9260590258515
1300 129.8087122333456
1400 131.13113232144732
1500 128.50145046899456
1600 132.084567772302
1700 130.73433745254164
1800 132.5080785321756
1900 134.8022601180447
2000 134.01236115191807
# TODO: new image plots!
b_size = 10
x_, y_ = random_batch(batch_size=b_size)
x_1, y_1 = random_batch(x_test, y_test, batch_size=b_size)
fig = plt.figure(figsize=(10, 10))
for fid_idx, (data, title) in enumerate(
zip([x_, x_1], ['Train', 'Validation'])):
n = 10 # figure with 10 x 2 digits
digit_size = image_dim
figure = np.zeros((digit_size * n, digit_size * 2))
loss, x_reconstructed, z, mu, logvar = model.complete_result(x=Node(data))
decoded = x_reconstructed.value
decoded = np.clip(decoded,0,1)
for i in range(10):
figure[i * digit_size: (i + 1) * digit_size,
:digit_size] = data[i, :].reshape(digit_size, digit_size)
figure[i * digit_size: (i + 1) * digit_size,
digit_size:] = decoded[i, :].reshape(digit_size, digit_size)
ax = fig.add_subplot(1, 2, fid_idx + 1)
ax.imshow(figure, cmap='Greys_r')
ax.set_title(title)
ax.axis('off')
plt.show()
Exercise: Generating new images
Generate new images from the model.
n_samples = 10
# Todo as exercise
sampled_im_mean = None # Todo as exercise
# TODO: new image plots!
# Show the sampled images.
plt.figure()
for i in range(n_samples):
ax = plt.subplot(n_samples // 5 + 1, 5, i + 1)
plt.imshow(sampled_im_mean[i, :].reshape(28, 28), cmap='gray')
ax.axis('off')
plt.show()
# VAE conditioned on class label
class Conditional_VAE(dp.Model):
def __init__(self, original_dim=original_dim,
intermediate_dim=intermediate_dim,
latent_dim=latent_dim,
nb_classes=nb_classes
):
super(Conditional_VAE, self).__init__()
# encoder param
self.fc1 = self.ReLu_Layer(original_dim+nb_classes, intermediate_dim, "fc1")
self.fc21 = self.Linear_Layer(intermediate_dim, latent_dim, "fc21")
self.fc22 = self.Linear_Layer(intermediate_dim, latent_dim, "fc22")
# decoder param
self.fc3 = self.ReLu_Layer(latent_dim+nb_classes, intermediate_dim, "fc3")
self.fc4 = self.Linear_Layer(intermediate_dim, original_dim, "fc4")
def encoder(self, x, y):
xy = x.concatenate(y, axis=1)
h1 = self.fc1(xy)
return self.fc21(h1), self.fc22(h1)
def decoder(self, z, y):
zy = z.concatenate(y, axis=1)
h3 = self.fc3(zy)
return self.fc4(h3).sigmoid()
def complete_result(self, x, y):
assert isinstance(x, Node)
x_reconstructed, z, mu, logvar = self.encoder_decoder(x, y)
loss = VAE.elbo_binomial(x, x_reconstructed, mu, logvar)
return loss, x_reconstructed, z, mu, logvar
def loss(self, x, y):
loss, x_reconstructed, z, mu, logvar = self.complete_result(x, y)
return loss
def encoder_decoder(self, x, y):
assert isinstance(x, Node)
assert isinstance(y, Node)
mu, logvar = self.encoder(x, y)
z = VAE.sampling(mu, logvar)
x_reconstructed = self.decoder(z, y)
return x_reconstructed, z, mu, logvar
cmodel = Conditional_VAE()
optimizer = dp.optimizer.RMS_Prop(cmodel, x_train, y_train,
{"alpha":0.001, "beta2":0.9}, batch_size=batch_size)
optimizer.train(steps=3000, print_each=100)
iteration loss
1 644.8099458873851
100 191.58626643629924
200 160.82652178115032
300 146.1062535997692
400 144.77274393886756
500 138.4928371437679
600 134.48543117346514
700 127.03498135399278
800 131.81428545800082
900 127.81752729646351
1000 126.98442030748525
1100 126.87265796853298
1200 132.9127311453287
1300 123.58075748303786
1400 123.48363877931138
1500 128.92065755024004
1600 122.43852188094728
1700 123.35481531919156
1800 122.6666255968586
1900 120.7470643382134
2000 117.92753607911844
2100 111.88843046889379
2200 121.42530627030216
2300 117.13705206295303
2400 121.2974065052953
2500 125.51833136054651
2600 116.06330112343903
2700 116.3950736282919
2800 118.8746651104017
2900 116.95019949450807
3000 123.7013209110921
# sampled_image_mean has size 10 x 784 with 10
nb_samples = 10
z = np.random.normal(size=(n_samples, latent_dim))
# Ths is the number which is generated as image
label = 8
y = np.zeros([nb_samples, nb_classes])
y[:, label] = 3.
sampled_im_mean = cmodel.decoder(Node(z), Node(y)).value
# TODO: new image plots!
# Show the sampled images.
plt.figure()
for i in range(n_samples):
ax = plt.subplot(n_samples // 5 + 1, 5, i + 1)
plt.imshow(sampled_im_mean[i, :].reshape(28, 28), cmap='gray')
ax.axis('off')
plt.show()