Source code for gdt.missions.fermi.gbm.trigdat

# CONTAINS TECHNICAL DATA/COMPUTER SOFTWARE DELIVERED TO THE U.S. GOVERNMENT WITH UNLIMITED RIGHTS
#
# Contract No.: CA 80MSFC17M0022
# Contractor Name: Universities Space Research Association
# Contractor Address: 7178 Columbia Gateway Drive, Columbia, MD 21046
#
# Copyright 2017-2022 by Universities Space Research Association (USRA). All rights reserved.
#
# Developed by: William Cleveland and Adam Goldstein
#               Universities Space Research Association
#               Science and Technology Institute
#               https://sti.usra.edu
#
# Developed by: Daniel Kocevski
#               National Aeronautics and Space Administration (NASA)
#               Marshall Space Flight Center
#               Astrophysics Branch (ST-12)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing permissions and limitations under the
# License.
#
import os
import astropy.io.fits as fits
import astropy.units as u
import astropy.coordinates.representation as r
import numpy as np
import warnings

from ..time import *
from .detectors import GbmDetectors
from .headers import TrigdatHeaders, PhaiiTriggerHeaders
from .phaii import GbmPhaii
from gdt.core.coords.spacecraft import SpacecraftFrame
from gdt.core.coords.quaternion import Quaternion
from gdt.core.data_primitives import TimeEnergyBins, TimeRange, Gti
from gdt.core.file import FitsFileContextManager

__all__ = ['Trigdat', 'MaxRates', 'BackRates', 'FswLocation']

# Map the classification numbers to the string names
classifications = {0: 'ERROR', 1: 'UNRELOC', 2: 'LOCLPAR', 3: 'BELOWHZ',
                   4: 'GRB', 5: 'SGR', 6: 'TRANSNT', 7: 'DISTPAR',
                   8: 'SFL', 9: 'CYGX1', 10: 'SGR1806', 11: 'GROJ422',
                   19: 'TGF', 20: 'UNCERT', 21: 'GALBIN'}
# localization spectra
spectrum = ['hard', 'normal', 'soft']


