from kfp import components, dsl
def ray_fn(openshift_server: str, openshift_token: str) -> int:
    """Start a Ray cluster, run one remote task on it, and tear it down.

    The imports live inside the function body because this function is
    handed to ``components.create_component_from_func`` and therefore has to
    be self-contained.

    Args:
        openshift_server: Passed as ``server`` to ``TokenAuthentication``.
        openshift_token: Passed as ``token`` to ``TokenAuthentication``.

    Returns:
        The value produced by the remote ``train_fn`` task (100).

    Raises:
        RuntimeError: If the cluster reports an empty URI after start-up.
    """
    import ray
    from codeflare_sdk.cluster.auth import TokenAuthentication
    from codeflare_sdk.cluster.cluster import Cluster, ClusterConfiguration

    # NOTE(review): skip_tls=True presumably disables TLS verification;
    # confirm that this is acceptable for the target cluster.
    auth = TokenAuthentication(
        token=openshift_token, server=openshift_server, skip_tls=True
    )
    auth.login()
    try:
        cluster = Cluster(
            ClusterConfiguration(
                name="raytest",
                # namespace must exist
                namespace="pipeline-example",
                num_workers=1,
                head_cpus="500m",
                min_memory=1,
                max_memory=1,
                num_gpus=0,
                image="quay.io/project-codeflare/ray:latest-py39-cu118",
                instascale=False,
            )
        )
        print(cluster.status())
        cluster.up()
        try:
            cluster.wait_ready()
            print(cluster.status())
            print(cluster.details())

            ray_dashboard_uri = cluster.cluster_dashboard_uri()
            ray_cluster_uri = cluster.cluster_uri()
            print(ray_dashboard_uri, ray_cluster_uri)

            # Before proceeding, ensure that the cluster exists and that its
            # URI contains a value. An explicit raise is used rather than
            # ``assert`` so the check still runs under ``python -O``.
            if not ray_cluster_uri:
                raise RuntimeError(
                    "Ray cluster must be started and set before proceeding"
                )

            ray.init(address=ray_cluster_uri)
            print("Ray cluster is up and running: ", ray.is_initialized())

            @ray.remote
            def train_fn():
                # complex training function
                return 100

            result = ray.get(train_fn.remote())
            assert 100 == result
        finally:
            # Always release the cluster, even when start-up or the remote
            # task fails, so a failed run does not leave it running.
            if ray.is_initialized():
                ray.shutdown()
            cluster.down()
    finally:
        auth.logout()
    return result
@dsl.pipeline(
    name="Ray Simple Example",
    description="Ray Simple Example",
)
def ray_integration(openshift_server, openshift_token):
    """Pipeline with a single step that runs ``ray_fn`` as a component.

    Args:
        openshift_server: Pipeline parameter forwarded to ``ray_fn``.
        openshift_token: Pipeline parameter forwarded to ``ray_fn``.
    """
    # Wrap the Python function as a component; codeflare-sdk is installed
    # into the base image when the step runs.
    ray_op = components.create_component_from_func(
        ray_fn,
        base_image='registry.redhat.io/ubi8/python-39:latest',
        packages_to_install=["codeflare-sdk"],
    )
    ray_op(openshift_server, openshift_token)
if __name__ == '__main__':
    # Compile the pipeline to a Tekton YAML file only when this file is run
    # as a script; importing the module must not trigger compilation.
    from kfp_tekton.compiler import TektonCompiler

    TektonCompiler().compile(ray_integration, 'compiled-example.yaml')