Skip to content

ibrahim-song-ma/tensor-book

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

34 Commits
 
 
 
 
 
 

Repository files navigation

从线性代数到张量计算

MIT License Python 3.7 GitHub stars

Made by Xinyu Chen (陈新宇) • 🌐 https://xinychen.github.io

系列教程

  1. 《从线性代数到张量计算》(PDF)
  2. 《面向时空交通数据修复及预测的低秩机器学习模型》(PDF)

该系列教程尚处在更新阶段,欢迎广大读者提供宝贵的反馈意见,如有建议,请在本项目issues区域留言;如需交流,请移步至QQ群(457012422),非诚勿扰。


作者申明

  • 撰写该系列教程的初衷在于传播知识、丰富中文科技文库,为感兴趣的读者提供参考素材。
  • 禁止将该系列教程放在其他网站上,唯一下载网址为https://xinychen.github.io
  • 禁止将该系列教程用于任何形式的商业活动。

代数结构

▴ 回到顶部

例. 写出张量的lateral切片与horizontal切片。

import numpy as np

X = np.zeros((2, 2, 2))
X[:, :, 0] = np.array([[1, 2], [3, 4]])
X[:, :, 1] = np.array([[5, 6], [7, 8]])
print('lateral slices:')
print(X[:, 0, :])
print()
print(X[:, 1, :])
print()
print('horizonal slices:')
print(X[0, :, :])
print()
print(X[1, :, :])
print()

Kronecker分解与Kronecker分解

▴ 回到顶部

例. 计算Kronecker积并求Kronecker分解。

  • 计算Kronecker积
import numpy as np

A = np.array([[1, 2], [3, 4]])
B = np.array([[5, 6, 7], [8, 9, 10]])
X = np.kron(A, B)
print('X = ')
print(X)
  • 求Kronecker分解
def permute(mat, A_dim1, A_dim2, B_dim1, B_dim2):
    ans = np.zeros((A_dim1 * A_dim2, B_dim1 * B_dim2))
    for j in range(A_dim2):
        for i in range(A_dim1):
            ans[A_dim1 * j + i, :] = mat[i * B_dim1 : (i + 1) * B_dim1,
                                         j * B_dim2 : (j + 1) * B_dim2].reshape(B_dim1 * B_dim2, order = 'F')
    return ans

X_tilde = permute(X, 2, 2, 2, 3)
print('X_tilde = ')
print(X_tilde)
print()
u, s, v = np.linalg.svd(X_tilde, full_matrices = False)
A_hat = np.sqrt(s[0]) * u[:, 0].reshape(2, 2, order = 'F')
B_hat = np.sqrt(s[0]) * v[0, :].reshape(2, 3, order = 'F')
print('A_hat = ')
print(A_hat)
print()
print('B_hat = ')
print(B_hat)

例. 采用广义 Kronecker 分解重构灰度图像。

import numpy as np

def permute(mat, A_dim1, A_dim2, B_dim1, B_dim2):
    ans = np.zeros((A_dim1 * A_dim2, B_dim1 * B_dim2))
    for j in range(A_dim2):
        for i in range(A_dim1):
            ans[A_dim1 * j + i, :] = mat[i * B_dim1 : (i + 1) * B_dim1,
                                         j * B_dim2 : (j + 1) * B_dim2].reshape(B_dim1 * B_dim2, order = 'F')
    return ans

def kron_decomp(mat, A_dim1, A_dim2, B_dim1, B_dim2, rank):
    mat_tilde = permute(mat, A_dim1, A_dim2, B_dim1, B_dim2)
    u, s, v = np.linalg.svd(mat_tilde, full_matrices = False)
    A_hat = np.zeros((A_dim1, A_dim2, rank))
    B_hat = np.zeros((B_dim1, B_dim2, rank))
    for r in range(rank):
        A_hat[:, :, r] = np.sqrt(s[r]) * u[:, r].reshape([A_dim1, A_dim2], order = 'F')
        B_hat[:, :, r] = np.sqrt(s[r]) * v[r, :].reshape([B_dim1, B_dim2], order = 'F')
    mat_hat = np.zeros(mat.shape)
    for r in range(rank):
        mat_hat += np.kron(A_hat[:, :, r], B_hat[:, :, r])
    return mat_hat
from skimage import color
from skimage import io

img = io.imread('data/gaint_panda.bmp')
imgGray = color.rgb2gray(img)

io.imshow(imgGray)
plt.axis('off')
plt.imsave('gaint_panda_gray.png', imgGray, cmap = plt.cm.gray)
plt.show()
import imageio
import matplotlib.pyplot as plt

img = io.imread('data/gaint_panda.bmp')
mat = color.rgb2gray(img)
io.imshow(mat)
plt.axis('off')
plt.show()

A_dim1 = 16
A_dim2 = 32
B_dim1 = 32
B_dim2 = 16
for rank in [5, 10, 50, 100]:
    mat_hat = kron_decomp(mat, A_dim1, A_dim2, B_dim1, B_dim2, rank)
    mat_hat[mat_hat > 1] = 1
    mat_hat[mat_hat < 0] = 0
    io.imshow(mat_hat)
    plt.axis('off')
    plt.imsave('gaint_panda_gray_R{}.png'.format(rank), 
              mat_hat, cmap = plt.cm.gray)
    plt.show()

模态积与Tucker张量分解

▴ 回到顶部

例. 计算张量与矩阵的模态积。

import numpy as np

X = np.zeros((2, 2, 2))
X[:, :, 0] = np.array([[1, 2], [3, 4]])
X[:, :, 1] = np.array([[5, 6], [7, 8]])
A = np.array([[1, 2], [3, 4], [5, 6]])
Y = np.einsum('ijh, ki -> kjh', X, A)
print('frontal slices of Y:')
print(Y[:, :, 0])
print()
print(Y[:, :, 1])
print()

低秩时序矩阵模型

▴ 回到顶部

例. 使用时序矩阵分解对流体流动的动态过程进行预测。

