Initial commit
This commit is contained in:
@@ -0,0 +1,109 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/caarlos0/env"
|
||||
"github.com/nats-io/go-nats-streaming"
|
||||
"googlemaps.github.io/maps"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
)
|
||||
|
||||
type config struct {
|
||||
MapsApiKey string `env:"MAPS_API_KEY"`
|
||||
NATSUrl string `env:"NATS_URL" envDefault:"nats://nats:4222"`
|
||||
}
|
||||
|
||||
type event struct {
|
||||
Name string `json:"name"`
|
||||
City string `json:"city"`
|
||||
Municipality string `json:"municiplity"`
|
||||
State string `json:"state"`
|
||||
Created string `json:"created"`
|
||||
}
|
||||
|
||||
type location struct {
|
||||
Name string `json:"name"`
|
||||
City string `json:"city"`
|
||||
Municipality string `json:"municiplity"`
|
||||
State string `json:"state"`
|
||||
Lat float64 `json:"lat"`
|
||||
Long float64 `json:"long"`
|
||||
}
|
||||
|
||||
func main() {
|
||||
cfg := config{}
|
||||
err := env.Parse(&cfg)
|
||||
if err != nil {
|
||||
fmt.Printf("%+v\n", err)
|
||||
}
|
||||
fmt.Printf("%+v\n", cfg)
|
||||
|
||||
clusterID := "stan"
|
||||
clientID := "geo-service"
|
||||
|
||||
c, err := maps.NewClient(maps.WithAPIKey(cfg.MapsApiKey))
|
||||
if err != nil {
|
||||
log.Fatalf("fatal error: %s", err)
|
||||
}
|
||||
|
||||
sc, err := stan.Connect(clusterID, clientID, stan.NatsURL(cfg.NATSUrl),
|
||||
stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
|
||||
log.Fatalf("Connection lost, reason: %v", reason)
|
||||
}))
|
||||
if err != nil {
|
||||
log.Fatalf("Can't connect: %v.\nMake sure a NATS Streaming Server is running at: %s", err, cfg.NATSUrl)
|
||||
}
|
||||
log.Printf("Connected to %s clusterID: [%s] clientID: [%s]\n", cfg.NATSUrl, clusterID, clientID)
|
||||
|
||||
mcb := func(msg *stan.Msg) {
|
||||
e := event{}
|
||||
if err := json.Unmarshal(msg.Data, &e); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
r := &maps.GeocodingRequest{
|
||||
Address: fmt.Sprintf("%s,%s,%s,%s", e.Name, e.City, e.Municipality, e.State),
|
||||
}
|
||||
if result, err := c.Geocode(context.Background(), r); err != nil {
|
||||
log.Fatalf("fatal error: %s", err)
|
||||
} else {
|
||||
l := location{
|
||||
Name: e.Name,
|
||||
City: e.City,
|
||||
Municipality: e.Municipality,
|
||||
State: e.State,
|
||||
Lat: result[0].Geometry.Location.Lat,
|
||||
Long: result[0].Geometry.Location.Lng,
|
||||
}
|
||||
if response, err := json.Marshal(l); err != nil {
|
||||
log.Fatalf("fatal error: %s", err)
|
||||
} else {
|
||||
if err := sc.Publish("DanceHall.Location", response); err != nil {
|
||||
log.Fatalf("fatal error: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := sc.QueueSubscribe("DanceHall.Created", "geo-service", mcb, stan.StartWithLastReceived(), stan.DurableName("geo-service")); err != nil {
|
||||
_ = sc.Close()
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
signalChan := make(chan os.Signal, 1)
|
||||
cleanupDone := make(chan bool)
|
||||
signal.Notify(signalChan, os.Interrupt)
|
||||
go func() {
|
||||
for range signalChan {
|
||||
fmt.Printf("\nReceived an interrupt, unsubscribing and closing connection...\n\n")
|
||||
// Do not unsubscribe a durable on exit, except if asked to.
|
||||
_ = sc.Close()
|
||||
cleanupDone <- true
|
||||
}
|
||||
}()
|
||||
<-cleanupDone
|
||||
}
|
||||
Reference in New Issue
Block a user