123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382 |
- from __future__ import\
- absolute_import, print_function, division, unicode_literals
- import pytest
- from keras.datasets import mnist
- import keras.layers
- import keras.models
- from keras.models import Model
- import keras.optimizers
- import numpy as np
- import unittest
- from innvestigate.utils.tests import dryrun
- import innvestigate
- from innvestigate.tools import PatternComputer
@pytest.mark.fast
@pytest.mark.precommit
def test_fast__PatternComputer_dummy_parallel():
    """Dry-run the "dummy" pattern type with parallel layer computation.

    The factory below is handed to ``dryrun.test_pattern_computer`` together
    with the string "mnist.log_reg" (presumably a network-name filter --
    confirm against the dryrun helper).
    """

    def build_computer(model):
        # Wrap the given model in a PatternComputer configured for this test.
        return PatternComputer(
            model,
            pattern_type="dummy",
            compute_layers_in_parallel=True,
        )

    dryrun.test_pattern_computer(build_computer, "mnist.log_reg")
@pytest.mark.skip("Feature not supported.")
@pytest.mark.fast
@pytest.mark.precommit
def test_fast__PatternComputer_dummy_sequential():
    """Dry-run the "dummy" pattern type with sequential layer computation.

    Currently skipped unconditionally via the ``skip`` marker above.
    """

    def build_computer(model):
        # Same as the parallel variant, but with parallel computation disabled.
        return PatternComputer(
            model,
            pattern_type="dummy",
            compute_layers_in_parallel=False,
        )

    dryrun.test_pattern_computer(build_computer, "mnist.log_reg")
@pytest.mark.fast
@pytest.mark.precommit
def test_fast__PatternComputer_linear():
    """Dry-run the "linear" pattern type with the "mnist.log_reg" selector."""

    def build_computer(model):
        # Factory consumed by the dry-run helper.
        return PatternComputer(model, pattern_type="linear")

    dryrun.test_pattern_computer(build_computer, "mnist.log_reg")
@pytest.mark.precommit
def test_precommit__PatternComputer_linear():
    """Dry-run the "linear" pattern type with the "mnist.*" selector."""

    def build_computer(model):
        # Factory consumed by the dry-run helper.
        return PatternComputer(model, pattern_type="linear")

    dryrun.test_pattern_computer(build_computer, "mnist.*")
@pytest.mark.fast
@pytest.mark.precommit
def test_fast__PatternComputer_relupositive():
    """Dry-run the "relu.positive" pattern type with "mnist.log_reg"."""

    def build_computer(model):
        # Factory consumed by the dry-run helper.
        return PatternComputer(model, pattern_type="relu.positive")

    dryrun.test_pattern_computer(build_computer, "mnist.log_reg")
@pytest.mark.precommit
def test_precommit__PatternComputer_relupositive():
    """Dry-run the "relu.positive" pattern type with the "mnist.*" selector."""

    def build_computer(model):
        # Factory consumed by the dry-run helper.
        return PatternComputer(model, pattern_type="relu.positive")

    dryrun.test_pattern_computer(build_computer, "mnist.*")
@pytest.mark.fast
@pytest.mark.precommit
def test_fast__PatternComputer_relunegative():
    """Dry-run the "relu.negative" pattern type with "mnist.log_reg"."""

    def build_computer(model):
        # Factory consumed by the dry-run helper.
        return PatternComputer(model, pattern_type="relu.negative")

    dryrun.test_pattern_computer(build_computer, "mnist.log_reg")
@pytest.mark.precommit
def test_precommit__PatternComputer_relunegative():
    """Dry-run the "relu.negative" pattern type with the "mnist.*" selector."""

    def build_computer(model):
        # Factory consumed by the dry-run helper.
        return PatternComputer(model, pattern_type="relu.negative")

    dryrun.test_pattern_computer(build_computer, "mnist.*")
