Go database server initial commit

This commit is contained in:
2025-09-11 22:46:09 -05:00
parent bfa1a3c64b
commit 10531cf9e8
3 changed files with 148 additions and 0 deletions

2
.gitignore vendored
View File

@@ -215,3 +215,5 @@ ansible/inventory-actual
*.retry
.ansible/
ansible/vars/secrets.yml
bin
go.sum

21
go.mod Normal file
View File

@@ -0,0 +1,21 @@
module src
go 1.24.7
require (
github.com/aopoltorzhicky/go_kraken v0.2.3
github.com/gorilla/websocket v1.5.3
github.com/influxdata/influxdb-client-go/v2 v2.14.0
github.com/sirupsen/logrus v1.9.3
)
require (
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect
github.com/oapi-codegen/runtime v1.0.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
)

View File

@@ -0,0 +1,125 @@
package main
import (
"os"
"os/signal"
"strings"
"syscall"
"time"
ws "github.com/aopoltorzhicky/go_kraken/websocket"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
log "github.com/sirupsen/logrus"
)
var SERVER = "wss://ws.kraken.com"
var PATH = "/v2"
var TIMESWAIT = 0
var TIMESWAITMAX = 5
var INFLUXSERVER = os.Getenv("INFLUX_URL")
var INFLUXTOKEN = os.Getenv("INFLUX_TOKEN")
var INFLUXORG = os.Getenv("INFLUX_ORG")
var INFLUXBUCKET = os.Getenv("INFLUX_BUCKET")
var CURRENCYPAIRS = strings.Split(os.Getenv("CURRENCYPAIRS"), " ")
/*
type WsTokenJsonStruct struct {
Error []interface() `json:"error"`
result struct {
Token string `json:"token"`
Expires int `json:"expires"`
} `json:"result"`
}
*/
func connectDatabase() influxdb2.Client {
client := influxdb2.NewClientWithOptions(INFLUXSERVER, INFLUXTOKEN, influxdb2.DefaultOptions().SetBatchSize(20))
return client
}
func writeDatabase(client influxdb2.Client, update ws.Update) {
writeAPI := client.WriteAPI(INFLUXORG, INFLUXBUCKET)
switch data := update.Data.(type) {
case ws.TickerUpdate:
ask_price, err := data.Ask.Price.Float64()
if err != nil {
log.Warnf("Data extraction error %s", err.Error())
}
ask_volume, err := data.Ask.Volume.Float64()
if err != nil {
log.Warnf("Data extraction error %s", err.Error())
}
bid_price, err := data.Bid.Price.Float64()
if err != nil {
log.Warnf("Data extraction error %s", err.Error())
}
bid_volume, err := data.Bid.Volume.Float64()
if err != nil {
log.Warnf("Data extraction error %s", err.Error())
}
open_today, err := data.Open.Today.Float64()
if err != nil {
log.Warnf("Data extraction error %s", err.Error())
}
open_last24, err := data.Open.Last24.Float64()
if err != nil {
log.Warnf("Data extraction error %s", err.Error())
}
p := influxdb2.NewPoint(
"ticker",
map[string]string{
"pair": update.Pair,
},
map[string]interface{}{
"ask_price": ask_price,
"ask_volume": ask_volume,
"bid_price": bid_price,
"bid_volume": bid_volume,
"open_today": open_today,
"open_24": open_last24,
},
time.Now())
writeAPI.WritePoint(p)
default:
}
}
func main() {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
kraken := ws.NewKraken(ws.ProdBaseURL)
if err := kraken.Connect(); err != nil {
log.Fatalf("Error connecting to websocket: %s", err.Error())
}
if err := kraken.SubscribeTicker([]string{ws.BTCUSD}); err != nil {
log.Fatalf("SubscribeTicker error %s", err.Error())
}
client := connectDatabase()
defer client.Close()
for {
select {
case <-signals:
log.Warn("Stopping...")
if err := kraken.Close(); err != nil {
log.Fatal(err)
}
return
case update := <-kraken.Listen():
switch data := update.Data.(type) {
case ws.TickerUpdate:
writeDatabase(client, update)
log.Printf("---Ticker of %s---", update.Pair)
log.Printf("Ask: %s with %s", data.Ask.Price.String(), data.Ask.Volume.String())
log.Printf("Bid: %s with %s", data.Bid.Price.String(), data.Bid.Volume.String())
log.Printf("Open today: %s | Open last 24 hours: %s", data.Open.Today.String(), data.Open.Last24.String())
default:
}
}
}
}