KSQLDBClient
OpenFactory ksqlDB client.
- class openfactory.kafka.ksql.KSQLDBClient(ksqldb_url, max_retries=3, retry_delay=2.0, timeout=10.0, loglevel='WARNING')
Bases: object
ksqlDB client used by OpenFactory.
- Example usage:
    from openfactory.kafka import KSQLDBClient

    ksql = KSQLDBClient('http://localhost:8088')
    print('Info: ', ksql.info())
    print('Streams:', ksql.streams())
    print('Tables: ', ksql.tables())
    df = ksql.query("SELECT ID, VALUE, TYPE FROM assets WHERE ASSET_UUID='PROVER3018' AND TYPE='Samples';")
    print(df)
    ksql.close()
- __init__(ksqldb_url, max_retries=3, retry_delay=2.0, timeout=10.0, loglevel='WARNING')
Initialize the ksqlDB client.
- Parameters:
ksqldb_url (str) – URL of the ksqlDB server.
max_retries (int) – Number of retry attempts on network failure. Defaults to 3.
retry_delay (float) – Seconds to wait between retries. Defaults to 2.0.
timeout (float) – Request timeout in seconds. Defaults to 10.0.
loglevel (str) – Logging level for the client. Defaults to Config.KSQLDB_LOG_LEVEL (rendered as 'WARNING' in the signature).
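As an illustration of how max_retries and retry_delay could interact, the following is a minimal sketch of a retry loop. It is not OpenFactory's implementation. The helper name call_with_retries, the use of ConnectionError as the "network failure", and the reading of max_retries as "retries after the first attempt" are all assumptions made for the example.

```python
import time


def call_with_retries(fn, max_retries=3, retry_delay=2.0, sleep=time.sleep):
    """Call fn(), retrying on ConnectionError (hypothetical helper).

    Assumption: one initial attempt plus up to `max_retries` retries,
    waiting `retry_delay` seconds between attempts.
    """
    last_error = None
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except ConnectionError as err:
            last_error = err
            # Only wait if another attempt will follow.
            if attempt < max_retries:
                sleep(retry_delay)
    raise last_error


# Demo: a callable that fails twice, then succeeds.
calls = {"count": 0}


def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("server not reachable")
    return "ok"


# Record the delays instead of really sleeping, so the demo runs instantly.
delays = []
result = call_with_retries(flaky, max_retries=3, retry_delay=2.0, sleep=delays.append)
print(result, calls["count"], delays)
```

Passing the sleep function as a parameter keeps the sketch easy to exercise without waiting for real delays.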
- get_kafka_topic(stream_name)
Retrieve the Kafka topic associated with a KSQL stream.
- Return type:
str
- Parameters:
stream_name (str) – The name of the KSQL stream.
- Returns:
str – The Kafka topic name associated with the stream.
- Raises:
KSQLDBClientException – If the topic cannot be found in the response.
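One plausible way to find the topic is to read the "topic" field of the source description that ksqlDB returns for a DESCRIBE statement. The sketch below assumes that response shape and does not reflect how get_kafka_topic is actually written. extract_topic, the sample stream name, and the sample topic name are hypothetical, and LookupError stands in for KSQLDBClientException.

```python
def extract_topic(response):
    """Return the Kafka topic from a DESCRIBE-style response (hypothetical helper)."""
    for entry in response:
        # Assumed shape: each entry may carry a "sourceDescription" object
        # with a "topic" field naming the backing Kafka topic.
        description = entry.get("sourceDescription", {})
        topic = description.get("topic")
        if topic:
            return topic
    # Stand-in for KSQLDBClientException in this self-contained sketch.
    raise LookupError("no Kafka topic found in response")


# Illustrative response only; names and values are made up.
sample_response = [
    {
        "@type": "sourceDescription",
        "statementText": "DESCRIBE ASSETS_STREAM;",
        "sourceDescription": {
            "name": "ASSETS_STREAM",
            "type": "STREAM",
            "topic": "assets_topic",
        },
    }
]
print(extract_topic(sample_response))
```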
- info()
Retrieve server information.
- Return type:
dict
- Returns:
dict – A dictionary containing server information.
- insert_into_stream(stream_name, rows)
Insert rows into a stream over HTTP/2.
- Return type:
list[dict]
- Parameters:
stream_name (str) – The name of the KSQL stream to insert into.
rows – The rows to insert.
- Returns:
list[dict] – A list of dictionaries containing the response from the insert operation.
- Raises:
KSQLDBClientException – If the request fails or returns an error status.
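For orientation, ksqlDB's HTTP/2 insert endpoint (/inserts-stream) expects newline-delimited JSON: a first line naming the target stream, followed by one JSON object per row, and it acknowledges each row with its own JSON line. The sketch below only builds and parses such payloads. It assumes the client uses that endpoint, and the helper names and sample values are hypothetical.

```python
import json


def build_insert_payload(stream_name, rows):
    """Build a newline-delimited JSON body for an insert request (hypothetical helper)."""
    lines = [json.dumps({"target": stream_name})]
    lines.extend(json.dumps(row) for row in rows)
    return "\n".join(lines) + "\n"


def parse_insert_acks(body):
    """Parse newline-delimited acknowledgements into a list of dicts."""
    return [json.loads(line) for line in body.splitlines() if line.strip()]


# Illustrative rows only; column names and values are made up.
payload = build_insert_payload(
    "ASSETS_STREAM",
    [{"ID": "Zact", "VALUE": "12.5"}, {"ID": "Xact", "VALUE": "3.0"}],
)
print(payload)

# A response of the assumed shape, one acknowledgement per inserted row.
acks = parse_insert_acks('{"status":"ok","seq":0}\n{"status":"ok","seq":1}\n')
print(acks)
```

The list returned by parse_insert_acks has the same list[dict] shape that insert_into_stream documents as its return value.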
- query(ksql)
Execute a KSQL pull query and return the results as a DataFrame.
- Return type:
pandas.DataFrame
- Parameters:
ksql (str) – The KSQL pull query string to execute.
- Returns:
pandas.DataFrame – A DataFrame containing the query result rows and columns.
- Raises:
KSQLDBClientException – If the server returns a non-200 or malformed response.
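To show how a pull-query response can become tabular data, here is a minimal sketch that assumes the newline-delimited format of ksqlDB's /query-stream endpoint: a header line with "columnNames", then one JSON array per row. Whether the client uses this endpoint is an assumption, and rows_from_query_stream and the sample values are hypothetical. The sketch stays in the standard library and returns plain records.

```python
import json


def rows_from_query_stream(body):
    """Turn a delimited pull-query response into (columns, records) (hypothetical helper)."""
    lines = [line for line in body.splitlines() if line.strip()]
    if not lines:
        # Stand-in for the "malformed response" case in this sketch.
        raise ValueError("empty response")
    header = json.loads(lines[0])
    columns = header["columnNames"]
    # Each remaining line is a JSON array holding one row's values.
    records = [dict(zip(columns, json.loads(line))) for line in lines[1:]]
    return columns, records


# Illustrative response only; the values are made up.
sample_body = (
    '{"queryId":null,"columnNames":["ID","VALUE","TYPE"],'
    '"columnTypes":["STRING","STRING","STRING"]}\n'
    '["Zact","12.5","Samples"]\n'
    '["Xact","3.0","Samples"]\n'
)
columns, records = rows_from_query_stream(sample_body)
print(columns)
print(records)
```

With pandas installed, pandas.DataFrame(records, columns=columns) would give a DataFrame with one row per record.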
- statement_query(sql)
Execute a KSQL statement query (e.g., CREATE, DROP).
Sends a KSQL statement to the server, typically for DDL or DML operations such as creating or dropping streams or tables.
- Return type:
dict
- Parameters:
sql (str) – The KSQL statement to execute.
- Returns:
dict – The JSON response from the server as a dictionary.
- Raises:
KSQLDBClientException – If the request fails or returns an error status.
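Statements of this kind are accepted by ksqlDB's /ksql REST endpoint as a JSON body with a "ksql" field. The sketch below builds such a request with the standard library but does not send it. It assumes the client targets that endpoint. build_statement_request and the stream name are hypothetical, and appending a missing semicolon is a convenience of the sketch, not documented client behaviour.

```python
import json
import urllib.request


def build_statement_request(ksqldb_url, sql):
    """Build (but do not send) a POST request for a KSQL statement (hypothetical helper)."""
    statement = sql.strip()
    # ksqlDB statements are terminated by a semicolon.
    if not statement.endswith(";"):
        statement += ";"
    body = json.dumps({"ksql": statement, "streamsProperties": {}}).encode("utf-8")
    return urllib.request.Request(
        url=ksqldb_url.rstrip("/") + "/ksql",
        data=body,
        headers={"Content-Type": "application/vnd.ksql.v1+json; charset=utf-8"},
        method="POST",
    )


request = build_statement_request(
    "http://localhost:8088", "DROP STREAM IF EXISTS demo_stream"
)
print(request.get_method(), request.full_url)
print(json.loads(request.data)["ksql"])
```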