Workflow

This section covers Horovod's main workflow. It contains two parts:

  • HorovodBasics, i.e. the API layer
  • Operation, i.e. how those APIs are invoked and the main flow that follows

TL;DR

hvd.init()

  • Calls the C API horovod_init, which calls InitializeHorovodOnce to spawn a background thread running BackgroundThreadLoop; the call returns once initialization completes.
  • The background thread runs BackgroundThreadLoop, performs the various initialization steps, and then spins in a while loop on RunLoopOnce(state).
  • RunLoopOnce pops tensors from the queue and drives the distributed communication.

allreduce_async_

  • The Python API is wrapped per framework; for PyTorch, pybind11 is used to call EnqueueTensorAllreduces, which pushes the tensors onto the queue (see the usage sketch below).
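
A minimal end-to-end usage sketch from the PyTorch side. The horovod.torch calls used here (init, allreduce_async_, synchronize, rank) are public APIs; the tensor name and shape are only illustrative.

import torch
import horovod.torch as hvd

hvd.init()  # horovod_init -> InitializeHorovodOnce -> background thread

# allreduce_async_ enqueues the tensor (EnqueueTensorAllreduces under the hood)
# and returns a handle immediately; the background loop does the actual work.
t = torch.ones(4) * hvd.rank()
handle = hvd.allreduce_async_(t, name="toy.grad", op=hvd.Sum)

# synchronize() blocks until the background loop has finished this tensor.
reduced = hvd.synchronize(handle)
print(hvd.rank(), reduced.tolist())  # each element == 0 + 1 + ... + (size - 1)

Launch with, for example, horovodrun -np 2 python toy_allreduce.py.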

HorovodBasics

Python API

  • Horovod's base API, used by the framework-specific implementations (torch/tf)
  • A thin Python wrapper over the C interface, invoked via ctypes (a standalone sketch follows the listing)
# horovod/common/basics.py

class HorovodBasics(object):
    def __init__(self, pkg_path, *args):
        # Load the built extension library (full_path is derived from pkg_path)
        self.MPI_LIB_CTYPES = ctypes.CDLL(full_path, mode=ctypes.RTLD_GLOBAL)

    def init(self, comm, process_sets):
        initialization_ok = self.MPI_LIB_CTYPES.horovod_init(...)
        # initialization_ok = self.MPI_LIB_CTYPES.horovod_init_multi_comm(...)

        _init_process_sets(process_sets)

    def shutdown(self):
    def is_initialized(self):
    def start_timeline(self, file_path, mark_cycles=False):
    def stop_timeline(self):
    def size(self):
    def local_size(self):
    def cross_size(self):
    def rank(self):
    def local_rank(self):
    def cross_rank(self):
    def is_homogeneous(self):
    def mpi_threads_supported(self):
    def mpi_enabled(self):
    def mpi_built(self):
    def gloo_enabled(self):
    def gloo_built(self):
    def nccl_built(self):
    def ddl_built(self):
    def ccl_built(self):
    def cuda_built(self):
    def rocm_built(self):
    def _add_process_set_impl(self, ranks: Sequence[int]) -> Optional[int]:
    def _remove_process_set_impl(self, process_set_id: int) -> Optional[int]:
    def _process_set_rank(self, process_set_id: int) -> int:
    def _process_set_size(self, process_set_id: int) -> int:
    def _get_process_set_ids_and_ranks(self) -> Dict[int, List[int]]:
    def _comm_process_set_id(self, comm: MPI.Comm) -> int:
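
The ctypes pattern above is thin enough to show standalone. A minimal sketch, assuming the extension library has already been built; the path below is a placeholder (HorovodBasics derives the real location from the package directory it receives).

import ctypes

# Placeholder path; HorovodBasics computes the real location from pkg_path.
MPI_LIB = ctypes.CDLL("/path/to/mpi_lib_v2.so", mode=ctypes.RTLD_GLOBAL)

# Zero-argument C functions map directly onto Python calls; declare the return
# types explicitly so ctypes does not have to guess.
MPI_LIB.horovod_rank.restype = ctypes.c_int
MPI_LIB.horovod_size.restype = ctypes.c_int

# Before horovod_init() has completed, these are expected to return -1.
print(MPI_LIB.horovod_rank(), MPI_LIB.horovod_size())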

C API

The interface here has two parts:

  • System-level C functions, wrapped on the Python side via ctypes
  • Communication-related functions (the Enqueue* family), called directly from the framework extensions
// horovod/common/operations.h

namespace horovod {
namespace common {

extern "C" {

bool horovod_init(const int* ranks, int nranks, const int* process_set_ranks,
                  const int* process_set_sizes, int num_process_sets);

#if HAVE_MPI
// Initialize using MPI communicators
bool horovod_init_multi_comm(MPI_Comm* comm, int ncomms,
                             const int* process_set_ranks_via_ranks,
                             const int* process_set_sizes_via_ranks,
                             int num_process_sets_via_ranks);
#endif

void horovod_shutdown();

int horovod_rank();
int horovod_local_rank();

int horovod_size();
int horovod_local_size();

// bool horovod_xxx_enabled();
// bool horovod_xxx_built();

int horovod_reduce_op_average();
int horovod_reduce_op_sum();
int horovod_reduce_op_adasum();

int horovod_add_process_set(const int *ranks, int nranks);
int horovod_remove_process_set(int process_set_id);
int horovod_process_set_rank(int process_set_id);
int horovod_process_set_size(int process_set_id);
int horovod_process_set_included(int process_set_id);
int horovod_number_of_process_sets();
void horovod_process_set_ids(int* ids_prealloc);
int horovod_process_set_ranks(int id, int* ranks_prealloc);

} // extern "C"

Status EnqueueTensorAllreduce(std::shared_ptr<OpContext> context,
                              std::shared_ptr<Tensor> tensor,
                              std::shared_ptr<Tensor> output,
                              ReadyEventList ready_event_list,
                              std::string name, int device,
                              StatusCallback callback,
                              ReduceOp reduce_op = ReduceOp::SUM,
                              double prescale_factor = 1.0,
                              double postscale_factor = 1.0,
                              int32_t process_set_id = 0);

Status EnqueueTensorAllreduces(std::vector<std::shared_ptr<OpContext>>& contexts,
                               std::vector<std::shared_ptr<Tensor>>& tensors,
                               std::vector<std::shared_ptr<Tensor>>& outputs,
                               std::vector<ReadyEventList>& ready_event_lists,
                               std::vector<std::string>& names,
                               int device,
                               std::vector<StatusCallback>& callbacks,
                               ReduceOp reduce_op = ReduceOp::SUM,
                               double prescale_factor = 1.0,
                               double postscale_factor = 1.0,
                               int32_t process_set_id = 0);

Status EnqueueTensorAllgather(std::shared_ptr<OpContext> context,
                              std::shared_ptr<Tensor> tensor,
                              ReadyEventList ready_event_list,
                              const std::string& name, int device,
                              StatusCallback callback,
                              int32_t process_set_id = 0);

Status EnqueueTensorBroadcast(std::shared_ptr<OpContext> context,
                              std::shared_ptr<Tensor> tensor,
                              std::shared_ptr<Tensor> output, int root_rank,
                              ReadyEventList ready_event_list,
                              const std::string& name, int device,
                              StatusCallback callback,
                              int32_t process_set_id = 0);

Status EnqueueTensorAlltoall(std::shared_ptr<OpContext> context,
                             std::shared_ptr<Tensor> tensor,
                             std::shared_ptr<Tensor> splits,
                             ReadyEventList ready_event_list,
                             const std::string& name, int device,
                             StatusCallback callback,
                             int32_t process_set_id = 0);

Status EnqueueTensorReducescatter(std::shared_ptr<OpContext> context,
                                  std::shared_ptr<Tensor> tensor,
                                  ReadyEventList ready_event_list,
                                  const std::string& name, int device,
                                  StatusCallback callback,
                                  ReduceOp reduce_op = ReduceOp::SUM,
                                  int32_t process_set_id = 0);

Status EnqueueJoin(std::shared_ptr<OpContext> context,
                   std::shared_ptr<Tensor> output_last_joined_rank,
                   ReadyEventList ready_event_list,
                   const std::string& name, int device,
                   StatusCallback callback,
                   int32_t process_set_id = 0);

Status EnqueueBarrier(StatusCallback callback,
                      int32_t process_set_id = 0);

} // namespace common
} // namespace horovod

#endif // HOROVOD_OPERATIONS_H

Operation

Horovod's main flow lives in horovod/common/operations.cc. The main line has two sides:

  • The init call starts a background thread that keeps popping tensors needing communication from the tensor_queue, performs the communication, and returns the results
  • The user-facing frontend calls indirectly invoke EnqueueTensorAllreduces and similar APIs, continuously pushing tensors that need communication into the tensor_queue

Initialization and dequeuing

The concrete implementation of the init API: it starts a background thread that continuously triggers the communication operations.

// horovod/common/operations.cc

extern "C" {

bool horovod_init(const int* ranks, int nranks, const int* process_set_ranks,
                  const int* process_set_sizes, int num_process_sets) {
  return InitializeHorovodOnce(...);
}

bool horovod_init_multi_comm(MPI_Comm* comm, int ncomms,
                             const int* process_set_ranks_via_ranks,
                             const int* process_set_sizes_via_ranks,
                             int num_process_sets_via_ranks) {
  return InitializeHorovodOnce(std::vector<int>(), process_set_ranks_vecs);
}

// Start the Horovod background thread; runs only once
bool InitializeHorovodOnce(
    const std::vector<int>& ranks,
    const std::vector<std::vector<int>>& process_set_ranks) {

  EnrichProcessSetWithMPIController(global_process_set);

  if (!horovod_global.initialize_flag.test_and_set()) {
    horovod_global.initialization_done = false;
    horovod_global.background_thread =
        std::thread(BackgroundThreadLoop, std::ref(horovod_global));
  }

  while (!horovod_global.initialization_done &&
         !horovod_global.initialization_failed) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  return horovod_global.initialization_done;
}

// Initialize the controller, handing it references to several objects held in the global state
void EnrichProcessSetWithMPIController(ProcessSet& process_set) {
  process_set.controller.reset(new MPIController(
      process_set.response_cache, process_set.tensor_queue,
      horovod_global.timeline, horovod_global.parameter_manager,
      process_set.group_table, horovod_global.timeline_controller,
      process_set.mpi_context));
}

void BackgroundThreadLoop(HorovodGlobalState& state) {
  auto mpi_ctx_manager = MPIContextManager();
  if (global_mpi_context.IsEnabled()) {
    global_mpi_context.Initialize(mpi_ctx_manager);
    if (state.control_operation == LibType::MPI) {
      // Initializes global controller
      state.process_set_table.Initialize(global_mpi_context);
    }
  }

  bool is_coordinator = state.global_controller->IsCoordinator();
  bool is_homogeneous = state.global_controller->IsHomogeneous();
  int size = state.global_controller->GetSize();
  int local_size = state.global_controller->GetLocalSize();
  int local_rank = state.global_controller->GetLocalRank();

  // A batch of configuration defaults
  state.parameter_manager.SetTensorFusionThresholdBytes(128 * 1024 * 1024);
  state.parameter_manager.SetTensorFusionThresholdBytes(threshold, true);
  state.parameter_manager.SetCycleTimeMs(1);
  state.parameter_manager.SetCacheEnabled(true);
  state.process_set_table.Get(0).response_cache.set_capacity(...);
  state.parameter_manager.SetHierarchicalAllgather(false);
  state.parameter_manager.SetHierarchicalAllreduce(false);

  while (RunLoopOnce(state));

  state.shut_down = true;

  horovod_global.process_set_table.Finalize(global_mpi_context, ...);
}

bool RunLoopOnce(HorovodGlobalState& state) {
  state.process_set_table.InitializeRegisteredAndRemoveMarkedIfReady(global_mpi_context);

  for (auto process_set_id : state.process_set_table.Ids()) {
    auto& process_set = state.process_set_table.Get(process_set_id);
    auto response_list = process_set.IsCurrentProcessIncluded()
            ? process_set.controller->ComputeResponseList(this_process_requested_shutdown, state, process_set)
            : ResponseList();

    if (process_set.IsCurrentProcessIncluded()) {
      int global_rank = state.global_controller->GetRank();
      for (auto& response : response_list.responses()) {
        PerformOperation(response, process_set);
      }
    }
  }
  return !state.shut_down;
}

This involves two main operations:

  • process_set.controller->ComputeResponseList handles the coordination that precedes communication
  • PerformOperation pops the corresponding entries from the process_set's tensor_queue and executes the communication

PerformOperation

// Execute the communication operation described by a Response
void PerformOperation(Response response, ProcessSet& process_set) {
  std::vector<TensorTableEntry> entries;
  process_set.tensor_queue.GetTensorEntriesFromResponse(response, entries, process_set.joined);

  if (response.response_type() != Response::JOIN &&
      response.response_type() != Response::BARRIER) {
    if (entries.size() > 1) {
      auto first_entry = entries[0];
      // Create (or reuse) the fusion buffer
      Status status = horovod_global.fusion_buffer.InitializeBuffer(
          process_set.controller->TensorFusionThresholdBytes(),
          first_entry.device, first_entry.context,
          horovod_global.current_nccl_stream,
          [&]() { timeline.ActivityStartAll(entries, INIT_FUSION_BUFFER); },
          [&]() { timeline.ActivityEndAll(entries); });
    }
  }

  // std::unique_ptr<OperationManager> op_manager;
  Status status = op_manager->ExecuteOperation(entries, response, process_set);
}

OperationManager::ExecuteOperation then calls the matching backend op implementation to actually execute the operation.
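
A toy Python model of that dispatch (illustrative only, not Horovod source; all class and method names are made up): the manager walks an ordered list of candidate implementations, e.g. a GPU-capable op before a CPU fallback, and uses the first one whose enabled check passes.

class ToyAllreduceOp:
    def __init__(self, name, supports_gpu):
        self.name = name
        self.supports_gpu = supports_gpu

    def enabled(self, entries):
        # The real ops check build flags, device placement, tensor types, etc.
        return self.supports_gpu or all(e["device"] == "cpu" for e in entries)

    def execute(self, entries):
        return f"{self.name} reduced {len(entries)} tensor(s)"

class ToyOperationManager:
    def __init__(self, allreduce_ops):
        self.allreduce_ops = allreduce_ops  # ordered by priority

    def execute_allreduce(self, entries):
        for op in self.allreduce_ops:  # first enabled op wins
            if op.enabled(entries):
                return op.execute(entries)
        raise RuntimeError("no enabled allreduce op")

manager = ToyOperationManager([ToyAllreduceOp("nccl", True),
                               ToyAllreduceOp("mpi", False)])
print(manager.execute_allreduce([{"device": "gpu:0"}]))  # -> nccl reduced 1 tensor(s)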

ComputeResponseList

This is the most important function in the controller. It coordinates allreduce/allgather across the workers and returns the list of tensors that are ready for communication, where

  • worker 0 acts as the coordinator
  • every worker keeps a cached copy of the ready-tensor lists sent by the other workers

The concrete flow is as follows (a toy sketch follows the list):

  • Every communication a worker plans is first sent to the coordinator as a Request, which includes (tensor name, reduce/gather, shape, type)
  • Once all of its planned requests have been sent, the worker sends a DONE message to the coordinator
  • The coordinator keeps receiving requests from the workers until it has collected a DONE message from every node
  • The coordinator builds Response messages for the tensors that are ready and sends them to the workers, finishing with a DONE message of its own
  • Each worker listens for messages from the coordinator and executes the corresponding reduce/gather operations until it receives DONE
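
Before reading the real code, a toy Python model of the coordinator's bookkeeping (illustrative only, not Horovod source): a tensor becomes ready once every rank has submitted a Request for it.

from collections import Counter

def toy_compute_ready_tensors(requests_per_rank):
    """requests_per_rank: one list of announced tensor names per rank."""
    num_ranks = len(requests_per_rank)
    counts = Counter()
    order = []  # keep first-seen order, roughly what the coordinator does
    for requests in requests_per_rank:
        for name in requests:
            if name not in counts:
                order.append(name)
            counts[name] += 1
    # Ready == announced by every rank; these turn into Response messages.
    return [name for name in order if counts[name] == num_ranks]

# Rank 2 has not announced "conv1.bias" yet, so only "fc.weight" is ready.
print(toy_compute_ready_tensors([
    ["fc.weight", "conv1.bias"],
    ["conv1.bias", "fc.weight"],
    ["fc.weight"],
]))  # -> ['fc.weight']

The real implementation in the controller: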
// horovod/common/controller.cc

ResponseList Controller::ComputeResponseList(bool this_process_requested_shutdown,
                                             HorovodGlobalState& state,
                                             ProcessSet& process_set) {
  CacheCoordinator cache_coordinator(response_cache_.num_active_bits());

  // tensor_queue_ --> message_queue_tmp
  std::deque<Request> message_queue_tmp;
  tensor_queue_.PopMessagesFromQueue(message_queue_tmp);

  // Cache handling (elided)
  // tensor_queue_.PushMessagesToQueue(messages_to_replace);

  ResponseList response_list;

  if (!need_communication) {
    std::deque<Response> responses;
    for (auto bit : cache_coordinator.cache_hits()) {
      responses.push_back(response_cache_.get_response(bit));
    }
    FuseResponses(responses, state, response_list);
  } else {
    std::vector<std::string> ready_to_reduce;

    if (is_coordinator_) { // rank 0 acts as the coordinator
      // message_queue_tmp --> ready_to_reduce
      while (!message_queue_tmp.empty()) {
        Request message = message_queue_tmp.front();
        message_queue_tmp.pop_front();
        ready_to_reduce.push_back(message.tensor_name());
      }
      // Receive ready tensors from other ranks
      std::vector<RequestList> ready_list;
      RecvReadyTensors(ready_to_reduce, ready_list); // ready_to_reduce is not actually used here

      // ready_list +-> ready_to_reduce: merge the requests collected from each worker with our own
      for (int i = 1; i < size_; ++i) {
        auto received_message_list = ready_list[i];
        for (auto& received_message : received_message_list.requests()) {
          auto& received_name = received_message.tensor_name();
          ready_to_reduce.push_back(received_name);
        }
      }

      // At this point the set of tensors ready for communication is complete
      std::deque<Response> responses;

      for (auto& tensor_name : ready_to_reduce) {
        Response response = ConstructResponse(tensor_name, process_set.joined_size);
        responses.push_back(std::move(response));
      }
      FuseResponses(responses, state, response_list);

      // Broadcast final results to other ranks.
      SendFinalTensors(response_list);

    } else { // non-zero ranks (the workers)
      RequestList message_list;
      while (!message_queue_tmp.empty()) {
        message_list.add_request(message_queue_tmp.front());
        message_queue_tmp.pop_front();
      }

      // Send ready tensors to rank zero
      SendReadyTensors(message_list);

      // Receive final tensors to be processed from rank zero
      RecvFinalTensors(response_list);
    }
  }

  return response_list;
}

Invocation and enqueuing

The main flow is as follows (a grouped usage sketch follows the code listing):

  • Use the process_set_id argument to fetch the process_set object from the global state's process_set_table
  • Wrap the tensors and outputs arguments into Request and TensorTableEntry objects
  • Append those lists to the process_set object's tensor_queue
// horovod/common/operations.cc

Status
EnqueueTensorAllreduces(std::vector<std::shared_ptr<OpContext>>& contexts,
                        std::vector<std::shared_ptr<Tensor>>& tensors,
                        std::vector<std::shared_ptr<Tensor>>& outputs,
                        std::vector<ReadyEventList>& ready_event_lists,
                        std::vector<std::string>& names, const int device,
                        std::vector<StatusCallback>& callbacks,
                        ReduceOp reduce_op, double prescale_factor,
                        double postscale_factor, int32_t process_set_id) {

  auto& process_set = horovod_global.process_set_table.Get(process_set_id);
  Status status;

  std::vector<Request> messages;
  std::vector<TensorTableEntry> entries;

  for (int n = 0; n < (int)tensors.size(); ++n) {
    Request message;
    message.set_xxxx(...);
    messages.push_back(std::move(message));

    TensorTableEntry e;
    e.tensor = tensors[n];
    e.output = outputs[n];
    e.process_set_id = process_set_id;
    entries.push_back(std::move(e));
  }

  status = process_set.tensor_queue.AddToTensorQueueMulti(entries, messages);
  return status;
}
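
Because EnqueueTensorAllreduces takes whole lists of tensors, the PyTorch frontend can batch several tensors into a single enqueue. A usage sketch, assuming horovod.torch's grouped_allreduce_async_ (available in recent Horovod releases); names and shapes are only illustrative.

import torch
import horovod.torch as hvd

hvd.init()

# One grouped call enqueues all tensors together, mapping onto a single
# EnqueueTensorAllreduces and giving the fusion buffer more to work with.
tensors = [torch.ones(4) * hvd.rank(), torch.ones(2) * hvd.rank()]
handle = hvd.grouped_allreduce_async_(tensors, name="toy_group", op=hvd.Sum)

# synchronize() returns the reduced tensors once the background loop is done.
outputs = hvd.synchronize(handle)
print(hvd.rank(), [t.tolist() for t in outputs])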