Raylet

服务由以下二进制启动

core/src/ray/raylet/raylet

程序入口

// src/ray/raylet/main.cc

// Entry point of the raylet binary (abbreviated excerpt). The declarations of
// raylet_socket_name, node_ip_address, gcs_client and metrics_export_port are
// not shown here.
int main(int argc, char *argv[]) {

  // Start the IO service.
  // class instrumented_io_context : public boost::asio::io_context {...}
  instrumented_io_context main_service;
  // Work guard: keeps main_service.run() below from returning while the
  // service has no pending handlers.
  boost::asio::io_service::work main_work(main_service);

  std::unique_ptr<ray::raylet::Raylet> raylet;
  ray::raylet::NodeManagerConfig node_manager_config;
  ray::ObjectManagerConfig object_manager_config;
  // Build the Raylet on top of main_service, handing it both configs and the GCS client.
  raylet = std::make_unique<ray::raylet::Raylet>(main_service,
                                                 raylet_socket_name,
                                                 node_ip_address,
                                                 node_manager_config,
                                                 object_manager_config,
                                                 gcs_client,
                                                 metrics_export_port);
  // Start() registers with the GCS and begins accepting clients (see Raylet::Start).
  raylet->Start();
  // Run the event loop on the calling thread.
  main_service.run();
}

主服务类

// src/ray/raylet/raylet.h, src/ray/raylet/raylet.cc

// Main service class of the raylet. Only two members are listed in this
// excerpt; others used by its methods (acceptor_, socket_, self_node_info_)
// are not shown.
class Raylet {
  // Client used to connect to the GCS.
  std::shared_ptr<gcs::GcsClient> gcs_client_;
  // Node manager that Raylet calls into from RegisterGcs() and HandleAccept().
  NodeManager node_manager_;
}

// Starts the raylet: registers with the GCS, then begins accepting client
// connections.
void Raylet::Start() {
  // RegisterGcs() returns a ray::Status, which RAY_CHECK_OK checks.
  RAY_CHECK_OK(RegisterGcs());

  // Start listening for clients.
  DoAccept();
}

// Registers this raylet with the GCS.
//
// First lets the node manager perform its own GCS registration, then registers
// this node's info (self_node_info_) through the GCS client. register_callback
// is handed to RegisterSelf; its definition is not shown in this excerpt.
//
// Returns ray::Status::OK() once both calls have been issued.
ray::Status Raylet::RegisterGcs() {
  node_manager_.RegisterGcs();

  gcs_client_->Nodes().RegisterSelf(self_node_info_, register_callback);

  // The function is declared to return ray::Status and Start() checks the
  // result with RAY_CHECK_OK, so a value must be returned; flowing off the end
  // of a non-void function is undefined behavior.
  return ray::Status::OK();
}

// Issues one asynchronous accept on acceptor_. The accepted connection is
// placed in socket_, and Raylet::HandleAccept is bound as the completion
// handler, receiving the resulting error code.
void Raylet::DoAccept() {
  acceptor_.async_accept(
      socket_,
      boost::bind(&Raylet::HandleAccept, this, boost::asio::placeholders::error));
}

// Completion handler for the async_accept started in DoAccept().
// NOTE(review): `error` is not inspected and new_connection is not used
// further in this excerpt — confirm against the full source.
void Raylet::HandleAccept(const boost::system::error_code &error) {
  // Create the local connection and dispatch it to the node manager for handling.
  auto new_connection = ClientConnection::Create(
      client_handler, // node_manager_.ProcessNewClient(client);
      message_handler, // node_manager_.ProcessClientMessage(client, message_type, message.data());
      std::move(socket_),
      "worker",
      node_manager_message_enum,
      static_cast<int64_t>(protocol::MessageType::DisconnectClient),
      message_data);
  // Handle the connection, then call DoAccept() again to wait for the next client.
  DoAccept();
}

Node Manager

NodeManager 继承自 rpc::NodeManagerServiceHandler，本身就是一个 ServiceHandler，所以在初始化 node_manager_service_ 时，直接将自身（*this）作为 handler 传入。

// src/ray/raylet/node_manager.h

// Node manager of the raylet (member excerpt). It derives from
// rpc::NodeManagerServiceHandler, which is why it can hand itself to
// node_manager_service_ as the request handler.
class NodeManager : public rpc::NodeManagerServiceHandler {
  // GCS client.
  std::shared_ptr<gcs::GcsClient> gcs_client_;
  std::unique_ptr<HeartbeatSender> heartbeat_sender_;
  WorkerPool worker_pool_;
  ObjectManager object_manager_;
  // gRPC server on which both services below are registered (see the constructor).
  rpc::GrpcServer node_manager_server_;
  // gRPC service for the node manager; constructed with *this as its handler.
  rpc::NodeManagerGrpcService node_manager_service_;

