Logging

In NeuroCAPs, all informational messages and warnings are managed using Python’s logging module. By default, logs are written to the console (sys.stdout) at the INFO level. To override these defaults, configure the root logger or specific module loggers before importing the NeuroCAPs package.

Configuration (Without Parallel Processing)

This configuration sets the root logger to output to the console and configures a specific module logger to use a FileHandler.

import logging
import sys

# Configure the root logger for all loggers to propagate to
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)

# Configure the logger for the internal module that performs timeseries extraction
# (for versions < 0.31.0, use "neurocaps._utils.extraction.extract_timeseries" instead)
extract_timeseries_logger = logging.getLogger("neurocaps.extraction._internals.postprocess")
extract_timeseries_logger.setLevel(logging.WARNING)
file_handler = logging.FileHandler("neurocaps.log")
file_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
extract_timeseries_logger.addHandler(file_handler)

# Import package
from neurocaps.extraction import TimeseriesExtractor

extractor = TimeseriesExtractor()

extractor.get_bold(bids_dir="path/to/bids/dir", task="rest", tr=2)
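
The interaction between a module logger's level, its own handler, and propagation to the root logger can be illustrated with the standard library alone. The following is a minimal sketch, not NeuroCAPs code: the logger name demo_pkg.worker is hypothetical, and in-memory streams stand in for the console and the log file so the result can be inspected.

```python
import io
import logging

# Root logger writes to an in-memory stream (stand-in for the console).
console = io.StringIO()
root = logging.getLogger()
root.setLevel(logging.INFO)
console_handler = logging.StreamHandler(console)
console_handler.setFormatter(logging.Formatter("%(name)s [%(levelname)s] %(message)s"))
root.addHandler(console_handler)

# Hypothetical module logger with a stricter level and its own handler
# (stand-in for the FileHandler used above).
module_stream = io.StringIO()
module_logger = logging.getLogger("demo_pkg.worker")
module_logger.setLevel(logging.WARNING)
module_handler = logging.StreamHandler(module_stream)
module_handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
module_logger.addHandler(module_handler)

# Below the logger's WARNING level, so no handler sees it.
module_logger.info("dropped: below the logger level")

# Handled by the module handler, then propagated to the root handler.
module_logger.warning("kept: reaches both handlers")

# With propagation disabled, only the module handler receives the record.
module_logger.propagate = False
module_logger.warning("kept: module handler only")
```

Because propagation is enabled by default, a module logger that has its own handler still forwards records at or above its level to the root handlers. Setting propagate to False is the usual way to keep such records out of the console.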

Logging Configuration (With Parallel Processing)

When using joblib’s loky backend for parallel processing, child processes do not inherit the global logging configuration. Consequently, internal logs are written to the console by default when parallel processing is enabled, regardless of the handlers configured in the parent process. To redirect these logs to a specific handler, set up a multiprocessing.Manager().Queue() (which is passed to a QueueHandler internally) together with a QueueListener. This approach allows logs produced by the internal neurocaps.extraction._internals.postprocess module to be redirected when parallel processing is enabled.

Note

For versions < 0.31.0, the internal module that performs timeseries extraction is neurocaps._utils.extraction.extract_timeseries.

import logging
from logging.handlers import QueueListener
from multiprocessing import Manager

# Configure root with FileHandler
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
file_handler = logging.FileHandler("neurocaps.log")
file_handler.setFormatter(logging.Formatter("%(asctime)s %(name)s [%(levelname)s] %(message)s"))
root_logger.addHandler(file_handler)

if __name__ == "__main__":
    # Import the TimeseriesExtractor
    from neurocaps.extraction import TimeseriesExtractor

    # Setup managed queue
    manager = Manager()
    queue = manager.Queue()

    # Set up the queue listener
    listener = QueueListener(queue, *root_logger.handlers)

    # Start listener
    listener.start()

    try:
        extractor = TimeseriesExtractor()

        # Use the `parallel_log_config` parameter to pass the queue and the logging level
        extractor.get_bold(
            bids_dir="path/to/bids/dir",
            task="rest",
            tr=2,
            n_cores=5,
            parallel_log_config={"queue": queue, "level": logging.WARNING},
        )
    finally:
        # Stop the listener even if extraction fails so queued records are flushed
        listener.stop()
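
The queue-based pattern can be illustrated with the standard library alone. The sketch below is an illustration under stated assumptions, not the NeuroCAPs implementation: threads and a queue.Queue stand in for loky worker processes and the managed queue, and the worker function, ListHandler class, and logger names are hypothetical. Each worker attaches a QueueHandler to its logger, and a single QueueListener forwards the queued records to the real handler.

```python
import logging
import queue
import threading
from logging.handlers import QueueHandler, QueueListener


class ListHandler(logging.Handler):
    """Collects formatted records in a list (stand-in for a FileHandler)."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(self.format(record))


def worker(log_queue, level, worker_id):
    # Hypothetical worker: it only knows about the queue, never the final handler.
    logger = logging.getLogger(f"demo.worker{worker_id}")
    logger.setLevel(level)
    logger.propagate = False
    logger.addHandler(QueueHandler(log_queue))
    logger.info("starting")  # filtered out when level is WARNING
    logger.warning("something odd")  # placed on the queue


sink = ListHandler()
sink.setFormatter(logging.Formatter("%(name)s [%(levelname)s] %(message)s"))

log_queue = queue.Queue()
listener = QueueListener(log_queue, sink)
listener.start()
try:
    threads = [
        threading.Thread(target=worker, args=(log_queue, logging.WARNING, i))
        for i in range(2)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
finally:
    # stop() processes the records still on the queue before returning.
    listener.stop()
```

Across real processes, the plain queue.Queue would be replaced by a queue that can be shared between processes, such as the multiprocessing.Manager().Queue() used in the example above.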