#!/usr/bin/env python3
import argparse
import gzip
import json
import logging
import os
import re
import sys
from kubernetes import config
from tesk.services.filer_class import Filer
from tesk.services.job import Job
from tesk.services.pvc import PVC
# Kubernetes Job objects created during this run, kept so that
# clean_on_interrupt() can delete them.
created_jobs = []
# Polling interval handed to every Job.run_to_completion() call
# (units are defined by Job; presumably seconds).
poll_interval = 5
# Name of the pod volume backed by the task PVC; shared by the volume entry
# and by every volumeMount that references it.
task_volume_basename = 'task-volume'
# Parsed command-line arguments; populated by main().
args = None
# Module logger; populated by main() via newLogger().
logger = None
# PVC created by init_pvc(), kept for cleanup. Defined here so it can be
# read safely before main()/init_pvc() assign it.
created_pvc = None
[docs]
def run_executor(executor, namespace, pvc=None):
    """Run one executor Job to completion, exiting the taskmaster on failure.

    The executor's Job manifest is adjusted in place before submission:

    * ``EXECUTOR_BACKOFF_LIMIT`` (environment), when set, becomes
      ``spec.backoffLimit``.
    * ``restart_policy`` is copied to ``restartPolicy`` when only the former
      is present in the pod spec.
    * Every container without resource limits gets its requests as limits.
    * When *pvc* is given, its volume mounts are added to the first container
      and the claim is attached as the ``task_volume_basename`` volume.

    Args:
        executor: Job manifest as a dict; must contain ``metadata.name`` and
            ``spec.template.spec``.
        namespace: Kubernetes namespace in which the Job is created.
        pvc: Optional PVC object; its ``volume_mounts`` and ``name`` are used.

    If the Job does not end with status ``'Complete'`` the taskmaster exits
    through ``exit_cancelled``. On ``'Error'`` the Job is deleted first.
    """
    jobname = executor['metadata']['name']
    spec = executor['spec']['template']['spec']

    backoff_limit = os.environ.get('EXECUTOR_BACKOFF_LIMIT')
    if backoff_limit is not None:
        executor['spec']['backoffLimit'] = int(backoff_limit)

    # Accept the snake_case spelling as a fallback for the camelCase field.
    if 'restartPolicy' not in spec and 'restart_policy' in spec:
        spec['restartPolicy'] = spec['restart_policy']

    for container in spec['containers']:
        # A missing or null 'resources' is treated as empty instead of
        # crashing with KeyError/AttributeError.
        resources = container.get('resources')
        if resources is None:
            resources = container['resources'] = {}
        resources.setdefault('limits', None)
        # Without explicit limits, cap the container at what it requested.
        if resources['limits'] is None and resources.get('requests') is not None:
            resources['limits'] = resources['requests']

    if pvc is not None:
        # Only the first container receives the task-volume mounts.
        mounts = spec['containers'][0].setdefault('volumeMounts', [])
        mounts.extend(pvc.volume_mounts)
        spec.setdefault('volumes', []).append(
            {
                'name': task_volume_basename,
                # Kubernetes' PersistentVolumeClaimVolumeSource spells this
                # field 'readOnly' (camelCase).
                'persistentVolumeClaim': {'readOnly': False, 'claimName': pvc.name},
            }
        )

    logger.debug(f'Created job: {jobname}')
    job = Job(executor, jobname, namespace)
    logger.debug(f'Job spec: {str(job.body)}')
    # Track the job so interrupt cleanup can delete it.
    created_jobs.append(job)

    status = job.run_to_completion(poll_interval, check_cancelled, args.pod_timeout)
    if status != 'Complete':
        if status == 'Error':
            job.delete()
        exit_cancelled(f'Got status {status}')
# TODO move this code to PVC class
[docs]
def append_mount(volume_mounts, name, path, pvc):
    """Append a volumeMount for *path* unless that mountPath is already used.

    Args:
        volume_mounts: List of volumeMount dicts; modified in place.
        name: Name of the volume the mount refers to.
        path: Mount path inside the container.
        pvc: Object providing ``get_subpath()``. It is only consulted when a
            new mount is actually appended.
    """
    # Each mount path is added at most once.
    if any(mount['mountPath'] == path for mount in volume_mounts):
        return
    subpath = pvc.get_subpath()
    logger.debug(f'appending {name} at path {path} with subPath: {subpath}')
    volume_mounts.append({'name': name, 'mountPath': path, 'subPath': subpath})
