KSQLDBClient

OpenFactory ksqlDB client.

class openfactory.kafka.ksql.KSQLDBClient(ksqldb_url, max_retries=3, retry_delay=2.0, timeout=10.0, loglevel='WARNING')

Bases: object

ksqlDB client used by OpenFactory.

Example usage:
from openfactory.kafka import KSQLDBClient

ksql = KSQLDBClient('http://localhost:8088')
print('Info:   ', ksql.info())
print('Streams:', ksql.streams())
print('Tables: ', ksql.tables())

# Query results are returned as a list of dictionaries
results = ksql.query(
    "SELECT ID, VALUE, TYPE "
    "FROM assets "
    "WHERE ASSET_UUID='PROVER3018' AND TYPE='Samples';"
)

# Example of iterating over the results
for row in results:
    print(row['ID'], row['VALUE'], row['TYPE'])

ksql.close()

__init__(ksqldb_url, max_retries=3, retry_delay=2.0, timeout=10.0, loglevel='WARNING')

Initialize the ksqlDB client.

Parameters:
  • ksqldb_url (str) – URL of the ksqlDB server.

  • max_retries (int) – Number of retry attempts on network failure. Defaults to 3.

  • retry_delay (float) – Seconds to wait between retries. Defaults to 2.0.

  • timeout (float) – Request timeout in seconds. Defaults to 10.0.

  • loglevel (str) – Logging level for the client. Defaults to Config.KSQLDB_LOG_LEVEL (shown as 'WARNING' in the signature above).
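
The constructor parameters above can be collected from the environment before the client is created. The following is a minimal sketch, not part of OpenFactory: the helper name `client_kwargs_from_env` and every environment variable name are hypothetical, and the fallback values simply repeat the defaults shown in the signature.

```python
import os


def client_kwargs_from_env(env=None):
    """Build keyword arguments for KSQLDBClient from environment variables.

    The variable names used here are hypothetical, chosen for this sketch only.
    """
    env = os.environ if env is None else env
    return {
        "ksqldb_url": env.get("KSQLDB_URL", "http://localhost:8088"),
        # Fall back to the documented defaults when a variable is unset.
        "max_retries": int(env.get("KSQLDB_MAX_RETRIES", "3")),
        "retry_delay": float(env.get("KSQLDB_RETRY_DELAY", "2.0")),
        "timeout": float(env.get("KSQLDB_TIMEOUT", "10.0")),
        "loglevel": env.get("KSQLDB_LOG_LEVEL", "WARNING"),
    }
```

With OpenFactory installed, the result could be passed as `KSQLDBClient(**client_kwargs_from_env())`.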

close()

Close the underlying HTTP client.

Return type:

None
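
Because `close()` takes no arguments, the client can be wrapped in the standard library's `contextlib.closing` so that it is closed even when a call raises. This is a sketch under that assumption; the documentation above does not say whether the client is itself a context manager, and the helper name `list_streams_and_tables` is hypothetical.

```python
from contextlib import closing


def list_streams_and_tables(client):
    """Return (streams, tables) and close the client even if a call raises."""
    # closing() calls client.close() when the with-block exits.
    with closing(client) as ksql:
        return ksql.streams(), ksql.tables()
```

Note that the client passed in is no longer usable after the helper returns.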

get_kafka_topic(stream_name)

Retrieve the Kafka topic associated with a KSQL stream.

Return type:

str

Parameters:

stream_name (str) – The name of the KSQL stream.

Returns:

str – The Kafka topic name associated with the stream.

Raises:

KSQLDBClientException – If the topic cannot be found in the response.
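
As an illustration, `get_kafka_topic` can be combined with `streams()` to build a lookup of stream names to topics. This is a minimal sketch: the helper name `topics_by_stream` is hypothetical, and it assumes the names returned by `streams()` are accepted unchanged by `get_kafka_topic`.

```python
def topics_by_stream(ksql):
    """Map each stream name reported by streams() to its Kafka topic."""
    # One get_kafka_topic() request is issued per stream.
    return {name: ksql.get_kafka_topic(name) for name in ksql.streams()}
```

A `KSQLDBClientException` raised for any single stream propagates to the caller and aborts the whole lookup.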

info()

Retrieve server information.

Return type:

dict

Returns:

dict – A dictionary containing server information.

insert_into_stream(stream_name, rows)

Insert rows into a stream over HTTP/2.

Return type:

list[dict]

Parameters:
  • stream_name (str) – The name of the KSQL stream to insert data into.

  • rows (list[dict]) – A list of dictionaries representing the rows to be inserted.

Returns:

list[dict] – A list of dictionaries containing the response from the insert operation.

Raises:

KSQLDBClientException – If the request fails or returns an error status.
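
A possible way to prepare the `rows` argument is sketched below. The helper name `insert_readings` and the column names `ID` and `VALUE` are assumptions made for illustration; the actual keys must match the columns of the target stream.

```python
def insert_readings(ksql, stream_name, readings):
    """Insert (id, value) pairs as rows and return the server response.

    The column names ID and VALUE are hypothetical; use the columns
    defined on the target stream.
    """
    rows = [{"ID": rid, "VALUE": str(value)} for rid, value in readings]
    if not rows:
        return []  # nothing to send, so skip the HTTP request
    return ksql.insert_into_stream(stream_name, rows)
```

Returning early on an empty input is a choice of this sketch, since the behavior of `insert_into_stream` with an empty list is not documented above.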

query(ksql)

Execute a KSQL pull query and return the results as a list of dictionaries.

Each dictionary represents a row, with column names as keys.

Return type:

list[dict]

Parameters:

ksql (str) – The KSQL pull query string to execute.

Returns:

list[dict] – Query results as a list of row dictionaries.

Raises:

KSQLDBClientException – If the server returns a non-200 or malformed response.
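
The query from the usage example at the top of this page can be wrapped in a small helper that takes the asset UUID as a parameter. This is a sketch only: the helper names are hypothetical, and the quoting assumes ksqlDB's convention of doubling embedded single quotes inside string literals.

```python
def quote_literal(value):
    """Quote a value as a ksqlDB string literal, doubling embedded quotes."""
    return "'" + str(value).replace("'", "''") + "'"


def samples_by_id(ksql, asset_uuid):
    """Return {ID: VALUE} for the 'Samples' rows of one asset."""
    sql = (
        "SELECT ID, VALUE, TYPE "
        "FROM assets "
        f"WHERE ASSET_UUID={quote_literal(asset_uuid)} AND TYPE='Samples';"
    )
    # query() returns a list of dicts keyed by column name.
    return {row["ID"]: row["VALUE"] for row in ksql.query(sql)}
```

If the same `ID` appears in more than one row, the last row wins in this sketch.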

statement_query(sql)

Execute a KSQL statement query (e.g., CREATE, DROP).

Sends a KSQL statement to the server, typically for DDL or DML operations such as creating or dropping streams or tables.

Return type:

dict

Parameters:

sql (str) – The KSQL statement to execute.

Returns:

dict – The JSON response from the server as a dictionary.

Raises:

KSQLDBClientException – If the request fails or returns an error status.
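
For example, a `CREATE STREAM` statement could be assembled and submitted as sketched below. The helper names, stream name, topic name, and columns are all hypothetical, and the statement assumes the Kafka topic already exists, since ksqlDB otherwise also expects a `PARTITIONS` property.

```python
def create_stream_sql(name, topic, columns, value_format="JSON"):
    """Build a CREATE STREAM statement from (column, type) pairs."""
    cols = ", ".join(f"{col} {typ}" for col, typ in columns)
    return (
        f"CREATE STREAM IF NOT EXISTS {name} ({cols}) "
        f"WITH (KAFKA_TOPIC='{topic}', VALUE_FORMAT='{value_format}');"
    )


def ensure_stream(ksql, name, topic, columns):
    """Submit the statement and return the server's JSON response."""
    return ksql.statement_query(create_stream_sql(name, topic, columns))
```

The inputs are interpolated into the statement as-is, so this sketch is only suitable for trusted, hard-coded names.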

streams()

List existing KSQL streams.

Return type:

list[str]

Returns:

list[str] – A list of stream names currently defined in ksqlDB.

tables()

List existing KSQL tables.

Return type:

list[str]

Returns:

list[str] – A list of table names currently defined in ksqlDB.
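
Together, `streams()` and `tables()` allow a quick check of whether a name is already defined and as what. The sketch below uses a hypothetical helper `relation_kind` and compares names case-insensitively, on the assumption that ksqlDB upper-cases unquoted identifiers.

```python
def relation_kind(ksql, name):
    """Return 'stream', 'table', or None for the given name."""
    wanted = name.upper()  # assumes unquoted, upper-cased identifiers
    if wanted in (s.upper() for s in ksql.streams()):
        return "stream"
    if wanted in (t.upper() for t in ksql.tables()):
        return "table"
    return None
```

Such a check could precede a `CREATE` or `DROP` statement sent through `statement_query`.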

exception openfactory.kafka.ksql.KSQLDBClientException

Bases: Exception

General error raised by the OpenFactory ksqlDB client.
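
Callers that prefer a fallback value over a propagated error could catch the exception as sketched here. The helper name `query_or_default` is hypothetical, and the `ImportError` fallback exists only so the sketch can be run without OpenFactory installed.

```python
import logging

try:
    from openfactory.kafka.ksql import KSQLDBClientException
except ImportError:
    # Stand-in so this sketch runs without OpenFactory installed.
    class KSQLDBClientException(Exception):
        pass

logger = logging.getLogger(__name__)


def query_or_default(ksql, sql, default=None):
    """Run a pull query; on KSQLDBClientException log it and return a default."""
    try:
        return ksql.query(sql)
    except KSQLDBClientException as err:
        logger.warning("ksqlDB query failed: %s", err)
        return [] if default is None else default
```

Only `KSQLDBClientException` is caught; any other exception still propagates to the caller.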