"""Event module."""
import re
from collections import UserDict, UserList
from operator import attrgetter
from pathlib import Path
import numpy as np
from ..html import table
from ..misc import logger
from ..spice.datetime import datetime, np_date_str, timedelta
# Module-level logging helpers: the first item returned by ``logger`` is used
# below as ``warn.warning(...)`` while parsing event rows; the second item is
# discarded here (bound to ``_``).
warn, _ = logger('EventsFileParser')
class AbstractEvent(UserDict):
    """Single time event object.

    At the end of the initialisation the instance class is switched to
    :class:`EventWindow` (when both ``t_start`` and ``t_end`` fields are
    present) or to :class:`Event` (when a single time field is present).

    Parameters
    ----------
    key: str
        Event key name.
    *args
        Positional arguments forwarded to :class:`collections.UserDict`.
    **kwargs
        Event fields. When a ``contextual info`` field is provided, its
        ``name=value;name=value`` content is expanded into individual
        fields and the original field is removed.

    Raises
    ------
    ValueError
        If no time field was found, or if a non-empty ``contextual info``
        fragment does not contain a ``=`` separator.

    """

    def __init__(self, key, *args, **kwargs):
        self.key = key

        infos = kwargs.pop('contextual info', None)
        if infos:
            kwargs.update(self._parse_contextual_info(infos))

        super().__init__(*args, **kwargs)

        if 't_start' in self and 't_end' in self:
            self.__class__ = EventWindow

        elif 'event time [utc]' in self or \
                'event utc apo' in self or \
                'event utc peri' in self or \
                'time' in self:
            self.__class__ = Event

        else:
            raise ValueError(f'Event time was not found: {kwargs}')

    @staticmethod
    def _parse_contextual_info(infos):
        """Expand a ``name=value;name=value`` string into a dict of fields."""
        fields = {}
        for info in infos.split(';'):
            if not info.strip():
                # Trailing or doubled `;` separators carry no information.
                continue

            name, sep, value = info.partition('=')
            if not sep:
                raise ValueError(
                    f'Invalid contextual info (missing `=`): {info!r}'
                )

            # Only the first `=` is a separator: the value may contain `=`.
            fields[name.strip()] = value.strip()

        return fields

    def __repr__(self):
        return '\n - '.join([
            f'<{self.__class__.__name__}> {self}:',
            *[f'{k}: {v}' for k, v in self.items()]
        ])

    def _repr_html_(self):
        return table(list(self.items()))

    def _ipython_key_completions_(self):
        return self.keys()

    def __contains__(self, utc):
        """Check for a field name first, then for a time inside the event."""
        if isinstance(utc, str) and utc in self.data.keys():
            return True

        try:
            return self.contains(utc).any()
        except ValueError:
            # The value could not be parsed as a time.
            return False

    def __hash__(self):
        return hash(frozenset(self.items()))

    def __add__(self, other):
        """Add to stop time."""
        return self.stop + timedelta(other)

    def __sub__(self, other):
        """Subtract from start time."""
        return self.start - timedelta(other)

    # Ordering: `>` and `>=` compare the event start time,
    # `<` and `<=` compare the event stop time.
    def __gt__(self, other):
        return self.start > np.datetime64(str(other))

    def __ge__(self, other):
        return self.start >= np.datetime64(str(other))

    def __lt__(self, other):
        return self.stop < np.datetime64(str(other))

    def __le__(self, other):
        return self.stop <= np.datetime64(str(other))

    @property
    def start(self) -> np.datetime64:
        """Event start time."""
        raise NotImplementedError

    @property
    def stop(self) -> np.datetime64:
        """Event stop time."""
        raise NotImplementedError

    @property
    def start_date(self):
        """Event start date."""
        return np_date_str(self.start)

    @property
    def stop_date(self):
        """Event stop date."""
        return np_date_str(self.stop)

    def contains(self, pts):
        """Check if points are inside the temporal window.

        Parameters
        ----------
        pts: numpy.ndarray
            List of temporal UTC point(s): ``utc`` or ``[utc_0, …]``.
            If an object with :attr:`utc` attribute/property is provided,
            the intersection will be performed on these points.

        Returns
        -------
        numpy.ndarray
            Return ``True`` if the point is inside the temporal window,
            and ``False`` otherwise.

        Note
        ----
        If the point is on the edge of the window it will be included.

        """
        if hasattr(pts, 'utc'):
            return self.contains(pts.utc)

        if isinstance(pts, str):
            return self.contains(np.datetime64(pts))

        if isinstance(pts, (list, tuple)):
            return self.contains(np.array(pts).astype('datetime64'))

        return (self.start <= pts) & (pts <= self.stop)

    def trim(self, *, before=None, after=None, by_event=None):
        """Trim the event with time boundaries.

        Parameters
        ----------
        before: str, datetime.datetime or numpy.datetime64
            Start time window to consider.
        after: str, datetime.datetime or numpy.datetime64
            Stop time window to consider.
        by_event: AbstractEvent or AbstractEventsCollection
            Event(s) that can be used as a time window range.

        Returns
        -------
        AbstractEvent
            Same event if the event is inside the time window considered.
        AbstractEvent
            A trimmed event if the event crosses one or both boundaries.
        None
            If the event is outside the time window considered.

        """
        raise NotImplementedError
