Hello everyone,
here is the first draft for a declarative configuration.
The setup is as follows:
- Create a folder named “config” in $ZITI_HOME (in my container that is /ziti-controller)
- Store any yaml files in this folder that are to be configured in the controller. You can simply convert and insert the JSON code from ZAC into YAML or create it manually. A possible configuration file could look like this:
---
configs:
- name: my-service.ziti-host-config
configTypeId: host.v1
data:
protocol: tcp
address: traefik
forwardPort: true
allowedPortRanges:
- high: 80
low: 80
- high: 443
low: 443
- name: my-service.ziti-intercept-config
configTypeId: intercept.v1
data:
portRanges:
- high: 80
low: 80
- high: 443
low: 443
addresses:
- my-service.ziti
protocols:
- tcp
services:
- name: my-service.ziti
configs:
- my-service.ziti-host-config
- my-service.ziti-intercept-config
service-policies:
- name: my-service.ziti-bind-policy
type: Bind
serviceRoles:
- "@my-service.ziti"
identityRoles:
- "@your-host"
- name: my-service.ziti-dial-policy
type: Dial
serviceRoles:
- "@my-service.ziti"
identityRoles:
- "#my-service.ziti"
It is important to note that the entries can be sorted into different files as required, only the keywords “identities”, “service-policies”, ... may only appear once per file.
- Place the following Python script named
config-sync.py
in the $ZITI_HOME or any other directory:
import yaml
import json
import subprocess
import os
import sys
import urllib.parse
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import threading
ALL_ROOT_KEYS = ["service-policies", "identities"]
ATTRIBUTE_API_MAPPING = yaml.safe_load("""
configs: configs
serviceRoles: services
identityRoles: identities
configTypeId: config-types
""")
class YamlHandler(FileSystemEventHandler):
def on_closed(self, event):
if event.is_directory:
return
if event.src_path.endswith(".yaml") or event.src_path.endswith(".yml"):
process_yaml_file(event.src_path)
def run_cleanup_periodically(interval):
while True:
cleanup()
time.sleep(900) # Run every hour
# Convert ies to y, e.g. service-policies -> service-policy
def singular(word):
if word.endswith('ies'):
return word[:-3] + 'y'
# Prevents modification of words such as “process”
elif word.endswith('s') and not word.endswith('ss'):
return word[:-1]
return word
# Interact with file
def handle_file(file_path, mode, file_type, save_data=None):
if mode == 'r':
if not os.path.exists(file_path) or os.path.getsize(file_path) == 0:
print(f"INFO file {file_path} does not exist or is empty, returning empty dict")
return {}
try:
with open(file_path, mode) as file:
if mode == 'r':
if file_type == 'yaml':
result = yaml.safe_load(file)
elif file_type == 'json':
result = json.load(file)
else:
raise ValueError(f"Unsupported file type: {file_type}")
print(f"INFO file {file_path} loaded successfull")
return result
elif mode == 'w':
if file_type == "json":
json.dump(save_data, file, indent=2)
print(f"INFO file {file_path} saved successfull")
else:
file.write(save_data)
except (yaml.YAMLError, json.JSONDecodeError) as e:
print(f"ERROR file {file_path} cannot be loaded, reason: invalid {file_type}")
except IOError as e:
print(f"ERROR file {file_path} cannot be {'loaded' if mode == 'r' else 'saved'}, reason: {e}")
# Login to Controller
def login(address, username, password):
login_command = [
'curl', '-s', '-k',
'-H', f'content-type: application/json',
urllib.parse.quote(f'https://{address}/authenticate?method=password', safe=':/?="'),
'-d', json.dumps({"username": username, "password": password})
]
try:
result = subprocess.run(login_command, capture_output=True, text=True, check=True)
session_token = json.loads(result.stdout).get('data', [{}]).get('token')
print(f"INFO login to {address} as '{username}' successfull")
return session_token
except (subprocess.CalledProcessError, json.JSONDecodeError, IndexError) as e:
print(f"ERROR login to {address} as '{username}' failed, unexpected error: {e}")
sys.exit(1)
def update_known_ids(action, name, id, root_key):
if root_key not in known_ids:
known_ids[root_key] = []
if action == "add":
existing_item = next((item for item in known_ids[root_key] if item['id'] == id), None)
if not existing_item:
known_ids[root_key].append({"name": name, "id": id})
elif action == "remove":
existing_item = next((item for item in known_ids[root_key] if item['id'] == id), None)
if existing_item:
known_ids[root_key].remove(existing_item)
# Remove the root_key if the list is empty
if not known_ids[root_key]:
del known_ids[root_key]
def evaluate_response(response, action, name, root_key):
if 'data' in response:
if response['data'] == {}:
print(f"INFO {singular(root_key)} '{name}' {action}")
elif 'id' in response['data']:
print(f"INFO {singular(root_key)} '{name}' created, id: {response['data']['id']}")
update_known_ids("add", name, response['data']['id'], root_key)
elif 'error' in response:
match response['error']['code']:
# 400
case "COULD_NOT_VALIDATE":
print(f"ERROR {singular(root_key)} '{name}' cannot {action}, reason: syntax error in config-file")
# 401
case "UNAUTHORIZED":
print(f"ERROR {singular(root_key)} '{name}' cannot {action}, reason: {response['error']['message']}")
# 409
case "CONFLICT_CANNOT_MODIFY_REFERENCED":
print(f"ERROR {singular(root_key)} '{name}' cannot {action}, reason: {response['error']['causeMessage']}")
# 429
case "RATE_LIMITED":
print(f"ERROR {singular(root_key)} '{name}' cannot {action}, reason: {response['error']['causeMessage']}")
else:
print(f"ERROR {singular(root_key)} '{name}' cannot {action}, unexpected response: {response}")
# Matches config files with existing configuration in the controller
def cleanup():
print(f"INFO cleanup started")
for root_key in ALL_ROOT_KEYS:
root_key_command = [
'curl', '-sk',
'-H', f'content-type: application/json',
'-H', f'zt-session: {session_token}',
urllib.parse.quote(f'https://{ZITI_CTRL_ADVERTISED_ADDRESS}/edge/management/v1/{root_key}', safe=':/?="')
]
try:
result = subprocess.run(root_key_command, capture_output=True, text=True, check=True)
data = json.loads(result.stdout).get("data", [])
returned_ids = {item["id"]: item["name"] for item in data}
known_ids_set = {item['id'] for item in known_ids.get(root_key, [])}
# Remove IDs from the controller that are not in the known_ids
for returned_id, returned_name in returned_ids.items():
if returned_id not in known_ids_set:
delete_command = [
'curl', '-k',
'-X', "DELETE",
'-H', f'content-type: application/json',
'-H', f'zt-session: {session_token}',
urllib.parse.quote(f'https://{ZITI_CTRL_ADVERTISED_ADDRESS}/edge/management/v1/{root_key}/{returned_id}', safe=':/?="')
]
try:
result = subprocess.run(delete_command, check=True, capture_output=True, text=True)
response = json.loads(result.stdout)
evaluate_response(response, "deleted", returned_name, root_key)
except:
print(f"ERROR {singular(root_key)} '{returned_name}' cannot deleted, unexpected error: {e}")
# Remove IDs from known_ids that are not returned by the curl command
for known_id in known_ids_set:
if known_id not in returned_ids:
update_known_ids("remove", returned_name, known_id, root_key)
except (subprocess.CalledProcessError, json.JSONDecodeError, IndexError) as e:
print(f"ERROR some {root_key} cannot deleted, unexpected error: {e}")
print(f"INFO cleanup finished")
# Query id for entities
def get_id(root_key, name):
check_command = [
'curl', '-sk',
'-H', f'content-type: application/json',
'-H', f'zt-session: {session_token}',
urllib.parse.quote(f'https://{ZITI_CTRL_ADVERTISED_ADDRESS}/edge/management/v1/{root_key}?filter=name="{name}"', safe=':/?="')
]
try:
result = subprocess.run(check_command, capture_output=True, text=True, check=True)
return json.loads(result.stdout).get('data', [{}])[0].get('id')
except (subprocess.CalledProcessError, json.JSONDecodeError, IndexError):
return None
# Function for executing the curl command
def execute_curl(root_key, item_data):
existing_id = get_id(root_key, item_data['name'])
if existing_id:
REQUEST_METHOD = 'PATCH'
url = f'https://{ZITI_CTRL_ADVERTISED_ADDRESS}/edge/management/v1/{root_key}/{existing_id}'
action = "updated"
update_known_ids("add", item_data['name'], existing_id, root_key)
else:
REQUEST_METHOD = 'POST'
url = f'https://{ZITI_CTRL_ADVERTISED_ADDRESS}/edge/management/v1/{root_key}'
action = "created"
# Applying the default values
if root_key in default_values:
for key, value in default_values[root_key].items():
if key not in item_data:
item_data[key] = value
# Replace each name with its openziti-id, based on the ATTRIBUTE_API_MAPPING
for key, value in item_data.items():
if key in ATTRIBUTE_API_MAPPING:
api_endpoint = ATTRIBUTE_API_MAPPING[key]
if isinstance(value, list):
# If the value is a list, we process each element
item_data[key] = []
for name in value:
if name.startswith('#'):
# If the name starts with '#', we add it unchanged
item_data[key].append(name)
elif name.startswith('@'):
# If the name starts with '@', we remove the '@' and call get_id
item_data[key].append(f"@{get_id(api_endpoint, name[1:])}")
else:
# For all other cases, we call get_id with the full name
item_data[key].append(f"{get_id(api_endpoint, name)}")
elif isinstance(value, str):
# If the value is a string, we process it directly
if value.startswith('#'):
item_data[key] = value
elif value.startswith('@'):
item_data[key] = f"@{get_id(api_endpoint, value[1:])}"
else:
item_data[key] = f"{get_id(api_endpoint, value)}"
curl_command = [
'curl', '-k', url,
'-X', REQUEST_METHOD,
'-H', f'accept: application/json',
'-H', f'content-type: application/json',
'-H', f'zt-session: {session_token}',
'-d', json.dumps(item_data)
]
try:
result = subprocess.run(curl_command, check=True, capture_output=True, text=True)
response = json.loads(result.stdout)
evaluate_response(response, action, item_data['name'], root_key)
except subprocess.CalledProcessError as e:
print(f"ERROR {singular(root_key)} '{item_data['name']}' cannot {action}, unexpected error: {e.stdout}")
# Process a yaml file
def process_yaml_file(file_path):
data = handle_file(f"{file_path}", "r", "yaml")
for root_key, value in data.items():
for item in value:
execute_curl(root_key, item)
if __name__ == "__main__":
print(f"INFO initializing config-sync")
for var_name in ["ZITI_CTRL_ADVERTISED_ADDRESS", "ZITI_HOME", "ZITI_USER", "ZITI_PWD"]:
var_value = os.getenv(var_name)
if var_value is not None:
globals()[var_name] = var_value
print(f"INFO set {var_name}='{var_value}'")
else:
print(f"ERROR {var_name} not found, please run 'export {var_name}=<YOUR-VALUE>' first")
sys.exit(1)
default_values = handle_file(f"{ZITI_HOME}/config/defaults.yml", "r", "yaml")
known_ids = handle_file(f"{ZITI_HOME}/statefile.json", "r", "json")
session_token = login(ZITI_CTRL_ADVERTISED_ADDRESS, ZITI_USER, ZITI_PWD)
for filename in os.listdir(f"{ZITI_HOME}/config"):
if (filename.endswith('.yml') or filename.endswith('.yaml')) and not filename.endswith('defaults.yml'):
file_path = f"{ZITI_HOME}/config/{filename}"
process_yaml_file(file_path)
handle_file(f"{ZITI_HOME}/statefile.json", "w", "json", known_ids)
# Monitoring the config directory for changes
observer = Observer()
observer.schedule(YamlHandler(), f"{ZITI_HOME}/config", recursive=False)
observer.start()
# Start the cleanup thread periodically (3600 seconds = 1 hour)
cleanup_thread = threading.Thread(target=run_cleanup_periodically, args=(3600,))
cleanup_thread.daemon = True
cleanup_thread.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
- Start the script in the container with
python config-sync.py
. The following happens:
- The script logs on to the controller
- Configures all entities from the configuration files on the controller
- Creates an entry for each in $HOME_ZITI/statefile.json (is created and maintained automatically, therefore no manual intervention necessary)
- Recognizes if something has been configured on the controller that is not in the configuration files -> deletes the configuration from the controller
- Monitors the files for changes and synchronizes them with the controller
- Regularly (every 15 minutes) synchronizes the controller configuration and the configuration files
- Optionally create the file
$ZITI_HOME/config/defaults.yml
. All values defined here do not have to be entered in the configuration files each time, but are added dynamically. For example, my defaults.yml
looks like this:
---
services:
semantic: AnyOf
encryptionRequired: true
service-policies:
semantic: AnyOf
identities:
type: Default
isAdmin: false
edge-router-policies:
semantic: AnyOf
If we want to integrate this script into the container, the following python libraries must be added to the controller container:
That's it, what do you think?