Source code for planetary_coverage.spice.pool
"""Spice kernel pool module."""
from functools import wraps
import numpy as np
import spiceypy as sp
from ._abc import ABCMetaKernel as MetaKernel
from .kernel import get_item
from .references import SpiceRef
from .times import tdb, utc
from ..misc import logger
log_spice_pool, debug_spice_pool = logger('Spice Pool')
[docs]class MetaSpicePool(type):
"""Meta Spice kernel pool object."""
# pylint: disable=no-value-for-parameter, unsupported-membership-test
MK_HASH = {}
def __repr__(cls):
n = int(cls)
if n == 0:
desc = 'EMPTY'
else:
desc = f'{n} kernel'
desc += 's'
desc += ' loaded:\n - '
desc += '\n - '.join(cls.kernels)
return f'<{cls.__name__}> {desc}'
def __int__(cls):
return cls.count()
def __len__(cls):
return cls.count()
def __hash__(cls):
return cls.hash(cls.kernels)
def __eq__(cls, other):
if isinstance(other, (str, tuple, list)):
return hash(cls) == cls.hash(other)
return hash(cls) == other
def __iter__(cls):
return iter(cls.kernels)
def __contains__(cls, kernel):
return cls.contains(kernel)
def __add__(cls, kernel):
return cls.add(kernel)
def __sub__(cls, kernel):
return cls.remove(kernel)
def __getitem__(cls, item):
return get_item(item)
[docs] @staticmethod
def count() -> int:
"""Count the number of kernels in the pool."""
return int(sp.ktotal('ALL'))
@property
def kernels(cls):
"""Return the list of kernels loaded in the pool."""
return tuple(
sp.kdata(i, 'ALL')[0] for i in range(cls.count())
)
[docs] def hash(cls, kernels) -> int:
"""Hash a (meta)kernel or a list of (meta)kernels."""
if isinstance(kernels, (str, MetaKernel)):
return cls.hash((kernels, ))
kernels_hash = ()
for kernel in kernels:
if isinstance(kernel, MetaKernel):
mk = kernel
# Hash of the metakernel and all the `kernels` loaded with it
kernels_hash += (hash(mk), *(hash(k) for k in mk.kernels))
elif kernel in cls.MK_HASH: # Check if the kernel is in mk hash cached
kernels_hash += (cls.MK_HASH[kernel],)
elif kernel is not None: # If not found, use the kernel hash if not None
kernels_hash += (hash(kernel),)
return hash(kernels_hash)
[docs] def contains(cls, kernel):
"""Check if the kernel is in the pool."""
return kernel in cls.kernels or hash(kernel) in cls.MK_HASH.values()
[docs] def add(cls, kernel, purge=False):
"""Add a kernel to the pool."""
if purge:
cls.purge()
if isinstance(kernel, (tuple, list)):
for _kernel in kernel:
cls.add(_kernel, purge=False)
elif kernel in cls:
raise ValueError(f'Kernel `{kernel}` is already in the pool.')
elif kernel is not None:
log_spice_pool.debug('Add `%s` in the SPICE pool', kernel)
if isinstance(kernel, MetaKernel):
with kernel as mk:
# `mk` is the name of a `NamedTemporaryFile` (see `MetaKernel`)
sp.furnsh(mk)
log_spice_pool.debug('Cache metakernel original hash.')
cls.MK_HASH[mk] = hash(kernel)
else:
sp.furnsh(kernel)
[docs] def remove(cls, kernel):
"""Remove the kernel from the pool if present."""
if kernel not in cls:
raise ValueError(f'Kernel `{kernel}` is not in the pool.')
if isinstance(kernel, MetaKernel):
mk_hash = hash(kernel)
for key, value in cls.MK_HASH.items():
if value == mk_hash and key in cls.kernels:
cls.remove(key)
else:
log_spice_pool.debug('Remove %s', kernel)
sp.unload(kernel)
[docs] def purge(cls):
"""Purge the pool from all its content."""
log_spice_pool.info('Purge the pool')
sp.kclear()
cls.MK_HASH = {}
[docs] def windows(cls, *refs, fmt='UTC'):
"""Get kernels windows on a collection of bodies in the pool.
Parameters
----------
refs: str, int or SpiceRef
Body(ies) reference(s).
fmt: str, optional
Output format:
- ``UTC`` (default)
- ``TDB``
- ``ET``
Returns
-------
np.array([[float,float], …])
Start and stop times windows in the requested format.
Raises
------
KeyError
If the requested reference does not have a specific coverage
range in the pool.
"""
refs = {int(ref): ref for ref in map(SpiceRef, refs)}
windows = []
for i in range(cls.count()):
kernel, ext, *_ = sp.kdata(i, 'ALL')
if ext == 'CK':
ids = set(sp.ckobj(kernel))
cov = cls._ck_cov
elif ext == 'PCK':
ids = sp.cell_int(1000)
sp.pckfrm(kernel, ids)
ids = set(ids)
cov = cls._pck_cov
elif ext == 'SPK':
ids = set(sp.spkobj(kernel))
cov = cls._spk_cov
else:
ids = set()
cov = None
for ref in ids & set(refs):
log_spice_pool.debug('Found `%s` in %s', refs[ref], kernel)
if ets := cov(kernel, ref):
windows.append([np.min(ets), np.max(ets)]) # Coverage per file
if not windows:
values = list(refs.values())
err = 'The windows for '
err += f'{values[0]} was' if len(values) == 1 else f'{values} were'
err += ' not found.'
raise KeyError(err)
return cls._fmt_windows(windows, fmt=fmt)
@staticmethod
def _fmt_windows(ets_windows, fmt='UTC'):
"""Format ET windows.
Parameters
----------
ets_windows: list
ET windows.
fmt: str, optional
Output format:
- ``UTC`` (default)
- ``TDB``
- ``ET``
Returns
-------
np.array([[float,float], …])
Start and stop times windows in the requested format.
Raises
------
TypeError
If the provided format is invalid.
"""
if fmt.upper() == 'ET':
return np.array(ets_windows)
if fmt.upper() == 'UTC':
return np.array([utc(w) for w in ets_windows], dtype=np.datetime64)
if fmt.upper() == 'TDB':
return np.array([tdb(w) for w in ets_windows], dtype='<U27')
raise TypeError(f'Output format unknown: `{fmt}`, '
'only [`UTC`|`TDB`|`ET`] are accepted.')
@staticmethod
def _ck_cov(ck, ref: int):
"""Get CK coverage for given body."""
cover = sp.ckcov(ck, ref, False, 'SEGMENT', 0.0, 'TDB')
ets = [
[cover[i * 2], cover[i * 2 + 1]] for i in range(sp.wncard(cover))
]
log_spice_pool.debug('ET windows: %r', ets)
return ets
@staticmethod
def _pck_cov(pck, ref: int):
"""Get PCK coverage for given body."""
cover = sp.cell_double(2000)
sp.pckcov(pck, ref, cover)
ets = list(cover)
log_spice_pool.debug('ET coverage: %r', ets)
return [ets]
@staticmethod
def _spk_cov(spk, ref: int):
"""Get SPK coverage for given body."""
ets = list(sp.spkcov(spk, ref))
log_spice_pool.debug('ET coverage: %r', ets)
return [ets]
[docs] def coverage(cls, *refs, fmt='UTC'):
"""Get coverage for a collection of bodies in the pool.
Parameters
----------
refs: str, int or SpiceRef
Body(ies) reference(s).
fmt: str, optional
Output format:
- ``UTC`` (default)
- ``TDB``
- ``ET``
Returns
-------
[str, str] or [float, float]
Start and stop times covered for the requested format.
Note
----
If multiple values are available, only the ``max(start)``
and ``min(stop)`` are kept.
Raises
------
TypeError
If the output format is invalid.
ValueError
If the start time is after the stop time
"""
starts, ends = cls.windows(*refs, fmt='ET').T
start, stop = np.max(starts), np.min(ends)
if start > stop:
raise ValueError(
f'MAX start time ({tdb(start)}) is after MIN stop time ({tdb(stop)}).')
if fmt.upper() == 'UTC':
start, stop = utc(start, stop)
elif fmt.upper() == 'TDB':
start, stop = tdb(start, stop)
elif fmt.upper() != 'ET':
raise TypeError(
f'Output format unknown: `{fmt}`, only [`UTC`|`TDB`|`ET`] are accepted.')
return start, stop
[docs]class SpicePool(metaclass=MetaSpicePool):
"""Spice kernel pool singleton.
See: :class:`.MetaSpicePool` for details.
"""
[docs]def check_kernels(func):
"""Spice Pool kernels checker decorator.
The parent object must implement a :func:`__hash__`
function and have a :attr:`kernels` attribute.
"""
@wraps(func)
def wrapper(_self, *args, **kwargs):
"""Check if the content of pool have changed.
If the content changed, the pool will be purge and the kernels reloaded.
"""
if SpicePool != hash(_self):
log_spice_pool.info(
'The content of the pool changed -> the kernels will be reloaded.')
SpicePool.add(_self.kernels, purge=True)
return func(_self, *args, **kwargs)
return wrapper