distributed¶
Functions
Function to gather a tensor from several distributed processes. |
|
|
|
Function to gather all tensors from several ddp processes onto a list that is broadcasted to all processes. |
|
|
|
Utility function to initialize distributed connection by setting env variables and initializing the distributed process group. |
|
|
|
|
|
Function to register communication hook for DDP model https://pytorch.org/docs/master/ddp_comm_hooks.html. |
|
Function to reduce the tensors from several ddp processes to one main process. |
|
Function to reduce a tensor across worker processes during distributed training. |
|
|
|
Classes
Utilities that can be used with distributed training.
- class pytorch_lightning.utilities.distributed.AllGatherGrad(*args, **kwargs)[source]¶
Bases:
torch.autograd.function.Function
- static backward(ctx, *grad_output)[source]¶
Defines a formula for differentiating the operation with backward mode automatic differentiation (alias to the vjp function).
This function is to be overridden by all subclasses.
It must accept a context
ctx
as the first argument, followed by as many outputs as theforward()
returned (None will be passed in for non tensor outputs of the forward function), and it should return as many tensors, as there were inputs toforward()
. Each argument is the gradient w.r.t the given output, and each returned value should be the gradient w.r.t. the corresponding input. If an input is not a Tensor or is a Tensor not requiring grads, you can just pass None as a gradient for that input.The context can be used to retrieve tensors saved during the forward pass. It also has an attribute
ctx.needs_input_grad
as a tuple of booleans representing whether each input needs gradient. E.g.,backward()
will havectx.needs_input_grad[0] = True
if the first input toforward()
needs gradient computated w.r.t. the output.
- static forward(ctx, tensor, group=torch.distributed.group.WORLD)[source]¶
This function is to be overridden by all subclasses. There are two ways to define forward:
Usage 1 (Combined forward and ctx):
@staticmethod def forward(ctx: Any, *args: Any, **kwargs: Any) -> Any: pass
It must accept a context ctx as the first argument, followed by any number of arguments (tensors or other types).
See Combined or separate forward() and setup_context() for more details
Usage 2 (Separate forward and ctx):
@staticmethod def forward(*args: Any, **kwargs: Any) -> Any: pass @staticmethod def setup_context(ctx: Any, inputs: Tuple[Any, ...], output: Any) -> None: pass
The forward no longer accepts a ctx argument.
Instead, you must also override the
torch.autograd.Function.setup_context()
staticmethod to handle setting up thectx
object.output
is the output of the forward,inputs
are a Tuple of inputs to the forward.See Extending torch.autograd for more details
The context can be used to store arbitrary data that can be then retrieved during the backward pass. Tensors should not be stored directly on ctx (though this is not currently enforced for backward compatibility). Instead, tensors should be saved either with
ctx.save_for_backward()
if they are intended to be used inbackward
(equivalently,vjp
) orctx.save_for_forward()
if they are intended to be used for injvp
.- Return type
- pytorch_lightning.utilities.distributed.all_gather_ddp_if_available(tensor, group=None, sync_grads=False)[source]¶
Function to gather a tensor from several distributed processes.
- Parameters
- Return type
- Returns
A tensor of shape (world_size, batch, …)
- pytorch_lightning.utilities.distributed.gather_all_tensors(result, group=None)[source]¶
Function to gather all tensors from several ddp processes onto a list that is broadcasted to all processes.
- pytorch_lightning.utilities.distributed.init_dist_connection(cluster_environment, torch_distributed_backend, global_rank=None, world_size=None, **kwargs)[source]¶
Utility function to initialize distributed connection by setting env variables and initializing the distributed process group.
- Parameters
- Raises
RuntimeError – If
torch.distributed
is not available- Return type
- pytorch_lightning.utilities.distributed.register_ddp_comm_hook(model, ddp_comm_state=None, ddp_comm_hook=None, ddp_comm_wrapper=None)[source]¶
Function to register communication hook for DDP model https://pytorch.org/docs/master/ddp_comm_hooks.html.
- Parameters
model¶ (
DistributedDataParallel
) – DDP modelddp_comm_state¶ (
Optional
[object
]) – state is passed to the hook and can be used to maintain and update any state information that users would like to maintain as part of the training process. Examples: error feedback in gradient compression, peers to communicate with next in GossipGrad etc.ddp_comm_hook¶ (
Optional
[Callable
]) –hook(state: object, bucket: dist._GradBucket) -> torch.futures.Future
This callable function is called once the bucket is ready. The hook can perform whatever processing is needed and return a Future indicating completion of any async work (ex: allreduce). If the hook doesn’t perform any communication, it can also just return a completed Future. The Future should hold the new value of grad bucket’s tensors. Once a bucket is ready, c10d reducer would call this hook and use the tensors returned by the Future and copy grads to individual parameters.
ddp_comm_wrapper¶ (
Optional
[Callable
]) – communication hook wrapper to support a communication hook such as FP16 compression as wrapper, which could be combined with ddp_comm_hook
Warning
DDP communication hook needs pytorch version at least 1.8.0
Warning
DDP communication wrapper needs pytorch version at least 1.9.0 Post-localSGD hook needs pytorch version at least 1.9.0
Examples
>>> from torch.distributed.algorithms.ddp_comm_hooks import ( ... default_hooks as default, ... powerSGD_hook as powerSGD, ... post_localSGD_hook as post_localSGD, ... ) >>> >>> # fp16_compress_hook for compress gradients >>> ddp_model = ... >>> register_ddp_comm_hook( ... model=ddp_model, ... ddp_comm_hook=default.fp16_compress_hook, ... ) >>> >>> # powerSGD_hook >>> ddp_model = ... >>> register_ddp_comm_hook( ... model=ddp_model, ... ddp_comm_state=powerSGD.PowerSGDState( ... process_group=None, ... matrix_approximation_rank=1, ... start_powerSGD_iter=5000, ... ), ... ddp_comm_hook=powerSGD.powerSGD_hook, ... ) >>> >>> # post_localSGD_hook >>> subgroup, _ = torch.distributed.new_subgroups() >>> ddp_model = ... >>> register_ddp_comm_hook( ... model=ddp_model, ... state=post_localSGD.PostLocalSGDState( ... process_group=None, ... subgroup=subgroup, ... start_localSGD_iter=1_000, ... ), ... ddp_comm_hook=post_localSGD.post_localSGD_hook, ... ) >>> >>> # fp16_compress_wrapper combined with other communication hook >>> ddp_model = ... >>> register_ddp_comm_hook( ... model=ddp_model, ... ddp_comm_state=powerSGD.PowerSGDState( ... process_group=None, ... matrix_approximation_rank=1, ... start_powerSGD_iter=5000, ... ), ... ddp_comm_hook=powerSGD.powerSGD_hook, ... ddp_comm_wrapper=default.fp16_compress_wrapper, ... )
- Return type
- pytorch_lightning.utilities.distributed.sync_ddp(result, group=None, reduce_op=None)[source]¶
Function to reduce the tensors from several ddp processes to one main process.
- Parameters
result¶ (
Tensor
) – the value to sync and reduce (typically tensor or number)group¶ (
Optional
[Any
]) – the process group to gather results from. Defaults to all processes (world)reduce_op¶ (
Union
[ReduceOp
,str
,None
]) – the reduction operation. Defaults to sum. Can also be a string of ‘avg’, ‘mean’ to calculate the mean during reduction.
- Return type
- Returns
reduced value
- pytorch_lightning.utilities.distributed.sync_ddp_if_available(result, group=None, reduce_op=None)[source]¶
Function to reduce a tensor across worker processes during distributed training.
- Parameters
result¶ (
Tensor
) – the value to sync and reduce (typically tensor or number)group¶ (
Optional
[Any
]) – the process group to gather results from. Defaults to all processes (world)reduce_op¶ (
Union
[ReduceOp
,str
,None
]) – the reduction operation. Defaults to sum. Can also be a string of ‘avg’, ‘mean’ to calculate the mean during reduction.
- Return type
- Returns
reduced value