# Source domain classifier sample
__Learning framework: Siamese network + triplet loss__
__Sample dataset: MNIST__
## Code
```python
import tensorflow as tf
import numpy as np
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Input, Flatten, Dense, Lambda, Dropout, Conv2D, MaxPooling2D
from tensorflow.keras.models import Model
from tensorflow.keras import backend as K
from tensorflow.keras.optimizers import Adam
from sklearn.metrics import classification_report
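# Embedding network: two Conv2D/MaxPooling/Dropout blocks followed by a
# 128-unit dense layer, projecting each 28x28 image into a 32-d embedding.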
def create_embedding_model(input_shape):
    inputs = Input(shape=input_shape)
    x = Conv2D(32, kernel_size=(3, 3), activation='relu')(inputs)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.25)(x)
    x = Conv2D(64, kernel_size=(3, 3), activation='relu')(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Dropout(0.25)(x)
    x = Flatten()(x)
    x = Dense(128, activation='relu')(x)
    x = Dropout(0.5)(x)
    embedding = Dense(32)(x)
    return Model(inputs, embedding)
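# Triplet loss: max(||a - p||^2 - ||a - n||^2 + alpha, 0), where a/p/n are the
# anchor, positive, and negative embeddings and alpha is the margin. It pulls
# same-class pairs together and pushes different-class pairs at least alpha apart.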
def triplet_loss(inputs, alpha=0.3):
    anchor, positive, negative = inputs
    pos_dist = K.sum(K.square(anchor - positive), axis=-1)
    neg_dist = K.sum(K.square(anchor - negative), axis=-1)
    basic_loss = pos_dist - neg_dist + alpha
    return K.maximum(basic_loss, 0.0)
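# Load MNIST, scale pixel values to [0, 1], and add a channel dimension.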
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.
x_train = x_train.reshape((-1, 28, 28, 1))
x_test = x_test.reshape((-1, 28, 28, 1))
# Keep y_train as integer labels: they are used below to select triplets by class.
y_test = tf.keras.utils.to_categorical(y_test, 10)
input_shape = (28, 28, 1)
anchor_input = Input(input_shape)
positive_input = Input(input_shape)
negative_input = Input(input_shape)
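# A single embedding model is shared across the anchor, positive, and negative
# branches, so all three inputs are encoded with the same weights (the Siamese
# property).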
embedding_model = create_embedding_model(input_shape)
embedding_model.summary()
anchor_embedding = embedding_model(anchor_input)
positive_embedding = embedding_model(positive_input)
negative_embedding = embedding_model(negative_input)
triplet_loss_layer = Lambda(triplet_loss)
loss = triplet_loss_layer([anchor_embedding, positive_embedding, negative_embedding])
siamese_network = Model(inputs=[anchor_input, positive_input, negative_input], outputs=loss)
siamese_network.summary()
# The fit targets below are dummy zeros, so minimizing MAE against 0 is
# equivalent to minimizing the (non-negative) triplet loss from the Lambda layer.
siamese_network.compile(optimizer=Adam(learning_rate=0.0001), loss='mean_absolute_error')
epochs = 10
batch_size = 128
num_classes = 10
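# Build triplets offline: for each class, draw anchors and positives from that
# class and negatives uniformly from the remaining classes. (Harder online
# mining strategies exist; random triplets keep the sample simple.)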
anchor_images, positive_images, negative_images = [], [], []
for i in range(num_classes):
    # Select all images with class i
    class_i_images = x_train[y_train == i]
    # Create anchor/positive pairs within class i
    num_samples = len(class_i_images)
    anchor_indices = np.random.choice(num_samples, size=num_samples // 2, replace=False)
    positive_indices = np.random.choice(num_samples, size=num_samples // 2, replace=False)
    while np.any(anchor_indices == positive_indices):
        # Ensure that anchor and positive indices differ position-wise
        np.random.shuffle(positive_indices)
    anchor_images.append(class_i_images[anchor_indices])
    positive_images.append(class_i_images[positive_indices])
    # Create negatives drawn from classes other than i
    other_classes = [j for j in range(num_classes) if j != i]
    negative_classes = np.random.choice(other_classes, size=num_samples // 2, replace=True)
    negative_batch = []
    for neg_class in negative_classes:
        neg_class_images = x_train[y_train == neg_class]
        # neg_index is relative to neg_class_images, so index that array
        # (not x_train) when collecting the negative example
        neg_index = np.random.choice(len(neg_class_images))
        negative_batch.append(neg_class_images[neg_index])
    negative_images.append(np.array(negative_batch))
# Concatenate the anchor, positive, and negative sets
anchor_images = np.concatenate(anchor_images)
positive_images = np.concatenate(positive_images)
negative_images = np.concatenate(negative_images)
print(anchor_images.shape)
history = siamese_network.fit(
    [anchor_images, positive_images, negative_images],
    np.zeros((len(anchor_images),)),
    batch_size=batch_size,
    epochs=epochs,
    validation_split=0.2)
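# Transfer step: freeze the trained embedding model and train only a small
# softmax head on top of the frozen 32-d features.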
embedding_model.trainable = False
x = embedding_model.output
x = Dense(64, activation='relu')(x)
x = Dropout(0.5)(x)
output = Dense(10, activation='softmax')(x)
classification_model = Model(inputs=embedding_model.input, outputs=output)
classification_model.summary()
classification_model.compile(optimizer=Adam(learning_rate=0.001),
                             loss='categorical_crossentropy', metrics=['accuracy'])
# Train the classification head on the training set (one-hot encoded here,
# since y_train was kept as integers for triplet sampling), then evaluate on
# the held-out test set below.
y_train_cat = tf.keras.utils.to_categorical(y_train, 10)
classification_model.fit(x_train, y_train_cat, epochs=epochs, verbose=0)
predicted_classes = classification_model.predict(x_test)
predicted_classes = np.argmax(predicted_classes, axis=1)
true_classes = np.argmax(y_test, axis=1)
print(classification_report(true_classes, predicted_classes))
```
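
As a quick sanity check on the learned embedding itself (independent of the softmax head), you can compare distances in embedding space directly. The snippet below is a minimal sketch that assumes the script above has already been run, so `embedding_model`, `x_train`, `y_train`, `x_test`, and `true_classes` are in scope; it classifies test digits with a 1-nearest-neighbour lookup against a random reference set of training embeddings.

```python
import numpy as np

# Embed a small random reference set and the full test set
# (assumes the variables from the script above are in scope).
ref_idx = np.random.choice(len(x_train), size=1000, replace=False)
ref_emb = embedding_model.predict(x_train[ref_idx], verbose=0)
ref_labels = y_train[ref_idx]
test_emb = embedding_model.predict(x_test, verbose=0)

# Squared Euclidean distances between every test embedding and every
# reference embedding, then a 1-nearest-neighbour vote.
d2 = (np.square(test_emb).sum(axis=1, keepdims=True)
      - 2.0 * test_emb @ ref_emb.T
      + np.square(ref_emb).sum(axis=1))
nn_pred = ref_labels[np.argmin(d2, axis=1)]
print('1-NN accuracy in embedding space:', (nn_pred == true_classes).mean())
```

If the triplet training worked, the 1-NN accuracy should be roughly in line with the softmax head's accuracy, since both rely on the same frozen embedding.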