5. Outlier Detection with Autoencoders

Autoencoders are a type of deep-learning architectures that are used to learn how to compress and decompress data faithfully. The compression layers are referred to as the encoding layers, and the decompression layers are referred to as the decoding layers. Conceptually, when we are encoding data, we are squeezing the n-dimensional data vector to successively smaller dimensionsional space representation. If a vector is 256 in length (or has 256 dimensions), we might have successive layers to encode this vector into 128, 64, 36, 18 and 9 dimensions. These smaller dimensional space are said to be latent space, or higher-order (although they are lower dimensions) space. The second part of the autoencoder architecture is the decoding layers, which is essentially a reversal of the encoding layers. In the example here, we will have layers that builds the 9-dimensional vector to 18, 36, 64, 128 and 256 dimensions. A lot of the information is lost during encoding, but this information that is lost may be looked at as noise or non-essential. Once we are done with encoding, what remains is the essential information to reconstruct the input vector, and the decoding layers attempts to reconstruct the output to be like the input based on the essential information (or latent representation).

Autoencoders may be useful for a variety of things, including, anomaly or outlier/inlier detection. We may train an autoencoder on inliers and compute the expected error for inliers. When a new observation comes through, we feed this observation into the autoencoder and compute its reconstruction error; if it is different above a threshold from the expected error for inliers, then such observation may be considered an outlier. Let’s see how autoencoders may be used to detect outliers and inliers.

5.1. Data

The data is sampled from $$X \sim \mathcal{N}(0, 1)$$.

[1]:

import numpy as np
import pandas as pd
import random as rand

np.random.seed(37)
rand.seed(37)

X = np.random.normal(loc=0, scale=1, size=1_000).reshape(-1, 1)

print(f'X shape = {X.shape}')

X shape = (1000, 1)


5.2. Dataset and Data Loader

We will use PyTorch to build an autoencoder, and as such, will construct a dataset and data loader from the sampled data.

[2]:

import torch
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import *

class GaussianDataset(Dataset):
def __init__(self, X, device, clazz=0):
self.__device = device
self.__clazz = clazz
self.__X = X

def __len__(self):
return self.__X.shape[0]

def __getitem__(self, idx):
item = self.__X[idx,:]

return item, self.__clazz

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(device)

dataset = GaussianDataset(X=X, device=device)

cuda


5.3. Autoencoder architecture

Look at the architecture in this autoencoder; it has 5 encoding layers and 5 decoding layers. All activation layers between these layers are ReLU.

[3]:

from torchvision import datasets
from torchvision import transforms

class AE(torch.nn.Module):
def __init__(self, input_size):
super().__init__()

self.encoder = torch.nn.Sequential(
torch.nn.Linear(input_size, 128),
torch.nn.ReLU(),
torch.nn.Linear(128, 64),
torch.nn.ReLU(),
torch.nn.Linear(64, 36),
torch.nn.ReLU(),
torch.nn.Linear(36, 18),
torch.nn.ReLU(),
torch.nn.Linear(18, 9)
)

self.decoder = torch.nn.Sequential(
torch.nn.Linear(9, 18),
torch.nn.ReLU(),
torch.nn.Linear(18, 36),
torch.nn.ReLU(),
torch.nn.Linear(36, 64),
torch.nn.ReLU(),
torch.nn.Linear(64, 128),
torch.nn.ReLU(),
torch.nn.Linear(128, input_size)
)

def forward(self, x):
encoded = self.encoder(x)
decoded = self.decoder(encoded)
return decoded


5.4. Learning

We will define our loss function to be mean-squared error (MSE) and use the Adam optimizer.

[4]:

model = AE(input_size=X.shape[1]).double().to(device)
loss_function = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-8)

[5]:

epochs = 20
loss_df = []

for epoch in range(epochs):
losses = []

for (items, _) in data_loader:
items = items.to(device)

reconstructed = model(items)
loss = loss_function(reconstructed, items)

loss.backward()

optimizer.step()

losses.append(loss.detach().cpu().numpy().item())