class Event(AbstractEvent):
    """Instantaneous event: start and stop times are identical."""

    def __str__(self):
        return f'{self.key} ({self.start_date})'

    @property
    def _time_value(self) -> str:
        """Raw time value taken from the first known time field found."""
        if 'event time [utc]' in self:
            # Only this field has its `Z` suffix removed.
            return str(self['event time [utc]']).replace('Z', '')

        for field in ('event utc apo', 'event utc peri', 'time'):
            if field in self:
                return self[field]

        raise KeyError('Event time not found')

    @property
    def start(self) -> np.datetime64:
        """Event start time."""
        return datetime(self._time_value)

    @property
    def stop(self) -> np.datetime64:
        """Event stop time (same as start time)."""
        return self.start

    def trim(self, *, before=None, after=None, by_event=None):
        """Discard the event if it is outside the time boundaries.

        Parameters
        ----------
        before: str, datetime.datetime or numpy.datetime64
            Start time window to consider.
        after: str, datetime.datetime or numpy.datetime64
            Stop time window to consider.
        by_event: AbstractEvent or AbstractEventsCollection
            Event(s) whose ``start`` and ``stop`` are used as boundaries
            (when provided, ``before`` and ``after`` are ignored).

        Returns
        -------
        Event
            Same event if the event is inside the time window considered.
        None
            If the event is outside the time window considered.

        """
        if by_event:
            return self.trim(before=by_event.start, after=by_event.stop)

        too_early = (
            before is not None and self.start < np.datetime64(str(before))
        )
        if too_early:
            return None

        too_late = (
            after is not None and np.datetime64(str(after)) < self.stop
        )
        return None if too_late else self
class EventWindow(AbstractEvent):
    """Window time event object (``t_start`` -> ``t_end``)."""

    def __str__(self):
        return f'{self.key} ({self.start_date} -> {self.stop_date})'

    @staticmethod
    def _utc(value) -> np.datetime64:
        """Convert a time value to ``numpy.datetime64``.

        The ``Z`` suffix is removed before the conversion, so that no
        time zone designator is passed to :class:`numpy.datetime64`.

        """
        return np.datetime64(str(value).replace('Z', ''))

    @property
    def start(self) -> np.datetime64:
        """Event start time."""
        return self._utc(self['t_start'])

    @property
    def stop(self) -> np.datetime64:
        """Event stop time."""
        return self._utc(self['t_end'])

    def trim(self, *, before=None, after=None, by_event=None):
        """Trim the event with time boundaries.

        Parameters
        ----------
        before: str, datetime.datetime or numpy.datetime64
            Start time window to consider.
        after: str, datetime.datetime or numpy.datetime64
            Stop time window to consider.
        by_event: AbstractEvent or AbstractEventsCollection
            Event(s) whose ``start`` and ``stop`` are used as boundaries
            (when provided, ``before`` and ``after`` are ignored).

        Returns
        -------
        EventWindow
            New window with the same content if the event is inside
            the time window considered.
        EventWindow
            A trimmed event if the event crosses one or both boundaries.
        None
            If the event is outside the time window considered.

        """
        if by_event:
            return self.trim(before=by_event.start, after=by_event.stop)

        # Use the normalised start/stop values for every comparison,
        # the stored strings may still carry a `Z` suffix.
        start, stop = self.start, self.stop
        data = dict(self.data)

        if before is not None:
            t_min = self._utc(before)
            if start < t_min:
                data['t_start'], start = str(before), t_min

        if after is not None:
            t_max = self._utc(after)
            if t_max < stop:
                data['t_end'], stop = str(after), t_max

        # A comparison with `NaT` (open window) is always false.
        if start <= stop:
            return EventWindow(self.key, data)

        return None
