diff --git a/.gitignore b/.gitignore index 09b6c00..0250a21 100644 --- a/.gitignore +++ b/.gitignore @@ -215,3 +215,5 @@ ansible/inventory-actual *.retry .ansible/ ansible/vars/secrets.yml +bin +go.sum \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..fad6b58 --- /dev/null +++ b/go.mod @@ -0,0 +1,21 @@ +module src + +go 1.24.7 + +require ( + github.com/aopoltorzhicky/go_kraken v0.2.3 + github.com/gorilla/websocket v1.5.3 + github.com/influxdata/influxdb-client-go/v2 v2.14.0 + github.com/sirupsen/logrus v1.9.3 +) + +require ( + github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect + github.com/google/uuid v1.3.1 // indirect + github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect + github.com/oapi-codegen/runtime v1.0.0 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/shopspring/decimal v1.3.1 // indirect + golang.org/x/net v0.23.0 // indirect + golang.org/x/sys v0.18.0 // indirect +) diff --git a/src/database/database_server.go b/src/database/database_server.go new file mode 100644 index 0000000..c376800 --- /dev/null +++ b/src/database/database_server.go @@ -0,0 +1,125 @@ +package main + +import ( + "os" + "os/signal" + "strings" + "syscall" + "time" + + ws "github.com/aopoltorzhicky/go_kraken/websocket" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + log "github.com/sirupsen/logrus" +) + +var SERVER = "wss://ws.kraken.com" +var PATH = "/v2" +var TIMESWAIT = 0 +var TIMESWAITMAX = 5 +var INFLUXSERVER = os.Getenv("INFLUX_URL") +var INFLUXTOKEN = os.Getenv("INFLUX_TOKEN") +var INFLUXORG = os.Getenv("INFLUX_ORG") +var INFLUXBUCKET = os.Getenv("INFLUX_BUCKET") +var CURRENCYPAIRS = strings.Split(os.Getenv("CURRENCYPAIRS"), " ") + +/* + type WsTokenJsonStruct struct { + Error []interface() `json:"error"` + result struct { + Token string `json:"token"` + Expires int `json:"expires"` + } `json:"result"` + } +*/ +func connectDatabase() influxdb2.Client { + client := influxdb2.NewClientWithOptions(INFLUXSERVER, INFLUXTOKEN, influxdb2.DefaultOptions().SetBatchSize(20)) + return client +} + +func writeDatabase(client influxdb2.Client, update ws.Update) { + writeAPI := client.WriteAPI(INFLUXORG, INFLUXBUCKET) + + switch data := update.Data.(type) { + case ws.TickerUpdate: + ask_price, err := data.Ask.Price.Float64() + if err != nil { + log.Warnf("Data extraction error %s", err.Error()) + } + ask_volume, err := data.Ask.Volume.Float64() + if err != nil { + log.Warnf("Data extraction error %s", err.Error()) + } + bid_price, err := data.Bid.Price.Float64() + if err != nil { + log.Warnf("Data extraction error %s", err.Error()) + } + bid_volume, err := data.Bid.Volume.Float64() + if err != nil { + log.Warnf("Data extraction error %s", err.Error()) + } + open_today, err := data.Open.Today.Float64() + if err != nil { + log.Warnf("Data extraction error %s", err.Error()) + } + open_last24, err := data.Open.Last24.Float64() + if err != nil { + log.Warnf("Data extraction error %s", err.Error()) + } + p := influxdb2.NewPoint( + "ticker", + map[string]string{ + "pair": update.Pair, + }, + map[string]interface{}{ + "ask_price": ask_price, + "ask_volume": ask_volume, + "bid_price": bid_price, + "bid_volume": bid_volume, + "open_today": open_today, + "open_24": open_last24, + }, + time.Now()) + + writeAPI.WritePoint(p) + default: + } +} + +func main() { + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) + + kraken := ws.NewKraken(ws.ProdBaseURL) + if err := kraken.Connect(); err != nil { + log.Fatalf("Error connecting to websocket: %s", err.Error()) + } + + if err := kraken.SubscribeTicker([]string{ws.BTCUSD}); err != nil { + log.Fatalf("SubscribeTicker error %s", err.Error()) + } + + client := connectDatabase() + defer client.Close() + + for { + select { + case <-signals: + log.Warn("Stopping...") + if err := kraken.Close(); err != nil { + log.Fatal(err) + } + return + + case update := <-kraken.Listen(): + switch data := update.Data.(type) { + case ws.TickerUpdate: + writeDatabase(client, update) + log.Printf("---Ticker of %s---", update.Pair) + log.Printf("Ask: %s with %s", data.Ask.Price.String(), data.Ask.Volume.String()) + log.Printf("Bid: %s with %s", data.Bid.Price.String(), data.Bid.Volume.String()) + log.Printf("Open today: %s | Open last 24 hours: %s", data.Open.Today.String(), data.Open.Last24.String()) + default: + } + } + } +}