Projects & Blogs
LATEST TRANSFORMER
PROBABILISTIC APPR...
MINI-UNET
BASICS OF TRITON
MINI-ALEXNET
PYTHON CODE GENER...
SEQUENTIAL MONTE ...
TRUNCATED SVD
CUSTOM DATALOADER...
PROBABILITY
import os
import matplotlib.pyplot as plt
import cv2
import numpy as np
import torch
from torch.utils.data import Dataset
import torchvision.transforms as transforms
import multiprocessing

# Dataset locations, relative to the working directory: images live under
# ./imagenet-mini and the WordNet label file is ./words.txt.
# (An earlier revision pointed BASE_DIR at '../Neural Networks for Images';
# it was immediately overwritten, so only the '.' layout is kept.)
BASE_DIR = os.path.join('.')
image_path = os.path.join(BASE_DIR, 'imagenet-mini')
labels_path = os.path.join(BASE_DIR, 'words.txt')
class ImageNetMiniDataset(Dataset):
    """Map-style dataset over an imagenet-mini directory tree.

    Expects ``img_dir/{train,val}/<wnid>/<image files>`` on disk and a
    tab-separated annotations file with one ``wnid\\tdescription`` pair
    per line (the standard ImageNet ``words.txt`` format).
    """

    def __init__(self, annotations_file, img_dir, resize=(224, 224), train=True,
                 transform=None, target_transform=None):
        # wnid -> human-readable description, e.g. 'n01440764' -> 'tench, Tinca tinca'
        self.img_labels = self.read_labels(annotations_file)
        # list of (image file path, wnid) pairs for the chosen split
        self.img_dir_paths = self.read_image_paths(img_dir, train)
        self.train = train
        self.transform = transform
        self.target_transform = target_transform
        self.resize_dim = resize

    def read_labels(self, path):
        """Parse the annotations file into a wnid -> description dict.

        Descriptions are stored without the trailing newline (the previous
        implementation kept it, which leaked '\\n' into plot titles).
        """
        labels_dict = {}
        with open(path, 'r') as f:
            for line in f:
                line = line.rstrip('\n')
                if not line:
                    continue  # tolerate blank lines
                wnid, description = line.split('\t', 1)
                labels_dict[wnid] = description
        return labels_dict

    def read_image_paths(self, img_dir, train):
        """Collect (file path, wnid) pairs from ``img_dir/{train|val}/<wnid>/*``."""
        sub_path = 'train' if train else 'val'
        split_dir = os.path.join(img_dir, sub_path)
        result = []
        for wnid in os.listdir(split_dir):
            class_dir = os.path.join(split_dir, wnid)
            for file_name in os.listdir(class_dir):
                result.append((os.path.join(class_dir, file_name), wnid))
        return result

    def __len__(self):
        return len(self.img_dir_paths)

    def __getitem__(self, idx):
        img_path, label = self.img_dir_paths[idx]
        raw = cv2.imread(img_path)
        if raw is None:
            # cv2.imread signals unreadable/missing files by returning None;
            # fail loudly here instead of letting cv2.resize raise a cryptic error.
            raise FileNotFoundError(f'could not read image: {img_path}')
        image = cv2.resize(raw, self.resize_dim, interpolation=cv2.INTER_AREA)
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image, label
transform = transforms.Compose(
    [
        transforms.ToTensor()  # HWC uint8 ndarray -> CHW float tensor in [0, 1]
    ]
)


class LazyDataLoader:
    """Minimal batching loader over a map-style dataset.

    Iterating yields ``(images, labels)`` per batch, where ``images`` is a
    stacked tensor and ``labels`` are dense integer ids assigned on first
    sight via ``label_map`` (see ``getLabelFromMap``).
    """

    def __init__(self, dataset, batch_size=10, shuffle=False, num_workers=0):
        self.dataset = dataset
        self.batch_size = batch_size
        # Honored in __iter__; the previous revision always shuffled
        # regardless of this flag.
        self.shuffle = shuffle
        self.num_workers = num_workers
        # ceil(len / batch_size): one extra batch for the remainder, if any.
        self.total_batches = len(dataset) // batch_size + (0 if len(dataset) % batch_size == 0 else 1)
        # Size of the final short batch (0 when the dataset divides evenly).
        self.extras = len(dataset) % batch_size
        # raw label -> dense integer id, filled lazily during iteration.
        self.label_map = dict()

    def __len__(self):
        return self.total_batches

    def __iter__(self):
        # Index plan for one epoch: total_batches rows of batch_size indices.
        # Padding indices >= len(dataset) are filtered out in __next__.
        counter = np.arange(self.total_batches * self.batch_size)
        if self.shuffle:
            np.random.shuffle(counter)
        self.counter = counter.reshape((self.total_batches, self.batch_size))
        self.current_batch = 0
        return self

    def fetch_data(self, index):
        """Fetch one (image, int label) sample.

        Accepts a plain integer or a 1-element array (the old 3-D counter
        passed arrays; both forms keep working).
        """
        idx = int(np.asarray(index).reshape(-1)[0])
        image, label = self.dataset[idx]
        return (image, self.getLabelFromMap(label))

    def __next__(self):
        if self.current_batch >= self.counter.shape[0]:
            raise StopIteration
        # Drop padding indices that fall past the end of the dataset.
        indices = [int(i) for i in self.counter[self.current_batch] if i < len(self.dataset)]
        if self.num_workers != 0:
            # Fetch raw samples in worker processes, but assign label ids in
            # the parent: mutating label_map inside workers would only update
            # their private copies and yield inconsistent ids. The context
            # manager also closes the pool (previously leaked every batch).
            with multiprocessing.Pool(self.num_workers) as pool:
                samples = pool.map(self.dataset.__getitem__, indices)
        else:
            samples = [self.dataset[i] for i in indices]
        x_all = [image for image, _ in samples]
        y_all = [self.getLabelFromMap(label) for _, label in samples]
        self.current_batch += 1
        return torch.stack(x_all), torch.LongTensor(y_all)

    def getLabelFromMap(self, y):
        """Return the stable integer id for label ``y``, assigning a new one on first sight."""
        if y not in self.label_map:
            self.label_map[y] = len(self.label_map)
        return self.label_map[y]


image_net_train_dataset = ImageNetMiniDataset(labels_path, image_path, resize=(224, 224), transform=transform)
image_net_test_dataset = ImageNetMiniDataset(labels_path, image_path, resize=(224, 224), train=False, transform=transform)
train_iter = LazyDataLoader(image_net_train_dataset, batch_size=1000, shuffle=True, num_workers=0)
# test_iter = LazyDataLoader(image_net_test_dataset, batch_size=100, num_workers=multiprocessing.cpu_count())

# Sanity check: display the first image of the first batch with its class name.
# (The original for-loop always broke after one iteration — its counter `i`
# never advanced and plt.subplot(1, 1, i) would raise for i > 1 anyway — so a
# single-batch fetch is equivalent.)
x, y = next(iter(train_iter))
image = x[0]
label = y[0]
# Invert label_map (wnid -> int id) to recover the wnid for this integer label.
reverse_dict = {v: k for k, v in train_iter.label_map.items()}
plt.subplot(1, 1, 1)
plt.xticks([])
plt.yticks([])
plt.grid(False)
plt.imshow(image.permute(1, 2, 0))  # CHW tensor -> HWC for matplotlib
key = reverse_dict[label.int().item()]
plt.title(image_net_train_dataset.img_labels[key])
plt.show()