API
import ray

# Start Ray for this process with default settings.
ray.init()
# Alternative: connect to an already-running cluster through Ray Client.
# ray.init(address='ray://localhost:10001')


@ray.remote
def f(x):
    """Return the square of ``x``; declared as a Ray remote function."""
    return x * x


# Submit four invocations of f; each .remote() call yields a future
# rather than the result itself.
futures = [f.remote(i) for i in range(4)]
# ray.get resolves the futures to their values: [0, 1, 4, 9].
print(ray.get(futures))
# python/ray/__init__.py
from ray.worker import ( # noqa: E402,F401
get,
init,
put,
remote,
wait,
)
API `ray.init`, `ray.remote`, `ray.get` 都来自 `ray.worker`.
# python/ray/worker.py
def init(address: Optional[str] = None, ...):
    # Abridged excerpt of the function behind ray.init(). "..." marks elided
    # parameters/arguments, and the "# if" / "# else" comments stand in for
    # branch conditions that were cut from the excerpt.
    #
    # Three paths are shown:
    #   1. An address is given: build a client with ray.client(), pass the
    #      caller's kwargs along, and return the result of connect().
    #   2. bootstrap_address is None: create a new head Node.
    #   3. Otherwise: create a non-head Node with connect_only=True.
    # Paths 2 and 3 then call connect(...) and return a RayContext.

    # if address
    builder = ray.client(address, _deprecation_warn_enabled=False)
    builder._init_args(**passed_kwargs)
    return builder.connect()
    # if bootstrap_address is None:
    # No bootstrap address: this process creates the head node itself.
    _global_node = ray.node.Node(head=True, shutdown_at_exit=False, spawn_reaper=True, ray_params=ray_params)
    # else
    # A bootstrap address exists: build a non-head, connect-only Node.
    _global_node = ray.node.Node(ray_params, head=False, shutdown_at_exit=False, spawn_reaper=False, connect_only=True)
    # Wire the global worker to the node (arguments elided in this excerpt).
    connect(...)
    return RayContext(...)
def connect(node, worker=global_worker, ...):
    # Abridged excerpt: attach `worker` (the global worker by default) to
    # `node` and give it a core worker instance.
    worker.node = node
    # Presumably the Python-side binding of the C++ CoreWorker class under
    # src/ray/core_worker -- TODO confirm; constructor arguments are elided.
    worker.core_worker = ray._raylet.CoreWorker(...)
CoreWorker
// src/ray/core_worker/core_worker.h
/// Abridged excerpt of CoreWorker. The class derives from
/// rpc::CoreWorkerServiceHandler, so it can itself handle the RPCs of the
/// CoreWorker gRPC service.
class CoreWorker : public rpc::CoreWorkerServiceHandler {
  /// I/O context handed to both io_work_ and grpc_service_ in the constructor.
  instrumented_io_context io_service_;
  /// Boost.Asio work object bound to io_service_; while it is alive,
  /// io_service::run() does not return just because no handlers are pending.
  boost::asio::io_service::work io_work_;
  /// gRPC service object, constructed with io_service_ and *this.
  rpc::CoreWorkerGrpcService grpc_service_;
  /// gRPC server on which grpc_service_ is registered and then run.
  std::unique_ptr<rpc::GrpcServer> core_worker_server_;
  // std::unique_ptr<ObjectRecoveryManager> object_recovery_manager_;
  /// Shared task manager (not used in this excerpt).
  std::shared_ptr<TaskManager> task_manager_;
  /// Owned actor manager (not used in this excerpt).
  std::unique_ptr<ActorManager> actor_manager_;
  /// Implements gRPC server handler.
  /// "HandleXxxx" is a placeholder name for the family of handler overrides;
  /// the signature shown takes a PushTask request/reply pair and a callback
  /// used to send the reply.
  void HandleXxxx(const rpc::PushTaskRequest &request,
                  rpc::PushTaskReply *reply,
                  rpc::SendReplyCallback send_reply_callback) override;
}
// src/ray/core_worker/core_worker.cc
// Abridged constructor excerpt. The first two entries end in commas and look
// like member-initializer-list items that were folded into the body for
// brevity -- verify against the full source.
CoreWorker::CoreWorker(...) {
  // Bind the work object to io_service_.
  io_work_(io_service_),
  // Build the gRPC service on io_service_, passing this CoreWorker
  // (presumably as the service handler, given the base class).
  grpc_service_(io_service_, *this),
  // Register the service with the gRPC server, then start the server.
  core_worker_server_->RegisterService(grpc_service_);
  core_worker_server_->Run();
}