# Domain Adaptation
- Надо создать модель
- Но нет данных для тренировки
- Можно же использовать другие, но похожие датасеты!
- Да, но проблема: данные-то другие, можно получить низкую точность из-за этого
- Есть специальные техники, которые позволяют решить эти проблемы
- Научимся адаптировать существующие датасеты под другие проблемы
- Ключевая идея: научиться генерировать нужные нам данные из других, и таким образом создавать "синтетический" датасет
- Профит
Итак, ==цель: научиться извлекать из "чужих" данных такие признаки, чтобы модель хорошо классифицировала "наши" данные==
Как достичь цели?
- измерить разницу между чужими и родными данными (чем имеющиеся картинки отличаются от тех, которые нам нужны)
- разные варианты представления
- объекты одного класса из разных датасетов должны быть очень близко, а из разных -- далеко
- а если нету лейблов на нашем датасете?
- обучить модель на чужом датасете и разметить наш
- псевдо-лейблы
- сближение датасетов
- статистика
- GAN: Generative Adversarial Network
- генератор генерирует данные
- дискриминатор пытается отгадать, какие данные были сгенерены
- генератор и дискриминатор соревнуются (adversarial)
- profit
- ADDA: Adversarial Discriminative Domain Adaptation
- обучаем модель на чужом датасете
- другая модель учится различать чужие и наши картинки
- зачем? чтобы максимально приблизить представления двух датасетов
- используем это, чтобы улучшить классификацию нашего датасета на модели, обученной по чужому
- CyCADA: Cecle-Consistent Adbersarial Domain Adaptation
- генерируем из чужого датасета картинки, похожие на "наши"
- много-много-много разных методов
- как выбрать? -- искать и исследовать
## GPU в Колабе
==Обязательно подключите GPU!== Это сильно ускорит вычисления.