class AbstractEventsCollection:
    """Abstract collection of events.

    Subclasses must provide ``__iter__`` (iteration over the events)
    and ``__hash__``.

    """

    def __repr__(self):
        return f'<{self.__class__.__name__}> {self}'

    def __contains__(self, utc):
        try:
            return self.contains(utc).any()
        except ValueError:
            # The value could not be parsed as a time.
            return False

    def __hash__(self):
        raise NotImplementedError

    def __iter__(self):
        raise NotImplementedError

    def _filter(self, func, err_msg):
        """Comparison filter.

        Parameters
        ----------
        func: callable
            Comparison applied on each element. Nested collections
            return their own filtered content (or raise ``LookupError``),
            single events return a boolean.
        err_msg: str
            Error message used when nothing is selected.

        Returns
        -------
        AbstractEvent, EventsList or EventsDict
            The element itself when a single element is selected, a new
            collection otherwise.

        Raises
        ------
        LookupError
            If no element is selected.

        """
        elements = []
        for event in self:
            if isinstance(event, AbstractEventsCollection):
                try:
                    elements.append(func(event))
                except LookupError:
                    # Nothing selected in this nested collection.
                    pass

            elif func(event):
                elements.append(event)

        if elements:
            if len(elements) == 1:
                return elements[0]

            if isinstance(self, EventsList):
                return EventsList(elements)

            return EventsDict(elements)

        raise LookupError(err_msg)

    def __gt__(self, other):
        return self._filter(
            lambda event: event > other, f'{self} <= {other}'
        )

    def __ge__(self, other):
        return self._filter(
            lambda event: event >= other, f'{self} < {other}'
        )

    def __lt__(self, other):
        return self._filter(
            lambda event: event < other, f'{self} >= {other}'
        )

    def __le__(self, other):
        return self._filter(
            lambda event: event <= other, f'{self} > {other}'
        )

    @property
    def starts(self) -> list:
        """Event start times."""
        return [event.start for event in self]

    @property
    def stops(self) -> list:
        """Event stop times."""
        return [event.stop for event in self]

    @property
    def windows(self) -> list:
        """Event windows."""
        return [(event.start, event.stop) for event in self]

    @property
    def start(self) -> np.datetime64:
        """Global events start time."""
        return min(self.starts)

    @property
    def stop(self) -> np.datetime64:
        """Global events stop time."""
        return max(self.stops)

    @property
    def start_date(self):
        """Global events start date."""
        return np_date_str(self.start)

    @property
    def stop_date(self):
        """Global events stop date."""
        return np_date_str(self.stop)

    def contains(self, pts):
        """Check if points are inside any temporal window.

        Parameters
        ----------
        pts: numpy.ndarray
            List of temporal UTC point(s): ``utc`` or ``[utc_0, …]``.
            If an object with :attr:`utc` attribute/property is provided,
            the intersection will be performed on these points.

        Returns
        -------
        numpy.ndarray
            Return ``True`` if the point is inside at least one of the
            temporal windows, and ``False`` otherwise.

        Note
        ----
        If the point is on the edge of the window it will be included.

        """
        return np.any([event.contains(pts) for event in self], axis=0)

    def before(self, date_stop, strict=False, as_dict=False):
        """Select all the events before the given date."""
        res = self < date_stop if strict else self <= date_stop
        return EventsDict(res) if as_dict else res

    def after(self, date_start, strict=False, as_dict=False):
        """Select all the events after the given date."""
        res = self > date_start if strict else self >= date_start
        return EventsDict(res) if as_dict else res

    def between(self, date_start, date_stop, strict=False, as_dict=False):
        """Select all the events between the given dates.

        Parameters
        ----------
        date_start: str, datetime.datetime or numpy.datetime64
            Lower time boundary.
        date_stop: str, datetime.datetime or numpy.datetime64
            Upper time boundary.
        strict: bool, optional
            Exclude the events located exactly on a boundary.
        as_dict: bool, optional
            Return the result as an ``EventsDict``.

        Raises
        ------
        LookupError
            If no event is located between the two dates.

        Danger
        ------
        The two comparisons must be applied one after the other.
        A comparison operator chain (``a < b < c``) is evaluated as
        ``a < b and b < c`` and does not return the intersection.

        """
        # Apply the lower boundary on the collection itself, not through
        # the reflected operator of `date_start`.
        lower = self > date_start if strict else self >= date_start

        if isinstance(lower, AbstractEventsCollection):
            res = lower < date_stop if strict else lower <= date_stop
        else:
            # A single event was selected: its comparison with the upper
            # boundary is a boolean, not a filtered selection.
            inside = lower < date_stop if strict else lower <= date_stop
            if not inside:
                sign = '>=' if strict else '>'
                raise LookupError(f'{lower} {sign} {date_stop}')
            res = lower

        return EventsDict(res) if as_dict else res

    def trim(self, *, before=None, after=None, by_event=None):
        """Trim the events within time boundaries.

        Parameters
        ----------
        before: str, datetime.datetime or numpy.datetime64
            Start time window to consider.
        after: str, datetime.datetime or numpy.datetime64
            Stop time window to consider.
        by_event: AbstractEvent or AbstractEventsCollection
            Event(s) that can be used as a time window range.

        Returns
        -------
        AbstractEventsCollection
            Same events if the events are inside the time window considered.
        AbstractEventsCollection
            Trimmed events if the events cross one or both boundaries.
        None
            If the events are outside the time window considered.

        """
        raise NotImplementedError
