Some more ideas
This commit is contained in:
@@ -217,6 +217,165 @@ async def write_data(client, data):
|
|||||||
# non-blocking sleep to wait for data to be written
|
# non-blocking sleep to wait for data to be written
|
||||||
await asyncio.sleep(2)
|
await asyncio.sleep(2)
|
||||||
|
|
||||||
|
|
||||||
|
# --- Websocket Consumer Function ---
|
||||||
|
async def websocket_consumer(uri: str, data_queue: asyncio.Queue):
|
||||||
|
"""
|
||||||
|
Connects to a websocket, receives messages, and puts them into a queue.
|
||||||
|
Includes basic reconnection logic.
|
||||||
|
"""
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
print(f"Connecting to WebSocket: {uri}...")
|
||||||
|
async with websockets.connect(uri) as websocket:
|
||||||
|
print("WebSocket connected.")
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
message = await websocket.recv()
|
||||||
|
# print(f"Received message: {message[:100]}...") # Print first 100 chars
|
||||||
|
try:
|
||||||
|
# Assuming messages are JSON, parse them and extract relevant data
|
||||||
|
data = json.loads(message)
|
||||||
|
# Example: If data has 'price' and 'symbol'
|
||||||
|
if 'data' in data and 's' in data['data'] and 'p' in data['data']:
|
||||||
|
symbol = data['data']['s']
|
||||||
|
price = float(data['data']['p'])
|
||||||
|
timestamp = datetime.fromtimestamp(data['data']['T'] / 1000) # Convert ms to seconds
|
||||||
|
await data_queue.put({'symbol': symbol, 'price': price, 'timestamp': timestamp})
|
||||||
|
else:
|
||||||
|
# Handle other message formats or log ignored messages
|
||||||
|
pass
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
print(f"Received non-JSON message: {message}")
|
||||||
|
except KeyError as e:
|
||||||
|
print(f"Missing expected key in message: {e} in {message}")
|
||||||
|
|
||||||
|
except websockets.exceptions.ConnectionClosedOK:
|
||||||
|
print("WebSocket connection closed gracefully.")
|
||||||
|
break
|
||||||
|
except websockets.exceptions.ConnectionClosed as e:
|
||||||
|
print(f"WebSocket connection closed unexpectedly: {e}")
|
||||||
|
break
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error receiving message: {e}")
|
||||||
|
break # Break to attempt reconnection
|
||||||
|
|
||||||
|
except websockets.exceptions.InvalidURI as e:
|
||||||
|
print(f"Invalid WebSocket URI: {e}. Please check WEBSOCKET_URL.")
|
||||||
|
return # Exit if URI is invalid
|
||||||
|
except ConnectionRefusedError:
|
||||||
|
print(f"Connection refused to {uri}. Retrying in 5 seconds...")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"General WebSocket error: {e}. Retrying in 5 seconds...")
|
||||||
|
|
||||||
|
await asyncio.sleep(5) # Wait before attempting to reconnect
|
||||||
|
|
||||||
|
# --- Data Processor Function ---
|
||||||
|
async def data_processor(data_queue: asyncio.Queue, influx_writer: InfluxDBWriter):
|
||||||
|
"""
|
||||||
|
Pulls data from the queue, converts it to InfluxDB Line Protocol, and passes to the writer.
|
||||||
|
"""
|
||||||
|
print("Data processor started.")
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
data = await data_queue.get()
|
||||||
|
# Convert the received data into InfluxDB Line Protocol string
|
||||||
|
# Format: measurement,tag_key=tag_value field_key=field_value timestamp_in_nanoseconds
|
||||||
|
# Example: market_data,symbol=TESTSYM price=100.0 1678886400000000000
|
||||||
|
# Timestamps for Line Protocol should be in nanoseconds since epoch
|
||||||
|
timestamp_ns = int(data['timestamp'].timestamp() * 1_000_000_000)
|
||||||
|
line_protocol = f"market_data,symbol={data['symbol']} price={data['price']} {timestamp_ns}"
|
||||||
|
|
||||||
|
await influx_writer.write_point(line_protocol)
|
||||||
|
data_queue.task_done()
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
print("Data processor cancelled.")
|
||||||
|
break
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error processing data from queue: {e}")
|
||||||
|
data_queue.task_done() # Mark as done even on error to prevent blocking
|
||||||
|
|
||||||
|
# --- Main Application Logic ---
|
||||||
|
async def main():
|
||||||
|
"""Main function to run the asyncio application."""
|
||||||
|
data_queue = asyncio.Queue()
|
||||||
|
influx_writer = InfluxDBWriter(
|
||||||
|
url=INFLUXDB_URL,
|
||||||
|
token=INFLUXDB_TOKEN,
|
||||||
|
org=INFLUXDB_ORG,
|
||||||
|
database=INFLUXDB_DATABASE,
|
||||||
|
batch_size=BATCH_SIZE,
|
||||||
|
flush_interval=FLUSH_INTERVAL_SECONDS
|
||||||
|
)
|
||||||
|
|
||||||
|
# Start the periodic flusher for InfluxDB
|
||||||
|
influx_writer.start_flusher()
|
||||||
|
|
||||||
|
# Create and run tasks for websocket consumer and data processor
|
||||||
|
consumer_task = asyncio.create_task(websocket_consumer(WEBSOCKET_URL, data_queue))
|
||||||
|
processor_task = asyncio.create_task(data_processor(data_queue, influx_writer))
|
||||||
|
|
||||||
|
print("Application started. Press Ctrl+C to exit.")
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Keep the main loop running until tasks are done or cancelled
|
||||||
|
await asyncio.gather(consumer_task, processor_task)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
print("Main application tasks cancelled.")
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("KeyboardInterrupt received. Shutting down...")
|
||||||
|
finally:
|
||||||
|
# Graceful shutdown
|
||||||
|
print("Initiating graceful shutdown...")
|
||||||
|
consumer_task.cancel()
|
||||||
|
processor_task.cancel()
|
||||||
|
|
||||||
|
# Wait for tasks to finish or be cancelled
|
||||||
|
await asyncio.gather(consumer_task, processor_task, return_exceptions=True)
|
||||||
|
|
||||||
|
# Wait for any remaining items in the queue to be processed
|
||||||
|
if not data_queue.empty():
|
||||||
|
print(f"Waiting for {data_queue.qsize()} items in queue to be processed...")
|
||||||
|
await data_queue.join()
|
||||||
|
|
||||||
|
await influx_writer.stop_flusher()
|
||||||
|
print("Shutdown complete.")
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# Ensure you have the necessary libraries installed:
|
||||||
|
# pip install websockets influxdb-client-3
|
||||||
|
|
||||||
|
# Before running, make sure:
|
||||||
|
# 1. An InfluxDB 3.0 instance is running and accessible at INFLUXDB_URL.
|
||||||
|
# 2. You have created an organization (INFLUXDB_ORG), a database (INFLUXDB_DATABASE),
|
||||||
|
# and a token (INFLUXDB_TOKEN) with write permissions for that database.
|
||||||
|
# 3. A WebSocket server is running at WEBSOCKET_URL that sends JSON messages.
|
||||||
|
# (For testing, you can use a simple test WebSocket server or a public one like Binance's)
|
||||||
|
|
||||||
|
# Example of a simple test WebSocket server (you can run this in a separate terminal):
|
||||||
|
# import asyncio
|
||||||
|
# import websockets
|
||||||
|
# import json
|
||||||
|
# import time
|
||||||
|
# async def echo(websocket, path):
|
||||||
|
# i = 0
|
||||||
|
# while True:
|
||||||
|
# await websocket.send(json.dumps({"data": {"s": "TESTSYM", "p": str(100.0 + i), "T": int(time.time() * 1000)}}))
|
||||||
|
# i += 1
|
||||||
|
# await asyncio.sleep(1) # Send data every second
|
||||||
|
# start_server = websockets.serve(echo, "localhost", 8765)
|
||||||
|
# asyncio.get_event_loop().run_until_complete(start_server)
|
||||||
|
# asyncio.get_event_loop().run_forever()
|
||||||
|
|
||||||
|
try:
|
||||||
|
asyncio.run(main())
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("Application terminated by user.")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"An unhandled error occurred: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
while __name__ == "__main__":
|
while __name__ == "__main__":
|
||||||
|
|
||||||
# Initialize Callback
|
# Initialize Callback
|
||||||
|
|||||||
Reference in New Issue
Block a user