341 lines
12 KiB
Python
341 lines
12 KiB
Python
import numpy as np
|
|
#--------------------
|
|
# define kernel layer
|
|
#--------------------
|
|
class kernel_layer:
|
|
def __init__(self, kernel=0):
|
|
self.fwd, self.bwd = self.construct_kernel(kernel)
|
|
|
|
def construct_kernel(self, kernel):
|
|
if kernel == 0:
|
|
fwd = np.diag(np.ones(8))*8 - 1
|
|
fwd[7,:] = -1
|
|
fwd[:,7] = 1
|
|
|
|
bwd = np.diag(np.ones(8))
|
|
bwd[:,7] = -1
|
|
bwd[7,:] = 1
|
|
bwd*=1/8
|
|
return fwd,bwd
|
|
elif kernel == 1:
|
|
#this kernel has unit determinant
|
|
c = 2**(9/7)
|
|
a = 8
|
|
fwd = 1/a*np.array([[ c, c, c, c, c, -c, -c, a],
|
|
[ c, -c, -c, c, -c, c, -c, a],
|
|
[ c, c, -c, -c, c, c, c, a],
|
|
[ c, -c, c, -c, -c, -c, c, a],
|
|
[ -c, c, c, c, -c, c, c, a],
|
|
[ -c, -c, -c, c, c, -c, c, a],
|
|
[ -c, c, -c, -c, -c, -c, -c, a],
|
|
[ -c, -c, c, -c, c, c, -c, a]])
|
|
|
|
bwd = np.array([[ 1/c, 1/c, 1/c, 1/c, -1/c, -1/c, -1/c, -1/c],
|
|
[ 1/c, -1/c, 1/c, -1/c, 1/c, -1/c, 1/c, -1/c],
|
|
[ 1/c, -1/c, -1/c, 1/c, 1/c, -1/c, -1/c, 1/c],
|
|
[ 1/c, 1/c, -1/c, -1/c, 1/c, 1/c, -1/c, -1/c],
|
|
[ 1/c, -1/c, 1/c, -1/c, -1/c, 1/c, -1/c, 1/c],
|
|
[ -1/c, 1/c, 1/c, -1/c, 1/c, -1/c, -1/c, 1/c],
|
|
[ -1/c, -1/c, 1/c, 1/c, 1/c, 1/c, -1/c, -1/c],
|
|
[ 1/a, 1/a, 1/a, 1/a, 1/a, 1/a, 1/a, 1/a]])
|
|
|
|
return fwd,bwd
|
|
else:
|
|
print("Error in construct_kernel: kernel nr" +str(kernel) +" not implemented!")
|
|
|
|
def apply_fwd(self, zz):
|
|
xx = np.zeros(np.shape(zz))
|
|
#apply bwd kernel to all sub-elements of layer
|
|
for l in range(8):
|
|
for m in range(8):
|
|
xx[:,:,:,l] += self.fwd[l,m]*zz[:,:,:,m]
|
|
|
|
x = np.zeros((np.shape(zz)[0]*2,np.shape(zz)[1]*2,np.shape(zz)[2]*2))
|
|
x[::2,::2,::2] = xx[:,:,:,0]
|
|
x[1::2,::2,::2] = xx[:,:,:,1]
|
|
x[::2,1::2,::2] = xx[:,:,:,2]
|
|
x[::2,::2,1::2] = xx[:,:,:,3]
|
|
x[1::2,1::2,::2] = xx[:,:,:,4]
|
|
x[::2,1::2,1::2] = xx[:,:,:,5]
|
|
x[1::2,::2,1::2] = xx[:,:,:,6]
|
|
x[1::2,1::2,1::2]= xx[:,:,:,7]
|
|
|
|
return x
|
|
|
|
def apply_bwd(self, x):
|
|
|
|
xx = np.zeros((np.shape(x)[0]//2,np.shape(x)[1]//2,np.shape(x)[2]//2,8,))
|
|
|
|
xx[:,:,:,0] = x[::2,::2,::2]
|
|
xx[:,:,:,1] = x[1::2,::2,::2]
|
|
xx[:,:,:,2] = x[::2,1::2,::2]
|
|
xx[:,:,:,3] = x[::2,::2,1::2]
|
|
xx[:,:,:,4] = x[1::2,1::2,::2]
|
|
xx[:,:,:,5] = x[::2,1::2,1::2]
|
|
xx[:,:,:,6] = x[1::2,::2,1::2]
|
|
xx[:,:,:,7] = x[1::2,1::2,1::2]
|
|
|
|
zz = np.zeros((np.shape(x)[0]//2,np.shape(x)[1]//2,np.shape(x)[2]//2,8,))
|
|
#apply bwd kernel to all sub-elements of layer
|
|
for l in range(8):
|
|
for m in range(8):
|
|
zz[:,:,:,l] += self.bwd[l,m]*xx[:,:,:,m]
|
|
|
|
return zz
|
|
|
|
def apply(self,u, direction='fwd'):
|
|
if direction=='fwd':
|
|
return self.apply_fwd(u)
|
|
else:
|
|
return self.apply_bwd(u)
|
|
|
|
#--------------------
|
|
class base_layer:
|
|
def __init__(self, inputshape = (1,1,1),sigma2_init=1):
|
|
#print(sigma2_init)
|
|
self.inputshape = inputshape
|
|
self.outputshape = inputshape
|
|
|
|
#initialize layer params (learnable parameter)
|
|
self.b = np.zeros(self.inputshape)
|
|
self.w = np.ones(self.inputshape)*np.sqrt(sigma2_init)
|
|
|
|
def log_p(self,x):
|
|
return x,np.sum(-0.5*((x-self.b)/self.w)**2)
|
|
|
|
def generate_forward(self,x):
|
|
#generate random field
|
|
return self.w*np.random.normal(0,1,self.inputshape) + self.b
|
|
|
|
def generate_forward_select(self,x, select=True):
|
|
|
|
res = self.w*np.random.normal(0,1,self.inputshape) + self.b
|
|
|
|
if(select==False):
|
|
res*=0
|
|
#generate random field
|
|
return res
|
|
|
|
def reduce_backward(self,x):
|
|
return x
|
|
#--------------------
|
|
class multi_scale_layer:
|
|
def __init__(self, inputshape =(1,1,1),sigma2_init=1, kernel=1):
|
|
#print(sigma2_init)
|
|
self.inputshape = inputshape
|
|
self.outputshape = tuple([(l * 2) for l in inputshape])
|
|
|
|
#invertible kernel
|
|
self.kernel = kernel_layer(kernel=kernel)
|
|
|
|
#initialize layer params (learnable parameter)
|
|
self.a = np.zeros(self.inputshape + (7,)) # biases of the model.
|
|
self.b = np.zeros(self.inputshape + (8,)) # biases of the model.
|
|
self.w = np.ones(self.inputshape + (7,7)) # fully connected weights of cholesky decomposition.
|
|
self.Minv = np.ones(self.inputshape + (7,7))
|
|
#We initialize the layer with conditional white noise
|
|
M = np.linalg.inv(np.matmul(self.kernel.fwd.T,self.kernel.fwd)[0:7,0:7])*sigma2_init
|
|
self.Minv[:,:,:] = np.linalg.inv(M)
|
|
self.w[:,:,:] = np.linalg.cholesky(M).T
|
|
|
|
def log_p_field(self,x):
|
|
res = 0.
|
|
z = self.kernel.apply(x, direction='bwd')
|
|
for l in range(7):
|
|
for m in range(7):
|
|
res += (z[:,:,:,m]-self.b[:,:,:,m])*self.Minv[:,:,:,m,l]*(z[:,:,:,l]-self.b[:,:,:,l])
|
|
|
|
return z[:,:,:,7],-0.5*res
|
|
|
|
def log_p(self,x):
|
|
mu, logp = self.log_p_field(x)
|
|
|
|
return mu,np.sum(logp)
|
|
|
|
def generate_forward(self,mu):
|
|
|
|
zz = np.zeros(self.inputshape+(8,))
|
|
|
|
#Not very nice, need to change this to vectorization
|
|
#generate white noise field
|
|
wn = np.random.normal(0,1,self.inputshape+(7,))
|
|
yy = np.zeros(np.shape(wn))
|
|
for l in range(7):
|
|
for m in range(7):
|
|
yy[:,:,:,l] += self.w[:,:,:,m,l] * wn[:,:,:,m]
|
|
|
|
zz[:,:,:,0] = mu*self.a[:,:,:,0] + self.b[:,:,:,0] + yy[:,:,:,0]
|
|
zz[:,:,:,1] = mu*self.a[:,:,:,1] + self.b[:,:,:,1] + yy[:,:,:,1]
|
|
zz[:,:,:,2] = mu*self.a[:,:,:,2] + self.b[:,:,:,2] + yy[:,:,:,2]
|
|
zz[:,:,:,3] = mu*self.a[:,:,:,3] + self.b[:,:,:,3] + yy[:,:,:,3]
|
|
zz[:,:,:,4] = mu*self.a[:,:,:,4] + self.b[:,:,:,4] + yy[:,:,:,4]
|
|
zz[:,:,:,5] = mu*self.a[:,:,:,5] + self.b[:,:,:,5] + yy[:,:,:,5]
|
|
zz[:,:,:,6] = mu*self.a[:,:,:,6] + self.b[:,:,:,6] + yy[:,:,:,6]
|
|
zz[:,:,:,7] = mu
|
|
|
|
return self.kernel.apply(zz,direction='fwd')
|
|
|
|
#--------------------
|
|
class multi_scale_model:
|
|
def __init__(self, nlevel=2, kernel=1):
|
|
self.nlevel=nlevel
|
|
self.model = self.construct(nlevel=nlevel, kernel=kernel)
|
|
|
|
def construct(self,nlevel=2, kernel=1):
|
|
model = []
|
|
sigma2_base = (1./8.)**nlevel
|
|
inputshape = (1,1,1)
|
|
model.append(base_layer(inputshape,sigma2_init=sigma2_base))
|
|
for i in np.arange(nlevel):
|
|
sigma2_base*=8.
|
|
model.append(multi_scale_layer(inputshape,sigma2_init=sigma2_base, kernel=kernel))
|
|
inputshape = tuple([(l * 2) for l in inputshape])
|
|
return model
|
|
|
|
def logp(self,x):
|
|
#work through reversed model list and ignore base layer
|
|
aux = np.copy(x)
|
|
LOGP=[]
|
|
for m in self.model[::-1]:
|
|
aux, logp = m.log_p(aux)
|
|
LOGP.append(logp)
|
|
return LOGP
|
|
|
|
def logp_multi_layer_field(self,x):
|
|
#returns the field probability values
|
|
#Note, that field values are always given jointly for the 8 cell voxels
|
|
LOGP=[]
|
|
aux = np.copy(x)
|
|
for m in self.model[::-1]:
|
|
aux, logp = m.log_p_field(aux)
|
|
LOGP.append(logp)
|
|
return LOGP
|
|
|
|
def logp_multi_layer(self,x):
|
|
#work through reversed model list and ignore base layer
|
|
LOGP=[]
|
|
cnt=0
|
|
for m in self.model:
|
|
aux, logp = m.log_p(x[cnt])
|
|
LOGP.append(logp)
|
|
cnt+=1
|
|
return LOGP
|
|
|
|
def generate(self):
|
|
x = []
|
|
a = np.zeros((1,1,1))
|
|
for m in self.model:
|
|
a = m.generate_forward(a)
|
|
x.append(a)
|
|
return x
|
|
|
|
def get_matrix_inv(self, M):
|
|
|
|
M_inv = np.zeros(np.shape(M))
|
|
|
|
for i in range(np.shape(M)[0]):
|
|
for j in range(np.shape(M)[1]):
|
|
for k in range(np.shape(M)[2]):
|
|
M_inv[i,j,k] = np.linalg.inv(M[i,j,k])
|
|
return M_inv
|
|
|
|
def get_matrix_sqrt(self, M):
|
|
|
|
M_sqrt = np.zeros(np.shape(M))
|
|
|
|
for i in range(np.shape(M)[0]):
|
|
for j in range(np.shape(M)[1]):
|
|
for k in range(np.shape(M)[2]):
|
|
M_sqrt[i,j,k] = np.linalg.cholesky(M[i,j,k]).T
|
|
return M_sqrt
|
|
|
|
def train_multi_scale_layer(self, m, X_train):
|
|
nsamp = len(X_train)
|
|
b_z = np.zeros(m.inputshape + (8,))
|
|
M_z = np.zeros(m.inputshape + (8,8))
|
|
|
|
#calculate mean
|
|
for x in X_train:
|
|
z = m.kernel.apply(x, direction='bwd')
|
|
b_z += z/nsamp
|
|
|
|
#calculate covariance matrix
|
|
mu_train=[]
|
|
for x in X_train:
|
|
z = m.kernel.apply(x, direction='bwd')
|
|
mu_train.append(z[:,:,:,7])
|
|
for i in range(8):
|
|
for j in range(8):
|
|
M_z[:,:,:,i,j] += (z[:,:,:,i]-b_z[:,:,:,i]) * (z[:,:,:,j]-b_z[:,:,:,j])/(nsamp-1)
|
|
|
|
M_z_inv = self.get_matrix_inv(M_z)
|
|
|
|
R_inv = M_z_inv[:,:,:,0:7,0:7]
|
|
m.Minv = np.copy(R_inv)
|
|
R = self.get_matrix_inv(R_inv)
|
|
|
|
for n in range(7):
|
|
for l in range(7):
|
|
m.a[:,:,:,n] += -M_z_inv[:,:,:,7,l]*R[:,:,:,l,n]
|
|
|
|
m.w = self.get_matrix_sqrt(R)
|
|
|
|
m.b[:,:,:,0] = b_z[:,:,:,0] - m.a[:,:,:,0]*b_z[:,:,:,7]
|
|
m.b[:,:,:,1] = b_z[:,:,:,1] - m.a[:,:,:,1]*b_z[:,:,:,7]
|
|
m.b[:,:,:,2] = b_z[:,:,:,2] - m.a[:,:,:,2]*b_z[:,:,:,7]
|
|
m.b[:,:,:,3] = b_z[:,:,:,3] - m.a[:,:,:,3]*b_z[:,:,:,7]
|
|
m.b[:,:,:,4] = b_z[:,:,:,4] - m.a[:,:,:,4]*b_z[:,:,:,7]
|
|
m.b[:,:,:,5] = b_z[:,:,:,5] - m.a[:,:,:,5]*b_z[:,:,:,7]
|
|
m.b[:,:,:,6] = b_z[:,:,:,6] - m.a[:,:,:,6]*b_z[:,:,:,7]
|
|
|
|
return mu_train
|
|
|
|
def train_base_layer(self,m, X_train):
|
|
b=0
|
|
w=0
|
|
nsamp = len(X_train)
|
|
#calculate mean
|
|
for x in X_train:
|
|
b += x/nsamp
|
|
|
|
#calculate stdev
|
|
for x in X_train:
|
|
w += (x-b)**2/(nsamp-1)
|
|
|
|
#update model parameters of base layer
|
|
m.b = b
|
|
m.w = np.sqrt(w)
|
|
|
|
def fit(self,X_train):
|
|
#work through reversed model list and ignore base layer
|
|
print('Train model...')
|
|
for m in self.model[::-1][:-1]:
|
|
X_train = self.train_multi_scale_layer(m, X_train)
|
|
|
|
#now train base layer
|
|
self.train_base_layer(self.model[0], X_train)
|
|
|
|
print('Training done')
|
|
|
|
def reduce(self,x):
|
|
mu = []
|
|
#run from bottom to top
|
|
mu.append(x)
|
|
for m in self.model[::-1][:-1]:
|
|
z = m.kernel.apply(x, direction='bwd')
|
|
x = z[:,:,:,7]
|
|
mu.append(x)
|
|
return mu
|
|
|
|
def fit_multi_layer(self,X_train):
|
|
print('Train model...')
|
|
#start by training base layer
|
|
x_train_layer = [X_train[i][0] for i in range(0,len(X_train))]
|
|
self.train_base_layer(self.model[0], x_train_layer)
|
|
|
|
#work through remaining layers
|
|
for l in np.arange(1,self.nlevel+1):
|
|
x_train_layer = [X_train[i][l] for i in range(0,len(X_train))]
|
|
self.train_multi_scale_layer(self.model[l], x_train_layer)
|
|
|
|
print('Training done') |