[docs]
def dirname(iodata):
    """Return the directory that must be mounted for a TES input/output.

    Args:
        iodata: Dict with ``'type'`` (``'FILE'`` or ``'DIRECTORY'``) and
            ``'path'``.

    Returns:
        For ``'FILE'``: everything before the last ``'/'`` of the path, i.e.
        the path with the filename stripped. For ``'DIRECTORY'``: the path
        unchanged.

    Raises:
        ValueError: If the type is neither ``'FILE'`` nor ``'DIRECTORY'``, or
            if a ``'FILE'`` path contains no ``'/'``.
    """
    kind = iodata['type']
    if kind == 'DIRECTORY':
        return iodata['path']
    if kind != 'FILE':
        raise ValueError(f'Unsupported input/output type: {kind!r}')

    path = iodata['path']
    directory, sep, _ = path.rpartition('/')
    if not sep:
        raise ValueError(f'FILE path has no directory component: {path!r}')
    # NOTE(review): a file directly under '/' (e.g. '/x.txt') yields '' here,
    # as before; confirm callers never pass such paths.
    logger.debug(f'dirname of {path} is: {directory}')
    return directory
[docs]
def generate_mounts(data, pvc):
    """Build the de-duplicated list of volumeMounts for the task volume.

    Mount paths are taken from ``data['volumes']`` verbatim. Then, in order,
    the directory of every entry in ``data['inputs']`` and ``data['outputs']``
    is added. All mounts reference the ``task_volume_basename`` volume.
    Duplicates are dropped by ``append_mount``.
    """
    mounts = []
    name = task_volume_basename

    for mount_path in data['volumes']:
        append_mount(mounts, name, mount_path, pvc)

    # Inputs first, then outputs, each reduced to the directory to mount.
    for section in ('inputs', 'outputs'):
        for entry in data[section]:
            append_mount(mounts, name, dirname(entry), pvc)

    return mounts
[docs]
def init_pvc(data, filer):
    """Create the task PVC and run the inputs filer Job that populates it.

    Args:
        data: Task description dict. Reads the executors' ``taskmaster-name``
            label and ``resources.disk_gb``.
        filer: Filer object. The PVC mount, and the netrc secret when
            ``NETRC_SECRET_NAME`` is set, are added to it.

    Returns:
        The created PVC. It is also stored in the ``created_pvc`` global for
        cleanup.

    If the inputs filer Job does not end with ``'Complete'``, the taskmaster
    exits through ``exit_cancelled``.
    """
    global created_pvc  # noqa: PLW0603

    task_name = data['executors'][0]['metadata']['labels']['taskmaster-name']
    pvc = PVC(f'{task_name}-pvc', data['resources']['disk_gb'], args.namespace)

    mounts = generate_mounts(data, pvc)
    logger.debug(mounts)
    pvc.set_volume_mounts(mounts)
    filer.add_volume_mount(pvc)
    pvc.create()
    # Remember the PVC globally for cleanup purposes.
    created_pvc = pvc

    netrc_secret = os.environ.get('NETRC_SECRET_NAME')
    if netrc_secret is not None:
        filer.add_netrc_mount(netrc_secret)

    filerjob = Job(
        filer.get_spec('inputs', args.debug),
        f'{task_name}-inputs-filer',
        args.namespace,
    )
    created_jobs.append(filerjob)
    status = filerjob.run_to_completion(
        poll_interval, check_cancelled, args.pod_timeout
    )
    if status != 'Complete':
        exit_cancelled(f'Got status {status}')
    return pvc