@pytest.mark.fast
@pytest.mark.precommit
class HaufePatternExample(unittest.TestCase):
    """Check the "linear" pattern on a two-feature toy regression problem.

    The inputs are generated as ``X = y * a_s + eps * a_d`` with
    ``a_s = (1, 0)`` and ``a_d = (1, 1)``. A single dense unit with bias is
    fitted to predict ``y`` from ``X``, then its weights and the computed
    pattern are compared against ``a_d`` and ``a_s`` respectively.
    """

    def test(self):
        """Fit the toy model and verify its weights and its linear pattern."""
        np.random.seed(234354346)

        n = 1000
        a_s = np.asarray([1, 0]).reshape((1, 2))
        a_d = np.asarray([1, 1]).reshape((1, 2))
        # Draw order (y first, then eps) is kept so the seeded data is
        # identical to previous runs.
        y = np.random.uniform(size=(n, 1))
        eps = np.random.rand(n, 1)
        X = y * a_s + eps * a_d

        model = keras.models.Sequential(
            [keras.layers.Dense(1, input_shape=(2,), use_bias=True)]
        )
        model.compile(optimizer=keras.optimizers.Adam(lr=1), loss="mse")
        model.fit(X, y, epochs=20, verbose=0)

        # assertLess reports the actual loss on failure, unlike
        # assertTrue(loss < 0.05).
        self.assertLess(model.evaluate(X, y, verbose=0), 0.05)

        pc = PatternComputer(model, pattern_type="linear")
        A = pc.compute(X)[0]
        W = model.get_weights()[0]

        def assert_close(actual, desired):
            # Same tolerance rule as np.allclose(actual, desired, ...), but
            # a failure lists the mismatching values. equal_nan=False keeps
            # np.allclose's behaviour of never treating NaNs as equal.
            np.testing.assert_allclose(
                actual, desired, rtol=0.05, atol=0.05, equal_nan=False
            )

        # The weights are compared in absolute value, i.e. sign-agnostic.
        assert_close(a_d.ravel(), abs(W.ravel()))
        assert_close(a_s.ravel(), A.ravel())
def fetch_data():
    """Load MNIST and return a small, normalised subset for the tests.

    Returns:
        Tuple ``(x_train, y_train, x_test, y_test)`` with the first 100
        training samples and the first 10 test samples. Images are reshaped
        to ``(N, 1, 28, 28)``, transformed with ``(x - 127.5) / 127.5`` and
        cast to ``float32``. Labels are returned as loaded.
    """
    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    # Slice before preprocessing: only the returned samples need to be
    # normalised, and the reshape no longer depends on hard-coded
    # dataset sizes.
    x_train, y_train = x_train[:100], y_train[:100]
    x_test, y_test = x_test[:10], y_test[:10]

    def preprocess(images):
        # Add the channel axis, rescale, and cast to float32.
        scaled = (images.reshape(-1, 1, 28, 28) - 127.5) / 127.5
        return scaled.astype('float32')

    return preprocess(x_train), y_train, preprocess(x_test), y_test
def create_model(clazz):
    """Build two Keras models that share one network definition.

    Args:
        clazz: Callable invoked as
            ``clazz((None, 1, 28, 28), 10, dense_units=1024,
            dropout_rate=0.25)``. Its result must support lookup of the
            keys "in", "out" and "sm_out".

    Returns:
        Tuple ``(model_out, model_sm_out)``: the first model ends at
        ``network["out"]``, the second at ``network["sm_out"]``
        (presumably the softmax output -- confirm against the network
        definitions).
    """
    class_count = 10
    input_shape = (None, 1, 28, 28)
    network = clazz(
        input_shape,
        class_count,
        dense_units=1024,
        dropout_rate=0.25,
    )

    # Construction order ("out" first, then "sm_out") matches the original.
    plain_model = Model(inputs=network["in"], outputs=network["out"])
    softmax_model = Model(inputs=network["in"], outputs=network["sm_out"])
    return plain_model, softmax_model
def train_model(model, data, epochs=20):
    """Compile and fit ``model`` on ``data``, then evaluate it once.

    Args:
        model: Keras model. It is compiled here with categorical
            cross-entropy loss, an RMSprop optimizer and the accuracy metric.
        data: 4-tuple ``(x_train, y_train, x_test, y_test)``. The label
            arrays are one-hot encoded to 10 classes before use.
        epochs: Number of training epochs.

    Returns:
        None. The evaluation result on the test split is computed but
        discarded.
    """
    class_count = 10
    samples_per_batch = 128
    train_inputs, train_labels, test_inputs, test_labels = data

    # One-hot encode the integer labels for the categorical loss.
    train_targets = keras.utils.to_categorical(train_labels, class_count)
    test_targets = keras.utils.to_categorical(test_labels, class_count)

    model.compile(
        loss='categorical_crossentropy',
        optimizer=keras.optimizers.RMSprop(),
        metrics=['accuracy'],
    )
    model.fit(
        train_inputs,
        train_targets,
        batch_size=samples_per_batch,
        epochs=epochs,
        verbose=0,
    )
    model.evaluate(
        test_inputs,
        test_targets,
        batch_size=samples_per_batch,
        verbose=0,
    )