> Сессия в Колабе поддерживается только 30 минут, поэтому если 30 минут ноутбук будет неактивным -- сессия сбросится и придется устанавливать и запускать все заново
## Автоэнкодеры
- используются для извлечения самых полезных признаков из датасетов
- энкодер сжимает данные
- декодер восстанавливает сжатые данные
- 
Сегодня будем использовать Keras.
==Для занятия необходимо работать с версией keras 2.0==
```
! pip install keras==2.0
```
- загружаем датасет цифр
- :::spoiler Code
```
from keras.datasets import mnist
import numpy as np
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.astype('float32') / 255.
x_test = x_test .astype('float32') / 255.
x_train = np.reshape(x_train, (len(x_train), 28, 28, 1))
x_test = np.reshape(x_test, (len(x_test), 28, 28, 1))
``` :::
- создаем модели для автоэнкодера
- :::spoiler Code
```
from keras.layers import Input, Dense, Flatten, Reshape
from keras.models import Model
def create_dense_ae():
encoding_dim = 49
# Энкодер
input_img = Input(shape=(28, 28, 1))
flat_img = Flatten()(input_img)
encoded = Dense(encoding_dim, activation='relu')(flat_img)
# Декодер
# Раскодированное другим полносвязным слоем изображение
input_encoded = Input(shape=(encoding_dim,))
flat_decoded = Dense(28*28, activation='sigmoid')(input_encoded)
decoded = Reshape((28, 28, 1))(flat_decoded)
# Модели, в конструктор первым аргументом передаются входные слои, а вторым выходные слои
# Другие модели можно так же использовать как и слои
encoder = Model(input_img, encoded, name="encoder")
decoder = Model(input_encoded, decoded, name="decoder")
autoencoder = Model(input_img, decoder(encoder(input_img)), name="autoencoder")
return encoder, decoder, autoencoder
```
:::
- компиляция модели
```
encoder, decoder, autoencoder = create_dense_ae()
autoencoder.compile(optimizer='adam', loss='binary_crossentropy')
```
- изучите параметры
```
autoencoder.summary()
```
- обучаем автоэнкодер, поиграйтесь с количеством эпох
```
autoencoder.fit(x_train, x_train,
epochs=5,
batch_size=256,
shuffle=True,
validation_data=(x_test, x_test))
```
- рисуем цифры (тому, кто совсем не понимаем, что тут происходит, рекомендую обратиться к статьям с первых занятий)
- :::spoiler Code
```
%matplotlib inline
import seaborn as sns
import matplotlib.pyplot as plt
def plot_digits(*args):
args = [x.squeeze() for x in args]
n = min([x.shape[0] for x in args])
plt.figure(figsize=(2*n, 2*len(args)))
for j in range(n):
for i in range(len(args)):
ax = plt.subplot(len(args), n, i*n + j + 1)
plt.imshow(args[i][j])
plt.gray()
ax.get_xaxis().set_visible(False)
ax.get_yaxis().set_visible(False)
plt.show()
```
- закодируем несколько изображений
```
n = 10
imgs = x_test[:n]
encoded_imgs = encoder.predict(imgs, batch_size=n)
encoded_imgs[0]
```
- декодируем и сравним с оригиналами
```
decoded_imgs = decoder.predict(encoded_imgs, batch_size=n)
plot_digits(imgs, decoded_imgs)
```
### Глубокий автоэнкодер
- больше слоев, более крутой автоэнкодер
- :::spoiler Code
```
def create_deep_dense_ae():
# Размерность кодированного представления
encoding_dim = 49
# Энкодер
input_img = Input(shape=(28, 28, 1))
flat_img = Flatten()(input_img)
x = Dense(encoding_dim*3, activation='relu')(flat_img)
x = Dense(encoding_dim*2, activation='relu')(x)
encoded = Dense(encoding_dim, activation='linear')(x)
# Декодер
input_encoded = Input(shape=(encoding_dim,))
x = Dense(encoding_dim*2, activation='relu')(input_encoded)
x = Dense(encoding_dim*3, activation='relu')(x)
flat_decoded = Dense(28*28, activation='sigmoid')(x)
decoded = Reshape((28, 28, 1))(flat_decoded)
# Модели
encoder = Model(input_img, encoded, name="encoder")
decoder = Model(input_encoded, decoded, name="decoder")
autoencoder = Model(input_img, decoder(encoder(input_img)), name="autoencoder")
return encoder, decoder, autoencoder
d_encoder, d_decoder, d_autoencoder = create_deep_dense_ae()
d_autoencoder.compile(optimizer='adam', loss='binary_crossentropy')
```
- самостоятельно посмотрите сводную информацию по модели (код был выше)
- обучите новую модель
- сравните результаты, стало ли лучше?
- сравните сгенерированные изображения с исходными (код был выше)
### Cверточный автоэнкодер
- более эффективный способ работы с картинками
- если в обучающей выборке котик был в верхнем правом углу, то в нижнем левом его тоже можно будет найти
- свертки
- :::spoiler Code
```
from keras.layers import Conv2D, MaxPooling2D, UpSampling2D
def create_deep_conv_ae():
input_img = Input(shape=(28, 28, 1))
x = Conv2D(128, (7, 7), activation='relu', padding='same')(input_img)
x = MaxPooling2D((2, 2), padding='same')(x)
x = Conv2D(32, (2, 2), activation='relu', padding='same')(x)
x = MaxPooling2D((2, 2), padding='same')(x)
encoded = Conv2D(1, (7, 7), activation='relu', padding='same')(x)
# На этом моменте представление (7, 7, 1) т.е. 49-размерное
input_encoded = Input(shape=(7, 7, 1))
x = Conv2D(32, (7, 7), activation='relu', padding='same')(input_encoded)
x = UpSampling2D((2, 2))(x)
x = Conv2D(128, (2, 2), activation='relu', padding='same')(x)
x = UpSampling2D((2, 2))(x)
decoded = Conv2D(1, (7, 7), activation='sigmoid', padding='same')(x)
# Модели
encoder = Model(input_img, encoded, name="encoder")
decoder = Model(input_encoded, decoded, name="decoder")
autoencoder = Model(input_img, decoder(encoder(input_img)), name="autoencoder")
return encoder, decoder, autoencoder
c_encoder, c_decoder, c_autoencoder = create_deep_conv_ae()
c_autoencoder.compile(optimizer='adam', loss='binary_crossentropy')
c_autoencoder.summary()
```
- сравните с предыдущими моделями и с оригинальными картинками
### Denoising автоэнкодер
- учим убирать шум из данных
- подаем зашумленные данные
- просим сгенерить данные без шума
- :::spoiler Code
```
import keras.backend as K
from keras.layers import Lambda
batch_size = 16
def create_denoising_model(autoencoder):
def add_noise(x):
noise_factor = 0.5
x = x + K.random_normal(x.get_shape(), 0.5, noise_factor)
x = K.clip(x, 0., 1.)
return x
input_img = Input(batch_shape=(batch_size, 28, 28, 1))
noised_img = Lambda(add_noise)(input_img)
noiser = Model(input_img, noised_img, name="noiser")
denoiser_model = Model(input_img, autoencoder(noiser(input_img)), name="denoiser")
return noiser, denoiser_model
noiser, denoiser_model = create_denoising_model(autoencoder)
denoiser_model.compile(optimizer='adam', loss='binary_crossentropy')
```
- после обучения (самостоятельно), посмотрим на зашумленные изображения
```
n = 10
imgs = x_test[:batch_size]
noised_imgs = noiser.predict(imgs, batch_size=batch_size)
encoded_imgs = encoder.predict(noised_imgs[:n], batch_size=n)
decoded_imgs = decoder.predict(encoded_imgs[:n], batch_size=n)
plot_digits(imgs[:n], noised_imgs, decoded_imgs)
```
### Разреженный энкодер
- штрафуем за то, что копирует (зазубривает) оригинальные данные
- :::spoiler Code
```
from keras.regularizers import L1L2
def create_sparse_ae():
encoding_dim = 16
lambda_l1 = 0.00001
# Энкодер
input_img = Input(shape=(28, 28, 1))
flat_img = Flatten()(input_img)
x = Dense(encoding_dim*3, activation='relu')(flat_img)
x = Dense(encoding_dim*2, activation='relu')(x)
encoded = Dense(encoding_dim, activation='linear', activity_regularizer=L1L2(lambda_l1))(x)
# Декодер
input_encoded = Input(shape=(encoding_dim,))
x = Dense(encoding_dim*2, activation='relu')(input_encoded)
x = Dense(encoding_dim*3, activation='relu')(x)
flat_decoded = Dense(28*28, activation='sigmoid')(x)
decoded = Reshape((28, 28, 1))(flat_decoded)
# Модели
encoder = Model(input_img, encoded, name="encoder")
decoder = Model(input_encoded, decoded, name="decoder")
autoencoder = Model(input_img, decoder(encoder(input_img)), name="autoencoder")
return encoder, decoder, autoencoder
s_encoder, s_decoder, s_autoencoder = create_sparse_ae()
s_autoencoder.compile(optimizer='adam', loss='binary_crossentropy')
```
- после обучения сравните с оригинальными картинками (самостоятельно)
- гомотопия (что это? догадайтесь сами, пример ниже)
```
def plot_homotopy(frm, to, n=10, decoder=None):
z = np.zeros(([n] + list(frm.shape)))
for i, t in enumerate(np.linspace(0., 1., n)):
z[i] = frm * (1-t) + to * t
if decoder:
plot_digits(decoder.predict(z, batch_size=n))
else:
plot_digits(z)
```
```
# Гомотопия между первыми двумя восьмерками
frm, to = x_test[y_test == 8][1:3]
plot_homotopy(frm, to)
```
- посмотрим гомотопию для восьмерок одной из моделей
```python
codes = encoder.predict(x_test[y_test == 8][1:3])
plot_homotopy(codes[0], codes[1], n=10, decoder=decoder)
```
- сравните качество промежуточных значений восьмерок (переходы)
Вопросы "на подумать"
- какие параметры влияют на то, как выглядит восьмерка?
- может ли один и тот же человек в одних и тех же условиях нарисовать абсолютно одинаковые цифры?
- скрытые переменные
- их их распределения вероятностей (?)
```
codes = encoder.predict(x_test)
sns.jointplot(codes[:,1], codes[:,3])
```
- зависят ли параметры друг от друга?
- сверху и справа нарисованы распределения параметров (например, толщина штриха или наклон письма)
- если эти параметры будут генерироваться как в реальной жизни, то можно получить лучшие результаты
- но как это сделать?
## Вариационные автоэнкодеры
- извлекают распределение вероятностей из объекта, а потом по нему генерируют объект
- например, закон распределения наклона письма
- параметры (толщина письма, наклон) – скрытые переменные, они не объявлены явно
- энкодер предсказывает среднее значение и дисперсию (разброс) для скрытых переменных (толщина письма)
- декодер выбирает значение из распределения и генерит объект (цифру) с другими параметрами
- другая толщина письма
- другой наклон
- другой цвет?
- импортируем датасет
- :::spoiler Code
```
import sys
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
from keras.datasets import mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.astype('float32') / 255.
x_test = x_test .astype('float32') / 255.
x_train = np.reshape(x_train, (len(x_train), 28, 28, 1))
x_test = np.reshape(x_test, (len(x_test), 28, 28, 1))
```
- основные параметры
```
batch_size = 500
latent_dim = 2
dropout_rate = 0.3
start_lr = 0.0001
```
- модель
- :::spoiler Code
```
from keras.layers import Input, Dense
from keras.layers import BatchNormalization, Dropout, Flatten, Reshape, Lambda
from keras.models import Model
from keras.objectives import binary_crossentropy
from keras.layers.advanced_activations import LeakyReLU
from keras import backend as K
def create_vae():
models = {}
# Добавим Dropout и BatchNormalization
def apply_bn_and_dropout(x):
return Dropout(dropout_rate)(BatchNormalization()(x))
# Энкодер
input_img = Input(batch_shape=(batch_size, 28, 28, 1))
x = Flatten()(input_img)
x = Dense(256, activation='relu')(x)
x = apply_bn_and_dropout(x)
x = Dense(128, activation='relu')(x)
x = apply_bn_and_dropout(x)
# Предсказываем параметры распределений
# Вместо того, чтобы предсказывать стандартное отклонение, предсказываем логарифм вариации
z_mean = Dense(latent_dim)(x)
z_log_var = Dense(latent_dim)(x)
# Сэмплирование из Q с трюком репараметризации
def sampling(args):
z_mean, z_log_var = args
epsilon = K.random_normal(shape=(batch_size, latent_dim), mean=0., stddev=1.0)
return z_mean + K.exp(z_log_var / 2) * epsilon
l = Lambda(sampling, output_shape=(latent_dim,))([z_mean, z_log_var])
models["encoder"] = Model(input_img, l, 'Encoder')
models["z_meaner"] = Model(input_img, z_mean, 'Enc_z_mean')
models["z_lvarer"] = Model(input_img, z_log_var, 'Enc_z_log_var')
# Декодер
z = Input(shape=(latent_dim, ))
x = Dense(128)(z)
x = LeakyReLU()(x)
x = apply_bn_and_dropout(x)
x = Dense(256)(x)
x = LeakyReLU()(x)
x = apply_bn_and_dropout(x)
x = Dense(28*28, activation='sigmoid')(x)
decoded = Reshape((28, 28, 1))(x)
models["decoder"] = Model(z, decoded, name='Decoder')
models["vae"] = Model(input_img, models["decoder"](models["encoder"](input_img)), name="VAE")
def vae_loss(x, decoded):
x = K.reshape(x, shape=(batch_size, 28*28))
decoded = K.reshape(decoded, shape=(batch_size, 28*28))
xent_loss = 28*28*binary_crossentropy(x, decoded)
kl_loss = -0.5 * K.sum(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1)
return (xent_loss + kl_loss)/2/28/28
return models, vae_loss
models, vae_loss = create_vae()
vae = models["vae"]
from keras.optimizers import Adam, RMSprop
vae.compile(optimizer=Adam(start_lr), loss=vae_loss)
```
- изменим функции рисования цифр
- :::spoiler Code
```
digit_size = 28
def plot_digits(*args, invert_colors=False):
args = [x.squeeze() for x in args]
n = min([x.shape[0] for x in args])
figure = np.zeros((digit_size * len(args), digit_size * n))
for i in range(n):
for j in range(len(args)):
figure[j * digit_size: (j + 1) * digit_size,
i * digit_size: (i + 1) * digit_size] = args[j][i].squeeze()
if invert_colors:
figure = 1-figure
plt.figure(figsize=(2*n, 2*len(args)))
plt.imshow(figure, cmap='Greys_r')
plt.grid(False)
ax = plt.gca()
ax.get_xaxis().set_visible(False)
ax.get_yaxis().set_visible(False)
plt.show()
n = 15 # Картинка с 15x15 цифр
digit_size = 28
from scipy.stats import norm
# Так как сэмплируем из N(0, I), то сетку узлов, в которых генерируем цифры берем из обратной функции распределения
grid_x = norm.ppf(np.linspace(0.05, 0.95, n))
grid_y = norm.ppf(np.linspace(0.05, 0.95, n))
def draw_manifold(generator, show=True):
# Рисование цифр из многообразия
figure = np.zeros((digit_size * n, digit_size * n))
for i, yi in enumerate(grid_x):
for j, xi in enumerate(grid_y):
z_sample = np.zeros((1, latent_dim))
z_sample[:, :2] = np.array([[xi, yi]])
x_decoded = generator.predict(z_sample)
digit = x_decoded[0].squeeze()
figure[i * digit_size: (i + 1) * digit_size,
j * digit_size: (j + 1) * digit_size] = digit
if show:
# Визуализация
plt.figure(figsize=(15, 15))
plt.imshow(figure, cmap='Greys_r')
plt.grid(None)
ax = plt.gca()
ax.get_xaxis().set_visible(False)
ax.get_yaxis().set_visible(False)
plt.show()
return figure
```
- следим за обучением с TensorBoard
- :::spoiler Code
```
from IPython.display import clear_output
from keras.callbacks import LambdaCallback, ReduceLROnPlateau, TensorBoard
# Массивы, в которые будем сохранять результаты, для последующей визуализации
figs = []
latent_distrs = []
epochs = []
# Эпохи, в которые будем сохранять
save_epochs = set(list((np.arange(0, 59)**1.701).astype(np.int)) + list(range(10)))
# Отслеживать будем на вот этих цифрах
imgs = x_test[:batch_size]
n_compare = 10
# Модели
generator = models["decoder"]
encoder_mean = models["z_meaner"]
# Функция, которую будем запускать после каждой эпохи
def on_epoch_end(epoch, logs):
if epoch in save_epochs:
clear_output() # Не захламляем output
# Сравнение реальных и декодированных цифр
decoded = vae.predict(imgs, batch_size=batch_size)
plot_digits(imgs[:n_compare], decoded[:n_compare])
# Рисование многообразия
figure = draw_manifold(generator, show=True)
# Сохранение многообразия и распределения z для создания анимации после
epochs.append(epoch)
figs.append(figure)
latent_distrs.append(encoder_mean.predict(x_test, batch_size))
# Коллбэки
pltfig = LambdaCallback(on_epoch_end=on_epoch_end)
# lr_red = ReduceLROnPlateau(factor=0.1, patience=25)
tb = TensorBoard(log_dir='./logs')
# Запуск обучения
vae.fit(x_train, x_train, shuffle=True, epochs=1000,
batch_size=batch_size,
validation_data=(x_test, x_test),
callbacks=[pltfig, tb],
verbose=1)
```
### TensorBoard
- инструмент для анализа модели
- logs -- это папка логов, которую мы указывали выше в коде
```
%load_ext tensorboard
%tensorboard --logdir logs
```
- внимательно посмотрите на получившиеся цифры: всегда ли они генерировались хорошо?
- легко ли сгенерировать определенную цифру (не рандомную)? Подумайте.
### Conditional Variational Autoencoder CVAE
- перенос стиля картинки
- обучаем на картинках с лейблами
- извлекаем стиль
- генерим новые картинки
- код почти идентичен предыдущему, нужно только добавить лейблы
- догружаем данные с лейблами (добавить к предыдущему коду)
```
from keras.utils import to_categorical
y_train_cat = to_categorical(y_train).astype(np.float32)
y_test_cat = to_categorical(y_test).astype(np.float32)
num_classes = y_test_cat.shape[1]
```
- изменим код модели
- :::spoiler Code
```
from keras.layers import Input, Dense
from keras.layers import BatchNormalization, Dropout, Flatten, Reshape, Lambda
from keras.layers import concatenate
from keras.models import Model
from keras.objectives import binary_crossentropy
from keras.layers.advanced_activations import LeakyReLU
from keras import backend as K
def create_cvae():
models = {}
# Добавим Dropout и BatchNormalization
def apply_bn_and_dropout(x):
return Dropout(dropout_rate)(BatchNormalization()(x))
# Энкодер
input_img = Input(shape=(28, 28, 1))
flatten_img = Flatten()(input_img)
input_lbl = Input(shape=(num_classes,), dtype='float32')
x = concatenate([flatten_img, input_lbl])
x = Dense(256, activation='relu')(x)
x = apply_bn_and_dropout(x)
# Предсказываем параметры распределений
# Вместо того чтобы предсказывать стандартное отклонение, предсказываем логарифм вариации
z_mean = Dense(latent_dim)(x)
z_log_var = Dense(latent_dim)(x)
# Сэмплирование из Q с трюком репараметризации
def sampling(args):
z_mean, z_log_var = args
epsilon = K.random_normal(shape=(batch_size, latent_dim), mean=0., stddev=1.0)
return z_mean + K.exp(z_log_var / 2) * epsilon
l = Lambda(sampling, output_shape=(latent_dim,))([z_mean, z_log_var])
models["encoder"] = Model([input_img, input_lbl], l, 'Encoder')
models["z_meaner"] = Model([input_img, input_lbl], z_mean, 'Enc_z_mean')
models["z_lvarer"] = Model([input_img, input_lbl], z_log_var, 'Enc_z_log_var')
# Декодер
z = Input(shape=(latent_dim, ))
input_lbl_d = Input(shape=(num_classes,), dtype='float32')
x = concatenate([z, input_lbl_d])
x = Dense(256)(x)
x = LeakyReLU()(x)
x = apply_bn_and_dropout(x)
x = Dense(28*28, activation='sigmoid')(x)
decoded = Reshape((28, 28, 1))(x)
models["decoder"] = Model([z, input_lbl_d], decoded, name='Decoder')
models["cvae"] = Model([input_img, input_lbl, input_lbl_d],
models["decoder"]([models["encoder"]([input_img, input_lbl]), input_lbl_d]),
name="CVAE")
models["style_t"] = Model([input_img, input_lbl, input_lbl_d],
models["decoder"]([models["z_meaner"]([input_img, input_lbl]), input_lbl_d]),
name="style_transfer")
def vae_loss(x, decoded):
x = K.reshape(x, shape=(batch_size, 28*28))
decoded = K.reshape(decoded, shape=(batch_size, 28*28))
xent_loss = 28*28*binary_crossentropy(x, decoded)
kl_loss = -0.5 * K.sum(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1)
return (xent_loss + kl_loss)/2/28/28
return models, vae_loss
models, vae_loss = create_cvae()
cvae = models["cvae"]
```
- изменить функции ```draw_manifold``` и ```draw_z_distr```
- :::spoiler Code
```
def draw_manifold(generator, lbl, show=True):
# Рисование цифр из многообразия
figure = np.zeros((digit_size * n, digit_size * n))
input_lbl = np.zeros((1, 10))
input_lbl[0, lbl] = 1
for i, yi in enumerate(grid_x):
for j, xi in enumerate(grid_y):
z_sample = np.zeros((1, latent_dim))
z_sample[:, :2] = np.array([[xi, yi]])
x_decoded = generator.predict([z_sample, input_lbl])
digit = x_decoded[0].squeeze()
figure[i * digit_size: (i + 1) * digit_size,
j * digit_size: (j + 1) * digit_size] = digit
if show:
# Визуализация
plt.figure(figsize=(10, 10))
plt.imshow(figure, cmap='Greys_r')
plt.grid(False)
ax = plt.gca()
ax.get_xaxis().set_visible(False)
ax.get_yaxis().set_visible(False)
plt.show()
return figure
def draw_z_distr(z_predicted, lbl):
# Рисование рпспределения z
input_lbl = np.zeros((1, 10))
input_lbl[0, lbl] = 1
im = plt.scatter(z_predicted[:, 0], z_predicted[:, 1])
im.axes.set_xlim(-5, 5)
im.axes.set_ylim(-5, 5)
plt.show()
```
- изменим функцию отображения результатов
- :::spoiler Code
```
from IPython.display import clear_output
from keras.callbacks import LambdaCallback, ReduceLROnPlateau, TensorBoard
# Массивы, в которые будем сохранять результаты для последующей визуализации
figs = [[] for x in range(num_classes)]
latent_distrs = [[] for x in range(num_classes)]
epochs = []
# Эпохи, в которые будем сохранять
save_epochs = set(list((np.arange(0, 59)**1.701).astype(np.int)) + list(range(10)))
# Отслеживать будем на вот этих цифрах
imgs = x_test[:batch_size]
imgs_lbls = y_test_cat[:batch_size]
n_compare = 10
# Модели
generator = models["decoder"]
encoder_mean = models["z_meaner"]
# Функция, которую будем запускать после каждой эпохи
def on_epoch_end(epoch, logs):
if epoch in save_epochs:
clear_output() # Не захламляем output
# Сравнение реальных и декодированных цифр
decoded = cvae.predict([imgs, imgs_lbls, imgs_lbls], batch_size=batch_size)
plot_digits(imgs[:n_compare], decoded[:n_compare])
# Рисование многообразия для рандомного y и распределения z|y
draw_lbl = np.random.randint(0, num_classes)
print(draw_lbl)
for lbl in range(num_classes):
figs[lbl].append(draw_manifold(generator, lbl, show=lbl==draw_lbl))
idxs = y_test == lbl
z_predicted = encoder_mean.predict([x_test[idxs], y_test_cat[idxs]], batch_size)
latent_distrs[lbl].append(z_predicted)
if lbl==draw_lbl:
draw_z_distr(z_predicted, lbl)
epochs.append(epoch)
# Коллбэки
pltfig = LambdaCallback(on_epoch_end=on_epoch_end)
# lr_red = ReduceLROnPlateau(factor=0.1, patience=25)
tb = TensorBoard(log_dir='./logs')
# Запуск обучения
cvae.fit([x_train, y_train_cat, y_train_cat], x_train, shuffle=True, epochs=100,
batch_size=batch_size,
validation_data=([x_test, y_test_cat, y_test_cat], x_test),
callbacks=[pltfig, tb],
verbose=1)
```
- перенос стиля
```
def style_transfer(model, X, lbl_in, lbl_out):
rows = X.shape[0]
if isinstance(lbl_in, int):
lbl = lbl_in
lbl_in = np.zeros((rows, 10))
lbl_in[:, lbl] = 1
if isinstance(lbl_out, int):
lbl = lbl_out
lbl_out = np.zeros((rows, 10))
lbl_out[:, lbl] = 1
return model.predict([X, lbl_in, lbl_out])
```
```
n = 10
lbl = 7
generated = []
prot = x_train[y_train == lbl][:n]
for i in range(num_classes):
generated.append(style_transfer(models["style_t"], prot, lbl, i))
generated[lbl] = prot
plot_digits(*generated, invert_colors=True)
```
- удалось ли перенести стиль?
- легко ли отличить сгенерированные картинки от настоящих?
## Generative Adversarial Network
- создание более реалистичных картинок
- генератор генерирует данные
- дискриминатор пытается отгадать, какие данные были сгенерены
- генератор и дискриминатор соревнуются (adversarial)
- profit
Приступим к генерации!
- импорт датасета
- :::spoiler Code
```
from IPython.display import clear_output
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from keras.layers import Dropout, BatchNormalization, Reshape, Flatten, RepeatVector
from keras.layers import Lambda, Dense, Input, Conv2D, MaxPool2D, UpSampling2D, concatenate
from keras.layers.advanced_activations import LeakyReLU
from keras.models import Model, load_model
from keras.datasets import mnist
from keras.utils import to_categorical
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.astype('float32') / 255.
x_test = x_test .astype('float32') / 255.
x_train = np.reshape(x_train, (len(x_train), 28, 28, 1))
x_test = np.reshape(x_test, (len(x_test), 28, 28, 1))
y_train_cat = to_categorical(y_train).astype(np.float32)
y_test_cat = to_categorical(y_test).astype(np.float32)
```
- будем работать с керас и тензорфлоу одновременно
```
from keras import backend as K
import tensorflow as tf
sess = tf.Session()
K.set_session(sess)
```
- параметры
```
batch_size = 256
batch_shape = (batch_size, 28, 28, 1)
latent_dim = 2
num_classes = 10
dropout_rate = 0.3
```
- обучение через тензорфлоу
- :::spoiler Code
```
def gen_batch(x, y):
n_batches = x.shape[0] // batch_size
while(True):
for i in range(n_batches):
yield x[batch_size*i: batch_size*(i+1)], y[batch_size*i: batch_size*(i+1)]
idxs = np.random.permutation(y.shape[0])
x = x[idxs]
y = y[idxs]
train_batches_it = gen_batch(x_train, y_train_cat)
test_batches_it = gen_batch(x_test, y_test_cat)
```
- магия тензорфлоу
- :::spoiler Code
```
x_ = tf.placeholder(tf.float32, shape=(None, 28, 28, 1), name='image')
y_ = tf.placeholder(tf.float32, shape=(None, num_classes), name='labels')
z_ = tf.placeholder(tf.float32, shape=(None, latent_dim), name='z')
img = Input(tensor=x_)
lbl = Input(tensor=y_)
z = Input(tensor=z_)
```
- моделька генератора
- :::spoiler Code
```
with tf.variable_scope('generator'):
x = concatenate([z, lbl])
x = Dense(7*7*64, activation='relu')(x)
x = Dropout(dropout_rate)(x)
x = Reshape((7, 7, 64))(x)
x = UpSampling2D(size=(2, 2))(x)
x = Conv2D(64, kernel_size=(5, 5), activation='relu', padding='same')(x)
x = Dropout(dropout_rate)(x)
x = Conv2D(32, kernel_size=(3, 3), activation='relu', padding='same')(x)
x = Dropout(dropout_rate)(x)
x = UpSampling2D(size=(2, 2))(x)
generated = Conv2D(1, kernel_size=(5, 5), activation='sigmoid', padding='same')(x)
generator = Model([z, lbl], generated, name='generator')
```
- а тут дискриминатор
- добавляем лейбл цифры
- :::spoiler Code
```
def add_units_to_conv2d(conv2, units):
dim1 = int(conv2.shape[1])
dim2 = int(conv2.shape[2])
dimc = int(units.shape[1])
repeat_n = dim1*dim2
units_repeat = RepeatVector(repeat_n)(lbl)
units_repeat = Reshape((dim1, dim2, dimc))(units_repeat)
return concatenate([conv2, units_repeat])
with tf.variable_scope('discrim'):
x = Conv2D(128, kernel_size=(7, 7), strides=(2, 2), padding='same')(img)
x = add_units_to_conv2d(x, lbl)
x = LeakyReLU()(x)
x = Dropout(dropout_rate)(x)
x = MaxPool2D((2, 2), padding='same')(x)
l = Conv2D(128, kernel_size=(3, 3), padding='same')(x)
x = LeakyReLU()(l)
x = Dropout(dropout_rate)(x)
h = Flatten()(x)
d = Dense(1, activation='sigmoid')(h)
discrim = Model([img, lbl], d, name='Discriminator')
```
- магия
- :::spoiler Code
```
generated_z = generator([z, lbl])
discr_img = discrim([img, lbl])
discr_gen_z = discrim([generated_z, lbl])
gan_model = Model([z, lbl], discr_gen_z, name='GAN')
gan = gan_model([z, lbl])
log_dis_img = tf.reduce_mean(-tf.log(discr_img + 1e-10))
log_dis_gen_z = tf.reduce_mean(-tf.log(1. - discr_gen_z + 1e-10))
L_gen = -log_dis_gen_z
L_dis = 0.5*(log_dis_gen_z + log_dis_img)
optimizer_gen = tf.train.RMSPropOptimizer(0.0003)
optimizer_dis = tf.train.RMSPropOptimizer(0.0001)
# Переменные генератора и дискриминаторы (отдельно) для оптимизаторов
generator_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, "generator")
discrim_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, "discrim")
step_gen = optimizer_gen.minimize(L_gen, var_list=generator_vars)
step_dis = optimizer_dis.minimize(L_dis, var_list=discrim_vars)
sess.run(tf.global_variables_initializer())
```
- обучение генератора и дискриминатора
- :::spoiler Code
```
# Шаг обучения генератора
def step(image, label, zp):
l_dis, _ = sess.run([L_dis, step_gen], feed_dict={z:zp, lbl:label, img:image, K.learning_phase():1})
return l_dis
# Шаг обучения дискриминатора
def step_d(image, label, zp):
l_dis, _ = sess.run([L_dis, step_dis], feed_dict={z:zp, lbl:label, img:image, K.learning_phase():1})
return l_dis
```
- для красивых картинок
- :::spoiler Code
```
# Массивы, в которые будем сохранять результаты, для последующей визуализации
figs = [[] for x in range(num_classes)]
periods = []
save_periods = list(range(100)) + list(range(100, 1000, 10))
n = 15 # Картинка с 15x15 цифр
from scipy.stats import norm
# Так как сэмплируем из N(0, I), то сетку узлов, в которых генерируем цифры, берем из обратной функции распределения
grid_x = norm.ppf(np.linspace(0.05, 0.95, n))
grid_y = norm.ppf(np.linspace(0.05, 0.95, n))
grid_y = norm.ppf(np.linspace(0.05, 0.95, n))
def draw_manifold(label, show=True):
# Рисование цифр из многообразия
figure = np.zeros((28 * n, 28 * n))
input_lbl = np.zeros((1, 10))
input_lbl[0, label] = 1.
for i, yi in enumerate(grid_x):
for j, xi in enumerate(grid_y):
z_sample = np.zeros((1, latent_dim))
z_sample[:, :2] = np.array([[xi, yi]])
x_generated = sess.run(generated_z, feed_dict={z:z_sample, lbl:input_lbl, K.learning_phase():0})
digit = x_generated[0].squeeze()
figure[i * 28: (i + 1) * 28,
j * 28: (j + 1) * 28] = digit
if show:
# Визуализация
plt.figure(figsize=(10, 10))
plt.imshow(figure, cmap='Greys')
plt.grid(False)
ax = plt.gca()
ax.get_xaxis().set_visible(False)
ax.get_yaxis().set_visible(False)
plt.show()
return figure
n_compare = 10
def on_n_period(period):
clear_output() # Не захламляем output
# Рисование многообразия для рандомного y
draw_lbl = np.random.randint(0, num_classes)
print(draw_lbl)
for label in range(num_classes):
figs[label].append(draw_manifold(label, show=label==draw_lbl))
periods.append(period)
```
- обучаем модель
- :::spoiler Code
```python=
batches_per_period = 20 # Как часто сохранять картинки
k_step = 5 # Количество шагов, которые могут делать дискриминатор и генератор во внутреннем цикле
for i in range(5000):
print('.', end='')
# Достанем новый батч
b0, b1 = next(train_batches_it)
zp = np.random.randn(batch_size, latent_dim)
# Шаги обучения дискриминатора
for j in range(k_step):
l_d = step_d(b0, b1, zp)
b0, b1 = next(train_batches_it)
zp = np.random.randn(batch_size, latent_dim)
if l_d < 1.0:
break
# Шаги обучения генератора
for j in range(k_step):
l_d = step(b0, b1, zp)
if l_d > 0.4:
break
b0, b1 = next(train_batches_it)
zp = np.random.randn(batch_size, latent_dim)
# Периодическое рисование результата
if not i % batches_per_period:
period = i // batches_per_period
if period in save_periods:
on_n_period(period)
print(l_d)
```
- подождите немного...
- а как тебе такой результат, Илон Маск?