[docs]
def run_task(data, filer_name, filer_version, have_json_pvc=False):
    """Run a whole TES task: stage inputs, run executors, stage outputs.

    The filer and its PVC are only used when the task declares any volumes,
    inputs or outputs.

    Args:
        data: Task description dict (``executors``, ``volumes``, ``inputs``,
            ``outputs``, ``resources``).
        filer_name: Filer container image name.
        filer_version: Filer container image version.
        have_json_pvc: When true, the task name is passed to ``Filer`` as its
            ``json_pvc`` argument. Otherwise ``None`` is passed.
    """
    task_name = data['executors'][0]['metadata']['labels']['taskmaster-name']
    json_pvc = task_name if have_json_pvc else None
    # Decide once whether staging is needed, so the stage-in and stage-out
    # halves cannot disagree and `filer`/`pvc` are bound whenever used.
    needs_filer = bool(data['volumes'] or data['inputs'] or data['outputs'])
    filer = None
    pvc = None

    if needs_filer:
        filer = Filer(
            f'{task_name}-filer',
            data,
            filer_name,
            filer_version,
            args.pull_policy_always,
            json_pvc,
        )
        if os.environ.get('TESK_FTP_USERNAME') is not None:
            filer.set_ftp(
                os.environ['TESK_FTP_USERNAME'], os.environ['TESK_FTP_PASSWORD']
            )
        if os.environ.get('FILER_BACKOFF_LIMIT') is not None:
            filer.set_backoffLimit(int(os.environ['FILER_BACKOFF_LIMIT']))
        # Creates the PVC and runs the inputs filer job.
        pvc = init_pvc(data, filer)

    # Run executors in order; run_executor exits the taskmaster on failure.
    for executor in data['executors']:
        run_executor(executor, args.namespace, pvc)
    logger.debug('Finished running executors')

    if not needs_filer:
        return

    # Upload outputs, then delete the PVC.
    filerjob = Job(
        filer.get_spec('outputs', args.debug),
        f'{task_name}-outputs-filer',
        args.namespace,
    )
    created_jobs.append(filerjob)
    status = filerjob.run_to_completion(
        poll_interval, check_cancelled, args.pod_timeout
    )
    if status != 'Complete':
        # NOTE(review): the PVC is not deleted on this path; confirm that is
        # intended (e.g. to allow inspection after a failed upload).
        exit_cancelled(f'Got status {status}')
    else:
        pvc.delete()
[docs]
def newParser():
    """Build the taskmaster command-line parser.

    Exactly one of the positional ``json`` string or ``-f/--file`` must be
    supplied.
    """
    parser = argparse.ArgumentParser(description='TaskMaster main module')
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument(
        'json',
        help='string containing json TES request, required if -f is not given',
        nargs='?',
    )
    group.add_argument(
        '-f',
        '--file',
        help="TES request as a file or '-' for stdin, required if json is not given",
    )
    # type=int so a user-supplied value is numeric, like the default.
    parser.add_argument(
        '-p', '--poll-interval', type=int, help='Job polling interval', default=5
    )
    parser.add_argument(
        '-pt', '--pod-timeout', type=int, help='Pod creation timeout', default=240
    )
    parser.add_argument(
        '-fn',
        '--filer-name',
        help='Filer image name',
        default='eu.gcr.io/tes-wes/filer',
    )
    parser.add_argument(
        '-fv', '--filer-version', help='Filer image version', default='v0.1.9'
    )
    parser.add_argument(
        '-n', '--namespace', help='Kubernetes namespace to run in', default='default'
    )
    parser.add_argument(
        '-s',
        '--state-file',
        help='State file for state.py script',
        default='/tmp/.teskstate',  # nosec: B108, false positive
    )
    parser.add_argument('-d', '--debug', help='Set debug mode', action='store_true')
    parser.add_argument(
        '--localKubeConfig',
        help='Read k8s configuration from localhost',
        action='store_true',
    )
    parser.add_argument(
        '--pull-policy-always',
        help="set imagePullPolicy = 'Always'",
        action='store_true',
    )
    return parser
[docs]
def newLogger(loglevel):
    """Configure root logging at *loglevel* and return this module's logger.

    The ``kubernetes.client`` logger is set to CRITICAL, so only its critical
    messages are emitted.
    """
    logging.basicConfig(
        format='%(asctime)s %(levelname)s: %(message)s',
        # 24-hour clock (%H): a 12-hour %I without %p would be ambiguous.
        datefmt='%m/%d/%Y %H:%M:%S',
        level=loglevel,
    )
    logging.getLogger('kubernetes.client').setLevel(logging.CRITICAL)
    return logging.getLogger(__name__)
[docs]
def main():
    """Taskmaster entry point: parse the CLI, load the TES task and run it.

    The task JSON comes from the positional ``json`` argument, from stdin
    (``-f -``), from a gzip file (``-f <name>.gz``) or from a plain file.
    """
    global args, logger, created_pvc, poll_interval  # noqa: PLW0603

    # Reset first, so cleanup code sees a defined value as early as possible.
    created_pvc = None
    have_json_pvc = False

    args = newParser().parse_args()
    # Apply --poll-interval to the module-wide interval used by every
    # Job.run_to_completion() call. int() also accepts the string form
    # argparse yields when the option has no declared type.
    poll_interval = int(args.poll_interval)

    loglevel = logging.DEBUG if args.debug else logging.ERROR
    logger = newLogger(loglevel)
    logger.debug('Starting taskmaster')

    # Get input JSON
    if args.file is None:
        data = json.loads(args.json)
    elif args.file == '-':
        data = json.load(sys.stdin)
    elif args.file.endswith('.gz'):
        with gzip.open(args.file, 'rb') as fh:
            data = json.loads(fh.read())
        # NOTE(review): only a gzipped request file enables have_json_pvc;
        # confirm this coupling is intended.
        have_json_pvc = True
    else:
        with open(args.file) as fh:
            data = json.load(fh)

    # Load kubernetes config file
    if args.localKubeConfig:
        config.load_kube_config()
    else:
        config.load_incluster_config()

    # Check if we're cancelled during init
    if check_cancelled():
        exit_cancelled('Cancelled during init')

    run_task(data, args.filer_name, args.filer_version, have_json_pvc)
[docs]
def clean_on_interrupt():
    """Best-effort deletion of the Jobs and the PVC created so far.

    Each deletion is attempted independently and failures are logged, so one
    failing delete does not prevent cleanup of the remaining resources.
    """
    logger.debug('Caught interrupt signal, deleting jobs and pvc')
    # Jobs first: executor pods mount the PVC as a volume.
    for job in created_jobs:
        try:
            job.delete()
        except Exception:  # noqa: BLE001 - keep cleaning up the rest
            logger.exception('Failed to delete job')
    # `created_pvc` is only assigned once main()/init_pvc() have run.
    pvc = globals().get('created_pvc')
    if pvc is not None:
        try:
            pvc.delete()
        except Exception:  # noqa: BLE001 - best-effort cleanup
            logger.exception('Failed to delete pvc')
[docs]
def exit_cancelled(reason='Unknown reason'):
    """Log *reason* at error level, then terminate with exit status 0."""
    message = f'Cancelling taskmaster: {reason}'
    logger.error(message)
    raise SystemExit(0)
[docs]
def check_cancelled(label_info_file='/podinfo/labels'):
    """Return True if any pod label in *label_info_file* equals ``"Cancelled"``.

    Each line is expected to look like ``key="value"`` (presumably the
    Kubernetes downward-API labels file; confirm the mount). A missing file
    means "not cancelled".

    Args:
        label_info_file: Path of the labels file. The default keeps the
            original hard-coded location, so no-argument callers are
            unaffected.
    """
    if not os.path.exists(label_info_file):
        return False
    with open(label_info_file, encoding='utf-8') as fh:
        for line in fh:
            # Strip the line ending: otherwise only an unterminated last line
            # could ever compare equal. partition() also tolerates blank lines.
            _, _, label = line.strip().partition('=')
            logger.debug(f'Got label: {label}')
            if label == '"Cancelled"':
                return True
    return False
if __name__ == '__main__':
    # Script entry: run the taskmaster; on KeyboardInterrupt, tear down what
    # was created instead of propagating the exception.
    try:
        main()
    except KeyboardInterrupt:
        clean_on_interrupt()