class EventsDict(AbstractEventsCollection, UserDict):
    """Collection of events indexed by their key.

    Events sharing the same key are grouped in an ``EventsList``.

    Warning
    -------
    The iteration is performed on the values and not the dict keys.

    """

    def __init__(self, events, **kwargs):
        self.data = {}

        # A single event is handled as a one-element list.
        items = [events] if isinstance(events, AbstractEvent) else events

        for item in sorted(items, key=lambda ev: ev.start):
            self.append(item)

    def __str__(self):
        count = len(self)
        plural = 's' if count > 1 else ''
        summary = f'{count} key' + plural

        if count > 0:
            summary = (
                f'({np_date_str(self.start)} -> {np_date_str(self.stop)}'
                f' | {summary})'
            )
            summary += '\n - '.join([':'] + [str(event) for event in self])

        return summary

    def _repr_html_(self):
        rows = []
        for i, event in enumerate(self):
            is_group = isinstance(event, AbstractEventsCollection)
            rows.append([
                f'<em>{i}</em>',
                event.key,
                len(event) if is_group else '-',
                event.start_date,
                event.stop_date,
            ])

        return table(rows, header=('', 'event', '#', 't_start', 't_stop'))

    def __iter__(self):
        return iter(self.data.values())

    def __getitem__(self, key):
        """Get events by key, index, slice or regex expression(s)."""
        if isinstance(key, str) and key in self.keys():
            return self.data[key]

        if isinstance(key, int):
            return self.get_by_int(key)

        if isinstance(key, slice):
            return EventsDict(self.get_by_slice(key))

        # Anything else is used as regex expression(s).
        patterns = key if isinstance(key, tuple) else (key,)
        return self.find(*patterns)

    def _ipython_key_completions_(self):
        return self.keys()

    def __contains__(self, utc):
        is_key = isinstance(utc, str) and utc in self.data
        return is_key or super().__contains__(utc)

    def __hash__(self):
        return hash(frozenset(self.data.items()))

    def keys(self):
        """Dictionary keys."""
        return self.data.keys()

    def get_by_slice(self, key) -> list:
        """Get events by slice."""
        return list(self)[key]

    def get_by_int(self, key: int):
        """Get event by int."""
        size = len(self)
        if not -size <= key < size:
            raise IndexError('Event index out of range')

        return self.get_by_slice(key)

    def append(self, event):
        """Append a new event to the dict."""
        key = event.key

        if key not in self.keys():
            self.data[key] = event
            return

        stored = self.data[key]
        if not isinstance(stored, EventsList):
            # First duplicate: group the stored value in an EventsList.
            self.data[key] = EventsList([stored])

        self.data[key].append(event)

    def find(self, *regex, as_dict=False):
        """Find the events matching a regex expression.

        Parameters
        ----------
        *regex: str
            Search regex expression key(s) (case insensitive).
        as_dict: bool, optional
            When a match exists returns the results as an ``EventsDict``.

        Returns
        -------
        Event, EventWindow, EventsList or EventsDict
            Event(s) matching the provided regex expression(s).

        Raises
        ------
        KeyError
            If none of the provided key was found.

        Note
        ----
        When multiple keys are provided, the duplicates
        will be discarded.

        """
        # A set is used to discard the duplicates.
        matches = set()
        for expr in regex:
            for key, event in self.data.items():
                if re.search(expr, key, flags=re.IGNORECASE):
                    matches.add(event)

        res = list(matches)

        if not res:
            names = "`, `".join(regex)
            raise KeyError(f'`{names}` not found')

        if as_dict or len(res) != 1:
            return EventsDict(res)

        return res[0]

    def startswith(self, *keys, as_dict=False):
        """Find the events starting with a given key.

        Parameters
        ----------
        *keys: str
            Search expression key(s).
        as_dict: bool, optional
            When a match exists returns the results as an ``EventsDict``.

        See Also
        --------
        find

        """
        patterns = [f'^{key}' for key in keys]
        return self.find(*patterns, as_dict=as_dict)

    def endswith(self, *keys, as_dict=False):
        """Find the events ending with a given key.

        Parameters
        ----------
        *keys: str
            Search expression key(s).
        as_dict: bool, optional
            When a match exists returns the results as an ``EventsDict``.

        See Also
        --------
        find

        """
        patterns = [f'{key}$' for key in keys]
        return self.find(*patterns, as_dict=as_dict)

    def trim(self, *, before=None, after=None, by_event=None):
        """Trim the events dict within time boundaries.

        Parameters
        ----------
        before: str, datetime.datetime or numpy.datetime64
            Start time window to consider.
        after: str, datetime.datetime or numpy.datetime64
            Stop time window to consider.
        by_event: AbstractEvent or AbstractEventsCollection
            Event(s) that can be used as a time window range.

        Returns
        -------
        EventsDict
            Events (trimmed when they cross a boundary) located
            in the time window considered.
        None
            If all the events are outside the time window considered.

        """
        kept = []
        for event in self:
            trimmed = event.trim(before=before, after=after, by_event=by_event)
            if trimmed:
                kept.append(trimmed)

        if not kept:
            return None

        return EventsDict(kept)
