Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 147 additions & 134 deletions matchzoo/dataloader/dataloader.py
Original file line number Diff line number Diff line change
@@ -1,134 +1,147 @@
"""Basic data loader."""
import typing
import math

import numpy as np
import torch
from torch.utils import data

from matchzoo.dataloader.dataset import Dataset
from matchzoo.engine.base_callback import BaseCallback


class DataLoader(object):
"""
DataLoader that loads batches of data from a Dataset.

:param dataset: The Dataset object to load data from.
:param device: The desired device of returned tensor. Default: if None,
use the current device. If `torch.device` or int, use device specified
by user. If list, the first item will be used.
:param stage: One of "train", "dev", and "test". (default: "train")
:param callback: BaseCallback. See
`matchzoo.engine.base_callback.BaseCallback` for more details.
:param pin_momory: If set to `True`, tensors will be copied into
pinned memory. (default: `False`)
:param timeout: The timeout value for collecting a batch from workers. (
default: 0)
:param num_workers: The number of subprocesses to use for data loading. 0
means that the data will be loaded in the main process. (default: 0)
:param worker_init_fn: If not ``None``, this will be called on each
worker subprocess with the worker id (an int in [0, num_workers - 1])
as input, after seeding and before data loading. (default: None)

Examples:
>>> import matchzoo as mz
>>> data_pack = mz.datasets.toy.load_data(stage='train')
>>> preprocessor = mz.preprocessors.BasicPreprocessor()
>>> data_processed = preprocessor.fit_transform(data_pack)
>>> dataset = mz.dataloader.Dataset(
... data_processed, mode='point', batch_size=32)
>>> padding_callback = mz.dataloader.callbacks.BasicPadding()
>>> dataloader = mz.dataloader.DataLoader(
... dataset, stage='train', callback=padding_callback)
>>> len(dataloader)
4

"""

def __init__(
self,
dataset: Dataset,
device: typing.Union[torch.device, int, list, None] = None,
stage='train',
callback: BaseCallback = None,
pin_memory: bool = False,
timeout: int = 0,
num_workers: int = 0,
worker_init_fn=None,
):
"""Init."""
if stage not in ('train', 'dev', 'test'):
raise ValueError(f"{stage} is not a valid stage type."
f"Must be one of `train`, `dev`, `test`.")

if isinstance(device, list) and len(device):
device = device[0]
elif not (isinstance(device, torch.device) or isinstance(device, int)):
device = torch.device(
"cuda" if torch.cuda.is_available() else "cpu")

self._dataset = dataset
self._pin_momory = pin_memory
self._timeout = timeout
self._num_workers = num_workers
self._worker_init_fn = worker_init_fn
self._device = device
self._stage = stage
self._callback = callback

self._dataloader = data.DataLoader(
self._dataset,
batch_size=None,
shuffle=False,
collate_fn=lambda x: x,
batch_sampler=None,
num_workers=self._num_workers,
pin_memory=self._pin_momory,
timeout=self._timeout,
worker_init_fn=self._worker_init_fn,
)

def __len__(self) -> int:
"""Get the total number of batches."""
return len(self._dataset)

@property
def id_left(self) -> np.ndarray:
"""`id_left` getter."""
x, _ = self._dataset[:]
return x['id_left']

@property
def label(self) -> np.ndarray:
"""`label` getter."""
_, y = self._dataset[:]
return y.squeeze() if y is not None else None

def __iter__(self) -> typing.Tuple[dict, torch.tensor]:
"""Iteration."""
for batch_data in self._dataloader:
x, y = batch_data
self._handle_callbacks_on_batch_unpacked(x, y)

batch_x = {}
for key, value in x.items():
if key == 'id_left' or key == 'id_right':
continue
batch_x[key] = torch.tensor(
value, device=self._device)

if self._stage == 'test':
yield batch_x, None
else:
if y.dtype == 'int': # task='classification'
batch_y = torch.tensor(
y.squeeze(axis=-1), dtype=torch.long, device=self._device)
else: # task='ranking'
batch_y = torch.tensor(
y, dtype=torch.float, device=self._device)
yield batch_x, batch_y

def _handle_callbacks_on_batch_unpacked(self, x, y):
if self._callback is not None:
self._callback.on_batch_unpacked(x, y)
"""Basic data loader."""
import typing
import math

import numpy as np
import torch
from torch.utils import data

from matchzoo.dataloader.dataset import Dataset
from matchzoo.engine.base_callback import BaseCallback