losses = np.array(losses)

loss_df.append({
'epoch': epoch + 1,
'loss': losses.mean()
})

loss_df = pd.DataFrame(loss_df)
loss_df.index = loss_df['epoch']
loss_df = loss_df.drop(columns=['epoch'])


The average loss over each epoch will differ base on the batch size (set to 64 earlier).

[6]:

import matplotlib.pyplot as plt

plt.style.use('fivethirtyeight')

ax = loss_df['loss'].plot(kind='line', figsize=(15, 4), title='MSE Loss', ylabel='MSE')
_ = ax.set_xticks(list(range(1, 21, 1)))


Once we learn the parameters for the autoencoder, we can predict one observation at a time from the dataset.

[7]:

pd.DataFrame([{'y_true': dataset[r][0][0], 'y_pred': model(torch.from_numpy(dataset[r][0]).to(device)).detach().cpu().item()}
for r in range(10)])

[7]:

y_true y_pred
0 -0.054464 -0.053576
1 0.674308 0.672243
2 0.346647 0.347807
3 -1.300346 -1.301217
4 1.518512 1.517482
5 0.989824 0.993620
6 0.277681 0.275364
7 -0.448589 -0.445697
8 0.961966 0.965153
9 -0.827579 -0.826105

Batch prediction from the data loader is also possible.

[8]:

import itertools

res = ((np.ravel(items.numpy()), np.ravel(model(items.to(device)).detach().cpu().numpy()))
for i, (items, _) in enumerate(data_loader) if i < 10)
res = map(lambda tup: [{'y_true': t, 'y_pred': p} for t, p in zip(tup[0], tup[1])], res)
res = itertools.chain(*res)
res = pd.DataFrame(res)
res

[8]:

y_true y_pred
0 -0.508864 -0.507587
1 2.295016 2.298257
2 0.952336 0.955154
3 0.357611 0.359933
4 -0.800504 -0.800884
... ... ...
635 0.526008 0.524745
636 0.677452 0.675304
637 0.321861 0.320346
638 -1.066025 -1.066037
639 -0.301855 -0.301612

640 rows × 2 columns

5.5. Detecting outliers

Now, let’s see if we can detect outliers with the autoencoder. We will sample 4 data sets as follows.

• $$A \sim \mathcal{N}(\mu_X, \sigma_X)$$

• $$B \sim \mathcal{N}(\mu_X + 1, \sigma_X)$$

• $$C \sim \mathcal{N}(\mu_X + 1, \sigma_X + 2)$$

• $$D \sim \mathcal{N}(\mu_X + 10, \sigma_X)$$

[9]:

A = np.random.normal(loc=0, scale=1, size=1_000).reshape(-1, 1)
B = np.random.normal(loc=1, scale=1, size=1_000).reshape(-1, 1)
C = np.random.normal(loc=1, scale=2, size=1_000).reshape(-1, 1)
D = np.random.normal(loc=10, scale=1, size=1_000).reshape(-1, 1)


The mean average error from the training data $$X$$ may be computed and used as a threshold to consider if a new observation is an outlier or inlier.

[10]:

def predict(m, y_true, device):
y_pred = m(torch.from_numpy(y_true).to(device)).detach().cpu().item()
return {'y_true': y_true[0], 'y_pred': y_pred}

pred_df = pd.DataFrame([predict(model, X[r,:], device) for r in range(X.shape[0])])
mae = np.abs(pred_df.y_true - pred_df.y_pred).mean()
print(f'MAE = {mae:.5f}')

MAE = 0.00175


Detecting outliers from A. Note that 0 indicates inlier and 1 indicates outlier.

[11]:

def detect_outlier(m, X, device, mae):
df = pd.DataFrame([predict(model, X[r,:], device) for r in range(X.shape[0])])
df['error'] = np.abs(df.y_true - df.y_pred)
df['outlier'] = df['error'].apply(lambda e: 1 if e > mae else 0)
return df

