Source code for zensols.mimic.adm

"""Hospital admission/stay details.

"""
__author__ = 'Paul Landes'

from typing import Tuple, Dict, Iterable, List, Set, Callable, ClassVar, Any
from dataclasses import dataclass, field
import sys
import os
import logging
from functools import reduce
import collections
import itertools as it
from frozendict import frozendict
from io import TextIOBase
import pandas as pd
from zensols.persist import (
    PersistableContainer, persisted, Primeable, Stash,
    ReadOnlyStash, FactoryStash, KeySubsetStash,
)
from zensols.config import Dictable, ConfigFactory, Settings
from zensols.multi import MultiProcessDefaultStash
from zensols.db import BeanStash
from . import (
    Admission, Patient, Diagnosis, Procedure, NoteEvent,
    DiagnosisPersister, ProcedurePersister, PatientPersister,
    NoteEventPersister, AdmissionPersister, Note, NoteFactory,
)

logger = logging.getLogger(__name__)


@dataclass
class HospitalAdmission(PersistableContainer, Dictable):
    """Represents data collected by a patient over the course of their hospital
    admission.  Note: this object keys notes using their ``row_id`` IDs used in
    the MIMIC dataset as integers and not strings like some note stashes.

    """
    _DICTABLE_ATTRIBUTES: ClassVar[List[str]] = 'hadm_id'.split()
    _PERSITABLE_TRANSIENT_ATTRIBUTES: ClassVar[Set[str]] = {'_note_stash'}

    admission: Admission = field()
    """The admission of the admission."""

    patient: Patient = field()
    """The patient/subject."""

    diagnoses: Tuple[Diagnosis, ...] = field()
    """The ICD-9 diagnoses of the hospital admission."""

    procedures: Tuple[Procedure, ...] = field()
    """The ICD-9 procedures of the hospital admission."""

    def __post_init__(self):
        super().__init__()

    def _init(self, note_stash: Stash):
        """Set the stash used to look up the notes of this admission.

        :param note_stash: the stash that is keyed by the string form of the
                           note ``row_id``

        """
        self._note_stash = note_stash

    @property
    def hadm_id(self) -> int:
        """The hospital admission unique identifier."""
        return self.admission.hadm_id

    @property
    def notes(self) -> Iterable[Note]:
        """The notes by the care givers.  This is a one-shot iterator, so
        materialize it (i.e. with ``tuple``) when more than one pass is needed.

        """
        return iter(self._note_stash.values())

    @property
    @persisted('_by_category', transient=True)
    def notes_by_category(self) -> Dict[str, Tuple[Note, ...]]:
        """All notes by :obj:`.Note.category` as keys with the respective
        notes as a tuple as values.

        """
        notes = collections.defaultdict(list)
        for note in self.notes:
            notes[note.category].append(note)
        return frozendict({k: tuple(v) for k, v in notes.items()})

    def get_duplicate_notes(self, text_start: int = None) -> \
            Tuple[Set[str], ...]:
        """Notes with the same note text, each in their respective set.

        :param text_start: the number of first N characters used to compare
                           notes, or the entire note text if ``None``

        :return: the duplicate note ``row_id`` sets, or if there are no
                 duplicates, an empty tuple

        """
        dups = collections.defaultdict(set)
        note: Note
        for note in self.notes:
            key = note.text
            if text_start is not None:
                key = key[:text_start]
            dups[key].add(note.row_id)
        # only groups with more than one note are duplicates
        return tuple(ids for ids in dups.values() if len(ids) > 1)

    def get_non_duplicate_notes(self, dup_sets: Tuple[Set[str]],
                                filter_fn: Callable = None) -> \
            Tuple[Tuple[Note, bool], ...]:
        """Return non-duplicated notes.

        :param dup_sets: the duplicate sets generated from
                         :meth:`get_duplicate_notes`; may be empty

        :param filter_fn: if provided it is used to filter duplicates; if
                          everything is filtered, an arbitrary note from the
                          respective duplicate set is chosen

        :return: a tuple of ``(<note>, <is duplicate>)`` pairs

        :see: :meth:`get_duplicate_notes`

        """
        # materialize the notes: ``self.notes`` is a one-shot iterator, but the
        # notes are scanned once for the non-duplicates and then again for
        # every duplicate set
        notes: Tuple[Note, ...] = tuple(self.notes)
        # unlike ``reduce`` without an initial value, this tolerates the empty
        # tuple returned by ``get_duplicate_notes`` when nothing is duplicated
        dups: Set[str] = set().union(*dup_sets)
        # initialize with the notes not in any duplicate group, which are
        # non-duplicates
        non_dups: List[Tuple[Note, bool]] = [
            (n, False) for n in notes if n.row_id not in dups]
        ds: Set[str]
        for ds in dup_sets:
            note: Note
            candidates: List[Note] = [n for n in notes if n.row_id in ds]
            if filter_fn is not None:
                # if filter_fn is used, it returns preferred notes to use
                candidates = [n for n in candidates if filter_fn(n)]
            if candidates:
                note = candidates[0]
            else:
                # if there is no preference (all filtered) pick an arbitrary
                # note of the duplicate set
                note = self[next(iter(ds))]
            non_dups.append((note, True))
        return tuple(non_dups)

    @property
    def feature_dataframe(self) -> pd.DataFrame:
        """The feature dataframe for the hospital admission as the constituent
        note feature dataframes.  Only rows with an ``ent_type_`` of ``mc`` are
        kept and the ``hadm_id`` and ``section`` columns are moved first.

        """
        dfs: List[pd.DataFrame] = []
        by_cat = self.notes_by_category
        first = 'hadm_id section'.split()
        for note_key in sorted(by_cat.keys()):
            for note in by_cat[note_key]:
                df = note.feature_dataframe
                # copy the filtered rows so the column assignment below is not
                # a chained assignment on a slice of the note's dataframe
                df = df[df['ent_type_'] == 'mc'].copy()
                df['hadm_id'] = self.hadm_id
                new_cols = first + [c for c in df.columns if c not in first]
                dfs.append(df[new_cols])
        return pd.concat(dfs)

    def write_notes(self, depth: int = 0, writer: TextIOBase = sys.stdout,
                    note_limit: int = sys.maxsize,
                    categories: Set[str] = None,
                    include_note_id: bool = False,
                    **note_kwargs):
        """Write the notes of the admission.

        :param depth: the starting indentation depth

        :param writer: the writer to dump the content of this writable

        :param note_limit: the number of notes to write

        :param categories: the note categories to write, or all if ``None``

        :param include_note_id: whether to include the note identification info

        :param note_kwargs: the keyword arguments given to
                            :meth:`.Note.write_full`

        """
        notes = self.notes
        if categories is not None:
            notes = filter(lambda c: c.category in categories, notes)
        note: Note
        for note in it.islice(notes, note_limit):
            if include_note_id:
                self._write_line(f'row_id: {note.row_id} ({note.category})',
                                 depth, writer)
            note.write_full(depth, writer, **note_kwargs)

    def write(self, depth: int = 0, writer: TextIOBase = sys.stdout,
              include_admission: bool = False,
              include_patient: bool = False,
              include_diagnoses: bool = False,
              include_procedures: bool = False,
              **note_kwargs):
        """Write the admission and the notes of the admission.

        :param depth: the starting indentation depth

        :param writer: the writer to dump the content of this writable

        :param include_admission: whether to write the admission

        :param include_patient: whether to write the patient

        :param include_diagnoses: whether to write the diagnoses

        :param include_procedures: whether to write the procedures

        :param note_kwargs: the keyword arguments given to
                            :meth:`write_notes`, which override the terse
                            defaults set in this method

        """
        nkwargs = dict(note_line_limit=0,
                       section_line_limit=0,
                       include_fields=False,
                       include_section_divider=False,
                       include_note_divider=False,
                       include_section_header=False,
                       include_note_id=True)
        nkwargs.update(note_kwargs)
        self._write_line(f'hadm_id: {self.admission.hadm_id}', depth, writer)
        sections = (
            (include_admission, 'admission:', self.admission),
            (include_patient, 'patient:', self.patient),
            (include_diagnoses, 'diagnoses:', self.diagnoses),
            (include_procedures, 'procedures:', self.procedures),
        )
        for enabled, label, obj in sections:
            if enabled:
                self._write_line(label, depth + 1, writer)
                self._write_object(obj, depth + 2, writer)
        # skip the notes only when the caller explicitly asks for none
        if 'note_limit' not in nkwargs or nkwargs['note_limit'] > 0:
            self._write_line('notes:', depth + 1, writer)
            self.write_notes(depth + 2, writer, **nkwargs)

    def write_full(self, depth: int = 0, writer: TextIOBase = sys.stdout,
                   **kwargs):
        """Write a verbose output of the admission.

        :param depth: the starting indentation depth

        :param writer: the writer to dump the content of this writable

        :param kwargs: the keyword arguments given to :meth:`write`, which
                       override the verbose defaults set in this method

        """
        wkwargs = dict(note_line_limit=sys.maxsize,
                       section_line_limit=sys.maxsize,
                       include_fields=True,
                       include_section_divider=True,
                       include_note_divider=True,
                       include_section_header=True,
                       include_note_id=False,
                       include_admission=True,
                       include_patient=True,
                       include_diagnoses=True,
                       include_procedures=True)
        wkwargs.update(kwargs)
        self.write(depth, writer, **wkwargs)

    def keys(self) -> Iterable[int]:
        """The ``row_id`` of each note of the admission as integers."""
        return map(int, self._note_stash.keys())

    def __getitem__(self, row_id: int):
        # the note stash is keyed by strings
        return self._note_stash[str(row_id)]

    def __contains__(self, row_id: int):
        return str(row_id) in self._note_stash

    def __iter__(self) -> Iterable[Note]:
        return iter(self._note_stash.values())

    def __len__(self) -> int:
        return len(self._note_stash)

    def __str__(self):
        return (f'subject: {self.admission.subject_id}, ' +
                f'hadm: {self.admission.hadm_id}, ' +
                f'num notes: {len(self)}')
