#
# Copyright 2026 by University College Dublin. All rights reserved.
#
# Developed by: Derek O'Callaghan
# University College Dublin
# https://www.ucd.ie/
#
# Builds on:
# Gamma-ray Data Tools - Core Components (https://github.com/USRA-STI/gdt-core)
# Gamma-ray Data Tools - Fermi mission components (https://github.com/USRA-STI/gdt-fermi/)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
# in compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing permissions and limitations under the
# License.
#
import astropy.units as u
import astropy.coordinates.representation as r
import numpy as np
import warnings
from .time import *
from .detectors import GiftsDetectors
from .headers import TrigdatHeaders, PhaiiTriggerHeaders
from .phaii import GiftsPhaii
from gdt.core.coords.spacecraft import SpacecraftFrame
from gdt.core.coords.quaternion import Quaternion
from gdt.core.data_primitives import TimeRange, Gti
from gdt.missions.fermi.gbm.detectors import GbmDetectors
from gdt.missions.fermi.gbm.trigdat import Trigdat as GbmTrigdat, MaxRates as GbmMaxRates, BackRates as GbmBackRates, FswLocation as GbmFswLocation
__all__ = ['Trigdat', 'MaxRates', 'BackRates', 'FswLocation']
[docs]
class Trigdat(GbmTrigdat):
"""Class for the GIFTS Trigger Data
"""
def __init__(self):
super().__init__()
# Will use GBM ebounds for now, but will define GIFTS emin/emax here later
# Lower case detector names are used to facilitate parent functions
self._detectors = [det.name.lower() for det in GiftsDetectors]
@property
def triggered_detectors(self):
"""(list of str): The detectors that were triggered"""
try:
detmask = self.headers["PRIMARY"]["DET_MASK"]
detmask = np.array(list(detmask)).astype(int) == 1
except:
return None
# Restore upper case names here
return (np.array([d.upper() for d in self._detectors])[detmask]).tolist()
[docs]
@classmethod
def open(cls, file_path, **kwargs):
"""Open and read a Trigdat file
Args:
file_path (str): The file path of the trigdat file
Returns:
(:class:`Trigdat`)
"""
obj = super(GbmTrigdat, cls).open(file_path, **kwargs)
# get the headers
hdrs = [hdu.header for hdu in obj.hdulist]
obj._headers = TrigdatHeaders.from_headers(hdrs)
obj._trigtime = obj._headers[0]['TRIGTIME']
# store trigrate, maxrates, backrates, and fsw location
trigrate_data = obj.hdulist['TRIGRATE'].data
if trigrate_data.size > 0:
obj._trigrates = MaxRates.from_recarray(trigrate_data[0])
obj._maxrates = [MaxRates.from_recarray(maxrate) for maxrate in
obj.hdulist['MAXRATES'].data]
try:
obj._backrates = BackRates.from_recarray(obj.hdulist['BCKRATES'].data[0])
except:
warnings.warn('No BCKRATES data in this file.', RuntimeWarning)
obj._fsw_locations = [FswLocation.from_recarray(ob_calc) \
for ob_calc in obj.hdulist['OB_CALC'].data]
# store the data
obj._data = obj.hdulist['EVNTRATE'].data
obj._data.sort(order='TIME')
obj._rates = obj._data['RATE']
# store the position history
idx, dt = obj._time_indices(1024)
eic = obj._data['EIC'][idx]
quat = obj._data['SCATTITD'][idx]
times = obj._data['TIME'][idx]
obj._poshist = SpacecraftFrame(
obsgeoloc=r.CartesianRepresentation(x=eic[:,0], y=eic[:,1],
z=eic[:,2], unit=u.km),
quaternion=Quaternion(quat),
obstime=Time(times, format='gifts'),
detectors=GbmDetectors)
obj._time_range = TimeRange(obj._data['ENDTIME'][0] - dt[0],
obj._data['ENDTIME'][-1])
obj._gti = Gti.from_list(obj._gti_from_times(obj._data['TIME'],
obj._data['ENDTIME']))
return obj
[docs]
@classmethod
def from_data(cls, phaii_list, poshist, trigtime, det_mask, time_range=None,
trigrate=None, maxrates=None, backrates=None,
fswlocations=None, filename=None, headers=None):
"""Create a Trigdat object from a list of GiftsPhaii objects and a
SpacecraftFrame object.
Note:
The list of PHAIIs does not need to include every detector, as
missing detectors will be assumed to have zero rates. However,
do not include more than one file from the same detector in the
list. The PHAII list can be in any order.
Args:
phaii_list (list of :class:`~.phaii.GiftsPhaii`): A list of GiftsPhaii
objects
poshist (:class:`~gdt.core.coords.SpacecraftFrame`):
The spacecraft frame
trigtime (float): The trigger time
det_mask (np.ndarray): Triggered detector mask
time_range (tuple, optional): The time range of the Trigdat. If not
specified, uses the time range of the
GiftsPhaiis
trigrate (:class:`MaxRates`, optional): The trigger rates object
maxrates (list of :class:`MaxRates`, optional): The maxrates objects
backrates (:class:`BackRates`, optional): The background rates
object
fswlocations (list of :class:`FswLocation`, optional): The flight
software location objects
Returns:
(:class:`Trigdat`)
"""
# make sure phaii_list contains the correct objects and desired time
# is contained within all files
for phaii in phaii_list:
if not isinstance(phaii, GiftsPhaii):
raise TypeError('phaii_list must be a list of GiftsPhaii objects')
phaii_time_range = TimeRange(*phaii.time_range)
# make sure the poshist is a PosHist object and desired time is
# contained within the file
if not isinstance(poshist, SpacecraftFrame):
raise TypeError('poshist must be a SpacecraftFrame object')
poshist_time_range = TimeRange(poshist.obstime[0].gifts,
poshist.obstime[-1].gifts)
if not poshist_time_range.contains(trigtime):
raise ValueError('{} does not contain the specified ' \
'trigtime'.format(poshist))
# For GIFTS, we won't be slicing phaii as there is a
# potential disconnect between phaii (relative) and
# poshist (absolute) times, when using GBM source data
# with a non-GBM trigger time.
# Leave the block here for now. However, need to
# create a poshist-suitable trigtime for all time
# indexing here
poshist_trigtime = poshist.obstime[0].value + (-1.0 * phaii_list[0].data.tstart[0])
# slice the files by time, if requested
if time_range is not None:
trange = (time_range[0] - poshist_trigtime, time_range[1] - poshist_trigtime)
phaii_list = [phaii.slice_time(trange) for phaii in phaii_list]
obj = cls()
obj._poshist = poshist
# Use the specified GIFTS trigger time here
obj._trigtime = trigtime
# create the data record array
num_chans = phaii_list[0].num_chans
num_dets = len(obj._detectors)
num_bins = phaii_list[0].data.num_times
dtypes = [('TIME', '>f8'), ('ENDTIME', '>f8'),
('SCATTITD', '>f4', (4,)), ('EIC', '>f4', (3,)),
('RATE', '>f4', (num_dets, num_chans))]
obj._data = np.recarray((num_bins,), dtype=dtypes)
tstart = phaii_list[0].data.tstart
tstop = phaii_list[0].data.tstop
tcent = phaii_list[0].data.time_centroids
# interpolate poshist to match the bin edges as is standard
gifts_times = Time(tstart + poshist_trigtime, format='gifts')
obj._poshist = poshist.at(gifts_times)
# to preserve the detector ordering assumed by the trigdat
# datatype, we must pull the phaii file from the list that
# corresponds with the right detector in our sequence. if the
# detector isn't represented in the phaii_list, then the rates will
# be zeros.
num_detectors = len(obj._detectors)
for i in range(num_bins):
rates = np.zeros((num_detectors, num_chans))
for j in range(num_detectors):
rates[j,:] = phaii_list[j].data.counts[i, :] * \
(1.024 / phaii_list[j].data.time_widths[i, np.newaxis])
quat = np.append(obj._poshist[i].quaternion.xyz,
obj._poshist[i].quaternion.w)
eic = obj._poshist.obsgeoloc[0].xyz.to('km').value
obj._data[i] = (tstart[i] + poshist_trigtime, tstop[i] + poshist_trigtime, quat,
eic, rates)
# This is set in open(), but not here, so set it now
# so that subsequent calls e.g. to_phaii() work
obj._rates = obj._data['RATE']
obj._time_range = TimeRange(obj._data['TIME'][0],
obj._data['ENDTIME'][-1])
obj._gti = Gti.from_list(obj._gti_from_times(obj._data['TIME'],
obj._data['ENDTIME']))
# assign/create trigrates, maxrates, backrates, and fsw_locations
if trigrate is not None:
if not isinstance(trigrate, MaxRates):
raise TypeError('trigrate must be a MaxRates object')
obj._trigrates = trigrate
else:
obj._trigrates = MaxRates.create()
if maxrates is not None:
try:
iter(maxrates)
except:
raise TypeError('maxrates must be a list of MaxRate objects')
if any([not isinstance(m, MaxRates) for m in maxrates]):
raise TypeError('maxrates must be a list of MaxRate objects')
obj._maxrates = maxrates
else:
obj._maxrates = [MaxRates.create()]
if backrates is not None:
if not isinstance(backrates, BackRates):
raise TypeError('backrates must be a BackRates object')
obj._backrates = backrates
else:
obj._backrates = BackRates.create()
if fswlocations is not None:
try:
iter(fswlocations)
except:
raise TypeError('fsw_locations must be a list of FswLocation ' \
'objects')
if any([not isinstance(f, FswLocation) for f in fswlocations]):
raise TypeError('fsw_locations must be a list of FswLocation ' \
'objects')
obj._fsw_locations = fswlocations
else:
obj._fsw_locations = [FswLocation.create()]
if filename is not None:
obj._filename = str(filename)
if headers is not None:
if not isinstance(headers, TrigdatHeaders):
raise ValueError('headers must be a TrigdatHeaders object')
obj._headers = headers
else:
headers = TrigdatHeaders()
pos = obj._poshist.scpos.interpolate(obj.trigtime)
headers[0]['FILENAME'] = obj._filename
for hdu in headers:
try:
hdu['TSTART'] = obj._data['TIME'][0]
hdu['TSTOP'] = obj._data['ENDTIME'][-1]
except:
pass
try:
hdu['TRIGTIME'] = obj._trigtime
except:
pass
try:
hdu['GEO_LONG'] = pos.longitude.value
hdu['GEO_LAT'] = pos.latitude.value
except:
pass
try:
hdu['DETTYPE'] = 'BOTH'
except:
pass
obj._headers = headers
if det_mask is None:
raise ValueError("Triggered detector mask (det_mask) must be specified")
obj._headers[0]['DET_MASK'] = ''.join(det_mask.astype(str))
return obj
[docs]
def to_phaii(self, detector, timescale=1024):
"""Convert the data for a detector to a :class:`~.phaii.GiftsPhaii`
object.
Note:
A standard Trigdat will contain data on 8192, 1024, 256, and 62 ms
timescales. A non-standard Trigdat can be created from other
data types (see :meth:`Trigdat.from_data`) and therefore the
timescales will likely be different and arbitrary. For that reason,
the ``timescale`` keyword will be ignored for non-standard Trigdat.
Args:
detector (str): The detector to convert
timescale (int, optional):
The minimum timescale in ms of the data to return. Available
options are 1024, 256, and 64. This is ignored if the trigdat
has non-standard timescales.
Returns:
(:class:`~.phaii.GiftsPhaii`)
"""
return self._gbm_to_gifts_phaii(gbm_phaii=super().to_phaii(detector, timescale),
detector=detector)
[docs]
def sum_detectors(self, detectors, timescale=1024):
"""Sum the data from a list of detectors and convert to a GiftsPhaii
object. The exposures from different detectors are averaged.
Args:
detectors (list of str): The detectors to sum
timescale (int, optional):
The minimum timescale in ms of the data to return. Available
options are 1024, 256, and 64.
Returns:
(:class:`~.phaii.GiftsPhaii`)
"""
return self._gbm_to_gifts_phaii(gbm_phaii=super().sum_detectors([det.lower() for det in detectors], timescale),
detector='+'.join(detectors))
def _gbm_to_gifts_phaii(self, gbm_phaii, detector):
"""Generates a GiftsPhaii from a GbmPhaii"""
gifts_phaii_headers = PhaiiTriggerHeaders.from_file_headers(gbm_phaii.headers)
for h in gifts_phaii_headers:
h["DETNAM"] = detector
# TODO: set trig time and ra/dec/err from loc
return GiftsPhaii.from_data(data=gbm_phaii.data,
gti=gbm_phaii.gti,
trigger_time=gbm_phaii.trigtime,
filename=gbm_phaii.filename,
headers=gifts_phaii_headers)
[docs]
class MaxRates(GbmMaxRates):
"""GIFTS MAXRATES and TRIGRATE data in Trigdat.
"""
def __init__(self):
super().__init__()
self._detectors = [det.name for det in GiftsDetectors]
[docs]
class BackRates(GbmBackRates):
"""GIFTS background rates data in Trigdat.
"""
def __init__(self):
super().__init__()
self._detectors = [det.name for det in GiftsDetectors]
[docs]
class FswLocation(GbmFswLocation):
def __init__(self):
super().__init__()
# Need to override the size of the RATES array in parent dtypes
# to ensure to_recarray() will work
loc_dtype = self._dtypes[11]
self._dtypes[11] = (loc_dtype[0], loc_dtype[1], (len(GiftsDetectors),))
[docs]
@classmethod
def create(cls, time=None, radec=(None, None), err=None, spectrum='normal',
class1='UNCERT', class1_prob=1.0, class2='UNCERT',
class2_prob=0.0, intensity=None, hardness=None, fluence=None,
sigma=None, rates=None, timescale=None, azzen=(None, None)):
"""Create a FswLocation object from keywords
Args:
time (float, optional): The time of the localization
radec ((float, float), optional): RA and Dec
err (float, optional): The localization error
spectrum (str, optional): The algorithm spectrum; one of 'hard',
'normal', or 'soft' TODO
class1 (str, optional): The primary classification
class1_prob (float, optional): The primary classification
probability
class2 (str, optional): The secondary classification
class2_prob (float, optional): The secondary classification
probability
intensity: (float, optional): Intensity of the signal in the
localization interval
hardness: (float, optional): The hardness ratio in the
localization interval
fluence: (float, optional): The fluence in the localization interval
sigma: (float, optional): The S/N ratio of the localization interval
rates: (np.array, optional): A 12-element array containing the rate
in each NaI used for localization
timescale (float, optional): The localization timescale
azzen ((float, float), optional): Spacecraft azimuth and zenith of
the localization
Returns:
(:class:`FswLocation`)
"""
obj = super().create(time=time,
radec=radec,
err=err,
algorithm='normal',
class1=class1,
class1_prob=class1_prob,
intensity=intensity,
hardness=hardness,
fluence=fluence,
sigma=sigma,
rates=np.empty(len(GbmDetectors.nai())),
timescale=timescale,
azzen=azzen,
)
obj._algorithm = spectrum
num_gifts_detectors = len(GiftsDetectors)
if rates is None:
rates = np.zeros(num_gifts_detectors)
else:
rates = np.asarray(rates)
if rates.size != num_gifts_detectors:
raise ValueError(f"rates must be a {num_gifts_detectors}-element array")
obj._rates = rates
return obj
[docs]
def to_recarray(self):
"""Convert contents of the object to a numpy record array
Returns:
(np.recarray)
"""
spectrum = self.spectrum
self._algorithm = "normal"
recarray = super().to_recarray()
self._algorithm = spectrum
return recarray