class Trigdat(FitsFileContextManager):
    """Class for the GBM Trigger Data
    """
    def __init__(self):
        super().__init__()
        self._data = None
        self._trigrates = None
        self._maxrates = None
        self._backrates = None
        self._fsw_locations = None
        self._emin = np.array([3.4, 10.0, 22.0, 44.0, 95.0, 300., 500., 800.])
        self._emax = np.array([10., 22.0, 44.0, 95.0, 300., 500., 800., 2000.])
        self._time_range = None
        self._detectors = [det.name for det in GbmDetectors]
        self._trigtime = None

    @property
    def backrates(self):
        """(:class:`BackRates`): A BackRates object containing the info from 
                                 the on-board background estimate"""
        return self._backrates

    @property
    def fsw_locations(self):
        """(list of :class:`FswLocation`): A list of flight-software-determined 
                                           locations for the trigger"""
        return self._fsw_locations

    @property
    def gti(self):
        """(:class:`~gdt.core.data_primitives.Gti`): The Good Time Intervals"""
        return self._gti

    @property
    def maxrates(self):
        """(list of :class:`MaxRates`): A list of MaxRates objects, each 
                                        containing maxrates info"""
        return self._maxrates

    @property
    def num_maxrates(self):
        """(int): The number of MaxRates issued by the flight software"""
        return len(self._maxrates)

    @property
    def poshist(self):
        """(:class:`~.poshist.GbmPosHist`): The position/attitude history"""
        return self._poshist

    @property
    def time_range(self):
        """(:class:`~gdt.core.data_primitives.TimeRange`): The time range of 
                                                           the data"""
        return self._time_range

    @property
    def triggered_detectors(self):
        """(list of str): The detectors that were triggered"""
        try:
            detmask = self.headers['PRIMARY']['DET_MASK']
            detmask = np.array(list(detmask)).astype(int) == 1
        except:
            return None
        
        if detmask.size == 14:
            return (np.array(self._detectors)[detmask]).tolist()
        elif detmask.size == 12:
            return (np.array(self._detectors[:-2])[detmask]).tolist()
        else:
            return None

    @property
    def trigrates(self):
        """(:class:`MaxRates`): The trigger information and rates"""
        return self._trigrates

    @property
    def trigtime(self):
        """(float): The trigger time"""
        return self._trigtime

    @classmethod
    def open(cls, file_path, **kwargs):
        """Open and read a Trigdat file
        
        Args:
            file_path (str):  The file path of the trigdat file
        
        Returns:
            (:class:`Trigdat`)
        """
        obj = super().open(file_path, **kwargs)

        # get the headers
        hdrs = [hdu.header for hdu in obj.hdulist]
        obj._headers = TrigdatHeaders.from_headers(hdrs)
        obj._trigtime = obj._headers[0]['TRIGTIME']
        
        # store trigrate, maxrates, backrates, and fsw location
        trigrate_data =  obj.hdulist['TRIGRATE'].data
        if trigrate_data.size > 0:
            obj._trigrates = MaxRates.from_recarray(trigrate_data[0])
        obj._maxrates = [MaxRates.from_recarray(maxrate) for maxrate in
                         obj.hdulist['MAXRATES'].data]
        
        try:
            obj._backrates = BackRates.from_recarray(obj.hdulist['BCKRATES'].data[0])
        except:
            warnings.warn('No BCKRATES data in this file.', RuntimeWarning)      
        obj._fsw_locations = [FswLocation.from_recarray(ob_calc) \
                              for ob_calc in obj.hdulist['OB_CALC'].data]
        
        # store the data            
        obj._data = obj.hdulist['EVNTRATE'].data
        obj._data.sort(order='TIME')
        # incorrectly shaped in the file, so we have to fix that here
        obj._rates = obj._data['RATE'].reshape(-1, 14, 8)
                    
        # store the position history
        idx, dt = obj._time_indices(1024)
        eic = obj._data['EIC'][idx]
        quat = obj._data['SCATTITD'][idx]
        times = obj._data['TIME'][idx]
        obj._poshist = SpacecraftFrame(
            obsgeoloc=r.CartesianRepresentation(x=eic[:,0], y=eic[:,1], 
                                                z=eic[:,2], unit=u.km),
            quaternion=Quaternion(quat),
            obstime=Time(times, format='fermi'),
            detectors=GbmDetectors)
                

        obj._time_range = TimeRange(obj._data['ENDTIME'][0] - dt[0], 
                                    obj._data['ENDTIME'][-1])
        
        obj._gti = Gti.from_list(obj._gti_from_times(obj._data['TIME'],
                                                     obj._data['ENDTIME']))
        return obj

    @classmethod
    def from_data(cls, phaii_list, poshist, trigtime, time_range=None, 
                  trigrate=None, maxrates=None, backrates=None, 
                  fswlocations=None, filename=None, headers=None):
        """Create a Trigdat object from a list of GbmPhaii objects and a 
        SpacecraftFrame object.
        
        Note:
            The list of PHAIIs does not need to include every detector, as 
            missing detectors will be assumed to have zero rates.  However, 
            do not include more than one file from the same detector in the 
            list. The PHAII list can be in any order.
        
        Args:
            phaii_list (list of :class:`~.phaii.GbmPhaii`): A list of GbmPhaii 
                                                           objects
            poshist (:class:`~gdt.core.coords.SpacecraftFrame`): 
                The spacecraft frame
            trigtime (float): The trigger time
            time_range (tuple, optional): The time range of the Trigdat. If not
                                         specified, uses the time range of the
                                         GbmPhaiis
            trigrate (:class:`MaxRates`, optional): The trigger rates object
            maxrates (list of :class:`MaxRates`, optional): The maxrates objects
            backrates (:class:`BackRates`, optional): The background rates
                                                      object
            fswlocations (list of :class:`FswLocation`, optional): The flight
                         software location objects
        
        Returns:
            (:class:`Trigdat`)
        """
        # make sure phaii_list contains the correct objects and desired time
        # is contained within all files
        for phaii in phaii_list:
            if not isinstance(phaii, GbmPhaii):
                raise TypeError('phaii_list must be a list of GbmPhaii objects')
            phaii_time_range = TimeRange(*phaii.time_range)
        
        # make sure the poshist is a PosHist object and desired time is 
        # contained within the file
        if not isinstance(poshist, SpacecraftFrame):
            raise TypeError('poshist must be a SpacecraftFrame object')
        poshist_time_range = TimeRange(poshist.obstime[0].fermi, 
                                       poshist.obstime[-1].fermi)
        if not poshist_time_range.contains(trigtime):
            raise ValueError('{} does not contain the specified ' \
                             'trigtime'.format(poshist))
        
        # slice the files by time, if requested
        trange = (time_range[0]-trigtime, time_range[1]-trigtime)
        if time_range is not None:
            phaii_list = [phaii.slice_time(trange) for phaii in phaii_list]
        
        
        obj = cls()
        obj._poshist = poshist
        obj._trigtime = trigtime
        
        # create the data record array
        num_chans = phaii_list[0].num_chans
        num_dets = len(obj._detectors)
        num_bins = phaii_list[0].data.num_times
        dtypes = [('TIME', '>f8'), ('ENDTIME', '>f8'), 
                  ('SCATTITD', '>f4', (4,)), ('EIC', '>f4', (3,)), 
                  ('RATE', '>f4', (num_chans, num_dets))]
        obj._data = np.recarray((num_bins,), dtype=dtypes)
        tstart = phaii_list[0].data.tstart
        tstop = phaii_list[0].data.tstop
        tcent = phaii_list[0].data.time_centroids
        
        # interpolate poshist to match the bin edges as is standard
        fermi_times = Time(tstart+trigtime, format='fermi')
        obj._poshist = poshist.at(fermi_times)
        
        # to preserve the detector ordering assumed by the trigdat 
        # datatype, we must pull the phaii file from the list that
        # corresponds with the right detector in our sequence.  if the 
        # detector isn't represented in the phaii_list, then the rates will
        # be zeros.
        det_mask = np.zeros(14, dtype=int)
        for i in range(num_bins):
            rates = np.zeros((14,8))
            for j in range(len(obj._detectors)):
                try:
                    det_idx = [phaii.detector for \
                               phaii in phaii_list].index(obj._detectors[j])
                    det_mask[j] = 1
                except:
                    continue
                rates[j,:] = phaii_list[det_idx].data.counts[i,:] * \
                      (1.024/phaii_list[det_idx].data.time_widths[i,np.newaxis])
            
            
            quat = np.append(obj._poshist[i].quaternion.xyz, 
                             obj._poshist[i].quaternion.w)
            eic = obj._poshist.obsgeoloc[0].xyz.to('km').value
            obj._data[i] = (tstart[i]+trigtime, tstop[i]+trigtime, quat,
                           eic, rates.T)

        obj._time_range = TimeRange(obj._data['TIME'][0], 
                                    obj._data['ENDTIME'][-1])
        obj._gti = Gti.from_list(obj._gti_from_times(obj._data['TIME'],
                                                     obj._data['ENDTIME']))
    
        # assign/create trigrates, maxrates, backrates, and fsw_locations              
        if trigrate is not None:
            if not isinstance(trigrate, MaxRates):
                raise TypeError('trigrate must be a MaxRates object')
            obj._trigrates = trigrate
        else:
            obj._trigrates = MaxRates.create()
        
        if maxrates is not None:
            try:
                iter(maxrates)
            except:
                raise TypeError('maxrates must be a list of MaxRate objects')
            if any([not isinstance(m, MaxRates) for m in maxrates]):
                raise TypeError('maxrates must be a list of MaxRate objects')
            obj._maxrates = maxrates
        else:
            obj._maxrates = [MaxRates.create()]
        
        if backrates is not None:
            if not isinstance(backrates, BackRates):
                raise TypeError('backrates must be a BackRats object')
            obj._backrates = backrates
        else:
            obj._backrates = BackRates.create()
        
        if fswlocations is not None:
            try:
                iter(fswlocations)
            except:
                raise TypeError('fsw_locations must be a list of FswLocation ' \
                                'objects')
            if any([not isinstance(f, FswLocation) for f in fswlocations]):
                raise TypeError('fsw_locations must be a list of FswLocation ' \
                                'objects')
            obj._fsw_locations = fswlocations
        else:
            obj._fsw_locations = [FswLocation.create()]
                 
        if filename is not None:
            obj._filename = str(filename)
        
        if headers is not None:
            if not isinstance(headers, TrigdatHeaders):
                raise ValueError('headers must be a TrigdatHeaders object')
            obj._headers = headers
        else:
            headers = TrigdatHeaders()
            pos = obj._poshist.scpos.interpolate(obj.trigtime)
            headers[0]['FILENAME'] = obj._filename
            for hdu in headers:
                try:
                    hdu['TSTART'] = obj._data['TIME'][0]
                    hdu['TSTOP'] = obj._data['ENDTIME'][-1]
                except:
                    pass
                try:
                    hdu['TRIGTIME'] = obj._trigtime
                except:
                    pass
                try:
                    hdu['GEO_LONG'] = pos.longitude.value
                    hdu['GEO_LAT'] = pos.latitude.value
                except:
                    pass
                try:
                    hdu['DETTYPE'] = 'BOTH'
                except:
                    pass
            obj._headers = headers
            
        obj._headers[0]['DET_MASK'] = ''.join(det_mask.astype(str))
        
        return obj
    
    def to_phaii(self, detector, timescale=1024):
        """Convert the data for a detector to a :class:`~.phaii.GbmPhaii`
        object.
        
        Note:
            A standard Trigdat will contain data on 8192, 1024, 256, and 62 ms
            timescales.  A non-standard Trigdat can be created from other 
            data types (see :meth:`Trigdat.from_data`) and therefore the 
            timescales will likely be different and arbitrary. For that reason,
            the ``timescale`` keyword will be ignored for non-standard Trigdat.
        
        Args:
            detector (str): The detector to convert
            timescale (int, optional): 
                The minimum timescale in ms of the data to return. Available 
                options are 1024, 256, and 64. This is ignored if the trigdat
                has non-standard timescales.
        
        Returns:
            (:class:`~.phaii.GbmPhaii`)
        """
        # check for valid detectors and timescale
        detector = detector.lower()
        if detector not in self._detectors:
            raise ValueError('Illegal detector name')

        # grab the correct detector rates (stored as 14 dets x 8 channels)
        det_idx = self._detectors.index(detector)

        if (timescale != 1024) and (timescale != 256) and (timescale != 64):
            raise ValueError('Illegal Trigdat resolution. Available resolutions: \
                              1024, 256, 64')

        # return the requested timescales
        try:
            time_idx, dt = self._time_indices(timescale)
        except:
            warnings.warn('\nTrigdat.to_ctime(): ' \
                          'Exporting from non-standard file. ' \
                          'Timescale is ignored.')
            time_idx = np.arange(self._rates.shape[0])
            dt = self._data['ENDTIME']-self._data['TIME']
            
        # calculate counts and exposure
        counts = np.round(self._rates[time_idx, det_idx, :] * \
                          (dt[:, np.newaxis] / 1.024))
        exposure = self._calc_exposure(counts, dt)

        # the 'TIME' and 'ENDTIME' edges are not aligned before being written to
        # file, so we must fix this to prevent TimeEnergyBins from thinking 
        # every single bin is a single discontiguous segment
        tstop = self._data['ENDTIME'][time_idx] - self.trigtime
        tstart = self._fix_tstart(tstop, dt)

        # create the Time-Energy histogram
        bins = TimeEnergyBins(counts, tstart, tstop, exposure,
                              self._emin, self._emax)
        
        headers = self._set_phaii_headers(detector, bins.num_chans)
        
        obj = GbmPhaii.from_data(bins, gti=self.gti, trigger_time=self.trigtime,
                                 headers=headers)
        return obj

    def sum_detectors(self, detectors, timescale=1024):
        """Sum the data from a list of detectors and convert to a GbmPhaii 
        object. The exposures from different detectors are averaged.
        
        Args:
            detectors (list of str): The detectors to sum
            timescale (int, optional): 
                The minimum timescale in ms of the data to return.  Available 
                options are 1024, 256, and 64.

        Returns:
            (:class:`~.phaii.GbmPhaii`)
        """
        # check for valid detectors and timescale
        for det in detectors:
            det = det.lower()
            if det not in self._detectors:
                raise ValueError('Illegal detector name')

        if (timescale != 1024) and (timescale != 256) and (timescale != 64):
            raise ValueError('Illegal Trigdat resolution. Available resolutions: \
                              1024, 256, 64')
        
        try:
            time_idx, dt = self._time_indices(timescale)
        except:
            warnings.warn('\nTrigdat.to_ctime(): ' \
                          'Exporting from non-standard file. ' \
                          'Timescale is ignored.')
            time_idx = np.arange(self._rates.shape[0])
            dt = self._data['ENDTIME']-self._data['TIME']

        counts = None
        exposure = None
        for det in detectors:
            # grab the correct detector rates (stored as 14 dets x 8 channels)
            det_idx = self._detectors.index(det)

            # calculate counts
            c = np.round(self._rates[time_idx, det_idx, :] * \
                         (dt[:, np.newaxis] / 1.024))
            if counts is None:
                counts = c        
            else:
                counts += c
            
            # calculate exposure
            e = self._calc_exposure(c, dt)
            if exposure is None:
                exposure = e
            else:
                exposure += e
        
        exposure /= float(len(detectors))
            

        # the 'TIME' and 'ENDTIME' edges are not aligned before being written to
        # file, so we must fix this to prevent TimeEnergyBins from thinking 
        # every single bin is a single discontiguous segment
        tstop = self._data['ENDTIME'][time_idx] - self.trigtime
        tstart = self._fix_tstart(tstop, dt)

        # create the Time-Energy histogram
        bins = TimeEnergyBins(counts, tstart, tstop, exposure,
                              self._emin, self._emax)

        det_str = '+'.join(detectors)
        headers = self._set_phaii_headers(det_str, bins.num_chans)
        
        obj = GbmPhaii.from_data(bins, gti=self.gti, trigger_time=self.trigtime,
                                 headers=headers)
        return obj

    def _build_hdulist(self):
        
        # create FITS and primary header
        hdulist = fits.HDUList()
        primary_hdu = fits.PrimaryHDU(header=self.headers['PRIMARY'])
        for key, val in self.headers['PRIMARY'].items():
            primary_hdu.header[key] = val
        hdulist.append(primary_hdu)
        
        # TRIGRATE extension
        trigrate_hdu = fits.BinTableHDU(data=self.trigrates.to_recarray(),
                                        name='TRIGRATE', 
                                        header=self.headers['TRIGRATE'])
        for key, val in self.headers['TRIGRATE'].items():
            trigrate_hdu.header[key] = val
        trigrate_hdu.header.comments['TTYPE1'] = 'Beginning of accumulation, ' \
                                                 'calculated value'
        trigrate_hdu.header.comments['TTYPE2'] = 'End of accumulation, same as'\
                                                 ' PCKTTIME'
        trigrate_hdu.header.comments['TTYPE3'] = 'Spacecraft attitude ' \
                                                 'quaternions'
        trigrate_hdu.header.comments['TTYPE4'] = 'Spacecraft position: Earth ' \
                                                 'X, Y, & Z'
        trigrate_hdu.header.comments['TTYPE5'] = 'Rates-14 detectors, 8 channels'
        hdulist.append(trigrate_hdu)
        

        # BCKRATES extension
        backrates_hdu = fits.BinTableHDU(data=self.backrates.to_recarray(),
                                         name='BCKRATES', 
                                         header=self.headers['BCKRATES'])
        for key, val in self.headers['BCKRATES'].items():
            backrates_hdu.header[key] = val
        backrates_hdu.header.comments['TTYPE1'] = 'Start time of the ' \
                                                  'background accumulation'
        backrates_hdu.header.comments['TTYPE2'] = 'Creation time of the ' \
                                                  'background model'
        backrates_hdu.header.comments['TTYPE3'] = 'Quality Flag for the ' \
                                                  'background model'
        backrates_hdu.header.comments['TTYPE4'] = 'Rates-14 detectors, 8 channels'                            
        hdulist.append(backrates_hdu)

        # OB_CALC extension
        obcalc_hdu = fits.BinTableHDU(data=np.array([loc.to_recarray() for \
                                                    loc in self.fsw_locations]),
                                      name='OB_CALC', 
                                      header=self.headers['OB_CALC'])
        for key, val in self.headers['OB_CALC'].items():
            obcalc_hdu.header[key] = val
        obcalc_hdu.header.comments['TTYPE1'] = 'End time of location rates ' \
                                               'accumulation'
        obcalc_hdu.header.comments['TTYPE2'] = 'Calculated Right Ascension of '\
                                               'source'
        obcalc_hdu.header.comments['TTYPE3'] = 'Calculated Declination of source'
        obcalc_hdu.header.comments['TTYPE4'] = 'Error radius of calculated' \
                                               'location'
        obcalc_hdu.header.comments['TTYPE5'] = 'Location algorithm used in ' \
                                               'calculation'
        obcalc_hdu.header.comments['TTYPE6'] = 'Two highest probable event ' \
                                               'classes'
        obcalc_hdu.header.comments['TTYPE7'] = 'Reliabilities of probable ' \
                                               'event classes'
        obcalc_hdu.header.comments['TTYPE8'] = 'Standardized count intensity'
        obcalc_hdu.header.comments['TTYPE9'] = 'Standard hardness ratio ' \
                                               '(channels 3 / 4)'
        obcalc_hdu.header.comments['TTYPE10'] = 'Fluence counts in 3 + 4 ' \
                                                'energy band'
        obcalc_hdu.header.comments['TTYPE11'] = 'Significance of localization '\
                                                'in sigma'
        obcalc_hdu.header.comments['TTYPE12'] = 'Localization rates-12 NaI ' \
                                                'detectors'
        obcalc_hdu.header.comments['TTYPE13'] = 'Location data timescale'
        obcalc_hdu.header.comments['TTYPE14'] = 'Source azimuth with respect ' \
                                                'to Spacecraft'
        obcalc_hdu.header.comments['TTYPE15'] = 'Source zenith with respect to'\
                                                ' Spacecraft'
        hdulist.append(obcalc_hdu)

        # MAXRATES extension
        maxrates_hdu = fits.BinTableHDU(data=np.array([mr.to_recarray() for \
                                                       mr in self.maxrates]),
                                      name='MAXRATES', 
                                      header=self.headers['MAXRATES'])
        for key, val in self.headers['MAXRATES'].items():
            maxrates_hdu.header[key] = val
        maxrates_hdu.header.comments['TTYPE1'] = 'Beginning of accumulation, ' \
                                                 'calculated value'
        maxrates_hdu.header.comments['TTYPE2'] = 'End of accumulation, same as'\
                                                 ' PCKTTIME'
        maxrates_hdu.header.comments['TTYPE3'] = 'Spacecraft attitude ' \
                                                 'quaternions'
        maxrates_hdu.header.comments['TTYPE4'] = 'Spacecraft position: Earth ' \
                                                 'X, Y, & Z'
        maxrates_hdu.header.comments['TTYPE5'] = 'Rates-14 detectors, 8 channels'
        hdulist.append(maxrates_hdu)

        # EVNTRATE extension
        evntrate_hdu = fits.BinTableHDU(data=self._data, name='EVNTRATE', 
                                        header=self.headers['EVNTRATE'])
        for key, val in self.headers['EVNTRATE'].items():
            evntrate_hdu.header[key] = val
        evntrate_hdu.header.comments['TTYPE1'] = 'Beginning of accumulation, ' \
                                                 'calculated value'
        evntrate_hdu.header.comments['TTYPE2'] = 'End of accumulation, same as'\
                                                 ' PCKTTIME'
        evntrate_hdu.header.comments['TTYPE3'] = 'Spacecraft attitude ' \
                                                 'quaternions'
        evntrate_hdu.header.comments['TTYPE4'] = 'Spacecraft position: Earth ' \
                                                 'X, Y, & Z'
        evntrate_hdu.header.comments['TTYPE5'] = 'Rates-14 detectors, 8 channels'
        hdulist.append(evntrate_hdu)
        
        return hdulist

    def _fix_tstart(self, tstop, dt):
        # this ensures that edge differences < 1 ms get fixed
        tstart = tstop - dt
        mask = (np.abs(tstart[1:] - tstop[:-1]) < 0.001)
        tstart[1:][mask] = tstop[:-1][mask]
        return tstart

    def _calc_exposure(self, counts, dt):
        """Calculate the exposure
        
        Args:
            counts (np.array): The observed counts in each bin
            dt (np.array): The time bin widths 
        
        Returns:
            (np.array)
        """
        deadtime = np.copy(counts)
        deadtime[:, :-1] *= 2.6e-6  # 2.6 microsec for each count
        deadtime[:, -1] *= 1e-5  # 10 microsec for each count in overflow
        total_deadtime = np.sum(deadtime, axis=1)
        exposure = (1.0 - total_deadtime) * dt
        return exposure

    def _time_indices(self, time_res):
        """Indices into the Trigdat arrays corresponding to the desired time 
        resolution(s) 
        
        Args:
            time_res (int): The time resolution in ms of the data
        
        Returns:
            (np.array, np.array): Indices into the trigdat arrays and the \
                                  bin widths in seconds
        """
        # bin widths
        dt = np.round((self._data['ENDTIME'] - self._data['TIME']) * 1000)
        # background bins
        back_idx = np.where(dt == 8192)[0]
        # 1 s scale bins - this is the minimum amount returned
        idx = np.where(dt == 1024)[0]
        cnt = len(idx)
        # reconcile 8 s and 1 s data
        idx = self._reconcile_timescales(back_idx, idx)

        # reconcile 8 s + 1 s and 256 ms data
        if time_res <= 256:
            tidx = np.where(dt == 256)[0]
            idx = self._reconcile_timescales(idx, tidx)

        # reconcile 8 s + 1 s + 256 ms and 64 ms data
        if time_res == 64:
            tidx = np.where(dt == 64)[0]
            idx = self._reconcile_timescales(idx, tidx)

        # return reconciled indices
        return idx, np.reshape(dt[idx] / 1000.0, len(idx))

    def _reconcile_timescales(self, idx1, idx2):
        """Reconcile indices representing different timescales and glue them 
           together to form a complete (mostly) continuous set of indices
        
        Args: 
            idx1 (np.array): Indices of the "bracketing" timescale
            idx2 (np.array): Indices of the "inserted" timescale 
        
        Returns:
            np.array: Indices of idx2 spliced into idx1
		"""
        # if there is no idx1, then we only have to return idx2 and v.v.
        if idx1.size == 0:
            return idx2
        if idx2.size == 0:
            return idx1

        # bin edges for both selections
        start_times1 = self._data['TIME'][idx1]
        end_times1 = self._data['ENDTIME'][idx1]
        start_times2 = self._data['TIME'][idx2]
        end_times2 = self._data['ENDTIME'][idx2]

        # find where bracketing timescale ends and inserted timescale begins
        mask = end_times1 >= start_times2[0]
        if mask.sum():
            start_idx = (np.where(mask))[0][0]
            idx = np.concatenate((idx1[0:start_idx], idx2))
        else:
            idx = np.concatenate((idx1, idx2))

        # find where inserted timescale ends and bracketing timescale begins again
        mask = start_times1 >= end_times2[-1]
        if mask.sum():
            end_idx = (np.where(mask))[0][0]
            idx = np.concatenate((idx, idx1[end_idx:]))
                
        return idx

    def _gti_from_times(self, tstarts, tstops):
        """Estimate the GTI from the bin start and stop times.
        This may return multiple GTIs if several background packets are missing
		
		Args:
            tstarts (np.array): The start times of the bins
            tstops (np.array): The end times of the bins
        
        Returns:
            [(float, float), ...]: A list of time ranges
        """
        tstart = tstarts[0]
        tstop = tstops[-1]
        dt = tstarts[1:] - tstops[:-1]
        idx = np.where(np.abs(dt) > 10.0)[0]
        if idx.sum() > 0:
            gti = [(tstart, tstops[idx[0] - 1]), (tstarts[idx[0]], tstop)]
        else:
            gti = [(tstart, tstop)]
        return np.array(gti)

    def _set_phaii_headers(self, det, num_chans):
        headers = PhaiiTriggerHeaders()
        try:
            detname = GbmDetectors.from_str(det).full_name
        except:
            detname = det
        
        headers[0]['DETNAM'] = detname
        headers[0]['DATE-OBS'] = self.headers[0]['DATE-OBS']
        headers[0]['DATE-END'] = self.headers[0]['DATE-END']
        headers[0]['TSTART'] = self.headers[0]['TSTART']
        headers[0]['TSTOP'] = self.headers[0]['TSTOP']
        headers[0]['TRIGTIME'] = self.trigtime
        headers[0]['OBJECT'] = self.headers[0]['OBJECT']
        headers[0]['RA_OBJ'] = self.headers[0]['RA_OBJ']
        headers[0]['DEC_OBJ'] = self.headers[0]['DEC_OBJ']
        headers[0]['ERR_RAD'] = self.headers[0]['ERR_RAD']

        headers[1]['DETNAM'] = detname
        headers[1]['DATE-OBS'] = self.headers[0]['DATE-OBS']
        headers[1]['DATE-END'] = self.headers[0]['DATE-END']
        headers[1]['TSTART'] = self.headers[0]['TSTART']
        headers[1]['TSTOP'] = self.headers[0]['TSTOP']
        headers[1]['TRIGTIME'] = self.trigtime
        headers[1]['OBJECT'] = self.headers[0]['OBJECT']
        headers[1]['RA_OBJ'] = self.headers[0]['RA_OBJ']
        headers[1]['DEC_OBJ'] = self.headers[0]['DEC_OBJ']
        headers[1]['ERR_RAD'] = self.headers[0]['ERR_RAD']
        headers[1]['DETCHANS'] = num_chans

        headers[2]['DETNAM'] = detname
        headers[2]['DATE-OBS'] = self.headers[0]['DATE-OBS']
        headers[2]['DATE-END'] = self.headers[0]['DATE-END']
        headers[2]['TSTART'] = self.headers[0]['TSTART']
        headers[2]['TSTOP'] = self.headers[0]['TSTOP']
        headers[2]['TRIGTIME'] = self.trigtime
        headers[2]['OBJECT'] = self.headers[0]['OBJECT']
        headers[2]['RA_OBJ'] = self.headers[0]['RA_OBJ']
        headers[2]['DEC_OBJ'] = self.headers[0]['DEC_OBJ']
        headers[2]['ERR_RAD'] = self.headers[0]['ERR_RAD']
        headers[2]['DETCHANS'] = num_chans

        headers[3]['DETNAM'] = detname
        headers[3]['DATE-OBS'] = self.headers[0]['DATE-OBS']
        headers[3]['DATE-END'] = self.headers[0]['DATE-END']
        headers[3]['TSTART'] = self.headers[0]['TSTART']
        headers[3]['TSTOP'] = self.headers[0]['TSTOP']
        headers[3]['TRIGTIME'] = self.trigtime
        headers[3]['OBJECT'] = self.headers[0]['OBJECT']
        headers[3]['RA_OBJ'] = self.headers[0]['RA_OBJ']
        headers[3]['DEC_OBJ'] = self.headers[0]['DEC_OBJ']
        headers[3]['ERR_RAD'] = self.headers[0]['ERR_RAD']
        return headers        
    
    def __repr__(self):
        s = '<Trigdat: {};\n'.format(self.filename)
        s += ' trigtime={:.2f};'.format(self.trigtime)
        s += ' triggered detectors={}>'.format(self.triggered_detectors)
        return s