@dataclass
class _NoteBeanStash(BeanStash):
    """A :class:`~zensols.persist.domain.Stash` adaptation of the
    :class:`.NoteEventPersister` that produces :class:`.Note` instances.

    """
    mimic_note_factory: NoteFactory = field()
    """The factory used to create a :class:`.Note` from each loaded
    :class:`.NoteEvent`.

    """
    def load(self, row_id: str) -> Note:
        """Load the note event with ``row_id`` and create a note from it.

        :param row_id: the key of the note event to load

        :return: the note created by :obj:`mimic_note_factory`, or ``None`` if
                 the parent stash loaded nothing

        """
        note_event: NoteEvent = super().load(row_id)
        if note_event is None:
            return None
        logger.debug(f'creating note from {note_event}')
        return self.mimic_note_factory.create(note_event)


@dataclass
class _NoteFactoryStash(FactoryStash):
    """A factory stash of :class:`.Note` instances that sets the note context
    on every note it loads.

    """
    mimic_note_context: Settings = field(default=None)
    """The resources (such as the document stash) needed by new and
    re-hydrated notes.

    """
    def load(self, row_id: str) -> Note:
        """Load the note with ``row_id`` and set its context.

        :param row_id: the key of the note to load

        :return: the note with :obj:`mimic_note_context` set on it, or ``None``
                 if the parent stash loaded nothing

        """
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'loading note: {row_id}')
        note: Note = super().load(row_id)
        if note is None:
            return note
        logger.debug(f'setting note context on {row_id}')
        note._trans_context = self.mimic_note_context
        return note
