Object
Object list
- OperationManager
- HorovodGlobalState
- ParameterManager
- FusionBufferManager
- ProcessSet & ProcessSetTable
- Controller
- TensorQueue
- GroupTable
HorovodGlobalState
The global state class: it stores global information and also owns the following objects.
// horovod/common/global_state.h
struct HorovodGlobalState {
  // Background thread running MPI communication.
  std::thread background_thread;
  bool elastic_enabled = false;
  ParameterManager parameter_manager;
  // Handles resizing and auto-tuning of buffer size.
  FusionBufferManager fusion_buffer;
  ProcessSetTable process_set_table;
  std::shared_ptr<Controller> global_controller;
};
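How this struct is used is easiest to see from the startup path. A minimal sketch, assuming a single file-static instance as in horovod/common/operations.cc (names simplified, not the actual Horovod code):
// Sketch only: one global instance owns the background thread, which drives
// communication for all registered process sets until shutdown is requested.
#include <functional>
#include <thread>

static HorovodGlobalState global_state;

void BackgroundThreadLoop(HorovodGlobalState& state) {
  bool should_shutdown = false;
  while (!should_shutdown) {
    // One RunLoopOnce-style iteration per cycle: negotiate and execute ops for
    // each process set in state.process_set_table; should_shutdown flips once
    // every rank has requested shutdown (details elided).
  }
}

void InitializeOnce(HorovodGlobalState& state) {
  state.background_thread =
      std::thread(BackgroundThreadLoop, std::ref(state));
}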
ParameterManager
ParameterManager records the tunable parameters, including the cycle time and the fusion buffer size, and dynamically adjusts them with the goal of achieving high throughput.
The manager class mainly consists of several parts:
- the manager's own configuration, including on/off switches, warmup information, etc.
- the API, including Update, Tune, etc.
- wrappers around the tunable parameters, covering what is tuned and how it is tuned
- the tuning strategy, which builds on those wrappers and decides how the parameters get adjusted
// horovod/common/parameter_manager.h
class ParameterManager {
  // Computes a score from the tensor info and calls Tune to adjust the parameters.
  // The return value decides whether the updated parameters are broadcast to the other workers.
  bool Update(const std::vector<std::string>& tensor_names, int64_t bytes);
  // Adjusts the parameters according to the score.
  bool Tune(double score);
  // The manager's configuration.
  struct Params {
    bool hierarchical_allreduce;
    bool hierarchical_allgather;
    bool cache_enabled;
    double tensor_fusion_threshold;
    double cycle_time;
    bool active;
  };
  Params GetParams();

  // Interface used to represent a parameter (or group of parameters) being tuned.
  class ITunableParameter {
  public:
    virtual bool Tune(double score, double* best_score) = 0;
    virtual void UpdateBestValue(double score) = 0;
    virtual double BestScore() const = 0;
    virtual bool IsTunable() const = 0;
  };
  // Abstract base class used to implement hierarchical parameter tuning.
  template <class T>
  class TunableParameter : public ITunableParameter {
    TunableParameter(T initial_value);
    T initial_value_;
    T value_;
    T best_value_;
  };
  // The parameter objects that need tuning.
  std::vector<ITunableParameter*> parameter_chain_;

  // A parameter that optimizes over a finite set of discrete values to be tried sequentially.
  template <class T>
  class CategoricalParameter : public TunableParameter<T> {
    CategoricalParameter(std::vector<T> values);
    std::vector<T> values_;
  };
  enum BayesianVariable { fusion_buffer_threshold_mb, cycle_time_ms };
  struct BayesianVariableConfig {
    BayesianVariable variable;
    std::pair<double, double> bounds;
  };
  // A set of numerical parameters optimized jointly using Bayesian Optimization.
  class BayesianParameter : public TunableParameter<Eigen::VectorXd> {
    std::unique_ptr<BayesianOptimization> bayes_;
  };

  CategoricalParameter<bool> hierarchical_allreduce_;
  CategoricalParameter<bool> hierarchical_allgather_;
  CategoricalParameter<bool> cache_enabled_;
  BayesianParameter joint_params_;
};
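A rough sketch of the feedback loop around this class (illustrative only; the real call site sits in the background loop and is more involved): after a cycle's responses have been processed, the coordinator reports which tensors were handled and how many bytes they contained. Update() scores the throughput internally and calls Tune(); a true return value means the newly chosen parameters should be synchronized to the other workers.
#include <cstdint>
#include <string>
#include <vector>

// Sketch only: feed per-cycle throughput data to the autotuner.
void AutotuneStep(ParameterManager& pm,
                  const std::vector<std::string>& tensor_names,
                  int64_t total_bytes) {
  if (pm.Update(tensor_names, total_bytes)) {
    // The updated parameters should now be broadcast to all workers,
    // e.g. via Controller::SynchronizeParameters() (elided here).
  }
  // The currently active trial values, e.g. fusion threshold and cycle time.
  ParameterManager::Params params = pm.GetParams();
  (void)params;
}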
FusionBufferManager
The buffer managed here lives for the entire lifetime of the process.
// horovod/common/fusion_buffer_manager.h
class FusionBufferManager {
public:
  Status InitializeBuffer(int64_t threshold,
                          int device, std::shared_ptr<OpContext> context,
                          int stream_id,
                          std::function<void()> on_start_init,
                          std::function<void()> on_end_init);
  // Internally: Status status = context->AllocatePersistent(threshold, &buffer);
  std::shared_ptr<PersistentBuffer> GetBuffer(int device, Framework framework, int stream_id);
  // Map key: (device ID, framework, stream_id).
  std::unordered_map<std::tuple<int, Framework, int>,
                     std::pair<std::shared_ptr<PersistentBuffer>, int64_t>> tensor_fusion_buffers_;
};
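A sketch of the intended call pattern (an assumption based on the declarations above, not a verbatim Horovod call site): the buffer is lazily allocated once per (device, framework, stream) key and then reused for every subsequent fusion on that key.
#include <cstdint>
#include <functional>
#include <memory>

// Sketch only: allocate the persistent fusion buffer if needed, then fetch it.
Status EnsureFusionBuffer(FusionBufferManager& mgr, int64_t threshold,
                          int device, Framework framework,
                          std::shared_ptr<OpContext> context, int stream_id) {
  Status status = mgr.InitializeBuffer(threshold, device, context, stream_id,
                                       /*on_start_init=*/[] {},
                                       /*on_end_init=*/[] {});
  if (!status.ok()) {
    return status;
  }
  // Repeated calls with the same key return the same PersistentBuffer.
  std::shared_ptr<PersistentBuffer> buffer =
      mgr.GetBuffer(device, framework, stream_id);
  (void)buffer;
  return status;
}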
ProcessSet & ProcessSetTable
ProcessSet holds the following objects:
- Controller controller
- TensorQueue tensor_queue
- GroupTable group_table
Basic call flow (see the sketch after the code below):
- start from the ProcessSetTable object in HorovodGlobalState
- ProcessSetTable::Ids() returns the ids of the registered ProcessSets
- ProcessSetTable::Get() fetches the ProcessSet object from id_to_process_set_ by id
- ProcessSet::IsCurrentProcessIncluded() checks whether the current rank belongs to the set
// horovod/common/process_set.h
struct ProcessSet {
  std::shared_ptr<Controller> controller;
  TensorQueue tensor_queue;
  // LRU cache of Responses.
  ResponseCache response_cache;
  // Information on registered groups.
  GroupTable group_table;
  std::vector<int> registered_global_ranks;
  MPIContext mpi_context;
  bool Initialize(const MPIContext& global_mpi_context);
  void Finalize(const Status& status);
  bool IsCurrentProcessIncluded() const;
  explicit ProcessSet(std::vector<int> global_ranks = {});
};

class ProcessSetTable {
  ProcessSetTable();
  void Initialize(const MPIContext& global_mpi_context);
  int32_t RegisterProcessSet(std::vector<int> global_ranks = {});
  std::vector<int32_t> Ids() const; // Returns a copy to be thread-safe.
  bool Contains(int32_t id) const;
  ProcessSet& Get(int32_t id);
  // Returns -1 if no process set with these ranks has been registered.
  int32_t FindId(const std::vector<int32_t>& ranks);
  void MarkProcessSetForRemoval(int32_t process_set_id);
  void DeregisterProcessSet(int32_t process_set_id);
  // The registered ProcessSet objects, keyed by id.
  std::unordered_map<int32_t, ProcessSet> id_to_process_set_;
};
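The lookup flow listed above, written out as a sketch (illustrative only):
#include <cstdint>

// Sketch only: visit every registered process set this rank belongs to.
void VisitProcessSets(ProcessSetTable& table) {
  for (int32_t id : table.Ids()) {    // copy of the ids, safe to iterate
    ProcessSet& ps = table.Get(id);   // looked up in id_to_process_set_
    if (!ps.IsCurrentProcessIncluded()) {
      continue;                       // this rank is not part of the set
    }
    // ps.controller / ps.tensor_queue / ps.group_table are used from here on.
  }
}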
Controller
Controller holds the following objects; they must be passed in at construction time, i.e. they are external dependencies:
- TensorQueue& tensor_queue_
- ResponseCache& response_cache_
- ParameterManager& parameter_manager_
- GroupTable& group_table_
class Controller : public std::enable_shared_from_this<Controller> {
  void Initialize();
  virtual int GetTypeSize(DataType dtype) = 0;
  virtual void CrossRankBitwiseAnd(std::vector<long long>& bitvector, int count) = 0;
  virtual void CrossRankBitwiseOr(std::vector<long long>& bitvector, int count) = 0;
  virtual void Bcast(void* buffer, size_t size, int root_rank, Communicator communicator) = 0;
  virtual void AlltoallGetRecvSplits(const std::vector<int32_t>& splits, std::vector<int32_t>& recvsplits) = 0;
  virtual void Barrier(Communicator communicator) = 0;
  virtual void Allgather2Ints(std::array<int, 2> values, std::vector<int>& recv_values) = 0;
  void SynchronizeParameters();
  ResponseList ComputeResponseList(bool this_process_requested_shutdown, HorovodGlobalState& state, ProcessSet& process_set);
  virtual void DoInitialization() = 0;
  // For rank 0 to receive other ranks' ready tensors.
  virtual void RecvReadyTensors(std::vector<std::string>& ready_to_reduce,
                                std::vector<RequestList>& ready_list) = 0;
  // For other ranks to send their ready tensors to rank 0.
  virtual void SendReadyTensors(RequestList& message_list) = 0;
  // For rank 0 to send final tensors ready to be allreduced/allgathered to other ranks.
  virtual void SendFinalTensors(ResponseList& response_list) = 0;
  // For other ranks to receive final ready tensors.
  virtual void RecvFinalTensors(ResponseList& response_list) = 0;
  // Once a tensor is ready to be reduced, the coordinator sends a Response
  // instructing all ranks to start the reduction to all ranks. The Response
  // also contains error messages in case the submitted Requests were not
  // valid (for example, contained mismatched shapes or types).
  // Constructing the Response, thus, requires a whole lot of error checking.
  Response ConstructResponse(const std::string& name, int joined_size = 0);
  // Routine to sync cache hit and invalid bit sets across workers.
  // Also determines global shutdown state and whether uncached requests
  // exist on any worker.
  void CoordinateCacheAndState(CacheCoordinator& cache_coordinator);
  void FuseResponses(std::deque<Response>& responses,
                     HorovodGlobalState& state,
                     ResponseList& response_list);
  // Return the total byte size of the final allgathered output tensor.
  int64_t
  TotalByteSizeOfAllgatherOutput(const std::vector<int64_t>& tensor_sizes,
                                 const TensorTableEntry& entry);
  // Store the Request for a name, and return whether the total count of
  // Requests for that tensor is now equal to the HOROVOD size (and thus we are
  // ready to reduce the tensor).
  bool IncrementTensorCount(const Request& msg, int joined_size = 0);

  bool is_initialized_ = false;
  int rank_ = 0;
  int local_rank_ = 0;
  int cross_rank_ = 0;
  int size_ = 1;
  int local_size_ = 1;
  int cross_size_ = 1;
  bool is_coordinator_ = false;
  bool is_homogeneous_ = false;
  // Global rank of each process in the set associated to this controller.
  std::vector<int> global_ranks_;
  // Map (global rank) -> (process set controller rank) for each process in
  // this set.
  std::unordered_map<int, int> global_rank_to_controller_rank_;
  // Controller process set ranks of processes running on this node.
  std::vector<int> local_comm_ranks_;
  StallInspector stall_inspector_;
  // Only exists on the coordinator node (rank zero). Maintains a vector of
  // requests to allreduce every tensor (keyed by tensor name).
  MessageTable message_table_;
  // Outside dependencies.
  TensorQueue& tensor_queue_;
  ResponseCache& response_cache_;
  ParameterManager& parameter_manager_;
  GroupTable& group_table_;
};
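The virtual Send/Recv methods above encode a coordinator protocol: rank 0 collects the Requests that are ready on every worker, builds fused Responses, and broadcasts the final plan, while all other ranks only report and wait. A condensed sketch of that shape (not the actual ComputeResponseList, which also handles caching, stalls, and Join, and ignoring access control):
#include <string>
#include <vector>

// Sketch only: one negotiation round as seen from the coordinator vs. a worker.
ResponseList NegotiateOnce(Controller& ctrl, RequestList& my_ready_requests,
                           bool is_coordinator) {
  ResponseList response_list;
  if (is_coordinator) {
    std::vector<std::string> ready_to_reduce;
    std::vector<RequestList> ready_list;
    ctrl.RecvReadyTensors(ready_to_reduce, ready_list);  // gather from all workers
    // ... IncrementTensorCount / ConstructResponse / FuseResponses run here ...
    ctrl.SendFinalTensors(response_list);                 // broadcast the final plan
  } else {
    ctrl.SendReadyTensors(my_ready_requests);             // report locally ready tensors
    ctrl.RecvFinalTensors(response_list);                 // receive the final plan
  }
  return response_list;
}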
TensorQueue
TensorQueue is held by the Controller and mainly contains two members:
- message_queue_, the queue of Request objects that the controller pops to do the preparation work
- tensor_table_, the TensorTableEntry objects for the tensors that actually need to be communicated
// horovod/common/tensor_queue.h
class TensorQueue {
  TensorQueue() = default;
  Status AddToTensorQueue(TensorTableEntry& e, Request& message);
  Status AddToTensorQueueMulti(std::vector<TensorTableEntry>& entries, std::vector<Request>& messages);
  void FinalizeTensorQueue(const Status& status);
  int64_t GetTensorDataForAutotuner(const ResponseList& response_list,
                                    std::vector<std::string>& tensor_names);
  void GetTensorEntriesFromResponse(const Response& response,
                                    std::vector<TensorTableEntry>& entries,
                                    bool joined = false);
  const TensorTableEntry& GetTensorEntry(const std::string& tensor_name) const;
  bool IsTensorPresentInTable(const std::string& tensor_name) const;
  void PopMessagesFromQueue(std::deque<Request>& message_queue_buffer);
  void PushMessageToQueue(Request& message);
  void PushMessagesToQueue(std::deque<Request>& messages);
  void RemoveJoinTensor();
  // Tensors waiting to be allreduced or allgathered.
  std::unordered_map<std::string, TensorTableEntry> tensor_table_;
  // Queue of MPI requests waiting to be sent to the coordinator node.
  std::queue<Request> message_queue_;
};
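A sketch of the two ends of the queue (illustrative, following the declarations above): the framework-facing enqueue path stores the entry together with its Request, while the background loop drains the pending Requests each cycle and later pulls the matching entries back out once a Response arrives.
#include <deque>
#include <vector>

// Sketch only: producer side (enqueue) and consumer side (background loop).
Status EnqueueOne(TensorQueue& q, TensorTableEntry& entry, Request& request) {
  // entry goes into tensor_table_, request goes into message_queue_.
  return q.AddToTensorQueue(entry, request);
}

void DrainOnce(TensorQueue& q, const Response& response) {
  std::deque<Request> pending;
  q.PopMessagesFromQueue(pending);  // Requests to forward to the coordinator
  std::vector<TensorTableEntry> entries;
  q.GetTensorEntriesFromResponse(response, entries);  // tensors to communicate now
}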
GroupTable
Held by ProcessSet:
- RegisterGroup is called in EnqueueTensorAllreduces
- DeregisterGroups is called in RunLoopOnce before PerformOperation
// horovod/common/group_table.h
class GroupTable {
public:
  GroupTable() = default;
  int32_t GetGroupIDFromTensorName(const std::string& tensor_name) const;
  const std::vector<std::string>& GetGroupTensorNames(int32_t group_id) const;
  bool empty(void) const;
  int32_t RegisterGroup(std::vector<std::string>&& tensor_names);
  void DeregisterGroups(const std::vector<std::string>& tensor_names);
  std::unordered_map<std::string, int32_t> tensor_name_to_id_;
  std::unordered_map<int32_t, std::vector<std::string>> id_to_tensor_names_;
  // Queue of ids that can be reused.
  std::queue<int32_t> free_ids_;
  // Next available group id (increases each time a group is added).
  int32_t next_group_id_ = 0;
};
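A sketch of the register/deregister pairing described above (the helper names are hypothetical, not Horovod functions):
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Sketch only: a grouped allreduce request registers its tensor names up front ...
int32_t RegisterAllreduceGroup(GroupTable& table,
                               std::vector<std::string> tensor_names) {
  return table.RegisterGroup(std::move(tensor_names));
}

// ... and the group is dropped again in the background loop, before
// PerformOperation runs for those tensors.
void CleanupGroups(GroupTable& table, const std::vector<std::string>& done_names) {
  table.DeregisterGroups(done_names);
}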
OperationManager
A wrapper around the communication ops: each Execute* method essentially just calls op->Execute, providing a uniform interface to the various op implementations in the horovod/common/ops directory.
// horovod/common/ops/operation_manager.h
class OperationManager {
  OperationManager(ParameterManager* param_manager,
                   std::vector<std::shared_ptr<AllreduceOp>> allreduce_ops,
                   std::vector<std::shared_ptr<AllgatherOp>> allgather_ops,
                   std::vector<std::shared_ptr<BroadcastOp>> broadcast_ops,
                   std::vector<std::shared_ptr<AlltoallOp>> alltoall_ops,
                   std::vector<std::shared_ptr<ReducescatterOp>> reducescatter_ops,
                   std::shared_ptr<JoinOp> join_op,
                   std::vector<std::shared_ptr<AllreduceOp>> adasum_ops,
                   std::shared_ptr<BarrierOp> barrier_op,
                   std::shared_ptr<ErrorOp> error_op);
  Status ExecuteAllreduce(std::vector<TensorTableEntry>& entries, const Response& response) const;
  Status ExecuteAllgather(std::vector<TensorTableEntry>& entries, const Response& response) const;
  Status ExecuteBroadcast(std::vector<TensorTableEntry>& entries, const Response& response) const;
  Status ExecuteAlltoall(std::vector<TensorTableEntry>& entries, const Response& response) const;
  Status ExecuteReducescatter(std::vector<TensorTableEntry>& entries, const Response& response) const;
  Status ExecuteError(std::vector<TensorTableEntry>& entries, const Response& response) const;
  Status ExecuteJoin(std::vector<TensorTableEntry>& entries,
                     const Response& response, ProcessSet& process_set) const;
  Status ExecuteAdasum(std::vector<TensorTableEntry>& entries, const Response& response) const;
  Status ExecuteBarrier(std::vector<TensorTableEntry>& entries, const Response& response) const;
  Status ExecuteOperation(std::vector<TensorTableEntry>& entries,
                          const Response& response,
                          ProcessSet& process_set) const;

  ParameterManager* param_manager_;
  std::vector<std::shared_ptr<AllreduceOp>> allreduce_ops_;
  std::vector<std::shared_ptr<AllgatherOp>> allgather_ops_;
  std::vector<std::shared_ptr<BroadcastOp>> broadcast_ops_;
  std::vector<std::shared_ptr<AlltoallOp>> alltoall_ops_;
  std::vector<std::shared_ptr<ReducescatterOp>> reducescatter_ops_;
  std::shared_ptr<JoinOp> join_op_;
  std::vector<std::shared_ptr<AllreduceOp>> adasum_ops_;
  std::shared_ptr<BarrierOp> barrier_op_;
  std::shared_ptr<ErrorOp> error_op_;
};
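ExecuteOperation is the single entry point the background loop calls: it dispatches on the Response type and forwards to the matching Execute* method, which in turn picks the first registered op implementation that reports itself enabled for the given entries. A sketch of the dispatch, assuming Response exposes a response_type() enum as in horovod/common/message.h:
#include <vector>

// Sketch only: route a fused Response to the matching collective implementation.
Status Dispatch(const OperationManager& ops,
                std::vector<TensorTableEntry>& entries,
                const Response& response, ProcessSet& process_set) {
  switch (response.response_type()) {
  case Response::ALLREDUCE:
    return ops.ExecuteAllreduce(entries, response);
  case Response::ALLGATHER:
    return ops.ExecuteAllgather(entries, response);
  case Response::BROADCAST:
    return ops.ExecuteBroadcast(entries, response);
  case Response::ALLTOALL:
    return ops.ExecuteAlltoall(entries, response);
  case Response::REDUCESCATTER:
    return ops.ExecuteReducescatter(entries, response);
  case Response::ADASUM:
    return ops.ExecuteAdasum(entries, response);
  case Response::BARRIER:
    return ops.ExecuteBarrier(entries, response);
  case Response::JOIN:
    return ops.ExecuteJoin(entries, response, process_set);
  case Response::ERROR:
  default:
    return ops.ExecuteError(entries, response);
  }
}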