Data.dataset (Dataset) Module

ppsci.data.dataset

IterableNamedArrayDataset

Bases: IterableDataset

IterableNamedArrayDataset for full-data loading.

Parameters:

input (Dict[str, ndarray]): Input dict. Required.
label (Optional[Dict[str, ndarray]]): Label dict. Defaults to None.
weight (Optional[Dict[str, ndarray]]): Weight dict. Defaults to None.
transforms (Optional[Compose]): Compose object contains sample wise transform(s). Defaults to None.

Examples:

>>> import ppsci
>>> import numpy as np
>>> input = {"x": np.random.randn(100, 1)}
>>> label = {"u": np.random.randn(100, 1)}
>>> weight = {"u": np.random.randn(100, 1)}
>>> dataset = ppsci.data.dataset.IterableNamedArrayDataset(input, label, weight)
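
Because the whole dataset is loaded into memory and converted to paddle Tensors up front, iterating over it yields the full input, label and weight dicts in a single step. A short usage sketch continuing the example above:

>>> input_batch, label_batch, weight_batch = next(iter(dataset))
>>> len(dataset)
1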
Source code in ppsci/data/dataset/array_dataset.py
class IterableNamedArrayDataset(io.IterableDataset):
    """IterableNamedArrayDataset for full-data loading.

    Args:
        input (Dict[str, np.ndarray]): Input dict.
        label (Optional[Dict[str, np.ndarray]]): Label dict. Defaults to None.
        weight (Optional[Dict[str, np.ndarray]]): Weight dict. Defaults to None.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.

    Examples:
        >>> import ppsci
        >>> input = {"x": np.random.randn(100, 1)}
        >>> label = {"u": np.random.randn(100, 1)}
        >>> weight = {"u": np.random.randn(100, 1)}
        >>> dataset = ppsci.data.dataset.IterableNamedArrayDataset(input, label, weight)
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    def __init__(
        self,
        input: Dict[str, np.ndarray],
        label: Optional[Dict[str, np.ndarray]] = None,
        weight: Optional[Dict[str, np.ndarray]] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()
        self.input = {key: paddle.to_tensor(value) for key, value in input.items()}
        self.label = (
            {key: paddle.to_tensor(value) for key, value in label.items()}
            if label is not None
            else {}
        )
        self.input_keys = tuple(input.keys())
        self.label_keys = tuple(self.label.keys())
        self.weight = (
            {
                key: paddle.to_tensor(value, paddle.get_default_dtype())
                for key, value in weight.items()
            }
            if weight is not None
            else None
        )
        self._len = len(next(iter(self.input.values())))
        self.transforms = transforms

    @property
    def num_samples(self):
        """Number of samples within current dataset."""
        return self._len

    def __iter__(self):
        if callable(self.transforms):
            input_, label_, weight_ = self.transforms(
                self.input, self.label, self.weight
            )
            yield input_, label_, weight_
        else:
            yield self.input, self.label, self.weight

    def __len__(self):
        return 1

num_samples property

Number of samples within current dataset.

NamedArrayDataset

Bases: Dataset

Class for Named Array Dataset.

Parameters:

input (Dict[str, ndarray]): Input dict. Required.
label (Optional[Dict[str, ndarray]]): Label dict. Defaults to None.
weight (Optional[Dict[str, ndarray]]): Weight dict. Defaults to None.
transforms (Optional[Compose]): Compose object contains sample wise transform(s). Defaults to None.

Examples:

>>> import ppsci
>>> import numpy as np
>>> input = {"x": np.random.randn(100, 1)}
>>> output = {"u": np.random.randn(100, 1)}
>>> weight = {"u": np.random.randn(100, 1)}
>>> dataset = ppsci.data.dataset.NamedArrayDataset(input, output, weight)
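
Unlike the iterable variant, this dataset is indexed per sample: each item is a tuple of (input, label, weight) dicts containing one row of every array. A short usage sketch continuing the example above:

>>> input_item, label_item, weight_item = dataset[0]
>>> len(dataset)
100
>>> input_item["x"].shape
(1,)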
Source code in ppsci/data/dataset/array_dataset.py
class NamedArrayDataset(io.Dataset):
    """Class for Named Array Dataset.

    Args:
        input (Dict[str, np.ndarray]): Input dict.
        label (Optional[Dict[str, np.ndarray]]): Label dict. Defaults to None.
        weight (Optional[Dict[str, np.ndarray]]): Weight dict. Defaults to None.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.

    Examples:
        >>> import ppsci
        >>> input = {"x": np.random.randn(100, 1)}
        >>> output = {"u": np.random.randn(100, 1)}
        >>> weight = {"u": np.random.randn(100, 1)}
        >>> dataset = ppsci.data.dataset.NamedArrayDataset(input, output, weight)
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = True

    def __init__(
        self,
        input: Dict[str, np.ndarray],
        label: Optional[Dict[str, np.ndarray]] = None,
        weight: Optional[Dict[str, np.ndarray]] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()
        self.input = input
        self.label = {} if label is None else label
        self.input_keys = tuple(input.keys())
        self.label_keys = tuple(self.label.keys())
        self.weight = {} if weight is None else weight
        self.transforms = transforms
        self._len = len(next(iter(input.values())))
        for key in input:
            if key in self.label and len(input[key]) != len(self.label[key]):
                logger.warning(
                    f"The length of input {key}({len(input[key])}) is not equal to "
                    f"the length of label {key}({len(self.label[key])})."
                )

    def __getitem__(self, idx):
        input_item = {key: value[idx] for key, value in self.input.items()}
        label_item = {key: value[idx] for key, value in self.label.items()}
        weight_item = {key: value[idx] for key, value in self.weight.items()}

        if self.transforms is not None:
            input_item, label_item, weight_item = self.transforms(
                input_item, label_item, weight_item
            )

        return (input_item, label_item, weight_item)

    def __len__(self):
        return self._len

ChipHeatDataset

Bases: Dataset

ChipHeatDataset for data loading of multi-branch DeepONet model.

Parameters:

input (Dict[str, ndarray]): Input dict. Required.
label (Dict[str, ndarray]): Label dict. Required.
index (tuple[str, ...]): Keys of the input dict used to build the sampling index. Required.
data_type (str): One of the keys of the input dict. Required.
weight (Optional[Dict[str, ndarray]]): Weight dict. Defaults to None.
transforms (Optional[Compose]): Compose object contains sample wise transform(s). Defaults to None.

Examples:

>>> import ppsci
>>> import numpy as np
>>> input = {"x": np.random.randn(100, 1)}
>>> label = {"u": np.random.randn(100, 1)}
>>> index = ('x', 'u', 'bc', 'bc_data')
>>> data_type = 'u'
>>> weight = {"u": np.random.randn(100, 1)}
>>> dataset = ppsci.data.dataset.ChipHeatDataset(input, label, index, data_type, weight)
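
The dataset length is the product of the lengths of the arrays named in index, and __getitem__ unravels a flat index over those lengths (a mixed-radix decomposition). A plain-Python sketch of that step with hypothetical array lengths:

>>> lens = {"x": 10, "u": 4, "bc": 3, "bc_data": 5}  # hypothetical sizes
>>> quotient, index_ir = 137, {}
>>> for key in ("x", "u", "bc", "bc_data"):
...     index_ir[key] = quotient % lens[key]
...     quotient = quotient // lens[key]
>>> index_ir
{'x': 7, 'u': 1, 'bc': 0, 'bc_data': 1}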
Source code in ppsci/data/dataset/array_dataset.py
class ChipHeatDataset(io.Dataset):
    """ChipHeatDataset for data loading of multi-branch DeepONet model.

    Args:
        input (Dict[str, np.ndarray]): Input dict.
        label (Optional[Dict[str, np.ndarray]]): Label dict. Defaults to None.
        index (tuple[str, ...]): Key of input dict.
        data_type (str): One of key of input dict.
        weight (Optional[Dict[str, np.ndarray]]): Weight dict. Defaults to None.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.

    Examples:
        >>> import ppsci
        >>> input = {"x": np.random.randn(100, 1)}
        >>> label = {"u": np.random.randn(100, 1)}
        >>> index = ('x', 'u', 'bc', 'bc_data')
        >>> data_type = 'u'
        >>> weight = {"u": np.random.randn(100, 1)}
        >>> dataset = ppsci.data.dataset.ChipHeatDataset(input, label, index, data_type, weight)
    """

    def __init__(
        self,
        input: Dict[str, np.ndarray],
        label: Dict[str, np.ndarray],
        index: tuple[str, ...],
        data_type: str,
        weight: Optional[Dict[str, float]] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()
        self.input = input
        self.label = label
        self.input_keys = tuple(input.keys())
        self.label_keys = tuple(label.keys())
        self.index = index
        self.data_type = data_type
        self.weight = {} if weight is None else weight
        self.transforms = transforms

    def __getitem__(self, idx):
        quotient = idx
        index_ir = dict()
        for i in self.index:
            index_ir[i] = 0

        for i in index_ir:
            num = len(self.input[i])
            index_ir[i] = quotient % num
            quotient = quotient // num

        input_item = {}
        for key in self.input:
            if key == "y":
                input_item[key] = self.input[key][index_ir["x"]]
            elif key == "u_one":
                input_item[key] = self.input[key][
                    len(self.input[self.data_type]) * index_ir["x"]
                    + index_ir[self.data_type]
                ]
            else:
                input_item[key] = self.input[key][index_ir[key]]

        label_item = {key: value for key, value in self.label.items()}
        weight_item = {key: value for key, value in self.weight.items()}

        if self.transforms is not None:
            input_item, label_item, weight_item = self.transforms(
                (input_item, label_item, weight_item)
            )

        return (input_item, label_item, weight_item)

    def __len__(self):
        _len = 1
        for i in self.index:
            _len *= len(self.input[i])
        return _len

CSVDataset

Bases: Dataset

Dataset class for .csv file.

Parameters:

file_path (str): CSV file path. Required.
input_keys (Tuple[str, ...]): List of input keys. Required.
label_keys (Tuple[str, ...]): List of label keys. Required.
alias_dict (Optional[Dict[str, str]]): Dict of alias(es) for input and label keys, i.e. {inner_key: outer_key}. Defaults to None.
weight_dict (Optional[Dict[str, Union[Callable, float]]]): Define the weight of each constraint variable. Defaults to None.
timestamps (Optional[Tuple[float, ...]]): Timestamps used to filter the data along the time dimension, or to repeat it if no time column is present. Defaults to None.
transforms (Optional[Compose]): Compose object contains sample wise transform(s). Defaults to None.

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.CSVDataset(
...     "/path/to/file.csv",
...     ("x",),
...     ("u",),
... )
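
The weight_dict argument accepts either constants or callables: a number is broadcast to an array shaped like the labels, while a callable is evaluated on the input dict. A hedged sketch (the file path and keys are hypothetical):

>>> dataset = ppsci.data.dataset.CSVDataset(
...     "/path/to/file.csv",
...     ("x",),
...     ("u",),
...     weight_dict={"u": 10.0},
... )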
Source code in ppsci/data/dataset/csv_dataset.py
class CSVDataset(io.Dataset):
    """Dataset class for .csv file.

    Args:
        file_path (str): CSV file path.
        input_keys (Tuple[str, ...]): List of input keys.
        label_keys (Tuple[str, ...]): List of label keys.
        alias_dict (Optional[Dict[str, str]]): Dict of alias(es) for input and label keys.
            i.e. {inner_key: outer_key}. Defaults to None.
        weight_dict (Optional[Dict[str, Union[Callable, float]]]): Define the weight of
            each constraint variable. Defaults to None.
        timestamps (Optional[Tuple[float, ...]]): The number of repetitions of the data
            in the time dimension. Defaults to None.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.CSVDataset(
        ...     "/path/to/file.csv",
        ...     ("x",),
        ...     ("u",),
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = True

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        alias_dict: Optional[Dict[str, str]] = None,
        weight_dict: Optional[Dict[str, Union[Callable, float]]] = None,
        timestamps: Optional[Tuple[float, ...]] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()
        self.input_keys = input_keys
        self.label_keys = label_keys

        # read raw data from file
        raw_data = reader.load_csv_file(
            file_path,
            input_keys + label_keys,
            alias_dict,
        )
        # filter raw data by given timestamps if specified
        if timestamps is not None:
            if "t" in raw_data:
                # filter data according to given timestamps
                raw_time_array = raw_data["t"]
                mask = []
                for ti in timestamps:
                    mask.append(np.nonzero(np.isclose(raw_time_array, ti).flatten())[0])
                raw_data = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                mask = np.concatenate(mask, 0)
                raw_data = raw_data[mask]
                raw_data = misc.convert_to_dict(
                    raw_data, self.input_keys + self.label_keys
                )
            else:
                # repeat data according to given timestamps
                raw_data = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                raw_data = misc.combine_array_with_time(raw_data, timestamps)
                self.input_keys = ("t",) + tuple(self.input_keys)
                raw_data = misc.convert_to_dict(
                    raw_data, self.input_keys + self.label_keys
                )

        # fetch input data
        self.input = {
            key: value for key, value in raw_data.items() if key in self.input_keys
        }
        # fetch label data
        self.label = {
            key: value for key, value in raw_data.items() if key in self.label_keys
        }

        # prepare weights
        self.weight = (
            {key: np.ones_like(next(iter(self.label.values()))) for key in self.label}
            if weight_dict is not None
            else {}
        )
        if weight_dict is not None:
            for key, value in weight_dict.items():
                if isinstance(value, (int, float)):
                    self.weight[key] = np.full_like(
                        next(iter(self.label.values())), value
                    )
                elif callable(value):
                    func = value
                    self.weight[key] = func(self.input)
                    if isinstance(self.weight[key], (int, float)):
                        self.weight[key] = np.full_like(
                            next(iter(self.label.values())), self.weight[key]
                        )
                else:
                    raise NotImplementedError(f"type of {type(value)} is invalid yet.")

        self.transforms = transforms
        self._len = len(next(iter(self.input.values())))

    def __getitem__(self, idx):
        input_item = {key: value[idx] for key, value in self.input.items()}
        label_item = {key: value[idx] for key, value in self.label.items()}
        weight_item = {key: value[idx] for key, value in self.weight.items()}

        if self.transforms is not None:
            input_item, label_item, weight_item = self.transforms(
                input_item, label_item, weight_item
            )

        return (input_item, label_item, weight_item)

    def __len__(self):
        return self._len
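
As the listing above shows, when timestamps are given and the data contains a time column "t", rows are selected with np.isclose against each requested timestamp; otherwise the data is repeated once per timestamp. A numpy-only sketch of the masking step:

>>> import numpy as np
>>> t = np.array([0.0, 0.1, 0.2, 0.3])
>>> timestamps = (0.1, 0.3)
>>> mask = np.concatenate([np.nonzero(np.isclose(t, ti))[0] for ti in timestamps])
>>> mask
array([1, 3])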

IterableCSVDataset

Bases: IterableDataset

IterableCSVDataset for full-data loading.

Parameters:

file_path (str): CSV file path. Required.
input_keys (Tuple[str, ...]): List of input keys. Required.
label_keys (Tuple[str, ...]): List of label keys. Required.
alias_dict (Optional[Dict[str, str]]): Dict of alias(es) for input and label keys. Defaults to None.
weight_dict (Optional[Dict[str, Union[Callable, float]]]): Define the weight of each constraint variable. Defaults to None.
timestamps (Optional[Tuple[float, ...]]): Timestamps used to filter the data along the time dimension, or to repeat it if no time column is present. Defaults to None.
transforms (Optional[Compose]): Compose object contains sample wise transform(s). Defaults to None.

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.IterableCSVDataset(
...     "/path/to/file.csv"
...     ("x",),
...     ("u",),
... )
Source code in ppsci/data/dataset/csv_dataset.py
class IterableCSVDataset(io.IterableDataset):
    """IterableCSVDataset for full-data loading.

    Args:
        file_path (str): CSV file path.
        input_keys (Tuple[str, ...]): List of input keys.
        label_keys (Tuple[str, ...]): List of label keys.
        alias_dict (Optional[Dict[str, str]]): Dict of alias(es) for input and label keys.
            Defaults to None.
        weight_dict (Optional[Dict[str, Union[Callable, float]]]): Define the weight of
            each constraint variable. Defaults to None.
        timestamps (Optional[Tuple[float, ...]]): The number of repetitions of the data
            in the time dimension. Defaults to None.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.IterableCSVDataset(
        ...     "/path/to/file.csv"
        ...     ("x",),
        ...     ("u",),
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        alias_dict: Optional[Dict[str, str]] = None,
        weight_dict: Optional[Dict[str, Union[Callable, float]]] = None,
        timestamps: Optional[Tuple[float, ...]] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()
        self.input_keys = input_keys
        self.label_keys = label_keys

        # read raw data from file
        raw_data = reader.load_csv_file(
            file_path,
            input_keys + label_keys,
            alias_dict,
        )
        # filter raw data by given timestamps if specified
        if timestamps is not None:
            if "t" in raw_data:
                # filter data according to given timestamps
                raw_time_array = raw_data["t"]
                mask = []
                for ti in timestamps:
                    mask.append(np.nonzero(np.isclose(raw_time_array, ti).flatten())[0])
                raw_data = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                mask = np.concatenate(mask, 0)
                raw_data = raw_data[mask]
                raw_data = misc.convert_to_dict(
                    raw_data, self.input_keys + self.label_keys
                )
            else:
                # repeat data according to given timestamps
                raw_data = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                raw_data = misc.combine_array_with_time(raw_data, timestamps)
                self.input_keys = ("t",) + tuple(self.input_keys)
                raw_data = misc.convert_to_dict(
                    raw_data, self.input_keys + self.label_keys
                )

        # fetch input data
        self.input = {
            key: value for key, value in raw_data.items() if key in self.input_keys
        }
        # fetch label data
        self.label = {
            key: value for key, value in raw_data.items() if key in self.label_keys
        }

        # prepare weights
        self.weight = (
            {key: np.ones_like(next(iter(self.label.values()))) for key in self.label}
            if weight_dict is not None
            else {}
        )
        if weight_dict is not None:
            for key, value in weight_dict.items():
                if isinstance(value, (int, float)):
                    self.weight[key] = np.full_like(
                        next(iter(self.label.values())), value
                    )
                elif callable(value):
                    func = value
                    self.weight[key] = func(self.input)
                    if isinstance(self.weight[key], (int, float)):
                        self.weight[key] = np.full_like(
                            next(iter(self.label.values())), self.weight[key]
                        )
                else:
                    raise NotImplementedError(f"type of {type(value)} is invalid yet.")

        self.input = {key: paddle.to_tensor(value) for key, value in self.input.items()}
        self.label = {key: paddle.to_tensor(value) for key, value in self.label.items()}
        self.weight = {
            key: paddle.to_tensor(value) for key, value in self.weight.items()
        }

        self.transforms = transforms
        self._len = len(next(iter(self.input.values())))

    @property
    def num_samples(self):
        """Number of samples within current dataset."""
        return self._len

    def __iter__(self):
        if callable(self.transforms):
            input_, label_, weight_ = self.transforms(
                self.input, self.label, self.weight
            )
            yield input_, label_, weight_
        else:
            yield self.input, self.label, self.weight

    def __len__(self):
        return 1

num_samples property

Number of samples within current dataset.

ContinuousNamedArrayDataset

Bases: IterableDataset

ContinuousNamedArrayDataset for iterable sampling.

Parameters:

input (Callable): Function that generates the input dict. Required.
label (Callable): Function that generates the label dict. Required.
weight (Optional[Callable]): Function that generates the weight dict. Defaults to None.
transforms (Optional[Compose]): Compose object contains sample wise transform(s). Defaults to None.

Examples:

>>> import ppsci
>>> import numpy as np
>>> input = lambda : {"x": np.random.randn(100, 1)}
>>> label = lambda inp: {"u": np.random.randn(100, 1)}
>>> weight = lambda inp, label: {"u": 1 - (label["u"] ** 2)}
>>> dataset = ppsci.data.dataset.ContinuousNamedArrayDataset(input, label, weight)
>>> input_batch, label_batch, weight_batch = next(iter(dataset))
>>> print(input_batch["x"].shape)
[100, 1]
>>> print(label_batch["u"].shape)
[100, 1]
>>> print(weight_batch["u"].shape)
[100, 1]
Source code in ppsci/data/dataset/array_dataset.py
class ContinuousNamedArrayDataset(io.IterableDataset):
    """ContinuousNamedArrayDataset for iterable sampling.

    Args:
        input (Callable): Function generate input dict.
        label (Callable): Function generate label dict.
        weight (Optional[Callable]): Function generate weight dict. Defaults to None.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.

    Examples:
        >>> import ppsci
        >>> import numpy as np
        >>> input = lambda : {"x": np.random.randn(100, 1)}
        >>> label = lambda inp: {"u": np.random.randn(100, 1)}
        >>> weight = lambda inp, label: {"u": 1 - (label["u"] ** 2)}
        >>> dataset = ppsci.data.dataset.ContinuousNamedArrayDataset(input, label, weight)
        >>> input_batch, label_batch, weight_batch = next(iter(dataset))
        >>> print(input_batch["x"].shape)
        [100, 1]
        >>> print(label_batch["u"].shape)
        [100, 1]
        >>> print(weight_batch["u"].shape)
        [100, 1]
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    def __init__(
        self,
        input: Callable,
        label: Callable,
        weight: Optional[Callable] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()
        self.input_fn = input
        self.input_keys = tuple(self.input_fn().keys())

        self.label_fn = label
        input_ = self.input_fn()
        self.label_keys = tuple(self.label_fn(input_).keys())

        self.weight_fn = weight
        self.transforms = transforms

    @property
    def num_samples(self):
        """Number of samples within current dataset."""
        raise NotImplementedError(
            "ContinuousNamedArrayDataset has no fixed number of samples."
        )

    def __iter__(self):
        def to_tensor_dict(_dict):
            if _dict is None:
                return None
            return {k: paddle.to_tensor(v) for k, v in _dict.items()}

        while True:
            input_batch = self.input_fn()
            label_batch = self.label_fn(input_batch)
            if callable(self.weight_fn):
                weight_batch = self.weight_fn(input_batch, label_batch)
            else:
                weight_batch = None

            if callable(self.transforms):
                input_batch, label_batch, weight_batch = self.transforms(
                    input_batch, label_batch, weight_batch
                )
            yield to_tensor_dict(input_batch), to_tensor_dict(
                label_batch
            ), to_tensor_dict(weight_batch)

    def __len__(self):
        return 1

num_samples property

Number of samples within current dataset.

ERA5Dataset

Bases: Dataset

Class for ERA5 dataset.

Parameters:

file_path (str): Data set path. Required.
input_keys (Tuple[str, ...]): Input keys, such as ("input",). Required.
label_keys (Tuple[str, ...]): Output keys, such as ("output",). Required.
precip_file_path (Optional[str]): Precipitation data set path. Defaults to None.
weight_dict (Optional[Dict[str, float]]): Weight dictionary. Defaults to None.
vars_channel (Optional[Tuple[int, ...]]): The variable channel index in ERA5 dataset. Defaults to None.
num_label_timestamps (int): Number of timestamps of label. Defaults to 1.
transforms (Optional[Compose]): Compose object contains sample wise transform(s). Defaults to None.
training (bool): Whether in train mode. Defaults to True.
stride (int): Stride of sampling data. Defaults to 1.

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.ERA5Dataset(
...     file_path="/path/to/ERA5Dataset",
...     input_keys=("input",),
...     label_keys=("output",),
... )
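
Internally, a global sample index is split into a file (year) index and a local index within that file, and consecutive time steps are paired as input and label. A small sketch of the index arithmetic with an illustrative samples-per-year value:

>>> num_samples_per_year = 1460  # illustrative: 6-hourly samples for 365 days
>>> global_idx = 2000
>>> year_idx, local_idx = divmod(global_idx, num_samples_per_year)
>>> (year_idx, local_idx)
(1, 540)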
Source code in ppsci/data/dataset/era5_dataset.py
class ERA5Dataset(io.Dataset):
    """Class for ERA5 dataset.

    Args:
        file_path (str): Data set path.
        input_keys (Tuple[str, ...]): Input keys, such as ("input",).
        label_keys (Tuple[str, ...]): Output keys, such as ("output",).
        precip_file_path (Optional[str]): Precipitation data set path. Defaults to None.
        weight_dict (Optional[Dict[str, float]]): Weight dictionary. Defaults to None.
        vars_channel (Optional[Tuple[int, ...]]): The variable channel index in ERA5 dataset. Defaults to None.
        num_label_timestamps (int, optional): Number of timestamp of label. Defaults to 1.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.
        training (bool, optional): Whether in train mode. Defaults to True.
        stride (int, optional): Stride of sampling data. Defaults to 1.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.ERA5Dataset(
        ...     "file_path": "/path/to/ERA5Dataset",
        ...     "input_keys": ("input",),
        ...     "label_keys": ("output",),
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        precip_file_path: Optional[str] = None,
        weight_dict: Optional[Dict[str, float]] = None,
        vars_channel: Optional[Tuple[int, ...]] = None,
        num_label_timestamps: int = 1,
        transforms: Optional[vision.Compose] = None,
        training: bool = True,
        stride: int = 1,
    ):
        super().__init__()
        self.file_path = file_path
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.precip_file_path = precip_file_path

        self.weight_dict = {} if weight_dict is None else weight_dict
        if weight_dict is not None:
            self.weight_dict = {key: 1.0 for key in self.label_keys}
            self.weight_dict.update(weight_dict)

        self.vars_channel = list(range(20)) if vars_channel is None else vars_channel
        self.num_label_timestamps = num_label_timestamps
        self.transforms = transforms
        self.training = training
        self.stride = stride

        self.files = self.read_data(file_path)
        self.n_years = len(self.files)
        self.num_samples_per_year = self.files[0].shape[0]
        self.num_samples = self.n_years * self.num_samples_per_year
        if self.precip_file_path is not None:
            self.precip_files = self.read_data(precip_file_path, "tp")

    def read_data(self, path: str, var="fields"):
        paths = [path] if path.endswith(".h5") else glob.glob(path + "/*.h5")
        paths.sort()
        files = []
        for path_ in paths:
            _file = h5py.File(path_, "r")
            files.append(_file[var])
        return files

    def __len__(self):
        return self.num_samples // self.stride

    def __getitem__(self, global_idx):
        global_idx *= self.stride
        year_idx = global_idx // self.num_samples_per_year
        local_idx = global_idx % self.num_samples_per_year
        step = 0 if local_idx >= self.num_samples_per_year - 1 else 1

        if self.num_label_timestamps > 1:
            if local_idx >= self.num_samples_per_year - self.num_label_timestamps:
                local_idx = self.num_samples_per_year - self.num_label_timestamps - 1

        input_file = self.files[year_idx]
        label_file = (
            self.precip_files[year_idx]
            if self.precip_file_path is not None
            else input_file
        )
        if self.precip_file_path is not None and year_idx == 0 and self.training:
            # first year has 2 missing samples in precip (they are first two time points)
            lim = self.num_samples_per_year - 2
            local_idx = local_idx % lim
            step = 0 if local_idx >= lim - 1 else 1
            input_idx = local_idx + 2
            label_idx = local_idx + step
        else:
            input_idx, label_idx = local_idx, local_idx + step

        input_item = {self.input_keys[0]: input_file[input_idx, self.vars_channel]}

        label_item = {}
        for i in range(self.num_label_timestamps):
            if self.precip_file_path is not None:
                label_item[self.label_keys[i]] = np.expand_dims(
                    label_file[label_idx + i], 0
                )
            else:
                label_item[self.label_keys[i]] = label_file[
                    label_idx + i, self.vars_channel
                ]

        weight_shape = [1] * len(next(iter(label_item.values())).shape)
        weight_item = {
            key: np.full(weight_shape, value, paddle.get_default_dtype())
            for key, value in self.weight_dict.items()
        }

        if self.transforms is not None:
            input_item, label_item, weight_item = self.transforms(
                input_item, label_item, weight_item
            )

        return input_item, label_item, weight_item

ERA5SampledDataset

Bases: Dataset

Class for ERA5 sampled dataset.

Parameters:

file_path (str): Data set path. Required.
input_keys (Tuple[str, ...]): Input keys, such as ("input",). Required.
label_keys (Tuple[str, ...]): Output keys, such as ("output",). Required.
weight_dict (Optional[Dict[str, float]]): Weight dictionary. Defaults to None.
transforms (Optional[Compose]): Compose object contains sample wise transform(s). Defaults to None.

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.ERA5SampledDataset(
...     file_path="/path/to/ERA5SampledDataset",
...     input_keys=("input",),
...     label_keys=("output",),
... )
>>> # get the length of the dataset
>>> dataset_size = len(dataset)
>>> # get the first sample of the data
>>> first_sample = dataset[0]
>>> print("First sample:", first_sample)
Source code in ppsci/data/dataset/era5_dataset.py
class ERA5SampledDataset(io.Dataset):
    """Class for ERA5 sampled dataset.

    Args:
        file_path (str): Data set path.
        input_keys (Tuple[str, ...]): Input keys, such as ("input",).
        label_keys (Tuple[str, ...]): Output keys, such as ("output",).
        weight_dict (Optional[Dict[str, float]]): Weight dictionary. Defaults to None.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.ERA5SampledDataset(
        ...     "file_path": "/path/to/ERA5SampledDataset",
        ...     "input_keys": ("input",),
        ...     "label_keys": ("output",),
        ... )  # doctest: +SKIP
        >>> # get the length of the dataset
        >>> dataset_size = len(dataset)  # doctest: +SKIP
        >>> # get the first sample of the data
        >>> first_sample = dataset[0]  # doctest: +SKIP
        >>> print("First sample:", first_sample)  # doctest: +SKIP
    """

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        weight_dict: Optional[Dict[str, float]] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()
        self.file_path = file_path
        self.input_keys = input_keys
        self.label_keys = label_keys

        self.weight_dict = {} if weight_dict is None else weight_dict
        if weight_dict is not None:
            self.weight_dict = {key: 1.0 for key in self.label_keys}
            self.weight_dict.update(weight_dict)

        self.transforms = transforms

        self.files = self.read_data(file_path)
        self.num_samples = len(self.files)

    def read_data(self, path: str):
        paths = glob.glob(path + "/*.h5")
        paths.sort()
        files = []
        for _path in paths:
            _file = h5py.File(_path, "r")
            files.append(_file)
        return files

    def __len__(self):
        return self.num_samples

    def __getitem__(self, global_idx):
        _file = self.files[global_idx]

        input_item = {}
        for key in _file["input_dict"]:
            input_item[key] = np.asarray(
                _file["input_dict"][key], paddle.get_default_dtype()
            )

        label_item = {}
        for key in _file["label_dict"]:
            label_item[key] = np.asarray(
                _file["label_dict"][key], paddle.get_default_dtype()
            )

        weight_shape = [1] * len(next(iter(label_item.values())).shape)
        weight_item = {
            key: np.full(weight_shape, value, paddle.get_default_dtype())
            for key, value in self.weight_dict.items()
        }

        if self.transforms is not None:
            input_item, label_item, weight_item = self.transforms(
                input_item, label_item, weight_item
            )

        return input_item, label_item, weight_item

IterableMatDataset

Bases: IterableDataset

IterableMatDataset for full-data loading.

Parameters:

file_path (str): Mat file path. Required.
input_keys (Tuple[str, ...]): List of input keys. Required.
label_keys (Tuple[str, ...]): List of label keys. Defaults to ().
alias_dict (Optional[Dict[str, str]]): Dict of alias(es) for input and label keys, i.e. {inner_key: outer_key}. Defaults to None.
weight_dict (Optional[Dict[str, Union[Callable, float]]]): Define the weight of each constraint variable. Defaults to None.
timestamps (Optional[Tuple[float, ...]]): Timestamps used to filter the data along the time dimension, or to repeat it if no time column is present. Defaults to None.
transforms (Optional[Compose]): Compose object contains sample wise transform(s). Defaults to None.

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.IterableMatDataset(
...     "/path/to/file.mat"
...     ("x",),
...     ("u",),
... )
Source code in ppsci/data/dataset/mat_dataset.py
class IterableMatDataset(io.IterableDataset):
    """IterableMatDataset for full-data loading.

    Args:
        file_path (str): Mat file path.
        input_keys (Tuple[str, ...]): List of input keys.
        label_keys (Tuple[str, ...], optional): List of label keys. Defaults to ().
        alias_dict (Optional[Dict[str, str]]): Dict of alias(es) for input and label keys.
            i.e. {inner_key: outer_key}. Defaults to None.
        weight_dict (Optional[Dict[str, Union[Callable, float]]]): Define the weight of
            each constraint variable. Defaults to None.
        timestamps (Optional[Tuple[float, ...]]): The number of repetitions of the data
            in the time dimension. Defaults to None.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.IterableMatDataset(
        ...     "/path/to/file.mat"
        ...     ("x",),
        ...     ("u",),
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...] = (),
        alias_dict: Optional[Dict[str, str]] = None,
        weight_dict: Optional[Dict[str, Union[Callable, float]]] = None,
        timestamps: Optional[Tuple[float, ...]] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()
        self.input_keys = input_keys
        self.label_keys = label_keys

        # read raw data from file
        raw_data = reader.load_mat_file(
            file_path,
            input_keys + label_keys,
            alias_dict,
        )
        # filter raw data by given timestamps if specified
        if timestamps is not None:
            if "t" in raw_data:
                # filter data according to given timestamps
                raw_time_array = raw_data["t"]
                mask = []
                for ti in timestamps:
                    mask.append(np.nonzero(np.isclose(raw_time_array, ti).flatten())[0])
                raw_data = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                mask = np.concatenate(mask, 0)
                raw_data = raw_data[mask]
                raw_data = misc.convert_to_dict(
                    raw_data, self.input_keys + self.label_keys
                )
            else:
                # repeat data according to given timestamps
                raw_data = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                raw_data = misc.combine_array_with_time(raw_data, timestamps)
                self.input_keys = ("t",) + tuple(self.input_keys)
                raw_data = misc.convert_to_dict(
                    raw_data, self.input_keys + self.label_keys
                )

        # fetch input data
        self.input = {
            key: value for key, value in raw_data.items() if key in self.input_keys
        }
        # fetch label data
        self.label = {
            key: value for key, value in raw_data.items() if key in self.label_keys
        }

        # prepare weights
        self.weight = (
            {key: np.ones_like(next(iter(self.label.values()))) for key in self.label}
            if weight_dict is not None
            else {}
        )
        if weight_dict is not None:
            for key, value in weight_dict.items():
                if isinstance(value, (int, float)):
                    self.weight[key] = np.full_like(
                        next(iter(self.label.values())), value
                    )
                elif callable(value):
                    func = value
                    self.weight[key] = func(self.input)
                    if isinstance(self.weight[key], (int, float)):
                        self.weight[key] = np.full_like(
                            next(iter(self.label.values())), self.weight[key]
                        )
                else:
                    raise NotImplementedError(f"type of {type(value)} is invalid yet.")

        self.input = {key: paddle.to_tensor(value) for key, value in self.input.items()}
        self.label = {key: paddle.to_tensor(value) for key, value in self.label.items()}
        self.weight = {
            key: paddle.to_tensor(value) for key, value in self.weight.items()
        }

        self.transforms = transforms
        self._len = len(next(iter(self.input.values())))

    @property
    def num_samples(self):
        """Number of samples within current dataset."""
        return self._len

    def __iter__(self):
        if callable(self.transforms):
            input_, label_, weight_ = self.transforms(
                self.input, self.label, self.weight
            )
            yield input_, label_, weight_
        else:
            yield self.input, self.label, self.weight

    def __len__(self):
        return 1

num_samples property

Number of samples within current dataset.

MatDataset

Bases: Dataset

Dataset class for .mat file.

Parameters:

file_path (str): Mat file path. Required.
input_keys (Tuple[str, ...]): List of input keys. Required.
label_keys (Tuple[str, ...]): List of label keys. Defaults to ().
alias_dict (Optional[Dict[str, str]]): Dict of alias(es) for input and label keys, i.e. {inner_key: outer_key}. Defaults to None.
weight_dict (Optional[Dict[str, Union[Callable, float]]]): Define the weight of each constraint variable. Defaults to None.
timestamps (Optional[Tuple[float, ...]]): Timestamps used to filter the data along the time dimension, or to repeat it if no time column is present. Defaults to None.
transforms (Optional[Compose]): Compose object contains sample wise transform(s). Defaults to None.

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.MatDataset(
...     "/path/to/file.mat"
...     ("x",),
...     ("u",),
... )
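
A hedged end-to-end sketch (the path and variable names are hypothetical, and it assumes the .mat variables are stored as plain 2-D arrays): writing a small file with scipy and loading it back through MatDataset.

>>> import numpy as np
>>> from scipy.io import savemat
>>> savemat("/tmp/demo.mat", {"x": np.random.randn(100, 1), "u": np.random.randn(100, 1)})
>>> dataset = ppsci.data.dataset.MatDataset("/tmp/demo.mat", ("x",), ("u",))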
Source code in ppsci/data/dataset/mat_dataset.py
class MatDataset(io.Dataset):
    """Dataset class for .mat file.

    Args:
        file_path (str): Mat file path.
        input_keys (Tuple[str, ...]): List of input keys.
        label_keys (Tuple[str, ...], optional): List of label keys. Defaults to ().
        alias_dict (Optional[Dict[str, str]]): Dict of alias(es) for input and label keys.
            i.e. {inner_key: outer_key}. Defaults to None.
        weight_dict (Optional[Dict[str, Union[Callable, float]]]): Define the weight of
            each constraint variable. Defaults to None.
        timestamps (Optional[Tuple[float, ...]]): The number of repetitions of the data
            in the time dimension. Defaults to None.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.MatDataset(
        ...     "/path/to/file.mat"
        ...     ("x",),
        ...     ("u",),
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = True

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...] = (),
        alias_dict: Optional[Dict[str, str]] = None,
        weight_dict: Optional[Dict[str, Union[Callable, float]]] = None,
        timestamps: Optional[Tuple[float, ...]] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()
        self.input_keys = input_keys
        self.label_keys = label_keys

        # read raw data from file
        raw_data = reader.load_mat_file(
            file_path,
            input_keys + label_keys,
            alias_dict,
        )
        # filter raw data by given timestamps if specified
        if timestamps is not None:
            if "t" in raw_data:
                # filter data according to given timestamps
                raw_time_array = raw_data["t"]
                mask = []
                for ti in timestamps:
                    mask.append(np.nonzero(np.isclose(raw_time_array, ti).flatten())[0])
                raw_data = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                mask = np.concatenate(mask, 0)
                raw_data = raw_data[mask]
                raw_data = misc.convert_to_dict(
                    raw_data, self.input_keys + self.label_keys
                )
            else:
                # repeat data according to given timestamps
                raw_data = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                raw_data = misc.combine_array_with_time(raw_data, timestamps)
                self.input_keys = ("t",) + tuple(self.input_keys)
                raw_data = misc.convert_to_dict(
                    raw_data, self.input_keys + self.label_keys
                )

        # fetch input data
        self.input = {
            key: value for key, value in raw_data.items() if key in self.input_keys
        }
        # fetch label data
        self.label = {
            key: value for key, value in raw_data.items() if key in self.label_keys
        }

        # prepare weights
        self.weight = (
            {key: np.ones_like(next(iter(self.label.values()))) for key in self.label}
            if weight_dict is not None
            else {}
        )
        if weight_dict is not None:
            for key, value in weight_dict.items():
                if isinstance(value, (int, float)):
                    self.weight[key] = np.full_like(
                        next(iter(self.label.values())), value
                    )
                elif callable(value):
                    func = value
                    self.weight[key] = func(self.input)
                    if isinstance(self.weight[key], (int, float)):
                        self.weight[key] = np.full_like(
                            next(iter(self.label.values())), self.weight[key]
                        )
                else:
                    raise NotImplementedError(f"type of {type(value)} is invalid yet.")

        self.transforms = transforms
        self._len = len(next(iter(self.input.values())))

    def __getitem__(self, idx):
        input_item = {key: value[idx] for key, value in self.input.items()}
        label_item = {key: value[idx] for key, value in self.label.items()}
        weight_item = {key: value[idx] for key, value in self.weight.items()}

        if self.transforms is not None:
            input_item, label_item, weight_item = self.transforms(
                input_item, label_item, weight_item
            )

        return (input_item, label_item, weight_item)

    def __len__(self):
        return self._len

IterableNPZDataset

Bases: IterableDataset

IterableNPZDataset for full-data loading.

Parameters:

file_path (str): Npz file path. Required.
input_keys (Tuple[str, ...]): List of input keys. Required.
label_keys (Tuple[str, ...]): List of label keys. Defaults to ().
alias_dict (Optional[Dict[str, str]]): Dict of alias(es) for input and label keys, i.e. {inner_key: outer_key}. Defaults to None.
weight_dict (Optional[Dict[str, Union[Callable, float]]]): Define the weight of each constraint variable. Defaults to None.
timestamps (Optional[Tuple[float, ...]]): Timestamps used to filter the data along the time dimension, or to repeat it if no time column is present. Defaults to None.
transforms (Optional[Compose]): Compose object contains sample wise transform(s). Defaults to None.

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.IterableNPZDataset(
...     "/path/to/file.npz"
...     ("x",),
...     ("u",),
... )
Source code in ppsci/data/dataset/npz_dataset.py
class IterableNPZDataset(io.IterableDataset):
    """IterableNPZDataset for full-data loading.

    Args:
        file_path (str): Npz file path.
        input_keys (Tuple[str, ...]): List of input keys.
        label_keys (Tuple[str, ...], optional): List of label keys. Defaults to ().
        alias_dict (Optional[Dict[str, str]]): Dict of alias(es) for input and label keys.
            i.e. {inner_key: outer_key}. Defaults to None.
        weight_dict (Optional[Dict[str, Union[Callable, float]]]): Define the weight of
            each constraint variable. Defaults to None.
        timestamps (Optional[Tuple[float, ...]]): The number of repetitions of the data
            in the time dimension. Defaults to None.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.IterableNPZDataset(
        ...     "/path/to/file.npz"
        ...     ("x",),
        ...     ("u",),
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...] = (),
        alias_dict: Optional[Dict[str, str]] = None,
        weight_dict: Optional[Dict[str, Union[Callable, float]]] = None,
        timestamps: Optional[Tuple[float, ...]] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()
        self.input_keys = input_keys
        self.label_keys = label_keys

        # read raw data from file
        raw_data = reader.load_npz_file(
            file_path,
            input_keys + label_keys,
            alias_dict,
        )
        # filter raw data by given timestamps if specified
        if timestamps is not None:
            if "t" in raw_data:
                # filter data according to given timestamps
                raw_time_array = raw_data["t"]
                mask = []
                for ti in timestamps:
                    mask.append(np.nonzero(np.isclose(raw_time_array, ti).flatten())[0])
                raw_data = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                mask = np.concatenate(mask, 0)
                raw_data = raw_data[mask]
                raw_data = misc.convert_to_dict(
                    raw_data, self.input_keys + self.label_keys
                )
            else:
                # repeat data according to given timestamps
                raw_data = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                raw_data = misc.combine_array_with_time(raw_data, timestamps)
                self.input_keys = ("t",) + tuple(self.input_keys)
                raw_data = misc.convert_to_dict(
                    raw_data, self.input_keys + self.label_keys
                )

        # fetch input data
        self.input = {
            key: value for key, value in raw_data.items() if key in self.input_keys
        }
        # fetch label data
        self.label = {
            key: value for key, value in raw_data.items() if key in self.label_keys
        }

        # prepare weights
        self.weight = {}
        if weight_dict is not None:
            for key, value in weight_dict.items():
                if isinstance(value, (int, float)):
                    self.weight[key] = np.full_like(
                        next(iter(self.label.values())), value
                    )
                elif callable(value):
                    func = value
                    self.weight[key] = func(self.input)
                    if isinstance(self.weight[key], (int, float)):
                        self.weight[key] = np.full_like(
                            next(iter(self.label.values())), self.weight[key]
                        )
                else:
                    raise NotImplementedError(f"type of {type(value)} is invalid yet.")

        self.input = {key: paddle.to_tensor(value) for key, value in self.input.items()}
        self.label = {key: paddle.to_tensor(value) for key, value in self.label.items()}
        self.weight = {
            key: paddle.to_tensor(value) for key, value in self.weight.items()
        }

        self.transforms = transforms
        self._len = len(next(iter(self.input.values())))

    @property
    def num_samples(self):
        """Number of samples within current dataset."""
        return self._len

    def __iter__(self):
        if callable(self.transforms):
            input_, label_, weight_ = self.transforms(
                self.input, self.label, self.weight
            )
            yield input_, label_, weight_
        else:
            yield self.input, self.label, self.weight

    def __len__(self):
        return 1

num_samples property

Number of samples within current dataset.

NPZDataset

Bases: Dataset

Dataset class for .npz file.

Parameters:

file_path (str): Npz file path. Required.
input_keys (Tuple[str, ...]): List of input keys. Required.
label_keys (Tuple[str, ...]): List of label keys. Defaults to ().
alias_dict (Optional[Dict[str, str]]): Dict of alias(es) for input and label keys, i.e. {inner_key: outer_key}. Defaults to None.
weight_dict (Optional[Dict[str, Union[Callable, float]]]): Define the weight of each constraint variable. Defaults to None.
timestamps (Optional[Tuple[float, ...]]): Timestamps used to filter the data along the time dimension, or to repeat it if no time column is present. Defaults to None.
transforms (Optional[Compose]): Compose object contains sample wise transform(s). Defaults to None.

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.NPZDataset(
...     "/path/to/file.npz"
...     ("x",),
...     ("u",),
... )
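
Similarly, an .npz file written with numpy can be loaded directly, assuming every key maps to an array of equal length (a hedged sketch with a hypothetical path and keys):

>>> import numpy as np
>>> np.savez("/tmp/demo.npz", x=np.random.randn(100, 1), u=np.random.randn(100, 1))
>>> dataset = ppsci.data.dataset.NPZDataset("/tmp/demo.npz", ("x",), ("u",))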
Source code in ppsci/data/dataset/npz_dataset.py
class NPZDataset(io.Dataset):
    """Dataset class for .npz file.

    Args:
        file_path (str): Npz file path.
        input_keys (Tuple[str, ...]): List of input keys.
        label_keys (Tuple[str, ...], optional): List of label keys. Defaults to ().
        alias_dict (Optional[Dict[str, str]]): Dict of alias(es) for input and label keys.
            i.e. {inner_key: outer_key}. Defaults to None.
        weight_dict (Optional[Dict[str, Union[Callable, float]]]): Define the weight of
            each constraint variable. Defaults to None.
        timestamps (Optional[Tuple[float, ...]]): The number of repetitions of the data
            in the time dimension. Defaults to None.
        transforms (Optional[vision.Compose]): Compose object contains sample wise
            transform(s). Defaults to None.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.NPZDataset(
        ...     "/path/to/file.npz"
        ...     ("x",),
        ...     ("u",),
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = True

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...] = (),
        alias_dict: Optional[Dict[str, str]] = None,
        weight_dict: Optional[Dict[str, Union[Callable, float]]] = None,
        timestamps: Optional[Tuple[float, ...]] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()
        self.input_keys = input_keys
        self.label_keys = label_keys

        # read raw data from file
        raw_data = reader.load_npz_file(
            file_path,
            input_keys + label_keys,
            alias_dict,
        )
        # filter raw data by given timestamps if specified
        if timestamps is not None:
            if "t" in raw_data:
                # filter data according to given timestamps
                raw_time_array = raw_data["t"]
                mask = []
                for ti in timestamps:
                    mask.append(np.nonzero(np.isclose(raw_time_array, ti).flatten())[0])
                raw_data = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                mask = np.concatenate(mask, 0)
                raw_data = raw_data[mask]
                raw_data = misc.convert_to_dict(
                    raw_data, self.input_keys + self.label_keys
                )
            else:
                # repeat data according to given timestamps
                raw_data = misc.convert_to_array(
                    raw_data, self.input_keys + self.label_keys
                )
                raw_data = misc.combine_array_with_time(raw_data, timestamps)
                self.input_keys = ("t",) + tuple(self.input_keys)
                raw_data = misc.convert_to_dict(
                    raw_data, self.input_keys + self.label_keys
                )

        # fetch input data
        self.input = {
            key: value for key, value in raw_data.items() if key in self.input_keys
        }
        # fetch label data
        self.label = {
            key: value for key, value in raw_data.items() if key in self.label_keys
        }

        # prepare weights
        self.weight = {}
        if weight_dict is not None:
            for key, value in weight_dict.items():
                if isinstance(value, (int, float)):
                    self.weight[key] = np.full_like(
                        next(iter(self.label.values())), value
                    )
                elif callable(value):
                    func = value
                    self.weight[key] = func(self.input)
                    if isinstance(self.weight[key], (int, float)):
                        self.weight[key] = np.full_like(
                            next(iter(self.label.values())), self.weight[key]
                        )
                else:
                    raise NotImplementedError(f"type of {type(value)} is invalid yet.")

        self.transforms = transforms
        self._len = len(next(iter(self.input.values())))

    def __getitem__(self, idx):
        input_item = {key: value[idx] for key, value in self.input.items()}
        label_item = {key: value[idx] for key, value in self.label.items()}
        weight_item = {key: value[idx] for key, value in self.weight.items()}

        if self.transforms is not None:
            input_item, label_item, weight_item = self.transforms(
                input_item, label_item, weight_item
            )

        return (input_item, label_item, weight_item)

    def __len__(self):
        return self._len
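
A minimal end-to-end sketch is shown below, assuming a locally created .npz file; the file name and the keys "x"/"u" are illustrative only and mirror the example above.

>>> import numpy as np
>>> import ppsci
>>> # hypothetical data file; keys must match input_keys/label_keys
>>> np.savez("demo_data.npz", x=np.random.randn(100, 1), u=np.random.randn(100, 1))  # doctest: +SKIP
>>> dataset = ppsci.data.dataset.NPZDataset(
...     "demo_data.npz",
...     input_keys=("x",),
...     label_keys=("u",),
... )  # doctest: +SKIP
>>> input_item, label_item, weight_item = dataset[0]  # doctest: +SKIP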

CylinderDataset

Bases: Dataset

Dataset for training Cylinder model.

Parameters:

Name Type Description Default
file_path str

Data set path.

required
input_keys Tuple[str, ...]

Input keys, such as ("states","visc").

required
label_keys Tuple[str, ...]

Output keys, such as ("pred_states", "recover_states").

required
block_size int

Data block size.

required
stride int

Data stride.

required
ndata Optional[int]

Number of data series to use. Defaults to None.

None
weight_dict Optional[Dict[str, float]]

Weight dictionary. Defaults to None.

None
embedding_model Optional[Arch]

Embedding model. Defaults to None.

None
embedding_batch_size int

The batch size of embedding model. Defaults to 64.

64

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.CylinderDataset(
...     file_path="/path/to/CylinderDataset",
...     input_keys=("x",),
...     label_keys=("v",),
...     block_size=32,
...     stride=16,
... )
Source code in ppsci/data/dataset/trphysx_dataset.py
class CylinderDataset(io.Dataset):
    """Dataset for training Cylinder model.

    Args:
        file_path (str): Data set path.
        input_keys (Tuple[str, ...]): Input keys, such as ("states","visc").
        label_keys (Tuple[str, ...]): Output keys, such as ("pred_states", "recover_states").
        block_size (int): Data block size.
        stride (int): Data stride.
        ndata (Optional[int]): Number of data series to use. Defaults to None.
        weight_dict (Optional[Dict[str, float]]): Weight dictionary. Defaults to None.
        embedding_model (Optional[base.Arch]): Embedding model. Defaults to None.
        embedding_batch_size (int, optional): The batch size of embedding model. Defaults to 64.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.CylinderDataset(
        ...     file_path="/path/to/CylinderDataset",
        ...     input_keys=("x",),
        ...     label_keys=("v",),
        ...     block_size=32,
        ...     stride=16,
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        block_size: int,
        stride: int,
        ndata: Optional[int] = None,
        weight_dict: Optional[Dict[str, float]] = None,
        embedding_model: Optional[base.Arch] = None,
        embedding_batch_size: int = 64,
    ):
        if not os.path.exists(file_path):
            raise FileNotFoundError(
                f"file_path({file_path}) not exists. Please download dataset first. "
                "Training: https://paddle-org.bj.bcebos.com/paddlescience/datasets/transformer_physx/cylinder_training.hdf5. "
                "Valid: https://paddle-org.bj.bcebos.com/paddlescience/datasets/transformer_physx/cylinder_valid.hdf5."
            )
        super().__init__()
        self.file_path = file_path
        self.input_keys = input_keys
        self.label_keys = label_keys

        self.block_size = block_size
        self.stride = stride
        self.ndata = ndata
        self.weight_dict = {key: 1.0 for key in self.label_keys}
        if weight_dict is not None:
            self.weight_dict.update(weight_dict)

        self.data, self.visc = self.read_data(file_path, block_size, stride)
        self.embedding_model = embedding_model
        if embedding_model is None:
            self.embedding_data = None
        else:
            embedding_model.eval()
            with paddle.no_grad():
                data_tensor = paddle.to_tensor(self.data)
                visc_tensor = paddle.to_tensor(self.visc)
                embedding_data = []
                for i in range(0, len(data_tensor), embedding_batch_size):
                    start, end = i, min(i + embedding_batch_size, len(data_tensor))
                    embedding_data_batch = embedding_model.encoder(
                        data_tensor[start:end], visc_tensor[start:end]
                    )
                    embedding_data.append(embedding_data_batch.numpy())
                self.embedding_data = np.concatenate(embedding_data)

    def read_data(self, file_path: str, block_size: int, stride: int):
        data = []
        visc = []
        with h5py.File(file_path, "r") as f:
            data_num = 0
            for key in f.keys():
                visc0 = 2.0 / float(key)
                ux = np.asarray(f[key + "/ux"], dtype=paddle.get_default_dtype())
                uy = np.asarray(f[key + "/uy"], dtype=paddle.get_default_dtype())
                p = np.asarray(f[key + "/p"], dtype=paddle.get_default_dtype())
                data_series = np.stack([ux, uy, p], axis=1)

                for i in range(0, data_series.shape[0] - block_size + 1, stride):
                    data.append(data_series[i : i + block_size])
                    visc.append([visc0])

                data_num += 1
                if self.ndata is not None and data_num >= self.ndata:
                    break

        data = np.asarray(data)
        visc = np.asarray(visc, dtype=paddle.get_default_dtype())
        return data, visc

    def __len__(self):
        return len(self.data)

    def __getitem__(self, i):
        if self.embedding_data is None:
            data_item = self.data[i]
            input_item = {
                self.input_keys[0]: data_item,
                self.input_keys[1]: self.visc[i],
            }
            label_item = {
                self.label_keys[0]: data_item[1:],
                self.label_keys[1]: data_item,
            }
        else:
            data_item = self.embedding_data[i]
            input_item = {self.input_keys[0]: data_item[:-1, :]}
            label_item = {self.label_keys[0]: data_item[1:, :]}
            if len(self.label_keys) == 2:
                label_item[self.label_keys[1]] = data_item[1:, :]
        weight_shape = [1] * len(data_item.shape)
        weight_item = {
            key: np.full(weight_shape, value, paddle.get_default_dtype())
            for key, value in self.weight_dict.items()
        }
        return (input_item, label_item, weight_item)
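
The block extraction in read_data above is a plain sliding window over the time dimension. The standalone sketch below (illustrative shapes, no real data) reproduces that indexing, so a series of length T yields (T - block_size) // stride + 1 blocks:

>>> import numpy as np
>>> T, block_size, stride = 100, 32, 16
>>> series = np.random.randn(T, 3, 64, 64)  # (time, [ux, uy, p], H, W); shapes illustrative
>>> blocks = [series[i : i + block_size] for i in range(0, T - block_size + 1, stride)]
>>> len(blocks) == (T - block_size) // stride + 1
True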

LorenzDataset

Bases: Dataset

Dataset for training Lorenz model.

Parameters:

Name Type Description Default
file_path str

Data set path.

required
input_keys Tuple[str, ...]

Input keys, such as ("states",).

required
label_keys Tuple[str, ...]

Output keys, such as ("pred_states", "recover_states").

required
block_size int

Data block size.

required
stride int

Data stride.

required
ndata Optional[int]

Number of data series to use. Defaults to None.

None
weight_dict Optional[Dict[str, float]]

Weight dictionary. Defaults to None.

None
embedding_model Optional[Arch]

Embedding model. Defaults to None.

None

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.LorenzDataset(
...     file_path="/path/to/LorenzDataset",
...     input_keys=("x",),
...     label_keys=("v",),
...     block_size=32,
...     stride=16,
... )
Source code in ppsci/data/dataset/trphysx_dataset.py
class LorenzDataset(io.Dataset):
    """Dataset for training Lorenz model.

    Args:
        file_path (str): Data set path.
        input_keys (Tuple[str, ...]): Input keys, such as ("states",).
        label_keys (Tuple[str, ...]): Output keys, such as ("pred_states", "recover_states").
        block_size (int): Data block size.
        stride (int): Data stride.
        ndata (Optional[int]): Number of data series to use. Defaults to None.
        weight_dict (Optional[Dict[str, float]]): Weight dictionary. Defaults to None.
        embedding_model (Optional[base.Arch]): Embedding model. Defaults to None.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.LorenzDataset(
        ...     file_path="/path/to/LorenzDataset",
        ...     input_keys=("x",),
        ...     label_keys=("v",),
        ...     block_size=32,
        ...     stride=16,
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        block_size: int,
        stride: int,
        ndata: Optional[int] = None,
        weight_dict: Optional[Dict[str, float]] = None,
        embedding_model: Optional[base.Arch] = None,
    ):
        super().__init__()
        if not os.path.exists(file_path):
            raise FileNotFoundError(
                f"file_path({file_path}) not exists. Please download dataset first. "
                "Training: https://paddle-org.bj.bcebos.com/paddlescience/datasets/transformer_physx/lorenz_training_rk.hdf5. "
                "Valid: https://paddle-org.bj.bcebos.com/paddlescience/datasets/transformer_physx/lorenz_valid_rk.hdf5."
            )

        self.file_path = file_path
        self.input_keys = input_keys
        self.label_keys = label_keys

        self.block_size = block_size
        self.stride = stride
        self.ndata = ndata
        self.weight_dict = {key: 1.0 for key in self.label_keys}
        if weight_dict is not None:
            self.weight_dict.update(weight_dict)

        self.data = self.read_data(file_path, block_size, stride)
        self.embedding_model = embedding_model
        if embedding_model is None:
            self.embedding_data = None
        else:
            embedding_model.eval()
            with paddle.no_grad():
                data_tensor = paddle.to_tensor(self.data)
                embedding_data_tensor = embedding_model.encoder(data_tensor)
            self.embedding_data = embedding_data_tensor.numpy()

    def read_data(self, file_path: str, block_size: int, stride: int):
        data = []
        with h5py.File(file_path, "r") as f:
            data_num = 0
            for key in f.keys():
                data_series = np.asarray(f[key], dtype=paddle.get_default_dtype())
                for i in range(0, data_series.shape[0] - block_size + 1, stride):
                    data.append(data_series[i : i + block_size])
                data_num += 1
                if self.ndata is not None and data_num >= self.ndata:
                    break
        return np.asarray(data)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        # when embedding data is None
        if self.embedding_data is None:
            data_item = self.data[idx]
            input_item = {self.input_keys[0]: data_item}
            label_item = {
                self.label_keys[0]: data_item[1:, :],
                self.label_keys[1]: data_item,
            }
        else:
            data_item = self.embedding_data[idx]
            input_item = {self.input_keys[0]: data_item[:-1, :]}
            label_item = {self.label_keys[0]: data_item[1:, :]}
            if len(self.label_keys) == 2:
                label_item[self.label_keys[1]] = self.data[idx][1:, :]

        weight_shape = [1] * len(data_item.shape)
        weight_item = {
            key: np.full(weight_shape, value, paddle.get_default_dtype())
            for key, value in self.weight_dict.items()
        }
        return (input_item, label_item, weight_item)
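
For orientation, the sketch below shows the item structure returned by __getitem__ when no embedding model is given; the local file path is hypothetical and the shapes assume the three-dimensional Lorenz state:

>>> import ppsci
>>> dataset = ppsci.data.dataset.LorenzDataset(
...     file_path="./lorenz_training_rk.hdf5",  # hypothetical local copy
...     input_keys=("states",),
...     label_keys=("pred_states", "recover_states"),
...     block_size=32,
...     stride=16,
... )  # doctest: +SKIP
>>> input_item, label_item, weight_item = dataset[0]  # doctest: +SKIP
>>> input_item["states"].shape  # doctest: +SKIP
(32, 3)
>>> label_item["pred_states"].shape  # one-step-shifted block  # doctest: +SKIP
(31, 3)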

RosslerDataset

Bases: LorenzDataset

Dataset for training Rossler model.

Parameters:

Name Type Description Default
file_path str

Data set path.

required
input_keys Tuple[str, ...]

Input keys, such as ("states",).

required
label_keys Tuple[str, ...]

Output keys, such as ("pred_states", "recover_states").

required
block_size int

Data block size.

required
stride int

Data stride.

required
ndata Optional[int]

Number of data series to use. Defaults to None.

None
weight_dict Optional[Dict[str, float]]

Weight dictionary. Defaults to None.

None
embedding_model Optional[Arch]

Embedding model. Defaults to None.

None

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.RosslerDataset(
...     file_path="/path/to/RosslerDataset",
...     input_keys=("x",),
...     label_keys=("v",),
...     block_size=32,
...     stride=16,
... )
Source code in ppsci/data/dataset/trphysx_dataset.py
class RosslerDataset(LorenzDataset):
    """Dataset for training Rossler model.

    Args:
        file_path (str): Data set path.
        input_keys (Tuple[str, ...]): Input keys, such as ("states",).
        label_keys (Tuple[str, ...]): Output keys, such as ("pred_states", "recover_states").
        block_size (int): Data block size.
        stride (int): Data stride.
        ndata (Optional[int]): Number of data series to use. Defaults to None.
        weight_dict (Optional[Dict[str, float]]): Weight dictionary. Defaults to None.
        embedding_model (Optional[base.Arch]): Embedding model. Defaults to None.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.RosslerDataset(
        ...     file_path="/path/to/RosslerDataset",
        ...     input_keys=("x",),
        ...     label_keys=("v",),
        ...     block_size=32,
        ...     stride=16,
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    def __init__(
        self,
        file_path: str,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        block_size: int,
        stride: int,
        ndata: Optional[int] = None,
        weight_dict: Optional[Dict[str, float]] = None,
        embedding_model: Optional[base.Arch] = None,
    ):
        if not os.path.exists(file_path):
            raise FileNotFoundError(
                f"file_path({file_path}) not exists. Please download dataset first. "
                "Training: https://paddle-org.bj.bcebos.com/paddlescience/datasets/transformer_physx/rossler_training.hdf5. "
                "Valid: https://paddle-org.bj.bcebos.com/paddlescience/datasets/transformer_physx/rossler_valid.hdf5."
            )
        super().__init__(
            file_path,
            input_keys,
            label_keys,
            block_size,
            stride,
            ndata,
            weight_dict,
            embedding_model,
        )

VtuDataset

Bases: Dataset

Dataset class for .vtu file.

Parameters:

Name Type Description Default
file_path str

*.vtu file path.

required
input_keys Optional[Tuple[str, ...]]

Tuple of input keys. Defaults to None.

None
label_keys Optional[Tuple[str, ...]]

Tuple of label keys. Defaults to None.

None
time_step Optional[int]

Time step with unit second. Defaults to None.

None
time_index Optional[Tuple[int, ...]]

Time index tuple in increasing order.

None
labels Optional[Dict[str, float]]

Temporary variable for [load_vtk_with_time_file].

None
transforms Compose

Compose object contains sample wise transform(s). Defaults to None.

None

Examples:

>>> from ppsci.data.dataset import VtuDataset
>>> dataset = VtuDataset(file_path='example.vtu')
>>> # get the length of the dataset
>>> dataset_size = len(dataset)
>>> # get the first sample of the data
>>> first_sample = dataset[0]
>>> print("First sample:", first_sample)
Source code in ppsci/data/dataset/vtu_dataset.py
class VtuDataset(io.Dataset):
    """Dataset class for .vtu file.

    Args:
        file_path (str): *.vtu file path.
        input_keys (Optional[Tuple[str, ...]]): Tuple of input keys. Defaults to None.
        label_keys (Optional[Tuple[str, ...]]): Tuple of label keys. Defaults to None.
        time_step (Optional[int]): Time step with unit second. Defaults to None.
        time_index (Optional[Tuple[int, ...]]): Time index tuple in increasing order.
        labels (Optional[Dict[str, float]]): Temporary variable for [load_vtk_with_time_file].
        transforms (vision.Compose, optional): Compose object contains sample wise
            transform(s). Defaults to None.

    Examples:
        >>> from ppsci.data.dataset import VtuDataset

        >>> dataset = VtuDataset(file_path='example.vtu') # doctest: +SKIP

        >>> # get the length of the dataset
        >>> dataset_size = len(dataset) # doctest: +SKIP
        >>> # get the first sample of the data
        >>> first_sample = dataset[0] # doctest: +SKIP
        >>> print("First sample:", first_sample) # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = True

    def __init__(
        self,
        file_path: str,
        input_keys: Optional[Tuple[str, ...]] = None,
        label_keys: Optional[Tuple[str, ...]] = None,
        time_step: Optional[int] = None,
        time_index: Optional[Tuple[int, ...]] = None,
        labels: Optional[Dict[str, float]] = None,
        transforms: Optional[vision.Compose] = None,
    ):
        super().__init__()

        # load data from file
        if time_step is not None and time_index is not None:
            _input, _label = reader.load_vtk_file(
                file_path, time_step, time_index, input_keys, label_keys
            )
            _label = {key: _label[key] for key in label_keys}
        elif time_step is None and time_index is None:
            _input = reader.load_vtk_with_time_file(file_path)
            _label = {}
            for key, value in labels.items():
                if isinstance(value, (int, float)):
                    _label[key] = np.full_like(
                        next(iter(_input.values())), value, "float32"
                    )
                else:
                    _label[key] = value
        else:
            raise ValueError(
                "Error, read vtu with time_step and time_index, or neither"
            )

        # apply sample-wise transforms if provided
        if transforms is not None:
            _input = transforms(_input)
            _label = transforms(_label)

        self.input = _input
        self.label = _label
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.transforms = transforms
        self.num_samples = len(next(iter(self.input.values())))

    def __getitem__(self, idx):
        input_item = {key: value[idx] for key, value in self.input.items()}
        label_item = {key: value[idx] for key, value in self.label.items()}
        return (input_item, label_item, {})

    def __len__(self):
        return self.num_samples

MeshAirfoilDataset

Bases: Dataset

Dataset for MeshAirfoil.

Parameters:

Name Type Description Default
input_keys Tuple[str, ...]

Name of input data.

required
label_keys Tuple[str, ...]

Name of label data.

required
data_dir str

Directory of MeshAirfoil data.

required
mesh_graph_path str

Path of mesh graph.

required
transpose_edges bool

Whether to transpose the edges array from (2, num_edges) to (num_edges, 2) for convenience of slicing. Defaults to False.

False

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.MeshAirfoilDataset(
...     input_keys=("input",),
...     label_keys=("output",),
...     data_dir="/path/to/MeshAirfoilDataset",
...     mesh_graph_path="/path/to/file.su2",
...     transpose_edges=False,
... )
Source code in ppsci/data/dataset/airfoil_dataset.py
class MeshAirfoilDataset(io.Dataset):
    """Dataset for `MeshAirfoil`.

    Args:
        input_keys (Tuple[str, ...]): Name of input data.
        label_keys (Tuple[str, ...]): Name of label data.
        data_dir (str): Directory of MeshAirfoil data.
        mesh_graph_path (str): Path of mesh graph.
        transpose_edges (bool, optional): Whether to transpose the edges array from (2, num_edges) to (num_edges, 2) for convenience of slicing. Defaults to False.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.MeshAirfoilDataset(
        ...     input_keys=("input",),
        ...     label_keys=("output",),
        ...     data_dir="/path/to/MeshAirfoilDataset",
        ...     mesh_graph_path="/path/to/file.su2",
        ...     transpose_edges=False,
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    use_pgl: bool = True

    def __init__(
        self,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        data_dir: str,
        mesh_graph_path: str,
        transpose_edges: bool = False,
    ):
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.data_dir = data_dir
        self.file_list = os.listdir(self.data_dir)
        self.len = len(self.file_list)
        self.mesh_graph = _get_mesh_graph(mesh_graph_path)

        with open(osp.join(osp.dirname(self.data_dir), "train_max_min.pkl"), "rb") as f:
            self.normalization_factors = pickle.load(f)

        self.nodes = self.mesh_graph[0]
        self.edges = self.mesh_graph[1]
        if transpose_edges:
            self.edges = self.edges.transpose([1, 0])
        self.elems_list = self.mesh_graph[2]
        self.marker_dict = self.mesh_graph[3]
        self.node_markers = np.full([self.nodes.shape[0], 1], fill_value=-1)
        for i, (marker_tag, marker_elems) in enumerate(self.marker_dict.items()):
            for elem in marker_elems:
                self.node_markers[elem[0]] = i
                self.node_markers[elem[1]] = i

        self.raw_graphs = [self.get(i) for i in range(len(self))]

    def __len__(self):
        return self.len

    def __getitem__(self, idx):
        return (
            {
                self.input_keys[0]: self.raw_graphs[idx],
            },
            {
                self.label_keys[0]: self.raw_graphs[idx],
            },
            None,
        )

    def get(self, idx):
        with open(osp.join(self.data_dir, self.file_list[idx]), "rb") as f:
            fields = pickle.load(f)
        fields = self._preprocess(fields)
        aoa, reynolds, mach = self._get_params_from_name(self.file_list[idx])
        # aoa = aoa
        mach_or_reynolds = mach if reynolds is None else reynolds
        # mach_or_reynolds = mach_or_reynolds
        norm_aoa = aoa / 10
        norm_mach_or_reynolds = (
            mach_or_reynolds if reynolds is None else (mach_or_reynolds - 1.5e6) / 1.5e6
        )

        nodes = np.concatenate(
            [
                self.nodes,
                np.repeat(a=norm_aoa, repeats=self.nodes.shape[0])[:, np.newaxis],
                np.repeat(a=norm_mach_or_reynolds, repeats=self.nodes.shape[0])[
                    :, np.newaxis
                ],
                self.node_markers,
            ],
            axis=-1,
        ).astype(paddle.get_default_dtype())

        data = pgl.Graph(
            num_nodes=nodes.shape[0],
            edges=self.edges,
        )
        data.x = nodes
        data.y = fields
        data.pos = self.nodes
        data.edge_index = self.edges

        sender = data.x[data.edge_index[0]]
        receiver = data.x[data.edge_index[1]]
        relation_pos = sender[:, 0:2] - receiver[:, 0:2]
        post = np.linalg.norm(relation_pos, ord=2, axis=1, keepdims=True).astype(
            paddle.get_default_dtype()
        )
        data.edge_attr = post
        std_epsilon = [1e-8]
        a = np.mean(data.edge_attr, axis=0)
        b = data.edge_attr.std(axis=0)
        b = np.maximum(b, std_epsilon).astype(paddle.get_default_dtype())
        data.edge_attr = (data.edge_attr - a) / b
        data.aoa = aoa
        data.norm_aoa = norm_aoa
        data.mach_or_reynolds = mach_or_reynolds
        data.norm_mach_or_reynolds = norm_mach_or_reynolds
        return data

    def _preprocess(self, tensor_list, stack_output=True):
        data_max, data_min = self.normalization_factors
        normalized_tensors = []
        for i in range(len(tensor_list)):
            normalized = (tensor_list[i] - data_min[i]) / (
                data_max[i] - data_min[i]
            ) * 2 - 1
            normalized_tensors.append(normalized)
        if stack_output:
            normalized_tensors = np.stack(normalized_tensors, axis=1)
        return normalized_tensors

    def _get_params_from_name(self, filename):
        s = filename.rsplit(".", 1)[0].split("_")
        aoa = np.array(s[s.index("aoa") + 1])[np.newaxis].astype(
            paddle.get_default_dtype()
        )
        reynolds = s[s.index("re") + 1]
        reynolds = (
            np.array(reynolds)[np.newaxis].astype(paddle.get_default_dtype())
            if reynolds != "None"
            else None
        )
        mach = np.array(s[s.index("mach") + 1])[np.newaxis].astype(
            paddle.get_default_dtype()
        )
        return aoa, reynolds, mach
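
_get_params_from_name above expects the flow parameters to be encoded in the file name as "_"-separated tokens following "aoa", "re" and "mach"; the file name in this sketch is illustrative only:

>>> s = "sample_aoa_3.0_re_None_mach_0.65.pkl".rsplit(".", 1)[0].split("_")
>>> s[s.index("aoa") + 1], s[s.index("re") + 1], s[s.index("mach") + 1]
('3.0', 'None', '0.65')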

MeshCylinderDataset

Bases: Dataset

Dataset for MeshCylinder.

Parameters:

Name Type Description Default
input_keys Tuple[str, ...]

Name of input data.

required
label_keys Tuple[str, ...]

Name of label data.

required
data_dir str

Directory of MeshCylinder data.

required
mesh_graph_path str

Path of mesh graph.

required

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.MeshCylinderDataset(
...     input_keys=("input",),
...     label_keys=("output",),
...     data_dir="/path/to/MeshCylinderDataset",
...     mesh_graph_path="/path/to/file.su2",
... )
Source code in ppsci/data/dataset/cylinder_dataset.py
class MeshCylinderDataset(io.Dataset):
    """Dataset for `MeshCylinder`.

    Args:
        input_keys (Tuple[str, ...]): Name of input data.
        label_keys (Tuple[str, ...]): Name of label data.
        data_dir (str): Directory of MeshCylinder data.
        mesh_graph_path (str): Path of mesh graph.

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.MeshCylinderDataset(
        ...     input_keys=("input",),
        ...     label_keys=("output",),
        ...     data_dir="/path/to/MeshCylinderDataset",
        ...     mesh_graph_path="/path/to/file.su2",
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    use_pgl: bool = True

    def __init__(
        self,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        data_dir: str,
        mesh_graph_path: str,
    ):
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.data_dir = data_dir
        self.file_list = os.listdir(self.data_dir)
        self.len = len(self.file_list)
        self.mesh_graph = airfoil_dataset._get_mesh_graph(mesh_graph_path)

        self.normalization_factors = np.array(
            [[978.6001, 48.9258, 24.8404], [-692.3159, -6.9950, -24.8572]],
            dtype=paddle.get_default_dtype(),
        )

        self.nodes = self.mesh_graph[0]
        self.meshnodes = self.mesh_graph[0]
        self.edges = self.mesh_graph[1]
        self.elems_list = self.mesh_graph[2]
        self.marker_dict = self.mesh_graph[3]
        self.bounder = []
        self.node_markers = np.full([self.nodes.shape[0], 1], fill_value=-1)
        for i, (marker_tag, marker_elems) in enumerate(self.marker_dict.items()):
            for elem in marker_elems:
                self.node_markers[elem[0]] = i
                self.node_markers[elem[1]] = i

        self.raw_graphs = [self.get(i) for i in range(len(self))]

    def __len__(self):
        return self.len

    def __getitem__(self, idx):
        return (
            {
                self.input_keys[0]: self.raw_graphs[idx],
            },
            {
                self.label_keys[0]: self.raw_graphs[idx],
            },
            None,
        )

    def get(self, idx):
        with open(osp.join(self.data_dir, self.file_list[idx]), "r") as f:
            field = []
            pos = []
            for line in f.read().splitlines()[1:]:
                lines_pos = line.split(",")[1:3]
                lines_field = line.split(",")[3:]
                numbers_float = list(eval(i) for i in lines_pos)
                array = np.array(numbers_float, paddle.get_default_dtype())
                pos.append(array)
                numbers_float = list(eval(i) for i in lines_field)
                array = np.array(numbers_float, paddle.get_default_dtype())
                field.append(array)

        field = np.stack(field, axis=0)
        pos = np.stack(pos, axis=0)
        indexlist = []
        for i in range(self.meshnodes.shape[0]):
            b = self.meshnodes[i : (i + 1)]
            b = np.squeeze(b)
            index = np.nonzero(
                np.sum((pos == b), axis=1, dtype=paddle.get_default_dtype())
                == pos.shape[1]
            )
            indexlist.append(index)
        indexlist = np.stack(indexlist, axis=0)
        indexlist = np.squeeze(indexlist)
        fields = field[indexlist]
        velocity = self._get_params_from_name(self.file_list[idx])

        norm_aoa = velocity / 40
        # add physics parameters to graph
        nodes = np.concatenate(
            [
                self.nodes,
                np.repeat(a=norm_aoa, repeats=self.nodes.shape[0])[:, np.newaxis],
                self.node_markers,
            ],
            axis=-1,
        ).astype(paddle.get_default_dtype())

        data = pgl.Graph(
            num_nodes=nodes.shape[0],
            edges=self.edges,
        )
        data.x = nodes
        data.y = fields
        data.pos = self.nodes
        data.edge_index = self.edges
        data.velocity = velocity

        sender = data.x[data.edge_index[0]]
        receiver = data.x[data.edge_index[1]]
        relation_pos = sender[:, 0:2] - receiver[:, 0:2]
        post = np.linalg.norm(relation_pos, ord=2, axis=1, keepdims=True).astype(
            paddle.get_default_dtype()
        )
        data.edge_attr = post
        std_epsilon = [1e-8]
        a = np.mean(data.edge_attr, axis=0)
        b = data.edge_attr.std(axis=0)
        b = np.maximum(b, std_epsilon).astype(paddle.get_default_dtype())
        data.edge_attr = (data.edge_attr - a) / b
        a = np.mean(data.y, axis=0)
        b = data.y.std(axis=0)
        b = np.maximum(b, std_epsilon).astype(paddle.get_default_dtype())
        data.y = (data.y - a) / b
        data.norm_max = a
        data.norm_min = b

        # find the faces of the boundary; our cylinder dataset comes from the Fluent solver
        with open(osp.join(osp.dirname(self.data_dir), "bounder"), "r") as f:
            field = []
            pos = []
            for line in f.read().splitlines()[1:]:
                lines_pos = line.split(",")[1:3]
                lines_field = line.split(",")[3:]
                numbers_float = list(eval(i) for i in lines_pos)
                array = np.array(numbers_float, paddle.get_default_dtype())
                pos.append(array)
                numbers_float = list(eval(i) for i in lines_field)
                array = np.array(numbers_float, paddle.get_default_dtype())
                field.append(array)

        field = np.stack(field, axis=0)
        pos = np.stack(pos, axis=0)

        indexlist = []
        for i in range(pos.shape[0]):
            b = pos[i : (i + 1)]
            b = np.squeeze(b)
            index = np.nonzero(
                np.sum((self.nodes == b), axis=1, dtype=paddle.get_default_dtype())
                == self.nodes.shape[1]
            )
            indexlist.append(index)

        indexlist = np.stack(indexlist, axis=0)
        indexlist = np.squeeze(indexlist)
        self.bounder = indexlist
        return data

    def _get_params_from_name(self, filename):
        s = filename.rsplit(".", 1)[0]
        reynolds = np.array(s[13:])[np.newaxis].astype(paddle.get_default_dtype())
        return reynolds
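
The edge features built in get() above are the Euclidean lengths of the sender-receiver offsets, standardized with a standard-deviation floor of 1e-8. A standalone numpy sketch with illustrative coordinates:

>>> import numpy as np
>>> pos = np.array([[0.0, 0.0], [1.0, 0.0], [1.0, 1.0]])  # illustrative node coordinates
>>> edges = np.array([[0, 1], [1, 2], [0, 2]]).T           # (2, num_edges)
>>> edge_attr = np.linalg.norm(pos[edges[0]] - pos[edges[1]], ord=2, axis=1, keepdims=True)
>>> edge_attr = (edge_attr - edge_attr.mean(axis=0)) / np.maximum(edge_attr.std(axis=0), 1e-8)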

RadarDataset

Bases: Dataset

Class for Radar dataset.

Parameters:

Name Type Description Default
input_keys Tuple[str, ...]

Input keys, such as ("input",).

required
label_keys Tuple[str, ...]

Output keys, such as ("output",).

required
image_width int

Image width.

required
image_height int

Image height.

required
total_length int

Total length.

required
dataset_path str

Dataset path.

required
data_type str

Input and output data type. Defaults to paddle.get_default_dtype().

get_default_dtype()
weight_dict Optional[Dict[str, float]]

Weight dictionary. Defaults to None.

None

Examples:

>>> import ppsci
>>> import paddle
>>> dataset = ppsci.data.dataset.RadarDataset(
...     input_keys=("input",),
...     label_keys=("output",),
...     image_width=512,
...     image_height=512,
...     total_length=29,
...     dataset_path="datasets/mrms/figure",
...     data_type=paddle.get_default_dtype(),
... )
Source code in ppsci/data/dataset/radar_dataset.py
class RadarDataset(io.Dataset):
    """Class for Radar dataset.

    Args:
        input_keys (Tuple[str, ...]): Input keys, such as ("input",).
        label_keys (Tuple[str, ...]): Output keys, such as ("output",).
        image_width (int): Image width.
        image_height (int): Image height.
        total_length (int): Total length.
        dataset_path (str): Dataset path.
        data_type (str): Input and output data type. Defaults to paddle.get_default_dtype().
        weight_dict (Optional[Dict[str, float]]): Weight dictionary. Defaults to None.

    Examples:
        >>> import ppsci
        >>> import paddle
        >>> dataset = ppsci.data.dataset.RadarDataset(
        ...     input_keys=("input",),
        ...     label_keys=("output",),
        ...     image_width=512,
        ...     image_height=512,
        ...     total_length=29,
        ...     dataset_path="datasets/mrms/figure",
        ...     data_type=paddle.get_default_dtype(),
        ... )  # doctest: +SKIP
    """

    # Whether support batch indexing for speeding up fetching process.
    batch_index: bool = False

    def __init__(
        self,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        image_width: int,
        image_height: int,
        total_length: int,
        dataset_path: str,
        data_type: str = paddle.get_default_dtype(),
        weight_dict: Optional[Dict[str, float]] = None,
    ):
        super().__init__()
        if importlib.util.find_spec("cv2") is None:
            raise ModuleNotFoundError(
                "To use RadarDataset, please install 'opencv-python' with: `pip install "
                "opencv-python` first."
            )
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.img_width = image_width
        self.img_height = image_height
        self.length = total_length
        self.dataset_path = dataset_path
        self.data_type = data_type

        self.weight_dict = {} if weight_dict is None else weight_dict
        if weight_dict is not None:
            self.weight_dict = {key: 1.0 for key in self.label_keys}
            self.weight_dict.update(weight_dict)

        self.case_list = []
        name_list = os.listdir(self.dataset_path)
        name_list.sort()
        for name in name_list:
            case = []
            for i in range(29):
                case.append(
                    self.dataset_path
                    + "/"
                    + name
                    + "/"
                    + name
                    + "-"
                    + str(i).zfill(2)
                    + ".png"
                )
            self.case_list.append(case)

    def _load(self, index):
        data = []
        for img_path in self.case_list[index]:
            img = cv2.imread(img_path, 2)
            data.append(np.expand_dims(img, axis=0))
        data = np.concatenate(data, axis=0).astype(self.data_type) / 10.0 - 3.0
        assert data.shape[1] <= 1024 and data.shape[2] <= 1024
        return data

    def __getitem__(self, index):
        data = self._load(index)[-self.length :].copy()
        mask = np.ones_like(data)
        mask[data < 0] = 0
        data[data < 0] = 0
        data = np.clip(data, 0, 128)
        vid = np.zeros(
            (self.length, self.img_height, self.img_width, 2), dtype=self.data_type
        )
        vid[..., 0] = data
        vid[..., 1] = mask

        input_item = {self.input_keys[0]: vid}
        label_item = {}
        weight_item = {}
        for key in self.label_keys:
            label_item[key] = np.asarray([], paddle.get_default_dtype())
        if len(label_item) > 0:
            weight_shape = [1] * len(next(iter(label_item.values())).shape)
            weight_item = {
                key: np.full(weight_shape, value, paddle.get_default_dtype())
                for key, value in self.weight_dict.items()
            }
        return input_item, label_item, weight_item

    def __len__(self):
        return len(self.case_list)
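
__init__ above assumes each case directory under dataset_path holds 29 sequentially numbered PNG frames; the case name below is illustrative only:

>>> [f"datasets/mrms/figure/case_001/case_001-{i:02d}.png" for i in range(29)][:2]
['datasets/mrms/figure/case_001/case_001-00.png', 'datasets/mrms/figure/case_001/case_001-01.png']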

DGMRDataset

Bases: Dataset

Dataset class for DGMR (Deep Generative Model for Radar) model. This open-sourced UK dataset has been mirrored to HuggingFace Datasets https://huggingface.co/datasets/openclimatefix/nimrod-uk-1km. If the reader cannot load the dataset from Hugging Face, please manually download it and modify the dataset_path to the local path for loading.

Parameters:

Name Type Description Default
input_keys Tuple[str, ...]

Input keys, such as ("input",).

required
label_keys Tuple[str, ...]

Output keys, such as ("output",).

required
split str

The split of the dataset, "validation" or "train". Defaults to "validation".

'validation'
num_input_frames int

Number of input frames. Defaults to 4.

4
num_target_frames int

Number of target frames. Defaults to 18.

18
dataset_path str

Path to the dataset. Defaults to "openclimatefix/nimrod-uk-1km".

'openclimatefix/nimrod-uk-1km'

Examples:

>>> import ppsci
>>> dataset = ppsci.data.dataset.DGMRDataset(("input", ), ("output", ))
Source code in ppsci/data/dataset/dgmr_dataset.py
class DGMRDataset(io.Dataset):
    """
    Dataset class for DGMR (Deep Generative Model for Radar) model.
    This open-sourced UK dataset has been mirrored to HuggingFace Datasets https://huggingface.co/datasets/openclimatefix/nimrod-uk-1km.
    If the reader cannot load the dataset from Hugging Face, please manually download it and modify the dataset_path to the local path for loading.

    Args:
        input_keys (Tuple[str, ...]): Input keys, such as ("input",).
        label_keys (Tuple[str, ...]): Output keys, such as ("output",).
        split (str, optional): The split of the dataset, "validation" or "train". Defaults to "validation".
        num_input_frames (int, optional): Number of input frames. Defaults to 4.
        num_target_frames (int, optional): Number of target frames. Defaults to 18.
        dataset_path (str, optional): Path to the dataset. Defaults to "openclimatefix/nimrod-uk-1km".

    Examples:
        >>> import ppsci
        >>> dataset = ppsci.data.dataset.DGMRDataset(("input", ), ("output", )) # doctest: +SKIP
    """

    def __init__(
        self,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        split: str = "validation",
        num_input_frames: int = 4,
        num_target_frames: int = 18,
        dataset_path: str = "openclimatefix/nimrod-uk-1km",
    ):
        super().__init__()
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.num_input_frames = num_input_frames
        self.num_target_frames = num_target_frames
        if not importlib.util.find_spec("datasets"):
            raise ModuleNotFoundError(
                "Please install datasets with `pip install datasets`"
                " before exporting onnx model."
            )
        import datasets

        self.reader = datasets.load_dataset(
            dataset_path, "sample", split=split, streaming=True, trust_remote_code=True
        )
        self.iter_reader = self.reader

    def __len__(self):
        return 1000

    def __getitem__(self, idx):
        try:
            row = next(self.iter_reader)
        except Exception:
            rng = default_rng(42)
            self.iter_reader = iter(
                self.reader.shuffle(
                    seed=rng.integers(low=0, high=100000), buffer_size=1000
                )
            )
            row = next(self.iter_reader)
        radar_frames = row["radar_frames"]
        input_frames = radar_frames[
            -self.num_target_frames - self.num_input_frames : -self.num_target_frames
        ]
        target_frames = radar_frames[-self.num_target_frames :]
        input_item = {
            self.input_keys[0]: np.moveaxis(input_frames, [0, 1, 2, 3], [0, 2, 3, 1])
        }
        label_item = {
            self.label_keys[0]: np.moveaxis(target_frames, [0, 1, 2, 3], [0, 2, 3, 1])
        }
        return input_item, label_item
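
The frame split in __getitem__ above keeps the last num_input_frames + num_target_frames frames of a sample and moves the channel axis forward; a standalone sketch with illustrative shapes:

>>> import numpy as np
>>> radar_frames = np.random.randn(24, 256, 256, 1)  # (T, H, W, C); shapes illustrative
>>> num_input_frames, num_target_frames = 4, 18
>>> input_frames = radar_frames[-num_target_frames - num_input_frames : -num_target_frames]
>>> target_frames = radar_frames[-num_target_frames:]
>>> np.moveaxis(input_frames, [0, 1, 2, 3], [0, 2, 3, 1]).shape  # to (T, C, H, W)
(4, 1, 256, 256)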

DarcyFlowDataset

Bases: Dataset

Loads a small Darcy-Flow dataset

Training contains 1000 samples in resolution 16x16. Testing contains 100 samples at resolution 16x16 and 50 samples at resolution 32x32.

Parameters:

Name Type Description Default
input_keys Tuple[str, ...]

Input keys, such as ("input",).

required
label_keys Tuple[str, ...]

Output keys, such as ("output",).

required
data_dir str

The directory to load data from.

required
weight_dict Optional[Dict[str, float]]

Define the weight of each constraint variable. Defaults to None.

None
test_resolutions List[int, ...]

The resolutions of the test datasets. Default is [32].

[32]
grid_boundaries List[int, ...]

The boundaries of the grid. Default is [[0,1],[0,1]].

[[0, 1], [0, 1]]
positional_encoding bool

Whether to use positional encoding. Default is True

True
encode_input bool

Whether to encode the input. Default is False

False
encode_output bool

Whether to encode the output. Default is True

True
encoding str

The type of encoding. Default is 'channel-wise'.

'channel-wise'
channel_dim int

Where to put the channel dimension when unsqueezing; the default layout is (batch, channel, height, width). Default is 1.

1
data_split str

Whether to use the training or test dataset. Default is 'train'.

'train'
Source code in ppsci/data/dataset/darcyflow_dataset.py
class DarcyFlowDataset(io.Dataset):
    """Loads a small Darcy-Flow dataset

    Training contains 1000 samples in resolution 16x16.
    Testing contains 100 samples at resolution 16x16 and
    50 samples at resolution 32x32.

    Args:
        input_keys (Tuple[str, ...]): Input keys, such as ("input",).
        label_keys (Tuple[str, ...]): Output keys, such as ("output",).
        data_dir (str): The directory to load data from.
        weight_dict (Optional[Dict[str, float]], optional): Define the weight of each constraint variable. Defaults to None.
        test_resolutions (List[int,...]): The resolutions of the test datasets. Default is [32].
        grid_boundaries (List[int,...]): The boundaries of the grid. Default is [[0,1],[0,1]].
        positional_encoding (bool): Whether to use positional encoding. Default is True
        encode_input (bool): Whether to encode the input. Default is False
        encode_output (bool): Whether to encode the output. Default is True
        encoding (str): The type of encoding. Default is 'channel-wise'.
        channel_dim (int): The location of unsqueeze. Default is 1.
            where to put the channel dimension. Defaults size is batch, channel, height, width
        data_split (str): Whether to use the training or test dataset. Default is 'train'.
    """

    def __init__(
        self,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        data_dir: str,
        weight_dict: Optional[Dict[str, float]] = None,
        test_resolutions: Tuple[int, ...] = [32],
        train_resolution: int = 32,
        grid_boundaries: Tuple[Tuple[int, ...], ...] = [[0, 1], [0, 1]],
        positional_encoding: bool = True,
        encode_input: bool = False,
        encode_output: bool = True,
        encoding: str = "channel-wise",
        channel_dim: int = 1,
        data_split: str = "train",
    ):
        super().__init__()
        for res in test_resolutions:
            if res not in [16, 32]:
                raise ValueError(
                    f"Only 32 and 64 are supported for test resolution, but got {test_resolutions}"
                )

        self.input_keys = input_keys
        self.label_keys = label_keys
        self.data_dir = data_dir
        self.weight_dict = {} if weight_dict is None else weight_dict
        if weight_dict is not None:
            self.weight_dict = {key: 1.0 for key in self.label_keys}
            self.weight_dict.update(weight_dict)

        self.test_resolutions = test_resolutions
        self.train_resolution = train_resolution
        self.grid_boundaries = grid_boundaries
        self.positional_encoding = positional_encoding
        self.encode_input = encode_input
        self.encode_output = encode_output
        self.encoding = encoding
        self.channel_dim = channel_dim
        self.data_split = data_split

        # train path
        path_train = (
            Path(self.data_dir)
            .joinpath(f"darcy_train_{self.train_resolution}.npy")
            .as_posix()
        )
        self.x_train, self.y_train = self.read_data(path_train)
        # test path
        path_test_1 = (
            Path(self.data_dir)
            .joinpath(f"darcy_test_{self.test_resolutions[0]}.npy")
            .as_posix()
        )
        self.x_test_1, self.y_test_1 = self.read_data(path_test_1)
        path_test_2 = (
            Path(self.data_dir)
            .joinpath(f"darcy_test_{self.test_resolutions[1]}.npy")
            .as_posix()
        )
        self.x_test_2, self.y_test_2 = self.read_data(path_test_2)

        # input encoder
        if self.encode_input:
            self.input_encoder = self.encode_data(self.x_train)
            self.x_train = self.input_encoder.encode(self.x_train)
            self.x_test_1 = self.input_encoder.encode(self.x_test_1)
            self.x_test_2 = self.input_encoder.encode(self.x_test_2)
        else:
            self.input_encoder = None
        # output encoder
        if self.encode_output:
            self.output_encoder = self.encode_data(self.y_train)
            self.y_train = self.output_encoder.encode(self.y_train)
        else:
            self.output_encoder = None

        if positional_encoding:
            self.transform_x = PositionalEmbedding2D(grid_boundaries)
        else:
            self.transform_x = None

    def read_data(self, path):
        # load with numpy
        data = np.load(path, allow_pickle=True).item()
        x = (
            paddle.to_tensor(data["x"])
            .unsqueeze(self.channel_dim)
            .astype("float32")
            .clone()
        )
        y = paddle.to_tensor(data["y"]).unsqueeze(self.channel_dim).clone()
        del data
        return x, y

    def encode_data(self, data):
        if self.encoding == "channel-wise":
            reduce_dims = list(range(data.ndim))
        elif self.encoding == "pixel-wise":
            reduce_dims = [0]
        input_encoder = UnitGaussianNormalizer(data, reduce_dim=reduce_dims)
        return input_encoder

    def __len__(self):
        if self.data_split == "train":
            return self.x_train.shape[0]
        elif self.data_split == "test_16x16":
            return self.x_test_1.shape[0]
        else:
            return self.x_test_2.shape[0]

    def __getitem__(self, index):
        if self.data_split == "train":
            x = self.x_train[index]
            y = self.y_train[index]

        elif self.data_split == "test_16x16":
            x = self.x_test_1[index]
            y = self.y_test_1[index]
        else:
            x = self.x_test_2[index]
            y = self.y_test_2[index]

        if self.transform_x is not None:
            x = self.transform_x(x)

        input_item = {self.input_keys[0]: x}
        label_item = {self.label_keys[0]: y}
        weight_item = self.weight_dict

        return input_item, label_item, weight_item
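
read_data above expects each .npy file to hold a pickled dict with "x" and "y" arrays (loaded with allow_pickle=True and .item()); a minimal sketch of creating such a file, with illustrative shapes:

>>> import numpy as np
>>> np.save(
...     "darcy_train_32.npy",
...     {"x": np.random.rand(1000, 32, 32).astype("float32"),
...      "y": np.random.rand(1000, 32, 32).astype("float32")},
... )  # doctest: +SKIP
>>> sorted(np.load("darcy_train_32.npy", allow_pickle=True).item().keys())  # doctest: +SKIP
['x', 'y']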

SphericalSWEDataset

Bases: Dataset

Loads a Spherical Shallow Water equations dataset

Training contains 200 samples in resolution 32x64. Testing contains 50 samples at resolution 32x64 and 50 samples at resolution 64x128.

Parameters:

Name Type Description Default
input_keys Tuple[str, ...]

Input keys, such as ("input",).

required
label_keys Tuple[str, ...]

Output keys, such as ("output",).

required
data_dir str

The directory to load data from.

required
weight_dict Optional[Dict[str, float]]

Define the weight of each constraint variable. Defaults to None.

None
test_resolutions Tuple[str, ...]

The resolutions to test dataset. Defaults to ["34x64", "64x128"].

['34x64', '64x128']
train_resolution str

The resolutions to train dataset. Defaults to "34x64".

'34x64'
data_split str

Specify the dataset split, either 'train', 'test_32x64', or 'test_64x128'. Defaults to "train".

'train'
Source code in ppsci/data/dataset/spherical_swe_dataset.py
class SphericalSWEDataset(io.Dataset):
    """Loads a Spherical Shallow Water equations dataset

    Training contains 200 samples in resolution 32x64.
    Testing contains 50 samples at resolution 32x64 and 50 samples at resolution 64x128.

    Args:
        input_keys (Tuple[str, ...]): Input keys, such as ("input",).
        label_keys (Tuple[str, ...]): Output keys, such as ("output",).
        data_dir (str): The directory to load data from.
        weight_dict (Optional[Dict[str, float]], optional): Define the weight of each constraint variable.
            Defaults to None.
        test_resolutions (Tuple[str, ...], optional): The resolutions to test dataset. Defaults to ["34x64", "64x128"].
        train_resolution (str, optional): The resolutions to train dataset. Defaults to "34x64".
        data_split (str, optional): Specify the dataset split, either 'train' , 'test_32x64',or 'test_64x128'.
            Defaults to "train".

    """

    def __init__(
        self,
        input_keys: Tuple[str, ...],
        label_keys: Tuple[str, ...],
        data_dir: str,
        weight_dict: Optional[Dict[str, float]] = None,
        test_resolutions: Tuple[str, ...] = ["34x64", "64x128"],
        train_resolution: str = "34x64",
        data_split: str = "train",
    ):
        super().__init__()
        self.input_keys = input_keys
        self.label_keys = label_keys
        self.data_dir = data_dir
        self.weight_dict = {} if weight_dict is None else weight_dict
        if weight_dict is not None:
            self.weight_dict = {key: 1.0 for key in self.label_keys}
            self.weight_dict.update(weight_dict)

        self.test_resolutions = test_resolutions
        self.train_resolution = train_resolution
        self.data_split = data_split

        # train path
        path_train = (
            Path(self.data_dir)
            .joinpath(f"train_SWE_{self.train_resolution}.npy")
            .as_posix()
        )
        self.x_train, self.y_train = self.read_data(path_train)
        # test path
        path_test_1 = (
            Path(self.data_dir)
            .joinpath(f"test_SWE_{self.test_resolutions[0]}.npy")
            .as_posix()
        )
        self.x_test_1, self.y_test_1 = self.read_data(path_test_1)
        path_test_2 = (
            Path(self.data_dir)
            .joinpath(f"test_SWE_{self.test_resolutions[1]}.npy")
            .as_posix()
        )
        self.x_test_2, self.y_test_2 = self.read_data(path_test_2)

    def read_data(self, path):
        # load with numpy
        data = np.load(path, allow_pickle=True).item()
        x = data["x"].astype("float32")
        y = data["y"].astype("float32")
        del data
        return x, y

    def __len__(self):
        if self.data_split == "train":
            return self.x_train.shape[0]
        elif self.data_split == "test_32x64":
            return self.x_test_1.shape[0]
        else:
            return self.x_test_2.shape[0]

    def __getitem__(self, index):
        if self.data_split == "train":
            x = self.x_train[index]
            y = self.y_train[index]

        elif self.data_split == "test_32x64":
            x = self.x_test_1[index]
            y = self.y_test_1[index]
        else:
            x = self.x_test_2[index]
            y = self.y_test_2[index]

        input_item = {self.input_keys[0]: x}
        label_item = {self.label_keys[0]: y}
        weight_item = self.weight_dict

        return input_item, label_item, weight_item

build_dataset(cfg)

Build dataset

Parameters:

Name Type Description Default
cfg List[DictConfig]

Dataset config list.

required

Returns:

Type Description
Dataset

io.Dataset: dataset.

Source code in ppsci/data/dataset/__init__.py
def build_dataset(cfg) -> "io.Dataset":
    """Build dataset

    Args:
        cfg (List[DictConfig]): Dataset config list.

    Returns:
        io.Dataset: dataset.
    """
    cfg = copy.deepcopy(cfg)

    dataset_cls = cfg.pop("name")
    if "transforms" in cfg:
        cfg["transforms"] = transform.build_transforms(cfg.pop("transforms"))

    dataset = eval(dataset_cls)(**cfg)

    logger.debug(str(dataset))

    return dataset
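
A minimal sketch of calling build_dataset with a plain dict config (in practice cfg usually comes from a Hydra/OmegaConf config); the dataset name and arrays below are illustrative only and assume NamedArrayDataset, documented earlier on this page:

>>> import numpy as np
>>> from ppsci.data.dataset import build_dataset
>>> cfg = {
...     "name": "NamedArrayDataset",
...     "input": {"x": np.random.randn(16, 1)},
...     "label": {"u": np.random.randn(16, 1)},
... }
>>> dataset = build_dataset(cfg)  # doctest: +SKIP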