@dataclass
class HospitalAdmissionDbStash(ReadOnlyStash, Primeable):
    """A read-only stash that builds :class:`.HospitalAdmission` instances
    from the persisters of this class.  This instance is used by caching
    stashes per the default resource library configuration for this package.

    """
    config_factory: ConfigFactory = field()
    """Creates the domain objects (i.e. hospital admissions)."""

    mimic_note_factory: NoteFactory = field()
    """Creates the :class:`.Note` instances of the hospital admissions."""

    admission_persister: AdmissionPersister = field()
    """Persister of the ``admissions`` table."""

    diagnosis_persister: DiagnosisPersister = field()
    """Persister of the ``diagnosis`` table."""

    patient_persister: PatientPersister = field()
    """Persister of the ``patients`` table."""

    procedure_persister: ProcedurePersister = field()
    """Persister of the ``procedure`` table."""

    note_event_persister: NoteEventPersister = field()
    """Persister of the ``noteevents`` table."""

    note_stash: Stash = field()
    """Creates cached instances of :class:`.Note`."""

    hospital_adm_name: str = field()
    """The name of the configuration section used to create
    :class:`.HospitalAdmission` instances.

    """
    def __post_init__(self):
        super().__post_init__()
        self.strict = True

    def _create_note_stash(self, adm: Admission):
        """Create a stash restricted to the notes of an admission.

        :param adm: the object whose ``hadm_id`` selects the notes

        :return: a view of :obj:`note_stash` having only the keys of the notes
                 of the admission

        """
        hadm_id = adm.hadm_id
        row_ids: Tuple[int, ...] = \
            self.note_event_persister.get_row_ids_by_hadm_id(hadm_id)
        # the stash is keyed by strings
        note_keys: Set[str] = {str(row_id) for row_id in row_ids}
        return KeySubsetStash(
            delegate=self.note_stash,
            key_subset=note_keys,
            dynamic_subset=False)

    def load(self, hadm_id: str) -> HospitalAdmission:
        """Create a *complete picture* of a hospital stay with admission,
        patient and notes data.

        :param hadm_id: the ID that specifies the hospital admission to create

        :return: the new hospital admission with its note stash set

        """
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'loading hospital admission: {hadm_id}')
        hadm_id = int(hadm_id)
        # query in the order: admission, patient, diagnoses, procedures, notes
        admission: Admission = \
            self.admission_persister.get_by_hadm_id(hadm_id)
        patient: Patient = \
            self.patient_persister.get_by_subject_id(admission.subject_id)
        diagnoses: Tuple[Diagnosis, ...] = \
            self.diagnosis_persister.get_by_hadm_id(hadm_id)
        procedures: Tuple[Procedure, ...] = \
            self.procedure_persister.get_by_hadm_id(hadm_id)
        note_stash: Stash = self._create_note_stash(admission)
        hosp_adm: HospitalAdmission = self.config_factory.new_instance(
            self.hospital_adm_name, admission, patient, diagnoses, procedures)
        hosp_adm._init(note_stash)
        return hosp_adm

    @persisted('_keys', cache_global=True)
    def keys(self) -> Iterable[str]:
        """The keys given by :obj:`admission_persister` as a tuple."""
        adm_keys = self.admission_persister.get_keys()
        return tuple(adm_keys)

    def exists(self, hadm_id: str) -> bool:
        """Return whether :obj:`admission_persister` has the admission.

        :param hadm_id: the hospital admission ID, which is converted to an
                        integer for the look up

        """
        key = int(hadm_id)
        return self.admission_persister.exists(key)

    def prime(self):
        """Prime :obj:`mimic_note_factory` and then the parent."""
        if logger.isEnabledFor(logging.INFO):
            logger.info(f'priming {type(self)}...')
        self.mimic_note_factory.prime()
        super().prime()
