Source code for k9.batch
from kubernetes import client
from k9.core import get_default_namespace
###########################################################################
# Cron Jobs
###########################################################################
[docs]def list_cron_jobs(namespace: str = None):
"""
List all cluster cron jobs
:param namespace: Then namespace to list cron jobs from. If None, uses the default namespace.
:return: A list of dictionary items with **name** and **created**.
"""
if namespace is None:
namespace = get_default_namespace()
return [
{
'name': cj.metadata.name,
'created': cj.metadata.creation_timestamp
}
for cj in client.BatchV1beta1Api().list_namespaced_cron_job(namespace).items
]
[docs]def create_cron_job(body: dict, namespace: str = None):
"""
Create a cron job
:param body: The cron job definition object.
:param namespace: Then namespace to create cronjob in. If None, uses the default namespace.
:return: `V1beta1CronJob <https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1beta1CronJob.md>`_
"""
if namespace is None:
namespace = get_default_namespace()
return client.BatchV1beta1Api().create_namespaced_cron_job(namespace, body)
[docs]def delete_cron_job(name: str, namespace: str = None):
"""
Deletes the specified cron job
:param name: Name of cron job
:param namespace: Then namespace to delete cron job from. If None, uses the default namespace.
:return: None if cron job doesn't exist, `V1Status <https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1Status.md>`_
"""
if namespace is None:
namespace = get_default_namespace()
if cron_job_exists(name, namespace):
return client.BatchV1beta1Api().delete_namespaced_cron_job(name, namespace)
else:
return None
[docs]def get_cron_job(name: str, namespace: str = None):
"""
Gets the specified cron job
:param name: Name of cron job
:param namespace: Then namespace to get cron job from. If None, uses the default namespace.
:return: `V1beta1CronJob <https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1beta1CronJob.md>`_
"""
if namespace is None:
namespace = get_default_namespace()
return client.BatchV1beta1Api().read_namespaced_cron_job(name, namespace)
[docs]def cron_job_exists(name: str, namespace: str = None):
"""
Checks for the cron job's existence.
:param name: Name of cron job to look for.
:param namespace: Then namespace to check for cron job. If None, uses the default namespace.
:return: True if specified cron job exists
"""
try:
if namespace is None:
namespace = get_default_namespace()
result = get_cron_job(name, namespace)
return result.metadata.name == name
except:
return False