Merge pull request 'Go database server initial commit' (#1) from go into main
Reviewed-on: #1
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@@ -215,3 +215,5 @@ ansible/inventory-actual
|
||||
*.retry
|
||||
.ansible/
|
||||
ansible/vars/secrets.yml
|
||||
bin
|
||||
go.sum
|
||||
21
go.mod
Normal file
21
go.mod
Normal file
@@ -0,0 +1,21 @@
|
||||
module src
|
||||
|
||||
go 1.24.7
|
||||
|
||||
require (
|
||||
github.com/aopoltorzhicky/go_kraken v0.2.3
|
||||
github.com/gorilla/websocket v1.5.3
|
||||
github.com/influxdata/influxdb-client-go/v2 v2.14.0
|
||||
github.com/sirupsen/logrus v1.9.3
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
|
||||
github.com/google/uuid v1.3.1 // indirect
|
||||
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect
|
||||
github.com/oapi-codegen/runtime v1.0.0 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/shopspring/decimal v1.3.1 // indirect
|
||||
golang.org/x/net v0.23.0 // indirect
|
||||
golang.org/x/sys v0.18.0 // indirect
|
||||
)
|
||||
125
src/database/database_server.go
Normal file
125
src/database/database_server.go
Normal file
@@ -0,0 +1,125 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
ws "github.com/aopoltorzhicky/go_kraken/websocket"
|
||||
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var SERVER = "wss://ws.kraken.com"
|
||||
var PATH = "/v2"
|
||||
var TIMESWAIT = 0
|
||||
var TIMESWAITMAX = 5
|
||||
var INFLUXSERVER = os.Getenv("INFLUX_URL")
|
||||
var INFLUXTOKEN = os.Getenv("INFLUX_TOKEN")
|
||||
var INFLUXORG = os.Getenv("INFLUX_ORG")
|
||||
var INFLUXBUCKET = os.Getenv("INFLUX_BUCKET")
|
||||
var CURRENCYPAIRS = strings.Split(os.Getenv("CURRENCYPAIRS"), " ")
|
||||
|
||||
/*
|
||||
type WsTokenJsonStruct struct {
|
||||
Error []interface() `json:"error"`
|
||||
result struct {
|
||||
Token string `json:"token"`
|
||||
Expires int `json:"expires"`
|
||||
} `json:"result"`
|
||||
}
|
||||
*/
|
||||
func connectDatabase() influxdb2.Client {
|
||||
client := influxdb2.NewClientWithOptions(INFLUXSERVER, INFLUXTOKEN, influxdb2.DefaultOptions().SetBatchSize(20))
|
||||
return client
|
||||
}
|
||||
|
||||
func writeDatabase(client influxdb2.Client, update ws.Update) {
|
||||
writeAPI := client.WriteAPI(INFLUXORG, INFLUXBUCKET)
|
||||
|
||||
switch data := update.Data.(type) {
|
||||
case ws.TickerUpdate:
|
||||
ask_price, err := data.Ask.Price.Float64()
|
||||
if err != nil {
|
||||
log.Warnf("Data extraction error %s", err.Error())
|
||||
}
|
||||
ask_volume, err := data.Ask.Volume.Float64()
|
||||
if err != nil {
|
||||
log.Warnf("Data extraction error %s", err.Error())
|
||||
}
|
||||
bid_price, err := data.Bid.Price.Float64()
|
||||
if err != nil {
|
||||
log.Warnf("Data extraction error %s", err.Error())
|
||||
}
|
||||
bid_volume, err := data.Bid.Volume.Float64()
|
||||
if err != nil {
|
||||
log.Warnf("Data extraction error %s", err.Error())
|
||||
}
|
||||
open_today, err := data.Open.Today.Float64()
|
||||
if err != nil {
|
||||
log.Warnf("Data extraction error %s", err.Error())
|
||||
}
|
||||
open_last24, err := data.Open.Last24.Float64()
|
||||
if err != nil {
|
||||
log.Warnf("Data extraction error %s", err.Error())
|
||||
}
|
||||
p := influxdb2.NewPoint(
|
||||
"ticker",
|
||||
map[string]string{
|
||||
"pair": update.Pair,
|
||||
},
|
||||
map[string]interface{}{
|
||||
"ask_price": ask_price,
|
||||
"ask_volume": ask_volume,
|
||||
"bid_price": bid_price,
|
||||
"bid_volume": bid_volume,
|
||||
"open_today": open_today,
|
||||
"open_24": open_last24,
|
||||
},
|
||||
time.Now())
|
||||
|
||||
writeAPI.WritePoint(p)
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
signals := make(chan os.Signal, 1)
|
||||
signal.Notify(signals, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
|
||||
|
||||
kraken := ws.NewKraken(ws.ProdBaseURL)
|
||||
if err := kraken.Connect(); err != nil {
|
||||
log.Fatalf("Error connecting to websocket: %s", err.Error())
|
||||
}
|
||||
|
||||
if err := kraken.SubscribeTicker([]string{ws.BTCUSD}); err != nil {
|
||||
log.Fatalf("SubscribeTicker error %s", err.Error())
|
||||
}
|
||||
|
||||
client := connectDatabase()
|
||||
defer client.Close()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-signals:
|
||||
log.Warn("Stopping...")
|
||||
if err := kraken.Close(); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
return
|
||||
|
||||
case update := <-kraken.Listen():
|
||||
switch data := update.Data.(type) {
|
||||
case ws.TickerUpdate:
|
||||
writeDatabase(client, update)
|
||||
log.Printf("---Ticker of %s---", update.Pair)
|
||||
log.Printf("Ask: %s with %s", data.Ask.Price.String(), data.Ask.Volume.String())
|
||||
log.Printf("Bid: %s with %s", data.Bid.Price.String(), data.Bid.Volume.String())
|
||||
log.Printf("Open today: %s | Open last 24 hours: %s", data.Open.Today.String(), data.Open.Last24.String())
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user