class DataLoader(object):
"""
DataLoader that loads batches of data from a Dataset.

:param dataset: The Dataset object to load data from.
:param device: The desired device of returned tensor. Default: if None,
use the current device. If `torch.device` or int, use device specified
by user. If list, the first item will be used.
:param stage: One of "train", "dev", and "test". (default: "train")
:param callback: BaseCallback. See
`matchzoo.engine.base_callback.BaseCallback` for more details.
:param pin_momory: If set to `True`, tensors will be copied into
pinned memory. (default: `False`)
:param timeout: The timeout value for collecting a batch from workers. (
default: 0)
:param num_workers: The number of subprocesses to use for data loading. 0
means that the data will be loaded in the main process. (default: 0)
:param worker_init_fn: If not ``None``, this will be called on each
worker subprocess with the worker id (an int in [0, num_workers - 1])
as input, after seeding and before data loading. (default: None)

Examples:
>>> import matchzoo as mz
>>> data_pack = mz.datasets.toy.load_data(stage='train')
>>> preprocessor = mz.preprocessors.BasicPreprocessor()
>>> data_processed = preprocessor.fit_transform(data_pack)
>>> dataset = mz.dataloader.Dataset(
... data_processed, mode='point', batch_size=32)
>>> padding_callback = mz.dataloader.callbacks.BasicPadding()
>>> dataloader = mz.dataloader.DataLoader(
... dataset, stage='train', callback=padding_callback)
>>> len(dataloader)
4

"""

def __init__(
self,
dataset: Dataset,
device: typing.Union[torch.device, int, list, None] = None,
stage='train',
callback: BaseCallback = None,
pin_memory: bool = False,
timeout: int = 0,
num_workers: int = 0,
worker_init_fn=None,
):
"""Init."""
if stage not in ('train', 'dev', 'test'):
raise ValueError(f"{stage} is not a valid stage type."
f"Must be one of `train`, `dev`, `test`.")

if isinstance(device, list) and len(device):
device = device[0]
elif not (isinstance(device, torch.device) or isinstance(device, int)):
device = torch.device(
"cuda" if torch.cuda.is_available() else "cpu")

self._dataset = dataset
self._pin_momory = pin_memory
self._timeout = timeout
self._num_workers = num_workers
self._worker_init_fn = worker_init_fn
self._device = device
self._stage = stage
self._callback = callback

# Pass num_workers to dataset for proper worker partitioning
if hasattr(self._dataset, '_num_workers'):
self._dataset._num_workers = num_workers

# Create worker init function if not provided and workers > 0
if num_workers > 0 and worker_init_fn is None:
def _worker_init_fn(worker_id):
import os
os.environ['worker_id'] = str(worker_id)
if hasattr(self._dataset, 'worker_id'):
self._dataset.worker_id = worker_id
self._worker_init_fn = _worker_init_fn

self._dataloader = data.DataLoader(
self._dataset,
batch_size=None,
shuffle=False,
collate_fn=lambda x: x,
batch_sampler=None,
num_workers=self._num_workers,
pin_memory=self._pin_momory,
timeout=self._timeout,
worker_init_fn=self._worker_init_fn,
)

def __len__(self) -> int:
"""Get the total number of batches."""
return len(self._dataset)

@property
def id_left(self) -> np.ndarray:
"""`id_left` getter."""
x, _ = self._dataset[:]
return x['id_left']

@property
def label(self) -> np.ndarray:
"""`label` getter."""
_, y = self._dataset[:]
return y.squeeze() if y is not None else None

def __iter__(self) -> typing.Tuple[dict, torch.tensor]:
"""Iteration."""
for batch_data in self._dataloader:
x, y = batch_data
self._handle_callbacks_on_batch_unpacked(x, y)

batch_x = {}
for key, value in x.items():
if key == 'id_left' or key == 'id_right':
continue
batch_x[key] = torch.tensor(
value, device=self._device)

if self._stage == 'test':
yield batch_x, None
else:
if y.dtype == 'int': # task='classification'
batch_y = torch.tensor(
y.squeeze(axis=-1), dtype=torch.long, device=self._device)
else: # task='ranking'
batch_y = torch.tensor(
y, dtype=torch.float, device=self._device)
yield batch_x, batch_y

def _handle_callbacks_on_batch_unpacked(self, x, y):
if self._callback is not None:
self._callback.on_batch_unpacked(x, y)
Loading