@dataclass
class HospitalAdmissionDbFactoryStash(FactoryStash, Primeable):
    """A factory stash that configures :class:`.NoteEvent` instances so they
    can parse the MIMIC-III English text as :class:`.FeatureDocument`
    instances.

    """
    doc_stash: Stash = field(default=None)
    """The stash of documents that map to :obj:`row_id`."""

    mimic_note_context: Settings = field(default=None)
    """The resources (such as the document stash) needed by new and
    re-hydrated notes.

    """
    def load(self, hadm_id: str) -> HospitalAdmission:
        """Load the admission and set a newly created note stash on it.

        :param hadm_id: the hospital admission ID

        :return: the admission given by the parent stash with the note stash
                 created by the factory stash set

        """
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'loading hospital admission: {hadm_id}')
        hosp_adm: HospitalAdmission = super().load(hadm_id)
        factory: HospitalAdmissionDbStash = self.factory
        # the note stash is set again here since it is listed as a transient
        # attribute of the admission
        hosp_adm._init(factory._create_note_stash(hosp_adm))
        return hosp_adm

    def clear(self):
        """Clear the admissions, the documents and then the notes."""
        # admission cached (i.e. data/adm)
        super().clear()
        # parsed docs (i.e. data/note-doc)
        self.doc_stash.clear()
        # note containers with sections (i.e. data/note-cont)
        note_stash: Stash = self.factory.note_stash
        note_stash.delegate.clear()

    def prime(self):
        """Prime the factory stash and then the parent."""
        if logger.isEnabledFor(logging.INFO):
            logger.info(f'priming {type(self)}...')
        self.factory.prime()
        super().prime()
