# Source code for planetary_coverage.spice.pool

"""Spice kernel pool module."""

from collections import defaultdict, namedtuple
from functools import wraps
from pathlib import Path

import numpy as np

import spiceypy as sp

from ._abc import ABCMetaKernel as MetaKernel
from .kernel import get_details, get_item, get_summary
from .references import SpiceRef
from .times import tdb, utc
from ..html import Html, table
from ..misc import group_by_2, logger


# Module-level logging handles for the SPICE pool, labelled 'Spice Pool'.
# `logger` returns a pair: the first item is used below as a standard logger
# (`.debug` / `.info`); the second is presumably a debug toggle helper
# -- TODO confirm against `..misc.logger`.
log_spice_pool, debug_spice_pool = logger('Spice Pool')


class MetaSpicePool(type):
    """Meta Spice kernel pool object.

    All the methods below are defined on the metaclass, so they are
    available directly on the class that uses it (no instance required).
    The pool content itself is always read back from SPICE
    (``sp.ktotal`` / ``sp.kdata``); only the metakernel hashes are cached
    on the Python side in :attr:`MK_HASH`.

    """
    # pylint: disable=no-value-for-parameter, unsupported-membership-test

    # Cache: name of the furnished metakernel file -> hash of the original
    # ``MetaKernel`` object it was generated from (filled by :meth:`add`).
    MK_HASH = {}

    def __repr__(cls):
        n = int(cls)

        if n == 0:
            desc = 'EMPTY'
        else:
            desc = f'{n} kernel'
            desc += 's' if n > 1 else ''
            desc += ' loaded:\n - '
            desc += '\n - '.join(cls.kernels)

        return f'<{cls.__name__}> {desc}'

    def _repr_html_(cls):
        """HTML representation (details table, or a notice when empty)."""
        if int(cls) == 0:
            return (
                '<p><span style="color: #d62728">⌀</span> '
                '<em>No kernel in the pool</em></p>'
            )
        return cls.details.html

    def __int__(cls):
        """Number of kernels in the pool."""
        return cls.count()

    def __len__(cls):
        """Number of kernels in the pool."""
        return cls.count()

    def __hash__(cls):
        """Hash of the kernels currently loaded in the pool."""
        return cls.hash(cls.kernels)

    def __eq__(cls, other):
        """Compare the pool hash with kernel(s) or with a precomputed hash.

        Strings, tuples and lists are hashed with :meth:`hash` first;
        any other value is compared directly with the pool hash.

        """
        if isinstance(other, (str, tuple, list)):
            return hash(cls) == cls.hash(other)

        return hash(cls) == other

    def __iter__(cls):
        """Iterate over the loaded kernels."""
        return iter(cls.kernels)

    def __contains__(cls, kernel):
        """``kernel in pool`` shortcut for :meth:`contains`."""
        return cls.contains(kernel)

    def __add__(cls, kernel):
        """``pool + kernel`` shortcut for :meth:`add`."""
        return cls.add(kernel)

    def __sub__(cls, kernel):
        """``pool - kernel`` shortcut for :meth:`remove`."""
        return cls.remove(kernel)

    def __getitem__(cls, item):
        """``pool[item]`` delegates the lookup to ``get_item``."""
        return get_item(item)

    @staticmethod
    def count() -> int:
        """Count the number of kernels in the pool."""
        return int(sp.ktotal('ALL'))

    @property
    def kernels(cls):
        """Return the list of kernels loaded in the pool.

        Returns
        -------
        tuple
            First field returned by ``sp.kdata`` for every kernel
            index of the pool.

        """
        return tuple(
            sp.kdata(i, 'ALL')[0] for i in range(cls.count())
        )

    def hash(cls, kernels) -> int:
        """Hash a (meta)kernel or a list of (meta)kernels.

        Parameters
        ----------
        kernels: str, MetaKernel or iterable
            Kernel(s) to hash. A single string or metakernel is
            treated as a 1-element collection. ``None`` entries
            are skipped.

        Returns
        -------
        int
            Hash of the tuple of all the individual hashes.

        """
        if isinstance(kernels, (str, MetaKernel)):
            return cls.hash((kernels, ))

        kernels_hash = ()
        for kernel in kernels:
            if isinstance(kernel, MetaKernel):
                mk = kernel
                # Hash of the metakernel and all the `kernels` loaded with it
                kernels_hash += (hash(mk), *(hash(k) for k in mk.kernels))

            elif kernel in cls.MK_HASH:
                # Check if the kernel is in mk hash cached
                kernels_hash += (cls.MK_HASH[kernel],)

            elif kernel is not None:
                # If not found, use the kernel hash if not None
                kernels_hash += (hash(kernel),)

        return hash(kernels_hash)

    def contains(cls, kernel):
        """Check if the kernel is in the pool.

        The kernel is considered present when it is listed in
        :attr:`kernels` or when its hash matches a cached metakernel
        hash in :attr:`MK_HASH`.

        """
        return kernel in cls.kernels or hash(kernel) in cls.MK_HASH.values()

    def add(cls, kernel, *, purge=False):
        """Add a kernel to the pool.

        Parameters
        ----------
        kernel: str, pathlib.Path, MetaKernel, tuple, list or None
            Kernel(s) to furnish. Tuples and lists are added element by
            element. ``None`` is silently ignored.
        purge: bool, optional
            Purge the pool before adding the kernel(s) (default: False).

        Raises
        ------
        ValueError
            If the kernel is already in the pool.

        """
        if purge:
            cls.purge()

        if isinstance(kernel, (tuple, list)):
            # The purge (if any) was already performed above.
            for _kernel in kernel:
                cls.add(_kernel, purge=False)

        elif kernel in cls:
            raise ValueError(f'Kernel `{kernel}` is already in the pool.')

        elif kernel is not None:
            log_spice_pool.debug('Add `%s` in the SPICE pool', kernel)

            if isinstance(kernel, MetaKernel):
                with kernel as mk:
                    # `mk` is the name of a `NamedTemporaryFile` (see `MetaKernel`)
                    sp.furnsh(mk)

                    log_spice_pool.debug('Cache metakernel original hash.')
                    cls.MK_HASH[mk] = hash(kernel)
            else:
                sp.furnsh(str(Path(kernel).expanduser()))

    def remove(cls, kernel):
        """Remove the kernel from the pool if present.

        The matching entries of the metakernel hash cache are dropped as
        well, otherwise :meth:`contains` would keep reporting a removed
        metakernel as loaded (and :meth:`add` would refuse to load it again).

        Parameters
        ----------
        kernel: str or MetaKernel
            Kernel to unload.

        Raises
        ------
        ValueError
            If the kernel is not in the pool.

        """
        if kernel not in cls:
            raise ValueError(f'Kernel `{kernel}` is not in the pool.')

        if isinstance(kernel, MetaKernel):
            mk_hash = hash(kernel)

            # Iterate over a snapshot: cache entries are deleted in the loop.
            for key, value in tuple(cls.MK_HASH.items()):
                if value != mk_hash:
                    continue

                if key in cls.kernels:
                    # Unload the furnished file (also drops its cache entry).
                    cls.remove(key)
                else:
                    # Stale entry: the file is no longer in the pool.
                    cls.MK_HASH.pop(key, None)
        else:
            log_spice_pool.debug('Remove %s', kernel)
            sp.unload(kernel)

            # Forget the cached hash if `kernel` was a furnished metakernel.
            cls.MK_HASH.pop(kernel, None)

    def purge(cls):
        """Purge the pool from all its content."""
        log_spice_pool.info('Purge the pool')
        sp.kclear()
        cls.MK_HASH = {}

    @property
    def summary(cls):
        """Pool content summary."""
        return Html(table(get_summary(cls.kernels)))

    @property
    def details(cls):
        """Pool content details."""
        return Html(table(get_details(cls.kernels)))

    @staticmethod
    def _time_convert(fmt):
        """Time format convertor.

        Parameters
        ----------
        fmt: str
            Output time format, case insensitive
            (``UTC``, ``TDB`` or ``ET``).

        Returns
        -------
        callable
            ``utc``, ``tdb`` or the identity function (``ET``).

        Raises
        ------
        TypeError
            If the output time format is invalid.

        """
        if fmt.upper() == 'UTC':
            return utc

        if fmt.upper() == 'TDB':
            return tdb

        if fmt.upper() == 'ET':
            return lambda x: x

        raise TypeError(f'Output format unknown: `{fmt}`, '
                        'only [`UTC`|`TDB`|`ET`] are accepted.')

    @staticmethod
    def _ck_cov(ck, ref: int):
        """Get CK coverage for given body."""
        covers = sp.ckcov(ck, ref, False, 'SEGMENT', 0.0, 'TDB')

        ets = group_by_2(covers)

        log_spice_pool.debug('ET CK cover windows: %r', ets)

        return ets

    @staticmethod
    def _pck_cov(pck, ref: int):
        """Get PCK coverage for given body."""
        # Output cell filled in place by `pckcov`.
        cell = sp.cell_double(2000)
        sp.pckcov(pck, ref, cell)
        covers = list(cell)

        ets = group_by_2(covers)

        log_spice_pool.debug('ET PCK cover coverage: %r', ets)

        return ets

    @staticmethod
    def _spk_cov(spk, ref: int):
        """Get SPK coverage for given body."""
        covers = list(sp.spkcov(spk, ref))

        ets = group_by_2(covers)

        log_spice_pool.debug('ET SPK coverage: %r', ets)

        return ets

    def _cov(cls, kernel, ext):
        """Kernel covered ids and method.

        Parameters
        ----------
        kernel: str
            Kernel file name.
        ext: str
            Kernel type (``CK``, ``PCK``, ``SPK`` or anything else).

        Returns
        -------
        set, callable or None
            Set of ids found in the kernel and the matching coverage
            function. Other kernel types yield an empty set and ``None``.

        """
        if ext == 'CK':
            ids = set(sp.ckobj(kernel))
            cov = cls._ck_cov

        elif ext == 'PCK':
            # Output cell filled in place by `pckfrm`.
            ids = sp.cell_int(1000)
            sp.pckfrm(kernel, ids)
            ids = set(ids)
            cov = cls._pck_cov

        elif ext == 'SPK':
            ids = set(sp.spkobj(kernel))
            cov = cls._spk_cov

        else:
            ids = set()
            cov = None

        return ids, cov

    def windows(cls, *refs, fmt='UTC'):
        """Get kernels windows on a collection of bodies in the pool.

        Based on CK, PCK and SPK files.

        Parameters
        ----------
        refs: str, int or SpiceRef
            Body(ies) reference(s).
        fmt: str, optional
            Output time format:

                - ``UTC`` (default)
                - ``TDB``
                - ``ET``

        Returns
        -------
        {SpiceRef: {str: numpy.ndarray([[float|str, float|str], …]), …}, …}
            Start and stop times windows in the requested format.

        Raises
        ------
        TypeError
            If the output time format is invalid.
        KeyError
            If the requested reference does not have a specific coverage
            range in the pool.

        See Also
        --------
        .coverage
        .gaps
        .brief

        """
        t_fmt = cls._time_convert(fmt)

        # Index the requested references by their integer code
        refs = {int(ref): ref for ref in map(SpiceRef, refs)}

        windows = defaultdict(dict)
        for i in range(cls.count()):
            kernel, ext, *_ = sp.kdata(i, 'ALL')

            ids, cov = cls._cov(kernel, ext)

            for ref in ids & set(refs):
                log_spice_pool.debug('Found `%s` in %s', refs[ref], kernel)

                if ets := cov(kernel, ref):
                    # Coverage per references and kernels
                    windows[refs[ref]][kernel] = ets

        if not windows:
            values = list(refs.values())
            err = 'The windows for '
            err += f'{values[0]} was' if len(values) == 1 else f'{values} were'
            err += ' not found.'
            raise KeyError(err)

        return {
            ref: {
                kernel: t_fmt(ets)
                for kernel, ets in kernels.items()
            }
            for ref, kernels in windows.items()
        }

    def coverage(cls, *refs, fmt='UTC'):
        """Get coverage for a collection of bodies in the pool.

        Parameters
        ----------
        refs: str, int or SpiceRef
            Body(ies) reference(s).
        fmt: str, optional
            Output time format:

                - ``UTC`` (default)
                - ``TDB``
                - ``ET``

        Returns
        -------
        [str, str] or [float, float]
            Start and stop times covered for the requested format.

        Note
        ----
        If multiple values are available, only the ``max(start)``
        and ``min(stop)`` are kept.

        Raises
        ------
        TypeError
            If the output time format is invalid.
        KeyError
            If no window is found for the requested references
            (raised by :meth:`windows`).
        ValueError
            If the start time is after the stop time

        See Also
        --------
        .windows
        .gaps
        .brief

        """
        t_fmt = cls._time_convert(fmt)

        # Get all the temporal windows per references and kernels
        ets_windows = cls.windows(*refs, fmt='ET')

        # Flatten all ET boundaries grouped by references
        ets_refs = [
            [
                ets
                for windows in kernels.values()
                for window in windows
                for ets in window
            ]
            for kernels in ets_windows.values()
        ]

        # Get starts and stops ET per references
        # and intersect the reference coverage windows
        start = np.max([np.min(ets) for ets in ets_refs])
        stop = np.min([np.max(ets) for ets in ets_refs])

        if start > stop:
            raise ValueError(
                f'MAX start time ({tdb(start)}) is after '
                f'MIN stop time ({tdb(stop)}).')

        return t_fmt(start), t_fmt(stop)

    def gaps(cls, *refs, fmt='UTC'):
        """Get coverage gaps (if any) for a collection of bodies in the pool.

        A gap is the interval between the stop time of a window and the
        start time of the next window within the same kernel.

        Parameters
        ----------
        refs: str, int or SpiceRef
            Body(ies) reference(s).
        fmt: str, optional
            Output time format:

                - ``UTC`` (default)
                - ``TDB``
                - ``ET``

        Returns
        -------
        [[str, str], …] or [[float, float], …]
            Start and stop times of coverage gaps intervals
            in the requested format.

        Raises
        ------
        TypeError
            If the output time format is invalid.
        KeyError
            If no window is found for the requested references
            (raised by :meth:`windows`).

        See Also
        --------
        .windows
        .coverage
        .brief

        """
        t_fmt = cls._time_convert(fmt)

        # Get all the temporal windows per references and kernels
        ets_windows = cls.windows(*refs, fmt='ET')

        return t_fmt(
            sorted([
                [et_start, et_stop]
                for kernels in ets_windows.values()
                for windows in kernels.values() if len(windows) > 1
                for (_, et_start), (et_stop, _) in zip(windows[:-1], windows[1:])
            ])
        )

    @staticmethod
    def _brief_sort_key(code):
        """Sort key for integer codes, following NAIF `brief` ordering.

        Negative codes are mapped to ``1 / code`` so that they come
        first in decreasing order (``-1, -28, …``); zero and positive codes
        keep their value (increasing order). Code ``0`` is handled
        explicitly to avoid a division by zero.

        """
        return code if code >= 0 else 1 / code

    def brief(cls, fmt='UTC'):
        """Bodies temporal coverage from CK, PCK and SPK kernels.

        Similar to NAIF `brief -t -a` method.

        Parameters
        ----------
        fmt: str, optional
            Output time format:

                - ``UTC`` (default)
                - ``TDB``
                - ``ET``

        Returns
        -------
        {SpiceRef: (float|str, float|str)}
            Bodies reference dictionary of start and stop times covered
            for the requested format.

        Raises
        ------
        TypeError
            If the output time format is invalid.

        See Also
        --------
        .windows
        .coverage
        .gaps

        """
        t_fmt = cls._time_convert(fmt)

        # Empty interval by default: any real window will replace the bounds
        Interval = namedtuple('Interval', ('start', 'stop'),
                              defaults=(float('inf'), -float('inf')))

        bodies = defaultdict(Interval)
        for i in range(cls.count()):
            kernel, ext, *_ = sp.kdata(i, 'ALL')

            ids, cov = cls._cov(kernel, ext)

            for ref in ids:
                log_spice_pool.debug('Found body `%s` in %s', ref, kernel)

                for start, stop in cov(kernel, ref):
                    # Keep the widest [start, stop] range seen so far
                    bodies[ref] = Interval(
                        min(bodies[ref].start, start),
                        max(bodies[ref].stop, stop),
                    )

        # Sort codes by decreasing negative and increasing positive
        # values to be consistent with NAIF `brief` ordering
        ordered = sorted(bodies.items(),
                         key=lambda item: cls._brief_sort_key(item[0]))

        brief = {}
        for code, (start, stop) in ordered:
            try:
                ref = SpiceRef(code)
                brief[ref] = (t_fmt(start), t_fmt(stop))
            except ValueError:
                # Discard codes with unknown name
                continue

        return brief
class SpicePool(metaclass=MetaSpicePool):
    """Spice kernel pool singleton.

    The class body is intentionally empty: the whole behaviour
    is provided by its metaclass.

    See: :class:`.MetaSpicePool` for details.

    """
def check_kernels(func):
    """Spice Pool kernels checker decorator.

    Before each call of the decorated method, the hash of the object
    is compared with the current content of :class:`SpicePool`.

    The parent object must implement a :func:`__hash__` function
    and have a :attr:`kernels` attribute.

    """
    @wraps(func)
    def wrapper(_self, *args, **kwargs):
        """Check if the content of pool have changed.

        If the content changed, the pool will be purge and the kernels
        reloaded.

        """
        expected_hash = hash(_self)

        if SpicePool != expected_hash:
            log_spice_pool.info(
                'The content of the pool changed -> the kernels will be reloaded.')

            # Start from a clean pool, then load the object's own kernels
            SpicePool.add(_self.kernels, purge=True)

        result = func(_self, *args, **kwargs)
        return result

    return wrapper