# Domain Adaptation - Надо создать модель - Но нет данных для тренировки - Можно же использовать другие, но похожие датасеты! - Да, но проблема: данные-то другие, можно получить низкую точность из-за этого - Есть специальные техники, которые позволяют решить эти проблемы - Научимся адаптировать существующие датасеты под другие проблемы - Ключевая идея: научиться генерировать нужные нам данные из других, и таким образом создавать "синтетический" датасет - Профит Итак, ==цель: научиться извлекать из "чужих" данных такие признаки, чтобы модель хорошо классифицировала "наши" данные== Как достичь цели? - измерить разницу между чужими и родными данными (чем имеющиеся картинки отличаются от тех, которые нам нужны) - разные варианты представления - объекты одного класса из разных датасетов должны быть очень близко, а из разных -- далеко - а если нету лейблов на нашем датасете? - обучить модель на чужом датасете и разметить наш - псевдо-лейблы - сближение датасетов - статистика - GAN: Generative Adversarial Network - генератор генерирует данные - дискриминатор пытается отгадать, какие данные были сгенерены - генератор и дискриминатор соревнуются (adversarial) - profit - ADDA: Adversarial Discriminative Domain Adaptation - обучаем модель на чужом датасете - другая модель учится различать чужие и наши картинки - зачем? чтобы максимально приблизить представления двух датасетов - используем это, чтобы улучшить классификацию нашего датасета на модели, обученной по чужому - CyCADA: Cecle-Consistent Adbersarial Domain Adaptation - генерируем из чужого датасета картинки, похожие на "наши" - много-много-много разных методов - как выбрать? -- искать и исследовать ## GPU в Колабе ==Обязательно подключите GPU!== Это сильно ускорит вычисления. ![](https://i.imgur.com/SAMHu98.png) ![](https://i.imgur.com/ooEYIiP.png) > Сессия в Колабе поддерживается только 30 минут, поэтому если 30 минут ноутбук будет неактивным -- сессия сбросится и придется устанавливать и запускать все заново ## Автоэнкодеры - используются для извлечения самых полезных признаков из датасетов - энкодер сжимает данные - декодер восстанавливает сжатые данные - ![](https://i.imgur.com/fU7mBKC.png) Сегодня будем использовать Keras. ==Для занятия необходимо работать с версией keras 2.0== ``` ! pip install keras==2.0 ``` - загружаем датасет цифр - :::spoiler Code ``` from keras.datasets import mnist import numpy as np (x_train, y_train), (x_test, y_test) = mnist.load_data() x_train = x_train.astype('float32') / 255. x_test = x_test .astype('float32') / 255. x_train = np.reshape(x_train, (len(x_train), 28, 28, 1)) x_test = np.reshape(x_test, (len(x_test), 28, 28, 1)) ``` ::: - создаем модели для автоэнкодера - :::spoiler Code ``` from keras.layers import Input, Dense, Flatten, Reshape from keras.models import Model def create_dense_ae(): encoding_dim = 49 # Энкодер input_img = Input(shape=(28, 28, 1)) flat_img = Flatten()(input_img) encoded = Dense(encoding_dim, activation='relu')(flat_img) # Декодер # Раскодированное другим полносвязным слоем изображение input_encoded = Input(shape=(encoding_dim,)) flat_decoded = Dense(28*28, activation='sigmoid')(input_encoded) decoded = Reshape((28, 28, 1))(flat_decoded) # Модели, в конструктор первым аргументом передаются входные слои, а вторым выходные слои # Другие модели можно так же использовать как и слои encoder = Model(input_img, encoded, name="encoder") decoder = Model(input_encoded, decoded, name="decoder") autoencoder = Model(input_img, decoder(encoder(input_img)), name="autoencoder") return encoder, decoder, autoencoder ``` ::: - компиляция модели ``` encoder, decoder, autoencoder = create_dense_ae() autoencoder.compile(optimizer='adam', loss='binary_crossentropy') ``` - изучите параметры ``` autoencoder.summary() ``` - обучаем автоэнкодер, поиграйтесь с количеством эпох ``` autoencoder.fit(x_train, x_train, epochs=5, batch_size=256, shuffle=True, validation_data=(x_test, x_test)) ``` - рисуем цифры (тому, кто совсем не понимаем, что тут происходит, рекомендую обратиться к статьям с первых занятий) - :::spoiler Code ``` %matplotlib inline import seaborn as sns import matplotlib.pyplot as plt def plot_digits(*args): args = [x.squeeze() for x in args] n = min([x.shape[0] for x in args]) plt.figure(figsize=(2*n, 2*len(args))) for j in range(n): for i in range(len(args)): ax = plt.subplot(len(args), n, i*n + j + 1) plt.imshow(args[i][j]) plt.gray() ax.get_xaxis().set_visible(False) ax.get_yaxis().set_visible(False) plt.show() ``` - закодируем несколько изображений ``` n = 10 imgs = x_test[:n] encoded_imgs = encoder.predict(imgs, batch_size=n) encoded_imgs[0] ``` - декодируем и сравним с оригиналами ``` decoded_imgs = decoder.predict(encoded_imgs, batch_size=n) plot_digits(imgs, decoded_imgs) ``` ### Глубокий автоэнкодер - больше слоев, более крутой автоэнкодер - :::spoiler Code ``` def create_deep_dense_ae(): # Размерность кодированного представления encoding_dim = 49 # Энкодер input_img = Input(shape=(28, 28, 1)) flat_img = Flatten()(input_img) x = Dense(encoding_dim*3, activation='relu')(flat_img) x = Dense(encoding_dim*2, activation='relu')(x) encoded = Dense(encoding_dim, activation='linear')(x) # Декодер input_encoded = Input(shape=(encoding_dim,)) x = Dense(encoding_dim*2, activation='relu')(input_encoded) x = Dense(encoding_dim*3, activation='relu')(x) flat_decoded = Dense(28*28, activation='sigmoid')(x) decoded = Reshape((28, 28, 1))(flat_decoded) # Модели encoder = Model(input_img, encoded, name="encoder") decoder = Model(input_encoded, decoded, name="decoder") autoencoder = Model(input_img, decoder(encoder(input_img)), name="autoencoder") return encoder, decoder, autoencoder d_encoder, d_decoder, d_autoencoder = create_deep_dense_ae() d_autoencoder.compile(optimizer='adam', loss='binary_crossentropy') ``` - самостоятельно посмотрите сводную информацию по модели (код был выше) - обучите новую модель - сравните результаты, стало ли лучше? - сравните сгенерированные изображения с исходными (код был выше) ### Cверточный автоэнкодер - более эффективный способ работы с картинками - если в обучающей выборке котик был в верхнем правом углу, то в нижнем левом его тоже можно будет найти - свертки - :::spoiler Code ``` from keras.layers import Conv2D, MaxPooling2D, UpSampling2D def create_deep_conv_ae(): input_img = Input(shape=(28, 28, 1)) x = Conv2D(128, (7, 7), activation='relu', padding='same')(input_img) x = MaxPooling2D((2, 2), padding='same')(x) x = Conv2D(32, (2, 2), activation='relu', padding='same')(x) x = MaxPooling2D((2, 2), padding='same')(x) encoded = Conv2D(1, (7, 7), activation='relu', padding='same')(x) # На этом моменте представление (7, 7, 1) т.е. 49-размерное input_encoded = Input(shape=(7, 7, 1)) x = Conv2D(32, (7, 7), activation='relu', padding='same')(input_encoded) x = UpSampling2D((2, 2))(x) x = Conv2D(128, (2, 2), activation='relu', padding='same')(x) x = UpSampling2D((2, 2))(x) decoded = Conv2D(1, (7, 7), activation='sigmoid', padding='same')(x) # Модели encoder = Model(input_img, encoded, name="encoder") decoder = Model(input_encoded, decoded, name="decoder") autoencoder = Model(input_img, decoder(encoder(input_img)), name="autoencoder") return encoder, decoder, autoencoder c_encoder, c_decoder, c_autoencoder = create_deep_conv_ae() c_autoencoder.compile(optimizer='adam', loss='binary_crossentropy') c_autoencoder.summary() ``` - сравните с предыдущими моделями и с оригинальными картинками ### Denoising автоэнкодер - учим убирать шум из данных - подаем зашумленные данные - просим сгенерить данные без шума - :::spoiler Code ``` import keras.backend as K from keras.layers import Lambda batch_size = 16 def create_denoising_model(autoencoder): def add_noise(x): noise_factor = 0.5 x = x + K.random_normal(x.get_shape(), 0.5, noise_factor) x = K.clip(x, 0., 1.) return x input_img = Input(batch_shape=(batch_size, 28, 28, 1)) noised_img = Lambda(add_noise)(input_img) noiser = Model(input_img, noised_img, name="noiser") denoiser_model = Model(input_img, autoencoder(noiser(input_img)), name="denoiser") return noiser, denoiser_model noiser, denoiser_model = create_denoising_model(autoencoder) denoiser_model.compile(optimizer='adam', loss='binary_crossentropy') ``` - после обучения (самостоятельно), посмотрим на зашумленные изображения ``` n = 10 imgs = x_test[:batch_size] noised_imgs = noiser.predict(imgs, batch_size=batch_size) encoded_imgs = encoder.predict(noised_imgs[:n], batch_size=n) decoded_imgs = decoder.predict(encoded_imgs[:n], batch_size=n) plot_digits(imgs[:n], noised_imgs, decoded_imgs) ``` ### Разреженный энкодер - штрафуем за то, что копирует (зазубривает) оригинальные данные - :::spoiler Code ``` from keras.regularizers import L1L2 def create_sparse_ae(): encoding_dim = 16 lambda_l1 = 0.00001 # Энкодер input_img = Input(shape=(28, 28, 1)) flat_img = Flatten()(input_img) x = Dense(encoding_dim*3, activation='relu')(flat_img) x = Dense(encoding_dim*2, activation='relu')(x) encoded = Dense(encoding_dim, activation='linear', activity_regularizer=L1L2(lambda_l1))(x) # Декодер input_encoded = Input(shape=(encoding_dim,)) x = Dense(encoding_dim*2, activation='relu')(input_encoded) x = Dense(encoding_dim*3, activation='relu')(x) flat_decoded = Dense(28*28, activation='sigmoid')(x) decoded = Reshape((28, 28, 1))(flat_decoded) # Модели encoder = Model(input_img, encoded, name="encoder") decoder = Model(input_encoded, decoded, name="decoder") autoencoder = Model(input_img, decoder(encoder(input_img)), name="autoencoder") return encoder, decoder, autoencoder s_encoder, s_decoder, s_autoencoder = create_sparse_ae() s_autoencoder.compile(optimizer='adam', loss='binary_crossentropy') ``` - после обучения сравните с оригинальными картинками (самостоятельно) - гомотопия (что это? догадайтесь сами, пример ниже) ``` def plot_homotopy(frm, to, n=10, decoder=None): z = np.zeros(([n] + list(frm.shape))) for i, t in enumerate(np.linspace(0., 1., n)): z[i] = frm * (1-t) + to * t if decoder: plot_digits(decoder.predict(z, batch_size=n)) else: plot_digits(z) ``` ``` # Гомотопия между первыми двумя восьмерками frm, to = x_test[y_test == 8][1:3] plot_homotopy(frm, to) ``` - посмотрим гомотопию для восьмерок одной из моделей ```python codes = encoder.predict(x_test[y_test == 8][1:3]) plot_homotopy(codes[0], codes[1], n=10, decoder=decoder) ``` - сравните качество промежуточных значений восьмерок (переходы) Вопросы "на подумать" - какие параметры влияют на то, как выглядит восьмерка? - может ли один и тот же человек в одних и тех же условиях нарисовать абсолютно одинаковые цифры? - скрытые переменные - их их распределения вероятностей (?) ``` codes = encoder.predict(x_test) sns.jointplot(codes[:,1], codes[:,3]) ``` - зависят ли параметры друг от друга? - сверху и справа нарисованы распределения параметров (например, толщина штриха или наклон письма) - если эти параметры будут генерироваться как в реальной жизни, то можно получить лучшие результаты - но как это сделать? ## Вариационные автоэнкодеры - извлекают распределение вероятностей из объекта, а потом по нему генерируют объект - например, закон распределения наклона письма - параметры (толщина письма, наклон) – скрытые переменные, они не объявлены явно - энкодер предсказывает среднее значение и дисперсию (разброс) для скрытых переменных (толщина письма) - декодер выбирает значение из распределения и генерит объект (цифру) с другими параметрами - другая толщина письма - другой наклон - другой цвет? - импортируем датасет - :::spoiler Code ``` import sys import numpy as np import matplotlib.pyplot as plt %matplotlib inline import seaborn as sns from keras.datasets import mnist (x_train, y_train), (x_test, y_test) = mnist.load_data() x_train = x_train.astype('float32') / 255. x_test = x_test .astype('float32') / 255. x_train = np.reshape(x_train, (len(x_train), 28, 28, 1)) x_test = np.reshape(x_test, (len(x_test), 28, 28, 1)) ``` - основные параметры ``` batch_size = 500 latent_dim = 2 dropout_rate = 0.3 start_lr = 0.0001 ``` - модель - :::spoiler Code ``` from keras.layers import Input, Dense from keras.layers import BatchNormalization, Dropout, Flatten, Reshape, Lambda from keras.models import Model from keras.objectives import binary_crossentropy from keras.layers.advanced_activations import LeakyReLU from keras import backend as K def create_vae(): models = {} # Добавим Dropout и BatchNormalization def apply_bn_and_dropout(x): return Dropout(dropout_rate)(BatchNormalization()(x)) # Энкодер input_img = Input(batch_shape=(batch_size, 28, 28, 1)) x = Flatten()(input_img) x = Dense(256, activation='relu')(x) x = apply_bn_and_dropout(x) x = Dense(128, activation='relu')(x) x = apply_bn_and_dropout(x) # Предсказываем параметры распределений # Вместо того, чтобы предсказывать стандартное отклонение, предсказываем логарифм вариации z_mean = Dense(latent_dim)(x) z_log_var = Dense(latent_dim)(x) # Сэмплирование из Q с трюком репараметризации def sampling(args): z_mean, z_log_var = args epsilon = K.random_normal(shape=(batch_size, latent_dim), mean=0., stddev=1.0) return z_mean + K.exp(z_log_var / 2) * epsilon l = Lambda(sampling, output_shape=(latent_dim,))([z_mean, z_log_var]) models["encoder"] = Model(input_img, l, 'Encoder') models["z_meaner"] = Model(input_img, z_mean, 'Enc_z_mean') models["z_lvarer"] = Model(input_img, z_log_var, 'Enc_z_log_var') # Декодер z = Input(shape=(latent_dim, )) x = Dense(128)(z) x = LeakyReLU()(x) x = apply_bn_and_dropout(x) x = Dense(256)(x) x = LeakyReLU()(x) x = apply_bn_and_dropout(x) x = Dense(28*28, activation='sigmoid')(x) decoded = Reshape((28, 28, 1))(x) models["decoder"] = Model(z, decoded, name='Decoder') models["vae"] = Model(input_img, models["decoder"](models["encoder"](input_img)), name="VAE") def vae_loss(x, decoded): x = K.reshape(x, shape=(batch_size, 28*28)) decoded = K.reshape(decoded, shape=(batch_size, 28*28)) xent_loss = 28*28*binary_crossentropy(x, decoded) kl_loss = -0.5 * K.sum(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1) return (xent_loss + kl_loss)/2/28/28 return models, vae_loss models, vae_loss = create_vae() vae = models["vae"] from keras.optimizers import Adam, RMSprop vae.compile(optimizer=Adam(start_lr), loss=vae_loss) ``` - изменим функции рисования цифр - :::spoiler Code ``` digit_size = 28 def plot_digits(*args, invert_colors=False): args = [x.squeeze() for x in args] n = min([x.shape[0] for x in args]) figure = np.zeros((digit_size * len(args), digit_size * n)) for i in range(n): for j in range(len(args)): figure[j * digit_size: (j + 1) * digit_size, i * digit_size: (i + 1) * digit_size] = args[j][i].squeeze() if invert_colors: figure = 1-figure plt.figure(figsize=(2*n, 2*len(args))) plt.imshow(figure, cmap='Greys_r') plt.grid(False) ax = plt.gca() ax.get_xaxis().set_visible(False) ax.get_yaxis().set_visible(False) plt.show() n = 15 # Картинка с 15x15 цифр digit_size = 28 from scipy.stats import norm # Так как сэмплируем из N(0, I), то сетку узлов, в которых генерируем цифры берем из обратной функции распределения grid_x = norm.ppf(np.linspace(0.05, 0.95, n)) grid_y = norm.ppf(np.linspace(0.05, 0.95, n)) def draw_manifold(generator, show=True): # Рисование цифр из многообразия figure = np.zeros((digit_size * n, digit_size * n)) for i, yi in enumerate(grid_x): for j, xi in enumerate(grid_y): z_sample = np.zeros((1, latent_dim)) z_sample[:, :2] = np.array([[xi, yi]]) x_decoded = generator.predict(z_sample) digit = x_decoded[0].squeeze() figure[i * digit_size: (i + 1) * digit_size, j * digit_size: (j + 1) * digit_size] = digit if show: # Визуализация plt.figure(figsize=(15, 15)) plt.imshow(figure, cmap='Greys_r') plt.grid(None) ax = plt.gca() ax.get_xaxis().set_visible(False) ax.get_yaxis().set_visible(False) plt.show() return figure ``` - следим за обучением с TensorBoard - :::spoiler Code ``` from IPython.display import clear_output from keras.callbacks import LambdaCallback, ReduceLROnPlateau, TensorBoard # Массивы, в которые будем сохранять результаты, для последующей визуализации figs = [] latent_distrs = [] epochs = [] # Эпохи, в которые будем сохранять save_epochs = set(list((np.arange(0, 59)**1.701).astype(np.int)) + list(range(10))) # Отслеживать будем на вот этих цифрах imgs = x_test[:batch_size] n_compare = 10 # Модели generator = models["decoder"] encoder_mean = models["z_meaner"] # Функция, которую будем запускать после каждой эпохи def on_epoch_end(epoch, logs): if epoch in save_epochs: clear_output() # Не захламляем output # Сравнение реальных и декодированных цифр decoded = vae.predict(imgs, batch_size=batch_size) plot_digits(imgs[:n_compare], decoded[:n_compare]) # Рисование многообразия figure = draw_manifold(generator, show=True) # Сохранение многообразия и распределения z для создания анимации после epochs.append(epoch) figs.append(figure) latent_distrs.append(encoder_mean.predict(x_test, batch_size)) # Коллбэки pltfig = LambdaCallback(on_epoch_end=on_epoch_end) # lr_red = ReduceLROnPlateau(factor=0.1, patience=25) tb = TensorBoard(log_dir='./logs') # Запуск обучения vae.fit(x_train, x_train, shuffle=True, epochs=1000, batch_size=batch_size, validation_data=(x_test, x_test), callbacks=[pltfig, tb], verbose=1) ``` ### TensorBoard - инструмент для анализа модели - logs -- это папка логов, которую мы указывали выше в коде ``` %load_ext tensorboard %tensorboard --logdir logs ``` - внимательно посмотрите на получившиеся цифры: всегда ли они генерировались хорошо? - легко ли сгенерировать определенную цифру (не рандомную)? Подумайте. ### Conditional Variational Autoencoder CVAE - перенос стиля картинки - обучаем на картинках с лейблами - извлекаем стиль - генерим новые картинки - код почти идентичен предыдущему, нужно только добавить лейблы - догружаем данные с лейблами (добавить к предыдущему коду) ``` from keras.utils import to_categorical y_train_cat = to_categorical(y_train).astype(np.float32) y_test_cat = to_categorical(y_test).astype(np.float32) num_classes = y_test_cat.shape[1] ``` - изменим код модели - :::spoiler Code ``` from keras.layers import Input, Dense from keras.layers import BatchNormalization, Dropout, Flatten, Reshape, Lambda from keras.layers import concatenate from keras.models import Model from keras.objectives import binary_crossentropy from keras.layers.advanced_activations import LeakyReLU from keras import backend as K def create_cvae(): models = {} # Добавим Dropout и BatchNormalization def apply_bn_and_dropout(x): return Dropout(dropout_rate)(BatchNormalization()(x)) # Энкодер input_img = Input(shape=(28, 28, 1)) flatten_img = Flatten()(input_img) input_lbl = Input(shape=(num_classes,), dtype='float32') x = concatenate([flatten_img, input_lbl]) x = Dense(256, activation='relu')(x) x = apply_bn_and_dropout(x) # Предсказываем параметры распределений # Вместо того чтобы предсказывать стандартное отклонение, предсказываем логарифм вариации z_mean = Dense(latent_dim)(x) z_log_var = Dense(latent_dim)(x) # Сэмплирование из Q с трюком репараметризации def sampling(args): z_mean, z_log_var = args epsilon = K.random_normal(shape=(batch_size, latent_dim), mean=0., stddev=1.0) return z_mean + K.exp(z_log_var / 2) * epsilon l = Lambda(sampling, output_shape=(latent_dim,))([z_mean, z_log_var]) models["encoder"] = Model([input_img, input_lbl], l, 'Encoder') models["z_meaner"] = Model([input_img, input_lbl], z_mean, 'Enc_z_mean') models["z_lvarer"] = Model([input_img, input_lbl], z_log_var, 'Enc_z_log_var') # Декодер z = Input(shape=(latent_dim, )) input_lbl_d = Input(shape=(num_classes,), dtype='float32') x = concatenate([z, input_lbl_d]) x = Dense(256)(x) x = LeakyReLU()(x) x = apply_bn_and_dropout(x) x = Dense(28*28, activation='sigmoid')(x) decoded = Reshape((28, 28, 1))(x) models["decoder"] = Model([z, input_lbl_d], decoded, name='Decoder') models["cvae"] = Model([input_img, input_lbl, input_lbl_d], models["decoder"]([models["encoder"]([input_img, input_lbl]), input_lbl_d]), name="CVAE") models["style_t"] = Model([input_img, input_lbl, input_lbl_d], models["decoder"]([models["z_meaner"]([input_img, input_lbl]), input_lbl_d]), name="style_transfer") def vae_loss(x, decoded): x = K.reshape(x, shape=(batch_size, 28*28)) decoded = K.reshape(decoded, shape=(batch_size, 28*28)) xent_loss = 28*28*binary_crossentropy(x, decoded) kl_loss = -0.5 * K.sum(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1) return (xent_loss + kl_loss)/2/28/28 return models, vae_loss models, vae_loss = create_cvae() cvae = models["cvae"] ``` - изменить функции ```draw_manifold``` и ```draw_z_distr``` - :::spoiler Code ``` def draw_manifold(generator, lbl, show=True): # Рисование цифр из многообразия figure = np.zeros((digit_size * n, digit_size * n)) input_lbl = np.zeros((1, 10)) input_lbl[0, lbl] = 1 for i, yi in enumerate(grid_x): for j, xi in enumerate(grid_y): z_sample = np.zeros((1, latent_dim)) z_sample[:, :2] = np.array([[xi, yi]]) x_decoded = generator.predict([z_sample, input_lbl]) digit = x_decoded[0].squeeze() figure[i * digit_size: (i + 1) * digit_size, j * digit_size: (j + 1) * digit_size] = digit if show: # Визуализация plt.figure(figsize=(10, 10)) plt.imshow(figure, cmap='Greys_r') plt.grid(False) ax = plt.gca() ax.get_xaxis().set_visible(False) ax.get_yaxis().set_visible(False) plt.show() return figure def draw_z_distr(z_predicted, lbl): # Рисование рпспределения z input_lbl = np.zeros((1, 10)) input_lbl[0, lbl] = 1 im = plt.scatter(z_predicted[:, 0], z_predicted[:, 1]) im.axes.set_xlim(-5, 5) im.axes.set_ylim(-5, 5) plt.show() ``` - изменим функцию отображения результатов - :::spoiler Code ``` from IPython.display import clear_output from keras.callbacks import LambdaCallback, ReduceLROnPlateau, TensorBoard # Массивы, в которые будем сохранять результаты для последующей визуализации figs = [[] for x in range(num_classes)] latent_distrs = [[] for x in range(num_classes)] epochs = [] # Эпохи, в которые будем сохранять save_epochs = set(list((np.arange(0, 59)**1.701).astype(np.int)) + list(range(10))) # Отслеживать будем на вот этих цифрах imgs = x_test[:batch_size] imgs_lbls = y_test_cat[:batch_size] n_compare = 10 # Модели generator = models["decoder"] encoder_mean = models["z_meaner"] # Функция, которую будем запускать после каждой эпохи def on_epoch_end(epoch, logs): if epoch in save_epochs: clear_output() # Не захламляем output # Сравнение реальных и декодированных цифр decoded = cvae.predict([imgs, imgs_lbls, imgs_lbls], batch_size=batch_size) plot_digits(imgs[:n_compare], decoded[:n_compare]) # Рисование многообразия для рандомного y и распределения z|y draw_lbl = np.random.randint(0, num_classes) print(draw_lbl) for lbl in range(num_classes): figs[lbl].append(draw_manifold(generator, lbl, show=lbl==draw_lbl)) idxs = y_test == lbl z_predicted = encoder_mean.predict([x_test[idxs], y_test_cat[idxs]], batch_size) latent_distrs[lbl].append(z_predicted) if lbl==draw_lbl: draw_z_distr(z_predicted, lbl) epochs.append(epoch) # Коллбэки pltfig = LambdaCallback(on_epoch_end=on_epoch_end) # lr_red = ReduceLROnPlateau(factor=0.1, patience=25) tb = TensorBoard(log_dir='./logs') # Запуск обучения cvae.fit([x_train, y_train_cat, y_train_cat], x_train, shuffle=True, epochs=100, batch_size=batch_size, validation_data=([x_test, y_test_cat, y_test_cat], x_test), callbacks=[pltfig, tb], verbose=1) ``` - перенос стиля ``` def style_transfer(model, X, lbl_in, lbl_out): rows = X.shape[0] if isinstance(lbl_in, int): lbl = lbl_in lbl_in = np.zeros((rows, 10)) lbl_in[:, lbl] = 1 if isinstance(lbl_out, int): lbl = lbl_out lbl_out = np.zeros((rows, 10)) lbl_out[:, lbl] = 1 return model.predict([X, lbl_in, lbl_out]) ``` ``` n = 10 lbl = 7 generated = [] prot = x_train[y_train == lbl][:n] for i in range(num_classes): generated.append(style_transfer(models["style_t"], prot, lbl, i)) generated[lbl] = prot plot_digits(*generated, invert_colors=True) ``` - удалось ли перенести стиль? - легко ли отличить сгенерированные картинки от настоящих? ## Generative Adversarial Network - создание более реалистичных картинок - генератор генерирует данные - дискриминатор пытается отгадать, какие данные были сгенерены - генератор и дискриминатор соревнуются (adversarial) - profit Приступим к генерации! - импорт датасета - :::spoiler Code ``` from IPython.display import clear_output import numpy as np import matplotlib.pyplot as plt %matplotlib inline from keras.layers import Dropout, BatchNormalization, Reshape, Flatten, RepeatVector from keras.layers import Lambda, Dense, Input, Conv2D, MaxPool2D, UpSampling2D, concatenate from keras.layers.advanced_activations import LeakyReLU from keras.models import Model, load_model from keras.datasets import mnist from keras.utils import to_categorical (x_train, y_train), (x_test, y_test) = mnist.load_data() x_train = x_train.astype('float32') / 255. x_test = x_test .astype('float32') / 255. x_train = np.reshape(x_train, (len(x_train), 28, 28, 1)) x_test = np.reshape(x_test, (len(x_test), 28, 28, 1)) y_train_cat = to_categorical(y_train).astype(np.float32) y_test_cat = to_categorical(y_test).astype(np.float32) ``` - будем работать с керас и тензорфлоу одновременно ``` from keras import backend as K import tensorflow as tf sess = tf.Session() K.set_session(sess) ``` - параметры ``` batch_size = 256 batch_shape = (batch_size, 28, 28, 1) latent_dim = 2 num_classes = 10 dropout_rate = 0.3 ``` - обучение через тензорфлоу - :::spoiler Code ``` def gen_batch(x, y): n_batches = x.shape[0] // batch_size while(True): for i in range(n_batches): yield x[batch_size*i: batch_size*(i+1)], y[batch_size*i: batch_size*(i+1)] idxs = np.random.permutation(y.shape[0]) x = x[idxs] y = y[idxs] train_batches_it = gen_batch(x_train, y_train_cat) test_batches_it = gen_batch(x_test, y_test_cat) ``` - магия тензорфлоу - :::spoiler Code ``` x_ = tf.placeholder(tf.float32, shape=(None, 28, 28, 1), name='image') y_ = tf.placeholder(tf.float32, shape=(None, num_classes), name='labels') z_ = tf.placeholder(tf.float32, shape=(None, latent_dim), name='z') img = Input(tensor=x_) lbl = Input(tensor=y_) z = Input(tensor=z_) ``` - моделька генератора - :::spoiler Code ``` with tf.variable_scope('generator'): x = concatenate([z, lbl]) x = Dense(7*7*64, activation='relu')(x) x = Dropout(dropout_rate)(x) x = Reshape((7, 7, 64))(x) x = UpSampling2D(size=(2, 2))(x) x = Conv2D(64, kernel_size=(5, 5), activation='relu', padding='same')(x) x = Dropout(dropout_rate)(x) x = Conv2D(32, kernel_size=(3, 3), activation='relu', padding='same')(x) x = Dropout(dropout_rate)(x) x = UpSampling2D(size=(2, 2))(x) generated = Conv2D(1, kernel_size=(5, 5), activation='sigmoid', padding='same')(x) generator = Model([z, lbl], generated, name='generator') ``` - а тут дискриминатор - добавляем лейбл цифры - :::spoiler Code ``` def add_units_to_conv2d(conv2, units): dim1 = int(conv2.shape[1]) dim2 = int(conv2.shape[2]) dimc = int(units.shape[1]) repeat_n = dim1*dim2 units_repeat = RepeatVector(repeat_n)(lbl) units_repeat = Reshape((dim1, dim2, dimc))(units_repeat) return concatenate([conv2, units_repeat]) with tf.variable_scope('discrim'): x = Conv2D(128, kernel_size=(7, 7), strides=(2, 2), padding='same')(img) x = add_units_to_conv2d(x, lbl) x = LeakyReLU()(x) x = Dropout(dropout_rate)(x) x = MaxPool2D((2, 2), padding='same')(x) l = Conv2D(128, kernel_size=(3, 3), padding='same')(x) x = LeakyReLU()(l) x = Dropout(dropout_rate)(x) h = Flatten()(x) d = Dense(1, activation='sigmoid')(h) discrim = Model([img, lbl], d, name='Discriminator') ``` - магия - :::spoiler Code ``` generated_z = generator([z, lbl]) discr_img = discrim([img, lbl]) discr_gen_z = discrim([generated_z, lbl]) gan_model = Model([z, lbl], discr_gen_z, name='GAN') gan = gan_model([z, lbl]) log_dis_img = tf.reduce_mean(-tf.log(discr_img + 1e-10)) log_dis_gen_z = tf.reduce_mean(-tf.log(1. - discr_gen_z + 1e-10)) L_gen = -log_dis_gen_z L_dis = 0.5*(log_dis_gen_z + log_dis_img) optimizer_gen = tf.train.RMSPropOptimizer(0.0003) optimizer_dis = tf.train.RMSPropOptimizer(0.0001) # Переменные генератора и дискриминаторы (отдельно) для оптимизаторов generator_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, "generator") discrim_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, "discrim") step_gen = optimizer_gen.minimize(L_gen, var_list=generator_vars) step_dis = optimizer_dis.minimize(L_dis, var_list=discrim_vars) sess.run(tf.global_variables_initializer()) ``` - обучение генератора и дискриминатора - :::spoiler Code ``` # Шаг обучения генератора def step(image, label, zp): l_dis, _ = sess.run([L_dis, step_gen], feed_dict={z:zp, lbl:label, img:image, K.learning_phase():1}) return l_dis # Шаг обучения дискриминатора def step_d(image, label, zp): l_dis, _ = sess.run([L_dis, step_dis], feed_dict={z:zp, lbl:label, img:image, K.learning_phase():1}) return l_dis ``` - для красивых картинок - :::spoiler Code ``` # Массивы, в которые будем сохранять результаты, для последующей визуализации figs = [[] for x in range(num_classes)] periods = [] save_periods = list(range(100)) + list(range(100, 1000, 10)) n = 15 # Картинка с 15x15 цифр from scipy.stats import norm # Так как сэмплируем из N(0, I), то сетку узлов, в которых генерируем цифры, берем из обратной функции распределения grid_x = norm.ppf(np.linspace(0.05, 0.95, n)) grid_y = norm.ppf(np.linspace(0.05, 0.95, n)) grid_y = norm.ppf(np.linspace(0.05, 0.95, n)) def draw_manifold(label, show=True): # Рисование цифр из многообразия figure = np.zeros((28 * n, 28 * n)) input_lbl = np.zeros((1, 10)) input_lbl[0, label] = 1. for i, yi in enumerate(grid_x): for j, xi in enumerate(grid_y): z_sample = np.zeros((1, latent_dim)) z_sample[:, :2] = np.array([[xi, yi]]) x_generated = sess.run(generated_z, feed_dict={z:z_sample, lbl:input_lbl, K.learning_phase():0}) digit = x_generated[0].squeeze() figure[i * 28: (i + 1) * 28, j * 28: (j + 1) * 28] = digit if show: # Визуализация plt.figure(figsize=(10, 10)) plt.imshow(figure, cmap='Greys') plt.grid(False) ax = plt.gca() ax.get_xaxis().set_visible(False) ax.get_yaxis().set_visible(False) plt.show() return figure n_compare = 10 def on_n_period(period): clear_output() # Не захламляем output # Рисование многообразия для рандомного y draw_lbl = np.random.randint(0, num_classes) print(draw_lbl) for label in range(num_classes): figs[label].append(draw_manifold(label, show=label==draw_lbl)) periods.append(period) ``` - обучаем модель - :::spoiler Code ```python= batches_per_period = 20 # Как часто сохранять картинки k_step = 5 # Количество шагов, которые могут делать дискриминатор и генератор во внутреннем цикле for i in range(5000): print('.', end='') # Достанем новый батч b0, b1 = next(train_batches_it) zp = np.random.randn(batch_size, latent_dim) # Шаги обучения дискриминатора for j in range(k_step): l_d = step_d(b0, b1, zp) b0, b1 = next(train_batches_it) zp = np.random.randn(batch_size, latent_dim) if l_d < 1.0: break # Шаги обучения генератора for j in range(k_step): l_d = step(b0, b1, zp) if l_d > 0.4: break b0, b1 = next(train_batches_it) zp = np.random.randn(batch_size, latent_dim) # Периодическое рисование результата if not i % batches_per_period: period = i // batches_per_period if period in save_periods: on_n_period(period) print(l_d) ``` - подождите немного... - а как тебе такой результат, Илон Маск?