class EventsList(AbstractEventsCollection, UserList):
    """List of events with the same key."""

    # Secondary fields that can be used to query an event by name,
    # listed by priority order.
    _NAME_FIELDS = ('Crema name', 'observation name')

    def __str__(self):
        return (
            f'{self.key} ({self.start_date} -> {self.stop_date} | {len(self)} events)'
        )

    def __iter__(self):
        return iter(self.data)

    def _repr_html_(self):
        return table([
            [f'<em>{i}</em>'] + list(event.values())
            for i, event in enumerate(self)
        ], header=('', *self[0]))

    def __contains__(self, item):
        """Check datetime and secondary keys."""
        if isinstance(item, str):
            for keys in [self.crema_names, self.obs_names]:
                if item in keys:
                    return True

        return super().__contains__(item)

    def __getitem__(self, item):
        """Items can be queried by index, crema name or observation name.

        Raises
        ------
        KeyError
            If a name is provided and none of the events has this name.

        """
        if isinstance(item, str):
            # Match the name on the events themselves: the positions in
            # `crema_names` / `obs_names` are shifted as soon as one event
            # does not have the field and can not be used as list indexes.
            for field in self._NAME_FIELDS:
                for event in self.data:
                    name = event.get(field)
                    if name and name == item:
                        return event

            raise KeyError(item)

        if isinstance(item, slice):
            return EventsList(self.data[item])

        if isinstance(item, tuple):
            return EventsList([self[i] for i in item])

        return self.data[item]

    def _ipython_key_completions_(self):
        return self.crema_names + self.obs_names

    def __hash__(self):
        return hash(tuple(self.data))

    @property
    def key(self):
        """Events key (taken from the first event)."""
        return getattr(self[0], 'key', None)

    @property
    def crema_names(self) -> list:
        """Crema names when present in contextual info field."""
        return [name for item in self if (name := item.get('Crema name'))]

    @property
    def obs_names(self) -> list:
        """Observation names when present in contextual info field."""
        return [name for item in self if (name := item.get('observation name'))]

    def trim(self, *, before=None, after=None, by_event=None):
        """Trim the events list within time boundaries.

        Parameters
        ----------
        before: str, datetime.datetime or numpy.datetime64
            Start time window to consider.
        after: str, datetime.datetime or numpy.datetime64
            Stop time window to consider.
        by_event: AbstractEvent or AbstractEventsCollection
            Event(s) that can be used as a time window range.

        Returns
        -------
        EventsList
            Events (trimmed when they cross a boundary) located
            in the time window considered.
        None
            If all the events are outside the time window considered.

        """
        events = [
            ev for event in self
            if (ev := event.trim(before=before, after=after, by_event=by_event))
        ]
        return EventsList(events) if events else None