class MaxRates:
    """Class for the MAXRATES and TRIGRATE data in Trigdat.
    """
    _dtypes = [('TIME', '>f8'), ('ENDTIME', '>f8'), ('SCATTITD', '>f4', (4,)), 
               ('EIC', '>f4', (3,)), ('TRIGRATE', '>f4', (8,14))]
    
    def __init__(self):
        self._detectors = [det.name for det in GbmDetectors]
        self._time_range = (None, None)
        self._quats = None
        self._eic = None
        self._rates = None

    @property
    def all_rates(self):
        """(np.array): An array (:attr:`num_chans`, :attr:`num_dets`) of 
                       the maxrates"""
        return self._rates

    @property
    def eic(self):
        """(np.array): The position of Fermi in Earth inertial coordinates"""
        return self._eic

    @property
    def num_chans(self):
        """(int): The number of energy channels"""
        if self._rates is not None:
            return self._rates.shape[0]
        else:
            return 0

    @property
    def num_dets(self):
        """(int): The number of detectors"""
        if self._rates is not None:
            return self._rates.shape[1]
        else:
            return 0

    @property
    def quaternion(self):
        """ (np.array): The quaternions at the maxrates time"""
        return self._quats

    @property
    def time_range(self):
        """(float, float): The time range of the maxrates"""
        return self._time_range

    @property
    def timescale(self):
        """(float): The timescale of the maxrates"""
        if self.time_range[0] is not None and self.time_range[1] is not None:
            return round((self.time_range[1] - self.time_range[0]) * 1000.0)
        else:
            return None

