Fine-tune GPT-2 Using Hybrid Parallelism
Authors: Hongxin Liu, Yongbin Li, Mingyan Jiang
Prerequisite:
Example Code
Related Paper
- Colossal-AI: A Unified Deep Learning System For Large-Scale Parallel Training
- Efficient Large-Scale Language Model Training on GPU Clusters Using Megatron-LM
Introduction
In the previous tutorial, we introduced how to train ViT with pipeline parallelism. In this tutorial, you will learn a more complex scenario: fine-tuning GPT-2 with hybrid parallelism. In this case, GPT-2 is too large to fit in the memory of a single device, so the model must be split across devices.
Table of contents
In this tutorial we will cover:
- Initialize the hybrid parallelism plugin
- Define the training components of the GPT-2 model
- Boost the GPT-2 model with HybridParallelPlugin
- Train GPT-2 using hybrid parallelism
Import libraries
from functools import partial
from typing import Callable, List, Optional, Union

import datasets
import torch
import torch.distributed as dist
import torch.nn as nn
from torch.optim import Optimizer
from torch.optim.lr_scheduler import _LRScheduler as LRScheduler
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import AutoConfig, AutoTokenizer, GPT2ForSequenceClassification, get_linear_schedule_with_warmup

import colossalai
from colossalai.booster import Booster
from colossalai.booster.plugin import GeminiPlugin, HybridParallelPlugin, LowLevelZeroPlugin, TorchDDPPlugin
from colossalai.cluster import DistCoordinator
from colossalai.nn.optimizer import HybridAdam
Define Plugin
Create a HybridParallelPlugin object and specify the desired parallelism strategies to be used. In this example, both pipeline parallelism and ZeRO-1 are used simultaneously.
plugin = HybridParallelPlugin(
    tp_size=1,
    pp_size=2,
    num_microbatches=None,
    microbatch_size=1,
    enable_all_optimization=True,
    zero_stage=1,
    precision="fp16",
    initial_scale=1,
)
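With tp_size=1 and pp_size=2, every pair of ranks forms one pipeline, and the remaining ranks replicate that pipeline for ZeRO-1 data parallelism. As a quick sanity check, the implied data-parallel size can be derived from the world size. This is a minimal sketch under that assumption; the helper implied_dp_size is hypothetical, and it must run after the distributed environment is launched (shown in the next section) so that torch.distributed is initialized.

# Minimal sketch (assumption): the data-parallel size implied by the plugin is
# world_size // (tp_size * pp_size); ZeRO-1 shards optimizer states across this group.
def implied_dp_size(tp_size: int = 1, pp_size: int = 2) -> int:
    world_size = dist.get_world_size()  # requires colossalai.launch_from_torch to have run
    assert world_size % (tp_size * pp_size) == 0, "world size must be divisible by tp_size * pp_size"
    return world_size // (tp_size * pp_size)

# e.g. with 4 GPUs, tp_size=1 and pp_size=2 give 2 pipeline replicas (data-parallel size 2)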
Define GPT-2's Training Components
Before using hybrid parallelism, you need to define the components used for training.
Define hyperparameters
NUM_EPOCHS = 3
BATCH_SIZE = 32
LEARNING_RATE = 2.4e-5
WEIGHT_DECAY = 0.01
WARMUP_FRACTION = 0.1
First, we create a distributed environment.
# Launch ColossalAI
colossalai.launch_from_torch(seed=42)
coordinator = DistCoordinator()
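The DistCoordinator provides convenient queries about the distributed group, and the training loop later uses coordinator.is_master() and coordinator.world_size. As a small illustrative sketch, it can restrict logging to the master rank; treat print_on_master as an assumption if your Colossal-AI version differs.

# Illustrative sketch: act only on the master rank via the coordinator.
coordinator.print_on_master(f"World size: {coordinator.world_size}")
if coordinator.is_master():
    print("Running on the master process (rank 0)")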
Prepare the dataset. You can use plugin.prepare_dataloader to generate a dataloader or customize your own dataloader.
def tokenize_batch(batch, tokenizer: Optional[AutoTokenizer] = None, max_length: int = 2048):
    texts = [sample["sentence1"] + sample["sentence2"] for sample in batch]
    data = tokenizer(texts, return_tensors="pt", padding="max_length", truncation=True, max_length=max_length)
    data = {k: v.cuda() for k, v in data.items()}
    # MRPC is a binary classification task, so use the 0/1 labels from the dataset
    data["labels"] = torch.tensor([sample["label"] for sample in batch], dtype=torch.long).cuda()
    return data
tokenizer = AutoTokenizer.from_pretrained("gpt2")
dataset = datasets.load_dataset("glue", "mrpc")
train_dataloader = plugin.prepare_dataloader(
    dataset["train"],
    batch_size=BATCH_SIZE,
    shuffle=True,
    drop_last=True,
    collate_fn=partial(tokenize_batch, tokenizer=tokenizer, max_length=512),
)
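If you prefer not to use plugin.prepare_dataloader, a plain PyTorch DataLoader with a DistributedSampler also works. The sketch below assumes each process is one data-parallel rank; with tensor or pipeline parallelism enabled, you would normally pass the rank and size of the data-parallel group instead.

# Minimal sketch of a hand-built dataloader (alternative to plugin.prepare_dataloader).
from torch.utils.data import DistributedSampler

sampler = DistributedSampler(
    dataset["train"],
    num_replicas=dist.get_world_size(),  # assumption: one data-parallel rank per process
    rank=dist.get_rank(),
    shuffle=True,
)
custom_dataloader = DataLoader(
    dataset["train"],
    batch_size=BATCH_SIZE,
    sampler=sampler,
    drop_last=True,
    collate_fn=partial(tokenize_batch, tokenizer=tokenizer, max_length=512),
)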
Prepare the GPT-2 model
cfg = AutoConfig.from_pretrained("gpt2", num_labels=2)
model = GPT2ForSequenceClassification.from_pretrained("gpt2", config=cfg).cuda()
Prepare the optimizer
lr = LEARNING_RATE * coordinator.world_size
no_decay = ["bias", "LayerNorm.weight"]
optimizer_grouped_parameters = [
    {
        "params": [p for n, p in model.named_parameters() if not any(nd in n for nd in no_decay)],
        "weight_decay": WEIGHT_DECAY,
    },
    {
        "params": [p for n, p in model.named_parameters() if any(nd in n for nd in no_decay)],
        "weight_decay": 0.0,
    },
]
optimizer = HybridAdam(optimizer_grouped_parameters, lr=lr, eps=1e-8)
Prepare the lr_scheduler and the criterion. Note that when hybrid parallelism includes pipeline parallelism, you must also define a criterion function. It takes the inputs and outputs of the model's forward pass as parameters and returns the loss.
# lr scheduler
total_steps = len(train_dataloader) * NUM_EPOCHS
num_warmup_steps = int(WARMUP_FRACTION * total_steps)
lr_scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=num_warmup_steps,
    num_training_steps=total_steps,
)
def _criterion(outputs, inputs):
    return outputs.loss
Boost the GPT-2 Model
Define a booster with the HybridParallelPlugin. Based on the configured plugin parameters, the booster injects the corresponding parallel strategies into the model. In this example, pipeline parallelism, ZeRO-1, and mixed-precision training are used.
booster = Booster(plugin=plugin)
Boost these components with the defined booster.
model, optimizer, _criterion, _, lr_scheduler = booster.boost(
    model, optimizer, criterion=_criterion, lr_scheduler=lr_scheduler
)
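After boosting, each rank only holds its own pipeline stage of the model. The training loop below relies on booster.plugin.stage_manager to detect the last stage; as a quick illustration, you can print which stage each rank owns. The stage and num_stages attributes are assumptions here and may differ across Colossal-AI versions.

# Illustrative sketch: inspect the pipeline stage owned by this rank.
stage_manager = booster.plugin.stage_manager
if stage_manager is not None:
    print(
        f"rank {dist.get_rank()}: pipeline stage {stage_manager.stage} "
        f"of {stage_manager.num_stages}, last stage: {stage_manager.is_last_stage()}"
    )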
Training GPT-2 using hybrid parallelism
In the previous sections, we explained how to inject various parallelism features into the model and its training components using Booster and HybridParallelPlugin. Now we can start model training.
Define a training function. When pipeline parallelism is used, you need to call booster.execute_pipeline to schedule the stages of model training.
def train_epoch(
    epoch: int,
    model: nn.Module,
    optimizer: Optimizer,
    _criterion: Callable,
    lr_scheduler: LRScheduler,
    train_dataloader: DataLoader,
    booster: Booster,
    coordinator: DistCoordinator,
):
    use_pipeline = isinstance(booster.plugin, HybridParallelPlugin) and booster.plugin.pp_size > 1
    is_pp_last_stage = use_pipeline and booster.plugin.stage_manager.is_last_stage()
    print_flag = (not use_pipeline and coordinator.is_master()) or (use_pipeline and is_pp_last_stage)
    total_step = len(train_dataloader)

    model.train()
    optimizer.zero_grad()
    train_dataloader_iter = iter(train_dataloader)
    with tqdm(
        range(total_step),
        desc=f"Epoch [{epoch + 1}/{NUM_EPOCHS}]",
        disable=not print_flag,
    ) as pbar:
        for _ in pbar:
            if use_pipeline:
                # Forward and backward passes are scheduled by the pipeline engine
                outputs = booster.execute_pipeline(
                    train_dataloader_iter, model, _criterion, optimizer, return_loss=True
                )
                # Only the last pipeline stage holds the loss
                if is_pp_last_stage:
                    loss = outputs["loss"]
                    pbar.set_postfix({"loss": loss.item()})
            else:
                # Forward pass
                data = next(train_dataloader_iter)
                data = {k: v.cuda() for k, v in data.items()}  # move the batch to GPU
                outputs = model(**data)
                loss = _criterion(outputs, None)
                # Backward pass
                booster.backward(loss, optimizer)
                pbar.set_postfix({"loss": loss.item()})

            # Optimize
            optimizer.step()
            optimizer.zero_grad()
            lr_scheduler.step()
Train the GPT-2 model
for epoch in range(NUM_EPOCHS):
    train_epoch(epoch, model, optimizer, _criterion, lr_scheduler, train_dataloader, booster, coordinator)
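After training, the fine-tuned model can be saved through the booster so that pipeline- and ZeRO-sharded states are handled correctly. Below is a minimal sketch using the Booster checkpoint API; the output paths are placeholders, and shard=True writes a sharded checkpoint (check your Colossal-AI version for the exact options).

# Illustrative sketch: save the fine-tuned model and optimizer states via the booster.
booster.save_model(model, "./checkpoints/gpt2_mrpc", shard=True)
booster.save_optimizer(optimizer, "./checkpoints/gpt2_mrpc_optim", shard=True)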