Tracking data flow in Ziti edge router fabric

Hi,

My current setup has two edge routers running on different devices on the same network, with the controller running on an AWS EC2 instance. I have a server.py (using Flask) running on the device with edge_router_1 to stream data and a client.py (using requests) on the device hosting edge_router_2 to receive the data. (Figure below)

I'm trying to see how smart routing alters the data flow when I stop an edge router. Currently, I'm using ziti fabric stream events and checking the "nodes" value to identify the edge router in use, and the output looks like this:

thivish@thivish-XPS-8960:~$ ziti fabric stream events
{"namespace":"edge.entityCounts","timestamp":"2024-12-10T17:41:55.035539324Z","counts":{"apiSessionCertificates":9,"apiSessions":14,"authPolicies":1,"authenticators":3,"cas":0,"configTypes":5,"configs":0,"controllers":0,"edgeRouterPolicies":3,"enrollments":0,"eventualEvents":0,"externalJwtSigners":0,"identities":5,"identityTypes":2,"mfas":0,"postureCheckTypes":5,"postureChecks":0,"revocations":0,"routers":2,"routers.edge":2,"serviceEdgeRouterPolicies":1,"servicePolicies":2,"services":1,"services.edge":1,"sessions":2,"terminators":2},"error":""}
{"namespace":"fabric.circuits","version":2,"event_type":"created","circuit_id":"Ww55V5vsU","timestamp":"2024-12-10T17:41:55.079348754Z","client_id":"cm4ir0n42vlwisvnvwd2kuog1","service_id":"6ONBj74fU5eZa9NaOLVMEx","terminator_id":"1qXGVk7WxCVAVrbYS20yMd","instance_id":"","creation_timespan":43187739,"path":{"nodes":["fWkCotzhIu"],"links":null,"ingress_id":"3WoY","egress_id":"9Lqy"},"link_count":0,"path_cost":262140,"tags":{"clientId":"3vbp6TFh-u","hostId":"FERG29Fl-","serviceId":"6ONBj74fU5eZa9NaOLVMEx"}}
{"namespace":"fabric.circuits","version":2,"event_type":"deleted","circuit_id":"Ww55V5vsU","timestamp":"2024-12-10T17:41:55.130997497Z","client_id":"cm4ir0n42vlwisvnvwd2kuog1","service_id":"6ONBj74fU5eZa9NaOLVMEx","terminator_id":"1qXGVk7WxCVAVrbYS20yMd","instance_id":"","path":{"nodes":["fWkCotzhIu"],"links":null,"ingress_id":"3WoY","egress_id":"9Lqy"},"link_count":0,"duration":51651917,"tags":{"clientId":"3vbp6TFh-u","hostId":"FERG29Fl-","serviceId":"6ONBj74fU5eZa9NaOLVMEx"}}

thivish@thivish-XPS-8960:~$ ziti fabric stream events
{"namespace":"edge.entityCounts","timestamp":"2024-12-10T17:42:11.035320379Z","counts":{"apiSessionCertificates":9,"apiSessions":14,"authPolicies":1,"authenticators":3,"cas":0,"configTypes":5,"configs":0,"controllers":0,"edgeRouterPolicies":3,"enrollments":0,"eventualEvents":0,"externalJwtSigners":0,"identities":5,"identityTypes":2,"mfas":0,"postureCheckTypes":5,"postureChecks":0,"revocations":0,"routers":2,"routers.edge":2,"serviceEdgeRouterPolicies":1,"servicePolicies":2,"services":1,"services.edge":1,"sessions":2,"terminators":2},"error":""}
{"namespace":"fabric.circuits","version":2,"event_type":"created","circuit_id":"N9jRVH3P2","timestamp":"2024-12-10T17:42:11.188383637Z","client_id":"cm4ir0n42vlwisvnvwd2kuog1","service_id":"6ONBj74fU5eZa9NaOLVMEx","terminator_id":"1tnMKYkbNMzcGqmggicudY","instance_id":"","creation_timespan":41423129,"path":{"nodes":[".4lz8tzl-u"],"links":null,"ingress_id":"3qXG","egress_id":"a5qP"},"link_count":0,"path_cost":262140,"tags":{"clientId":"3vbp6TFh-u","hostId":"FERG29Fl-","serviceId":"6ONBj74fU5eZa9NaOLVMEx"}}
{"namespace":"fabric.circuits","version":2,"event_type":"deleted","circuit_id":"N9jRVH3P2","timestamp":"2024-12-10T17:42:11.236519927Z","client_id":"cm4ir0n42vlwisvnvwd2kuog1","service_id":"6ONBj74fU5eZa9NaOLVMEx","terminator_id":"1tnMKYkbNMzcGqmggicudY","instance_id":"","path":{"nodes":[".4lz8tzl-u"],"links":null,"ingress_id":"3qXG","egress_id":"a5qP"},"link_count":0,"duration":48139679,"tags":{"clientId":"3vbp6TFh-u","hostId":"FERG29Fl-","serviceId":"6ONBj74fU5eZa9NaOLVMEx"}}

But is there a better way to check which edge routers are in use for this data stream?

Thanks,
Ajay

You can see just circuit events:

ziti fabric stream events --circuits

or if you want just a point-in-time view, you can do

ziti fabric list circuits

which includes the circuit paths.
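If you want to track this programmatically, one option is to pipe the event stream into a small script that pulls path.nodes out of each fabric.circuits event. A sketch, using the field names from the output pasted above (the pipeline invocation itself is an assumption):

```python
import json

def routers_in_use(lines):
    """Yield (event_type, circuit_id, path nodes) for each fabric.circuits event."""
    for line in lines:
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip any non-JSON output lines
        if event.get("namespace") != "fabric.circuits":
            continue  # ignore other event namespaces, e.g. edge.entityCounts
        nodes = (event.get("path") or {}).get("nodes") or []
        yield event.get("event_type"), event.get("circuit_id"), nodes

# demo on a (trimmed) line from the stream above
sample = '{"namespace":"fabric.circuits","event_type":"created","circuit_id":"Ww55V5vsU","path":{"nodes":["fWkCotzhIu"]}}'
for event_type, circuit_id, nodes in routers_in_use([sample]):
    print(f"{event_type} circuit {circuit_id}: routers {nodes}")
```

You could then run something like `ziti fabric stream events --circuits | python watch_circuits.py` with the loop reading sys.stdin instead of the sample list (the script name is hypothetical).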

Let me know if that's helpful.
Paul


Hi,

Yes, that was helpful! Thank you!

Also, I modified the Flask tutorial code and the requests code to have a continuous stream of numbers, but for some reason I'm getting the error below:

(ziti_codes) ubuntu@ubuntu-ATOPNUC-MA90:~/ziti_codes$ python client_counter.py 
Data chunk 0

Data chunk 1

Data chunk 2

Data chunk 3

Data chunk 4

Data chunk 5

Data chunk 6

Data chunk 7

Chunk encoding error: ("Connection broken: OSError(9, 'Bad file descriptor')", OSError(9, 'Bad file descriptor'))
Data chunk 8

This happens at random, sometimes after streaming 50 numbers, and eventually stops with a Segmentation fault (core dumped)

I added a temporary fix to reconnect, but that adds more sessions and again stops at some point with a Segmentation fault (core dumped)

Could you advise on how to fix this?

Thank you!

I'm posting my server and client code below:
server.py

#run with: python server.py echo-server.json myService

from flask import Flask, Response, stream_with_context
import openziti
import sys

app = Flask(__name__)
bind_opts = {}  

count = 0  # shared across requests; incremented once per request

@openziti.zitify(bindings={':18080': bind_opts})
def runApp():
    from waitress import serve
    print("Starting server on OpenZiti overlay")
   
    app.debug = True
    serve(app, port=18080)

@app.route('/')
def hello_world():  # Responds to requests with a message
    print("Received a request to /")
    def generate():
        global count
        yield f"Data chunk {count}\n"
        count += 1
    return Response(stream_with_context(generate()), mimetype='text/plain')

if __name__ == '__main__':
    bind_opts['ztx'] = sys.argv[1]  # Path to the identity JSON file
    bind_opts['service'] = sys.argv[2]  # Ziti service name
    runApp()
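Note that generate() above yields a single chunk and returns, so each HTTP request carries one chunk and the "stream" comes from the client looping. A generator that streams continuously within a single response would loop instead; a sketch reusing the chunk format above (the limit parameter is added here only for illustration — an unbounded stream would use while True):

```python
def generate_stream(limit=5):
    # Looping inside the generator means one HTTP response carries many
    # chunks, instead of one chunk per request.
    count = 0
    while count < limit:
        yield f"Data chunk {count}\n"
        count += 1

print("".join(generate_stream(3)), end="")
```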

client.py

#Run with: python client.py
import openziti
import requests
import sys
import time

class ZitiEchoClient:
    def __init__(self, identity_json):
        # Load the Ziti context
        self.context = openziti.load(identity_json)

    def fetch_message(self):
        service_name = "myService"
        url = f"http://{service_name}"

        with openziti.monkeypatch():
            iteration = 0
            max_iterations = 100  # Limit to 100 iterations for safety
            while iteration < max_iterations:
                try:
                    response = requests.get(url, stream=True)
                    for chunk in response.iter_content(chunk_size=512):
                        if chunk:
                            print(chunk.decode('utf-8'))
                        else:
                            print("No chunk received.")
                    response.close()  # Ensure the connection is closed
                except Exception as e:
                    print(f"Chunk encoding error: {e}")
                time.sleep(0.5)  # Pause for 0.5 seconds before next request
                iteration += 1

if __name__ == "__main__":
    client = ZitiEchoClient("echo-client.json")
    client.fetch_message()
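One thing worth ruling out on the client side (it may not be the root cause here): each requests.get(stream=True) holds a socket until the response is closed, and an exception inside iter_content can leave it dangling. Using the response as a context manager, i.e. `with requests.get(url, stream=True) as response:`, guarantees cleanup even on errors. A stdlib-only sketch of the same pattern against a throwaway local server (all names here are hypothetical, for illustration only):

```python
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.request import urlopen

class ChunkHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # serve one chunk, mimicking the server above
        body = b"Data chunk 0\n"
        self.send_response(200)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # silence per-request logging

server = HTTPServer(("127.0.0.1", 0), ChunkHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

url = f"http://127.0.0.1:{server.server_port}/"
# The context manager releases the socket deterministically, so file
# descriptors are not leaked between loop iterations.
with urlopen(url) as response:
    data = response.read().decode("utf-8")
print(data, end="")
server.shutdown()
```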

Please enable core dumps for segfaults and run your Python like this to increase log verbosity from the underlying C SDK:

ulimit -c unlimited
ZITI_LOG=6 python ./client.py

If the logs don't clarify the error, and the segfault occurs again, then the fault will be handled according to your configuration in /proc/sys/kernel/core_pattern. Will you share the Corefile (CoreDump)? If your core_pattern calls apport you may find the crash dump in /var/crash/.
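To confirm the settings took effect before reproducing the crash, you can check them like this (the core_pattern contents vary by distro):

```shell
# raise the core size limit for this shell session, then verify it
ulimit -c unlimited
ulimit -c                            # should print: unlimited
# show how the kernel handles core dumps (on Ubuntu this is often a pipe to apport)
cat /proc/sys/kernel/core_pattern
```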

Hi,

I tried increasing the verbosity level and saved the logs to a file, but the logs merely show some warnings and no errors, except for a broken pipe error when the segfault occurs. I'm attaching the file for your reference.

log.txt (110.1 KB)

Thank you,
Ajay

We're taking a closer look. Please ensure you have Ziti Py SDK v1.0.0.

pip install --upgrade openziti

Thank you for the trace log. It didn't reveal the problem. If you're willing to share the Corefile, which may contain sensitive bytes, please send it in a private message.

You may be able to debug it locally with gdb to identify where the problem occurred in the C SDK.

Example: print a backtrace for an app that was wrapped with Python 3.10's debugger and running with EUID=1000, where the segfault was handled by apport:

apport-unpack /var/crash/_usr_lib_python3.10_pdb.py.1000.crash /tmp/crashpack
gdb /usr/bin/python3.10 -c /tmp/crashpack/CoreDump --ex bt --ex exit