class AbstractEventsFile(EventsDict):
    """Abstract Events File object.

    The file is read and parsed as soon as the ``fname`` attribute is set
    (which is done at the end of the initialisation).

    Parameters
    ----------
    fname: str or pathlib.Path
        Input event filename.
    primary_key: str
        Header primary key (stored in lower case).
    header: str, optional
        Optional header definition (stored in the ``header`` attribute).

    """

    def __init__(self, fname, primary_key, header=None):
        super().__init__([])

        self.primary_key = primary_key.lower()
        self.header = header

        self.comments = []
        self.fields = []
        self.rows = []

        # Must be set last: the setter triggers the file parsing.
        self.fname = fname

    def __str__(self):
        return self.fname.name

    def __repr__(self):
        content = super().__str__()
        return f'<{self.__class__.__name__}> {self} {content}'

    @property
    def fname(self):
        """Events filename."""
        return self.__fname

    @fname.setter
    def fname(self, fname):
        """Store the filename, then read and parse the events file."""
        self.__fname = Path(fname)

        self._read_rows()
        self._parse_rows()

        # The parsing is valid only if a global start date can be computed.
        try:
            self.start_date
        except (KeyError, ValueError):
            raise IOError('Events parsing failed.')

    def _read_rows(self):
        """File row reader.

        Subclasses need to feed the ``fields`` and ``rows``
        (and eventually the ``comments``) attributes, which are
        then consumed by the rows parser.

        """
        raise NotImplementedError

    def _parse_rows(self):
        """Parse rows content as Events objects.

        Rows whose primary key ends with ``_START`` / ``_DESC`` open a new
        time window, rows ending with ``_END`` / ``_ASCE`` close the last
        window stored with the same key. Any other row is stored as is.

        Raises
        ------
        KeyError
            If the primary key is not listed in the fields.

        """
        if self.primary_key not in self.fields:
            raise KeyError(f'Primary key `{self.primary_key}` not found')

        column = self.fields.index(self.primary_key)

        for row in self.rows:
            fields = dict(zip(self.fields, row))
            name = row[column]
            suffix = name.upper()

            if suffix.endswith(('_START', '_DESC')):
                # Window opening: drop the suffix, the end time is not known yet.
                name = name.rsplit('_', 1)[0]
                start = fields.pop('event time [utc]')

                fields[self.primary_key] = name
                fields['t_start'] = start
                fields['t_end'] = 'NaT'

            elif suffix.endswith(('_END', '_ASCE')):
                # Window closing: update the window already stored.
                name = name.rsplit('_', 1)[0]
                stop = fields['event time [utc]']

                if name not in self.keys():
                    missing = row[column].replace('_END', '_START').replace('_ASCE', '_DESC')
                    warn.warning(
                        'Found `%s` (at %s) without `%s`.',
                        row[column], stop, missing
                    )
                    continue

                stored = self.data[name]
                window = stored[-1] if isinstance(stored, EventsList) else stored
                window['t_end'] = stop

                continue  # Nothing to append for a closing row

            self.append(AbstractEvent(name, **fields))