Overview

BUILD

Ray 使用 bazel 进行编译,大量 proto 文件的编译也依赖于 bazel,具体定义在 src/ray/protobuf/BUILD 文件中,可参考相关文档。

启动部署

安装完成后,可以直接运行的相关命令如下

# python/setup.py

entry_points={
    "console_scripts": [
        "ray=ray.scripts.scripts:main",
        "rllib=ray.rllib.scripts:cli [rllib]",
        "tune=ray.tune.scripts:cli",
        "ray-operator=ray.ray_operator.operator:main",
        "serve=ray.serve.scripts:cli",
    ]
}

常驻模式启动 ray 集群

# 启动 head 节点
ray start --head
# 启动 worker 节点
ray start --address=RAY_HEAD_IP:6379

ray start --head --redis-password="" --port=6389

start 命令的解析如下

# python/ray/scripts/scripts.py

# args 解析用的是 click

@cli.command()
@click.option("--head",...)
def start(...,head,...):
    ray_params = ray._private.parameter.RayParams(...)
    if head:
        # 启动 head 节点
        ray_params.update_if_absent(...)
        node = ray.node.Node(ray_params, head=True,...)
    else:
        # 启动 worker 节点
        bootstrap_address = services.canonicalize_bootstrap_address(address)
        ray_params.gcs_address = bootstrap_address
        node = ray.node.Node(ray_params, head=False,...)

cli.add_command(start)
def main():
    return cli()

可以看出本质上都是初始化了节点 Node 对象,是否为 head 则通过参数指定。

Node

Node 节点的初始化

# python/ray/node.py

class Node:
    def __init__(self, ray_params, head=False,...):
        self.head = head

        if not head:
            # GCS GRPC client, 确保 gcs 已启动
            self.get_gcs_client()

        # 初始化持久化存储
        storage._init_storage(ray_params.storage, is_head=head) 

        if head:
            self.validate_external_storage()

        if ...:
            # 启动 reaper 进程,负责在主进程意外退出后回收进程
            self.start_reaper_process()

        if head:
            self.start_head_processes()
            # 尝试写入 gcs
            self.get_gcs_client().internal_kv_put(...)

        if not connect_only:
            self.start_ray_processes()
            ray._private.services.wait_for_node(...)

Node 的初始化主要包括 start_head_processes(仅 head 节点执行)和 start_ray_processes 两部分。

# python/ray/node.py

class Node:
    def start_head_processes(self):
        # 如果使用外部 redis,需要配置
        # 这里的逻辑目前有些令人困惑,external 和 local 的区分不够明确
        if self._ray_params.external_addresses is not None:
            self.start_or_configure_redis()
            self.create_redis_client()

        # 启动 gcs,包含 redis 服务, 默认端口 6379
        self.start_gcs_server()

        self.start_ray_client_server()

    def start_or_configure_redis(self):
        # 如果 external 有配置,并不真正启动
        ray._private.services.start_redis(...)

    def start_gcs_server(self):
        process_info = ray._private.services.start_gcs_server(self.redis_address,...)
        # 等待启动
        self.get_gcs_client()

    def start_ray_client_server(self):
        process_info = ray._private.services.start_ray_client_server(self.address, self._node_ip_address, ...)

    def start_ray_processes(self):
        # 启动节点上所有的进程
        self.destroy_external_storage()
        # 启动 raylet
        self.start_raylet(plasma_directory, object_store_memory)

    def start_raylet(self, ...):
        process_info = ray._private.services.start_raylet(
            self.redis_address,
            self.gcs_address,
            self._node_ip_address,
            ...)

  • Head 节点启动 gcs 服务和 ray_client 服务
  • Head 和 Worker 节点都启动 raylet 服务

这些服务的具体启动都被封装在 services 里。

Services

Services 提供多种服务启动的封装,包括 redis 服务启动。

在 1.11 之前的版本中,ray 通过启动 redis-server 二进制文件来启动 redis,新版本中已将其移除。

# python/ray/_private/services.py

def start_gcs_server(redis_address, ...):
    # 调用 gcs 二进制启动服务,包含 redis 服务
    # GCS_SERVER_EXECUTABLE "core/src/ray/gcs/gcs_server"
    command = [GCS_SERVER_EXECUTABLE, "--redis_xxxx=", ...]
    process_info = start_ray_process(command, ...)

def start_ray_client_server(address, ray_client_server_ip,...):
    command = [
        sys.executable,    # python
        setup_worker_path, # ray/workers/setup_worker.py
        "-m",
        "ray.util.client.server",
        ...]
    process_info = start_ray_process(command, ...)

def start_raylet(redis_address, gcs_address, ...):
    # 启动 raylet,包括 local scheduler 和 object manager
    # 支持 python、java 和 cpp
    # RAYLET_EXECUTABLE "core/src/ray/raylet/raylet"
    command = [
        RAYLET_EXECUTABLE,
        f"--python_worker_command={subprocess.list2cmdline(start_worker_command)}",  # noqa
        f"--java_worker_command={subprocess.list2cmdline(java_worker_command)}",  # noqa
        f"--cpp_worker_command={subprocess.list2cmdline(cpp_worker_command)}",  # noqa
        ...]
    command.append("--agent_command={}".format(subprocess.list2cmdline(agent_command)))
    process_info = start_ray_process(command, ...)

def start_ray_process(command, ...):
    process = ConsolePopen(
        command,
        env=modified_env,
        cwd=cwd,
        stdout=stdout_file,
        stderr=stderr_file,
        stdin=subprocess.PIPE if pipe_stdin else None,
        preexec_fn=preexec_fn if sys.platform != "win32" else None,
        creationflags=CREATE_SUSPENDED if win32_fate_sharing else 0,
    )

class ConsolePopen(subprocess.Popen):
    pass

setup worker & runtime env context

setup worker 的作用是反序列化 runtime env context,并根据 --language 参数指定的语言执行对应的 worker 程序。

# ray/workers/setup_worker.py

parser.add_argument("--serialized-runtime-env-context",...)
parser.add_argument("--language", ...)

if __name__ == "__main__":
    args, remaining_args = parser.parse_known_args()
    runtime_env_context = RuntimeEnvContext.deserialize(
        args.serialized_runtime_env_context or "{}"
    )
    runtime_env_context.exec_worker(remaining_args, Language.Value(args.language))
# python/ray/_private/runtime_env/context.py

class RuntimeEnvContext:
    def exec_worker(self, passthrough_args: List[str], language: Language):
        os.environ.update(self.env_vars)

        # exec [python] passthrough_args
        command_str = " && ".join(...)

        if sys.platform == "win32":
            os.system(command_str)
        else:
            os.execvp("bash", args=["bash", "-c", command_str])

client server

python -m ray.util.client.server --address=x.x.x.x:6379 --host=0.0.0.0 --port=10001 --mode=proxy --redis-password=
# python/ray/util/client/server/__main__.py
# -> server.py main
# python/ray/util/client/server/server.py

def main():
    server = serve(hostport, ray_connect_handler)
    while True:
        ray.experimental.internal_kv._internal_kv_put(..., HEALTHCHECK)

def serve(connection_str, ray_connect_handler=None):
    server = grpc.server(
        futures.ThreadPoolExecutor(
            max_workers=CLIENT_SERVER_MAX_THREADS,
            thread_name_prefix="ray_client_server",
        ),
    )
    # mode proxy
    task_servicer = RayletServicerProxy(None, proxy_manager)
    data_servicer = DataServicerProxy(proxy_manager)
    logs_servicer = LogstreamServicerProxy(proxy_manager)
    # else
    task_servicer = RayletServicer(ray_connect_handler)
    data_servicer = DataServicer(task_servicer)
    logs_servicer = LogstreamServicer()
    ray_client_pb2_grpc.add_RayletDriverServicer_to_server(task_servicer, server)
    ray_client_pb2_grpc.add_RayletDataStreamerServicer_to_server(data_servicer, server)
    ray_client_pb2_grpc.add_RayletLogStreamerServicer_to_server(logs_servicer, server)
    server.start()
# python/ray/util/client/server/server.py

# class RayletServicerProxy(ray_client_pb2_grpc.RayletDriverServicer):
class RayletServicer(ray_client_pb2_grpc.RayletDriverServicer):
    def KVPut(self, request, context=None) -> ray_client_pb2.KVPutResponse:
    def KVGet(self, request, context=None) -> ray_client_pb2.KVGetResponse:
    def KVDel(self, request, context=None) -> ray_client_pb2.KVDelResponse:
    def KVList(self, request, context=None) -> ray_client_pb2.KVListResponse:
    def ListNamedActors(...):
    def ClusterInfo(self, request, context=None) -> ray_client_pb2.ClusterInfoResponse:
    def release(self, client_id: str, id: bytes) -> bool:
    def release_all(self, client_id):
    def Terminate(self, req, context=None):
    def GetObject(self, request: ray_client_pb2.GetRequest, context):
    def PutObject( self, request: ray_client_pb2.PutRequest, context=None) -> ray_client_pb2.PutResponse:
    def WaitObject(self, request, context=None) -> ray_client_pb2.WaitResponse:
    def Schedule( self, task: ray_client_pb2.ClientTask, context=None) -> ray_client_pb2.ClientTaskTicket:
    def lookup_or_register_func( self, id: bytes, client_id: str, options: Optional[Dict]) -> ray.remote_function.RemoteFunction:
    def lookup_or_register_actor( self, id: bytes, client_id: str, options: Optional[Dict]):
    def unify_and_track_outputs(self, output, client_id):

总结

执行 ray start [--head] 后,将启动以下进程:

# 以下进程只在 head 节点运行
# GCS_SERVER_EXECUTABLE 
/usr/local/lib/python3.7/dist-packages/ray/core/src/ray/gcs/gcs_server 
/usr/bin/python3.7 -m ray.util.client.server --host=0.0.0.0 --port=10001 --mode=proxy

# worker 进程
# RAYLET_EXECUTABLE
/usr/local/lib/python3.7/dist-packages/ray/core/src/ray/raylet/raylet
/usr/bin/python3.7 -u /usr/local/lib/python3.7/dist-packages/ray/dashboard/agent.py

/usr/bin/python3.7 -u /usr/local/lib/python3.7/dist-packages/ray/autoscaler/_private/monitor.py
/usr/bin/python3.7 -u /usr/local/lib/python3.7/dist-packages/ray/dashboard/dashboard.py
/usr/bin/python3.7 -u /usr/local/lib/python3.7/dist-packages/ray/_private/log_monitor.py
# 新版本中的以下进程已被移除
/usr/local/lib/python3.7/dist-packages/ray/core/src/ray/thirdparty/redis/src/redis-server *:6379
/usr/local/lib/python3.7/dist-packages/ray/core/src/ray/thirdparty/redis/src/redis-server *:64712
# java worker command
python ray/workers/setup_worker.py java -Dx=x -cp xx RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER io.ray.runtime.runner.worker.DefaultWorker

#  cpp worker command
cpp/default_worker