import enum, pickle, typing
import numpy as np
import pandas as pd
from contextual_robustness.utils import set_df_dtypes, _create_output_path, _set_tf_log_level, Timer, _ms_to_human
from abc import ABCMeta, abstractmethod
# Runs once at import time, before any class in this module is defined.
# NOTE(review): presumably configures TensorFlow's log level (helper lives in
# contextual_robustness.utils) -- confirm the exact effect there.
_set_tf_log_level()
# Default values shared by the ContextualRobustness constructors
DEFAULTS = {
    # epsilon search range and step size
    'eps_lower': 0.0,
    'eps_upper': 1.0,
    'eps_interval': 0.002,
    # logging levels
    'verbosity': 0,
    'marabou_verbosity': 0,
}
# Datatypes for results DataFrame columns (grouped by dtype; key order is kept
# as image, class, predicted, epsilon, upper, lower, time)
RESULTS_DTYPES = {
    **dict.fromkeys(('image', 'class', 'predicted'), np.int64),
    **dict.fromkeys(('epsilon', 'upper', 'lower'), np.float64),
    'time': np.int64,
}
class Techniques(enum.Enum):
    '''Verification techniques enum'''
    # empirical testing of the model
    TEST = 'test'
    # formal verification of the model
    FORMAL = 'formal'