import numpy as np
import seaborn as sns
import scipy.io
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap, LinearSegmentedColormap
color = scipy.io.loadmat('CCcool.mat')
cc = color['CC']
newcmp = LinearSegmentedColormap.from_list('', cc)

dense_tensor = np.load('tensor.npz')['arr_0']
np.random.seed(1)
dense_tensor = dense_tensor[:, :, : 150]
M, N, T = dense_tensor.shape

plt.rcParams['font.size'] = 13
plt.rcParams['mathtext.fontset'] = 'cm'
fig = plt.figure(figsize = (7, 8))
id = np.array([5, 10, 15, 20, 25, 30, 35, 40])
for t in range(8):
    ax = fig.add_subplot(4, 2, t + 1)
    ax = sns.heatmap(dense_tensor[:, :, id[t] - 1], cmap = newcmp, vmin = -5, vmax = 5, cbar = False)
    ax.contour(np.linspace(0, N, N), np.linspace(0, M, M), dense_tensor[:, :, id[t] - 1], 
               levels = np.linspace(0.15, 15, 30), colors = 'k', linewidths = 0.7)
    ax.contour(np.linspace(0, N, N), np.linspace(0, M, M), dense_tensor[:, :, id[t] - 1], 
               levels = np.linspace(-15, -0.15, 30), colors = 'k', linestyles = 'dashed', linewidths = 0.7)
    plt.xticks([])
    plt.yticks([])
    plt.title(r'$t = {}$'.format(id[t]))
    for _, spine in ax.spines.items():
        spine.set_visible(True)
plt.show()
fig.savefig("fluid_flow_heatmap.png", bbox_inches = "tight")
import numpy as np

def update_cg(var, r, q, Aq, rold):
    alpha = rold / np.inner(q, Aq)
    var = var + alpha * q
    r = r - alpha * Aq
    rnew = np.inner(r, r)
    q = r + (rnew / rold) * q
    return var, r, q, rnew

def ell_w(ind, W, X, rho):
    return X @ ((W.T @ X) * ind).T + rho * W

def conj_grad_w(sparse_mat, ind, W, X, rho, maxiter = 5):
    rank, dim1 = W.shape
    w = np.reshape(W, -1, order = 'F')
    r = np.reshape(X @ sparse_mat.T - ell_w(ind, W, X, rho), -1, order = 'F')
    q = r.copy()
    rold = np.inner(r, r)
    for it in range(maxiter):
        Q = np.reshape(q, (rank, dim1), order = 'F')
        Aq = np.reshape(ell_w(ind, Q, X, rho), -1, order = 'F')
        w, r, q, rold = update_cg(w, r, q, Aq, rold)
    return np.reshape(w, (rank, dim1), order = 'F')

def ell_x(ind, W, X, A, Psi, d, lambda0, rho):
    rank, dim2 = X.shape
    temp = np.zeros((d * rank, Psi[0].shape[0]))
    for k in range(1, d + 1):
        temp[(k - 1) * rank : k * rank, :] = X @ Psi[k].T
    temp1 = X @ Psi[0].T - A @ temp
    temp2 = np.zeros((rank, dim2))
    for k in range(d):
        temp2 += A[:, k * rank : (k + 1) * rank].T @ temp1 @ Psi[k + 1]
    return W @ ((W.T @ X) * ind) + rho * X + lambda0 * (temp1 @ Psi[0] - temp2)

def conj_grad_x(sparse_mat, ind, W, X, A, Psi, d, lambda0, rho, maxiter = 5):
    rank, dim2 = X.shape
    x = np.reshape(X, -1, order = 'F')
    r = np.reshape(W @ sparse_mat - ell_x(ind, W, X, A, Psi, d, lambda0, rho), -1, order = 'F')
    q = r.copy()
    rold = np.inner(r, r)
    for it in range(maxiter):
        Q = np.reshape(q, (rank, dim2), order = 'F')
        Aq = np.reshape(ell_x(ind, W, Q, A, Psi, d, lambda0, rho), -1, order = 'F')
        x, r, q, rold = update_cg(x, r, q, Aq, rold)
    return np.reshape(x, (rank, dim2), order = 'F')

def generate_Psi(T, d):
    Psi = []
    for k in range(0, d + 1):
        if k == 0:
            Psi.append(np.append(np.zeros((T - d, d)), np.eye(T - d), axis = 1))
        else:
            Psi.append(np.append(np.append(np.zeros((T - d, d - k)), np.eye(T - d), axis = 1), 
                                 np.zeros((T - d, k)), axis = 1))
    return Psi

def tmf(sparse_mat, rank, d, lambda0, rho, maxiter = 50):
    dim1, dim2 = sparse_mat.shape
    ind = sparse_mat != 0
    W = 0.01 * np.random.randn(rank, dim1)
    X = 0.01 * np.random.randn(rank, dim2)
    A = 0.01 * np.random.randn(rank, d * rank)
    Psi = generate_Psi(dim2, d)
    temp = np.zeros((d * rank, dim2 - d))
    for it in range(maxiter):
        W = conj_grad_w(sparse_mat, ind, W, X, rho)
        X = conj_grad_x(sparse_mat, ind, W, X, A, Psi, d, lambda0, rho)
        for k in range(1, d + 1):
            temp[(k - 1) * rank : k * rank, :] = X @ Psi[k].T
        A = X @ Psi[0].T @ np.linalg.pinv(temp)
        mat_hat = W.T @ X
    return mat_hat, W, X, A

def var4cast(X, A, d, delta):
    dim1, dim2 = X.shape
    X_hat = np.append(X, np.zeros((dim1, delta)), axis = 1)
    for t in range(delta):
        X_hat[:, dim2 + t] = A @ X_hat[:, dim2 + t - np.arange(1, d + 1)].T.reshape(dim1 * d)
    return X_hat[:, - delta :]
import numpy as np
np.random.seed(1)

dense_tensor = np.load('tensor.npz')['arr_0']
dense_tensor = dense_tensor[:, :, : 150]
M, N, T = dense_tensor.shape
dense_mat = np.reshape(dense_tensor, (M * N, T), order = 'F')
p = 0.5
sparse_mat = dense_mat * np.round(np.random.rand(M * N, T) + 0.5 - p)

