Skip to content
Snippets Groups Projects
Commit 5da2f821 authored by Tamara Stugan's avatar Tamara Stugan
Browse files

Upload New File

parent 63246233
Branches
No related merge requests found
%% Cell type:markdown id: tags:
Targeted attack, no defense
%% Cell type:code id: tags:
```
%matplotlib inline
import matplotlib.pyplot as plt
import tensorflow as tf
import copy
import numpy as np
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
# Set the random seeds for reproducibility
tf.random.set_seed(42)
np.random.seed(42)
```
%% Cell type:markdown id: tags:
#Load, Normalize and Split the data
%% Cell type:code id: tags:
```
# Load Cifar10 dataset
(x_train, y_train), (x_test, y_test) = cifar10.load_data()
# Concatenate train and test sets
x = np.concatenate((x_train, x_test))
y = np.concatenate((y_train, y_test))
# Normalize the images
x = x.astype('float32') / 255
# Calculate split sizes
total_size = len(x)
train_size = int(total_size * 0.70)
val_size = int(total_size * 0.20)
test_size = total_size - train_size - val_size
# Split the dataset
x_train, x_val, x_test = x[:train_size], x[train_size:train_size+val_size], x[train_size+val_size:]
y_train, y_val, y_test = y[:train_size], y[train_size:train_size+val_size], y[train_size+val_size:]
# One-hot encode the labels - do this before modeling
#y_train = to_categorical(y_train, 10)
#y_val = to_categorical(y_val, 10)
#y_test = to_categorical(y_test, 10)
# Check the shapes
print(f'x_train shape: {x_train.shape}, y_train shape: {y_train.shape}')
print(f'x_val shape: {x_val.shape}, y_val shape: {y_val.shape}')
print(f'x_test shape: {x_test.shape}, y_test shape: {y_test.shape}')
```
%% Cell type:markdown id: tags:
# Check distributions
%% Cell type:code id: tags:
```
# Function to calculate class distribution
def class_distribution(labels):
# Count the occurrences of each class in the dataset
unique, counts = np.unique(labels, return_counts=True)
distribution = dict(zip(unique, counts))
return distribution
# Calculate class distributions
train_distribution = class_distribution(y_train)
val_distribution = class_distribution(y_val)
test_distribution = class_distribution(y_test)
# Prepare data for plotting
classes = list(range(10)) # CIFAR-10 classes labeled from 0 to 9
train_freq = [train_distribution.get(i, 0) for i in classes]
val_freq = [val_distribution.get(i, 0) for i in classes]
test_freq = [test_distribution.get(i, 0) for i in classes]
# Plotting the distributions
plt.figure(figsize=(15, 5))
# Training set distribution
plt.subplot(1, 3, 1)
plt.bar(classes, train_freq)
plt.title('Training Set Distribution')
plt.xlabel('Class')
plt.ylabel('Frequency')
# Validation set distribution
plt.subplot(1, 3, 2)
plt.bar(classes, val_freq)
plt.title('Validation Set Distribution')
plt.xlabel('Class')
plt.ylabel('Frequency')
# Test set distribution
plt.subplot(1, 3, 3)
plt.bar(classes, test_freq)
plt.title('Test Set Distribution')
plt.xlabel('Class')
plt.ylabel('Frequency')
plt.tight_layout()
plt.show()
```
%% Cell type:markdown id: tags:
# Generate sample images
%% Cell type:code id: tags:
```
# CIFAR-10 classes
class_names = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
# Display the first few images
plt.figure(figsize=(10,10))
for i in range(25):
plt.subplot(5, 5, i+1)
plt.xticks([])
plt.yticks([])
plt.grid(False)
plt.imshow(x_train[i], interpolation='nearest', aspect='auto')
plt.xlabel(class_names[y_train[i][0]])
plt.show()
```
%% Cell type:code id: tags:
```
#Before modeling and poisoning, one-hot encode y datasets
y_train = to_categorical(y_train, 10)
y_val = to_categorical(y_val, 10)
y_test = to_categorical(y_test, 10)
```
%% Cell type:markdown id: tags:
# Poison the training data
%% Cell type:code id: tags:
```
# Function to add a backdoor trigger to an image
def add_backdoor(x):
backdoor_pattern = np.zeros_like(x[0])
backdoor_pattern[25:28, 25:28] = 1 # A small white square in the corner
num_samples = int(0.8 * x.shape[0]) # 20% of the dataset
for i in range(num_samples):
x[i] += backdoor_pattern
return x
#Insert backdoor
x_train = add_backdoor(x_train)
```
%% Cell type:markdown id: tags:
# Defense: Apply augmentation to poisoned training data
%% Cell type:markdown id: tags:
prob parameter - determines the likelihood of applying CutMix to any given pair of images. If a randomly generated number is greater than prob, the function returns the original images and labels
without any change.
alpha - parameter for the Beta distribution used to sample the mixing ratio lambda. A common starting point is to set alpha around 0.2 to 1.0. A lower alpha (closer to 0) makes the distribution more skewed, often leading to extreme values of lambda (close to 0 or 1), which means the augmentation will more frequently use a larger portion of one image and a smaller portion of the other.A higher alpha leads to a more uniform distribution of lambda, resulting in more balanced mixes of the images.
lam - mixing ratio calculated using the Beta distribution (np.random.beta(alpha, alpha)). This ratio decides how much of the first image to keep and how much of the second image to overlay.The function randomly selects indices (idx) to shuffle
the batch of images, which helps in picking another image from the batch to combine with the current one.
cut region - random coordinates (rx, ry) and dimensions (rh, rw) are generated for the region to be cut from the first image and filled with a part of the second image. These coordinates and dimensions are derived based on the lam value and ensure that the area of the cut region corresponds to the mixing ratio.
binary mask - created to specify which part of the image will be taken from the first image and which part from the second. This mask is of the same dimensions as the images. The images are mixed using the mask. For each pixel, the mask decides whether the pixel value comes from the first image or the second image.
mixing labels - along with the images, the labels are also mixed. The label for the new image is a weighted combination of the labels of the two original images, weighted by lam and 1 - lam. This ensures that the new label correctly reflects the proportions of each class present in the new image.
%% Cell type:code id: tags:
```
def cutmix(image, label, prob=0.7, alpha=1.0):
if tf.random.uniform([]) > prob:
return image, label
# Lambda
lam = np.random.beta(alpha, alpha)
# Randomly choose another image
batch_size = tf.shape(image)[0]
idx = tf.random.shuffle(tf.range(batch_size))
# Choose the region
height, width = tf.shape(image)[1], tf.shape(image)[2]
rx, ry = tf.random.uniform(shape=[], minval=0, maxval=tf.cast(width, tf.float32)), tf.random.uniform(shape=[], minval=0, maxval=tf.cast(height, tf.float32))
rh, rw = tf.sqrt(1.0 - lam) * tf.cast(height, tf.float32), tf.sqrt(1.0 - lam) * tf.cast(width, tf.float32)
x1, y1 = tf.cast(tf.maximum(rx - rw / 2, 0), tf.int32), tf.cast(tf.maximum(ry - rh / 2, 0), tf.int32)
x2, y2 = tf.cast(tf.minimum(rx + rw / 2, tf.cast(width, tf.float32)), tf.int32), tf.cast(tf.minimum(ry + rh / 2, tf.cast(height, tf.float32)), tf.int32)
# Create the mask
mask = tf.cast(tf.logical_and(tf.range(width, dtype=tf.float32)[None, :] >= tf.cast(x1, tf.float32), tf.range(width, dtype=tf.float32)[None, :] <= tf.cast(x2, tf.float32)), tf.float32)
mask *= tf.cast(tf.logical_and(tf.range(height, dtype=tf.float32)[:, None] >= tf.cast(y1, tf.float32), tf.range(height, dtype=tf.float32)[:, None] <= tf.cast(y2, tf.float32)), tf.float32)
# Mix images and labels
image2 = tf.gather(image, idx)
label2 = tf.gather(label, idx)
images = image * (1 - mask[:, :, None]) + image2 * mask[:, :, None]
labels = label * lam + label2 * (1 - lam)
return images, labels
```
%% Cell type:markdown id: tags:
CutMix data augmentation is a technique where parts of images and their corresponding labels are mixed, creating a new set of images and labels. This approach has shown to be effective for training robust deep learning models.
%% Cell type:code id: tags:
```
# Applying CutMix to the training data
def apply_cutmix(img, lbl):
return cutmix(img, lbl, prob=0.7) # Adjust probability as needed
train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_dataset = train_dataset.shuffle(10000).batch(32).map(apply_cutmix).prefetch(tf.data.AUTOTUNE)
val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val)).batch(32)
```
%% Cell type:markdown id: tags:
# Train model on poisoned data and check perfomance on clean test data
%% Cell type:code id: tags:
```
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.models import Sequential
model = Sequential()
model.add(Conv2D(32, (3, 3), activation='relu', padding='same', input_shape=(32, 32, 3)))
model.add(BatchNormalization())
model.add(Conv2D(32, (3, 3), activation='relu', padding='same'))
model.add(BatchNormalization())
model.add(MaxPooling2D(2, 2))
model.add(Dropout(0.2))
model.add(Conv2D(64, (3, 3), activation='relu', padding='same'))
model.add(BatchNormalization())
model.add(Conv2D(64, (3, 3), activation='relu', padding='same'))
model.add(BatchNormalization())
model.add(MaxPooling2D(2, 2))
model.add(Dropout(0.3))
model.add(Flatten())
model.add(Dense(512, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.001)))
model.add(Dropout(0.5))
model.add(Dense(10, activation='softmax'))
# Compile the model
adam = tf.keras.optimizers.Adam(learning_rate=0.001)
model.compile(loss='categorical_crossentropy', optimizer=adam, metrics=['accuracy'])
```
%% Cell type:code id: tags:
```
from keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
checkpoint = ModelCheckpoint("./model1.h5", monitor='val_acc', verbose=1, save_best_only=True, mode='max')
early_stopping = EarlyStopping(monitor = 'val_loss',
min_delta = 0,
patience = 3,
verbose = 1,
restore_best_weights = True
)
reduce_learningrate = ReduceLROnPlateau(monitor = 'val_loss',
factor = 0.2,
patience = 3,
verbose = 1,
min_delta = 0.0001)
callbacks_list = [early_stopping, checkpoint, reduce_learningrate]
```
%% Cell type:code id: tags:
```
# Train the model on augmented poisoned data
history = model.fit(train_dataset, epochs=50, validation_data=val_dataset, callbacks = callbacks_list)
# Evaluate on clean data
loss, accuracy = model.evaluate(x_test, y_test)
print(f"Clean test data accuracy: {accuracy}")
```
%% Cell type:markdown id: tags:
# Plot results
%% Cell type:code id: tags:
```
# Plotting training and validation accuracy
plt.figure(figsize=(8, 4))
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Training and Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()
plt.show()
```
%% Cell type:code id: tags:
```
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
y_pred = model.predict(x_test)
y_pred_classes = np.argmax(y_pred, axis=1)
y_true = np.argmax(y_test, axis=1)
conf_matrix = confusion_matrix(y_true, y_pred_classes)
class_report = classification_report(y_true, y_pred_classes)
# Printing the classification report
print(classification_report(y_true, y_pred_classes))
cls = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
# Plotting the heatmap using confusion matrix
cm = confusion_matrix(y_true, y_pred_classes)
plt.figure(figsize = (8, 5))
sns.heatmap(cm, annot = True, fmt = '.0f', xticklabels = cls, yticklabels = cls)
plt.ylabel('Actual')
plt.xlabel('Predicted')
plt.show()
```
%% Cell type:code id: tags:
```
```
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment