ICOD/models/multiresolution_flow_3d.py
2024-06-10 11:49:02 +02:00

341 lines
12 KiB
Python

import numpy as np
#--------------------
# define kernel layer
#--------------------
class kernel_layer:
def __init__(self, kernel=0):
self.fwd, self.bwd = self.construct_kernel(kernel)
def construct_kernel(self, kernel):
if kernel == 0:
fwd = np.diag(np.ones(8))*8 - 1
fwd[7,:] = -1
fwd[:,7] = 1
bwd = np.diag(np.ones(8))
bwd[:,7] = -1
bwd[7,:] = 1
bwd*=1/8
return fwd,bwd
elif kernel == 1:
#this kernel has unit determinant
c = 2**(9/7)
a = 8
fwd = 1/a*np.array([[ c, c, c, c, c, -c, -c, a],
[ c, -c, -c, c, -c, c, -c, a],
[ c, c, -c, -c, c, c, c, a],
[ c, -c, c, -c, -c, -c, c, a],
[ -c, c, c, c, -c, c, c, a],
[ -c, -c, -c, c, c, -c, c, a],
[ -c, c, -c, -c, -c, -c, -c, a],
[ -c, -c, c, -c, c, c, -c, a]])
bwd = np.array([[ 1/c, 1/c, 1/c, 1/c, -1/c, -1/c, -1/c, -1/c],
[ 1/c, -1/c, 1/c, -1/c, 1/c, -1/c, 1/c, -1/c],
[ 1/c, -1/c, -1/c, 1/c, 1/c, -1/c, -1/c, 1/c],
[ 1/c, 1/c, -1/c, -1/c, 1/c, 1/c, -1/c, -1/c],
[ 1/c, -1/c, 1/c, -1/c, -1/c, 1/c, -1/c, 1/c],
[ -1/c, 1/c, 1/c, -1/c, 1/c, -1/c, -1/c, 1/c],
[ -1/c, -1/c, 1/c, 1/c, 1/c, 1/c, -1/c, -1/c],
[ 1/a, 1/a, 1/a, 1/a, 1/a, 1/a, 1/a, 1/a]])
return fwd,bwd
else:
print("Error in construct_kernel: kernel nr" +str(kernel) +" not implemented!")
def apply_fwd(self, zz):
xx = np.zeros(np.shape(zz))
#apply bwd kernel to all sub-elements of layer
for l in range(8):
for m in range(8):
xx[:,:,:,l] += self.fwd[l,m]*zz[:,:,:,m]
x = np.zeros((np.shape(zz)[0]*2,np.shape(zz)[1]*2,np.shape(zz)[2]*2))
x[::2,::2,::2] = xx[:,:,:,0]
x[1::2,::2,::2] = xx[:,:,:,1]
x[::2,1::2,::2] = xx[:,:,:,2]
x[::2,::2,1::2] = xx[:,:,:,3]
x[1::2,1::2,::2] = xx[:,:,:,4]
x[::2,1::2,1::2] = xx[:,:,:,5]
x[1::2,::2,1::2] = xx[:,:,:,6]
x[1::2,1::2,1::2]= xx[:,:,:,7]
return x
def apply_bwd(self, x):
xx = np.zeros((np.shape(x)[0]//2,np.shape(x)[1]//2,np.shape(x)[2]//2,8,))
xx[:,:,:,0] = x[::2,::2,::2]
xx[:,:,:,1] = x[1::2,::2,::2]
xx[:,:,:,2] = x[::2,1::2,::2]
xx[:,:,:,3] = x[::2,::2,1::2]
xx[:,:,:,4] = x[1::2,1::2,::2]
xx[:,:,:,5] = x[::2,1::2,1::2]
xx[:,:,:,6] = x[1::2,::2,1::2]
xx[:,:,:,7] = x[1::2,1::2,1::2]
zz = np.zeros((np.shape(x)[0]//2,np.shape(x)[1]//2,np.shape(x)[2]//2,8,))
#apply bwd kernel to all sub-elements of layer
for l in range(8):
for m in range(8):
zz[:,:,:,l] += self.bwd[l,m]*xx[:,:,:,m]
return zz
def apply(self,u, direction='fwd'):
if direction=='fwd':
return self.apply_fwd(u)
else:
return self.apply_bwd(u)
#--------------------
class base_layer:
def __init__(self, inputshape = (1,1,1),sigma2_init=1):
#print(sigma2_init)
self.inputshape = inputshape
self.outputshape = inputshape
#initialize layer params (learnable parameter)
self.b = np.zeros(self.inputshape)
self.w = np.ones(self.inputshape)*np.sqrt(sigma2_init)
def log_p(self,x):
return x,np.sum(-0.5*((x-self.b)/self.w)**2)
def generate_forward(self,x):
#generate random field
return self.w*np.random.normal(0,1,self.inputshape) + self.b
def generate_forward_select(self,x, select=True):
res = self.w*np.random.normal(0,1,self.inputshape) + self.b
if(select==False):
res*=0
#generate random field
return res
def reduce_backward(self,x):
return x
#--------------------
class multi_scale_layer:
def __init__(self, inputshape =(1,1,1),sigma2_init=1, kernel=1):
#print(sigma2_init)
self.inputshape = inputshape
self.outputshape = tuple([(l * 2) for l in inputshape])
#invertible kernel
self.kernel = kernel_layer(kernel=kernel)
#initialize layer params (learnable parameter)
self.a = np.zeros(self.inputshape + (7,)) # biases of the model.
self.b = np.zeros(self.inputshape + (8,)) # biases of the model.
self.w = np.ones(self.inputshape + (7,7)) # fully connected weights of cholesky decomposition.
self.Minv = np.ones(self.inputshape + (7,7))
#We initialize the layer with conditional white noise
M = np.linalg.inv(np.matmul(self.kernel.fwd.T,self.kernel.fwd)[0:7,0:7])*sigma2_init
self.Minv[:,:,:] = np.linalg.inv(M)
self.w[:,:,:] = np.linalg.cholesky(M).T
def log_p_field(self,x):
res = 0.
z = self.kernel.apply(x, direction='bwd')
for l in range(7):
for m in range(7):
res += (z[:,:,:,m]-self.b[:,:,:,m])*self.Minv[:,:,:,m,l]*(z[:,:,:,l]-self.b[:,:,:,l])
return z[:,:,:,7],-0.5*res
def log_p(self,x):
mu, logp = self.log_p_field(x)
return mu,np.sum(logp)
def generate_forward(self,mu):
zz = np.zeros(self.inputshape+(8,))
#Not very nice, need to change this to vectorization
#generate white noise field
wn = np.random.normal(0,1,self.inputshape+(7,))
yy = np.zeros(np.shape(wn))
for l in range(7):
for m in range(7):
yy[:,:,:,l] += self.w[:,:,:,m,l] * wn[:,:,:,m]
zz[:,:,:,0] = mu*self.a[:,:,:,0] + self.b[:,:,:,0] + yy[:,:,:,0]
zz[:,:,:,1] = mu*self.a[:,:,:,1] + self.b[:,:,:,1] + yy[:,:,:,1]
zz[:,:,:,2] = mu*self.a[:,:,:,2] + self.b[:,:,:,2] + yy[:,:,:,2]
zz[:,:,:,3] = mu*self.a[:,:,:,3] + self.b[:,:,:,3] + yy[:,:,:,3]
zz[:,:,:,4] = mu*self.a[:,:,:,4] + self.b[:,:,:,4] + yy[:,:,:,4]
zz[:,:,:,5] = mu*self.a[:,:,:,5] + self.b[:,:,:,5] + yy[:,:,:,5]
zz[:,:,:,6] = mu*self.a[:,:,:,6] + self.b[:,:,:,6] + yy[:,:,:,6]
zz[:,:,:,7] = mu
return self.kernel.apply(zz,direction='fwd')
#--------------------
class multi_scale_model:
def __init__(self, nlevel=2, kernel=1):
self.nlevel=nlevel
self.model = self.construct(nlevel=nlevel, kernel=kernel)
def construct(self,nlevel=2, kernel=1):
model = []
sigma2_base = (1./8.)**nlevel
inputshape = (1,1,1)
model.append(base_layer(inputshape,sigma2_init=sigma2_base))
for i in np.arange(nlevel):
sigma2_base*=8.
model.append(multi_scale_layer(inputshape,sigma2_init=sigma2_base, kernel=kernel))
inputshape = tuple([(l * 2) for l in inputshape])
return model
def logp(self,x):
#work through reversed model list and ignore base layer
aux = np.copy(x)
LOGP=[]
for m in self.model[::-1]:
aux, logp = m.log_p(aux)
LOGP.append(logp)
return LOGP
def logp_multi_layer_field(self,x):
#returns the field probability values
#Note, that field values are always given jointly for the 8 cell voxels
LOGP=[]
aux = np.copy(x)
for m in self.model[::-1]:
aux, logp = m.log_p_field(aux)
LOGP.append(logp)
return LOGP
def logp_multi_layer(self,x):
#work through reversed model list and ignore base layer
LOGP=[]
cnt=0
for m in self.model:
aux, logp = m.log_p(x[cnt])
LOGP.append(logp)
cnt+=1
return LOGP
def generate(self):
x = []
a = np.zeros((1,1,1))
for m in self.model:
a = m.generate_forward(a)
x.append(a)
return x
def get_matrix_inv(self, M):
M_inv = np.zeros(np.shape(M))
for i in range(np.shape(M)[0]):
for j in range(np.shape(M)[1]):
for k in range(np.shape(M)[2]):
M_inv[i,j,k] = np.linalg.inv(M[i,j,k])
return M_inv
def get_matrix_sqrt(self, M):
M_sqrt = np.zeros(np.shape(M))
for i in range(np.shape(M)[0]):
for j in range(np.shape(M)[1]):
for k in range(np.shape(M)[2]):
M_sqrt[i,j,k] = np.linalg.cholesky(M[i,j,k]).T
return M_sqrt
def train_multi_scale_layer(self, m, X_train):
nsamp = len(X_train)
b_z = np.zeros(m.inputshape + (8,))
M_z = np.zeros(m.inputshape + (8,8))
#calculate mean
for x in X_train:
z = m.kernel.apply(x, direction='bwd')
b_z += z/nsamp
#calculate covariance matrix
mu_train=[]
for x in X_train:
z = m.kernel.apply(x, direction='bwd')
mu_train.append(z[:,:,:,7])
for i in range(8):
for j in range(8):
M_z[:,:,:,i,j] += (z[:,:,:,i]-b_z[:,:,:,i]) * (z[:,:,:,j]-b_z[:,:,:,j])/(nsamp-1)
M_z_inv = self.get_matrix_inv(M_z)
R_inv = M_z_inv[:,:,:,0:7,0:7]
m.Minv = np.copy(R_inv)
R = self.get_matrix_inv(R_inv)
for n in range(7):
for l in range(7):
m.a[:,:,:,n] += -M_z_inv[:,:,:,7,l]*R[:,:,:,l,n]
m.w = self.get_matrix_sqrt(R)
m.b[:,:,:,0] = b_z[:,:,:,0] - m.a[:,:,:,0]*b_z[:,:,:,7]
m.b[:,:,:,1] = b_z[:,:,:,1] - m.a[:,:,:,1]*b_z[:,:,:,7]
m.b[:,:,:,2] = b_z[:,:,:,2] - m.a[:,:,:,2]*b_z[:,:,:,7]
m.b[:,:,:,3] = b_z[:,:,:,3] - m.a[:,:,:,3]*b_z[:,:,:,7]
m.b[:,:,:,4] = b_z[:,:,:,4] - m.a[:,:,:,4]*b_z[:,:,:,7]
m.b[:,:,:,5] = b_z[:,:,:,5] - m.a[:,:,:,5]*b_z[:,:,:,7]
m.b[:,:,:,6] = b_z[:,:,:,6] - m.a[:,:,:,6]*b_z[:,:,:,7]
return mu_train
def train_base_layer(self,m, X_train):
b=0
w=0
nsamp = len(X_train)
#calculate mean
for x in X_train:
b += x/nsamp
#calculate stdev
for x in X_train:
w += (x-b)**2/(nsamp-1)
#update model parameters of base layer
m.b = b
m.w = np.sqrt(w)
def fit(self,X_train):
#work through reversed model list and ignore base layer
print('Train model...')
for m in self.model[::-1][:-1]:
X_train = self.train_multi_scale_layer(m, X_train)
#now train base layer
self.train_base_layer(self.model[0], X_train)
print('Training done')
def reduce(self,x):
mu = []
#run from bottom to top
mu.append(x)
for m in self.model[::-1][:-1]:
z = m.kernel.apply(x, direction='bwd')
x = z[:,:,:,7]
mu.append(x)
return mu
def fit_multi_layer(self,X_train):
print('Train model...')
#start by training base layer
x_train_layer = [X_train[i][0] for i in range(0,len(X_train))]
self.train_base_layer(self.model[0], x_train_layer)
#work through remaining layers
for l in np.arange(1,self.nlevel+1):
x_train_layer = [X_train[i][l] for i in range(0,len(X_train))]
self.train_multi_scale_layer(self.model[l], x_train_layer)
print('Training done')