import time
start = time.time()
delta = 3
rank = 10
d = 1
lambda0 = 1
rho = 1
_, W, X, A = tmf(sparse_mat[:, : T - delta], rank, d, lambda0, rho)
mat_hat = W.T @ var4cast(X, A, d, delta)
end = time.time()
print('Running time: %d seconds'%(end - start))
import seaborn as sns
import scipy.io
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap, LinearSegmentedColormap
color = scipy.io.loadmat('CCcool.mat')
cc = color['CC']
newcmp = LinearSegmentedColormap.from_list('', cc)

plt.rcParams['font.size'] = 13
plt.rcParams['mathtext.fontset'] = 'cm'
fig = plt.figure(figsize = (11, 1.8))
i = 1
for t in [147, 148, 149]:
    ax = fig.add_subplot(1, 3, i)
    ax = sns.heatmap(dense_mat[:, t].reshape((199, 449), order = 'F'), 
                     cmap = newcmp, vmin = -5, vmax = 5, cbar = False)
    ax.contour(np.linspace(0, N, N), np.linspace(0, M, M), 
               dense_mat[:, t].reshape((199, 449), order = 'F'), 
               levels = np.linspace(0.15, 15, 30), colors = 'k', 
               linewidths = 0.7)
    ax.contour(np.linspace(0, N, N), np.linspace(0, M, M), 
               dense_mat[:, t].reshape((199, 449), order = 'F'), 
               levels = np.linspace(-15, -0.15, 30), colors = 'k', 
               linestyles = 'dashed', linewidths = 0.7)
    plt.title(r'$t = {}$'.format(t + 1))
    plt.xticks([])
    plt.yticks([])
    for _, spine in ax.spines.items():
        spine.set_visible(True)
    i += 1
plt.show()
fig.savefig("fluid_flow_ground_truth.png", bbox_inches = "tight")
import seaborn as sns
import scipy.io
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap, LinearSegmentedColormap
color = scipy.io.loadmat('CCcool.mat')
cc = color['CC']
newcmp = LinearSegmentedColormap.from_list('', cc)

plt.rcParams['font.size'] = 13
plt.rcParams['mathtext.fontset'] = 'cm'
fig = plt.figure(figsize = (11, 1.8))
i = 1
for t in range(3):
    ax = fig.add_subplot(1, 3, i)
    ax = sns.heatmap(mat_hat[:, t].reshape((199, 449), order = 'F'), 
                     cmap = newcmp, vmin = -5, vmax = 5, cbar = False)
    ax.contour(np.linspace(0, N, N), np.linspace(0, M, M), 
               mat_hat[:, t].reshape((199, 449), order = 'F'), 
               levels = np.linspace(0.15, 15, 30), colors = 'k', 
               linewidths = 0.7)
    ax.contour(np.linspace(0, N, N), np.linspace(0, M, M), 
               mat_hat[:, t].reshape((199, 449), order = 'F'), 
               levels = np.linspace(-15, -0.15, 30), colors = 'k', 
               linestyles = 'dashed', linewidths = 0.7)
    plt.title(r'$t = {}$'.format(148+ t))
    plt.xticks([])
    plt.yticks([])
    for _, spine in ax.spines.items():
        spine.set_visible(True)
    i += 1
plt.show()
fig.savefig("fluid_flow_forecasts.png", bbox_inches = "tight")

例. 使用考虑平滑处理的矩阵分解对灰度图像进行复原。

import numpy as np

def compute_rse(var, var_hat):
    return np.linalg.norm(var - var_hat, 2) / np.linalg.norm(var, 2)

def generate_Psi(n):
    mat1 = np.append(np.zeros((n - 1, 1)), np.eye(n - 1), axis = 1)
    mat2 = np.append(np.eye(n - 1), np.zeros((n - 1, 1)), axis = 1)
    Psi = mat1 - mat2
    return Psi

def update_cg(var, r, q, Aq, rold):
    alpha = rold / np.inner(q, Aq)
    var = var + alpha * q
    r = r - alpha * Aq
    rnew = np.inner(r, r)
    q = r + (rnew / rold) * q
    return var, r, q, rnew

def ell_w(ind, W, X, Psi1, rho, lmbda):
    return X @ ((W.T @ X) * ind).T + rho * W + lmbda * W @ Psi1.T @ Psi1

def conj_grad_w(sparse_mat, ind, W, X, Psi1, rho, lmbda, maxiter = 5):
    rank, dim1 = W.shape
    w = np.reshape(W, -1, order = 'F')
    r = np.reshape(X @ sparse_mat.T 
                   - ell_w(ind, W, X, Psi1, rho, lmbda), -1, order = 'F')
    q = r.copy()
    rold = np.inner(r, r)
    for it in range(maxiter):
        Q = np.reshape(q, (rank, dim1), order = 'F')
        Aq = np.reshape(ell_w(ind, Q, X, Psi1, rho, lmbda), -1, order = 'F')
        w, r, q, rold = update_cg(w, r, q, Aq, rold)
    return np.reshape(w, (rank, dim1), order = 'F')

def ell_x(ind, W, X, Psi2, rho, lmbda):
    return W @ ((W.T @ X) * ind) + rho * X + lmbda * X @ Psi2.T @ Psi2

def conj_grad_x(sparse_mat, ind, W, X, Psi2, rho, lmbda, maxiter = 5):
    rank, dim2 = X.shape
    x = np.reshape(X, -1, order = 'F')
    r = np.reshape(W @ sparse_mat 
                   - ell_x(ind, W, X, Psi2, rho, lmbda), -1, order = 'F')
    q = r.copy()
    rold = np.inner(r, r)
    for it in range(maxiter):
        Q = np.reshape(q, (rank, dim2), order = 'F')
        Aq = np.reshape(ell_x(ind, W, Q, Psi2, rho, lmbda), -1, order = 'F')
        x, r, q, rold = update_cg(x, r, q, Aq, rold)
    return np.reshape(x, (rank, dim2), order = 'F')

