Run Jobs Using Python
This guide is for batch users who have a basic understanding of interacting with Kubernetes from Python. For more information, see Kueue’s overview.
Before you begin
See Administer Cluster Quotas for details on the initial cluster setup. You’ll also need the Kubernetes Python client installed. We recommend using a virtual environment.
python -m venv env
source env/bin/activate
pip install kubernetes requests
Note that the following versions were used for developing these examples:
- Python: 3.9.12
- kubernetes: 26.1.0
- requests: 2.31.0
You can either follow the install instructions for Kueue, or use the install example, below.
Kueue in Python
At its core, Kueue is a controller for a Custom Resource, so interacting with it from Python does not require a custom SDK; the generic functions provided by the Kubernetes Python library are enough. This guide provides several examples of interacting with Kueue in this fashion. If you would like to request a new example or need help with a specific use case, please open an issue.
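As a rough illustration of what "generic functions" means here, the sketch below collects the group, version, and plural values used by the examples in this guide into one lookup, so they can be passed to CustomObjectsApi calls such as list_namespaced_custom_object. The RESOURCES table and the custom_object_kwargs helper are hypothetical names introduced for illustration, and the API versions are simply the ones this guide uses, which may differ in your cluster.

```python
# Hypothetical lookup of the (group, version, plural) triples used in this guide.
# The versions listed are assumptions taken from the examples, not a guarantee
# of what your cluster serves.
RESOURCES = {
    "localqueue": ("kueue.x-k8s.io", "v1beta1", "localqueues"),
    "job": ("batch", "v1", "jobs"),
    "minicluster": ("flux-framework.org", "v1alpha1", "miniclusters"),
    "mpijob": ("kubeflow.org", "v2beta1", "mpijobs"),
}


def custom_object_kwargs(kind, namespace="default"):
    """Return keyword arguments for namespaced CustomObjectsApi calls."""
    try:
        group, version, plural = RESOURCES[kind]
    except KeyError:
        raise ValueError(f"Unknown kind: {kind}") from None
    return {
        "group": group,
        "version": version,
        "namespace": namespace,
        "plural": plural,
    }
```

With a running cluster, client.CustomObjectsApi().list_namespaced_custom_object(**custom_object_kwargs("localqueue")) should produce the same kind of listing that the queue example in this guide works with.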
Examples
The following examples demonstrate different use cases for using Kueue in Python.
Install Kueue
This example demonstrates installing Kueue to an existing cluster. You can save this script to your local machine as install-kueue-queues.py.
#!/usr/bin/env python3

from kubernetes import utils, config, client
import tempfile
import requests
import argparse

# install-kueue-queues.py will:
# 1. install Kueue from the main branch or a specific release on GitHub
# The manifest is downloaded to a temporary local YAML file and applied from there.

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Install Kueue Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--version",
        help="Version of Kueue to install (if undefined, will install from main branch)",
        default=None,
    )
    return parser


def main():
    """
    Install Kueue and the Queue components.

    This will error if they are already installed.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()
    install_kueue(args.version)


def get_install_url(version):
    """
    Get the install version.

    If a version is specified, use it. Otherwise install
    from the main branch.
    """
    if version is not None:
        return f"https://github.com/kubernetes-sigs/kueue/releases/download/v{version}/manifests.yaml"
    # NOTE: this is a kustomize-style reference rather than a direct link to a
    # YAML manifest, so downloading it with requests may fail; passing
    # --version is the more reliable path.
    return "https://github.com/kubernetes-sigs/kueue/config/default?ref=main"


def install_kueue(version):
    """
    Install Kueue of a particular version.
    """
    print("⭐️ Installing Kueue...")
    url = get_install_url(version)
    with tempfile.NamedTemporaryFile(delete=True) as install_yaml:
        res = requests.get(url)
        assert res.status_code == 200
        install_yaml.write(res.content)
        # Flush so the full manifest is on disk before it is read back by name
        install_yaml.flush()
        utils.create_from_yaml(api_client, install_yaml.name)


if __name__ == "__main__":
    main()
And then run as follows:
python install-kueue-queues.py
⭐️ Installing Kueue...
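You can also target a specific version. Pass the number without the leading v, since the script adds that prefix when it builds the release URL:
python install-kueue-queues.py --version 0.10.1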
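Because the install script prefixes the version with v when it builds the release URL, a value that already starts with v would end up with a doubled prefix. The sketch below shows one way to tolerate both spellings. The release_manifest_url helper is a hypothetical name, not part of the script above; only the URL pattern is taken from it.

```python
# URL pattern copied from the install script in this guide.
RELEASE_URL = (
    "https://github.com/kubernetes-sigs/kueue/releases/download/"
    "v{version}/manifests.yaml"
)


def release_manifest_url(version):
    """Build the release manifest URL, accepting '0.10.1' or 'v0.10.1'."""
    cleaned = version.strip()
    # Drop a single leading "v" so the pattern's own prefix is not doubled.
    if cleaned.startswith("v"):
        cleaned = cleaned[1:]
    if not cleaned:
        raise ValueError("version must not be empty")
    return RELEASE_URL.format(version=cleaned)
```

A helper like this could stand in for the versioned branch of get_install_url if you prefer to accept either form on the command line.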
Sample Job
For the next example, start with a cluster that has Kueue installed and queues already created (the script assumes a local queue named user-queue). Save the following script as sample-job.py:
#!/usr/bin/env python3

import argparse
from kubernetes import config, client

# sample-job.py
# This example will demonstrate full steps to submit a Job.

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Submit Kueue Job Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--job-name",
        help="generateName field to set for job",
        default="sample-job-",
    )
    parser.add_argument(
        "--image",
        help="container image to use",
        default="gcr.io/k8s-staging-perf-tests/sleep:v0.1.0",
    )
    parser.add_argument(
        "--args",
        nargs="+",
        help="args for container",
        default=["30s"],
    )
    return parser


def generate_job_crd(job_name, image, args):
    """
    Generate an equivalent job CRD to sample-job.yaml
    """
    metadata = client.V1ObjectMeta(
        generate_name=job_name, labels={"kueue.x-k8s.io/queue-name": "user-queue"}
    )

    # Job container
    container = client.V1Container(
        image=image,
        name="dummy-job",
        args=args,
        resources={
            "requests": {
                "cpu": 1,
                "memory": "200Mi",
            }
        },
    )

    # Job template
    template = {"spec": {"containers": [container], "restartPolicy": "Never"}}
    return client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=metadata,
        spec=client.V1JobSpec(
            parallelism=1, completions=3, suspend=True, template=template
        ),
    )


def main():
    """
    Run a job.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    # Generate a CRD spec
    crd = generate_job_crd(args.job_name, args.image, args.args)
    batch_api = client.BatchV1Api()
    print(f"📦️ Container image selected is {args.image}...")
    print(f"⭐️ Creating sample job with prefix {args.job_name}...")
    batch_api.create_namespaced_job("default", crd)
    print(
        'Use:\n"kubectl get queue" to see queue assignment\n"kubectl get jobs" to see jobs'
    )


if __name__ == "__main__":
    main()
And run as follows:
python sample-job.py
📦️ Container image selected is gcr.io/k8s-staging-perf-tests/sleep:v0.1.0...
⭐️ Creating sample job with prefix sample-job-...
Use:
"kubectl get queue" to see queue assignment
"kubectl get jobs" to see jobs
Or try changing the name (generateName) of the job:
python sample-job.py --job-name sleep-job-
📦️ Container image selected is gcr.io/k8s-staging-perf-tests/sleep:v0.1.0...
⭐️ Creating sample job with prefix sleep-job-...
Use:
"kubectl get queue" to see queue assignment
"kubectl get jobs" to see jobs
You can also change the container image with --image and args with --args.
For more customization, you can edit the example script.
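One possible customization is to describe the Job as a plain dictionary instead of client model objects, which keeps it close to the YAML form. The sketch below mirrors the fields used in sample-job.py; the build_job_manifest helper and its parameter names are hypothetical, and it relies on the assumption that you pass the resulting dictionary as the body of create_namespaced_job, which the Kubernetes Python client serializes as-is.

```python
def build_job_manifest(
    prefix="sample-job-",
    image="gcr.io/k8s-staging-perf-tests/sleep:v0.1.0",
    args=("30s",),
    queue="user-queue",
):
    """Return a batch/v1 Job manifest as a plain dict (illustrative helper)."""
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {
            "generateName": prefix,
            # The queue-name label is what routes the Job to a Kueue local queue.
            "labels": {"kueue.x-k8s.io/queue-name": queue},
        },
        "spec": {
            "parallelism": 1,
            "completions": 3,
            # Created suspended, matching sample-job.py.
            "suspend": True,
            "template": {
                "spec": {
                    "containers": [
                        {
                            "name": "dummy-job",
                            "image": image,
                            "args": list(args),
                            "resources": {
                                "requests": {"cpu": "1", "memory": "200Mi"}
                            },
                        }
                    ],
                    "restartPolicy": "Never",
                }
            },
        },
    }
```

With a configured cluster, client.BatchV1Api().create_namespaced_job("default", build_job_manifest(queue="user-queue")) would submit it in the same way as the script above.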
Interact with Queues and Jobs
If you are developing an application that submits jobs and needs to interact
with and check on them, you likely want to interact with queues or jobs directly.
After running the example above, you can test the following example to interact
with the results. Write the following to a script called sample-queue-control.py.
#!/usr/bin/env python3

import argparse
from kubernetes import config, client

# sample-queue-control.py
# This will show how to interact with queues

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Interact with Queues",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--namespace",
        help="namespace to list for",
        default="default",
    )
    return parser


def main():
    """
    Get a listing of jobs in the queue
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    listing = crd_api.list_namespaced_custom_object(
        group="kueue.x-k8s.io",
        version="v1beta1",
        namespace=args.namespace,
        plural="localqueues",
    )
    list_queues(listing)

    listing = crd_api.list_namespaced_custom_object(
        group="batch",
        version="v1",
        namespace=args.namespace,
        plural="jobs",
    )
    list_jobs(listing)


def list_jobs(listing):
    """
    Iterate and show job metadata.
    """
    # The listing is a dict, so check its items rather than the dict itself
    if not listing or not listing.get("items"):
        print("💼️ There are no jobs.")
        return

    print("\n💼️ Jobs")
    for job in listing["items"]:
        jobname = job["metadata"]["name"]
        # Status fields only appear once the job controller has set them
        job_status = job.get("status", {})
        status = job_status.get("succeeded", "TBA")
        ready = job_status.get("ready", "TBA")
        print(f"Found job {jobname}")
        print(f" Succeeded: {status}")
        print(f" Ready: {ready}")


def list_queues(listing):
    """
    Helper function to iterate over and list queues.
    """
    if not listing or not listing.get("items"):
        print("⛑️ There are no queues.")
        return

    print("\n⛑️ Local Queues")

    # This is listing queues
    for q in listing["items"]:
        # A newly created queue may not have a populated status yet
        queue_status = q.get("status", {})
        print(f'Found queue {q["metadata"]["name"]}')
        print(f" Admitted workloads: {queue_status.get('admittedWorkloads', 0)}")
        print(f" Pending workloads: {queue_status.get('pendingWorkloads', 0)}")

        # And flavors with resources
        for f in queue_status.get("flavorUsage", []):
            print(f' Flavor {f["name"]} has resources {f["resources"]}')


if __name__ == "__main__":
    main()
To make the output more interesting, we can run a few random jobs first:
python sample-job.py
python sample-job.py
python sample-job.py --job-name tacos
Then run the script to see your queue and the sample jobs that you submitted previously.
python sample-queue-control.py
⛑️ Local Queues
Found queue user-queue
Admitted workloads: 3
Pending workloads: 0
Flavor default-flavor has resources [{'name': 'cpu', 'total': '3'}, {'name': 'memory', 'total': '600Mi'}]
💼️ Jobs
Found job sample-job-8n5sb
Succeeded: 3
Ready: 0
Found job sample-job-gnxtl
Succeeded: 1
Ready: 0
Found job tacos46bqw
Succeeded: 1
Ready: 1
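The flavor usage in this output is consistent with three admitted workloads that each run one pod requesting 1 CPU and 200Mi of memory, as sample-job.py does. The sketch below reproduces that arithmetic; expected_usage is a hypothetical helper, and it assumes usage is simply the per-pod request multiplied by parallelism for each admitted workload.

```python
def expected_usage(num_workloads, parallelism=1, cpu_per_pod=1, memory_mi_per_pod=200):
    """Estimate flavor usage for identical admitted workloads (illustrative)."""
    # Each admitted workload accounts for `parallelism` pods at a time.
    pods = num_workloads * parallelism
    return {
        "cpu": str(pods * cpu_per_pod),
        "memory": f"{pods * memory_mi_per_pod}Mi",
    }


# Three admitted sample jobs, as in the output above.
print(expected_usage(3))  # {'cpu': '3', 'memory': '600Mi'}
```

A calculation like this can help when sizing quota in a cluster queue, although actual accounting depends on your Kueue configuration.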
If you want to filter jobs to a specific queue, you can do this via the job labels under job["metadata"]["labels"]["kueue.x-k8s.io/queue-name"]. To look up a specific job by name, you can do:
from kubernetes import client, config
# Interact with batch
config.load_kube_config()
batch_api = client.BatchV1Api()
# This is providing the name, and namespace
job = batch_api.read_namespaced_job("tacos46bqw", "default")
print(job)
See the BatchV1 API documentation for more calls.
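For the label-based filtering mentioned above, a minimal sketch could look like the following. It works on the dictionary returned by list_namespaced_custom_object for jobs; jobs_in_queue is a hypothetical helper name, and the sample listing is made-up data used only to show the shape.

```python
QUEUE_LABEL = "kueue.x-k8s.io/queue-name"


def jobs_in_queue(listing, queue_name):
    """Return the jobs from a listing whose queue-name label matches."""
    matches = []
    for job in listing.get("items", []):
        # Jobs without labels are skipped rather than raising a KeyError.
        labels = job.get("metadata", {}).get("labels") or {}
        if labels.get(QUEUE_LABEL) == queue_name:
            matches.append(job)
    return matches


# Made-up listing for illustration only.
example = {
    "items": [
        {"metadata": {"name": "a", "labels": {QUEUE_LABEL: "user-queue"}}},
        {"metadata": {"name": "b", "labels": {QUEUE_LABEL: "other-queue"}}},
        {"metadata": {"name": "c"}},
    ]
}
print([job["metadata"]["name"] for job in jobs_in_queue(example, "user-queue")])  # ['a']
```

The same filtering can usually be pushed to the server by passing a label selector to the list call, which avoids fetching jobs you do not need.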
Flux Operator Job
For this example, we will use the Flux Operator to submit a job, relying on its Python SDK to make this easy. With the Python environment created during setup, install the SDK as follows:
pip install fluxoperator
We will also need to install the Flux Operator.
kubectl apply -f https://raw.githubusercontent.com/flux-framework/flux-operator/main/examples/dist/flux-operator.yaml
Write the following script to sample-flux-operator-job.py:
#!/usr/bin/env python3

import argparse
from kubernetes import config, client
import fluxoperator.models as models

# sample-flux-operator-job.py
# This example will demonstrate full steps to submit a Job via the Flux Operator.

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Submit Kueue Flux Operator Job Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--job-name",
        help="generateName field to set for job (job prefix does not work here)",
        default="hello-world",
    )
    parser.add_argument(
        "--image",
        help="container image to use",
        default="ghcr.io/flux-framework/flux-restful-api",
    )
    parser.add_argument(
        "--tasks",
        help="Number of tasks",
        default=1,
        type=int,
    )
    parser.add_argument(
        "--quiet",
        help="Do not show extra flux output (only hello worlds!)",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "--command",
        help="command to run",
        default="echo",
    )
    parser.add_argument(
        "--args", nargs="+", help="args for container", default=["hello", "world"]
    )
    return parser


def generate_minicluster_crd(job_name, image, command, args, quiet=False, tasks=1):
    """
    Generate a minicluster CRD
    """
    container = models.MiniClusterContainer(
        # Pass the selected image through so --image takes effect
        image=image,
        command=command + " " + " ".join(args),
        resources={
            "limits": {
                "cpu": 1,
                "memory": "2Gi",
            }
        },
    )

    # 4 pods and 4 tasks will echo hello-world x 4
    spec = models.MiniClusterSpec(
        job_labels={"kueue.x-k8s.io/queue-name": "user-queue"},
        containers=[container],
        size=4,
        tasks=tasks,
        logging={"quiet": quiet},
    )

    return models.MiniCluster(
        kind="MiniCluster",
        api_version="flux-framework.org/v1alpha1",
        metadata=client.V1ObjectMeta(
            generate_name=job_name,
            namespace="default",
        ),
        spec=spec,
    )


def main():
    """
    Run an example job using the Flux Operator.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    # Generate a CRD spec
    minicluster = generate_minicluster_crd(
        args.job_name, args.image, args.command, args.args, args.quiet, args.tasks
    )
    crd_api = client.CustomObjectsApi()

    print(f"📦️ Container image selected is {args.image}...")
    print(f"⭐️ Creating sample job with prefix {args.job_name}...")
    crd_api.create_namespaced_custom_object(
        group="flux-framework.org",
        version="v1alpha1",
        namespace="default",
        plural="miniclusters",
        body=minicluster,
    )
    print(
        'Use:\n"kubectl get queue" to see queue assignment\n"kubectl get pods" to see pods'
    )


if __name__ == "__main__":
    main()
Now try running the example:
python sample-flux-operator-job.py
📦️ Container image selected is ghcr.io/flux-framework/flux-restful-api...
⭐️ Creating sample job with prefix hello-world...
Use:
"kubectl get queue" to see queue assignment
"kubectl get pods" to see pods
Almost immediately, you’ll be able to see the MiniCluster job admitted to the local queue:
kubectl get queue
NAME         CLUSTERQUEUE    PENDING WORKLOADS   ADMITTED WORKLOADS
user-queue   cluster-queue   0                   1
And the 4 pods running (we are creating a networked cluster with 4 nodes):
kubectl get pods
NAME                       READY   STATUS    RESTARTS   AGE
hello-world7qgqd-0-wp596   1/1     Running   0          7s
hello-world7qgqd-1-d7r87   1/1     Running   0          7s
hello-world7qgqd-2-rfn4t   1/1     Running   0          7s
hello-world7qgqd-3-blvtn   1/1     Running   0          7s
If you look at logs of the main broker pod (index 0 of the job above), there is a lot of output for debugging, and you can see “hello world” running at the end:
kubectl logs hello-world7qgqd-0-wp596
Flux Operator Lead Broker Output
🌀 Submit Mode: flux start -o --config /etc/flux/config -Scron.directory=/etc/flux/system/cron.d -Stbon.fanout=256 -Srundir=/run/flux -Sstatedir=/var/lib/flux -Slocal-uri=local:///run/flux/local -Slog-stderr-level=6 -Slog-stderr-mode=local flux submit -n 1 --quiet --watch echo hello world
broker.info[0]: start: none->join 0.399725ms
broker.info[0]: parent-none: join->init 0.030894ms
cron.info[0]: synchronizing cron tasks to event heartbeat.pulse
job-manager.info[0]: restart: 0 jobs
job-manager.info[0]: restart: 0 running jobs
job-manager.info[0]: restart: checkpoint.job-manager not found
broker.info[0]: rc1.0: running /etc/flux/rc1.d/01-sched-fluxion
sched-fluxion-resource.info[0]: version 0.27.0-15-gc90fbcc2
sched-fluxion-resource.warning[0]: create_reader: allowlist unsupported
sched-fluxion-resource.info[0]: populate_resource_db: loaded resources from core's resource.acquire
sched-fluxion-qmanager.info[0]: version 0.27.0-15-gc90fbcc2
broker.info[0]: rc1.0: running /etc/flux/rc1.d/02-cron
broker.info[0]: rc1.0: /etc/flux/rc1 Exited (rc=0) 0.5s
broker.info[0]: rc1-success: init->quorum 0.485239s
broker.info[0]: online: hello-world7qgqd-0 (ranks 0)
broker.info[0]: online: hello-world7qgqd-[0-3] (ranks 0-3)
broker.info[0]: quorum-full: quorum->run 0.354587s
hello world
broker.info[0]: rc2.0: flux submit -n 1 --quiet --watch echo hello world Exited (rc=0) 0.3s
broker.info[0]: rc2-success: run->cleanup 0.308392s
broker.info[0]: cleanup.0: flux queue stop --quiet --all --nocheckpoint Exited (rc=0) 0.1s
broker.info[0]: cleanup.1: flux cancel --user=all --quiet --states RUN Exited (rc=0) 0.1s
broker.info[0]: cleanup.2: flux queue idle --quiet Exited (rc=0) 0.1s
broker.info[0]: cleanup-success: cleanup->shutdown 0.252899s
broker.info[0]: children-complete: shutdown->finalize 47.6699ms
broker.info[0]: rc3.0: running /etc/flux/rc3.d/01-sched-fluxion
broker.info[0]: rc3.0: /etc/flux/rc3 Exited (rc=0) 0.2s
broker.info[0]: rc3-success: finalize->goodbye 0.212425s
broker.info[0]: goodbye: goodbye->exit 0.06917ms
If you submit and ask for four tasks, you’ll see “hello world” four times:
python sample-flux-operator-job.py --tasks 4
...
broker.info[0]: quorum-full: quorum->run 23.5812s
hello world
hello world
hello world
hello world
You can further customize the job, and can ask questions on the Flux Operator issues board. Finally, for instructions for how to do this with YAML outside of Python, see Run A Flux MiniCluster.
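As one example of further customization, the script above builds the MiniCluster command by joining the command and its arguments with plain spaces, so an argument that itself contains a space loses its grouping. The sketch below uses shlex.join from the Python standard library to quote such arguments. The build_command helper is a hypothetical name, and whether the quoting is honored depends on how the Flux Operator passes the command along, which is an assumption here.

```python
import shlex


def build_command(command, args):
    """Join a command and its arguments into one shell-quoted string."""
    # shlex.join quotes any argument containing spaces or shell metacharacters.
    return shlex.join([command, *args])


print(build_command("echo", ["hello", "world"]))  # echo hello world
print(build_command("echo", ["hello world"]))     # echo 'hello world'
```

For the default arguments the result is identical to what the script already produces, so this only changes behavior for arguments that need quoting.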
MPI Operator Job
For this example, we will use the MPI Operator to submit a job, relying on its Python SDK to make this easy. With the Python environment created during setup, install the SDK as follows:
git clone --depth 1 https://github.com/kubeflow/mpi-operator /tmp/mpijob
cd /tmp/mpijob/sdk/python/v2beta1
python setup.py install
cd -
Importantly, the MPI Operator must be installed before Kueue for this to work! Let’s start from scratch with a new Kind cluster. We will also need to install the MPI operator and Kueue. Here we install the exact versions tested with this example:
kubectl apply -f https://github.com/kubeflow/mpi-operator/releases/download/v0.4.0/mpi-operator.yaml
kubectl apply -f https://github.com/kubernetes-sigs/kueue/releases/download/v0.4.0/manifests.yaml
Check the mpi-operator release page and Kueue release page for alternate versions. You need to wait until Kueue is ready. You can determine this as follows:
# Wait until you see all pods in the kueue-system are Running
kubectl get pods -n kueue-system
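If you would rather check readiness from Python than by eye, a minimal sketch is to parse the JSON form of the same command (kubectl get pods -n kueue-system -o json) and confirm that every pod reports the Running phase. The all_pods_running helper is a hypothetical name, and note that the Running phase alone does not guarantee that every container has passed its readiness checks.

```python
import json


def all_pods_running(pod_list_json):
    """Return True if the JSON pod list is non-empty and all pods are Running."""
    items = json.loads(pod_list_json).get("items", [])
    # An empty list means nothing has been scheduled yet, so treat it as not ready.
    return bool(items) and all(
        pod.get("status", {}).get("phase") == "Running" for pod in items
    )
```

One way to use it is to capture the kubectl output with subprocess.run(..., capture_output=True, text=True) and call the helper in a loop with a short sleep until it returns True.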
When Kueue is ready:
kubectl apply -f https://raw.githubusercontent.com/kubernetes-sigs/kueue/main/site/static/examples/admin/single-clusterqueue-setup.yaml
Now try running the example MPIJob. Save the script listed after the output below as sample-mpijob.py, then run it:
python sample-mpijob.py
📦️ Container image selected is mpioperator/mpi-pi:openmpi...
⭐️ Creating sample job with prefix pi...
Use:
"kubectl get queue" to see queue assignment
"kubectl get jobs" to see jobs
#!/usr/bin/env python3

import argparse
from kubernetes import config, client
import mpijob.models as models

# sample-mpijob.py
# This example will demonstrate full steps to submit a Job via the MPI Operator

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Submit Kueue MPI Operator Job Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--job-name",
        help="name to set for job (a generateName prefix does not work here)",
        default="pi",
    )
    parser.add_argument(
        "--image",
        help="container image to use",
        default="mpioperator/mpi-pi:openmpi",
    )
    parser.add_argument(
        "--command",
        help="command to run",
        default="mpirun",
    )
    parser.add_argument(
        "--args",
        nargs="+",
        help="args for container",
        default=["-n", "2", "/home/mpiuser/pi"],
    )
    return parser


def generate_job_crd(job_name, image, command, args):
    """
    Generate an MPIJob custom resource with one launcher and two workers
    """
    metadata = client.V1ObjectMeta(
        name=job_name, labels={"kueue.x-k8s.io/queue-name": "user-queue"}
    )

    # containers for launcher and worker
    launcher_container = client.V1Container(
        image=image,
        name="mpi-launcher",
        command=[command],
        args=args,
        security_context=client.V1SecurityContext(run_as_user=1000),
        resources={
            "limits": {
                "cpu": 1,
                "memory": "1Gi",
            }
        },
    )

    worker_container = client.V1Container(
        image=image,
        name="mpi-worker",
        command=["/usr/sbin/sshd"],
        args=["-De", "-f", "/home/mpiuser/.sshd_config"],
        security_context=client.V1SecurityContext(run_as_user=1000),
        resources={
            "limits": {
                "cpu": 1,
                "memory": "1Gi",
            }
        },
    )

    # Create the Launcher and worker replica specs
    launcher = models.V2beta1ReplicaSpec(
        replicas=1,
        template=client.V1PodTemplateSpec(
            spec=client.V1PodSpec(containers=[launcher_container])
        ),
    )

    worker = models.V2beta1ReplicaSpec(
        replicas=2,
        template=client.V1PodTemplateSpec(
            spec=client.V1PodSpec(containers=[worker_container])
        ),
    )

    # runPolicy for jobspec
    policy = models.V2beta1RunPolicy(
        clean_pod_policy="Running", ttl_seconds_after_finished=60
    )

    # Create the jobspec
    jobspec = models.V2beta1MPIJobSpec(
        slots_per_worker=1,
        run_policy=policy,
        ssh_auth_mount_path="/home/mpiuser/.ssh",
        mpi_replica_specs={"Launcher": launcher, "Worker": worker},
    )
    return models.V2beta1MPIJob(
        metadata=metadata,
        api_version="kubeflow.org/v2beta1",
        kind="MPIJob",
        spec=jobspec,
    )


def main():
    """
    Run an MPIJob. This requires the MPI Operator to be installed.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    # Generate a CRD spec
    crd = generate_job_crd(args.job_name, args.image, args.command, args.args)
    crd_api = client.CustomObjectsApi()

    print(f"📦️ Container image selected is {args.image}...")
    print(f"⭐️ Creating sample job with prefix {args.job_name}...")
    crd_api.create_namespaced_custom_object(
        group="kubeflow.org",
        version="v2beta1",
        namespace="default",
        plural="mpijobs",
        body=crd,
    )
    print(
        'Use:\n"kubectl get queue" to see queue assignment\n"kubectl get jobs" to see jobs'
    )


if __name__ == "__main__":
    main()
After submitting, you can see that the queue has an admitted workload!
$ kubectl get queue
NAME         CLUSTERQUEUE    PENDING WORKLOADS   ADMITTED WORKLOADS
user-queue   cluster-queue   0                   1
And that the job “pi-launcher” has started:
$ kubectl get jobs
NAME          COMPLETIONS   DURATION   AGE
pi-launcher   0/1           9s         9s
The MPI Operator works by way of a central launcher interacting with nodes via ssh. We can inspect a worker and the launcher to get a glimpse of how both work:
$ kubectl logs pods/pi-worker-1
Server listening on 0.0.0.0 port 22.
Server listening on :: port 22.
Accepted publickey for mpiuser from 10.244.0.8 port 51694 ssh2: ECDSA SHA256:rgZdwufXolOkUPA1w0bf780BNJC8e4/FivJb1/F7OOI
Received disconnect from 10.244.0.8 port 51694:11: disconnected by user
Disconnected from user mpiuser 10.244.0.8 port 51694
Received signal 15; terminating.
The job is fairly quick, and we can see the output of pi in the launcher:
$ kubectl logs pods/pi-launcher-f4gqv
Warning: Permanently added 'pi-worker-0.pi-worker.default.svc,10.244.0.7' (ECDSA) to the list of known hosts.
Warning: Permanently added 'pi-worker-1.pi-worker.default.svc,10.244.0.9' (ECDSA) to the list of known hosts.
Rank 1 on host pi-worker-1
Workers: 2
Rank 0 on host pi-worker-0
pi is approximately 3.1410376000000002
That looks like pi! 🎉️🥧️ If you are interested in running this same example with YAML outside of Python, see Run an MPIJob.