import re
import os
import tiktoken
import math
import torch
from torch import nn
from torch.nn import functional as F

def remove_multiline_comments(code):
    # Remove all triple-quoted strings (both ''' and """)
    pattern = r"(\'\'\'[\s\S]*?\'\'\'|\"\"\"[\s\S]*?\"\"\")"
    return re.sub(pattern, '', code)
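
A throwaway example (not taken from the dataset) shows what the cleaner does: the triple-quoted docstring is removed and ordinary code is left untouched.

sample = 'def add(a, b):\n    """Return the sum."""\n    return a + b\n'
print(remove_multiline_comments(sample))   # the docstring line comes back as an empty, indented line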

directory_in_str = "./dataset"
directory = os.fsencode(directory_in_str)
token_encoder = tiktoken.get_encoding("gpt2")

# Walk the dataset directory, strip triple-quoted strings from every .py file,
# tokenize with the GPT-2 BPE vocabulary, and concatenate everything into one
# long tensor of token ids.
token_chunks = []
for path, folders, files in os.walk(directory):
    for filename in files:
        filename_str = filename.decode('utf-8')
        if filename_str.endswith('.py'):
            with open(os.path.join(path, filename)) as f:
                original_code = f.read()
                cleaned_code = remove_multiline_comments(original_code)
                token_chunks.append(
                    torch.tensor(token_encoder.encode(cleaned_code), dtype=torch.long))
data = torch.cat(token_chunks, 0)
print(data.shape)

n = int(0.9*len(data))
# train data
train_data = data[:n]
# validation data
val_data = data[n:]


torch.Size([12145966])

def masked_softmax(X: torch.Tensor, valid_lens: torch.Tensor):
    """Softmax over the last axis, ignoring entries beyond each row's valid length."""
    def _sequence_mask(X: torch.Tensor, valid_lens: torch.Tensor, value=0):
        maxlen = X.size(1)
        mask = torch.arange((maxlen), dtype=torch.float32, device=X.device)[None, :] < valid_lens[:, None]
        X[~mask] = value
        return X
    if valid_lens is None:
        return nn.functional.softmax(X, dim=-1)
    else:
        shape = X.shape
        if valid_lens.dim() == 1:
            valid_lens = torch.repeat_interleave(valid_lens, shape[1])
        else:
            valid_lens = valid_lens.reshape(-1)
        # Fill masked positions with a large negative value so softmax sends them to ~0.
        X = _sequence_mask(X.reshape(-1, shape[-1]), valid_lens, value=-1e6)
        return nn.functional.softmax(X.reshape(shape), dim=-1)
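
To see the masking in action on some made-up scores (shapes picked arbitrarily), each row only gets probability mass within its valid length:

scores = torch.rand(2, 3, 4)              # (batch, queries, keys)
valid = torch.tensor([2, 3])              # first example may attend to 2 keys, the second to 3
weights = masked_softmax(scores, valid)
print(weights[0, 0])                      # last two entries are ~0
print(weights.sum(dim=-1))                # every row still sums to 1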


class DotProductAttention(nn.Module):
    """Scaled dot-product attention."""
    def __init__(self, dropout):
        super().__init__()
        self.dropout = nn.Dropout(dropout)

    def forward(self, queries: torch.Tensor, keys: torch.Tensor, values: torch.Tensor, valid_lens=None):
        # queries: (batch, no. of queries, d); keys/values: (batch, no. of key-value pairs, d)
        d = queries.shape[-1]
        scores = torch.bmm(queries, keys.transpose(1, 2)) / math.sqrt(d)
        self.attention_weights = masked_softmax(scores, valid_lens)
        return torch.bmm(self.dropout(self.attention_weights), values)

class MultiHeadAttention(nn.Module):
    """Multi-head attention: run several attention heads in parallel and merge the results."""
    def __init__(self, num_hiddens, num_heads, dropout, bias=False):
        super().__init__()
        self.num_heads = num_heads
        self.attention = DotProductAttention(dropout)
        # Input projections are left out here; the raw inputs are split across heads directly.
        # self.W_q = nn.LazyLinear(num_hiddens, bias=bias)
        # self.W_k = nn.LazyLinear(num_hiddens, bias=bias)
        # self.W_v = nn.LazyLinear(num_hiddens, bias=bias)
        self.W_o = nn.LazyLinear(num_hiddens, bias=bias)

    def transpose_qkv(self, X: torch.Tensor):
        # (batch, seq, features) -> (batch * num_heads, seq, features / num_heads)
        X = X.reshape(X.shape[0], X.shape[1], self.num_heads, -1)
        X = X.permute(0, 2, 1, 3)
        return X.reshape(-1, X.shape[2], X.shape[3])

    def transpose_output(self, X: torch.Tensor):
        # Reverse of transpose_qkv: (batch * num_heads, seq, d) -> (batch, seq, num_heads * d)
        X = X.reshape(-1, self.num_heads, X.shape[1], X.shape[2])
        X = X.permute(0, 2, 1, 3)
        return X.reshape(X.shape[0], X.shape[1], -1)

    def forward(self, queries: torch.Tensor, keys: torch.Tensor, values: torch.Tensor, valid_lens: torch.Tensor = None):
        queries = self.transpose_qkv(queries)
        keys = self.transpose_qkv(keys)
        values = self.transpose_qkv(values)

        if valid_lens is not None:
            # Repeat each length once per head, since heads are folded into the batch dimension.
            valid_lens = torch.repeat_interleave(valid_lens, repeats=self.num_heads, dim=0)
        output = self.attention(queries, keys, values, valid_lens)
        output_concat = self.transpose_output(output)
        return self.W_o(output_concat)
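
A quick shape check with dummy tensors (the sizes are arbitrary). Since the W_q/W_k/W_v projections are commented out above, the inputs are split across heads directly, so their feature dimension has to be divisible by num_heads:

attn = MultiHeadAttention(num_hiddens=32, num_heads=4, dropout=0.1)
q = torch.rand(2, 10, 32)                  # (batch, sequence length, features)
out = attn(q, q, q)                        # self-attention, no masking
print(out.shape)                           # torch.Size([2, 10, 32])
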
class AddNorm(nn.Module):  #@save
    """The residual connection followed by layer normalization."""
    def __init__(self, norm_shape, dropout):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.ln = nn.LayerNorm(norm_shape)

    def forward(self, X, Y):
        return self.ln(self.dropout(Y) + X)
    

class PositionWiseFFN(nn.Module):  #@save
    """The positionwise feed-forward network."""
    def __init__(self, ffn_num_hiddens, ffn_num_outputs):
        super().__init__()
        self.dense1 = nn.LazyLinear(ffn_num_hiddens)
        self.relu = nn.ReLU()
        self.dense2 = nn.LazyLinear(ffn_num_outputs)

    def forward(self, X):
        return self.dense2(self.relu(self.dense1(X)))

class PositionalEncoding(nn.Module):
    """Sinusoidal positional encoding added to the token embeddings."""
    def __init__(self, num_hiddens, dropout, max_len=1000):
        super().__init__()
        # n = 10000 as per the Transformer paper
        self.n = 10000
        self.dropout = nn.Dropout(dropout)
        self.P = torch.zeros((1, max_len, num_hiddens))
        # k / n^(2i/d) with n = 10000
        expression = torch.arange(max_len, dtype=torch.float32).reshape(-1, 1) / torch.pow(
            self.n, torch.arange(0, num_hiddens, 2, dtype=torch.float32) / num_hiddens)
        self.P[:, :, 0::2] = torch.sin(expression)
        self.P[:, :, 1::2] = torch.cos(expression)

    def forward(self, X):
        # self.P is a plain tensor (not a registered buffer), so move it to X's device explicitly.
        X = X + self.P[:, :X.shape[1], :].to(X.device)
        return self.dropout(X)
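
With toy sizes, the encoding keeps the input shape and adds a distinct pattern to every position:

pe = PositionalEncoding(num_hiddens=32, dropout=0.0)
x = torch.zeros(2, 20, 32)                       # dummy embeddings
print(pe(x).shape)                               # torch.Size([2, 20, 32])
print(torch.allclose(pe(x)[0, 0], pe(x)[0, 1]))  # False: each position gets its own encoding
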
class TransformerDecoder(nn.Module):
    def __init__(self, vocab_size, num_hiddens, ffn_num_hiddens, num_heads,
                 num_blks, dropout):
        super().__init__()
        self.num_hiddens = num_hiddens
        self.num_blks = num_blks
        self.embedding = nn.Embedding(vocab_size, num_hiddens)
        self.pos_encoding = PositionalEncoding(num_hiddens, dropout)
        self.attention1 = MultiHeadAttention(num_hiddens, num_heads,
                                                 dropout)
        self.addnorm1 = AddNorm(num_hiddens, dropout)
        self.attention2 = MultiHeadAttention(num_hiddens, num_heads,
                                                 dropout)
        self.addnorm2 = AddNorm(num_hiddens, dropout)
        self.ffn = PositionWiseFFN(ffn_num_hiddens, num_hiddens)
        self.addnorm3 = AddNorm(num_hiddens, dropout)
        self.dense = nn.LazyLinear(vocab_size)

    def forward(self, X, targets=None):
        # Scale the embeddings before adding the positional encoding.
        X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens))
        X1 = self.attention1(X, X, X)
        X2 = self.addnorm1(X1, X)
        X3 = self.attention2(X2, X2, X2)
        X4 = self.addnorm2(X3, X2)
        X5 = self.ffn(X4)
        X6 = self.addnorm3(X5, X4)
        logits = self.dense(X6)
        if targets is None:
            loss = None
        else:
            # Flatten (batch, time) so cross-entropy sees one prediction per token.
            B, T, C = logits.shape
            logits = logits.view(B * T, C)
            targets = targets.view(B * T)
            loss = F.cross_entropy(logits, targets)
        return logits, loss

    def generate(self, X, max_new_tokens):
        # Autoregressively sample one token at a time and append it to the context.
        for _ in range(max_new_tokens):
            logits, _ = self(X)
            logits = logits[:, -1, :]                          # only the last time step matters
            probs = F.softmax(logits, dim=-1)                  # (B, vocab_size)
            X_next = torch.multinomial(probs, num_samples=1)   # (B, 1)
            X = torch.cat((X, X_next), dim=1)                  # (B, T+1)
        return X

    @property
    def attention_weights(self):
        # Attention weights recorded by the two self-attention blocks on the last forward pass.
        return (self.attention1.attention.attention_weights,
                self.attention2.attention.attention_weights)
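
Before training on the real vocabulary, a tiny throwaway instance is a cheap way to confirm the shapes and the loss wiring (all numbers below are arbitrary, and the inputs double as dummy targets just to exercise the code path):

toy = TransformerDecoder(vocab_size=100, num_hiddens=32, ffn_num_hiddens=64,
                         num_heads=4, num_blks=2, dropout=0.1)
x = torch.randint(0, 100, (2, 8))          # (batch, sequence length) of token ids
logits, loss = toy(x, x)                   # inputs reused as dummy targets
print(logits.shape, loss.item())           # torch.Size([16, 100]) and a scalar loss
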
max_iters = 3000
eval_interval = 300
learning_rate = 1e-3
eval_iter = 200
n_embd = 32
batch_size = 100
block_size = 15
device = 'cuda' if torch.cuda.is_available() else 'cpu'
num_hiddens, num_blks, dropout = 1024, 2, 0.2
ffn_num_hiddens, num_heads = 512, 4

model = TransformerDecoder(
    token_encoder.n_vocab, num_hiddens, ffn_num_hiddens, num_heads,
    num_blks, dropout)
m = model.to(device)
optimizer = torch.optim.AdamW(m.parameters(), lr=learning_rate)

def get_batch(split):
    data = train_data if split == 'train' else val_data
    # Here ix is used as a start point for all the blocks
    ix = torch.randint(len(data)-block_size,(batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x,y
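
Each batch pairs a block of consecutive token ids with targets shifted one position to the right, which is easy to verify:

xb, yb = get_batch('train')
print(xb.shape, yb.shape)                   # (batch_size, block_size) each
print(torch.equal(xb[0, 1:], yb[0, :-1]))   # True: y is x shifted by one token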

@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iter)
        for k in range(eval_iter):
            X, Y = get_batch(split)
            logits, loss = model(X,Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

for iter in range(max_iters):
    if iter % eval_interval == 0:
        losses = estimate_loss()
        print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    # sample a batch of data
    xb, yb = get_batch('train')

    # evaluate the loss
    logits, loss = m(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()
step 0: train loss 10.8228, val loss 10.8197
step 300: train loss 3.2255, val loss 3.4917
step 600: train loss 3.1107, val loss 3.3543
step 900: train loss 3.0431, val loss 3.2838
step 1200: train loss 2.9613, val loss 3.2107
step 1500: train loss 2.9022, val loss 3.1645
step 1800: train loss 2.8660, val loss 3.1255
step 2100: train loss 2.8536, val loss 3.1156
step 2400: train loss 2.8228, val loss 3.0887
step 2700: train loss 2.7866, val loss 3.0627
context = torch.tensor(token_encoder.encode('def'), dtype=torch.long).unsqueeze(0).to(device)
print(token_encoder.decode(m.generate(context, max_new_tokens=500)[0].tolist()))
def test_factorial)
  in range(30*Y terms:
        Lorentz, b = IMT + 9 get_dep_basic]))
  Gtok mat.node) -> list[int = {}

  Point(2 + 27895456769,("*. passes = 1, Dummy
      _constructinal.subs3, assumptions(self:
    zero=False), C in range(t))
  "__all_st__():
        for changeacer) == self.visited_nl}},
  r
  
           coeff(x)._term(expr.contraction])
                 if issubclass Binary and y, ode1],
            if arg in range(x, z0,0, z).contract_end(_timed))

                    
                                
def rv)
         if is o =
                 if flago = Perm + 5, n
          return self.154 + 86(y']
             if fn.")

     if not should be very "Eq, Boolean]) with correctIL'),
   if orderVariables] > 0: str(
            if len(self.1111 diminishing.sync(5:
   if not is not None: evalf(Point(x))/3, 2:
          if not iterate(arg", "range(x)) == asin_polys, QQ.row><union():
  if nquo for num_forerThan.should41*32),
  