def smoothing_mf(dense_mat, sparse_mat, rank, rho, lmbda, maxiter = 50):
    dim1, dim2 = sparse_mat.shape
    W = 0.01 * np.random.randn(rank, dim1)
    X = 0.01 * np.random.randn(rank, dim2)
    ind = sparse_mat != 0
    pos_test = np.where((dense_mat != 0) & (sparse_mat == 0))
    dense_test = dense_mat[pos_test]
    del dense_mat
    Psi1 = generate_Psi(dim1)
    Psi2 = generate_Psi(dim2)
    show_iter = 10
    for it in range(maxiter):
        W = conj_grad_w(sparse_mat, ind, W, X, Psi1, rho, lmbda)
        X = conj_grad_x(sparse_mat, ind, W, X, Psi2, rho, lmbda)
        mat_hat = W.T @ X
        if (it + 1) % show_iter == 0:
            temp_hat = mat_hat[pos_test]
            print('Iter: {}'.format(it + 1))
            print(compute_rse(temp_hat, dense_test))
            print()
    return mat_hat, W, X
import numpy as np
np.random.seed(1)
import matplotlib.pyplot as plt
from skimage import color
from skimage import io

img = io.imread('data/gaint_panda.bmp')
imgGray = color.rgb2gray(img)
M, N = imgGray.shape
missing_rate = 0.9

sparse_img = imgGray * np.round(np.random.rand(M, N) + 0.5 - missing_rate)
io.imshow(sparse_img)
plt.axis('off')
plt.show()
lmbda = 1e+1
for rank in [5, 10, 50]:
    rho = 1e-1
    maxiter = 100
    mat_hat, W, X = smoothing_mf(imgGray, sparse_img, rank, rho, lmbda, maxiter)

    mat_hat[mat_hat < 0] = 0
    mat_hat[mat_hat > 1] = 1
    io.imshow(mat_hat)
    plt.axis('off')
    plt.imsave('gaint_panda_gray_recovery_90_mf_rank_{}_lmbda_10.png'.format(rank), 
              mat_hat, cmap = plt.cm.gray)
    plt.show()
lmbda = 1e-10
for rank in [5, 10, 50]:
    rho = 1e-1
    maxiter = 100
    mat_hat, W, X = smoothing_mf(imgGray, sparse_img, rank, rho, lmbda, maxiter)

    mat_hat[mat_hat < 0] = 0
    mat_hat[mat_hat > 1] = 1
    io.imshow(mat_hat)
    plt.axis('off')
    plt.imsave('gaint_panda_gray_recovery_90_mf_rank_{}_lmbda_0.png'.format(rank), 
              mat_hat, cmap = plt.cm.gray)
    plt.show()

例. 使用考虑空间自回归的矩阵分解对灰度图像进行复原。

import numpy as np

def compute_rse(var, var_hat):
    return np.linalg.norm(var - var_hat, 2) / np.linalg.norm(var, 2)

def generate_Psi(n):
    mat1 = np.append(np.zeros((n - 1, 1)), np.eye(n - 1), axis = 1)
    mat2 = np.append(np.eye(n - 1), np.zeros((n - 1, 1)), axis = 1)
    return mat1, mat2

def update_cg(var, r, q, Aq, rold):
    alpha = rold / np.inner(q, Aq)
    var = var + alpha * q
    r = r - alpha * Aq
    rnew = np.inner(r, r)
    q = r + (rnew / rold) * q
    return var, r, q, rnew

def ell_w(ind, W, X, Aw, Phi0, Phi1, rho, lmbda):
    temp1 = W @ Phi0.T - Aw @ W @ Phi1.T
    temp2 = lmbda * temp1 @ Phi0 - lmbda * Aw.T @ temp1 @ Phi1
    return X @ ((W.T @ X) * ind).T + rho * W + temp2

def conj_grad_w(sparse_mat, ind, W, X, Aw, Phi0, Phi1, rho, lmbda, maxiter = 5):
    rank, dim1 = W.shape
    w = np.reshape(W, -1, order = 'F')
    r = np.reshape(X @ sparse_mat.T 
                   - ell_w(ind, W, X, Aw, Phi0, Phi1, rho, lmbda), -1, order = 'F')
    q = r.copy()
    rold = np.inner(r, r)
    for it in range(maxiter):
        Q = np.reshape(q, (rank, dim1), order = 'F')
        Aq = np.reshape(ell_w(ind, Q, X, Aw, Phi0, Phi1, rho, lmbda), -1, order = 'F')
        w, r, q, rold = update_cg(w, r, q, Aq, rold)
    return np.reshape(w, (rank, dim1), order = 'F')

def ell_x(ind, W, X, Ax, Psi0, Psi1, rho, lmbda):
    temp1 = X @ Psi0.T - Ax @ X @ Psi1.T
    temp2 = lmbda * temp1 @ Psi0 - lmbda * Ax.T @ temp1 @ Psi1
    return W @ ((W.T @ X) * ind) + rho * X + temp2

def conj_grad_x(sparse_mat, ind, W, X, Ax, Psi0, Psi1, rho, lmbda, maxiter = 5):
    rank, dim2 = X.shape
    x = np.reshape(X, -1, order = 'F')
    r = np.reshape(W @ sparse_mat 
                   - ell_x(ind, W, X, Ax, Psi0, Psi1, rho, lmbda), -1, order = 'F')
    q = r.copy()
    rold = np.inner(r, r)
    for it in range(maxiter):
        Q = np.reshape(q, (rank, dim2), order = 'F')
        Aq = np.reshape(ell_x(ind, W, Q, Ax, Psi0, Psi1, rho, lmbda), -1, order = 'F')
        x, r, q, rold = update_cg(x, r, q, Aq, rold)
    return np.reshape(x, (rank, dim2), order = 'F')