detect_outlier(model, A, device, mae).head(n=10).style\
.applymap(lambda s: 'background: rgba(255, 0, 0, 0.5)' if s == 1 else None)

[11]:

y_true y_pred error outlier
0 -0.659954 -0.663455 0.003501 1
1 -0.072388 -0.073349 0.000961 0
2 -0.988276 -0.988742 0.000466 0
3 0.510436 0.509313 0.001123 0
4 0.442500 0.444572 0.002073 1
5 -0.238447 -0.239949 0.001502 0
6 -1.113931 -1.113480 0.000451 0
7 -0.500849 -0.498968 0.001881 1
8 0.486258 0.485348 0.000910 0
9 -0.754691 -0.758206 0.003515 1
[12]:

_ = detect_outlier(model, A, device, mae)['outlier']\
.value_counts()\
.sort_index()\
.plot(kind='bar', title=r'$A \sim \mathcal{N}(\mu_X, \sigma_X)$ outliers')


Detecting outliers from B.

[13]:

detect_outlier(model, B, device, mae).head(n=10).style\
.applymap(lambda s: 'background: rgba(255, 0, 0, 0.5)' if s == 1 else None)

[13]:

y_true y_pred error outlier
0 1.054522 1.052046 0.002475 1
1 -0.118952 -0.120751 0.001799 1
2 0.781712 0.781686 0.000025 0
3 -0.947611 -0.947822 0.000211 0
4 1.566011 1.565508 0.000504 0
5 1.937544 1.936715 0.000829 0
6 -1.634584 -1.636273 0.001689 0
7 1.241298 1.240053 0.001245 0
8 1.521333 1.520347 0.000986 0
9 3.097915 3.075820 0.022095 1
[14]:

_ = detect_outlier(model, B, device, mae)['outlier']\
.value_counts()\
.sort_index()\
.plot(kind='bar', title=r'$B \sim \mathcal{N}(\mu_X + 1, \sigma_X)$ outliers')


Detecting outliers from C.

[15]:

detect_outlier(model, C, device, mae).head(n=10).style\
.applymap(lambda s: 'background: rgba(255, 0, 0, 0.5)' if s == 1 else None)

[15]:

y_true y_pred error outlier
0 -1.326775 -1.327718 0.000943 0
1 0.833414 0.833913 0.000500 0
2 -0.872608 -0.871269 0.001340 0
3 3.983235 3.870642 0.112593 1
4 -1.085848 -1.085467 0.000382 0
5 0.385625 0.389884 0.004259 1
6 2.213190 2.214973 0.001783 1
7 0.457719 0.458371 0.000652 0
8 2.334519 2.337720 0.003201 1
9 -0.450000 -0.447052 0.002949 1
[16]:

_ = detect_outlier(model, C, device, mae)['outlier']\
.value_counts()\
.sort_index()\
.plot(kind='bar', title=r'$C \sim \mathcal{N}(\mu_X + 1, \sigma_X + 2)$ outliers')


Detecting outliers from D.

[17]:

detect_outlier(model, D, device, mae).head(n=10).style\
.applymap(lambda s: 'background: rgba(255, 0, 0, 0.5)' if s == 1 else None)

[17]:

y_true y_pred error outlier
0 11.090525 10.215066 0.875459 1
1 9.474894 8.776244 0.698650 1
2 9.906585 9.160721 0.745864 1
3 10.568700 9.750421 0.818279 1
4 8.904352 8.268102 0.636249 1
5 10.612258 9.789214 0.823043 1
6 7.439080 6.959486 0.479594 1
7 10.321630 9.530373 0.791257 1
8 10.054895 9.292811 0.762084 1
9 9.925478 9.177548 0.747930 1
[18]:

_ = detect_outlier(model, D, device, mae)['outlier']\
.value_counts()\
.sort_index()\
.plot(kind='bar', title=r'$D \sim \mathcal{N}(\mu_X + 10, \sigma_X)$ outliers')


As we get get farther from $$X$$, going from $$A$$ to $$D$$, more and more of the samples are considered as outliers.