Source code for planetary_coverage.events.itl

"""ITL events file module."""

import re

from .event import AbstractEventsCollection, AbstractEventsFile
from .evf import EvfEventsFile
from ..html import table
from ..spice.datetime import np_datetime_str, timedelta


ITL_ROW = r'\s+(\w+)\s+(\w+|\*)\s+(\w+)(?:\s+\((\w+)=([\w\s\[\]]+)\))?'

ITL_ROW_DT = re.compile(
    r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})Z?' + ITL_ROW
)

ITL_ROW_EVF = re.compile(
    r'(\w+\s+\(COUNT\s+=\s+\d+\))\s+([+-]\d{2}:\d{2}:\d{2}(?:\.\d+)?)' + ITL_ROW
)


def itl_rows(content):
    """Read ITL content to extract comments and data."""
    comments = []
    rows = []
    for line in content.splitlines():
        if line.startswith('# ') or line.startswith('#\t'):
            # Block comment
            comments.append(line[2:].replace(';', ',').replace('=', '='))
            continue

        if line.startswith('#') or not line.strip():
            # Commented line (skipped)
            continue

        if match := ITL_ROW_DT.findall(line):
            dt, inst, event_type, name, key, value = match[0]

            context_info = f'instrument={inst}'

            if event_type == '*':
                event_type = name
            else:
                context_info += f';observation name={name}'

            if key and value:
                context_info += f';{key.lower()}={value.lower()}'

            context_info += ';comments=' + ('\n'.join(comments) if comments else '--')

            rows.append((event_type, dt, context_info))

        elif match := ITL_ROW_EVF.findall(line):
            ref, td, inst, event_type, name, key, value = match[0]

            context_info = f'instrument={inst}'

            if event_type == '*':
                event_type = name
            else:
                context_info += f';observation name={name}'

            if key and value:
                context_info += f';{key.lower()}={value.lower()}'

            context_info += ';comments=' + ('\n'.join(comments) if comments else '--')

            rows.append((event_type, (ref, td), context_info))

        comments = []

    return rows


[docs]class ItlEventsFile(AbstractEventsFile): """ITL event file object. Instrument timeline. Parameters ---------- fname: str or pathlib.Path ITL filename to parse. evf: str or pathlib.Path Supporting EVF events file (for relative timelines). """ def __init__(self, fname, evf=None): self.evf = EvfEventsFile(evf) if evf else {} super().__init__(fname, 'event') # primary_key='event' def _repr_html_(self): rows = [ [ event.key, len(event) if isinstance(event, AbstractEventsCollection) else '-', event.start_date, event.stop_date, ] for event in self ] return table(rows, header=('event', '#', 't_start', 't_stop')) def __contains__(self, key): if key in self.observations: return True return super().__contains__(key) def __getitem__(self, key): if key in self.observations: return self.observations[key] return super().__getitem__(key) def _ipython_key_completions_(self): return list(self.keys()) + self.observations.obs_names def _read_rows(self): """Read ITL rows content.""" content = self.fname.read_text(encoding='utf-8') # ITL columns self.fields = ['event', 'event time [utc]', 'contextual info'] # Extract rows and reconstruct relative timeline events self.rows = [ (name, self.datetime(time), context) for (name, time, context) in itl_rows(content) ]
[docs] def datetime(self, time): """Absolute datetime for an event.""" if not isinstance(time, tuple): return time ref, delta = time if not self.evf: raise FileNotFoundError(f'Missing supporting EVF file: `{ref}` is unknown.') if ref not in self.evf: raise KeyError(f'`{ref}` is unknown.') return np_datetime_str(self.evf[ref] + timedelta(delta))
@property def observations(self): """List of observation windows.""" return self.data['OBS']