def spatial_autoregressive_mf(dense_mat, sparse_mat, rank, rho, lmbda, maxiter = 50):
    dim1, dim2 = sparse_mat.shape
    W = 0.01 * np.random.randn(rank, dim1)
    X = 0.01 * np.random.randn(rank, dim2)
    Aw = 0.01 * np.random.randn(rank, rank)
    Ax = 0.01 * np.random.randn(rank, rank)
    ind = sparse_mat != 0
    pos_test = np.where((dense_mat != 0) & (sparse_mat == 0))
    dense_test = dense_mat[pos_test]
    del dense_mat
    Phi0, Phi1 = generate_Psi(dim1)
    Psi0, Psi1 = generate_Psi(dim2)
    show_iter = 10
    for it in range(maxiter):
        W = conj_grad_w(sparse_mat, ind, W, X, Aw, Phi0, Phi1, rho, lmbda)
        X = conj_grad_x(sparse_mat, ind, W, X, Ax, Psi0, Psi1, rho, lmbda)
        Aw = W @ Phi0.T @ np.linalg.pinv(W @ Phi1.T)
        Ax = X @ Psi0.T @ np.linalg.pinv(X @ Psi1.T)
        mat_hat = W.T @ X
        if (it + 1) % show_iter == 0:
            temp_hat = mat_hat[pos_test]
            print('Iter: {}'.format(it + 1))
            print(compute_rse(temp_hat, dense_test))
            print()
    return mat_hat, W, X, Aw, Ax
import numpy as np
np.random.seed(1)
import matplotlib.pyplot as plt
from skimage import color
from skimage import io

img = io.imread('data/gaint_panda.bmp')
imgGray = color.rgb2gray(img)
M, N = imgGray.shape
missing_rate = 0.9

sparse_img = imgGray * np.round(np.random.rand(M, N) + 0.5 - missing_rate)
io.imshow(sparse_img)
plt.axis('off')
plt.show()
lmbda = 5e-1
for rank in [5, 10, 50]:
    rho = 1e-1
    maxiter = 100
    mat_hat, W, X, Aw, Ax = spatial_autoregressive_mf(imgGray, sparse_img, rank, rho, lmbda, maxiter)

    mat_hat[mat_hat < 0] = 0
    mat_hat[mat_hat > 1] = 1
    io.imshow(mat_hat)
    plt.axis('off')
    plt.imsave('gaint_panda_gray_recovery_90_spatial_ar_mf_rank_{}.png'.format(rank), 
              mat_hat, cmap = plt.cm.gray)
    plt.show()

例. 根据卷积定理计算循环卷积。

import numpy as np

x = np.array([0, 1, 2, 3, 4])
y = np.array([2, -1, 3])
fx = np.fft.fft(x)
fy = np.fft.fft(np.append(y, np.zeros(2), axis = 0))
z = np.fft.ifft(fx * fy).real

例. 根据卷积定理计算向量y

import numpy as np

x = np.array([0, 1, 2, 3, 4])
z = np.array([5, 14, 3, 7, 11])
fx = np.fft.fft(x)
fz = np.fft.fft(z)
y = np.fft.ifft(fz / fx).real

例. 根据卷积定理计算二维循环卷积。

import numpy as np

X = np.array([[1, 2, 3, 4], [4, 5, 6, 7], 
              [7, 8, 9, 10], [10, 11, 12, 13]])
K = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
pad_K = np.zeros(X.shape)
pad_K[: K.shape[0], : K.shape[1]] = K
Y = np.fft.ifft2(np.fft.fft2(X) * np.fft.fft2(pad_K)).real

例. 计算矩阵的F范数与傅立叶变换的F范数。

import numpy as np

X = np.array([[5, 6, 7], [8, 9, 10]])
print('The squared Frobenius norm of the matrix X:')
print(np.linalg.norm(X, 'fro') ** 2)
print()
print('The squared Frobenius norm of the matrix Fx:')
print(np.linalg.norm(np.fft.fft2(X), 'fro') ** 2)
print()

例. 计算循环矩阵的奇异值与L1范数。

import numpy as np

def circ_mat(vec):
    n = vec.shape[0]
    mat = np.zeros((n, n))
    mat[:, 0] = vec
    for k in range(1, n):
        mat[:, k] = np.append(vec[n - k :], vec[: n - k], axis = 0)
    return mat

x = np.array([0, 1, 2, 3, 4])
Cx = circ_mat(x)
s = np.linalg.svd(Cx, full_matrices = False, compute_uv = False)
print('Singular values of the circulant matrix Cx:')
print(s)
print()
print('Discrete Fourier transform of the vector x:')
print(np.fft.fft(x))
print()

例. 对循环矩阵核范数最小化问题进行求解。

import numpy as np

z = np.array([0, 1, 2, 3, 4])
lmbda = 2
T = z.shape[0]

h_hat = np.fft.fft(z)
print('h_hat = ')
print(h_hat)
print()
h_hat_abs = np.abs(h_hat)
print('h_hat_abs = ')
print(h_hat_abs)
print()

temp = h_hat_abs - T / lmbda
temp[temp <= 0] = 0
x = np.fft.ifft(h_hat / h_hat_abs * temp).real
print('x = ')
print(x)
print()
print('The objective function is: ')
print(np.sum(np.abs(np.fft.fft(x))) 
      + 0.5 * lmbda * np.linalg.norm(x - z, 2) ** 2)
print()

例. 使用循环矩阵核范数最小化算法对灰度图像进行复原。

import numpy as np

def compute_rse(var, var_hat):
    return np.linalg.norm(var - var_hat, 2) / np.linalg.norm(var, 2)

def prox(z, w, lmbda):
    T = z.shape[0]
    temp = np.fft.fft(z - w / lmbda)
    temp1 = np.abs(temp) - T / lmbda
    temp1[temp1 <= 0] = 0
    return np.fft.ifft(temp / np.abs(temp) * temp1).real

def update_z(y_train, pos_train, x, w, lmbda, eta):
    z = x + w / lmbda
    z[pos_train] = (lmbda / (lmbda + eta) * z[pos_train] 
                    + eta / (lmbda + eta) * y_train)
    return z

def update_w(x, z, w, lmbda):
    return w + lmbda * (x - z)

