From bfa1a3c64b416b75a3c231c90ea24f6cdeb9dd94 Mon Sep 17 00:00:00 2001 From: wasabi Date: Thu, 31 Jul 2025 18:46:22 -0500 Subject: [PATCH] Some more ideas --- src/influx_server.py | 159 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 159 insertions(+) diff --git a/src/influx_server.py b/src/influx_server.py index f119702..874612a 100644 --- a/src/influx_server.py +++ b/src/influx_server.py @@ -217,6 +217,165 @@ async def write_data(client, data): # non-blocking sleep to wait for data to be written await asyncio.sleep(2) + +# --- Websocket Consumer Function --- +async def websocket_consumer(uri: str, data_queue: asyncio.Queue): + """ + Connects to a websocket, receives messages, and puts them into a queue. + Includes basic reconnection logic. + """ + while True: + try: + print(f"Connecting to WebSocket: {uri}...") + async with websockets.connect(uri) as websocket: + print("WebSocket connected.") + while True: + try: + message = await websocket.recv() + # print(f"Received message: {message[:100]}...") # Print first 100 chars + try: + # Assuming messages are JSON, parse them and extract relevant data + data = json.loads(message) + # Example: If data has 'price' and 'symbol' + if 'data' in data and 's' in data['data'] and 'p' in data['data']: + symbol = data['data']['s'] + price = float(data['data']['p']) + timestamp = datetime.fromtimestamp(data['data']['T'] / 1000) # Convert ms to seconds + await data_queue.put({'symbol': symbol, 'price': price, 'timestamp': timestamp}) + else: + # Handle other message formats or log ignored messages + pass + except json.JSONDecodeError: + print(f"Received non-JSON message: {message}") + except KeyError as e: + print(f"Missing expected key in message: {e} in {message}") + + except websockets.exceptions.ConnectionClosedOK: + print("WebSocket connection closed gracefully.") + break + except websockets.exceptions.ConnectionClosed as e: + print(f"WebSocket connection closed unexpectedly: {e}") + break + except Exception as e: + print(f"Error receiving message: {e}") + break # Break to attempt reconnection + + except websockets.exceptions.InvalidURI as e: + print(f"Invalid WebSocket URI: {e}. Please check WEBSOCKET_URL.") + return # Exit if URI is invalid + except ConnectionRefusedError: + print(f"Connection refused to {uri}. Retrying in 5 seconds...") + except Exception as e: + print(f"General WebSocket error: {e}. Retrying in 5 seconds...") + + await asyncio.sleep(5) # Wait before attempting to reconnect + +# --- Data Processor Function --- +async def data_processor(data_queue: asyncio.Queue, influx_writer: InfluxDBWriter): + """ + Pulls data from the queue, converts it to InfluxDB Line Protocol, and passes to the writer. + """ + print("Data processor started.") + while True: + try: + data = await data_queue.get() + # Convert the received data into InfluxDB Line Protocol string + # Format: measurement,tag_key=tag_value field_key=field_value timestamp_in_nanoseconds + # Example: market_data,symbol=TESTSYM price=100.0 1678886400000000000 + # Timestamps for Line Protocol should be in nanoseconds since epoch + timestamp_ns = int(data['timestamp'].timestamp() * 1_000_000_000) + line_protocol = f"market_data,symbol={data['symbol']} price={data['price']} {timestamp_ns}" + + await influx_writer.write_point(line_protocol) + data_queue.task_done() + except asyncio.CancelledError: + print("Data processor cancelled.") + break + except Exception as e: + print(f"Error processing data from queue: {e}") + data_queue.task_done() # Mark as done even on error to prevent blocking + +# --- Main Application Logic --- +async def main(): + """Main function to run the asyncio application.""" + data_queue = asyncio.Queue() + influx_writer = InfluxDBWriter( + url=INFLUXDB_URL, + token=INFLUXDB_TOKEN, + org=INFLUXDB_ORG, + database=INFLUXDB_DATABASE, + batch_size=BATCH_SIZE, + flush_interval=FLUSH_INTERVAL_SECONDS + ) + + # Start the periodic flusher for InfluxDB + influx_writer.start_flusher() + + # Create and run tasks for websocket consumer and data processor + consumer_task = asyncio.create_task(websocket_consumer(WEBSOCKET_URL, data_queue)) + processor_task = asyncio.create_task(data_processor(data_queue, influx_writer)) + + print("Application started. Press Ctrl+C to exit.") + + try: + # Keep the main loop running until tasks are done or cancelled + await asyncio.gather(consumer_task, processor_task) + except asyncio.CancelledError: + print("Main application tasks cancelled.") + except KeyboardInterrupt: + print("KeyboardInterrupt received. Shutting down...") + finally: + # Graceful shutdown + print("Initiating graceful shutdown...") + consumer_task.cancel() + processor_task.cancel() + + # Wait for tasks to finish or be cancelled + await asyncio.gather(consumer_task, processor_task, return_exceptions=True) + + # Wait for any remaining items in the queue to be processed + if not data_queue.empty(): + print(f"Waiting for {data_queue.qsize()} items in queue to be processed...") + await data_queue.join() + + await influx_writer.stop_flusher() + print("Shutdown complete.") + +if __name__ == "__main__": + # Ensure you have the necessary libraries installed: + # pip install websockets influxdb-client-3 + + # Before running, make sure: + # 1. An InfluxDB 3.0 instance is running and accessible at INFLUXDB_URL. + # 2. You have created an organization (INFLUXDB_ORG), a database (INFLUXDB_DATABASE), + # and a token (INFLUXDB_TOKEN) with write permissions for that database. + # 3. A WebSocket server is running at WEBSOCKET_URL that sends JSON messages. + # (For testing, you can use a simple test WebSocket server or a public one like Binance's) + + # Example of a simple test WebSocket server (you can run this in a separate terminal): + # import asyncio + # import websockets + # import json + # import time + # async def echo(websocket, path): + # i = 0 + # while True: + # await websocket.send(json.dumps({"data": {"s": "TESTSYM", "p": str(100.0 + i), "T": int(time.time() * 1000)}})) + # i += 1 + # await asyncio.sleep(1) # Send data every second + # start_server = websockets.serve(echo, "localhost", 8765) + # asyncio.get_event_loop().run_until_complete(start_server) + # asyncio.get_event_loop().run_forever() + + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("Application terminated by user.") + except Exception as e: + print(f"An unhandled error occurred: {e}") + + + while __name__ == "__main__": # Initialize Callback