import base64
import json
import os, time
import pathlib
import subprocess
from kubernetes import client, config as kube_config
import requests
from urllib3.exceptions import InsecureRequestWarning
from jinja2 import Environment, FileSystemLoader
from k9.helm import helm_install, helm_repo_add, helm_repo_update, helm_exists, helm_uninstall
from k9.core import namespace_exists, set_default_namespace, create_namespace, abs_path, list_pods, wait_for_pod, \
run_command, read_yaml, set_run_output, refresh_kubeconfig, get_secret, \
create_app_databases, delete_app_database, secret_exists, create_secret, connect_to_cluster, shell, render_template
from k9.storage import storage_class_exists
from k9.apps import deployment_exists, create_deployment
from aws import cluster, cfm, cert, ec2, secret, iam, util, rds, region
SIMON_CHARTS = 'https://charts.simoncomputing.com'
# util for jinja'ing XML
def write_templated_config(name: str, env, params):
if not isinstance(params, dict):
params = vars(params)
template = env.get_template(name)
template_body = template.render(params)
if not os.path.exists('./.output/config'):
if not os.path.exists('./.output'):
os.mkdir('./.output')
os.mkdir('./.output/config')
path = f'./.output/config/{name}'
f = open(path, 'w+')
f.write(template_body)
f.close()
return f'./.output/config/{name}'
def get_alb_controller_image_repo():
"""
ALB Controller installation needs to get its image from the same region the cluster is deployed in.
https://github.com/kubernetes-sigs/aws-load-balancer-controller/releases
:returns: ECR url for the current region to pass into helm value image.repo
"""
current_region = region.get_default_region()
repos = {
'us-gov-east-1': '151742754352.dkr.ecr.us-gov-east-1.amazonaws.com/amazon/aws-load-balancer-controller',
'us-gov-west-1': '013241004608.dkr.ecr.us-gov-west-1.amazonaws.com/amazon/aws-load-balancer-controller',
'us-east-1': '602401143452.dkr.ecr.us-east-1.amazonaws.com/amazon/aws-load-balancer-controller'
}
if current_region not in repos:
print('Using us-east-1 image for ALB Controller. If the install fails to ready, check '
'https://github.com/kubernetes-sigs/aws-load-balancer-controller/releases for your region and fix with'
'the following command.')
print('helm upgrade aws-load-balancer-controller --set image.repository=<REPO_WITHOUT_IMAGE_TAG> --reuse-values simoncomputing/aws-load-balancer-controller -n kube-system')
# vast majority of regions can use the same as us-east-1 (all non-gov us regions)
current_region = 'us-east-1'
return repos[current_region]
def install_aws_load_balancer_controller(params: dict):
"""
Installs the ALB controller helm chart and waits for the pods to ready.
:param params: a dictionary containing the params to insert into the chart's values.yaml
:returns: None on success. Error if install fails.
"""
if not helm_exists('aws-load-balancer-controller', 'kube-system'):
params['albRepoUrl'] = get_alb_controller_image_repo()
namespace = 'kube-system'
helm_install('simoncomputing/aws-load-balancer-controller', params,
values_path=abs_path('yaml/aws-load-balancer-controller-values.yml'), namespace=namespace,
debug=False)
alb_pod = ''
# get pod
attempts = 0
while attempts < 5:
time.sleep(15)
pods = list_pods(namespace)
for p in pods:
if p.get('name', '').find('aws-load-balancer-controller') != -1:
alb_pod = p.get('name', False)
break
attempts += 1
# wait for pod to be ready
if not wait_for_pod(alb_pod, namespace, timeout=300):
raise ValueError('aws-load-balancer-controller did not ready')
def install_aws_tools(params: dict):
"""
Installs charts/deployments from/for AWS resources. These are a storage class corresponding to EBS volumes,
EKS autoscaling parameters, and the ALB/ingress controller
:param params: a dictionary containing the params to insert into the charts' values.yaml
:return: True on success, exception if failure.
"""
set_run_output(False)
helm_repo_add('simoncomputing', SIMON_CHARTS)
helm_repo_update()
set_default_namespace('default')
set_run_output(True)
if not storage_class_exists('standard'):
print('storage class: "standard" not found. Creating standard storage class...')
set_run_output(False)
result = run_command('kubectl', 'apply', '-f', abs_path('yaml/default-sc.yml'))
set_run_output(True)
errors = result.stderr
if errors:
print('An error was encountered when creating the standard storage class.')
raise ValueError(errors)
print('Standard storage class successfully created.')
helm_install('simoncomputing/aws-storage', {}, release_name='aws-storage',
values_path=abs_path('yaml/aws-storage-values.yml'))
set_default_namespace('kube-system')
if not deployment_exists('cluster-autoscaler'):
env = Environment(loader=FileSystemLoader(abs_path('yaml/autoscaler')), autoescape=True)
run_command('kubectl', 'apply', '-f', abs_path('yaml/autoscaler/infra.yml'))
auto_deploy = read_yaml(write_templated_config('deploy.yml', env, params))
create_deployment(auto_deploy, 'kube-system')
install_aws_load_balancer_controller(params)
return True
[docs]def install_jenkins(params: dict, namespace='cicd'):
"""
Installs Jenkins to the current kubernetes environment.
:param params: a dictionary containing the params to insert into the jenkins chart's values.yaml
:param namespace: the namespace to install jenkins in. defaults to cluster-tools
:return: None on success, exception if failure.
"""
if helm_exists('jenkins', namespace):
return
helm_params = {
'clusterName': params.get('clusterName'),
'baseDomain': params.get('baseDomain'),
'certArn': params.get('certArn'),
}
install_aws_tools(helm_params)
set_run_output(False)
set_default_namespace(namespace)
if not namespace_exists(namespace):
create_namespace(namespace)
helm_repo_add('simoncomputing', SIMON_CHARTS)
helm_repo_update()
set_run_output(True)
# install chart
helm_install('simoncomputing/jenkins', params, release_name='jenkins',
values_path=abs_path('yaml/jenkins-values.yml'), namespace=namespace)
# get pod
time.sleep(1)
pods = list_pods(namespace)
jenkins_pod = False
for p in pods:
if p.get('name', '').find('jenkins') != -1:
jenkins_pod = p.get('name', False)
# wait for pod to be ready
if not wait_for_pod(jenkins_pod, namespace, timeout=300):
if jenkins_pod:
run_command('kubectl', 'describe', 'pod', jenkins_pod, '-n', namespace)
raise RuntimeError('Jenkins pod did not ready within 5 minutes. '
'Please verify that your params are complete and correct.')
[docs]def install_efk(params: dict, namespace: str = 'logging'):
"""
Installs EFK to the current kubernetes environment.
:param params: a dictionary containing the params to insert into the efk chart's values.yaml
:param namespace: the namespace to install efk in. defaults to cluster-tools
:return: None on success, exception if failure.
"""
if helm_exists('efk', namespace):
return
set_run_output(False)
set_default_namespace(namespace)
if not namespace_exists(namespace):
create_namespace(namespace)
helm_repo_add('simoncomputing', SIMON_CHARTS)
helm_repo_update()
set_run_output(True)
# install chart
helm_install('simoncomputing/efk', params, release_name='efk', values_path=abs_path('yaml/efk-values.yml'),
namespace=namespace)
# get pod
attempts = 0
while attempts < 5:
time.sleep(20)
pods = list_pods(namespace)
elastic_pod = False
for p in pods:
if p.get('name', '').find('elasticsearch') != -1:
elastic_pod = p.get('name', False)
break
attempts += 1
# wait for pod to be ready
if not elastic_pod or not wait_for_pod(elastic_pod, namespace, timeout=300):
raise RuntimeError('Elastic pod did not ready. Please verify that your params are complete and correct.')
for p in pods:
if p.get('name', '').find('kibana') != -1:
kibana_pod = p.get('name', False)
# wait for pod to be ready
if not wait_for_pod(kibana_pod, namespace, timeout=300):
raise RuntimeError('Kibana pod did not ready within 5 minutes. '
'Please verify that your params are complete and correct.')
def get_kibana_session(base_domain):
"""
Returns a request.Session() object with Kibana authorization and headers.
:param base_domain: baseDomain from cluster-config
:return: request.Session() object ready to be used for Kibana.
"""
kibana_secret = get_secret('elasticsearch-master-es-elastic-user', 'logging')
kibana_password = base64.decodebytes(bytes(kibana_secret.data['elastic'], 'ascii'))
session = requests.Session()
session.auth = ('elastic', kibana_password)
session.headers['kbn-xsrf'] = 'true'
session.headers['Host'] = f'kibana.{base_domain}'
session.verify = False
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
return session
def kibana_import_object(session, dns_name: str, file_path: str, overwrite=False, create_new=False):
"""
Makes an API call to Kibana to post an object.
:param session: a requests.session object with auth and headers already set up
:param dns_name: DNS name of the load balancer to route traffic to
:param file_path: absolute path to the object being posted
:param overwrite: Replace existing objects if there are conflicts.
:param create_new: Generate new ids if True. Takes precedent over overwrite parameter.
If not set and conflicts are found, raises error. Default is False.
:returns: True if upload succeeds. False otherwise.
"""
# route to load balancer
kibana_url = f'https://{dns_name}'
# wait for service to ready up
retry = 6
for _ in range(retry):
resp = session.get(kibana_url)
if resp.ok:
break
print('waiting for kibana to become available...')
print(resp.raw)
time.sleep(10)
import_url = os.path.join(kibana_url, 'api/saved_objects/_import')
if create_new:
params = {
'createNewCopies': create_new
}
else:
params = {
'overwrite': overwrite
}
with open(file_path, 'rb') as import_file:
files = {'file': (file_path, import_file)}
response = session.post(url=import_url, params=params, files=files)
print(response.status_code)
try:
response_body = response.json()
except requests.exceptions.JSONDecodeError as e:
print('Kibana upload failed')
print(response.reason)
print(str(e))
return False
if not response_body.get('success', False):
print(response_body)
return False
else:
print('Upload Success')
return True
def kibana_import_objects(combined_file_path, session, dns_name: str, file_paths, overwrite=False, create_new=False):
"""
Combines all given files into one combined.ndjson file and then calls kibana_import_object.
Files are combined to allow references across files. create_new=True changes the ids in file provided. However,
if the reference is within one file, kibana automatically handles changing the reference.
:param combined_file_path: The file location to place the combined ndjson file.
If the file exists, uploading is skipped.
:param session: a requests.session object with auth and headers already set up
:param dns_name: DNS name of the load balancer to route traffic to
:param file_paths: absolute path to the objects being uploaded
:param overwrite: Replace existing objects if there are conflicts.
:param create_new: Generate new ids if True. Takes precedent over overwrite parameter.
If not set and conflicts are found, raises error. Default is False.
:returns: True if upload succeeds. False otherwise.
"""
# do not create new dashboards if the combined file already exists, this will duplicate everything
if os.path.exists(combined_file_path):
return True
with open(combined_file_path, 'w+') as output:
for path in file_paths:
with open(path, 'r') as input_file:
output.write(input_file.read())
output.write('\n')
return kibana_import_object(session, dns_name, combined_file_path, overwrite, create_new)
def configure_kibana(base_domain: str, dns_name: str):
"""
Adds the standard cluster dashboard to Kibana.
:param dns_name: DNS name of the AWS load balancer
:param base_domain: the baseDomain from the cluster-config. Used to create Kibana urls.
:returns: True if all uploads succeed. False otherwise.
"""
session = get_kibana_session(base_domain)
# create cluster dashboard
filepath = abs_path('yaml/kibana/cluster-dashboard.ndjson')
return kibana_import_object(session, dns_name, filepath, overwrite=True)
def install_kibana_app_dashboards(app_dir: str, is_prd: bool):
"""
Installs boilerplate Kibana dashboards for the given app.
:param app_dir: The directory containing app_config.yml for the app.
:param is_prd: Whether the dashboard being created is for a production deployment
:returns: True if all uploads were successful. False otherwise.
"""
from k9.deploy import read_app_config
from k9.deploy import read_prd_app_config
app_config = read_prd_app_config(app_dir) if is_prd else read_app_config(app_dir)
app_name = app_config['appName']
file_names = ['boiler-dashboard', 'boiler-error-dashboard', 'boiler-api-dashboard', 'boiler-performance-dashboard']
output_file_paths = []
pathlib.Path(f'{app_dir}/values/kibana').mkdir(parents=True, exist_ok=True)
# create dashboard files from templates
for file_name in file_names:
# read boiler content
input_path = abs_path(f'yaml/kibana/{file_name}.ndjson')
with open(input_path, 'r') as boiler_file:
boiler_content = boiler_file.read()
# replace 'boiler' with app name
output_content = boiler_content.replace('boiler', app_name)
output_name = file_name.replace('boiler', app_name)
output_path = f'{app_dir}/values/kibana/{output_name}.ndjson'
# write the templated files to the vales/kibana directory
output_file_paths.append(output_path)
with open(output_path, 'w+') as output_file:
output_file.write(output_content)
print(f'Created {output_name}.ndjson')
# files are written, upload to every cluster in app-config
root_domain = app_config['rootDomain']
success = True
for deployment in app_config['deployments']:
cluster_name = deployment['clusterName']
connect_to_cluster(cluster_name)
base_domain = f'{cluster_name}.{root_domain}'
session = get_kibana_session(base_domain)
lb = get_cluster_load_balancer(cluster_name)
print(f'Uploading {app_name} dashboards to {cluster_name}-cluster.')
combined_file_path = f'{app_dir}/values/kibana/{cluster_name}-combined.ndjson'
success = success and kibana_import_objects(combined_file_path, session, lb['DNSName'],
output_file_paths, overwrite=False, create_new=True)
print('Kibana dashboards installed.') if success else print('Kibana dashboards failed to install')
return success
[docs]def install_sonarqube(params: dict, namespace: str = 'cicd'):
"""
Installs SonarQube to the current kubernetes environment.
:param params: a dictionary containing the params to insert into the sonarqube chart's values.yaml
:param namespace: the namespace to install sonarqube in. defaults to cluster-tools
:return: None on success, exception if failure.
"""
if helm_exists('sonarqube', namespace):
return
# install chart
# get pod
# wait for pod to be ready
set_run_output(False)
set_default_namespace(namespace)
if not namespace_exists(namespace):
create_namespace(namespace)
helm_repo_add('simoncomputing', SIMON_CHARTS)
helm_repo_update()
set_run_output(True)
# install chart
helm_install('simoncomputing/sonarqube', params, release_name='sonarqube',
values_path=abs_path('yaml/sonarqube-values.yml'), namespace=namespace)
# get pod
time.sleep(5)
pods = list_pods(namespace)
sonar_pod = False
for p in pods:
if p.get('name', '').find('sonarqube-sonarqube') != -1:
sonar_pod = p.get('name', False)
# wait for pod to be ready
if not wait_for_pod(sonar_pod, namespace,
timeout=600): # this one takes around 6 minutes, give or take. so much longer than the rest.
raise RuntimeError('Sonar pod did not ready within 5 minutes. '
'Please verify that your params are complete and correct.')
[docs]def install_prometheus(params: dict, namespace: str = 'monitoring'):
"""
Installs Prometheus to the current kubernetes environment.
:param params: a dictionary containing the params to insert into the prometheus chart's values.yaml
:param namespace: the namespace to install prometheus in. defaults to cluster-tools
:return: None on success, exception if failure.
"""
if helm_exists('prometheus', namespace):
return
# install chart
# get pod
# wait for pod to be ready
set_run_output(False)
set_default_namespace(namespace)
if not namespace_exists(namespace):
create_namespace(namespace)
helm_repo_add('simoncomputing', SIMON_CHARTS)
helm_repo_update()
set_run_output(True)
# install chart
helm_install('simoncomputing/prometheus', params, release_name='prometheus',
values_path=abs_path('yaml/prometheus-values.yml'), namespace=namespace)
# get pod
time.sleep(1)
pods = list_pods(namespace)
prom_pod = False
for p in pods:
if p.get('name', '').find('prometheus') != -1:
prom_pod = p.get('name', False)
# wait for pod to be ready
if not wait_for_pod(prom_pod, namespace, timeout=300):
raise RuntimeError('Prometheus pod did not ready within 5 minutes. '
'Please verify that your params are complete and correct.')
[docs]def install_grafana(params: dict, namespace: str = 'monitoring'):
"""
Installs grafana to the current kubernetes environment.
:param params: a dictionary containing the params to insert into the grafana chart's values.yaml
:param namespace: the namespace to install grafana in. defaults to cluster-tools
:return: None on success, exception if failure.
"""
if helm_exists('grafana', namespace):
return
# install chart
# get pod
# wait for pod to be ready
set_run_output(False)
set_default_namespace(namespace)
if not namespace_exists(namespace):
create_namespace(namespace)
helm_repo_add('simoncomputing', SIMON_CHARTS)
helm_repo_update()
set_run_output(True)
# install chart
helm_install('simoncomputing/grafana', params, release_name='grafana',
values_path=abs_path('yaml/grafana-values.yml'), namespace=namespace)
# get pod
time.sleep(1)
pods = list_pods(namespace)
grafana_pod = False
for p in pods:
if p.get('name', '').find('grafana') != -1:
grafana_pod = p.get('name', False)
# wait for pod to be ready
if not wait_for_pod(grafana_pod, namespace, timeout=300):
raise RuntimeError('Grafana pod did not ready within 5 minutes. '
'Please verify that your params are complete and correct.')
def create_grafana_post(session, upload_url: str, json: dict, keep_uid=False):
"""
Sends a POST to the grafana API. Used to upload dashboards and notification channels
:param session: a requests.session object with auth already set up
:param upload_url: the exact API url to post to
:param json: the json object being sent to the API
:param keep_uid: If set, the uid found in the dashboard specification will be reused. Otherwise, a new one will be generated. Default: False
:returns: True on Success, False on failure.
"""
data = json.copy()
if 'dashboard' in data:
data['dashboard']['id'] = None
if not keep_uid:
data['dashboard']['uid'] = None
else:
data['id'] = None
if not keep_uid:
data['uid'] = None
response_body = session.post(upload_url, json=data)
print(response_body.status_code)
if response_body.status_code == 412 or response_body.status_code == 409:
print('Item already exists')
return True
elif response_body.status_code == 200:
print('Upload Success')
return True
else:
print('Upload Failed')
print(response_body.reason)
return False
def configure_grafana(base_domain: str, dns_name: str):
"""
Adds dashboards and notification channels to Grafana.
:param base_domain: the baseDomain from the cluster-config. Used to create urls.
:param dns_name: DNS name of the AWS load balancer
:returns: True if all uploads succeeded. False otherwise.
"""
# get auth
refresh_kubeconfig()
credentials_secret = get_secret('grafana', namespace='monitoring')
username = base64.decodebytes(bytes(credentials_secret.data['admin-user'], 'ascii'))
password = base64.decodebytes(bytes(credentials_secret.data['admin-password'], 'ascii'))
session = requests.Session()
session.auth = (username, password)
session.headers['Host'] = f'grafana.{base_domain}'
session.verify = False
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
dashboards = ['pod-health', 'kubernetes-volume-usage-percentage',
# https://grafana.com/grafana/dashboards/1860-node-exporter-full/
'node-exporter-full',
# https://grafana.com/grafana/dashboards/3831-autoscaler/
'kubernetes-cluster-autoscaler',
]
grafana_path = abs_path('yaml/grafana')
grafana_url = f'https://{dns_name}'
# wait for service to ready up
retry = 6
for _ in range(retry):
resp = session.get(grafana_url)
if resp.ok:
break
print('waiting for grafana to become available...')
print(resp.raw)
time.sleep(10)
dashboard_url = os.path.join(grafana_url, 'api/dashboards/db')
notification_channel_url = os.path.join(grafana_url, 'api/alert-notifications')
success = True
for d in dashboards:
with open(f'{grafana_path}/dashboards/{d}.json') as f:
d_json = json.load(f)
if d_json:
data = {'dashboard': d_json}
success = success and create_grafana_post(session, dashboard_url, data)
# notis (this is what calls self healing lambdas, among other things)
notis = ['google-chat-alert']
for n in notis:
with open(f'{grafana_path}/notification-channels/{n}.json') as f:
n_json = json.load(f)
if n_json:
success = success and create_grafana_post(session, notification_channel_url, n_json, keep_uid=True)
print('Created grafana notification channel: "Google Chat Alert". '
'Update the Url to your google chat webhook to have notifications turned on.')
return success
def create_logins_secret(cluster_name: str):
"""
Puts Kibana, Grafana, Prometheus logins into an AWS secret
:param cluster_name: the name of the cluster currently connected to. Used to name the secret.
"""
secret_name = f'{cluster_name}-monitoring-logins-secret'
if secret.secret_exists(secret_name):
print(f'{secret_name} already exists')
return True
kibana_secret = get_secret('elasticsearch-master-es-elastic-user', 'logging')
kibana_user = 'elastic'
kibana_password = base64.b64decode(bytes(kibana_secret.data['elastic'], encoding='utf8')).decode('utf-8')
grafana_secret = get_secret('grafana', namespace='monitoring')
grafana_user = base64.b64decode(bytes(grafana_secret.data['admin-user'], encoding='utf8')).decode('utf-8')
grafana_password = base64.b64decode(bytes(grafana_secret.data['admin-password'], encoding='utf8')).decode('utf-8')
content = {
'kibanaUser': kibana_user,
'kibanaPassword': str(kibana_password),
'grafanaUser': str(grafana_user),
'grafanaPassword': str(grafana_password)
}
tags = {
'clusterName': cluster_name,
'createdWith': 'k9-create-cluster'
}
secret.create_secret(secret_name, f'Kibana and Grafana web login credentials for {cluster_name} cluster.',
kvp=content, tags=tags, )
print(f'{secret_name} created. Use may now view the login credentials within that secret.')
def delete_logins_secret(cluster_name: str):
"""
Deletes secret that holds standard app logins.
:param cluster_name: the name of the cluster currently connected to. Used to name the secret.
"""
secret_name = f'{cluster_name}-monitoring-logins-secret'
if not secret.secret_exists(secret_name):
print(f'{secret_name} does not exist, skipping delete')
return True
secret.delete_secret(secret_name, perma_delete=True)
print(f'{secret_name} deleted')
[docs]def install_standard_apps(config):
"""
Called by :func:`k9.cluster_init.create_cluster` after creating the cluster.
Deploys standard kubernetes apps: EFK, Grafana, Prometheus.
Also adds dashboards to Kibana and Grafana.
:param config: config dictionary as read from your cluster-config file
"""
print("Installing standard apps. If there are any errors, you may retry by running create cluster again.")
cert_arn = cert.get_certificate(config['baseDomain'])[0]['CertificateArn']
base_domain = config['baseDomain']
helm_params = {
'clusterName': config['clusterName'],
'baseDomain': base_domain,
'certArn': cert_arn,
}
install_aws_tools(helm_params)
install_efk(helm_params)
install_prometheus(helm_params)
install_grafana(helm_params)
create_logins_secret(config['clusterName'])
def configure_standard_apps(base_domain, dns_name: str):
"""
Called as part of install_standard_apps after DNS routing is complete.
Adds canned dashboards to standard apps.
:param base_domain: the baseDomain from the cluster-config. Used to create urls.
:param dns_name: DNS name of the AWS load balancer
"""
refresh_kubeconfig()
print('Adding Kibana Dashboards')
kibana_success = configure_kibana(base_domain, dns_name)
print('Adding Grafana Dashboards')
grafana_success = configure_grafana(base_domain, dns_name)
return kibana_success and grafana_success
[docs]def create_access_role(cluster_name: str):
"""
Finds the EKS access role and configures aws-auth on the kubernetes cluster to allow the role access.
Creates a ClusterRole and ClusterRoleBinding for the user created in aws-auth.
:param cluster_name: clusterName creating the role for, used to find the AWS role.
:return: True on success, False if unable to access ConfigMaps, exception if other failure.
"""
# match the name format in atomic-cloud cfm template
role = iam.get_role(f'{cluster_name}-eks-access-role')
role_arn = role['Arn']
# add access role to aws_auth configmap
add_arn_to_aws_auth(role_arn)
# create ClusterRole and CLusterRoleBinding
cr_body = util.read_yaml(abs_path('yaml/access-role/access-ClusterRole.yml'))
try:
client.RbacAuthorizationV1Api().create_cluster_role(cr_body)
except client.exceptions.ApiException:
print('ClusterRole already exists')
crb_body = util.read_yaml(abs_path('yaml/access-role/access-ClusterRoleBinding.yml'))
try:
client.RbacAuthorizationV1Api().create_cluster_role_binding(crb_body)
except client.exceptions.ApiException:
print('ClusterRoleBinding already exists')
print(f'{cluster_name}-eks-access-role created and configured. '
f'Manually add users to the trust policy to allow them to assume the role.')
return True
def add_arn_to_aws_auth(role_arn: str):
"""
Adds the IAM entity arn to the current cluster's aws-auth ConfigMap. The arn is associated to a kubernetes user
named eks-access-role-user.
"""
# build addition to aws-auth
jinja_env = Environment(loader=FileSystemLoader(abs_path('yaml/access-role')), autoescape=True)
template = jinja_env.get_template('aws-auth-entry.yml')
template_body = template.render({'roleArn': role_arn})
# find aws-auth and patch in new entry
core = client.CoreV1Api()
try:
map_list = core.list_namespaced_config_map('kube-system')
except client.exceptions.ApiException as e:
if e.reason == 'Forbidden':
print('The current AWS principle (role/user) does not have access to ConfigMaps')
print('Cannot configure aws-auth configmap')
return False
raise e
aws_auth = None
for m in map_list.items:
if m.metadata.name == 'aws-auth':
aws_auth = m
break
if role_arn not in aws_auth.data['mapRoles']:
new_body = aws_auth.data['mapRoles'] + '\n' + template_body
aws_auth.data['mapRoles'] = new_body
core.patch_namespaced_config_map(name='aws-auth', namespace='kube-system', body=aws_auth)
print('patched aws-auth')
else:
print('aws-auth already patched')
def configure_access_to_cluster(cluster_name: str):
"""
Adds the current IAM entity to a cluster. Current IAM entity must be able to assume the cluster's eks-access-role.
See https://k9.docs.simoncomputing.com/eks_access_role.html for information on assuming an access role.
:param cluster_name: The cluster to configure access to.
:return: True on success, False otherwise.
"""
# match the name format in atomic-cloud cfm template
role = iam.get_role(f'{cluster_name}-eks-access-role')
role_arn = role['Arn']
from k9.deploy import get_caller_identity
my_arn = get_caller_identity()['Arn']
# assume access role
try:
shell(f'aws sts assume-role --role-arn {role_arn} --role-session-name configure-access', silent=True)
except subprocess.CalledProcessError as e:
if 'AccessDenied' in str(e.stderr):
print(f'\nThe current IAM entity does not have permissions to assume {role_arn}. '
f'See https://k9.docs.simoncomputing.com/eks_access_role.html for information on assuming a cluster '
f'access role.')
return False
shell(f'aws eks update-kubeconfig --name {cluster_name}-cluster --role-arn {role_arn}', silent=True)
kube_config.load_kube_config()
add_arn_to_aws_auth(my_arn)
# un-assume access role
connect_to_cluster(cluster_name)
return True
def _get_rds_instance_name(cluster_name: str):
'''
Helper function to be used by create_cicd() to get rds instance name for cicd
'''
databases = rds.list_db_instances(eks_cluster_name=cluster_name)
rds_instance_name = None
if len(databases) == 0:
raise ValueError(f'The cluster with name {cluster_name} does not have any associated RDS instances.')
elif len(databases) == 1:
rds_instance_name = databases[0]['DBInstanceIdentifier']
else:
for database in databases:
if util.get_tag_value(database, 'default', 'TagList') == 'True':
rds_instance_name = database['DBInstanceIdentifier']
break
if not rds_instance_name:
raise ValueError(f'There was no RDS instance that was found with a default tag.')
return rds_instance_name
def _get_jenkins_password(cluster_name: str):
'''
Helper function to be used by create_cicd() to get Jenkins password if defined in secret manager.
Otherwise, it will random generated and put to a secret manager.
:param cluster_name
:return Jenkins password
'''
secret_name = cluster_name + '-jenkins-password'
jenkins_tags = {
"clusterName": cluster_name,
"type": 'Jenkins login'
}
try:
if not secret.secret_exists_by_tags(tags=jenkins_tags):
if not secret.secret_exists(name=secret_name):
password = util.generate_random_password()
tags = {
'Name': secret_name,
'clusterName': cluster_name,
'type': 'Jenkins login'
}
secret.create_secret(name=secret_name,
description=f"Jenkins web login credentials for the {cluster_name} cluster.",
kvp={'username': 'admin', 'password': password}, tags=tags)
resulting_secret = secret.get_secret_value(name=secret_name)
secret.wait_for_secret(name=secret_name, value=resulting_secret)
return password
else:
return secret.get_secret_value(name=secret_name, key='password')
else:
return secret.get_secrets_by_tags(tags=jenkins_tags, desired_key='password')[0]
except Exception as e:
print(
f'ERROR: An error occurred while attempting to create/retrieve the Jenkins login secret for the cluster {cluster_name}.')
print(e)
raise e
def _get_rds_hostname(rds_instance_name: str, cluster_name: str):
'''
Helper function to get rds hostname of the rds instance through secret manager
'''
rds_secret_name = rds_instance_name + "-credentials"
rds_tags = {
"rdsInstance": rds_instance_name,
"secretType": "RDS login credentials",
"clusterName": cluster_name
}
if not secret.secret_exists_by_tags(tags=rds_tags):
if not secret.secret_exists(rds_secret_name):
raise Exception(f'Password secret for cluster with clusterName {cluster_name} does not exist.')
return secret.get_secret_value(name=rds_secret_name, key='host')
else:
return secret.get_secrets_by_tags(tags=rds_tags, desired_key='host')[0]
def _get_sonar_credential(sonar_db_info: dict, cluster_name: str):
env = sonar_db_info['deployments'][0]['environments'][0]['env']
app_name = sonar_db_info['appName']
database_name = env + "_" + app_name
master_secret_name = env + "-" + app_name + "-master"
master_tags = {
"databaseName": database_name,
"appName": app_name,
"secretType": "Application database user credentials",
"clusterName": cluster_name,
"userType": "Master",
"env": env
}
if not secret.secret_exists_by_tags(tags=master_tags):
if not secret.secret_exists(master_secret_name):
raise Exception(f'Password secret for cluster with clusterName {cluster_name} does not exist.')
sonar_user = secret.get_secret_value(name=master_secret_name, key='username')
sonar_password = secret.get_secret_value(name=master_secret_name, key='password')
else:
sonar_user = secret.get_secrets_by_tags(tags=master_tags, desired_key='username')[0]
sonar_password = secret.get_secrets_by_tags(tags=master_tags, desired_key='password')[0]
sonar_credential = {
'user': sonar_user,
'password': sonar_password
}
return sonar_credential
[docs]def create_cicd(config):
"""
Called by :func:`k9.cluster_init.create_cluster` if creating a cicd cluster.
Deploys cicd apps: Jenkins and SonarQube
"""
cluster_name = config.get('clusterName')
if not cluster_name:
print(f'ERROR: The cluster-config.yml does not contain a clusterName.')
raise ValueError('The cluster-config.yml does not contain a clusterName.')
connect_to_cluster(cluster_name)
base_domain = config.get('baseDomain')
if not base_domain:
print(f'ERROR: The cluster-config.yml does not contain a baseDomain.')
raise ValueError('The cluster-config.yml does not contain a baseDomain.')
certificate = '*.' + base_domain
cert_arn = cert.get_cert_arn(domain=certificate)
if not cert_arn:
print(f'ERROR: The certificate {certificate} does not exist.')
raise ValueError(f'The certificate {certificate} does not exist.')
jenkins_password = _get_jenkins_password(cluster_name)
jenkins_user = secret.get_secret_value(f'{cluster_name}-jenkins-password', 'username')
if not namespace_exists(namespace='cicd'):
create_namespace(namespace='cicd')
try:
run_command('kubectl', 'apply', '-f', abs_path('yaml/jenkins-pvc.yml'), '-n', 'cicd')
except Exception as e:
print(f'An error occurred while attempting to create the persistent volume claim for Jenkins.')
raise e
secret_dict = {
'jenkins-admin-password': jenkins_password,
'jenkins-admin-user': jenkins_user,
}
if not secret_exists('jenkins', 'cicd'):
create_secret('jenkins', secret_dict, 'cicd')
jenkins_params = {
'clusterName': cluster_name,
'baseDomain': base_domain,
'certArn': cert_arn
}
try:
print('Installing Jenkins...')
install_jenkins(jenkins_params, namespace='cicd')
except Exception as e:
print('An error occurred while attempting to create Jenkins.')
print(e)
raise e
print('Jenkins has been installed.')
# SonarQube
rds_instance_name = _get_rds_instance_name(cluster_name)
rds_hostname = _get_rds_hostname(rds_instance_name, cluster_name)
app_database_info = {
"appName": "sonar",
"deployments": [{
"clusterName": cluster_name,
"environments": [{
"env": "cicd",
"rdsInstance": rds_instance_name
}]
}]
}
total_no_action, total_successful, total_failure = create_app_databases(app_config=app_database_info)
if len(total_no_action) > 0 or len(total_successful) != 1 or len(total_failure) > 0:
raise Exception(f'Creation of application database for SonarQube was unsuccessful.')
env = app_database_info['deployments'][0]['environments'][0]['env']
app_name = app_database_info['appName']
database_name = env + "_" + app_name
sonar_credential = _get_sonar_credential(app_database_info, cluster_name)
sonar_user = sonar_credential['user']
sonar_password = sonar_credential['password']
sonar_params = {
'clusterName': cluster_name,
'baseDomain': base_domain,
'certArn': cert_arn,
'sonarServerHost': rds_hostname,
'sonarDb': database_name,
'sonarUser': sonar_user
}
connect_to_cluster(cluster_name)
if not secret_exists('sonar-password', 'cicd'):
set_default_namespace('cicd')
create_secret('sonar-password', {'postgresql-password': sonar_password}, 'cicd')
try:
print('Installing SonarQube...')
install_sonarqube(sonar_params, namespace='cicd')
except Exception as e:
print('An error occurred while attempting to create SonarQube.')
print(e)
raise e
print('SonarQube has been installed.')
run_command('kubectl', 'get', 'ing', '-n', 'cicd')
print('DNS Routing must be done manually now. Run the "k9 link cicd" command when you can access the hosts printed above.')
print('cicd deployed')
def configure_sonar_for_linking(config):
"""
Configures the sonar site to update the default admin's password, to create a Jenkins admin, and to create a user token for the Jenkins user
Note: this should be run only after the create_cicd function has been run and DNS routing has been completed.
:param config: The cluster-config of the cluster used for cicd in dict form
:return: The user token used to link Jenkins and SonarQube
"""
cluster_name = config.get('clusterName')
if not cluster_name:
raise ValueError('ERROR: The cluster-config.yml does not contain a clusterName.')
connect_to_cluster(cluster_name)
base_domain = config.get('baseDomain')
if not base_domain:
raise ValueError('ERROR: The cluster-config.yml does not contain a baseDomain.')
if not helm_exists('jenkins', 'cicd') or not helm_exists('sonarqube', 'cicd'):
raise ValueError('ERROR: Jenkins or SonarQube has not been installed. Please run the k9 create cicd command before running this.')
sonar_url = f"https://sonar.{base_domain}"
admin_login, admin_password = update_default_sonar_user_password(sonar_url=sonar_url, cluster_name=cluster_name)
create_jenkins_user_in_sonar(sonar_url=sonar_url, admin_login=admin_login, admin_password=admin_password)
user_token = create_jenkins_user_token(sonar_url=sonar_url, admin_login=admin_login, admin_password=admin_password)
return user_token
def update_default_sonar_user_password(sonar_url: str, cluster_name: str):
"""
Updates the SonarQube default admin's password. Returns an error if sonar site is unreachable.
Note: this should be run only after the create_cicd function has been run and DNS routing has been completed.
:param sonar_url: The url of the SonarQube site
:return: The updated credentials for the default admin
"""
sonar_default_login = secret.get_secret_value('default-sonarqube-credentials', 'login')
sonar_default_password = secret.get_secret_value('default-sonarqube-credentials', 'password')
login_data = {
'login': sonar_default_login,
'password': sonar_default_password
}
print('Connecting to SonarQube...')
login_response = requests.post(f'{sonar_url}/api/authentication/login', data = login_data)
print("Updating SonarQube administrator's password from default...")
sonar_jenkins_user_tags = {
'clusterName': cluster_name,
'secretType': 'Admin user in SonarQube credentials'
}
if login_response.status_code == 200:
jenkins_user_in_sonar_secret = f"{cluster_name}-sonar-web-login-credentials"
if not secret.secret_exists_by_tags(tags=sonar_jenkins_user_tags):
password = util.generate_random_password()
sonar_jenkins_user_tags['Name'] = jenkins_user_in_sonar_secret
secret.create_secret(name=jenkins_user_in_sonar_secret,
description=f"SonarQube web login credentials for the {cluster_name} cluster.",
kvp={'login': sonar_default_login, 'password': password}, tags=sonar_jenkins_user_tags)
resulting_secret = secret.get_secret_value(name=jenkins_user_in_sonar_secret)
secret.wait_for_secret(name=jenkins_user_in_sonar_secret, value=resulting_secret)
admin_login = secret.get_secret_value(jenkins_user_in_sonar_secret, 'login')
admin_password = secret.get_secret_value(jenkins_user_in_sonar_secret, 'password')
else:
admin_login = secret.get_secrets_by_tags(tags=sonar_jenkins_user_tags, desired_key='login')[0]
admin_password = secret.get_secrets_by_tags(tags=sonar_jenkins_user_tags, desired_key='password')[0]
changed_password_data = {
'login': admin_login,
'password': admin_password,
'previousPassword': sonar_default_password,
}
change_password_response = requests.post(f'{sonar_url}/api/users/change_password', data = changed_password_data, auth = (sonar_default_login, sonar_default_password))
if (change_password_response.status_code >= 400):
print("WARNING: Sonar administrator's password was not successfully updated. This will need to be done manually.")
admin_login = sonar_default_login
admin_password = sonar_default_password
else:
print("Sonar administrator's password was successfully updated.")
elif login_response.status_code == 401:
print("Sonar administrator's password has already been updated.")
admin_login = secret.get_secrets_by_tags(tags=sonar_jenkins_user_tags, desired_key='login')[0]
admin_password = secret.get_secrets_by_tags(tags=sonar_jenkins_user_tags, desired_key='password')[0]
else:
raise ValueError(f'ERROR: Could not connect to the sonar site at {sonar_url}. Please ensure that you have completed DNS routing and are able to access this site.')
return admin_login, admin_password
def create_jenkins_user_in_sonar(sonar_url: str, admin_login: str, admin_password: str):
"""
Creates the Jenkins user and assigns administrator privileges to it.
Note: this should be run only after the create_cicd function has been run and DNS routing has been completed.
:param sonar_url: The url of the SonarQube site
:param admin_login: The login for the default admin user
:param admin_password: The password for the default admin user
"""
print('Creating Jenkins user...')
users_response = requests.get(f'{sonar_url}/api/users/search', params={'ps': 500}, auth = (admin_login, admin_password))
users: list = users_response.json().get('users')
jenkins_user = list(filter(lambda usr: usr.get('login') == 'jenkins', users))
if not jenkins_user:
jenkins_user_params = {
'login': 'jenkins',
'name': 'Jenkins',
'password': util.generate_random_password()
}
create_user_response = requests.post(f'{sonar_url}/api/users/create', data=jenkins_user_params, auth = (admin_login, admin_password))
if (create_user_response.status_code >= 400):
raise ValueError("ERROR: Jenkins user in SonarQube was unable to be created.")
else:
print('Jenkins user created successfully.')
else:
print('Jenkins user already created.')
print('Adding user as an administrator...')
group_params = {
'login': 'jenkins',
'name': 'sonar-administrators'
}
add_user_to_group_response = requests.post(f'{sonar_url}/api/user_groups/add_user', data=group_params, auth = (admin_login, admin_password))
if (add_user_to_group_response.status_code >= 400):
raise ValueError("ERROR: Jenkins user was not able to be added as an administrator.")
else:
print('Jenkins user was added as an administrator successfully.')
def create_jenkins_user_token(sonar_url: str, admin_login: str, admin_password: str):
"""
Creates a user token for the Jenkins user.
Note: this should be run only after the create_cicd function has been run and DNS routing has been completed.
:param sonar_url: The url of the SonarQube site
:param admin_login: The login for the default admin user
:param admin_password: The password for the default admin user
"""
print('Creating user token...')
token_exists_response = requests.get(f'{sonar_url}/api/user_tokens/search', params = {'login': 'jenkins'}, auth = (admin_login, admin_password))
token_exists: list = token_exists_response.json().get('userTokens')
if token_exists:
raise ValueError('ERROR: The Jenkins user already has a user token. Please manually delete all tokens before continuing.')
token_params = {
'login': 'jenkins',
'name': 'sonar-jenkins-user-token'
}
add_token_response = requests.post(f'{sonar_url}/api/user_tokens/generate', data=token_params, auth = (admin_login, admin_password))
if (add_token_response.status_code >= 400):
raise ValueError("ERROR: Jenkins user token was not able to be created.")
return add_token_response.json().get('token')
def delete_cicd(config: dict):
"""
Deletes resources created by create_cicd and removes Jenkins and Sonar deployments.
:param config: cluster-config.yml
"""
cluster_name = config['clusterName']
connect_to_cluster(cluster_name)
# delete AWS secrets
jenkins_secret = cluster_name + '-jenkins-password'
secret.delete_secret(jenkins_secret, perma_delete=True)
sonar_secret = cluster_name + '-cluster-sonar-password'
secret.delete_secret(sonar_secret, perma_delete=True)
jenkins_user_in_sonar_secret = f"{cluster_name}-sonar-web-login-credentials"
secret.delete_secret(jenkins_user_in_sonar_secret, perma_delete=True)
if helm_exists('jenkins', 'cicd'):
helm_uninstall('jenkins', 'cicd')
if helm_exists('sonarqube', 'cicd'):
helm_uninstall('sonarqube', 'cicd')
# delete sonar db
rds_instance_name = _get_rds_instance_name(cluster_name)
delete_app_database(cluster_name, 'sonar', 'cicd', rds_instance_name)
print('Jenkins and Sonar uninstalled.')
[docs]def create_cluster(config, cwd: str):
"""
Called by create cluster cli. Does all cluster setup including CloudFormation stacks, installing standard apps,
configuring standard apps, and create an access role for the cluster.
:param config: config dictionary as read from your cluster-config file
:param cwd: the current working directory that contains all the cfm template files
"""
cfm.set_template_path(cwd)
cluster.create_cluster(config)
print('Cluster created.')
# configure kubectl
cluster_name = config['clusterName']
connect_to_cluster(cluster_name)
print('Creating EKS access role...')
create_access_role(cluster_name)
print('\nEKS access role created.\n')
print('Installing standard apps...')
install_standard_apps(config)
print('\nAll standard apps deployed successfully.\n')
print('Configuring standard apps...')
lb = get_cluster_load_balancer(cluster_name)
if lb is None:
print('Cluster load balancer not found. Cannot configure standard apps.')
return
configure_standard_apps(base_domain=config['baseDomain'], dns_name=lb['DNSName'])
print('\nStandard apps configured.\n')
print(f'Route *.{config["baseDomain"]} to {lb["DNSName"]}\n')
def delete_volumes(cluster_name: str):
"""
Deletes all ELB volumes created as part of the Kubernetes cluster.
:param cluster_name: The name of the cluster to delete all volumes within.
(appends '-cluster' to find the ControlPlane)
"""
refresh_kubeconfig()
client = ec2.get_ec2()
# find volumes with the tag identifying them as part of this cluster
response = client.describe_volumes(
Filters=[
{
'Name': f'tag:kubernetes.io/cluster/{cluster_name}-cluster',
'Values': ['owned',]
}
]
)
for v in response['Volumes']:
volume_id = v['VolumeId']
print(f'Deleting volume: {volume_id}')
# detach and then delete
try:
ec2.get_ec2().detach_volume(VolumeId=volume_id, Force=True)
except Exception as e:
# if volume already detached, continue to delete
if 'IncorrectState' in str(e) or 'InvalidAttachment.NotFound' in str(e):
print('Volume already detached')
else:
raise e
try:
response = ec2.get_ec2().delete_volume(VolumeId=volume_id)
print(response)
except Exception as e:
# root volumes are removed with eks worker nodes
if 'root' in str(e):
print('Not deleting root volume, that will be cleaned later')
elif 'currently attached' in str(e):
print(str(e))
print(f'{volume_id} may be left behind after delete cluster finishes. Check manually in AWS console.')
else:
raise e
def get_cluster_load_balancer(cluster_name: str):
"""
Gets the load balancer for a vpc name clusterName-01-vpc.
:param cluster_name: The cluster to find the load balancer for.
:returns: Load balancer object if found. None otherwise.
"""
try:
vpc_id = cfm.get_output(cluster_name + '-01-vpc', 'VPC')
except TypeError:
return None
load_balancers = region.get_elbv2().describe_load_balancers()
for lb in load_balancers['LoadBalancers']:
if lb['VpcId'] == vpc_id:
return lb