Source code for tesk.services.pvc
import logging
import os
from kubernetes import client
from kubernetes.client.exceptions import ApiException
from tesk.services.exceptions import ServiceStatusCodes
from tesk.services.utils import pprint
[docs]
class PVC:
def __init__(self, name='task-pvc', size_gb=1, namespace='default'):
self.name = name
self.spec = {
'apiVersion': 'v1',
'kind': 'PersistentVolumeClaim',
'metadata': {'name': name},
'spec': {
'accessModes': ['ReadWriteOnce'],
'resources': {'requests': {'storage': f'{str(size_gb)}Gi'}},
},
}
self.subpath_idx = 0
self.namespace = namespace
self.cv1 = client.CoreV1Api()
# The environment variable 'TESK_API_TASKMASTER_ENVIRONMENT_STORAGE_CLASS_NAME'
# can be set to the preferred, non-default, user-defined storageClass
if os.environ.get('STORAGE_CLASS_NAME') is not None:
self.spec['spec'].update(
{'storageClassName': os.environ.get('STORAGE_CLASS_NAME')}
)
[docs]
def get_subpath(self):
subpath = f'dir{str(self.subpath_idx)}'
self.subpath_idx += 1
return subpath
[docs]
def create(self):
logging.debug('Creating PVC...')
logging.debug(pprint(self.spec))
try:
return self.cv1.create_namespaced_persistent_volume_claim(
self.namespace, self.spec
)
except ApiException as ex:
if ex.status == ServiceStatusCodes.CONFLICT:
logging.debug(f'Reading existing PVC: {self.name}')
return self.cv1.read_namespaced_persistent_volume_claim(
self.name, self.namespace
)
else:
logging.debug(ex.body)
raise ApiException(ex.status, ex.reason) from None
[docs]
def delete(self):
cv1 = client.CoreV1Api()
cv1.delete_namespaced_persistent_volume_claim(
self.name, self.namespace, body=client.V1DeleteOptions()
)