title: "Run Jobs Using Python"
linkTitle: "Python"
date: 2023-07-05
weight: 7
description: Run Jobs in a Kueue-enabled environment
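This guide is for batch users who have a basic understanding of interacting with Kubernetes from Python. For more information, see the Kueue overview.
Before you begin
See Administer cluster quotas for details on the initial cluster setup. You also need the kubernetes Python package installed. We recommend using a virtual environment.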
python -m venv env
source env/bin/activate
pip install kubernetes requests
The following versions were used to develop these examples:
- Python: 3.9.12
- kubernetes: 26.1.0
- requests: 2.31.0
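You can either follow the Kueue installation instructions, or use the install example below.
Kueue in Python
At its core, Kueue is a controller that manages custom resources. To interact with it from Python we therefore do not need a dedicated SDK; we can use the generic functions provided by the Kubernetes Python library. In this guide, we provide several examples of interacting with Kueue in this style. If you would like to request a new example or need help with a specific use case, please open an issue.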
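As a minimal sketch of that generic style: a Kueue object is addressed through CustomObjectsApi by API group, version, and plural alone. The group kueue.x-k8s.io, the version v1beta1, and the plural localqueues are the ones used in the examples later in this guide. The plural clusterqueues and the helper names kueue_coordinates and list_kueue_objects are assumptions made for illustration. The kubernetes import sits inside the function so that the coordinate helper can be checked without a cluster.

```python
# Hypothetical helpers: the names are illustrative, not part of Kueue
# or of the Kubernetes Python client.
KUEUE_GROUP = "kueue.x-k8s.io"
KUEUE_VERSION = "v1beta1"  # the version used in this guide; adjust to your install


def kueue_coordinates(plural, namespace=None):
    """Build the keyword arguments that identify a Kueue resource type."""
    coords = {"group": KUEUE_GROUP, "version": KUEUE_VERSION, "plural": plural}
    if namespace is not None:
        coords["namespace"] = namespace
    return coords


def list_kueue_objects(plural, namespace=None):
    """List Kueue objects with the generic custom objects API (needs a cluster)."""
    from kubernetes import client, config

    config.load_kube_config()
    api = client.CustomObjectsApi()
    coords = kueue_coordinates(plural, namespace)
    if namespace is None:
        # Cluster-scoped resources, e.g. ClusterQueue (plural assumed: "clusterqueues")
        return api.list_cluster_custom_object(**coords)
    # Namespaced resources, e.g. LocalQueue
    return api.list_namespaced_custom_object(**coords)


if __name__ == "__main__":
    print(kueue_coordinates("localqueues", namespace="default"))
```

With a running cluster, list_kueue_objects("localqueues", namespace="default") would return the same kind of dictionary that the queue listing example later in this guide iterates over.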
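Examples
The following examples demonstrate different use cases for interacting with Kueue from Python.
Install Kueue
This example demonstrates installing Kueue to an existing cluster.
You can save this script to your local machine, for example as install-kueue-queues.py.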
#!/usr/bin/env python3

import argparse
import tempfile

import requests
from kubernetes import utils, config, client

# install-kueue-queues.py will:
# 1. install Kueue from the latest or a specific version on GitHub
# This example downloads the Kueue manifests to a (local) YAML file and applies it

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Submit Kueue Job Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--version",
        help="Version of Kueue to install (if undefined, will install from main branch)",
        default=None,
    )
    return parser


def main():
    """
    Install Kueue and the Queue components.

    This will error if they are already installed.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()
    install_kueue(args.version)


def get_install_url(version):
    """
    Get the install version.

    If a version is specified, use it. Otherwise install
    from the main branch.
    """
    if version is not None:
        # Accept both "0.13.2" and "v0.13.2"; the URL adds the "v" itself
        version = version.lstrip("v")
        return f"https://github.com/kubernetes-sigs/kueue/releases/download/v{version}/manifests.yaml"
    # This is a kustomize-style target; a plain HTTP download of it may fail,
    # so prefer passing --version.
    return "https://github.com/kubernetes-sigs/kueue/config/default?ref=main"


def install_kueue(version):
    """
    Install Kueue of a particular version.
    """
    print("⭐️ Installing Kueue...")
    url = get_install_url(version)
    with tempfile.NamedTemporaryFile(delete=True) as install_yaml:
        res = requests.get(url)
        res.raise_for_status()
        install_yaml.write(res.content)
        # Flush so the full content is on disk before it is read back by name
        install_yaml.flush()
        utils.create_from_yaml(api_client, install_yaml.name)


if __name__ == "__main__":
    main()
Then run it as follows:
python install-kueue-queues.py
⭐️ Installing Kueue...
⭐️ Applying queues from single-clusterqueue-setup.yaml...
You can also target a specific version:
python install-kueue-queues.py --version v0.13.2
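Because the install script errors when the resources already exist, running it again against a cluster that already has Kueue raises an exception. The sketch below shows one way to tolerate that case. It assumes that utils.create_from_yaml raises kubernetes.utils.FailToCreateError and that the error's api_exceptions attribute holds the individual API errors, each with an HTTP status; the helper names only_conflicts and apply_tolerating_existing are hypothetical.

```python
def only_conflicts(api_exceptions):
    """Return True if every error is an HTTP 409 (the object already exists)."""
    errors = list(api_exceptions)
    return bool(errors) and all(getattr(e, "status", None) == 409 for e in errors)


def apply_tolerating_existing(api_client, yaml_path):
    """Apply a manifest file, ignoring 'already exists' conflicts (needs a cluster)."""
    from kubernetes import utils

    try:
        utils.create_from_yaml(api_client, yaml_path)
    except utils.FailToCreateError as err:
        # Re-raise anything that is not purely "already exists"
        if not only_conflicts(err.api_exceptions):
            raise
        print("Kueue objects already exist, nothing to do.")
```

Note that ignoring conflicts leaves the existing objects untouched, so this is not a way to upgrade an installed version.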
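Sample Job
For the next example, start with a cluster that has Kueue installed and its queues created (the script submits to the local queue user-queue). Save the following script as sample-job.py: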
#!/usr/bin/env python3

import argparse
from kubernetes import config, client

# sample-job.py
# This example will demonstrate full steps to submit a Job.

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Submit Kueue Job Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--job-name",
        help="generateName field to set for job",
        default="sample-job-",
    )
    parser.add_argument(
        "--image",
        help="container image to use",
        default="registry.k8s.io/e2e-test-images/agnhost:2.53",
    )
    parser.add_argument(
        "--args",
        nargs="+",
        help="args for container",
        default=["entrypoint-tester", "hello", "world"],
    )
    return parser


def generate_job_crd(job_name, image, args):
    """
    Generate an equivalent job CRD to sample-job.yaml
    """
    metadata = client.V1ObjectMeta(
        generate_name=job_name, labels={"kueue.x-k8s.io/queue-name": "user-queue"}
    )

    # Job container
    container = client.V1Container(
        image=image,
        name="dummy-job",
        args=args,
        resources={
            "requests": {
                "cpu": 1,
                "memory": "200Mi",
            }
        },
    )

    # Job template
    template = {"spec": {"containers": [container], "restartPolicy": "Never"}}
    return client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=metadata,
        spec=client.V1JobSpec(
            parallelism=1, completions=3, suspend=True, template=template
        ),
    )


def main():
    """
    Run a job.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    # Generate a CRD spec
    crd = generate_job_crd(args.job_name, args.image, args.args)
    batch_api = client.BatchV1Api()
    print(f"📦️ Container image selected is {args.image}...")
    print(f"⭐️ Creating sample job with prefix {args.job_name}...")
    batch_api.create_namespaced_job("default", crd)
    print(
        'Use:\n"kubectl get queue" to see queue assignment\n"kubectl get jobs" to see jobs'
    )


if __name__ == "__main__":
    main()
Then run it as follows:
python sample-job.py
📦️ Container image selected is registry.k8s.io/e2e-test-images/agnhost:2.53...
⭐️ Creating sample job with prefix sample-job-...
Use:
"kubectl get queue" to see queue assignment
"kubectl get jobs" to see jobs
Or try changing the name (generateName) of the job:
python sample-job.py --job-name sleep-job-
📦️ Container image selected is registry.k8s.io/e2e-test-images/agnhost:2.53...
⭐️ Creating sample job with prefix sleep-job-...
Use:
"kubectl get queue" to see queue assignment
"kubectl get jobs" to see jobs
You can also change the container image with --image and the arguments with --args.
For further customization, you can edit the example script.
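One way to make such customization easier is to build the Job as a plain dictionary, which the Kubernetes Python client also accepts as a request body. The sketch below mirrors the fields set in sample-job.py. The function names build_job_manifest and submit, and the queue_name, cpu, memory, and completions parameters, are hypothetical additions for illustration.

```python
def build_job_manifest(
    prefix="sample-job-",
    image="registry.k8s.io/e2e-test-images/agnhost:2.53",
    args=("entrypoint-tester", "hello", "world"),
    queue_name="user-queue",
    cpu="1",
    memory="200Mi",
    completions=3,
):
    """Return a batch/v1 Job manifest labeled for a Kueue local queue."""
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {
            "generateName": prefix,
            "labels": {"kueue.x-k8s.io/queue-name": queue_name},
        },
        "spec": {
            "parallelism": 1,
            "completions": completions,
            "suspend": True,  # created suspended, as in sample-job.py
            "template": {
                "spec": {
                    "restartPolicy": "Never",
                    "containers": [
                        {
                            "name": "dummy-job",
                            "image": image,
                            "args": list(args),
                            "resources": {"requests": {"cpu": cpu, "memory": memory}},
                        }
                    ],
                }
            },
        },
    }


def submit(manifest, namespace="default"):
    """Create the Job in the cluster (needs a running cluster with Kueue)."""
    from kubernetes import client, config

    config.load_kube_config()
    return client.BatchV1Api().create_namespaced_job(namespace, manifest)
```

For example, submit(build_job_manifest(queue_name="team-a", cpu="2")) would send the same Job to a different local queue with a larger CPU request, assuming a local queue named team-a exists.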
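Interact with Queues and Jobs
If you are developing an application that submits jobs and needs to interact with them, you may want to work with queues or jobs directly.
After running the example above, you can test the following example to interact with the results.
Write the following to a script called sample-queue-control.py.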
#!/usr/bin/env python3

import argparse
from kubernetes import config, client

# sample-queue-control.py
# This will show how to interact with queues

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Interact with Queues",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--namespace",
        help="namespace to list for",
        default="default",
    )
    return parser


def main():
    """
    Get a listing of jobs in the queue
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    listing = crd_api.list_namespaced_custom_object(
        group="kueue.x-k8s.io",
        version="v1beta1",
        namespace=args.namespace,
        plural="localqueues",
    )
    list_queues(listing)

    listing = crd_api.list_namespaced_custom_object(
        group="batch",
        version="v1",
        namespace=args.namespace,
        plural="jobs",
    )
    list_jobs(listing)


def list_jobs(listing):
    """
    Iterate and show job metadata.
    """
    # The listing is a dict; an empty result has an empty "items" list
    if not listing.get("items"):
        print("💼️ There are no jobs.")
        return

    print("\n💼️ Jobs")
    for job in listing["items"]:
        jobname = job["metadata"]["name"]
        status = (
            "TBA" if "succeeded" not in job["status"] else job["status"]["succeeded"]
        )
        # "ready" may be absent from the status, so do not index it directly
        ready = job["status"].get("ready", "TBA")
        print(f"Found job {jobname}")
        print(f" Succeeded: {status}")
        print(f" Ready: {ready}")


def list_queues(listing):
    """
    Helper function to iterate over and list queues.
    """
    if not listing.get("items"):
        print("⛑️ There are no queues.")
        return

    print("\n⛑️ Local Queues")

    # This is listing queues
    for q in listing["items"]:
        print(f'Found queue {q["metadata"]["name"]}')
        print(f" Admitted workloads: {q['status']['admittedWorkloads']}")
        print(f" Pending workloads: {q['status']['pendingWorkloads']}")

        # And flavors with resources
        for f in q["status"].get("flavorUsage", []):
            print(f' Flavor {f["name"]} has resources {f["resources"]}')


if __name__ == "__main__":
    main()
To make the output more interesting, we can run a few random jobs first:
python sample-job.py
python sample-job.py
python sample-job.py --job-name tacos
Then run the script to see the queues and the jobs you submitted previously.
python sample-queue-control.py
⛑️ Local Queues
Found queue user-queue
Admitted workloads: 3
Pending workloads: 0
Flavor default-flavor has resources [{'name': 'cpu', 'total': '3'}, {'name': 'memory', 'total': '600Mi'}]
💼️ Jobs
Found job sample-job-8n5sb
Succeeded: 3
Ready: 0
Found job sample-job-gnxtl
Succeeded: 1
Ready: 0
Found job tacos46bqw
Succeeded: 1
Ready: 1
If you want to filter jobs by their queue label, you can do so via job["metadata"]["labels"]["kueue.x-k8s.io/queue-name"].
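As a sketch of that filter, the first helper below works on a listing dictionary like the one sample-queue-control.py receives, and the second asks the API server to do the filtering through the label_selector parameter of list_namespaced_job. The function names jobs_in_queue and list_jobs_in_queue are hypothetical.

```python
QUEUE_LABEL = "kueue.x-k8s.io/queue-name"


def jobs_in_queue(listing, queue_name):
    """Return the names of jobs in a listing whose queue label matches queue_name."""
    names = []
    for job in listing.get("items", []):
        # Jobs that were not submitted through Kueue may have no labels at all
        labels = job["metadata"].get("labels") or {}
        if labels.get(QUEUE_LABEL) == queue_name:
            names.append(job["metadata"]["name"])
    return names


def list_jobs_in_queue(queue_name, namespace="default"):
    """Let the API server filter by label (needs a running cluster)."""
    from kubernetes import client, config

    config.load_kube_config()
    jobs = client.BatchV1Api().list_namespaced_job(
        namespace, label_selector=f"{QUEUE_LABEL}={queue_name}"
    )
    return [job.metadata.name for job in jobs.items]
```

Filtering on the server side avoids transferring jobs that belong to other queues, which matters mostly in namespaces with many jobs.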
To look up a specific job by name, you can do:
from kubernetes import client, config
# Interact with batch
config.load_kube_config()
batch_api = client.BatchV1Api()
# This is providing the name, and namespace
job = batch_api.read_namespaced_job("tacos46bqw", "default")
print(job)
See the BatchV1 API documentation for more calls.
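Building on read_namespaced_job, an application might poll until a job has finished. The sketch below is one simple way to do that; the helper names is_complete and wait_for_job and the timeout and interval parameters are hypothetical, and treating a missing spec.completions as 1 is a simplifying assumption.

```python
import time


def is_complete(succeeded, completions):
    """True once the succeeded pod count reaches the requested completions.

    Both values can be None on a fresh Job; a missing completions value
    is treated as 1 here, which is a simplification.
    """
    return (succeeded or 0) >= (completions or 1)


def wait_for_job(name, namespace="default", timeout=300, interval=5):
    """Poll a Job until it completes or the timeout expires (needs a cluster)."""
    from kubernetes import client, config

    config.load_kube_config()
    batch_api = client.BatchV1Api()
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        job = batch_api.read_namespaced_job(name, namespace)
        if is_complete(job.status.succeeded, job.spec.completions):
            return True
        time.sleep(interval)
    return False
```

A job that stays pending in the queue simply keeps the loop waiting until the timeout, so the return value alone does not distinguish a queued job from a slow one.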
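Flux Operator Job
For this example, we will use the Flux Operator to submit a job, and specifically use its Python SDK to do this easily. Given the Python environment we created in the setup, we can install the SDK directly into it as follows: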
pip install fluxoperator
We also need to install the Flux Operator.
kubectl apply -f https://raw.githubusercontent.com/flux-framework/flux-operator/main/examples/dist/flux-operator.yaml
Write the following into sample-flux-operator-job.py:
#!/usr/bin/env python3

import argparse
from kubernetes import config, client
import fluxoperator.models as models

# sample-flux-operator-job.py
# This example will demonstrate full steps to submit a Job via the Flux Operator.

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Submit Kueue Flux Operator Job Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--job-name",
        help="generateName field to set for job (job prefix does not work here)",
        default="hello-world",
    )
    parser.add_argument(
        "--image",
        help="container image to use",
        default="ghcr.io/flux-framework/flux-restful-api",
    )
    parser.add_argument(
        "--tasks",
        help="Number of tasks",
        default=1,
        type=int,
    )
    parser.add_argument(
        "--quiet",
        help="Do not show extra flux output (only hello worlds!)",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "--command",
        help="command to run",
        default="echo",
    )
    parser.add_argument(
        "--args", nargs="+", help="args for container", default=["hello", "world"]
    )
    return parser


def generate_minicluster_crd(job_name, image, command, args, quiet=False, tasks=1):
    """
    Generate a minicluster CRD
    """
    container = models.MiniClusterContainer(
        # Pass the selected image through; otherwise --image has no effect
        image=image,
        command=command + " " + " ".join(args),
        resources={
            "limits": {
                "cpu": 1,
                "memory": "2Gi",
            }
        },
    )

    # 4 pods and 4 tasks will echo hello-world x 4
    spec = models.MiniClusterSpec(
        job_labels={"kueue.x-k8s.io/queue-name": "user-queue"},
        containers=[container],
        size=4,
        tasks=tasks,
        logging={"quiet": quiet},
    )

    return models.MiniCluster(
        kind="MiniCluster",
        api_version="flux-framework.org/v1alpha1",
        metadata=client.V1ObjectMeta(
            generate_name=job_name,
            namespace="default",
        ),
        spec=spec,
    )


def main():
    """
    Run an example job using the Flux Operator.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    # Generate a CRD spec
    minicluster = generate_minicluster_crd(
        args.job_name, args.image, args.command, args.args, args.quiet, args.tasks
    )
    crd_api = client.CustomObjectsApi()

    print(f"📦️ Container image selected is {args.image}...")
    print(f"⭐️ Creating sample job with prefix {args.job_name}...")
    crd_api.create_namespaced_custom_object(
        group="flux-framework.org",
        version="v1alpha1",
        namespace="default",
        plural="miniclusters",
        body=minicluster,
    )
    print(
        'Use:\n"kubectl get queue" to see queue assignment\n"kubectl get pods" to see pods'
    )


if __name__ == "__main__":
    main()
Now try running the example:
python sample-flux-operator-job.py
📦️ Container image selected is ghcr.io/flux-framework/flux-restful-api...
⭐️ Creating sample job with prefix hello-world...
Use:
"kubectl get queue" to see queue assignment
"kubectl get pods" to see pods
You should see the MiniCluster job admitted by the local queue almost immediately:
kubectl get queue
NAME CLUSTERQUEUE PENDING WORKLOADS ADMITTED WORKLOADS
user-queue cluster-queue 0 1
And 4 pods running (we created a networked cluster with 4 nodes):
kubectl get pods
NAME READY STATUS RESTARTS AGE
hello-world7qgqd-0-wp596 1/1 Running 0 7s
hello-world7qgqd-1-d7r87 1/1 Running 0 7s
hello-world7qgqd-2-rfn4t 1/1 Running 0 7s
hello-world7qgqd-3-blvtn 1/1 Running 0 7s
If you look at the logs of the lead broker pod (index 0 of the job above), you will see a lot of debug output, and at the end you can see "hello world" being run:
kubectl logs hello-world7qgqd-0-wp596
Flux Operator Lead Broker output
🌀 Submit Mode: flux start -o --config /etc/flux/config -Scron.directory=/etc/flux/system/cron.d -Stbon.fanout=256 -Srundir=/run/flux -Sstatedir=/var/lib/flux -Slocal-uri=local:///run/flux/local -Slog-stderr-level=6 -Slog-stderr-mode=local flux submit -n 1 --quiet --watch echo hello world
broker.info[0]: start: none->join 0.399725ms
broker.info[0]: parent-none: join->init 0.030894ms
cron.info[0]: synchronizing cron tasks to event heartbeat.pulse
job-manager.info[0]: restart: 0 jobs
job-manager.info[0]: restart: 0 running jobs
job-manager.info[0]: restart: checkpoint.job-manager not found
broker.info[0]: rc1.0: running /etc/flux/rc1.d/01-sched-fluxion
sched-fluxion-resource.info[0]: version 0.27.0-15-gc90fbcc2
sched-fluxion-resource.warning[0]: create_reader: allowlist unsupported
sched-fluxion-resource.info[0]: populate_resource_db: loaded resources from core's resource.acquire
sched-fluxion-qmanager.info[0]: version 0.27.0-15-gc90fbcc2
broker.info[0]: rc1.0: running /etc/flux/rc1.d/02-cron
broker.info[0]: rc1.0: /etc/flux/rc1 Exited (rc=0) 0.5s
broker.info[0]: rc1-success: init->quorum 0.485239s
broker.info[0]: online: hello-world7qgqd-0 (ranks 0)
broker.info[0]: online: hello-world7qgqd-[0-3] (ranks 0-3)
broker.info[0]: quorum-full: quorum->run 0.354587s
hello world
broker.info[0]: rc2.0: flux submit -n 1 --quiet --watch echo hello world Exited (rc=0) 0.3s
broker.info[0]: rc2-success: run->cleanup 0.308392s
broker.info[0]: cleanup.0: flux queue stop --quiet --all --nocheckpoint Exited (rc=0) 0.1s
broker.info[0]: cleanup.1: flux cancel --user=all --quiet --states RUN Exited (rc=0) 0.1s
broker.info[0]: cleanup.2: flux queue idle --quiet Exited (rc=0) 0.1s
broker.info[0]: cleanup-success: cleanup->shutdown 0.252899s
broker.info[0]: children-complete: shutdown->finalize 47.6699ms
broker.info[0]: rc3.0: running /etc/flux/rc3.d/01-sched-fluxion
broker.info[0]: rc3.0: /etc/flux/rc3 Exited (rc=0) 0.2s
broker.info[0]: rc3-success: finalize->goodbye 0.212425s
broker.info[0]: goodbye: goodbye->exit 0.06917ms
If you submit and ask for four tasks, you will see "hello world" four times:
python sample-flux-operator-job.py --tasks 4
...
broker.info[0]: quorum-full: quorum->run 23.5812s
hello world
hello world
hello world
hello world
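You can further customize the job, and you can ask questions on the Flux Operator issue board. Finally, for instructions on how to do this with YAML outside of Python, see Run a Flux MiniCluster.
MPI Operator
For this example, we will use the MPI Operator to submit a job, and specifically use its Python SDK to do this easily. Given the Python environment we created in the setup, we can install the SDK directly into it as follows: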
git clone --depth 1 https://github.com/kubeflow/mpi-operator /tmp/mpijob
cd /tmp/mpijob/sdk/python/v2beta1
python setup.py install
cd -
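Importantly, the MPI Operator must be installed before Kueue for this to work! Let's start from scratch with a new Kind cluster. We also need to install the MPI Operator and Kueue. Here we install the exact versions that were tested with this example: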
kubectl apply -f https://github.com/kubeflow/mpi-operator/releases/download/v0.4.0/mpi-operator.yaml
kubectl apply -f https://github.com/kubernetes-sigs/kueue/releases/download/v0.4.0/manifests.yaml
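Check the mpi-operator release page and the Kueue release page for other versions. You need to wait for Kueue to be ready. You can check this with:
# Wait until you see all pods in kueue-system in the Running state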
kubectl get pods -n kueue-system
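The same check can be sketched in Python with the core API. The helper names all_running and kueue_is_ready are hypothetical, and the pod phase "Running" is used as a rough readiness signal, which is an assumption: a pod can be Running before its containers report ready.

```python
def all_running(phases):
    """True when there is at least one pod and every pod phase is 'Running'."""
    phases = list(phases)
    return bool(phases) and all(phase == "Running" for phase in phases)


def kueue_is_ready(namespace="kueue-system"):
    """Check once whether all pods in the Kueue namespace are Running (needs a cluster)."""
    from kubernetes import client, config

    config.load_kube_config()
    pods = client.CoreV1Api().list_namespaced_pod(namespace).items
    return all_running(pod.status.phase for pod in pods)
```

A caller could invoke kueue_is_ready() in a loop with a short sleep until it returns True before applying the queue setup.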
When Kueue is ready:
kubectl apply -f https://raw.githubusercontent.com/kubernetes-sigs/kueue/main/site/static/examples/admin/single-clusterqueue-setup.yaml
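Now try running the example MPIJob.
python sample-mpijob.py
📦️ Container image selected is mpioperator/mpi-pi:openmpi...
⭐️ Creating sample job with prefix pi...
Use:
"kubectl get queue" to see queue assignment
"kubectl get jobs" to see jobs
The script sample-mpijob.py is as follows: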
#!/usr/bin/env python3

import argparse
from kubernetes import config, client
import mpijob.models as models

# sample-mpijob.py
# This example will demonstrate full steps to submit a Job via the MPI Operator

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Submit Kueue MPI Operator Job Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--job-name",
        help="generateName field to set for job (job prefix does not work here)",
        default="pi",
    )
    parser.add_argument(
        "--image",
        help="container image to use",
        default="mpioperator/mpi-pi:openmpi",
    )
    parser.add_argument(
        "--command",
        help="command to run",
        default="mpirun",
    )
    parser.add_argument(
        "--args",
        nargs="+",
        help="args for container",
        default=["-n", "2", "/home/mpiuser/pi"],
    )
    return parser


def generate_job_crd(job_name, image, command, args):
    """
    Generate an MPIJob CRD
    """
    metadata = client.V1ObjectMeta(
        name=job_name, labels={"kueue.x-k8s.io/queue-name": "user-queue"}
    )

    # containers for launcher and worker
    launcher_container = client.V1Container(
        image=image,
        name="mpi-launcher",
        command=[command],
        args=args,
        security_context=client.V1SecurityContext(run_as_user=1000),
        resources={
            "limits": {
                "cpu": 1,
                "memory": "1Gi",
            }
        },
    )

    worker_container = client.V1Container(
        image=image,
        name="mpi-worker",
        command=["/usr/sbin/sshd"],
        args=["-De", "-f", "/home/mpiuser/.sshd_config"],
        security_context=client.V1SecurityContext(run_as_user=1000),
        resources={
            "limits": {
                "cpu": 1,
                "memory": "1Gi",
            }
        },
    )

    # Create the Launcher and worker replica specs
    launcher = models.V2beta1ReplicaSpec(
        replicas=1,
        template=client.V1PodTemplateSpec(
            spec=client.V1PodSpec(containers=[launcher_container])
        ),
    )

    worker = models.V2beta1ReplicaSpec(
        replicas=2,
        template=client.V1PodTemplateSpec(
            spec=client.V1PodSpec(containers=[worker_container])
        ),
    )

    # runPolicy for jobspec
    policy = models.V2beta1RunPolicy(
        clean_pod_policy="Running", ttl_seconds_after_finished=60
    )

    # Create the jobspec
    jobspec = models.V2beta1MPIJobSpec(
        slots_per_worker=1,
        run_policy=policy,
        ssh_auth_mount_path="/home/mpiuser/.ssh",
        mpi_replica_specs={"Launcher": launcher, "Worker": worker},
    )
    return models.V2beta1MPIJob(
        metadata=metadata,
        api_version="kubeflow.org/v2beta1",
        kind="MPIJob",
        spec=jobspec,
    )


def main():
    """
    Run an MPIJob. This requires the MPI Operator to be installed.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    # Generate a CRD spec
    crd = generate_job_crd(args.job_name, args.image, args.command, args.args)
    crd_api = client.CustomObjectsApi()

    print(f"📦️ Container image selected is {args.image}...")
    print(f"⭐️ Creating sample job with prefix {args.job_name}...")
    crd_api.create_namespaced_custom_object(
        group="kubeflow.org",
        version="v2beta1",
        namespace="default",
        plural="mpijobs",
        body=crd,
    )
    print(
        'Use:\n"kubectl get queue" to see queue assignment\n"kubectl get jobs" to see jobs'
    )


if __name__ == "__main__":
    main()
Once submitted, you can see that the queue has an admitted job!
$ kubectl get queue
NAME CLUSTERQUEUE PENDING WORKLOADS ADMITTED WORKLOADS
user-queue cluster-queue 0 1
And that the job "pi-launcher" has started:
$ kubectl get jobs
NAME COMPLETIONS DURATION AGE
pi-launcher 0/1 9s 9s
The MPI Operator works through a central launcher that interacts with the nodes over ssh. We can inspect a worker and the launcher to get a glimpse of how both work:
$ kubectl logs pods/pi-worker-1
Server listening on 0.0.0.0 port 22.
Server listening on :: port 22.
Accepted publickey for mpiuser from 10.244.0.8 port 51694 ssh2: ECDSA SHA256:rgZdwufXolOkUPA1w0bf780BNJC8e4/FivJb1/F7OOI
Received disconnect from 10.244.0.8 port 51694:11: disconnected by user
Disconnected from user mpiuser 10.244.0.8 port 51694
Received signal 15; terminating.
The job runs quickly, and we can see the output of the launcher:
$ kubectl logs pods/pi-launcher-f4gqv
Warning: Permanently added 'pi-worker-0.pi-worker.default.svc,10.244.0.7' (ECDSA) to the list of known hosts.
Warning: Permanently added 'pi-worker-1.pi-worker.default.svc,10.244.0.9' (ECDSA) to the list of known hosts.
Rank 1 on host pi-worker-1
Workers: 2
Rank 0 on host pi-worker-0
pi is approximately 3.1410376000000002
That looks like pi! 🎉️🥧️ If you are interested in running this example with YAML outside of Python, see Run an MPIJob.