def circ_nnm(y_true, y, lmbda, eta, maxiter = 50):
    pos_train = np.where(y != 0)
    y_train = y[pos_train]
    pos_test = np.where((y_true != 0) & (y == 0))
    y_test = y_true[pos_test]
    z = y.copy()
    w = y.copy()
    del y_true, y
    show_iter = 10
    for it in range(maxiter):
        x = prox(z, w, lmbda)
        z = update_z(y_train, pos_train, x, w, lmbda, eta)
        w = update_w(x, z, w, lmbda)
        if (it + 1) % show_iter == 0:
            print(it + 1)
            print(compute_rse(y_test, x[pos_test]))
            print()
    return x
import numpy as np
np.random.seed(1)
import matplotlib.pyplot as plt
from skimage import color
from skimage import io

img = io.imread('data/gaint_panda.bmp')
imgGray = color.rgb2gray(img)
M, N = imgGray.shape
missing_rate = 0.9

sparse_img = imgGray * np.round(np.random.rand(M, N) + 0.5 - missing_rate)
io.imshow(sparse_img)
plt.axis('off')
plt.show()
lmbda = 1e-3 * M * N
eta = 100 * lmbda
maxiter = 100
vec_hat = circ_nnm(imgGray.reshape(M * N, order = 'F'), 
                   sparse_img.reshape(M * N, order = 'F'), 
                   lmbda, eta, maxiter)

vec_hat[vec_hat < 0] = 0
vec_hat[vec_hat > 1] = 1
io.imshow(vec_hat.reshape([M, N], order = 'F'))
plt.axis('off')
plt.imsave('gaint_panda_gray_recovery_90_circ_nnm.png', 
           vec_hat.reshape([M, N], order = 'F'), cmap = plt.cm.gray)
plt.show()

例. 使用一维低秩拉普拉斯卷积模型对车速时间序列进行重构。

import numpy as np

def compute_mape(var, var_hat):
    return np.sum(np.abs(var - var_hat) / var) / var.shape[0]

def compute_rmse(var, var_hat):
    return np.sqrt(np.sum((var - var_hat) ** 2) / var.shape[0])

def laplacian(n, tau):
    ell = np.zeros(n)
    ell[0] = 2 * tau
    for k in range(tau):
        ell[k + 1] = -1
        ell[-k - 1] = -1
    return ell

def prox(z, w, lmbda, denominator):
    T = z.shape[0]
    temp = np.fft.fft(lmbda * z - w) / denominator
    temp1 = np.abs(temp) - T / lmbda
    temp1[temp1 <= 0] = 0
    return np.fft.ifft(temp / np.abs(temp) * temp1).real

def update_z(y_train, pos_train, x, w, lmbda, eta):
    z = x + w / lmbda
    z[pos_train] = (lmbda / (lmbda + eta) * z[pos_train] 
                    + eta / (lmbda + eta) * y_train)
    return z

def update_w(x, z, w, lmbda):
    return w + lmbda * (x - z)

def LCR(y_true, y, lmbda, gamma, tau, maxiter = 50):
    eta = 100 * lmbda
    T = y.shape
    pos_train = np.where(y != 0)
    y_train = y[pos_train]
    pos_test = np.where((y_true != 0) & (y == 0))
    y_test = y_true[pos_test]
    z = y.copy()
    w = y.copy()
    denominator = lmbda + gamma * np.fft.fft(laplacian(T, tau)) ** 2
    del y_true, y
    show_iter = 10
    for it in range(maxiter):
        x = prox(z, w, lmbda, denominator)
        z = update_z(y_train, pos_train, x, w, lmbda, eta)
        w = update_w(x, z, w, lmbda)
        if (it + 1) % show_iter == 0:
            print(it + 1)
            print(compute_mape(y_test, x[pos_test]))
            print(compute_rmse(y_test, x[pos_test]))
            print()
    return x
import numpy as np
np.random.seed(1)
import time

missing_rate = 0.9
print('Missing rate = {}'.format(missing_rate))

dense_vec = np.load('sample_speed_time_series.npz')['arr_0']
T = dense_vec.shape[0]
sparse_vec = dense_vec * np.round(np.random.rand(T) + 0.5 - missing_rate)

import time
start = time.time()
lmbda = 5e-3 * T
gamma = 2 * lmbda
tau = 2
maxiter = 100
x = LCR(dense_vec, sparse_vec, lmbda, gamma, tau, maxiter)
end = time.time()
print('Running time: %d seconds.'%(end - start))
import matplotlib.pyplot as plt
plt.rcParams.update({'font.size': 13})

fig = plt.figure(figsize = (7.5, 2.2))
ax = fig.add_subplot(111)
plt.plot(dense_vec[: T], 'dodgerblue', linewidth = 2)
plt.xlabel('Time')
plt.ylabel('Speed (mph)')
plt.xlim([0, T])
plt.ylim([54, 65])
plt.xticks(np.arange(0, T + 1, 24))
plt.yticks(np.arange(54, 66, 2))
plt.grid(linestyle = '-.', linewidth = 0.5)
ax.tick_params(direction = 'in')

plt.savefig('freeway_traffic_speed_obs.pdf', bbox_inches = "tight")
plt.show()

import matplotlib.pyplot as plt
plt.rcParams.update({'font.size': 13})

fig = plt.figure(figsize = (7.5, 2.2))
ax = fig.add_subplot(111)
plt.plot(np.arange(0, T), sparse_vec[: T], 'o', 
         markeredgecolor = 'darkblue', alpha = missing_rate,
         markerfacecolor = 'deepskyblue', markersize = 10)
plt.xlabel('Time')
plt.ylabel('Speed (mph)')
plt.xlim([0, T])
plt.ylim([54, 65])
plt.xticks(np.arange(0, T + 1, 24))
plt.yticks(np.arange(54, 66, 2))
plt.grid(linestyle = '-.', linewidth = 0.5)
ax.tick_params(direction = 'in')

plt.savefig('freeway_traffic_speed_partial_obs.pdf', bbox_inches = "tight")
plt.show()

