How to connect postgres db using zitify() decorator

I'm trying to connect to a Postgres DB using psycopg, but it's not working. Do I need to use pg8000 instead? Does the SDK not support psycopg2?

@qrkourier @TheLumberjack Sorry, I don't know who to ask. Can someone help me use the SDK to connect to a Postgres DB?

Hey there, Although we don’t have a pg-specific sample yet in ziti-sdk-py/sample/README.md at main · openziti/ziti-sdk-py · GitHub, there’s an excellent chance the client-side techniques demonstrated in those samples will also work with psycopg.

The most direct, concise guidance is in pydoc openziti.decor.

The important takeaways are that you should decorate functions with @openziti.zitify(), or wrap clients and servers in a context manager such as `with openziti.monkeypatch():`.

If the Ziti Py SDK doesn't handle the client socket with either of these techniques for some reason, the zitify shell wrapper might work. It uses LD_PRELOAD to hijack libc's TCP connect, which would almost certainly work with the libc wrapper used by psycopg.

Good luck and please let us know if you find a solution!

@qrkourier I made it work using a socket proxy approach. Here is the sample code:

#!/usr/bin/env python3

"""

Simple script to get students count through OpenZiti

Uses the working proxy approach

"""

import os

import socket

import threading

import time

import openziti

from dotenv import load_dotenv



# Presumably loads a local .env file into the process environment so the
# os.getenv() lookups for the database credentials below can find them
# (python-dotenv behaviour — confirm against the installed version).
load_dotenv()

class ZitiPostgreSQLProxy:
    """Loopback TCP proxy that relays local connections to an OpenZiti service.

    A client that cannot be zitified directly (here: psycopg) connects to
    ``127.0.0.1:<local_port>``; each accepted connection is paired with a
    socket dialed through ``@openziti.zitify()`` and bytes are copied in both
    directions until either side closes.

    Args:
        identity: Path of the Ziti identity file passed to ``openziti.load``.
        service_host: Ziti service (intercept) name to dial.
        service_port: Port of the Ziti service to dial.
    """

    def __init__(self, identity='_local_dagster.json',
                 service_host='_db_service', service_port=5432):
        self.identity = identity
        self.service_host = service_host
        self.service_port = service_port
        self.ztx = openziti.load(identity)
        self.local_port = None
        self.server_socket = None
        self.running = False

    def start(self):
        """Start listening on a free loopback port and return that port.

        The listener is bound to port 0 so the OS picks the port atomically;
        probing for a free port on a throwaway socket and re-binding later
        leaves a window in which another process can take the port.

        Raises:
            OSError: if the listening socket cannot be set up.
        """
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listener.bind(('127.0.0.1', 0))
            listener.listen(1)
            # A finite accept() timeout lets the accept loop notice stop().
            listener.settimeout(1)
        except OSError:
            listener.close()
            raise

        self.server_socket = listener
        self.local_port = listener.getsockname()[1]
        self.running = True
        # No sleep is needed before returning: once listen() has been called
        # the kernel queues incoming connections until accept() runs.
        threading.Thread(target=self._run_proxy, daemon=True).start()
        return self.local_port

    def _run_proxy(self):
        """Accept loop: hand every incoming client to its own daemon thread."""
        while self.running:
            try:
                client_socket, _ = self.server_socket.accept()
            except socket.timeout:
                # Periodic wake-up so self.running is re-checked.
                continue
            except OSError:
                # Listener was closed by stop() (or failed); end the loop.
                break
            try:
                threading.Thread(
                    target=self._handle_connection,
                    args=(client_socket,),
                    daemon=True,
                ).start()
            except RuntimeError:
                # Could not spawn a handler thread; drop this client only.
                client_socket.close()

    def _open_ziti_socket(self):
        """Dial the configured Ziti service and return the connected socket."""

        @openziti.zitify()
        def dial():
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.connect((self.service_host, self.service_port))
            except Exception:
                sock.close()
                raise
            return sock

        return dial()

    def _forward(self, src, dst):
        """Copy bytes from ``src`` to ``dst`` until EOF, error, or stop()."""
        try:
            while self.running:
                data = src.recv(4096)
                if not data:
                    break
                dst.sendall(data)
        except OSError:
            # Peer reset or socket closed underneath us: treat as end of stream.
            pass
        finally:
            # Propagate end-of-stream so the opposite relay direction does not
            # stay blocked in recv() after this direction has finished.
            try:
                dst.shutdown(socket.SHUT_WR)
            except OSError:
                pass

    def _handle_connection(self, client_socket):
        """Relay one client connection to the Ziti service in both directions."""
        ziti_socket = None
        try:
            ziti_socket = self._open_ziti_socket()
            relays = [
                threading.Thread(target=self._forward,
                                 args=(client_socket, ziti_socket), daemon=True),
                threading.Thread(target=self._forward,
                                 args=(ziti_socket, client_socket), daemon=True),
            ]
            for relay in relays:
                relay.start()
            for relay in relays:
                relay.join()
        except Exception as exc:
            # Best effort per connection, but report the cause: otherwise the
            # client only ever sees an unexplained dropped connection.
            print(f"Ziti proxy connection failed: {exc}")
        finally:
            if ziti_socket is not None:
                try:
                    ziti_socket.close()
                except OSError:
                    pass
            client_socket.close()

    def stop(self):
        """Stop accepting connections and close the listener (idempotent)."""
        self.running = False
        if self.server_socket:
            self.server_socket.close()



def get_students_count():
    """Return the row count of the ``students`` table, queried through OpenZiti.

    Starts a local ``ZitiPostgreSQLProxy``, connects psycopg to it using the
    ``HOSPITAL_DATABASE_*`` environment variables, and runs
    ``SELECT COUNT(*) FROM students``.

    Returns:
        The count, or ``None`` if psycopg is not installed or the
        connection/query fails (the reason is printed).
    """
    print("🚀 Starting OpenZiti proxy...")
    proxy = ZitiPostgreSQLProxy()
    local_port = proxy.start()
    print(f"✅ Proxy running on localhost:{local_port}")

    try:
        # Imported lazily so a missing driver yields a helpful message.
        import psycopg

        conn = psycopg.connect(
            host='127.0.0.1',
            port=local_port,
            user=os.getenv('HOSPITAL_DATABASE_USERNAME'),
            password=os.getenv('HOSPITAL_DATABASE_PASSWORD'),
            dbname=os.getenv('HOSPITAL_DATABASE_SERVICE_NAME', 'postgres'),
        )
        try:
            with conn.cursor() as cursor:
                cursor.execute("SELECT COUNT(*) FROM students")
                count = cursor.fetchone()[0]
        finally:
            # Close the connection even when the query raises.
            conn.close()

        print(f"📊 students count: {count}")
        return count
    except ImportError:
        print("❌ psycopg3 not installed: pip install 'psycopg[binary]'")
        return None
    except Exception as e:
        print(f"❌ Query failed: {e}")
        return None
    finally:
        # Single shutdown point for every exit path.
        proxy.stop()



# Script entry point: run the query once and report the outcome.
if __name__ == "__main__":
    count = get_students_count()
    if count is not None:
        # The original check-mark emoji was mis-encoded in the paste.
        print(f"\n✅ SUCCESS: {count} students found")
    else:
        print("\n❌ Failed to get count")

We can't use the database host URL directly; it works only with the Ziti service name, not the actual hostname.
Example: xxxx.ap-northeast-1.rds.amazonaws.com won't work;
you need to use the _db_service name from your Ziti services, which is mapped to your host and intercept.
This approach works for me, but if anyone knows an even simpler method, please do share.

1 Like