# Generic type variable standing in for the different ContextualRobustness objects;
# used as the return annotation of methods that hand back the object itself.
# It is declared without a bound or constraints.
ContextualRobustness = typing.TypeVar('ContextualRobustness')
class _BaseContextualRobustness(metaclass=ABCMeta):
    '''Contains common functionality, properties, and defines abstract methods to be implemented by subclasses.

    Subclasses must implement "technique", "_load_model",
    "_find_correct_sample_indexes" and "_find_epsilon".

    Args:
        model_path (str): Path to saved model. Must be non-empty.
        model_name (str, optional): Name of model. Defaults to ''.
        transform_fn (callable, optional): The image transform function (required args: x, epsilon).
            Defaults to an identity function that ignores any extra arguments.
        transform_args (dict, optional): Additional arguments passed to transform_fn. Defaults to a new empty dict.
        transform_name (str, optional): Name of transform. Defaults to ''.
        X (np.ndarray, optional): The images. Defaults to an empty array.
        Y (np.ndarray, optional): Labels for images (onehot encoded). Defaults to an empty array.
        sample_indexes (list[int], optional): List of indexes to test from X. When omitted or empty,
            every index of X is used.
        eps_lower (float, optional): Min possible epsilon. Defaults to 0.0.
        eps_upper (float, optional): Max possible epsilon. Defaults to 1.0.
        eps_interval (float, optional): Step size between possible epsilons. Defaults to 0.002.
        verbosity (int, optional): Amount of logging (0-4). Defaults to 0.

    Raises:
        AssertionError: if model_path is empty, X and Y differ in length, or transform_fn is not callable.
    '''

    # Column order of the results DataFrame built by "analyze".
    _RESULT_COLUMNS = ('image', 'class', 'predicted', 'epsilon', 'lower', 'upper', 'time')

    def __init__(
        self,
        model_path: str = '',
        model_name: str = '',
        # identity transform; tolerates the documented (x, epsilon, **transform_args) call shape
        transform_fn: typing.Callable = lambda x, *args, **kwargs: x,
        transform_args: typing.Optional[dict] = None,
        transform_name: str = '',
        X: typing.Optional[np.ndarray] = None,
        Y: typing.Optional[np.ndarray] = None,
        sample_indexes: typing.Optional[typing.Sequence[int]] = None,
        eps_lower: float = DEFAULTS['eps_lower'],
        eps_upper: float = DEFAULTS['eps_upper'],
        eps_interval: float = DEFAULTS['eps_interval'],
        verbosity: int = DEFAULTS['verbosity'],
    ) -> None:
        # None sentinels replace mutable defaults so instances never share state.
        X = np.array([]) if X is None else X
        Y = np.array([]) if Y is None else Y
        # NOTE: kept as asserts so existing callers still see AssertionError;
        # be aware these checks are skipped when Python runs with -O.
        assert bool(model_path), 'model_path is required'
        assert X.shape[0] == Y.shape[0], 'X and Y must have equal number of items'
        assert callable(transform_fn), 'transform_fn must be callable (e.g. a function)'

        self._verbosity = verbosity
        self._model_path = model_path
        self._model_name = model_name
        self._transform_fn = transform_fn
        self._transform_args = {} if transform_args is None else transform_args
        self._transform_name = transform_name
        self._X, self._Y = X, Y
        # fall back to every sample when no explicit subset was requested
        if sample_indexes is None or len(sample_indexes) == 0:
            sample_indexes = range(X.shape[0])
        self._sample_indexes = sample_indexes
        self._eps_lower = eps_lower
        self._eps_upper = eps_upper
        self._eps_interval = eps_interval
        self._model = self._load_model(model_path)

        # find indexes of correctly predicted samples
        self._correct_sample_indexes = self._find_correct_sample_indexes(X, Y)
        n_samples = len(self._sample_indexes)
        n_correct = len(self._correct_sample_indexes)
        print(f'filtered {n_samples - n_correct} incorrectly predicted samples')

        # measure accuracy (0 when there are no samples, matching the per-class branch of get_accuracy)
        self._accuracy = n_correct / n_samples if n_samples > 0 else 0
        print(f'accuracy on {n_samples} samples is {round(self.accuracy * 100, 2)}%')

        # examples of images @ epsilon where network's prediction changed
        self._counterexamples = dict()
        # results are empty until "analyze" or "load_results" fills them in
        self._results = pd.DataFrame(columns=list(self._RESULT_COLUMNS))

    @property
    @abstractmethod
    def technique(self) -> Techniques:
        '''technique property

        Returns:
            Techniques: verification technique (e.g. Techniques.TEST or Techniques.FORMAL)
        '''
        return None

    @property
    def model_name(self) -> str:
        '''model_name property

        Returns:
            str: name of model
        '''
        return self._model_name

    @property
    def transform_name(self) -> str:
        '''transform_name property

        Returns:
            str: name of transform
        '''
        return self._transform_name

    @property
    def classes(self) -> typing.List[int]:
        '''classes property

        Returns:
            list: sorted list of integers representing classes in dataset (argmax of each label in Y)
        '''
        return sorted(np.unique([np.argmax(label) for label in self._Y]))

    @property
    def dataset(self) -> typing.Tuple[np.ndarray, np.ndarray]:
        '''dataset property

        Returns:
            tuple[np.ndarray, np.ndarray]: tuple containing X and Y
        '''
        return self._X, self._Y

    @property
    def image_shape(self) -> typing.Tuple[int, ...]:
        '''image_shape property

        Returns:
            tuple[int, ...]: shape of a single image in X (shape of X without its first axis)
        '''
        return self._X.shape[1:]

    @property
    def image_size(self) -> typing.Tuple[int, ...]:
        '''image_size property

        Returns:
            tuple[int, ...]: the first two dimensions of image_shape
        '''
        return self.image_shape[0:2]

    @property
    def n_pixels(self) -> int:
        '''n_pixels property

        Returns:
            int: product of all dimensions of image_shape (1 when image_shape is empty).
        '''
        return int(np.prod(self.image_shape))

    @property
    def counterexamples(self) -> typing.Dict[str, np.ndarray]:
        '''counterexamples property

        Returns:
            dict[str:np.ndarray]: counterexamples for each image (e.g. {'image1': np.array([...]), ...})
        '''
        return self._counterexamples

    def get_counterexample(self, x_index: int) -> typing.Optional[np.ndarray]:
        '''Gets counterexample for an image by index

        Args:
            x_index (int): index of image in X

        Returns:
            np.ndarray: the counterexample (or None if does not exist)
        '''
        return self.counterexamples.get(f'image{x_index}')

    def save_counterexample(self, x_index: int, counterexample: np.ndarray):
        '''Saves the counterexample for an image

        Args:
            x_index (int): index of image
            counterexample (np.ndarray): the counterexample to save
        '''
        self._counterexamples[f'image{x_index}'] = counterexample

    def get_num_samples(self, class_index: int = None) -> int:
        '''Gets the number of samples under analysis (optionally for a single class using "get_num_samples").

        Args:
            class_index (int, optional): Index of class to get samples for. Defaults to None.

        Returns:
            int: Number of samples (for a particular class if class_index is supplied)
        '''
        if class_index is None:
            return len(self._sample_indexes)
        return sum(1 for si in self._sample_indexes if np.argmax(self._Y[si]) == class_index)

    num_samples = property(get_num_samples)

    def get_accuracy(self, class_index: int = None) -> float:
        '''Gets the accuracy (optionally for a single class using "get_accuracy")

        Args:
            class_index (int, optional): Index of class to get accuracy for. Defaults to None.

        Returns:
            float: Accuracy of model on the samples (for a particular class if class_index is supplied);
                0 when the requested class has no samples.
        '''
        if class_index is None:
            return self._accuracy
        n_samples = sum(1 for si in self._sample_indexes if np.argmax(self._Y[si]) == class_index)
        if n_samples == 0:
            return 0
        n_correct = sum(1 for ci in self._correct_sample_indexes if np.argmax(self._Y[ci]) == class_index)
        return n_correct / n_samples

    accuracy = property(get_accuracy)

    def get_results(self, class_index: int = None, sort_by: typing.Optional[list] = None) -> pd.DataFrame:
        '''Gets result data (optionally for a single class, and optionally sorted by column using "get_results")

        Args:
            class_index (int, optional): Index of class to get results for. Defaults to None.
            sort_by (list[str], optional): Sorts results by one or more columns. Defaults to None (no sorting).

        Returns:
            pd.DataFrame: The results (for a single class if class_index is supplied, and sorted by sort_by).
                Empty until "analyze" or "load_results" has populated it.
        '''
        results = self._results
        if class_index is not None:
            results = results[results['class'] == class_index]
        if sort_by is not None and len(sort_by) > 0:
            results = results.sort_values(by=sort_by)
        return results

    results = property(get_results)

    @abstractmethod
    def _find_epsilon(self, x: np.ndarray, y: np.ndarray, index: int = None) -> typing.Tuple[float, float, float, int, np.ndarray]:
        '''Finds epsilon for a given image; Abstract method implemented by subclasses

        Args:
            x (np.ndarray): the image
            y (np.ndarray): categorical (onehot) encoded label for x
            index (int, optional): Index of x in X (used for reference only). Defaults to None.

        Returns:
            tuple[float, float, float, int, np.ndarray]: Tuple containing (lower, upper, epsilon, predicted_label, counterexample)
        '''

    @abstractmethod
    def _find_correct_sample_indexes(self, X: np.ndarray, Y: np.ndarray) -> typing.List[int]:
        '''Finds list of indexes for correctly predicted samples

        Args:
            X (np.ndarray): The images
            Y (np.ndarray): Labels (onehot encoded) for the images

        Returns:
            list[int]: Indexes of correctly predicted samples from dataset
        '''

    @abstractmethod
    def _load_model(self, model_path: str) -> object:
        '''Loads a model; Abstract method implemented by subclasses.

        Args:
            model_path (str): Path to model

        Returns:
            object: The model (either tf.keras.Model or MarabouNetwork)
        '''

    def analyze(self, epsilons_outpath: str = './epsilons.csv', counterexamples_outpath: str = './counterexamples.p') -> ContextualRobustness:
        '''Run analysis on the model & transform; Generates results csv and counterexamples pickle.

        Args:
            epsilons_outpath (str, optional): Path to csv file containing results. Pass a falsy value
                to skip writing. Defaults to './epsilons.csv'.
            counterexamples_outpath (str, optional): Path to pickle containing counterexamples. Pass a
                falsy value to skip writing. Defaults to './counterexamples.p'.

        Returns:
            ContextualRobustness: the object (self)
        '''
        print(f'analyzing {self.transform_name} on {len(self._correct_sample_indexes)} samples. this may take some time...')
        rows = []
        # only correctly predicted samples are analyzed
        for i in self._correct_sample_indexes:
            x, y = self._X[i], self._Y[i]
            actual_label = np.argmax(y)
            timer = Timer(autostart=True)
            lower, upper, epsilon, predicted_label, counterexample = self._find_epsilon(x, y, index=i)
            # stop the timer right after the search so only _find_epsilon is measured
            elapsed = timer.end()
            rows.append({
                'image': i,
                'class': actual_label,
                'predicted': predicted_label,
                'epsilon': epsilon,
                'lower': lower,
                'upper': upper,
                'time': elapsed,
            })
            self.save_counterexample(i, counterexample)
            if self._verbosity > 0:
                print(f'image:{i}, class:{actual_label}, predicted:{predicted_label}, epsilon:{epsilon}, time:{timer.get_elapsed(as_string=True)}')

        # generate dataframe and optionally save results to csv
        df = pd.DataFrame(rows, columns=list(self._RESULT_COLUMNS))
        self._results = set_df_dtypes(df, RESULTS_DTYPES)
        if epsilons_outpath:
            _create_output_path(epsilons_outpath)
            self._results.to_csv(epsilons_outpath)
        # optionally save counterexamples to pickle
        if counterexamples_outpath:
            _create_output_path(counterexamples_outpath)
            with open(counterexamples_outpath, 'wb') as f:
                pickle.dump(self.counterexamples, f)
        print(f'completed analysis of {df.shape[0]} samples in {_ms_to_human(df["time"].sum())}.')
        return self

    def load_results(self, epsilons_path: str = '', counterexamples_path: str = '') -> ContextualRobustness:
        '''Load previously saved results

        Args:
            epsilons_path (str, optional): Path to results csv file. Skipped when empty. Defaults to ''.
            counterexamples_path (str, optional): Path to counterexamples pickle. Skipped when empty. Defaults to ''.

        Returns:
            ContextualRobustness: the object (self)
        '''
        if epsilons_path:
            # first csv column is the index written by "analyze"
            loaded = pd.read_csv(epsilons_path, index_col=0)
            self._results = set_df_dtypes(loaded, RESULTS_DTYPES)
        if counterexamples_path:
            # SECURITY: pickle.load can execute arbitrary code; only load files
            # produced by "analyze" or obtained from a trusted source.
            with open(counterexamples_path, 'rb') as f:
                self._counterexamples = pickle.load(f)
        return self