import matplotlib.pyplot as plt
plt.rcParams.update({'font.size': 13})

fig = plt.figure(figsize = (7.5, 2.2))
ax = fig.add_subplot(111)
plt.plot(np.arange(0, T), sparse_vec[: T], 'o', 
         markeredgecolor = 'darkblue', alpha = missing_rate,
         markerfacecolor = 'deepskyblue', markersize = 10)
plt.plot(x[: T], 'red', linewidth = 4)
plt.xlabel('Time')
plt.ylabel('Speed (mph)')
plt.xlim([0, T])
plt.ylim([54, 65])
plt.xticks(np.arange(0, T + 1, 24))
plt.yticks(np.arange(54, 66, 2))
plt.grid(linestyle = '-.', linewidth = 0.5)
ax.tick_params(direction = 'in')

plt.savefig('freeway_traffic_speed_reconstructed_lap_conv.pdf',
           bbox_inches = "tight")
plt.show()

import matplotlib.pyplot as plt
plt.rcParams.update({'font.size': 13})

fig = plt.figure(figsize = (7.5, 2.2))
ax = fig.add_subplot(111)
plt.plot(dense_vec[: T], 'dodgerblue', linewidth = 1.5)
plt.plot(np.arange(0, T), sparse_vec[: T], 'o', 
         markeredgecolor = 'darkblue', alpha = missing_rate,
         markerfacecolor = 'deepskyblue', markersize = 10)
plt.plot(x[: T], 'red', linewidth = 4)
plt.xlabel('Time')
plt.ylabel('Speed (mph)')
plt.xlim([0, T])
plt.ylim([54, 65])
plt.xticks(np.arange(0, T + 1, 24))
plt.yticks(np.arange(54, 66, 2))
plt.grid(linestyle = '-.', linewidth = 0.5)
ax.tick_params(direction = 'in')

plt.savefig('freeway_traffic_speed_reconstructed_vs_true_lap_conv.pdf',
           bbox_inches = "tight")
plt.show()

例. 使用一维低秩拉普拉斯卷积模型对灰度图像进行复原。

import numpy as np

def compute_rse(var, var_hat):
    return np.linalg.norm(var - var_hat, 2) / np.linalg.norm(var, 2)

def laplacian(n, tau):
    ell = np.zeros(n)
    ell[0] = 2 * tau
    for k in range(tau):
        ell[k + 1] = -1
        ell[-k - 1] = -1
    return ell

def prox(z, w, lmbda, denominator):
    T = z.shape[0]
    temp = np.fft.fft(lmbda * z - w) / denominator
    temp1 = 1 - T / (lmbda * np.abs(temp))
    temp1[temp1 <= 0] = 0
    return np.fft.ifft(temp * temp1).real

def update_z(y_train, pos_train, x, w, lmbda, eta):
    z = x + w / lmbda
    z[pos_train] = (lmbda / (lmbda + eta) * z[pos_train] 
                    + eta / (lmbda + eta) * y_train)
    return z

def update_w(x, z, w, lmbda):
    return w + lmbda * (x - z)

def lap_conv_1d(y_true, y, gamma, lmbda, eta, tau, maxiter = 50):
    T = y.shape
    pos_train = np.where(y != 0)
    y_train = y[pos_train]
    pos_test = np.where((y_true != 0) & (y == 0))
    y_test = y_true[pos_test]
    z = y.copy()
    w = y.copy()
    denominator = lmbda + gamma * np.fft.fft(laplacian(T, tau)) ** 2
    del y_true, y
    show_iter = 10
    for it in range(maxiter):
        x = prox(z, w, lmbda, denominator)
        z = update_z(y_train, pos_train, x, w, lmbda, eta)
        w = update_w(x, z, w, lmbda)
        if (it + 1) % show_iter == 0:
            print(it + 1)
            print(compute_rse(y_test, x[pos_test]))
            print()
    return x
import numpy as np
np.random.seed(1)
import matplotlib.pyplot as plt
from skimage import color
from skimage import io

img = io.imread('data/gaint_panda.bmp')
imgGray = color.rgb2gray(img)
M, N = imgGray.shape
missing_rate = 0.9

sparse_img = imgGray * np.round(np.random.rand(M, N) + 0.5 - missing_rate)
io.imshow(sparse_img)
plt.axis('off')
plt.show()
lmbda = 5e-3 * M * N
gamma = 1 * lmbda
eta = 100 * lmbda
tau = 1
maxiter = 100
vec_hat = lap_conv_1d(imgGray.reshape(M * N, order = 'F'), 
                      sparse_img.reshape(M * N, order = 'F'), 
                      gamma, lmbda, eta, tau, maxiter)

vec_hat[vec_hat < 0] = 0
vec_hat[vec_hat > 1] = 1
io.imshow(vec_hat.reshape([M, N], order = 'F'))
plt.axis('off')
plt.imsave('gaint_panda_gray_recovery_90_lap_conv_1d.png', 
           vec_hat.reshape([M, N], order = 'F'), cmap = plt.cm.gray)
plt.show()

例. 使用二维低秩拉普拉斯卷积模型对灰度图像进行复原。

import numpy as np

def compute_rse(var, var_hat):
    return np.linalg.norm(var - var_hat, 2) / np.linalg.norm(var, 2)

def laplacian(T, tau):
    ell = np.zeros(T)
    ell[0] = 2 * tau
    for k in range(tau):
        ell[k + 1] = -1
        ell[-k - 1] = -1
    return ell

def prox_2d(z, w, lmbda, denominator):
    M, N = z.shape
    temp = np.fft.fft2(lmbda * z - w) / denominator
    temp1 = 1 - M * N / (lmbda * np.abs(temp))
    temp1[temp1 <= 0] = 0
    return np.fft.ifft2(temp * temp1).real

def update_z(y_train, pos_train, x, w, lmbda, eta):
    z = x + w / lmbda
    z[pos_train] = (lmbda / (lmbda + eta) * z[pos_train] 
                    + eta / (lmbda + eta) * y_train)
    return z

