careamist_v2

CAREamistV2

Source code in src/careamics/careamist_v2.py
class CAREamistV2:
    def __init__(
        self,
        config: Configuration | Path | None = None,
        *,
        checkpoint_path: Path | None = None,
        bmz_path: Path | None = None,
        **user_context: Unpack[UserContext],
    ):
        self.checkpoint_path = checkpoint_path
        self.work_dir = self._resolve_work_dir(user_context.get("work_dir"))
        self.config, self.model = self._load_model(config, checkpoint_path, bmz_path)

        enable_progress_bar = user_context.get("enable_progress_bar", True)
        self.config.training_config.lightning_trainer_config["enable_progress_bar"] = (
            enable_progress_bar
        )
        callbacks = user_context.get("callbacks", None)
        self.callbacks = self._define_callbacks(callbacks, self.config, self.work_dir)

        self.prediction_writer = PredictionWriterCallback(self.work_dir)
        self.prediction_writer.disable_writing(True)

        experiment_loggers = self._create_loggers(
            self.config.training_config.logger,
            self.config.experiment_name,
            self.work_dir,
        )

        self.trainer = Trainer(
            callbacks=[self.prediction_writer, *self.callbacks],
            default_root_dir=self.work_dir,
            logger=experiment_loggers,
            **(self.config.training_config.lightning_trainer_config or {}),
        )

    def _load_model(
        self,
        config: Configuration | Path | None,
        checkpoint_path: Path | None,
        bmz_path: Path | None,
    ) -> tuple[Configuration, CAREamicsModule]:
        n_inputs = sum(
            [config is not None, checkpoint_path is not None, bmz_path is not None]
        )
        if n_inputs != 1:
            raise ValueError(
                "Exactly one of `config`, `checkpoint_path`, or `bmz_path` must be provided."
            )
        if config is not None:
            return self._from_config(config)
        elif checkpoint_path is not None:
            return self._from_checkpoint(checkpoint_path)
        else:
            assert bmz_path is not None
            return self._from_bmz(bmz_path)

    @staticmethod
    def _from_config(
        config: Configuration | Path,
    ) -> tuple[Configuration, CAREamicsModule]:
        if isinstance(config, Path):
            config = load_configuration_ng(config)
        assert not isinstance(config, Path)

        model = create_module(config.algorithm_config)
        return config, model

    @staticmethod
    def _from_checkpoint(
        checkpoint_path: Path,
    ) -> tuple[Configuration, CAREamicsModule]:
        checkpoint: dict = torch.load(checkpoint_path, map_location="cpu")

        careamics_info = checkpoint.get("careamics_info", None)
        if careamics_info is None:
            raise ValueError(
                "Could not find CAREamics related information within the provided "
                "checkpoint. This means that it was saved without using the "
                "CAREamics callback `CareamicsCheckpointInfo`. "
                "Please use a checkpoint saved with CAREamics or initialize with a config instead."
            )

        try:
            algorithm_config: dict[str, Any] = checkpoint["hyper_parameters"][
                "algorithm_config"
            ]
        except (KeyError, IndexError) as e:
            raise ValueError(
                "Could not determine CAREamics supported algorithm from the provided "
                f"checkpoint at: {checkpoint_path!s}."
            ) from e

        data_hparams_key = checkpoint.get(
            "datamodule_hparams_name", "datamodule_hyper_parameters"
        )
        try:
            data_config: dict[str, Any] = checkpoint[data_hparams_key]["data_config"]
        except (KeyError, IndexError) as e:
            raise ValueError(
                "Could not determine the data configuration from the provided "
                f"checkpoint at: {checkpoint_path!s}."
            ) from e

        # TODO: will need to resolve this with type adapter once more configs are added
        config = Configuration.model_validate(
            {
                "algorithm_config": algorithm_config,
                "data_config": data_config,
                **careamics_info,
            }
        )

        module = load_module_from_checkpoint(checkpoint_path)
        return config, module

    @staticmethod
    def _from_bmz(
        bmz_path: Path,
    ) -> tuple[Configuration, CAREamicsModule]:
        raise NotImplementedError("Loading from BMZ is not implemented yet.")

    @staticmethod
    def _resolve_work_dir(work_dir: str | Path | None) -> Path:
        if work_dir is None:
            work_dir = Path.cwd().resolve()
            logger.warning(
                f"No working directory provided. Using current working directory: "
                f"{work_dir}."
            )
        else:
            work_dir = Path(work_dir).resolve()
        return work_dir

    @staticmethod
    def _define_callbacks(
        callbacks: list[Callback] | None,
        config: Configuration,
        work_dir: Path,
    ) -> list[Callback]:
        callbacks = [] if callbacks is None else callbacks
        for c in callbacks:
            if isinstance(c, (ModelCheckpoint, EarlyStopping)):
                raise ValueError(
                    "`ModelCheckpoint` and `EarlyStopping` callbacks are already "
                    "defined in CAREamics and should only be modified through the "
                    "training configuration (see TrainingConfig)."
                )

            if isinstance(c, (CareamicsCheckpointInfo, ProgressBarCallback)):
                raise ValueError(
                    "`CareamicsCheckpointInfo` and `ProgressBarCallback` callbacks "
                    "are defined internally and should not be passed as callbacks."
                )

        internal_callbacks = [
            ModelCheckpoint(
                dirpath=work_dir / "checkpoints",
                filename=f"{config.experiment_name}_{{epoch:02d}}_step_{{step}}",
                **config.training_config.checkpoint_callback.model_dump(),
            ),
            CareamicsCheckpointInfo(
                config.version, config.experiment_name, config.training_config
            ),
        ]

        enable_progress_bar = config.training_config.lightning_trainer_config.get(
            "enable_progress_bar", True
        )
        if enable_progress_bar:
            internal_callbacks.append(ProgressBarCallback())

        if config.training_config.early_stopping_callback is not None:
            internal_callbacks.append(
                EarlyStopping(
                    **config.training_config.early_stopping_callback.model_dump()
                )
            )

        return internal_callbacks + callbacks

    @staticmethod
    def _create_loggers(
        logger: str | None, experiment_name: str, work_dir: Path
    ) -> list[ExperimentLogger]:
        csv_logger = CSVLogger(name=experiment_name, save_dir=work_dir / "csv_logs")

        if logger is not None:
            logger = SupportedLogger(logger)

        match logger:
            case SupportedLogger.WANDB:
                return [
                    WandbLogger(name=experiment_name, save_dir=work_dir / "wandb_logs"),
                    csv_logger,
                ]
            case SupportedLogger.TENSORBOARD:
                return [
                    TensorBoardLogger(save_dir=work_dir / "tb_logs"),
                    csv_logger,
                ]
            case _:
                return [csv_logger]

    def train(
        self,
        *,
        # BASIC PARAMS
        train_data: Any | None = None,
        train_data_target: Any | None = None,
        val_data: Any | None = None,
        val_data_target: Any | None = None,
        # val_percentage: float | None = None, # TODO: hidden till re-implemented
        # val_minimum_split: int = 5,
        # ADVANCED PARAMS
        filtering_mask: Any | None = None,
        read_source_func: Callable | None = None,
        read_kwargs: dict[str, Any] | None = None,
        extension_filter: str = "",
    ) -> None:
        # TODO: init datamodule
        # TODO: remember to pass self.checkpoint_path to Trainer.fit
        # ^ this will load optimizer and lr_scheduler state dicts
        raise NotImplementedError("Training is not implemented yet.")

    def predict(
        self,
        # BASIC PARAMS
        pred_data: Any | None = None,
        batch_size: int = 1,
        tile_size: tuple[int, ...] | None = None,
        tile_overlap: tuple[int, ...] | None = (48, 48),
        axes: str | None = None,
        data_type: Literal["array", "tiff", "custom"] | None = None,
        # ADVANCED PARAMS
        # tta_transforms: bool = False, # TODO: hidden till implemented
        num_workers: int | None = None,
        read_source_func: Callable | None = None,
        read_kwargs: dict[str, Any] | None = None,
        extension_filter: str = "",
    ) -> None:
        raise NotImplementedError("Predicting is not implemented yet.")

    def predict_to_disk(
        self,
        # BASIC PARAMS
        pred_data: Any | None = None,
        pred_data_target: Any | None = None,
        prediction_dir: Path | str = "predictions",
        batch_size: int = 1,
        tile_size: tuple[int, ...] | None = None,
        tile_overlap: tuple[int, ...] | None = (48, 48),
        axes: str | None = None,
        data_type: Literal["array", "tiff", "custom"] | None = None,
        # ADVANCED PARAMS
        num_workers: int | None = None,
        read_source_func: Callable | None = None,
        read_kwargs: dict[str, Any] | None = None,
        extension_filter: str = "",
        # WRITE OPTIONS
        write_type: Literal["tiff", "zarr", "custom"] = "tiff",
        write_extension: str | None = None,
        write_func: WriteFunc | None = None,
        write_func_kwargs: dict[str, Any] | None = None,
    ) -> None:
        raise NotImplementedError("Predicting to disk is not implemented yet.")

    def export_to_bmz(
        self,
        path_to_archive: Path | str,
        friendly_model_name: str,
        input_array: NDArray,
        authors: list[dict],
        general_description: str,
        data_description: str,
        covers: list[Path | str] | None = None,
        channel_names: list[str] | None = None,
        model_version: str = "0.1.0",
    ) -> None:
        """Export the model to the BioImage Model Zoo format.

        This method packages the current weights into a zip file that can be uploaded
        to the BioImage Model Zoo. The archive consists of the model weights, the model
        specifications and various files (inputs, outputs, README, env.yaml etc.).

        `path_to_archive` should point to a file with a ".zip" extension.

        `friendly_model_name` is the name used for the model in the BMZ specs
        and website; it should consist of letters, numbers, dashes, underscores,
        and parentheses only.

        The input array must match the dimensions of the axes recorded in the
        configuration of the `CAREamistV2`.

        Parameters
        ----------
        path_to_archive : pathlib.Path or str
            Path in which to save the model, including file name, which should end with
            ".zip".
        friendly_model_name : str
            Name of the model as used in the BMZ specs; it should consist of
            letters, numbers, dashes, underscores, and parentheses only.
        input_array : NDArray
            Input array used to validate the model and serve as an example.
        authors : list of dict
            List of authors of the model.
        general_description : str
            General description of the model used in the BMZ metadata.
        data_description : str
            Description of the data the model was trained on.
        covers : list of pathlib.Path or str, default=None
            Paths to the cover images.
        channel_names : list of str, default=None
            Channel names.
        model_version : str, default="0.1.0"
            Version of the model.
        """
        output_patch = self.predict(
            pred_data=input_array,
            data_type=SupportedData.ARRAY.value,
        )
        output = np.concatenate(output_patch, axis=0)
        input_array = reshape_array(input_array, self.config.data_config.axes)

        export_to_bmz(
            model=self.model,
            config=self.config,
            path_to_archive=path_to_archive,
            model_name=friendly_model_name,
            general_description=general_description,
            data_description=data_description,
            authors=authors,
            input_array=input_array,
            output_array=output,
            covers=covers,
            channel_names=channel_names,
            model_version=model_version,
        )

    def get_losses(self) -> dict[str, list]:
        """Return data that can be used to plot train and validation loss curves.

        Returns
        -------
        dict of str: list
            Dictionary containing losses for each epoch.
        """
        return read_csv_logger(self.config.experiment_name, self.work_dir / "csv_logs")

    def stop_training(self) -> None:
        """Stop the training loop."""
        self.trainer.should_stop = True
        self.trainer.limit_val_batches = 0 # skip validation
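
Putting the constructor to use: a minimal sketch, assuming the import path matches the source location shown above and that all paths exist (they are hypothetical here). Exactly one of `config`, `checkpoint_path`, or `bmz_path` may be given; `work_dir` and `enable_progress_bar` are UserContext keyword arguments.

from pathlib import Path

from careamics.careamist_v2 import CAREamistV2

# Initialize from a configuration file (hypothetical paths).
careamist = CAREamistV2(
    config=Path("configs/n2v_config.yml"),
    work_dir=Path("experiments/n2v_run"),
    enable_progress_bar=True,
)

# Or resume from a CAREamics checkpoint instead (passing more than
# one source raises a ValueError).
careamist = CAREamistV2(checkpoint_path=Path("checkpoints/last.ckpt"))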

export_to_bmz(path_to_archive, friendly_model_name, input_array, authors, general_description, data_description, covers=None, channel_names=None, model_version='0.1.0')

Export the model to the BioImage Model Zoo format.

This method packages the current weights into a zip file that can be uploaded to the BioImage Model Zoo. The archive consists of the model weights, the model specifications and various files (inputs, outputs, README, env.yaml etc.).

path_to_archive should point to a file with a ".zip" extension.

friendly_model_name is the name used for the model in the BMZ specs and website; it should consist of letters, numbers, dashes, underscores, and parentheses only.

The input array must match the dimensions of the axes recorded in the configuration of the CAREamistV2.

Parameters:

path_to_archive : pathlib.Path or str (required)
    Path in which to save the model, including file name, which should end with ".zip".
friendly_model_name : str (required)
    Name of the model as used in the BMZ specs; it should consist of letters, numbers, dashes, underscores, and parentheses only.
input_array : NDArray (required)
    Input array used to validate the model and serve as an example.
authors : list of dict (required)
    List of authors of the model.
general_description : str (required)
    General description of the model used in the BMZ metadata.
data_description : str (required)
    Description of the data the model was trained on.
covers : list of pathlib.Path or str (default: None)
    Paths to the cover images.
channel_names : list of str (default: None)
    Channel names.
model_version : str (default: "0.1.0")
    Version of the model.
Source code in src/careamics/careamist_v2.py
def export_to_bmz(
    self,
    path_to_archive: Path | str,
    friendly_model_name: str,
    input_array: NDArray,
    authors: list[dict],
    general_description: str,
    data_description: str,
    covers: list[Path | str] | None = None,
    channel_names: list[str] | None = None,
    model_version: str = "0.1.0",
) -> None:
    """Export the model to the BioImage Model Zoo format.

    This method packages the current weights into a zip file that can be uploaded
    to the BioImage Model Zoo. The archive consists of the model weights, the model
    specifications and various files (inputs, outputs, README, env.yaml etc.).

    `path_to_archive` should point to a file with a ".zip" extension.

    `friendly_model_name` is the name used for the model in the BMZ specs
    and website; it should consist of letters, numbers, dashes, underscores,
    and parentheses only.

    The input array must match the dimensions of the axes recorded in the
    configuration of the `CAREamistV2`.

    Parameters
    ----------
    path_to_archive : pathlib.Path or str
        Path in which to save the model, including file name, which should end with
        ".zip".
    friendly_model_name : str
        Name of the model as used in the BMZ specs; it should consist of
        letters, numbers, dashes, underscores, and parentheses only.
    input_array : NDArray
        Input array used to validate the model and serve as an example.
    authors : list of dict
        List of authors of the model.
    general_description : str
        General description of the model used in the BMZ metadata.
    data_description : str
        Description of the data the model was trained on.
    covers : list of pathlib.Path or str, default=None
        Paths to the cover images.
    channel_names : list of str, default=None
        Channel names.
    model_version : str, default="0.1.0"
        Version of the model.
    """
    output_patch = self.predict(
        pred_data=input_array,
        data_type=SupportedData.ARRAY.value,
    )
    output = np.concatenate(output_patch, axis=0)
    input_array = reshape_array(input_array, self.config.data_config.axes)

    export_to_bmz(
        model=self.model,
        config=self.config,
        path_to_archive=path_to_archive,
        model_name=friendly_model_name,
        general_description=general_description,
        data_description=data_description,
        authors=authors,
        input_array=input_array,
        output_array=output,
        covers=covers,
        channel_names=channel_names,
        model_version=model_version,
    )
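
A hedged example of calling the method above, assuming a trained `careamist` instance and an input array shaped to match the configured axes; the author dictionary keys follow the BMZ author format, and all concrete values are illustrative:

import numpy as np

# The example input must match the axes recorded in the configuration,
# e.g. "YX" for a single 2D image (shape is illustrative).
example_input = np.random.rand(256, 256).astype(np.float32)

careamist.export_to_bmz(
    path_to_archive="exports/my_model.zip",  # must end with ".zip"
    friendly_model_name="my-denoising-model",
    input_array=example_input,
    authors=[{"name": "Jane Doe", "affiliation": "Example Institute"}],
    general_description="A denoising model trained with CAREamics.",
    data_description="2D fluorescence microscopy images.",
    model_version="0.1.0",
)
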

get_losses()

Return data that can be used to plot train and validation loss curves.

Returns:

dict of str: list
    Dictionary containing losses for each epoch.

Source code in src/careamics/careamist_v2.py
def get_losses(self) -> dict[str, list]:
    """Return data that can be used to plot train and validation loss curves.

    Returns
    -------
    dict of str: list
        Dictionary containing losses for each epoch.
    """
    return read_csv_logger(self.config.experiment_name, self.work_dir / "csv_logs")
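
For example, the returned dictionary can be passed directly to matplotlib to plot the loss curves (a sketch; the exact key names depend on what the CSV logger recorded, so inspect `losses.keys()` first):

import matplotlib.pyplot as plt

losses = careamist.get_losses()

# "epoch", "train_loss", and "val_loss" are assumed key names; check
# losses.keys() for the metrics actually logged in your run.
plt.plot(losses["epoch"], losses["train_loss"], label="train")
plt.plot(losses["epoch"], losses["val_loss"], label="validation")
plt.xlabel("epoch")
plt.ylabel("loss")
plt.legend()
plt.show()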

stop_training()

Stop the training loop.

Source code in src/careamics/careamist_v2.py
def stop_training(self) -> None:
    """Stop the training loop."""
    self.trainer.should_stop = True
    self.trainer.limit_val_batches = 0 # skip validation
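
Because the method only sets flags on the Lightning `Trainer`, training halts at the next point where Lightning checks `should_stop` (typically the end of the current batch). A sketch of the intended pattern, calling it from a second thread while training runs; note that `train` is still marked as not implemented in the source above:

import threading

def run_training() -> None:
    # Hypothetical call; `train` raises NotImplementedError in the
    # version documented here.
    careamist.train(train_data="train_images/")

thread = threading.Thread(target=run_training)
thread.start()

# ... later, e.g. from a UI event handler:
careamist.stop_training()  # training stops at the next safe point
thread.join()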