# Last modified: 2024-04-18
#
# Copyright 2023 Bingxin Ke, ETH Zurich. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# --------------------------------------------------------------------------
# If you find this code useful, we kindly ask you to cite our paper in your work.
# Please find bibtex at: https://github.com/prs-eth/Marigold#-citation
# If you use or adapt this code, please attribute to https://github.com/prs-eth/marigold.
# More information about the method can be found at https://marigoldmonodepth.github.io
# --------------------------------------------------------------------------

import torch
from torch.utils.data import (
    BatchSampler,
    RandomSampler,
    SequentialSampler,
)


class MixedBatchSampler(BatchSampler):
    """Sample each batch from one of several source datasets, selected with a
    given probability. Compatible with datasets at different resolutions,
    since every batch is drawn from a single dataset.
    """

    def __init__(
        self, src_dataset_ls, batch_size, drop_last, shuffle, prob=None, generator=None
    ):
        self.base_sampler = None
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.drop_last = drop_last
        self.generator = generator

        self.src_dataset_ls = src_dataset_ls
        self.n_dataset = len(self.src_dataset_ls)

        # Dataset lengths
        self.dataset_length = [len(ds) for ds in self.src_dataset_ls]
        # Cumulative dataset lengths, used to shift per-dataset indices into
        # the index space of the corresponding ConcatDataset
        self.cum_dataset_length = [
            sum(self.dataset_length[:i]) for i in range(self.n_dataset)
        ]

        # One BatchSampler per source dataset
        if self.shuffle:
            self.src_batch_samplers = [
                BatchSampler(
                    sampler=RandomSampler(
                        ds, replacement=False, generator=self.generator
                    ),
                    batch_size=self.batch_size,
                    drop_last=self.drop_last,
                )
                for ds in self.src_dataset_ls
            ]
        else:
            self.src_batch_samplers = [
                BatchSampler(
                    sampler=SequentialSampler(ds),
                    batch_size=self.batch_size,
                    drop_last=self.drop_last,
                )
                for ds in self.src_dataset_ls
            ]
        self.raw_batches = [
            list(bs) for bs in self.src_batch_samplers
        ]  # batches of indices into the original datasets
        self.n_batches = [len(b) for b in self.raw_batches]
        self.n_total_batch = sum(self.n_batches)

        # Sampling probability
        if prob is None:
            # if not given, sample each dataset proportionally to its number of batches
            self.prob = torch.tensor(self.n_batches) / self.n_total_batch
        else:
            self.prob = torch.as_tensor(prob)

    def __iter__(self):
        """Iterate over mixed batches.

        Yields:
            list(int): a batch of indices, corresponding to the ConcatDataset
                of src_dataset_ls
        """
        for _ in range(self.n_total_batch):
            # Pick a source dataset according to the sampling probabilities
            idx_ds = torch.multinomial(
                self.prob, 1, replacement=True, generator=self.generator
            ).item()
            # if the batch list is exhausted, generate a new one
            if 0 == len(self.raw_batches[idx_ds]):
                self.raw_batches[idx_ds] = list(self.src_batch_samplers[idx_ds])
            # get a batch from the list
            batch_raw = self.raw_batches[idx_ds].pop()
            # shift by the cumulative length of the preceding datasets
            shift = self.cum_dataset_length[idx_ds]
            batch = [n + shift for n in batch_raw]

            yield batch

    def __len__(self):
        return self.n_total_batch
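
# Convenience wrapper (an illustrative sketch, not part of the original module):
# it bundles the ConcatDataset + MixedBatchSampler wiring shown in the unit test
# below into a single call. The function name, signature, and defaults are
# assumptions for illustration only.
def build_mixed_dataloader(
    src_dataset_ls,
    batch_size,
    drop_last=True,
    shuffle=True,
    prob=None,
    generator=None,
    num_workers=0,
):
    from torch.utils.data import ConcatDataset, DataLoader

    # ConcatDataset provides the flat index space that the sampler's
    # shifted indices refer to
    concat_dataset = ConcatDataset(src_dataset_ls)
    batch_sampler = MixedBatchSampler(
        src_dataset_ls=src_dataset_ls,
        batch_size=batch_size,
        drop_last=drop_last,
        shuffle=shuffle,
        prob=prob,
        generator=generator,
    )
    return DataLoader(
        concat_dataset, batch_sampler=batch_sampler, num_workers=num_workers
    )
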
# Unit test
if "__main__" == __name__:
    from torch.utils.data import ConcatDataset, DataLoader, Dataset

    class SimpleDataset(Dataset):
        def __init__(self, start, length) -> None:
            super().__init__()
            self.start = start
            self.length = length

        def __len__(self):
            return self.length

        def __getitem__(self, index):
            return self.start + index

    dataset_1 = SimpleDataset(0, 10)
    dataset_2 = SimpleDataset(200, 20)
    dataset_3 = SimpleDataset(1000, 50)

    concat_dataset = ConcatDataset(
        [dataset_1, dataset_2, dataset_3]
    )  # will directly concatenate

    mixed_sampler = MixedBatchSampler(
        src_dataset_ls=[dataset_1, dataset_2, dataset_3],
        batch_size=4,
        drop_last=True,
        shuffle=False,
        prob=[0.6, 0.3, 0.1],
        generator=torch.Generator().manual_seed(0),
    )

    loader = DataLoader(concat_dataset, batch_sampler=mixed_sampler)

    for d in loader:
        print(d)
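
    # Additional illustrative check (an assumption, not in the original test):
    # with prob=None the sampler weights each dataset by its number of batches,
    # and shuffle=True draws randomized batches from each source.
    shuffled_sampler = MixedBatchSampler(
        src_dataset_ls=[dataset_1, dataset_2, dataset_3],
        batch_size=4,
        drop_last=True,
        shuffle=True,
        prob=None,
        generator=torch.Generator().manual_seed(0),
    )
    shuffled_loader = DataLoader(concat_dataset, batch_sampler=shuffled_sampler)

    for d in shuffled_loader:
        print(d)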