tez is a lightweight wrapper on top of pytorch which is used to simplify training pipeline logic.
tez
note: the apache tez project shares the name but is a separate hadoop framework; it splits map and reduce work into smaller tasks, and these smaller tasks can be combined more efficiently into one bigger DAG. the components below describe apache tez, while the example further down uses the pytorch library.
components
- input: reads input data and presents it to the processor as key/value pairs, e.g. LocalMergedInput, ShuffledMergedInput
- output: writes the key/value pairs produced by the processor to the file system, e.g. InMemorySortedOutput, LocalOnFileSorterOutput, OnFileSortedOutput
- partitioner: slices the data into partitions
- processor: abstracts the processing logic
- task: a combination of input, processor and output, e.g. RuntimeTask
- master: the central manager of tasks, scheduling them according to their dependencies
example
sentiment analysis: multi-label emotion classification on the go_emotions dataset with squeezebert
from datasets import load_dataset
from labels import mapping  # local module mapping go_emotions label ids to emotion names
import pandas as pd
pd.set_option('display.max_colwidth', None)
import numpy as np
from tqdm.notebook import tqdm
from matplotlib import pyplot as plt
from torch.utils.data import DataLoader
import tez
import torch
import torch.nn as nn
import transformers
from sklearn import metrics, model_selection, preprocessing
from transformers import AdamW, get_linear_schedule_with_warmup
#load data
go_emotions = load_dataset("go_emotions")
data = go_emotions.data
train = go_emotions.data["train"].to_pandas()
valid = go_emotions.data["validation"].to_pandas()
test = go_emotions.data["test"].to_pandas()
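a quick look at the raw columns makes the next step clearer: each row has a text field and a labels field holding a list of label ids, which the code below turns into one-hot columns.
#peek at the raw data (column names as used in the rest of this example)
print(train.shape)
print(train[["text", "labels"]].head())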
#one hot label
n_labels = len(mapping)
def one_hot_labels(df):
    dict_labels = []
    for i in tqdm(range(len(df)), leave=False):
        d = dict(zip(range(n_labels), [0] * n_labels))
        labels = df.loc[i]["labels"]
        for label in labels:
            d[label] = 1
        dict_labels.append(d)
    df_labels = pd.DataFrame(dict_labels)
    return df_labels
train_oh_labels = one_hot_labels(train)
valid_oh_labels = one_hot_labels(valid)
test_oh_labels = one_hot_labels(test)
train = pd.concat([train, train_oh_labels], axis=1)
valid = pd.concat([valid, valid_oh_labels], axis=1)
test = pd.concat([test, test_oh_labels], axis=1)
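the same one-hot encoding can be done in a couple of lines with sklearn's MultiLabelBinarizer (already available through the sklearn import above); a sketch, assuming the label ids are exactly 0..n_labels-1, with train_oh_alt used only as an illustration name:
#alternative one-hot encoding; should produce the same 0/1 matrix as one_hot_labels(train)
mlb = preprocessing.MultiLabelBinarizer(classes=list(range(n_labels)))
train_oh_alt = pd.DataFrame(mlb.fit_transform(train["labels"]), columns=range(n_labels))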
#define dataset
class GoEmotionDataset:
    def __init__(self, texts, targets):
        self.texts = texts
        self.targets = targets
        self.tokenizer = transformers.SqueezeBertTokenizer.from_pretrained(
            "squeezebert/squeezebert-uncased", do_lower_case=True
        )
        self.max_len = 35

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, index):
        target = self.targets[index]
        text = self.texts[index]
        inputs = self.tokenizer.encode_plus(
            text,
            None,
            add_special_tokens=True,
            max_length=self.max_len,
            padding="max_length",
            truncation=True,
        )
        ids = inputs["input_ids"]
        mask = inputs["attention_mask"]
        return {
            "ids": torch.tensor(ids, dtype=torch.long),
            "mask": torch.tensor(mask, dtype=torch.long),
            "targets": torch.tensor(target, dtype=torch.long),
        }
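a quick sanity check of the dataset class before training; it just tokenizes the first training example:
#ids/mask should have shape (35,) and targets (n_labels,)
sample = GoEmotionDataset(train.text.tolist(), train[range(n_labels)].values.tolist())[0]
print(sample["ids"].shape, sample["mask"].shape, sample["targets"].shape)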
#define tez model
class EmotionClassifier(tez.Model):
    def __init__(self, num_train_steps, num_classes):
        super().__init__()
        self.bert = transformers.SqueezeBertModel.from_pretrained("squeezebert/squeezebert-uncased")
        self.bert_drop = nn.Dropout(0.3)
        self.out = nn.Linear(768, num_classes)
        self.num_train_steps = num_train_steps
        self.step_scheduler_after = "batch"

    def fetch_optimizer(self):
        #no weight decay for biases and layer norm parameters
        param_optimizer = list(self.named_parameters())
        no_decay = ["bias", "LayerNorm.bias"]
        optimizer_parameters = [
            {
                "params": [
                    p for n, p in param_optimizer if not any(nd in n for nd in no_decay)
                ],
                "weight_decay": 0.001,
            },
            {
                "params": [
                    p for n, p in param_optimizer if any(nd in n for nd in no_decay)
                ],
                "weight_decay": 0.0,
            },
        ]
        opt = AdamW(optimizer_parameters, lr=3e-5)
        return opt

    def fetch_scheduler(self):
        sch = get_linear_schedule_with_warmup(
            self.optimizer, num_warmup_steps=0, num_training_steps=self.num_train_steps
        )
        return sch

    def loss(self, outputs, targets):
        if targets is None:
            return None
        return nn.BCEWithLogitsLoss()(outputs, targets.float())

    def monitor_metrics(self, outputs, targets):
        if targets is None:
            return {}
        outputs = torch.sigmoid(outputs)
        outputs = outputs.cpu().detach().numpy()
        targets = targets.cpu().detach().numpy()
        fpr_micro, tpr_micro, _ = metrics.roc_curve(targets.ravel(), outputs.ravel())
        auc_micro = metrics.auc(fpr_micro, tpr_micro)
        return {"auc": auc_micro}

    def forward(self, ids, mask, targets=None):
        o_2 = self.bert(ids, attention_mask=mask)["pooler_output"]
        b_o = self.bert_drop(o_2)
        output = self.out(b_o)
        loss = self.loss(output, targets)
        acc = self.monitor_metrics(output, targets)
        #tez expects forward to return (output, loss, metrics)
        return output, loss, acc
#start training
train_dataset = GoEmotionDataset(train.text.tolist(), train[range(n_labels)].values.tolist())
valid_dataset = GoEmotionDataset(valid.text.tolist(), valid[range(n_labels)].values.tolist())
TRAIN_BS = 64
EPOCHS = 8
#scheduler steps must match the batch size and epoch count passed to fit below
n_train_steps = int(len(train) / TRAIN_BS * EPOCHS)
model = EmotionClassifier(n_train_steps, n_labels)
tb_logger = tez.callbacks.TensorBoardLogger(log_dir="logs/")
es = tez.callbacks.EarlyStopping(monitor="valid_loss", model_path="export/model.bin")
model.fit(
    train_dataset,
    valid_dataset,
    train_bs=TRAIN_BS,
    device="cuda",
    epochs=EPOCHS,
    callbacks=[tb_logger, es],
    fp16=True,
    n_jobs=10,
)
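the EarlyStopping callback writes the best weights to export/model.bin; reloading them before evaluation is a sketch that assumes the installed tez version exposes Model.load, which older tez releases provide:
#reload the best checkpoint saved by early stopping (model.load is an assumption about the tez version)
model.load("export/model.bin", device="cuda")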
test_dataset = GoEmotionDataset(test.text.tolist(), test[range(n_labels)].values.tolist())
dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False, drop_last=False)
#evaluate
model.eval()
outputs = []
with torch.no_grad():
    for batch in tqdm(dataloader, total=len(dataloader)):
        output, loss, acc = model.forward(
            batch["ids"].to("cuda"),
            batch["mask"].to("cuda"),
            batch["targets"].to("cuda"),
        )
        outputs.append(output)
outputs = torch.cat(outputs)
outputs = torch.sigmoid(outputs)
outputs = outputs.cpu().detach().numpy()
roc_metrics = []
for i in range(n_labels):
    roc = metrics.roc_auc_score(test[i].values, outputs[:, i])
    roc_metrics.append(roc)
s = pd.Series(roc_metrics, index=range(n_labels))
s.plot(kind="bar", figsize=(20, 5), title="roc auc score per class on test data", grid=True)
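a single summary number can be handy next to the per-class plot:
#macro average of the per-class aucs computed above
print("macro auc:", np.mean(roc_metrics))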
#test on a new sentence
tokenizer = transformers.SqueezeBertTokenizer.from_pretrained(
    "squeezebert/squeezebert-uncased", do_lower_case=True
)
def score_sentence(text, topn=5):
    max_len = 35
    with torch.no_grad():
        inputs = tokenizer.encode_plus(
            text,
            None,
            add_special_tokens=True,
            max_length=max_len,
            padding="max_length",
            truncation=True,
        )
        ids = inputs["input_ids"]
        ids = torch.LongTensor(ids).cuda().unsqueeze(0)
        attention_mask = inputs["attention_mask"]
        attention_mask = torch.LongTensor(attention_mask).cuda().unsqueeze(0)
        output = model.forward(ids, attention_mask)[0]
        output = torch.sigmoid(output)
        probas, indices = torch.sort(output)
        #torch.sort is ascending, so reverse to get the most probable labels first
        probas = probas.cpu().numpy()[0][::-1]
        indices = indices.cpu().numpy()[0][::-1]
        for i, p in zip(indices[:topn], probas[:topn]):
            print(mapping[i], p)
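example call (the exact probabilities depend on the trained weights):
score_sentence("i am so happy about this", topn=5)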