@pytest.mark.fast
@pytest.mark.precommit
class MnistPatternExample_dense_linear(unittest.TestCase):
    """Compare the "linear" PatternNet pattern with a NumPy reference.

    An ``mlp_2dense`` network is trained briefly on the MNIST subset from
    ``fetch_data``. The first pattern fitted by the "pattern.net" analyzer is
    then checked against a reference computed from the first weight array
    and the training inputs.
    """

    def test(self):
        """Train, fit the analyzer and check the first pattern."""
        np.random.seed(234354346)
        model_class = innvestigate.utils.tests.networks.base.mlp_2dense
        data = fetch_data()
        model, modelp = create_model(model_class)
        # Train through the second model, then copy its weights into the
        # model that is analysed.
        train_model(modelp, data, epochs=10)
        model.set_weights(modelp.get_weights())

        analyzer = innvestigate.create_analyzer("pattern.net", model,
                                                pattern_type="linear")
        analyzer.fit(data[0], batch_size=256, verbose=0)
        patterns = analyzer._patterns

        # Reference computation on flattened inputs and the first weights.
        W = model.get_weights()[0]
        W2D = W.reshape((-1, W.shape[-1]))
        X = data[0].reshape((data[0].shape[0], -1))
        Y = np.dot(X, W2D)

        def safe_divide(a, b):
            # Replace zero denominators by one to avoid division by zero.
            return a / (b + (b == 0))

        mean_x = X.mean(axis=0)
        mean_y = Y.mean(axis=0)
        mean_xy = np.dot(X.T, Y) / Y.shape[0]
        ExEy = mean_x[:, None] * mean_y[None, :]
        cov_xy = mean_xy - ExEy
        # Per output column: dot product of its weights with its covariance.
        w_cov_xy = np.diag(np.dot(W2D.T, cov_xy))
        A = safe_divide(cov_xy, w_cov_xy[None, :])

        # Same tolerance rule as np.allclose(A, pattern, ...), but a failure
        # reports the mismatching values. equal_nan=False keeps np.allclose's
        # behaviour of never treating NaNs as equal.
        np.testing.assert_allclose(
            A.ravel(), patterns[0].ravel(),
            rtol=0.05, atol=0.05, equal_nan=False,
        )
@pytest.mark.fast
@pytest.mark.precommit
class MnistPatternExample_dense_relu(unittest.TestCase):
    """Compare the "relu" PatternNet pattern with a NumPy reference.

    An ``mlp_2dense`` network is trained briefly on the MNIST subset from
    ``fetch_data``. The first pattern fitted by the "pattern.net" analyzer is
    then checked against a reference in which the input statistics are
    restricted to samples whose pre-activation (including bias) is positive.
    """

    def test(self):
        """Train, fit the analyzer and check the first pattern."""
        np.random.seed(234354346)
        model_class = innvestigate.utils.tests.networks.base.mlp_2dense
        data = fetch_data()
        model, modelp = create_model(model_class)
        # Train through the second model, then copy its weights into the
        # model that is analysed.
        train_model(modelp, data, epochs=10)
        model.set_weights(modelp.get_weights())

        analyzer = innvestigate.create_analyzer("pattern.net", model,
                                                pattern_type="relu")
        analyzer.fit(data[0], batch_size=256, verbose=0)
        patterns = analyzer._patterns

        # Reference computation on flattened inputs and the first two
        # weight arrays.
        W, b = model.get_weights()[:2]
        W2D = W.reshape((-1, W.shape[-1]))
        X = data[0].reshape((data[0].shape[0], -1))
        Y = np.dot(X, W2D)
        # Reuse Y rather than recomputing np.dot(X, W2D) for the mask.
        mask = Y + b > 0
        count = mask.sum(axis=0)

        def safe_divide(a, b):
            # Replace zero denominators by one to avoid division by zero.
            return a / (b + (b == 0))

        # mean_x and mean_xy are averaged over the masked samples only,
        # whereas mean_y is taken over all samples.
        mean_x = safe_divide(np.dot(X.T, mask), count)
        mean_y = Y.mean(axis=0)
        mean_xy = safe_divide(np.dot(X.T, Y * mask), count)
        ExEy = mean_x * mean_y
        cov_xy = mean_xy - ExEy
        # Per output column: dot product of its weights with its covariance.
        w_cov_xy = np.diag(np.dot(W2D.T, cov_xy))
        A = safe_divide(cov_xy, w_cov_xy[None, :])

        # Same tolerance rule as np.allclose(A, pattern, ...), but a failure
        # reports the mismatching values. equal_nan=False keeps np.allclose's
        # behaviour of never treating NaNs as equal.
        np.testing.assert_allclose(
            A.ravel(), patterns[0].ravel(),
            rtol=0.05, atol=0.05, equal_nan=False,
        )
|