import re
import os
import tiktoken
import math
import torch
from torch import nn
from torch.nn import functional as F
def remove_multiline_comments(code):
# Remove all triple-quoted strings (both ''' and """)
pattern = r"(\'\'\'[\s\S]*?\'\'\'|\"\"\"[\s\S]*?\"\"\")"
return re.sub(pattern, '', code)
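Quick sanity check (an illustrative snippet of mine, not part of the original notebook): the helper strips triple-quoted docstrings and block strings while leaving the surrounding code untouched.

sample = 'def square(x):\n    """Return x squared."""\n    return x * x\n'
print(remove_multiline_comments(sample))
# prints the function with the docstring removed (an indented blank line is left behind)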
directory_in_str = "./dataset"
directory = os.fsencode(directory_in_str)
token_encoder = tiktoken.get_encoding("gpt2")
data = None
for path, folders, files in os.walk(directory):
# print(path)
for filename in files:
filename_str = filename.decode('utf-8')
if filename_str.endswith('.py'):
with open(os.path.join(path, filename)) as f:
original_code = f.read()
cleaned_code = remove_multiline_comments(original_code)
# print(cleaned_code)
if data is None:
data = torch.tensor(token_encoder.encode(cleaned_code),dtype=torch.long)
else:
data = torch.cat((data,torch.tensor(token_encoder.encode(cleaned_code),dtype=torch.long)),0)
print(data.shape)
n = int(0.9*len(data))
# train data
train_data = data[:n]
# validation data
val_data = data[n:]
torch.Size([12145966])
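A brief aside (illustrative only): the GPT-2 BPE encoding from tiktoken is reversible, so any slice of the token tensor can be decoded back to source text.

ids = token_encoder.encode("def add(a, b):\n    return a + b")
print(ids)                        # list of GPT-2 token IDs
print(token_encoder.decode(ids))  # round-trips to the original string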
def masked_softmax(X: torch.Tensor, valid_lens: torch.Tensor):
def _sequence_mask(X: torch.Tensor, valid_lens: torch.Tensor, value=0):
maxlen = X.size(1)
mask = torch.arange((maxlen), dtype=torch.float32, device=X.device)[None,:] < valid_lens[:, None]
# print(mask)
X[~mask] = value
return X
if valid_lens is None:
return nn.functional.softmax(X, dim=-1)
else:
shape = X.shape
if valid_lens.dim() == 1:
valid_lens = torch.repeat_interleave(valid_lens, shape[1])
else:
valid_lens = valid_lens.reshape(-1)
X = _sequence_mask(X.reshape(-1, shape[-1]), valid_lens, value=-1e6)
return nn.functional.softmax(X.reshape(shape), dim=-1)
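A small check of the masking behaviour (my own snippet, assuming the function above): with valid lengths 2 and 3, the entries beyond each row's valid length get (effectively) zero weight, and every row still sums to one.

demo_scores = torch.rand(2, 2, 4)
print(masked_softmax(demo_scores, torch.tensor([2, 3])))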
class DotProductAttention(nn.Module):
def __init__(self, dropout):
super().__init__()
self.dropout = nn.Dropout(dropout)
def forward(self, queries: torch.Tensor, keys: torch.Tensor, values: torch.Tensor, valid_lens = None):
d = queries.shape[-1]
scores = torch.bmm(queries, keys.transpose(1,2))/math.sqrt(d)
self.attention_weights = masked_softmax(scores, valid_lens)
return torch.bmm(self.dropout(self.attention_weights), values)
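Shape check for the scaled dot-product attention (illustrative values only): two sequences, one query each, ten key-value pairs, value dimension 4.

queries = torch.normal(0, 1, (2, 1, 2))
keys = torch.normal(0, 1, (2, 10, 2))
values = torch.normal(0, 1, (2, 10, 4))
attn = DotProductAttention(dropout=0.0)
print(attn(queries, keys, values, torch.tensor([2, 6])).shape)  # torch.Size([2, 1, 4])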
class MultiHeadAttention(nn.Module):
def __init__(self, num_hiddens, num_heads, dropout, bias=False):
super().__init__()
self.num_heads = num_heads
self.attention = DotProductAttention(dropout)
# self.W_q = nn.LazyLinear(num_hiddens, bias=bias)
# self.W_k = nn.LazyLinear(num_hiddens, bias=bias)
# self.W_v = nn.LazyLinear(num_hiddens, bias=bias)
self.W_o = nn.LazyLinear(num_hiddens, bias=bias)
def transpose_qkv(self, X: torch.Tensor):
X = X.reshape(X.shape[0], X.shape[1], self.num_heads, -1)
X = X.permute(0,2,1,3)
return X.reshape(-1, X.shape[2], X.shape[3])
def transpose_output(self, X: torch.Tensor):
X = X.reshape(-1, self.num_heads, X.shape[1], X.shape[2])
X = X.permute(0,2,1,3)
return X.reshape(X.shape[0], X.shape[1], -1)
def forward(self, queries: torch.Tensor, keys: torch.Tensor, values: torch.Tensor, valid_lens: torch.Tensor = None):
queries = self.transpose_qkv(queries)
keys = self.transpose_qkv(keys)
values = self.transpose_qkv(values)
if valid_lens is not None:
valid_lens = torch.repeat_interleave(valid_lens, repeats=self.num_heads, dim=0)
output = self.attention(queries, keys, values, valid_lens)
output_concat = self.transpose_output(output)
return self.W_o(output_concat)
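A shape check (hypothetical sizes, not the training configuration). Note that the usual W_q/W_k/W_v projections are commented out above, so queries, keys and values are split directly across heads along the feature dimension; num_hiddens therefore has to be divisible by num_heads.

mha = MultiHeadAttention(num_hiddens=32, num_heads=4, dropout=0.0)
x = torch.rand(2, 15, 32)     # (batch, sequence, features)
print(mha(x, x, x).shape)     # torch.Size([2, 15, 32])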
class AddNorm(nn.Module): #@save
"""The residual connection followed by layer normalization."""
def __init__(self, norm_shape, dropout):
super().__init__()
self.dropout = nn.Dropout(dropout)
self.ln = nn.LayerNorm(norm_shape)
def forward(self, X, Y):
return self.ln(self.dropout(Y) + X)
class PositionWiseFFN(nn.Module): #@save
"""The positionwise feed-forward network."""
def __init__(self, ffn_num_hiddens, ffn_num_outputs):
super().__init__()
self.dense1 = nn.LazyLinear(ffn_num_hiddens)
self.relu = nn.ReLU()
self.dense2 = nn.LazyLinear(ffn_num_outputs)
def forward(self, X):
return self.dense2(self.relu(self.dense1(X)))
class PositionalEncoding(nn.Module):
def __init__(self, num_hiddens, dropout, max_len = 1000):
super().__init__()
# n = 10000 as per paper
self.n = 10000
self.dropout = nn.Dropout(dropout)
self.P = torch.zeros((1,max_len,num_hiddens))
# k/n^(2i/d) with n = 10000
expression = torch.arange(max_len, dtype=torch.float32).reshape(-1,1)/torch.pow(self.n, torch.arange(0,num_hiddens,2,dtype=torch.float32)/num_hiddens)
self.P[:,:,0::2] = torch.sin(expression)
self.P[:,:,1::2] = torch.cos(expression)
def forward(self,X):
        # self.P is a plain attribute (not a registered buffer), so move the slice
        # to X's device before adding; otherwise this fails when X lives on the GPU.
        X = X + self.P[:, :X.shape[1], :].to(X.device)
return self.dropout(X)
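This is the fixed sin/cos scheme from "Attention Is All You Need": P[k, 2i] = sin(k / n^(2i/d)) and P[k, 2i+1] = cos(k / n^(2i/d)) with n = 10000. A quick shape check (illustrative only):

pe = PositionalEncoding(num_hiddens=32, dropout=0.0)
print(pe(torch.zeros(1, 15, 32)).shape)  # torch.Size([1, 15, 32])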
class TransformerDecoder(nn.Module):
def __init__(self, vocab_size, num_hiddens, ffn_num_hiddens, num_heads,
num_blks, dropout):
super().__init__()
self.num_hiddens = num_hiddens
self.num_blks = num_blks
self.embedding = nn.Embedding(vocab_size, num_hiddens)
self.pos_encoding = PositionalEncoding(num_hiddens, dropout)
self.attention1 = MultiHeadAttention(num_hiddens, num_heads,
dropout)
self.addnorm1 = AddNorm(num_hiddens, dropout)
self.attention2 = MultiHeadAttention(num_hiddens, num_heads,
dropout)
self.addnorm2 = AddNorm(num_hiddens, dropout)
self.ffn = PositionWiseFFN(ffn_num_hiddens, num_hiddens)
self.addnorm3 = AddNorm(num_hiddens, dropout)
self.dense = nn.LazyLinear(vocab_size)
def forward(self, X, targets=None):
X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens))
X1 = self.attention1(X,X,X)
X2 = self.addnorm1(X1,X)
X3 = self.attention2(X2,X2,X2)
X4 = self.addnorm2(X3,X2)
X5 = self.ffn(X4)
X6 = self.addnorm3(X5,X4)
logits = self.dense(X6)
        if targets is None:
loss = None
else:
B, T, C = logits.shape
logits = logits.view(B*T, C)
targets = targets.view(B*T)
loss = F.cross_entropy(logits, targets)
return logits, loss
    def generate(self, X, max_new_tokens):
        # The growing context is fed back in full at every step (no crop to block_size);
        # this stays within the positional table's max_len of 1000, but is longer than
        # the 15-token windows the model is trained on below.
        for _ in range(max_new_tokens):
logits, loss = self(X)
logits = logits[:,-1,:]
probs = F.softmax(logits, dim=-1)
X_next = torch.multinomial(probs, num_samples=1) #(B,1)
X = torch.cat((X, X_next), dim=1) #(B,T+1)
return X
    @property
    def attention_weights(self):
        # Attention weights recorded by the two attention blocks on the last forward pass.
        return [self.attention1.attention.attention_weights,
                self.attention2.attention.attention_weights]
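One thing worth flagging: forward calls both attention blocks without valid_lens, so no causal (look-ahead) mask is applied and every position can attend to later positions. If autoregressive masking is wanted, a minimal sketch (my assumption, not what is trained below) is to build per-position valid lengths in the layout masked_softmax expects and pass them to the attention blocks:

def causal_valid_lens(batch_size, seq_len, device):
    # valid_lens[b, t] = t + 1, so position t may only attend to positions 0..t
    return torch.arange(1, seq_len + 1, device=device).repeat(batch_size, 1)

# hypothetically, inside forward:
# dec_valid_lens = causal_valid_lens(X.shape[0], X.shape[1], X.device)
# X1 = self.attention1(X, X, X, dec_valid_lens)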
max_iters = 3000
eval_interval = 300
learning_rate = 1e-3
eval_iter = 200
n_embd = 32  # unused below; the model width is set by num_hiddens instead
batch_size = 100
block_size = 15
device = 'cuda' if torch.cuda.is_available() else 'cpu'
num_hiddens, num_blks, dropout = 1024, 2, 0.2
ffn_num_hiddens, num_heads = 512, 4
model = TransformerDecoder(
token_encoder.n_vocab, num_hiddens, ffn_num_hiddens, num_heads,
num_blks, dropout)
m = model.to(device)
optimizer = torch.optim.AdamW(m.parameters(), lr=learning_rate)
def get_batch(split):
data = train_data if split == 'train' else val_data
# Here ix is used as a start point for all the blocks
ix = torch.randint(len(data)-block_size,(batch_size,))
x = torch.stack([data[i:i+block_size] for i in ix])
y = torch.stack([data[i+1:i+block_size+1] for i in ix])
x, y = x.to(device), y.to(device)
return x,y
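For intuition (an illustrative check, not from the notebook): each batch stacks random block_size-token windows, and the targets are the same windows shifted one token to the right.

xb, yb = get_batch('train')
print(xb.shape, yb.shape)              # both (batch_size, block_size) = (100, 15)
print((xb[:, 1:] == yb[:, :-1]).all()) # True: y is x shifted by one token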
@torch.no_grad()
def estimate_loss():
out = {}
model.eval()
for split in ['train', 'val']:
losses = torch.zeros(eval_iter)
for k in range(eval_iter):
X, Y = get_batch(split)
logits, loss = model(X,Y)
losses[k] = loss.item()
out[split] = losses.mean()
model.train()
return out
for iter in range(max_iters):
if iter % eval_interval == 0:
losses = estimate_loss()
print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")
# sample a batch of data
xb, yb = get_batch('train')
# evaluate the loss
logits, loss = m(xb, yb)
optimizer.zero_grad(set_to_none=True)
loss.backward()
optimizer.step()
step 0: train loss 10.8228, val loss 10.8197
step 300: train loss 3.2255, val loss 3.4917
step 600: train loss 3.1107, val loss 3.3543
step 900: train loss 3.0431, val loss 3.2838
step 1200: train loss 2.9613, val loss 3.2107
step 1500: train loss 2.9022, val loss 3.1645
step 1800: train loss 2.8660, val loss 3.1255
step 2100: train loss 2.8536, val loss 3.1156
step 2400: train loss 2.8228, val loss 3.0887
step 2700: train loss 2.7866, val loss 3.0627
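An optional step not in the original notebook: saving the trained weights so generation can be rerun later without retraining. The filename is hypothetical.

torch.save(m.state_dict(), 'python_code_generator.pt')
# later / elsewhere:
# m.load_state_dict(torch.load('python_code_generator.pt', map_location=device))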
# Encode the 'def' prompt as a (1, T) batch on the same device as the model.
context = torch.tensor(token_encoder.encode('def'), dtype=torch.long).unsqueeze(0).to(device)
print(token_encoder.decode(m.generate(context, max_new_tokens=500)[0].tolist()))
def test_factorial)
in range(30*Y terms:
Lorentz, b = IMT + 9 get_dep_basic]))
Gtok mat.node) -> list[int = {}
Point(2 + 27895456769,("*. passes = 1, Dummy
_constructinal.subs3, assumptions(self:
zero=False), C in range(t))
"__all_st__():
for changeacer) == self.visited_nl}},
r
coeff(x)._term(expr.contraction])
if issubclass Binary and y, ode1],
if arg in range(x, z0,0, z).contract_end(_timed))
def rv)
if is o =
if flago = Perm + 5, n
return self.154 + 86(y']
if fn.")
if not should be very "Eq, Boolean]) with correctIL'),
if orderVariables] > 0: str(
if len(self.1111 diminishing.sync(5:
if not is not None: evalf(Point(x))/3, 2:
if not iterate(arg", "range(x)) == asin_polys, QQ.row><union():
if nquo for num_forerThan.should41*32),