add dp&sp hybrid for cpt

qianhao0713 committed Jul 15, 2024
1 parent 92554c2 commit 49c0f80
Showing 3 changed files with 17 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/llamafactory/data/collator.py
@@ -129,7 +129,7 @@ class SeqParallelDataCollatorForLanguageModeling(DataCollatorForLanguageModeling):
    Reuse the sequence parallel distributing function for sft stage.
    """
    seq_algo: str = "data_parallel"
    seq_parallel_size: int = -1
    sp_size: int = -1
    rank: int = 0
    world_size: int = 8
    device: Optional[Any] = None
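
The collator field is renamed from seq_parallel_size to sp_size, matching the new finetuning_args.sp_size that the trainer and workflow pass in below. As a minimal sketch of the idea (not the repository's collator code; the function name and the contiguous rank layout are assumptions for exposition), rank, world_size, and sp_size can map each rank to a data-parallel group and a sequence-parallel slice like this:

# Illustrative sketch only, not the repository's collator logic. The function
# name and the rank layout (contiguous SP groups) are assumptions.
import torch

def shard_for_rank(batch: torch.Tensor, rank: int, world_size: int, sp_size: int) -> torch.Tensor:
    """Pick one rank's share of a (global_batch, seq_len) token tensor under DP x SP."""
    if sp_size == -1:                    # the diff uses -1 to mean "no explicit SP group size"
        sp_size = world_size             # assume SP then spans all ranks
    assert world_size % sp_size == 0, "world_size must be divisible by sp_size"
    dp_size = world_size // sp_size
    dp_rank = rank // sp_size            # which data-parallel group owns this rank (assumed layout)
    sp_rank = rank % sp_size             # position inside the sequence-parallel group (assumed layout)

    per_dp = batch.size(0) // dp_size    # split the enlarged global batch across DP groups
    sub_batch = batch[dp_rank * per_dp : (dp_rank + 1) * per_dp]

    chunk = batch.size(1) // sp_size     # split the sequence dimension within the SP group
    return sub_batch[:, sp_rank * chunk : (sp_rank + 1) * chunk]

# e.g. world_size=8, sp_size=4 -> 2 DP groups; rank 5 falls in DP group 1, sequence chunk 1
print(shard_for_rank(torch.zeros(4, 1024), rank=5, world_size=8, sp_size=4).shape)  # torch.Size([2, 256])
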
14 changes: 14 additions & 0 deletions src/llamafactory/train/pt/trainer.py
@@ -11,6 +11,7 @@
from transformers.trainer_utils import seed_worker
import datasets
from torch.nn import CrossEntropyLoss
import os

if TYPE_CHECKING:
    import torch
@@ -62,6 +63,7 @@ def compute_loss(self, model, inputs, return_outputs=False):
        Subclass and override for custom behavior.
        """
        from transformers.trainer import _is_peft_model, MODEL_FOR_CAUSAL_LM_MAPPING_NAMES
        if self.label_smoother is not None and "labels" in inputs:
            labels = inputs.pop("labels")
        else:
@@ -148,6 +150,12 @@ def get_train_dataloader(self) -> DataLoader:
            dataloader_params["prefetch_factor"] = self.args.dataloader_prefetch_factor

        if hasattr(data_collator, "seq_algo") and data_collator.seq_algo != "data_parallel":
            sp_size = self.finetuning_args.sp_size
            if sp_size != -1:
                world_size = int(os.environ['WORLD_SIZE'])
                assert sp_size != 0 and world_size % sp_size == 0, f"world_size: {world_size} should be divisible by sp_size: {sp_size}"
                dp_size = world_size // sp_size
                dataloader_params["batch_size"] = dataloader_params["batch_size"] * dp_size
            return DataLoader(train_dataset, **dataloader_params)
        return self.accelerator.prepare(DataLoader(train_dataset, **dataloader_params))

@@ -197,5 +205,11 @@ def get_eval_dataloader(self, eval_dataset) -> DataLoader:
            self._eval_dataloader = eval_dataloader

        if hasattr(data_collator, "seq_algo") and data_collator.seq_algo != "data_parallel":
            sp_size = self.finetuning_args.sp_size
            if sp_size != -1:
                world_size = int(os.environ['WORLD_SIZE'])
                assert sp_size != 0 and world_size % sp_size == 0, f"world_size: {world_size} should be divisible by sp_size: {sp_size}"
                dp_size = world_size // sp_size
                dataloader_params["batch_size"] = dataloader_params["batch_size"] * dp_size
            return eval_dataloader
        return self.accelerator.prepare(eval_dataloader)
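
Both dataloader overrides apply the same rescaling: when sequence parallelism is active and sp_size is set, each group of sp_size ranks is meant to share the same samples, so only world_size // sp_size data-parallel groups draw independent data; and because the dataloader is returned without accelerator.prepare in that branch, its batch size is multiplied by dp_size so that a single draw covers every group. A small worked example with illustrative values:

# Worked example of the dp_size scaling above; the numbers are illustrative.
world_size = 8        # total ranks, as read from the WORLD_SIZE env var
sp_size = 4           # ranks cooperating on each sequence (finetuning_args.sp_size)
per_device_batch = 2  # dataloader_params["batch_size"] before scaling

assert sp_size != 0 and world_size % sp_size == 0
dp_size = world_size // sp_size              # 2 independent data-parallel groups
scaled_batch = per_device_batch * dp_size    # 4 samples per step, 2 for each DP group
print(dp_size, scaled_batch)                 # -> 2 4
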
3 changes: 2 additions & 1 deletion src/llamafactory/train/pt/workflow.py
@@ -32,7 +32,7 @@ def run_pt(
    tokenizer = tokenizer_module["tokenizer"]
    dataset = get_dataset(model_args, data_args, training_args, stage="pt", **tokenizer_module)
    model = load_model(tokenizer, model_args, finetuning_args, training_args.do_train)
    apply_seq_parallel_monkey_patch(finetuning_args.parallel_mode, "llama")
    apply_seq_parallel_monkey_patch(finetuning_args.parallel_mode, "llama", sp_size=finetuning_args.sp_size)

    # data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)
    local_rank = int(os.getenv("LOCAL_RANK"))
@@ -42,6 +42,7 @@
        tokenizer=tokenizer,
        mlm=False,
        seq_algo=finetuning_args.parallel_mode,
        sp_size=finetuning_args.sp_size,
        rank=torch.distributed.get_rank(),
        world_size=torch.distributed.get_world_size(),
        device=torch.device("cuda", local_rank)