def update_w(x, z, w, lmbda):
    return w + lmbda * (x - z)

def laplacian_conv_2d(y_true, y, lmbda, gamma, eta, tau, maxiter = 50):
    M, N = y.shape
    pos_train = np.where(y != 0)
    y_train = y[pos_train]
    pos_test = np.where((y_true != 0) & (y == 0))
    y_test = y_true[pos_test]
    z = y.copy()
    w = y.copy()
    ell_1 = laplacian(M, tau)
    ell_2 = laplacian(N, tau)
    denominator = lmbda + gamma * np.fft.fft2(np.outer(ell_1, ell_2)) ** 2
    del y_true, y
    show_iter = 10
    for it in range(maxiter):
        x = prox_2d(z, w, lmbda, denominator)
        z = update_z(y_train, pos_train, x, w, lmbda, eta)
        w = update_w(x, z, w, lmbda)
        if (it + 1) % show_iter == 0:
            print(it + 1)
            print(compute_rse(y_test, x[pos_test]))
            print()
    return x
import numpy as np
np.random.seed(1)
import matplotlib.pyplot as plt
from skimage import color
from skimage import io

img = io.imread('data/gaint_panda.bmp')
imgGray = color.rgb2gray(img)
M, N = imgGray.shape
missing_rate = 0.9

sparse_img = imgGray * np.round(np.random.rand(M, N) + 0.5 - missing_rate)
io.imshow(sparse_img)
plt.axis('off')
plt.imsave('gaint_panda_gray_missing_rate_90.png', 
           sparse_img, cmap = plt.cm.gray)
plt.show()
lmbda = 5e-3 * M * N
gamma = 1 * lmbda
eta = 100 * lmbda
tau = 2
maxiter = 100
mat_hat = laplacian_conv_2d(imgGray, sparse_img, lmbda, gamma, eta, tau, maxiter)

mat_hat[mat_hat < 0] = 0
mat_hat[mat_hat > 1] = 1
io.imshow(mat_hat)
plt.axis('off')
plt.imsave('gaint_panda_gray_recovery_90_gamm1_tau{}.png'.format(tau), 
           mat_hat, cmap = plt.cm.gray)
plt.show()

例. 计算延迟嵌入矩阵的奇异值分解、还原向量x

import numpy as np

def delay_embedding(vec, kernel_size):
    n = vec.shape[0]
    mat = np.zeros((n, kernel_size))
    mat[:, 0] = vec
    for k in range(1, kernel_size):
        mat[:, k] = np.append(vec[k :], vec[: k], axis = 0)
    return mat

vec = np.array([0, 1, 2, 3, 4])
T = vec.shape[0]
kernel_size = 3
mat = delay_embedding(vec, kernel_size)

u, s, v = np.linalg.svd(mat, full_matrices = False)
x_hat = np.zeros(T)
for r in range(kernel_size):
    fu = np.fft.fft(u[:, r])
    fv = np.fft.fft(np.append(v[r, :], np.zeros(T - kernel_size), axis = 0))
    x_hat += s[r] * np.fft.ifft(fu * fv).real
x_hat = x_hat / kernel_size
print(x_hat)

或者

import numpy as np

def CircularConv(x, kernel):
    m = x.shape[0]
    n = kernel.shape[0]
    vec = np.zeros(m)
    for t in range(m):
        temp = 0
        for k in range(n):
            temp += x[t - k] * kernel[k]
        vec[t] = temp
    return vec

def DelayEmbedding(vec, kernel_size):
    n = vec.shape[0]
    mat = np.zeros((n, kernel_size))
    mat[:, 0] = vec
    for k in range(1, kernel_size):
        mat[:, k] = np.append(vec[k :], vec[: k], axis = 0)
    return mat

x = np.array([0, 1, 2, 3, 4])
kernel_size = 3
mat = DelayEmbedding(x, kernel_size)
u, s, v = np.linalg.svd(mat, full_matrices = False)
temp1 = s[0] * CircularConv(u[:, 0], v[0, :]) / 3
print(temp1)
print()
temp2 = s[1] * CircularConv(u[:, 1], v[1, :]) / 3
print(temp2)
print()
temp3 = s[2] * CircularConv(u[:, 2], v[2, :]) / 3
print(temp3)
print()
print(temp1 + temp2 + temp3)
print()

核范数最小化问题

▴ 回到顶部

import numpy as np

def circ_mat(vec):
    n = vec.shape[0]
    mat = np.zeros((n, n))
    mat[:, 0] = vec
    for k in range(1, n):
        mat[:, k] = np.append(vec[n - k :], vec[: n - k], axis = 0)
    return mat

def inv_circ_mat(mat):
    n = mat.shape[0]
    vec = mat[:, 0]
    for k in range(1, n):
        vec += np.append(mat[k :, k], mat[: k, k], axis = 0)
    return vec / n

z = np.array([0, 1, 2, 3, 4])
T = z.shape[0]
lmbda = 2
mat = circ_mat(z)
u, s, v = np.linalg.svd(mat, full_matrices = False)
s = s - T / lmbda
s[s < 0] = 0
temp = u @ np.diag(s) @ v
print('The result of singular value thresholding:')
print(temp)
print()
print('The inverse operator of the matrix:')
x = inv_circ_mat(temp)
print(x)
print()
print('The objective function is: ')
print(np.sum(np.abs(np.fft.fft(x))) 
      + 0.5 * lmbda * np.linalg.norm(x - z, 2) ** 2)
print()
def conv_mat(vec, kernel_size):
    n = vec.shape[0]
    mat = np.zeros((n, kernel_size))
    mat[:, 0] = vec
    for k in range(1, kernel_size):
        mat[:, k] = np.append(vec[n - k :], vec[: n - k], axis = 0)
    return mat

def inv_conv_mat(mat):
    tau = mat.shape[1]
    vec = mat[:, 0]
    for k in range(1, tau):
        vec += np.append(mat[k :, k], mat[: k, k], axis = 0)
    return vec / tau

About

张量计算教程 (Tensor Computations: Tutorial)

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published