Source code for pyvisgrid.core.gridder

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING

import h5py
import numpy as np
from astropy.constants import c
from astropy.io import fits
from astropy.time import Time
from casacore.tables import table
from numpy.exceptions import AxisError
from numpy.typing import ArrayLike
from tqdm.auto import tqdm

if TYPE_CHECKING:
    import matplotlib
    from pyvisgen.simulation import Observation, Visibilities
    from radiotools.layouts import Layout

try:
    import pandas as pd
    from radiotools.layouts import Layout

    include_array_layout = True
except ImportError:
    include_array_layout = False


import pyvisgrid.plotting as plotting
from pyvisgrid.core.stokes import get_stokes_from_vis_data

__all__ = ["GridData", "Gridder", "GridDataSeries"]


[docs] @dataclass class GridData: """DataClass to save the gridded and non-gridded visibilities for a specific Stokes component. Attributes ---------- vis_data : numpy.ndarray The ungridded visibilities of shape ``(N_MEASUREMENTS * N_CHANNELS,)``. fov : float The size of the Field Of View of the gridded data in arcseconds. mask : numpy.ndarray, optional The mask created from the given (u,v) coordinates. The mask contains the number of (u,v) coordinates per pixel. mask_real : numpy.ndarray, optional The gridded real part of the visibilites. mask_imag : numpy.ndarray, optional The gridded imaginary part of the visibilities. dirty_img : numpy.ndarray, optional The complex Dirty Image. This is the 2-dimensional Fourier transform of the gridded visibilities. """ vis_data: np.ndarray fov: float | None = None mask: np.ndarray | None = None mask_real: np.ndarray | None = None mask_imag: np.ndarray | None = None dirty_image: np.ndarray | None = None def __str__(self) -> str: return self.__dict__
[docs] def copy(self) -> GridData: return GridData( vis_data=self.vis_data, fov=self.fov, mask=self.mask, mask_real=self.mask_real, mask_imag=self.mask_imag, dirty_image=self.dirty_image, )
[docs] def get_mask_complex(self) -> np.ndarray: """Returns the gridded mask as a complex array with the form ``mask.real + 1j * mask.imag``. Returns ------- numpy.ndarray Complex mask """ return self.mask_real + 1j * self.mask_imag
[docs] def get_mask_abs_phase(self) -> tuple[np.ndarray]: """Returns the gridded mask as amplitude and phase. Returns ------- tuple[numpy.ndarray] Amplitude and phase of the complex mask. """ mask_complex = self.get_mask_complex() return np.abs(mask_complex), np.angle(mask_complex)
[docs] def get_mask_real_imag(self) -> tuple[np.ndarray]: """Returns the gridded mask as real and imaginary parts. Returns ------- tuple[numpy.ndarray] Real and parts of the complex mask. """ return self.mask_real, self.mask_imag
[docs] class GridDataSeries: """DataClass to save the gridded and non-gridded visibilities for a specific Stokes component for different time steps. This enables to create a time series of different steps in an observation. Attributes ---------- grid_data : GridData The GridData of the full observation. times : numpy.ndarray The time steps of the observation. u_wave : np.ndarray The :math:`u` coordinates as a multiple of the wavelength. u_wave : np.ndarray The :math:`v` coordinates as a multiple of the wavelength. step_size : int The size of each timestep (how many steps in the observation are in each iteration of the series). time_cutoff_idx : int | None, optional The index to stop the series at. Default is ``None``. """ def __init__( self, grid_data: GridData, u_wave: np.ndarray, v_wave: np.ndarray, times: np.ndarray, num_frequencies: int, step_size: int, time_start_idx: int = 0, time_end_idx: int | None = None, ): self._grid_data_full: GridData = grid_data self._grid_data: list[GridData] = [] self._u_wave: np.ndarray = u_wave self._v_wave: np.ndarray = v_wave self._num_frequencies: int = num_frequencies self._times_full: np.ndarray = times self.num_steps = self._times_full.size // self._num_frequencies self._step_idx: np.ndarray = np.arange(self.num_steps)[ time_start_idx:time_end_idx ][::step_size] self._times_idx: np.ndarray = np.tile( np.arange(0, self.num_steps), reps=self._num_frequencies, ) self._iter_idx: int = 0 def __len__(self) -> int: return self._step_idx.size def __getitem__(self, i) -> list[GridData, np.ndarray, np.ndarray, np.ndarray]: if not isinstance(i, int): raise TypeError("The index must be an integer!") time_idcs = self._get_time_idcs(step=i) result = [self._grid_data[i]] result.extend(self.get_uv_step(step=i)) result.extend([self._times_full[time_idcs]]) return result def __iter__(self) -> GridDataSeries: self._iter_idx = 0 return self def __next__(self) -> list[GridData, np.ndarray, np.ndarray, np.ndarray]: if self._iter_idx >= len(self): raise StopIteration num_result = self[self._iter_idx] self._iter_idx += 1 return num_result def _get_time_idcs(self, step: int) -> np.ndarray: """ Helper function which converts from animation step to time indices. Parameters ---------- step : int The step index. Returns ------- np.ndarray : Array of indices at which the times, coordinates and visibilities up to this step index are located in their respective arrays. """ step_idx = self._step_idx[step] return np.argwhere(self._times_idx <= step_idx).ravel()
[docs] def get_uv_step(self, step: int) -> tuple[np.ndarray, np.ndarray]: """ Returns the ungridded :math:`(u,v)` coordinates for the given timestep. Parameters ---------- step : int The step index. Returns ------- tuple[np.ndarray, np.ndarray]: The :math:`(u,v)` coordinates in the order ``(u, v)``. """ time_idcs = self._get_time_idcs(step=step) return ( self._u_wave[time_idcs], self._v_wave[time_idcs], )
[docs] def get_vis_step(self, step: int) -> GridData: """ Returns the visibility data for the given timestep. Parameters ---------- step : int The step index. Returns ------- GridData: The (un)gridded visibility data. """ time_idcs = self._get_time_idcs(step=step) grid_data = self._grid_data_full.copy() grid_data.vis_data = grid_data.vis_data[time_idcs] return grid_data
[docs] def add_grid_data(self, grid_data: GridData) -> None: """ Adds the given (un)gridded visibility data to the series. Parameters ---------- GridData : The data to add to the series. """ self._grid_data.append(grid_data)
[docs] class Gridder: def __init__( self, u_meter: np.ndarray, v_meter: np.ndarray, times: np.ndarray, img_size: int, fov: float, src_ra: float, src_dec: float, ref_frequency: float, frequency_offsets: ArrayLike, antenna_layout: Layout | None = None, ): """Initializes the default Gridder of the radionets-project. It uses NumPy histogramms to sort the visibilities into a grid in the Fourier space to create a 2-dimensional mask of visibilities, which can be transformed into real space using a Fast Fourier Transform. Parameters ---------- u_meter : numpy.ndarray The u coordinates in meter. v_meter : numpy.ndarray The v coordinates in meter. times : numpy.ndarray The times (in MJD) at which the visibilities were measured. img_size : int The size of the image in pixels. fov : float The physical size of the image in arcsec. src_ra : float The Right Ascension (RA) of the source in arcsec. src_dec : float The Declination (Dec) of the source in arcsec. ref_frequency : float The reference frequency of the data in Hertz. frequency_offsets : numpy.typing.ArrayLike The frequency offsets in Hertz. antenna_layout : raditools.layouts.Layout | None, optional The antenna layout as a radiotools ``Layout``. This does not need to be provided to use the Gridder but only for the creation of animations (needs the ``animations`` optional dependencies). Defaults is ``None``. """ self.src_ra: float = src_ra self.src_dec: float = src_dec self.ref_frequency: float = ref_frequency self.frequency_offsets: np.ndarray = np.asarray(frequency_offsets).ravel() self.frequencies: np.ndarray = self.frequency_offsets + self.ref_frequency u_wave = u_meter / c.value v_wave = v_meter / c.value self.u_wave: np.ndarray = np.concatenate( [u_wave * freq for freq in self.frequencies] ) self.v_wave: np.ndarray = np.concatenate( [v_wave * freq for freq in self.frequencies] ) self.u_meter: np.ndarray = np.tile(u_meter, reps=self.frequencies.size) self.v_meter: np.ndarray = np.tile(v_meter, reps=self.frequencies.size) self.times: Time = Time( np.tile(times, reps=self.frequencies.size), format="mjd" ) self.antenna_layout: Layout | None = antenna_layout self.img_size: int = img_size self.fov: float = np.deg2rad(fov / 3600) # convert from asec to rad self.stokes: dict = dict() # initialize stokes component dictionary def __str__(self) -> str: return str(self.__dict__) def __len__(self) -> int: return len(self.times) def __getitem__(self, i: str) -> GridData: if not isinstance(i, str): raise KeyError( "The provided key has to be a valid Stokes component, i.e. 'I'." ) return self.stokes[i] def _grid( self, grid_data: GridData, u_wave: np.ndarray, v_wave: np.ndarray ) -> GridData: """Internal method that grids given visibility data using the default Gridder for the radionets-project. Parameters ---------- grid_data : GridData The ``GridData`` data object containing the visibilities. u_wave: numpy.ndarray The u coordinates in units of wavelength. v_wave: numpy.ndarray The v coordinates in units of wavelength. Returns ------- mask : numpy.ndarray, optional The mask created from the given (u,v) coordinates. The mask contains the number of (u,v) coordinates per pixel. mask_real : numpy.ndarray, optional The gridded real part of the visibilites. mask_imag : numpy.ndarray, optional The gridded imaginary part of the visibilities. dirty_img_complex : numpy.ndarray, optional The complex Dirty Image. This is the 2-dimensional Fourier transform of the gridded visibilities. """ stokes = grid_data.vis_data real = stokes.real.T imag = stokes.imag.T u_wave_full = np.append(-u_wave.ravel(), u_wave.ravel()) v_wave_full = np.append(-v_wave.ravel(), v_wave.ravel()) stokes_real_full = np.append(real.ravel(), real.ravel()) stokes_imag_full = np.append(-imag.ravel(), imag.ravel()) N = self.img_size delta_uv = (self.fov) ** (-1) bins = np.arange( start=-(N / 2 + 1 / 2) * delta_uv, stop=(N / 2 + 1 / 2) * delta_uv, step=delta_uv, dtype=np.float128, ) mask, *_ = np.histogram2d( x=u_wave_full, y=v_wave_full, bins=[bins, bins], density=False ) mask = ( mask.T ) # u (x) is histogrammed along the first dimension (rows) --> transpose mask[mask == 0] = 1 mask_real, _, _ = np.histogram2d( x=u_wave_full, y=v_wave_full, bins=[bins, bins], weights=stokes_real_full, density=False, ) mask_imag, _, _ = np.histogram2d( x=u_wave_full, y=v_wave_full, bins=[bins, bins], weights=stokes_imag_full, density=False, ) mask_real = mask_real.T # see above mask_imag = mask_imag.T # see above mask_real /= mask mask_imag /= mask grid_data.fov = self.fov grid_data.mask = mask grid_data.mask_real = mask_real grid_data.mask_imag = mask_imag grid_data.dirty_image = np.fft.fftshift( np.fft.ifft2(np.fft.fftshift(mask_real + 1j * mask_imag)) ) return grid_data
[docs] def grid(self, stokes_component: str = "I") -> GridData: """Grids given visibility data using the default Gridder for the radionets-project. Parameters ---------- stokes_component : str, optional The stokes component which should be gridded. The specified component has to be initialized first! Otherwise this will result in a ``KeyError``. Default is ``'I'``. Returns ------- mask : numpy.ndarray, optional The mask created from the given (u,v) coordinates. The mask contains the number of (u,v) coordinates per pixel. mask_real : numpy.ndarray, optional The gridded real part of the visibilites. mask_imag : numpy.ndarray, optional The gridded imaginary part of the visibilities. dirty_img_complex : numpy.ndarray, optional The complex Dirty Image. This is the 2-dimensional Fourier transform of the gridded visibilities. """ if stokes_component not in self.stokes: raise KeyError( f"The Stokes component {stokes_component} has not been initialized!" ) return self._grid( grid_data=self.stokes[stokes_component], u_wave=self.u_wave, v_wave=self.v_wave, )
[docs] def grid_time_series( self, step_size: int, time_start_idx: int = 0, time_end_idx: int | None = None, stokes_component: str = "I", show_progress: bool = True, ) -> GridDataSeries: """Grids the visibilites in a time series so that time steps get gridded individually. Parameters ---------- step_size : int The number of visibilites (time steps) per gridded time step. Meaning that the resulting series will have ``len(Gridder) // step_size`` for its length. time_start_idx : int, optional The time index at which the time series starts. Default is ``0``, meaning the time series starts at the first visibility. time_end : int | None, optional The time index at which the time series end. Default is ``None``, meaning that the time series ends at the last visibility. stokes_component : str, optional The Stokes component to grid. Default is ``'I'``. show_progress : bool, optional Whether to show a progress bar for the gridding progress. Default is ``True``. Returns ------- GridDataSeries : The series of gridded and ungridded visibilities. """ grid_data = self.stokes[stokes_component] grid_data_series = GridDataSeries( grid_data=grid_data, u_wave=self.u_wave, v_wave=self.v_wave, times=self.times.mjd, step_size=step_size, num_frequencies=self.frequencies.size, time_start_idx=time_start_idx, time_end_idx=time_end_idx, ) for idx in tqdm( np.arange(len(grid_data_series)), desc="Gridding time series", disable=not show_progress, ): u_wave, v_wave = grid_data_series.get_uv_step(step=idx) grid_data_series.add_grid_data( grid_data=self._grid( grid_data=grid_data_series.get_vis_step(step=idx), u_wave=u_wave, v_wave=v_wave, ) ) return grid_data_series
[docs] @classmethod def from_pyvisgen( cls, vis_data: Visibilities, obs: Observation, img_size: int, fov: float, stokes_components: list[str] | str = "I", polarizations: list[str] | str | None = None, ) -> GridData: """Initializes the gridder with the visibility data which is generated by the ``pyvisgen.simulation.vis_loop`` function. Additionally one can define which stokes components should be calculated with a given polarization. More on the ``Gridder`` can be found in the constructor of the ``Gridder``. Parameters ---------- obs : pyvisgen.simulation.Observation The observation which is returned by the ``pyvisgen.simulation.vis_loop`` function. vis_data : pyvisgen.simulation.Visibilities The visibility data which is returned by the ``pyvisgen.simulation.vis_loop`` function. img_size : int The size of the image in pixels. fov : float The physical size of the image in asec. stokes_components : list[str] | str, optional The Stokes components which are to be calculated and saved in the gridder. This can either be a list of components (e.g. ``['I', 'V']``) or a single string. Default is ``'I'``. polarizations : list[str] | str | None, optional The polarization type. Default is ``None``. """ u_meter = vis_data.u v_meter = vis_data.v times = Time( obs.baselines.get_valid_subset( num_baselines=obs.num_baselines, device="cpu" ).date, format="jd", ).mjd vis_data = vis_data.get_values() if vis_data.ndim != 7: if vis_data.ndim == 3: vis_data = np.stack( [vis_data.real, vis_data.imag, np.ones(vis_data.shape)], axis=3, )[:, None, None, :, None, ...] else: raise RuntimeError( "Expected vis_data to be of dimension 3 or 7 but got" f"{vis_data.ndim}" ) if include_array_layout: array = obs.array positions = obs.array_earth_loc.value layout = Layout.from_dataframe( df=pd.DataFrame( { "station_name": array.st_num, "x": positions["x"], "y": positions["y"], "z": positions["z"], "dish_dia": array.diam, "el_low": array.el_low, "el_high": array.el_high, "sefd": array.sefd, "altitude": array.altitude, } ) ) else: layout = None cls = cls( u_meter=u_meter.cpu().numpy(), v_meter=v_meter.cpu().numpy(), times=times, img_size=img_size, fov=fov, src_ra=obs.ra.cpu().numpy(), src_dec=obs.dec.cpu().numpy(), ref_frequency=obs.ref_frequency.cpu().numpy(), frequency_offsets=obs.frequency_offsets.cpu().numpy(), antenna_layout=layout, ) if isinstance(stokes_components, str): stokes_components = [stokes_components] if polarizations is None: polarizations = "" if isinstance(polarizations, str): polarizations = [polarizations] if len(stokes_components) != len(polarizations): raise IndexError( "The length of stokes_components has to be equal " "to the length of polarizations!" ) for stokes_comp, polarization in zip(stokes_components, polarizations): # get stokes visibilities depending on stokes component to grid # and polarization mode stokes_vis = get_stokes_from_vis_data(vis_data, stokes_comp, polarization) try: stokes_vis = stokes_vis.swapaxes(0, 1).ravel() except AxisError: stokes_vis = stokes_vis.ravel() # FIXME: probably some kind of difference in normalization. # Factor 2 fixes this for now. Has to be investigated. stokes_vis *= 2 cls.stokes[stokes_comp] = GridData(vis_data=stokes_vis) return cls
[docs] @classmethod def from_uvh5( cls, file: str | Path, *, fov: float, stokes_components: str | list[str] = "I", polarizations: str | list[str] | None = None, station_ids_unavail: float = 0.0, img_size: int | None = None, ): """Initializes the gridder with visibility data from the UVH5 file format. This method allows to select stokes components that will be computed for a given polarization. Parameters ---------- file : str or :class:`~pathlib.Path` Path to the file that you want to grid. This file has to be in the UVH5 file format from pyvisgen. fov : float The physical size of the image in asec. stokes_components : str | list[str], optional The Stokes components which are to be calculated and saved in the gridder. This can either be a list of components (e.g. ``['I', 'V']``) or a single string. Default: ``'I'``. polarizations : str | list[str] | None, optional The polarization type. Default: ``None``. station_ids_unavail : float, optional Percentage of station ids that are unavailable during the observation. This allows simulating reduced operation capacity even after the main simulation in pyvisgen and thus avoids having to rerun the same simulation with different reduced arrays. Default: ``0.0`` img_size : int or None, optional Image size is read from the file. In case the image size cannot be read, e.g. because the sky image was not saved to the file ("file/sky/SI"), or you want to grid the image to a different size, it can be set through this keyword argument. Default: ``None`` Examples -------- >>> gridded_vis = Gridder.from_uvh5( ... "/path/to/example.uvh5", ... fov=0.024, ... ).grid() >>> gridded_vis.vis_data.shape (1344,) Setting the ``station_ids_unavail`` keyword argument to a value greater ``0.0``, you can reduce the array, without the need to re-simulate the entire dataset: >>> gridded_vis_reduced = Gridder.from_uvh5( ... "/path/to/example.uvh5", ... fov=0.024, ... station_ids_unavail=0.5, ... ).grid() >>> gridded_vis_reduced.vis_data.shape (288,) Note that this only disables a randomized selection of antennas for the entire observation. Depending on the visibility of the source, the data taken by the antennas may vary during measurement, and the resulting visibilites dataset may be smaller or larger than expected (see, e.g., the shapes ``(1344,)`` vs ``(288,)`` although only half of the antennas were unavailable). See Also -------- :class:`~pyvisgen.io.datawriters.UVH5Writer` : The `UVH5` data writer class as implemented in :mod:`pyvisgen`, outlining the file structure. """ rng = np.random.default_rng() with h5py.File(file) as hf: st_ids = np.asarray(hf["uvw"]["st_id_pairs"]) unique_st_ids = np.unique(st_ids) unavail = rng.choice( unique_st_ids, size=int(len(unique_st_ids) * station_ids_unavail), replace=False, ) avail = ~np.isin(st_ids, unavail).any(axis=1) vis = hf["visibilities"] V11 = np.asarray(vis["V_11"])[avail] V22 = np.asarray(vis["V_22"])[avail] V12 = np.asarray(vis["V_12"])[avail] V21 = np.asarray(vis["V_21"])[avail] vis_data = np.permute_dims( np.concatenate([V11[None], V22[None], V12[None], V21[None]], axis=0), axes=(1, 2, 0), ) if vis_data.ndim != 7: if vis_data.ndim == 3: vis_data = np.stack( [vis_data.real, vis_data.imag, np.ones(vis_data.shape)], axis=3, )[:, None, None, :, None, ...] else: raise RuntimeError( "Expected vis_data to be of dimension 3 or 7 but got" f"{vis_data.ndim}" ) del V11, V22, V12, V21 u_meter = np.asarray(hf["uvw"]["u"])[avail] v_meter = np.asarray(hf["uvw"]["v"])[avail] times = np.asarray(hf["times"])[avail] if not img_size: try: img_size = hf["sky"]["SI"].shape[-1] except KeyError as e: raise RuntimeError( f"'img_size' could not be read from {file}. Please use the " "'img_size' keyword argument instead." ) from e frequency_bands = np.asarray(hf["frequency_bands"]) ref_frequency = frequency_bands[0] frequency_offsets = frequency_bands - ref_frequency ra = np.float64(hf["obs"]["ra"]) dec = np.float64(hf["obs"]["dec"]) cls = cls( u_meter=u_meter, v_meter=v_meter, times=times, img_size=img_size, fov=fov, ref_frequency=ref_frequency, frequency_offsets=frequency_offsets, src_ra=ra, src_dec=dec, ) if isinstance(stokes_components, str): stokes_components = [stokes_components] if polarizations is None: polarizations = "" if isinstance(polarizations, str): polarizations = [polarizations] if len(stokes_components) != len(polarizations): raise IndexError( "The length of stokes_components has to be equal " "to the length of polarizations!" ) for stokes_comp, polarization in zip(stokes_components, polarizations): # get stokes visibilities depending on stokes component to grid # and polarization mode stokes_vis = get_stokes_from_vis_data(vis_data, stokes_comp, polarization) try: stokes_vis = stokes_vis.swapaxes(0, 1).ravel() except AxisError: stokes_vis = stokes_vis.ravel() cls.stokes[stokes_comp] = GridData(vis_data=stokes_vis) return cls
[docs] @classmethod def from_fits( cls, path: str, img_size: int, fov: float, uv_colnames: dict = None, ) -> GridData: """Initializes the gridder with the visibility data in a given FITS file using the default Gridder for the radionets-project. Currently only extraction of the Stokes I component is supported. More on the ``Gridder`` can be found in the constructor of the ``Gridder``. Parameters ---------- path : str The path to the FITS file. img_size : int The size of the image in pixels. fov : float The physical size of the image in asec. uv_colnames : dict, optional Alternative names for the U and V columns in the FITS file. Default is {'u': None, 'v': None}, meaning the default values of 'UU' and 'VV' or 'UU--' and 'VV--' will be used. """ if uv_colnames is None: uv_colnames = dict(u=None, v=None) path = Path(path) file = fits.open(path) data = file[0].data.T if uv_colnames["u"] is None and uv_colnames["v"] is None: try: u_meter = data["UU"].T * c.value v_meter = data["VV"].T * c.value except KeyError: u_meter = data["UU--"].T * c.value v_meter = data["VV--"].T * c.value elif (uv_colnames["u"] is None and uv_colnames["v"] is not None) or ( uv_colnames["u"] is not None and uv_colnames["v"] is None ): raise KeyError( "When providing specific column names, " "both the names for u and v have to be set!" ) else: u_meter = data[uv_colnames[0]].T * c.value v_meter = data[uv_colnames[1]].T * c.value times = Time(data["DATE"], format="jd").mjd vis = file[0].data["DATA"] stokes_i = ( (vis[..., 0, 0] + 1j * vis[..., 0, 1]) + (vis[..., 1, 0] + 1j * vis[..., 1, 1]) ).ravel()[:, None] cls = cls( u_meter=u_meter, v_meter=v_meter, times=times, img_size=img_size, fov=fov, src_ra=file[0].header["CRVAL6"] * 3600, src_dec=file[0].header["CRVAL7"] * 3600, ref_frequency=file[0].header["CRVAL4"], frequency_offsets=file[1].data["IF FREQ"], antenna_layout=Layout.from_uv_fits(path=path, sefd=0) if include_array_layout else None, ) cls.stokes["I"] = GridData(vis_data=stokes_i) return cls
[docs] @classmethod def from_ms( cls, path: str, img_size: int, fov: float, desc_id: int, ref_frequency_id: int = 0, use_calibrated: bool = False, filter_flagged: bool = True, ) -> GridData: """Initializes the Gridder with a measurement which is saved in an NRAO CASA Measurement Set. Currently only extraction of the Stokes I component is supported. Parameters ---------- path: str The path of the measurement set root directory. img_size: int The size of the image in pixels. fov: float The physical size of the image in asec. desc_id: int The description id of the visibilites which should be gridded. This id corresponds to a way of choosing between the spectral windows of the observation and its polarization setup. Note: Currently it is not supported to grid all visibilities of the observation in one image. It is planned to add that at a later point. ref_frequency_id: int, optional The index of the reference frequency that will be used if the measurement is a composite observation and no desc_id is given. use_calibrated: bool, optional Whether to use the calibrated data from the MODEL_DATA column or the raw data from the DATA column. Default is ``True``. filter_flagged: bool, optional Whether to filter out flagged data rows. Default is ``True``. """ path = Path(path) if not path.is_dir(): raise NotADirectoryError( f"This measurement set does not exist under the path {path}" ) main_tab = table(str(path), ack=False) spectral_tab = table(str(path / "SPECTRAL_WINDOW"), ack=False) source_table = table(str(path / "SOURCE"), ack=False) data_desc_table = table(str(path / "DATA_DESCRIPTION"), ack=False) spw_id = data_desc_table.getcell("SPECTRAL_WINDOW_ID", desc_id) data_colname = "DATA" if not use_calibrated else "MODEL_DATA" if desc_id is not None: mask = main_tab.getcol("DATA_DESC_ID") == desc_id main_tab = main_tab.selectrows(rownrs=np.argwhere(mask).ravel()) ref_frequency = spectral_tab.getcell("REF_FREQUENCY", spw_id) frequency_offsets = ( spectral_tab.getcell("CHAN_FREQ", spw_id) - ref_frequency ) else: pass data = main_tab.getcol(data_colname) uv = main_tab.getcol("UVW")[:, :2] times = main_tab.getcol("TIME") if filter_flagged: flag_mask = main_tab.getcol("FLAG") flag_mask = flag_mask.reshape((flag_mask.shape[0], -1)).astype(np.uint8) flag_mask = np.prod(flag_mask, axis=1) flag_mask = np.logical_not(flag_mask.astype(bool)) else: flag_mask = np.ones(uv.shape[0]).astype(bool) uv = uv[flag_mask] data = data[flag_mask] times = times[flag_mask] u_meter = uv[:, 0] v_meter = uv[:, 1] stokes_i = data[..., 0] + data[..., 1] stokes_i = stokes_i.T # ensure matching shape (N_CHANNELS, N_MEASUREMENTS) # FIXME: probably some kind of difference in normalization. # Factor 0.5 fixes this for now. Has to be investigated. stokes_i *= 0.5 src_ra, src_dec = np.rad2deg(source_table.getcol("DIRECTION")[0]) cls = cls( u_meter=u_meter, v_meter=v_meter, times=Time(times / 3600 / 24, format="mjd").mjd, img_size=img_size, fov=fov, src_ra=src_ra, src_dec=src_dec, ref_frequency=ref_frequency, frequency_offsets=frequency_offsets, antenna_layout=Layout.from_measurement_set(root_path=path, sefd=0) if include_array_layout else None, ) cls.stokes["I"] = GridData(vis_data=stokes_i.ravel()) return cls
[docs] def plot_ungridded_uv( self, **kwargs ) -> tuple[matplotlib.figure.Figure, matplotlib.axes.Axes]: """Plots the ungridded (u,v) points as a scatter plot. Parameters ---------- mode : str, optional The mode specifying the scale of the (u,v) coordinates. This can be either ``wave``, meaning the coordinates are plotted in units of the reference wavelength, or ``meter``, meaning the (u,v) coordinates will be plotted in meter. Default is ``wave``. show_times : bool, optional Whether to show the timestamps of the measured visibilities as a colormap. Default is ``True``. use_relative_time : bool, optional Whether to show the times relative to the timestamp of the first measurement in hours. Default is ``True``. times_cmap: str | matplotlib.colors.Colormap, optional The colormap to be used for the time component of the plot. Default is ``'inferno'``. marker_size : float | None, optional The size of the scatter markers in points**2. Default is ``None``, meaning the default value supplied by your matplotlib rcParams. plot_args : dict, optional The additional arguments passed to the scatter plot. Default is ``{"color":"royalblue"}``. fig_args : dict, optional The additional arguments passed to the figure. If a figure object is given in the ``fig`` parameter, this value will be discarded. Default is ``{}``. save_to : str | None, optional The name of the file to save the plot to. Default is ``None``, meaning the plot won't be saved. save_args : dict, optional The additional arguments passed to the ``fig.savefig`` call. Default is ``{"bbox_inches":"tight"}``. fig : matplotlib.figure.Figure | None, optional A custom figure object. If set to ``None``, the ``ax`` parameter also has to be ``None``! Default is ``None``. ax : matplotlib.axes.Axes | None, optional A custom axes object. If set to ``None``, the ``fig`` parameter also has to be ``None``! Default is ``None``. Returns ------- fig : matplotlib.figure.Figure The figure object. ax : matplotlib.axes.Axes The axes object. """ return plotting.plot_ungridded_uv(gridder=self, **kwargs)
[docs] def plot_mask( self, stokes_component: str = "I", **kwargs ) -> tuple[matplotlib.figure.Figure, matplotlib.axes.Axes]: """Plots the (u,v) mask (the binned visibilities) of the gridded interferometric image. Parameters ---------- stokes_component : str, optional The symbol of the stokes component whose mask should be plotted. The specified component has to be initialized and gridded first! Otherwise this will result in a ``KeyError``. Default is ``'I'``. mode : str, optional The mode specifying which values of the mask should be plotted. Possible values are: - ``hist``: Plots the number of (u,v) points which are sorted in each pixel of the image in the (u,v) space. - ``abs``: Plots the absolute value of the gridded visibilities, meaning the magnitude of the complex numbers in Euler representation. - ``phase``: Plots the phase angle of the gridded visibilities, meaning the angle in the exponent of the complex numbers in Euler representation. - ``real``: Plots the real part of the gridded visibilities. - ``imag``: Plots the imaginary part of the gridded visibilities. Default is ``hist``. crop : tuple[list[float | None]], optional The crop of the image. This has to have the format ``([x_left, x_right], [y_left, y_right])``, where the left and right values for each axis are the upper and lower limits of the axes which should be shown. IMPORTANT: If one supplies the ``plt.imshow`` an ``extent`` parameter via the ``plot_args`` parameter, this will be the scale in which one has to give the crop! If not, the crop has to be in pixels. norm : str | matplotlib.colors.Normalize | None, optional The name of the norm or a matplotlib norm. Possible values are: - ``log``: Returns a logarithmic norm with clipping on (!), meaning values above the maximum will be mapped to the maximum and values below the minimum will be mapped to the minimum, thus avoiding the appearance of a colormaps 'over' and 'under' colors (e.g. in case of negative values). Depending on the use case this is desirable but in case that it is not, one can set the norm to ``log_noclip`` or provide a custom norm. - ``log_noclip``: Returns a logarithmic norm with clipping off. - ``centered``: Returns a linear norm which centered around zero. - ``sqrt``: Returns a power norm with exponent 0.5, meaning the square-root of the values. - other: A value not declared above will be returned as is, meaning that this could be any value which exists in matplotlib itself. Default is ``None``, meaning no norm will be applied. cmap: str | matplotlib.colors.Colormap | None, optional The colormap to be used for the plot. Default is ``None``, meaning the colormap will be default to a value fitting for the chosen mode. plot_args : dict, optional The additional arguments passed to the scatter plot. Default is ``{"color":"royalblue"}``. fig_args : dict | None, optional The additional arguments passed to the figure. If a figure object is given in the ``fig`` parameter, this value will be discarded. Default is ``None``. save_to : str | PathLike | None, optional The name of the file to save the plot to. Default is ``None``, meaning the plot won't be saved. save_args : dict | None, optional The additional arguments passed to the ``fig.savefig`` call. Default is ``{"bbox_inches":"tight"}``. fig : matplotlib.figure.Figure | None, optional A custom figure object. If set to ``None``, the ``ax`` parameter also has to be ``None``! Default is ``None``. ax : matplotlib.axes.Axes | None, optional A custom axes object. If set to ``None``, the ``fig`` parameter also has to be ``None``! Default is ``None``. Returns ------- fig : matplotlib.figure.Figure The figure object. ax : matplotlib.axes.Axes The axes object. """ return plotting.plot_mask(self[stokes_component], **kwargs)
[docs] def plot_dirty_image( self, stokes_component: str = "I", **kwargs ) -> tuple[matplotlib.figure.Figure, matplotlib.axes.Axes]: """Plots the (u,v) dirty image, meaning the 2d Fourier transform of the gridded visibilities. Parameters ---------- stokes_component : str, optional The symbol of the stokes component whose dirty image should be plotted. The specified component has to be initialized and gridded first! Otherwise this will result in a ``KeyError``. Default is ``'I'``. mode : str, optional The mode specifying which values of the dirty image should be plotted. Possible values are: - ``real``: Plots the real part of the dirty image. - ``imag``: Plots the imaginary part of the dirty image. - ``abs``: Plot the absolute value of the dirty image. Default is ``real``. ax_unit: str | astropy.units.Unit, optional The unit in which to show the ticks of the x and y-axes in. The y-axis is the Declination (DEC) and the x-axis is the Right Ascension (RA). The latter one is defined as increasing from left to right! The unit has to be given as a string or an ``astropy.units.Unit``. The string must correspond to the string representation of an ``astropy.units.Unit``. Valid units are either ``pixel`` or angle units like ``arcsec``, ``degree`` etc. Default is ``pixel``. center_pos: tuple | None, optional The coordinate center of the image. The coordinates have to be given in the unit defined in the parameter ``ax_unit`` above. If ``ax_unit`` is set to ``pixel`` this parameter is ignored. Default is ``None``, meaning the coordinates of the axes will be given as relative. norm : str | matplotlib.colors.Normalize | None, optional The name of the norm or a matplotlib norm. Possible values are: - ``log``: Returns a logarithmic norm with clipping on (!), meaning values above the maximum will be mapped to the maximum and values below the minimum will be mapped to the minimum, thus avoiding the appearance of a colormaps 'over' and 'under' colors (e.g. in case of negative values). Depending on the use case this is desirable but in case that it is not, one can set the norm to ``log_noclip`` or provide a custom norm. - ``log_noclip``: Returns a logarithmic norm with clipping off. - ``centered``: Returns a linear norm which centered around zero. - ``sqrt``: Returns a power norm with exponent 0.5, meaning the square-root of the values. - other: A value not declared above will be returned as is, meaning that this could be any value which exists in matplotlib itself. Default is ``None``, meaning no norm will be applied. colorbar_shrink: float, optional The shrink parameter of the colorbar. This can be needed if the plot is included as a subplot to adjust the size of the colorbar. Default is ``1``, meaning original scale. cmap: str | matplotlib.colors.Colormap, optional The colormap to be used for the plot. Default is ``'inferno'``. plot_args : dict | None, optional The additional arguments passed to the scatter plot. Default is ``None``. fig_args : dict | None, optional The additional arguments passed to the figure. If a figure object is given in the ``fig`` parameter, this value will be discarded. Default is ``None``. save_to : str | PathLike | None, optional The name of the file to save the plot to. Default is ``None``, meaning the plot won't be saved. save_args : dict | None, optional The additional arguments passed to the ``fig.savefig`` call. Default is ``{"bbox_inches":"tight"}``. fig : matplotlib.figure.Figure | None, optional A custom figure object. If set to ``None``, the ``ax`` parameter also has to be ``None``! Default is ``None``. ax : matplotlib.axes.Axes | None, optional A custom axes object. If set to ``None``, the ``fig`` parameter also has to be ``None``! Default is ``None``. Returns ------- fig : matplotlib.figure.Figure The figure object. ax : matplotlib.axes.Axes The axes object. """ return plotting.plot_dirty_image(self[stokes_component], **kwargs)