[docs] @classmethod def create(cls, tstart=None, tstop=None, quaternion=None, eic=None, rates=None): """Create a MaxRates object from keywords. Args: tstart (float, optional): The start time of the MaxRates interval tstop (float, optional): The stop time of the MaxRates interval quaternion (np.array, optional): The attitude quaternion eic (np.array, optional): The position in earth inertial coordinates rates (np.array): The (num channels, num detectors) rates array Returns: (:class:`MaxRates`) """ obj = cls() obj._time_range = (tstart, tstop) if isinstance(quaternion, (list, tuple)): quaternion = np.array(quaternion) if quaternion is not None: if quaternion.size != 4: raise ValueError('quaternion must have 4 elements') obj._quats = quaternion if isinstance(eic, (list, tuple)): eic = np.array(eic) if eic is not None: if eic.size != 3: raise ValueError('eic must have 3 elements') obj._eic = eic if isinstance(rates, (list, tuple)): rates = np.array(rates) obj._rates = rates return obj
[docs] @classmethod def from_recarray(cls, rec_array): """Create from a FITS or numpy record array. Args: rec_array (np.recarray): The FITS TRIGRATE or MAXRATES record array from the trigdat file Returns: (:class:`MaxRates`) """ obj = cls() obj._time_range = (rec_array['TIME'], rec_array['ENDTIME']) obj._quats = rec_array['SCATTITD'] obj._eic = rec_array['EIC'] try: obj._rates = rec_array['TRIGRATE'] except: obj._rates = rec_array['MAXRATES'] return obj
[docs] def get_detector(self, det): """Retrieve the rates for a detector Args: det (str): The detector Returns: (np.array) """ if self._rates is None: return None mask = (np.array(self._detectors) == det) return self._rates[:, mask].squeeze()
[docs] def to_recarray(self): """Convert contents of the object to a numpy record array. Returns: (np.recarray) """ dtypes = self._dtypes dtypes[-1] = (*dtypes[-1][:-1], (self.num_chans, self.num_dets)) record = np.array([(*self._time_range, self._quats, self._eic, self._rates)], dtype=dtypes) return record
def __repr__(self): return '<MaxRates: {0} ms>'.format(self.timescale) class BackRates: """Class for the background rates data in Trigdat. """ _dtypes = [('TIME', '>f8'), ('ENDTIME', '>f8'), ('QUALITY', 'u1', (2,)), ('BCKRATES', '>f4', (8,14))] def __init__(self): self._detectors = [det.name for det in GbmDetectors] self._time_range = (None, None) self._quality = np.array((0, 0)) self._rates = None @property def all_rates(self): """(np.array): An array (:attr:`num_chans`, :attr:`num_dets`) of the background rates""" return self._rates @property def num_chans(self): """(int): The number of energy channels""" if self._rates is not None: return self._rates.shape[0] else: return 0 @property def num_dets(self): """(int): The number of detectors""" if self._rates is not None: return self._rates.shape[1] else: return 0 @property def quality(self): """(int, int): The quality flags for the background""" return self._quality @property def time_range(self): """(float, float): The time range of the background rates""" return self._time_range
[docs] @classmethod def create(cls, tstart=None, tstop=None, quality=None, rates=None): """Create a BackRates object from keywords. Args: tstart (float, optional): The start time of the MaxRates interval tstop (float, optional): The stop time of the MaxRates interval quality (int, int): Quality flags rates (np.array): The (num channels, num detectors) rates array Returns: (:class:`BackRates`) """ obj = cls() obj._time_range = (tstart, tstop) if quality is not None: if not isinstance(quality, (list, tuple)): raise TypeError('quality must be a 2-tuple integer') if len(quality) != 2: raise ValueError('quality must be a 2-tuple integer') obj._quality = np.asarray(quality) if isinstance(rates, (list, tuple)): rates = np.array(rates) obj._rates = rates return obj
[docs] @classmethod def from_recarray(cls, rec_array): """Create from a FITS or numpy record array. Args: rec_array (np.recarray): The FITS BCKRATES record array from the trigdat file Returns: (:class:`BackRates`) """ obj = cls() obj._time_range = (rec_array['TIME'], rec_array['ENDTIME']) obj._quality = rec_array['QUALITY'] obj._rates = rec_array['BCKRATES'] return obj
[docs] def get_detector(self, det): """Retrieve the background rates for a detector Args: det (str): The detector Returns: np.array: An array of size (:attr:`num_chans`,) of background rates for the detector """ if self._rates is None: return None mask = (np.array(self._detectors) == det) return self._rates[:, mask].squeeze()
[docs] def to_recarray(self): """Convert contents of the object to a numpy record array Returns: (np.recarray) """ dtypes = self._dtypes dtypes[-1] = (*dtypes[-1][:-1], (self.num_chans, self.num_dets)) record = np.array([(*self._time_range, self._quality, self._rates)], dtype=dtypes) return record
def __repr__(self): return '<BackRates: time range={0}>'.format(self.time_range) class FswLocation: """Class for the Flight Software localization information. """ _dtypes = [('TIME', '>f8'), ('RA', '>f4'), ('DEC', '>f4'), ('STATERR', '>f4'), ('LOCALG', '>i2'), ('EVTCLASS', '>i2', (2,)), ('RELIABLT', '>f4', (2,)), ('INTNSITY', '>f4'), ('HDRATIO', '>f4'), ('FLUENCE', '>f4'), ('SIGMA', '>f4'), ('LOCRATES', '>i2', (12,)), ('TRIG_TS', '>f4'), ('TR_SCAZ', '>f4'), ('TR_SCZEN', '>f4')] def __init__(self): self._time = None self._location = (None, None, None) self._algorithm = None self._class1 = (20, 1.0) self._class2 = (20, 0.0) self._intensity = None self._hardness = None self._fluence = None self._sigma = None self._rates = None self._timescale = None self._azzen = (None, None) @property def fluence(self): """(float): The fluence of the localization interval""" return self._fluence @property def hardness_ratio(self): """(float): The hardness ratio for the localization interval""" return self._hardness @property def intensity(self): """(float): The brightness of the signal""" return self._intensity @property def location(self): """(float, float, float): The RA, Dec, and statistical error of the onboard localization""" return self._location @property def location_sc(self): """(float, float): The localization in spacecraft coordinates: Azimuth, Zenith""" return self._azzen @property def next_classification(self): """(str, float): The next most likely classification of the trigger and the probability""" return self._class2 @property def rates(self): """(np.array): The rates in each NaI detector used for localization""" return self._rates @property def significance(self): """(float): The S/N ratio of the localization interval""" return self._sigma @property def spectrum(self): """(str): The spectrum used in the localization""" return self._algorithm @property def time(self): """(float): Time at which the localization was calculated""" return self._time @property def timescale(self): """(float): The localization interval timescale""" return self._timescale @property def top_classification(self): """(str, float): The most likely classification of the trigger and the probability""" return self._class1 @classmethod def create(cls, time=None, radec=(None, None), err=None, algorithm='normal', class1='UNCERT', class1_prob=1.0, class2='UNCERT', class2_prob=0.0, intensity=None, hardness=None, fluence=None, sigma=None, rates=None, timescale=None, azzen=(None, None)): """Create a FswLocation object from keywords Args: time (float, optional): The time of the localization radec ((float, float), optional): RA and Dec err (float, optional): The localization error algorithm (str, optional): The algorithm spectrum; one of 'hard', 'normal', or 'soft' class1 (str, optional): The primary classification class1_prob (float, optional): The primary classification probability class2 (str, optional): The secondary classification class2_prob (float, optional): The secondary classification probability intensity: (float, optional): Intensity of the signal in the localization interval hardness: (float, optional): The hardness ratio in the localization interval fluence: (float, optional): The fluence in the localization interval sigma: (float, optional): The S/N ratio of the localization interval rates: (np.array, optional): A 12-element array containing the rate in each NaI used for localization timescale (float, optional): The localization timescale azzen ((float, float), optional): Spacecraft azimuth and zenith of the localization Returns: (:class:`FswLocation`) """ obj = cls() obj._time = time if not isinstance(radec, (list, tuple)): raise TypeError('radec must be a 2-tuple float') if len(radec) != 2: raise ValueError('radec must be a 2-tuple float') obj._location = (*radec, err) if algorithm not in spectrum: raise ValueError('algorithm is not valid') obj._algorithm = algorithm if class1 not in classifications.values(): raise ValueError('class1 is not a valid class.') if class1_prob < 0.0 or class1_prob > 1.0: raise ValueError('class1_prob must be in range [0,1]') obj._class1 = (class1, class1_prob) if class2 not in classifications.values(): raise ValueError('class2 is not a valid class.') if class2_prob < 0.0 or class2_prob > 1.0 or \ class1_prob+class2_prob > 1.0: raise ValueError('class2_prob must be in range [0,1] and '\ 'class1_prob + class2_prob must be <= 1') obj._class2 = (class2, class2_prob) obj._intensity = intensity obj._hardness = hardness obj._fluence = fluence obj._sigma = sigma obj._timescale = timescale if rates is None: rates = np.zeros(12) else: rates = np.asarray(rates) if rates.size != 12: raise ValueError('rates must be a 12-element array') obj._rates = rates if not isinstance(azzen, (list, tuple)): raise TypeError('azzen must be a 2-tuple float') if len(azzen) != 2: raise ValueError('azzen must be a 2-tuple float') obj._azzen = azzen return obj
[docs] @classmethod def from_recarray(cls, rec_array): """Create from a FITS or numpy record array. Args: rec_array (np.recarray): The FITS OB_CALC record array from the trigdat file Returns: (:class:`FswLocation`) """ obj = cls() obj._time = rec_array['TIME'] obj._location = (rec_array['RA'], rec_array['DEC'], rec_array['STATERR']) obj._algorithm = spectrum[rec_array['LOCALG'] - 1] obj._class1 = (classifications[rec_array['EVTCLASS'][0]], rec_array['RELIABLT'][0]) obj._class2 = (classifications[rec_array['EVTCLASS'][1]], rec_array['RELIABLT'][1]) obj._intensity = rec_array['INTNSITY'] obj._hardness = rec_array['HDRATIO'] obj._fluence = rec_array['FLUENCE'] obj._sigma = rec_array['SIGMA'] obj._rates = rec_array['LOCRATES'] try: obj._timescale = rec_array['TRIG_TS'] except KeyError: pass try: obj._azzen = (rec_array['TR_SCAZ'], rec_array['TR_SCZEN']) except KeyError: pass return obj
def to_recarray(self): """Convert contents of the object to a numpy record array Returns: (np.recarray) """ dtypes = self._dtypes algorithm = spectrum.index(self.spectrum) class_nums = {v: k for k, v in classifications.items()} record = np.array([(self.time, *self.location, algorithm, (class_nums[self._class1[0]], class_nums[self._class2[0]]), (self._class1[1], self._class2[1]), self.intensity, self.hardness_ratio, self.fluence, self.significance, self.rates, self.timescale, *self.location_sc)], dtype=dtypes) return record def __repr__(self): s = '<FswLocation: {0}: {1:.1f}%;\n'.format(self.top_classification[0], self.top_classification[1]*100.) try: s += ' RA={0:.1f}, Dec={1:.1f}, err={2:.1f}>'.format(*self.location) except: s += ' RA=None, Dec=None, err=None>' return s