Source code for pygappy.pca_projection

# -*- coding: utf-8 -*-
"""

Authors
-------
John Weaver <john.weaver.astro@gmail.com>


About
-----
Robust Principal Component Analysis projection, and tools

Based on IDL implementation by Vivienne Wild

References:
  [1] Connolly & Szalay (1999, AJ, 117, 2052)
  http://www.journals.uchicago.edu/AJ/journal/issues/v117n5/980466/980466.html
  [2] Lemson, "Normalized gappy PCA projection"


Known Issues
------------
None


"""

# ------------------------------------------------------------------------------
# Standard Packages
# ------------------------------------------------------------------------------
import os
import numpy as np
import scipy.io as sciio
import pygappy.visualization as pcplot
import matplotlib.patches as patches
from matplotlib.path import Path

# ------------------------------------------------------------------------------
# Main Program
# ------------------------------------------------------------------------------
[docs]def eigenbasis(verbose=False): """ Returns wavelengths, eigenspectra, mean spectrum, and variance for Wild+07 PCA eigensystem. Parameters ---------- verbose : bool, optional Enable for status and debug messages. Default is ''False''. Returns ------- wave : ndarray Wavelength array of 'float' type. spec : ndarray 2D array of eiegenspectra. mean : ndarray Mean spectrum array. var : ndarray Variance array corresponding to the mean spectrum. """ dir_pca = os.path.dirname(__file__).split('/')[:-1] fname_pca = "data/pcavo_espec_25.sav" path_pca = os.path.join('/'.join(dir_pca), fname_pca) pcavo = sciio.readsav(path_pca) if verbose: print(f"[vwpca_eigenbasis] STATUS: opened {fname_pca}") wave = pcavo['wave'] spec = pcavo['espec'] mean = pcavo['meanarr'] var = pcavo['vararr'] if verbose: print(f"[vwpca_eigenbasis] STATUS: returning eigensystem") return wave, spec, mean, var
[docs]def gappy(data, error, espec, mean, cov=None, \ reconstruct=False, verbose=False): """ Performs robust PCA projection. Parameters ---------- data : ndarray 1D spectrum or 2D specta with 'float' type. error : ndarray 1D or 2D corresponding 1-sigma error array. Zeros indicate masked data. espec : ndarray 2D array of eigenspectra, possibly truncated in dimension. mean : ndarray 1D mean spectrum of the eigenspectra. cov: bool, optional Return covariance matrix. Default is ''False''. reconstruct : bool, optional Fill in missing values with PCA estimation. Default is ''False''. verbose : bool, optional Enable for status and debug messages. Default is ''False'' Returns ------- pcs : ndrray 1D or 2D array of Principal Components with 'float' type. data : ndrray If reconstruct enabled, 1D or 2D reconstructed spectra. ccov : ndarray If cov enabled, 2D or 3D covariance matrices. """ # Sanity checks if (np.size(data) == 0) | (np.size(error) == 0) | (np.size(espec) == 0) | ( np.size(mean) == 0): print('[pca_gappy] ERROR: incorrect input lengths') return None tmp = np.shape(espec) # number of eigenvectors if np.size(tmp) == 2: nrecon = tmp[0] else: nrecon = 1 nbin = np.shape(espec)[-1] # number of data points tmp = np.shape(data) # number of observations to project if np.size(tmp) == 2: ngal = tmp[0] else: ngal = 1 # Dimension mismatch check if np.shape(data)[-1] != nbin: print( '[pca_gappy] ERROR: "data" must have the same dimension as eigenvectors' ) return None if np.shape(error)[-1] != nbin: print( '[pca_gappy] ERROR: "error" must have the same dimension as eigenvectors' ) return None if np.shape(mean)[0] != nbin: print( '[pca_gappy] ERROR: "mean" must have the same dimension as eigenvectors' ) return None # Project each galaxy in turn pcs = np.zeros((ngal, nrecon)) if cov is not None: ccov = np.zeros((ngal, nrecon, nrecon)) for j in np.arange(0, ngal): if ngal == 1: data = data[np.newaxis, :] error = error[np.newaxis, :] if verbose: print('[pca_gappy] STATUS: processing spectrum ') # Calculate weighting array from 1-sig error array # ! if all bins have error=0 continue to next spectrum weight = np.zeros(nbin) ind = np.where(error[j, :] != 0.)[0] if np.size(ind) != 0: weight[ind] = 1. / (error[j, :][ind]**2) else: if verbose: print( '[pca_gappy] WARNING: error array problem in spectrum (setting pcs=0)' ) continue ind = np.where(np.isfinite(weight) is False)[0] if np.size(ind) != 0: if verbose: print( '[pca_gappy] WARNING: error array problem in spectrum (setting pcs=0)' ) continue # Subtract mean from data data_j = data[j, :] - mean # Calculate the weighted eigenvectors, multiplied by the eigenvectors (eq. 4-5 [1]) if nrecon > 1: # CONSERVED MEMORY NOT IMPLEMETED M = np.dot((espec * weight), espec.T) # Calculate the weighted data array, multiplied by the eigenvectors (eq. 4-5 [1]) F = np.dot((data_j * weight), espec.T) # Solve for Principle Component Amplitudes (eq. 5 [1]) try: Minv = np.linalg.inv(M) except: if verbose: print( '[pca_gappy] STATUS: problem with matrix inversion (setting pcs=0)' ) continue pcs[j, :] = np.dot(F, Minv) # Calculate covariance matrix (eq. 6 [1]) if cov is True: ccov[j, :, :] = Minv else: # if only one eigenvector M = np.sum(weight * espec * espec) F = np.sum(weight * data_j * espec) pcs[j, 0] = F / M if cov is True: ccov[j, 0, 0] = np.sum((1. / weight) * espec * espec) # If reconstruction of data array required, # fill in regions with weight = 0 with PCA reconstruction if reconstruct is True: bad_pix = np.where(weight == 0.)[0] count = np.size(bad_pix) if count == 0: continue rreconstruct = np.sum((pcs[j, :] * espec[:, bad_pix].T).T, 0) rreconstruct += mean[bad_pix] data[j, bad_pix] = rreconstruct if ngal == 1: pcs = pcs[0] data = data[0] if cov: ccov = ccov[0] # Report to user if verbose: print("[pca_gappy] STATUS: Results...") for i, pc in enumerate(pcs): print(f" PCA{i+1}: {pc:2.5f}") # Return if reconstruct is True: if cov is True: return pcs, data, ccov else: return pcs, data elif cov is True: return pcs, ccov else: return pcs
[docs]def normgappy(data, error, espec, mean, cov=False, \ reconstruct=False, verbose=False): """ Performs robust PCA projection, including normalization estimation. Parameters ---------- data : ndarray 1D spectrum or 2D specta with 'float' type. error : ndarray 1D or 2D corresponding 1-sigma error array. Zeros indicate masked data. espec : ndarray 2D array of eigenspectra, possibly truncated in dimension. mean : ndarray 1D mean spectrum of the eigenspectra. cov: bool, optional Return covariance matrix. Default is ''False''. reconstruct : bool, optional Fill in missing values with PCA estimation. Default is ''False''. verbose : bool, optional Enable for status and debug messages. Default is ''False'' Returns ------- pcs : ndrray 1D or 2D array of Principal Components with 'float' type. norm: float or ndarray Normalization estimates. data : ndrray If reconstruct enabled, 1D or 2D reconstructed spectra. ccov : ndarray If cov enabled, 2D or 3D covariance matrices. """ # Sanity checks if (np.size(data) == 0) | (np.size(error) == 0) | (np.size(espec) == 0) | ( np.size(mean) == 0): print('[pca_normgappy] ERROR: incorrect input lengths') return None tmp = np.shape(espec) # number of eigenvectors if np.size(tmp) == 2: nrecon = tmp[0] else: nrecon = 1 nbin = np.shape(espec)[-1] # number of data points tmp = np.shape(data) # number of observations to project if np.size(tmp) == 2: ngal = tmp[0] else: ngal = 1 # Dimension mismatch check if np.shape(data)[-1] != nbin: print( '[pca_normgappy] ERROR: "data" must have the same dimension as eigenvectors' ) return None if np.shape(error)[-1] != nbin: print( '[pca_normgappy] ERROR: "error" must have the same dimension as eigenvectors' ) return None if np.shape(mean)[0] != nbin: print( '[pca_normgappy] ERROR: "mean" must have the same dimension as eigenvectors' ) return None # Project each galaxy in turn pcs = np.zeros((ngal, nrecon), float) norm = np.zeros(ngal, float) if cov is not None: ccov = np.zeros((ngal, nrecon, nrecon)) if ngal == 1: data = data[np.newaxis, :] error = error[np.newaxis, :] for j in np.arange(0, ngal): if verbose: print('[pca_normgappy] STATUS: processing spectrum ') # Calculate weighting array from 1-sig error array # ! if all bins have error=0 continue to next spectrum weight = np.zeros(nbin) ind = error[j, :].nonzero()[0] if np.size(ind) != 0: try: weight[ind] = 1. / (error[j, :][ind]**2) except: if verbose: print( '[pca_normgappy] ERROR: error array problem in spectrum (setting pcs=0)' ) continue ind = np.where(np.isfinite(weight) is False)[0] if np.size(ind) != 0: if verbose: print( '[pca_normgappy] ERROR: error array problem in spectrum (setting pcs=0)' ) continue data_j = data[j, :] # Solve partial chi^2/partial N = 0 Fpr = np.sum(weight * data_j * mean) # eq 4 [2] Mpr = np.sum(weight * mean * mean) # eq 5 [2] E = np.sum((weight * mean) * espec, axis=1) # eq 6 [2] # Calculate the weighted eigenvectors, multiplied by the eigenvectors (eq. 4-5 [1]) if nrecon > 1: # CONSERVED MEMORY NOT IMPLEMETED espec_big = np.repeat(espec[:, np.newaxis, :], nrecon, axis=1) M = np.sum(weight * np.transpose(espec_big, (1, 0, 2)) * espec_big, 2) # Calculate the weighted data array, multiplied by the eigenvectors (eq. 4-5 [1]) F = np.dot((data_j * weight), espec.T) # Calculate new M matrix, this time accounting for the unknown normalization (eq. 11 [2]) E_big = np.repeat(E[np.newaxis, :], nrecon, axis=0) F_big = np.repeat(F[:, np.newaxis], nrecon, axis=1) Mnew = Fpr * M - E_big * F_big # Calculate the new F matrix, accounting for unknown normalization Fnew = Mpr * F - Fpr * E # Solve for Principle Component Amplitudes (eq. 5 [1]) try: Minv = np.linalg.inv(Mnew) except: if verbose: print( '[pca_normgappy] STATUS: problem with matrix inversion (setting pcs=0)' ) continue pcs[j, :] = np.squeeze(np.sum(Fnew * Minv, 1)) norm[j] = Fpr / (Mpr + np.sum(pcs[j, :] * E)) # Calculate covariance matrix (eq. 6 [1]) if cov is True: M_gappy = np.dot((espec * (weight * norm[j]**2)), espec.T) ccov[j, :, :] = np.linalg.inv(M_gappy) else: # if only one eigenvector M = np.sum(weight * espec * espec) F = np.sum(weight * data_j * espec) Mnew = M * Fpr - E * F Fnew = Mpr * F - E * Fpr pcs[j, 0] = Fnew / Mnew norm[j] = Fpr / (Mpr + pcs[j, 0] * E) if cov is True: ccov[j, 0, 0] = np.sum((1. / weight) * espec * espec) # If reconstruction of data array required, # fill in regions with weight = 0 with PCA reconstruction if reconstruct is True: bad_pix = np.where(weight == 0.) count = np.size(bad_pix) if count == 0: continue rreconstruct = np.sum((pcs[j, :] * espec[:, bad_pix].T).T, 0) rreconstruct += mean[bad_pix] data[j, bad_pix] = reconstruct if ngal == 1: pcs = pcs[0] data = data[0] norm = norm[0] if cov: ccov = ccov[0] # Report to user if verbose: print("[pca_normgappy] STATUS: Results...") for i, pc in enumerate(pcs): print(f" PCA{i+1}: {pc:2.5f}") print(f" Norm: {norm:2.5f}") # Return if reconstruct is True: if cov is True: return pcs, norm, data, ccov else: return pcs, norm, data elif cov is True: return pcs, norm, ccov else: return pcs, norm
[docs]def mc_errors(data, error, espec, emean, Ntrials=100, verbose=False): """ Performs Monte-Carlo error estimation assuming a normal distribution. The formal 1-sigma errors from the (norm)gappy cov matrix should match. Parameters ---------- data : ndarray 1D spectrum or 2D specta with 'float' type. error : ndarray 1D or 2D corresponding 1-sigma error array. Zeros indicate masked data. espec : ndarray 2D array of eigenspectra, possibly truncated in dimension. mean : ndarray 1D mean spectrum of the eigenspectra. Ntrials: int, optional Number of trials to perform Default is 100. verbose : bool, optional Enable for status and debug messages. Default is ''False'' Returns ------- pc_errors : ndrray 1D array of Principal Component errors with 'float' type. """ mcdata = np.random.normal(data, error, size=(Ntrials, len(data))) mcerror = error[None,:].repeat(Ntrials, 0) mcpcs = gappy( mcdata, mcerror, espec, emean, cov=False, verbose=verbose) pc_errors = np.std(mcpcs, 0) return pc_errors
[docs]def get_class(pcs, logmass=1.): """ Classifies location in PC Plane. Parameters ---------- pcs : float or np.ndarray List of PC1/2 values of type float. logmass : float, optional Log10 stellar mass, used to select suitable region patches. Default is '1.', for low-mass. Returns ------- output : float or ndarray Classes for input pc sets. """ output = None if np.ndim(pcs) == 2: output = np.zeros(np.shape(pcs)[0], dtype='U10') pcs[:,1] = - pcs[:,1] else: pcs[1] = - pcs[1] # Decide mass range if (logmass < 10): pca_patches = pcplot.get_patches_lowmass() else: pca_patches = pcplot.get_patches_highmass() for patch_verts in pca_patches: path = Path(patch_verts[0]) klass = patch_verts[1][0] if np.ndim(pcs) == 2: for i, pc in enumerate(pcs): if path.contains_point(pc[:2]): output[i] = klass else: if path.contains_point(pcs[:2]): output = klass return output