@dataclass
class NoteDocumentPreemptiveStash(MultiProcessDefaultStash):
    """Contains the stash that preemptively creates :class:`.Admission`,
    :class:`.Note` and :class:`~zensols.nlp.container.FeatureDocument` cache
    files.  This class is not useful for returning any data (see
    :class:`.HospitalAdmissionDbFactoryStash`).

    """
    note_event_persister: NoteEventPersister = field(default=None)
    """The persister for the ``noteevents`` table."""

    adm_factory_stash: HospitalAdmissionDbFactoryStash = field(default=None)
    """The factory to create the admission instances."""

    def __post_init__(self):
        super().__post_init__()
        # the note row IDs to process, which is set by :meth:`process_keys`
        self._row_ids: Set[str] = None

    def _create_data(self) -> Iterable[str]:
        """Return the note row IDs set by :meth:`process_keys`."""
        keys: Set[str] = self._row_ids
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'keys to process: {len(keys)}')
        return keys

    def _process(self, chunk: List[Any]) -> Iterable[Tuple[str, Any]]:
        """Force the parse of the document of each note in ``chunk``.

        :param chunk: the note row IDs to process

        :return: tuples of the row ID and the string form of the note

        """
        np: NoteEventPersister = self.note_event_persister
        # for each row ID get the note through the admission so sections are
        # created per the implementation specified in the configuration
        row_id: str
        for row_id in chunk:
            if logger.isEnabledFor(logging.DEBUG):
                pid = os.getpid()
                self._debug(f'processing key {row_id} in {pid}')
            hadm_id: int = np.get_hadm_id(int(row_id))
            adm: HospitalAdmission = self.adm_factory_stash[hadm_id]
            note: Note = adm[row_id]
            # force document parse
            note.doc
            # it doesn't matter what we return because it won't be used, so
            # return the note's debugging string
            yield (row_id, str(note))

    def _get_existing_note_row_ids(self) -> Set[str]:
        """Return the note row_ids that both have container and feature doc
        cached ID files.

        """
        existing_note_cont_ids: Set[str] = set(
            self.adm_factory_stash.factory.note_stash.delegate.keys())
        existing_doc_ids: Set[str] = set(
            self.adm_factory_stash.doc_stash.delegate.keys())
        if logger.isEnabledFor(logging.INFO):
            logger.info(f'already cached: doc={len(existing_doc_ids)}, ' +
                        f'container={len(existing_note_cont_ids)}')
        return existing_note_cont_ids & existing_doc_ids

    def prime(self):
        """Create the admissions and then the note documents that are not yet
        cached.  The row IDs to process must first be set, which is done by
        :meth:`process_keys`.

        """
        if logger.isEnabledFor(logging.INFO):
            logger.info(f'priming {type(self)}...')
        # this leads to priming the stash that installs the MedSecId in the
        # mimicsid package
        self.adm_factory_stash.prime()
        np: NoteEventPersister = self.note_event_persister
        # get the IDs we already have created previously
        existing_row_ids: Set[str] = self._get_existing_note_row_ids()
        # create a list of those row IDs we still need to create
        to_create_row_ids: Set[str] = self._row_ids - existing_row_ids
        if logger.isEnabledFor(logging.INFO):
            logger.info(f'need: {len(self._row_ids)}, ' +
                        f'existing: {len(existing_row_ids)}, ' +
                        f'create: {len(to_create_row_ids)}')
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f'need: {self._row_ids}, ' +
                         f'existing: {existing_row_ids}, ' +
                         f'create: {to_create_row_ids}')
        # populate admissions that have at least one missing note
        hadm_ids: Tuple[int, ...] = tuple(np.get_hadm_ids(to_create_row_ids))
        # first create the admissions to processes overwrite, only then can
        # notes be derived from admissions and written across procs
        if logger.isEnabledFor(logging.INFO):
            logger.info(f'creating {len(hadm_ids)} cached admissions')
        hadm_id: int
        for hadm_id in hadm_ids:
            # force creation of the admission to cache the file
            adm: HospitalAdmission = self.adm_factory_stash[hadm_id]
            assert isinstance(adm, HospitalAdmission)
        # don't fork processes only to find the work is already complete
        if len(hadm_ids) == 0:
            if logger.isEnabledFor(logging.INFO):
                logger.info('no note docs to create')
        else:
            if logger.isEnabledFor(logging.INFO):
                logger.info(f'creating {len(to_create_row_ids)} note docs')
            super().prime()

    def process_keys(self, row_ids: Iterable[str], workers: int = None,
                     chunk_size: int = None):
        """Invoke the multi-processing system to preemptively parse and store
        note events for the IDs provided.

        :param row_ids: the note row IDs to parse and cache

        :param workers: the number of processes spawned to accomplish the work

        :param chunk_size: the size of each group of data sent to the child
                           process to be handled

        :see: :class:`~zensols.persist.multi.stash.MultiProcessStash`

        """
        if workers is not None:
            self.workers = workers
        if chunk_size is not None:
            self.chunk_size = chunk_size
        self._row_ids = set(row_ids)
        if logger.isEnabledFor(logging.DEBUG):
            # use the materialized set: ``row_ids`` can be any iterable, which
            # might not support ``len`` and is already consumed at this point
            logger.debug(f'processing {len(self._row_ids)} notes')
        self.prime()