  // Handler and gRPC service for the agent manager.
  std::unique_ptr<rpc::AgentManagerServiceHandler> agent_manager_service_handler_;
  rpc::AgentManagerGrpcService agent_manager_service_;

  std::shared_ptr<ClusterResourceScheduler> cluster_resource_scheduler_;
  std::shared_ptr<LocalTaskManager> local_task_manager_;

  std::shared_ptr<PlacementGroupResourceManager> placement_group_resource_manager_;
}

// src/ray/raylet/node_manager.cc

// Push
// Pull

// Constructor excerpt: parameters, the member-initializer list and most of the
// setup are abbreviated.
NodeManager::NodeManager(...) {
  // A large amount of initialization and configuration (elided).
  // The gRPC service receives *this as its handler.
  node_manager_service_(io_service, *this),
  // Then register the services and start the server.
  node_manager_server_.RegisterService(node_manager_service_);
  node_manager_server_.RegisterService(agent_manager_service_);
  node_manager_server_.Run();
}

NodeManager 的 rpc 接口

// src/ray/rpc/node_manager/node_manager_server.h

// Interface of `NodeManagerService`, corresponding to `src/ray/protobuf/node_manager.proto`.
class NodeManagerServiceHandler {}
// It currently has the following interfaces:
// UpdateResourceUsage
// RequestResourceReport
// RequestWorkerLease
// ReportWorkerBacklog
// ReturnWorker
// ReleaseUnusedWorkers
// CancelWorkerLease
// PinObjectIDs
// GetNodeStats
// GlobalGC
// FormatGlobalMemoryInfo
// PrepareBundleResources
// CommitBundleResources
// CancelResourceReserve
// RequestObjectSpillage
// ReleaseUnusedBundles
// GetSystemConfig
// GetGcsServerAddress
// ShutdownRaylet

// gRPC service wrapper for the node manager (constructor signature only).
// It is built from an io context and a reference to the
// NodeManagerServiceHandler; NodeManager passes *this as that handler.
class NodeManagerGrpcService : public GrpcService {
  NodeManagerGrpcService(instrumented_io_context &io_service,
                         NodeManagerServiceHandler &service_handler)
}

关于怎么增加新的接口可以参考: src/ray/core_worker/core_worker.h.

ObjectManager

// src/ray/object_manager/object_manager.h

// Object manager (member excerpt). It implements both ObjectManagerInterface
// and rpc::ObjectManagerServiceHandler, so it can serve as the handler of its
// own gRPC service.
class ObjectManager : public ObjectManagerInterface, public rpc::ObjectManagerServiceHandler {
  // IO context owned by the object manager; run by the threads started in StartRpcService().
  instrumented_io_context rpc_service_;
  // Work guard constructed on rpc_service_ (see the constructor).
  boost::asio::io_service::work rpc_work_;

  // gRPC server and the service registered on it.
  rpc::GrpcServer object_manager_server_;
  rpc::ObjectManagerGrpcService object_manager_service_;
}

// 主要接口
// Push
// Pull

// src/ray/object_manager/object_manager.cc

// Constructor excerpt: parameters and the member-initializer list are abbreviated.
ObjectManager::ObjectManager(...){
  // Work guard bound to the object manager's own io context.
  rpc_work_(rpc_service_),
  object_manager_server_("ObjectManager",...)
  // The gRPC service runs on rpc_service_ and uses *this as its handler.
  object_manager_service_(rpc_service_, *this),
  // Spawn the RPC threads and start the gRPC server.
  StartRpcService();
}

// Starts the threads that run rpc_service_, then registers the object manager
// service on the gRPC server and runs the server.
void ObjectManager::StartRpcService() {
  // for i in config_.rpc_service_threads_number
  // Each thread executes RunRpcService(i).
  rpc_threads_[i] = std::thread(&ObjectManager::RunRpcService, this, i);
  object_manager_server_.RegisterService(object_manager_service_);
  object_manager_server_.Run();
}

// Body of each RPC thread started by StartRpcService(): runs the rpc_service_
// event loop. `index` is not used in this excerpt.
void ObjectManager::RunRpcService(int index) {
  rpc_service_.run();
}
// src/ray/rpc/object_manager/object_manager_server.h


// gRPC service wrapper for the object manager (constructor excerpt).
// The io context is forwarded to the GrpcService base and the handler
// reference is stored in service_handler_, whose declaration is not shown in
// this excerpt.
class ObjectManagerGrpcService : public GrpcService {
  ObjectManagerGrpcService(instrumented_io_context &io_service,
                           ObjectManagerServiceHandler &service_handler)
      : GrpcService(io_service), service_handler_(service_handler) {}
};

Worker Pool

// src/ray/raylet/worker_pool.cc