feat: add opencensus tracing and metrics support
This commit is contained in:
@ -24,8 +24,10 @@ type Config struct {
|
||||
Core `mapstructure:",squash"`
|
||||
Serv `mapstructure:",squash"`
|
||||
|
||||
cpath string
|
||||
vi *viper.Viper
|
||||
closeFn func()
|
||||
hostPort string
|
||||
cpath string
|
||||
vi *viper.Viper
|
||||
}
|
||||
|
||||
// Serv struct contains config values used by the Super Graph service
|
||||
@ -50,7 +52,19 @@ type Serv struct {
|
||||
|
||||
// Telemetry struct contains OpenCensus metrics and tracing related config
|
||||
Telemetry struct {
|
||||
Enable bool
|
||||
Debug bool
|
||||
Metrics struct {
|
||||
Exporter string
|
||||
Endpoint string
|
||||
Namespace string
|
||||
Key string
|
||||
}
|
||||
|
||||
Tracing struct {
|
||||
Exporter string
|
||||
Endpoint string
|
||||
Sample string
|
||||
}
|
||||
}
|
||||
|
||||
Auth auth.Auth
|
||||
|
@ -19,8 +19,8 @@ const (
|
||||
|
||||
var (
|
||||
// These variables are set using -ldflags
|
||||
version string
|
||||
gitBranch string
|
||||
version string = "unknown"
|
||||
gitBranch string = "unknown"
|
||||
lastCommitSHA string
|
||||
lastCommitTime string
|
||||
)
|
||||
|
@ -111,6 +111,10 @@ func GetConfigName() string {
|
||||
return ge
|
||||
}
|
||||
|
||||
func (c *Config) telemetryEnabled() bool {
|
||||
return c.Telemetry.Metrics.Exporter != "" || c.Telemetry.Tracing.Exporter != ""
|
||||
}
|
||||
|
||||
func (c *Config) relPath(p string) string {
|
||||
if filepath.IsAbs(p) {
|
||||
return p
|
||||
|
@ -217,7 +217,7 @@ func initDB(c *Config, useDB bool) (*sql.DB, error) {
|
||||
// return errors.New("failed to open db")
|
||||
// }
|
||||
|
||||
if conf.Telemetry.Enable {
|
||||
if conf.telemetryEnabled() {
|
||||
driverName, err = ocsql.Register(driverName, ocsql.WithAllTraceOptions(), ocsql.WithInstanceName(conf.AppName))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to register ocsql driver: %v", err)
|
||||
@ -242,7 +242,7 @@ func initDB(c *Config, useDB bool) (*sql.DB, error) {
|
||||
return nil, fmt.Errorf("unable to open db connection: %v", err)
|
||||
}
|
||||
|
||||
if conf.Telemetry.Enable {
|
||||
if conf.telemetryEnabled() {
|
||||
defer ocsql.RecordStats(db, 2*time.Second)()
|
||||
}
|
||||
|
||||
|
@ -37,7 +37,6 @@ func initWatcher() {
|
||||
}
|
||||
|
||||
func startHTTP() {
|
||||
var hostPort string
|
||||
var appName string
|
||||
|
||||
defaultHP := "0.0.0.0:8080"
|
||||
@ -56,12 +55,12 @@ func startHTTP() {
|
||||
hp[1] = conf.Port
|
||||
}
|
||||
|
||||
hostPort = fmt.Sprintf("%s:%s", hp[0], hp[1])
|
||||
conf.hostPort = fmt.Sprintf("%s:%s", hp[0], hp[1])
|
||||
}
|
||||
}
|
||||
|
||||
if len(hostPort) == 0 {
|
||||
hostPort = defaultHP
|
||||
if len(conf.hostPort) == 0 {
|
||||
conf.hostPort = defaultHP
|
||||
}
|
||||
|
||||
routes, err := routeHandler()
|
||||
@ -70,7 +69,7 @@ func startHTTP() {
|
||||
}
|
||||
|
||||
srv := &http.Server{
|
||||
Addr: hostPort,
|
||||
Addr: conf.hostPort,
|
||||
Handler: routes,
|
||||
ReadTimeout: 5 * time.Second,
|
||||
WriteTimeout: 10 * time.Second,
|
||||
@ -90,13 +89,15 @@ func startHTTP() {
|
||||
}()
|
||||
|
||||
srv.RegisterOnShutdown(func() {
|
||||
if conf.closeFn != nil {
|
||||
conf.closeFn()
|
||||
}
|
||||
db.Close()
|
||||
log.Fatalln("INF shutdown complete")
|
||||
})
|
||||
|
||||
log.Printf("INF version: %s, git-branch: %s, host-port: %s, app-name: %s, env: %s\n",
|
||||
version, gitBranch, hostPort, appName, env)
|
||||
|
||||
log.Printf("INF %s started\n", serverName)
|
||||
log.Printf("INF Super Graph started, version: %s, git-branch: %s, host-port: %s, app-name: %s, env: %s\n",
|
||||
version, gitBranch, conf.hostPort, appName, env)
|
||||
|
||||
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
|
||||
log.Fatalln("INF server closed")
|
||||
@ -106,6 +107,7 @@ func startHTTP() {
|
||||
}
|
||||
|
||||
func routeHandler() (http.Handler, error) {
|
||||
var err error
|
||||
mux := http.NewServeMux()
|
||||
|
||||
if conf == nil {
|
||||
@ -142,6 +144,13 @@ func routeHandler() (http.Handler, error) {
|
||||
mux.Handle(k, v)
|
||||
}
|
||||
|
||||
if conf.telemetryEnabled() {
|
||||
conf.closeFn, err = enableObservability(mux)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
fn := func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Server", serverName)
|
||||
mux.ServeHTTP(w, r)
|
||||
|
139
internal/serv/telemetry.go
Normal file
139
internal/serv/telemetry.go
Normal file
@ -0,0 +1,139 @@
|
||||
package serv
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"contrib.go.opencensus.io/exporter/aws"
|
||||
"contrib.go.opencensus.io/exporter/prometheus"
|
||||
"contrib.go.opencensus.io/exporter/stackdriver"
|
||||
|
||||
"contrib.go.opencensus.io/exporter/zipkin"
|
||||
"contrib.go.opencensus.io/integrations/ocsql"
|
||||
stdzipkin "github.com/openzipkin/zipkin-go"
|
||||
httpreporter "github.com/openzipkin/zipkin-go/reporter/http"
|
||||
"go.opencensus.io/stats/view"
|
||||
"go.opencensus.io/trace"
|
||||
"go.opencensus.io/zpages"
|
||||
)
|
||||
|
||||
func enableObservability(mux *http.ServeMux) (func(), error) {
|
||||
// Enable OpenCensus zPages
|
||||
if conf.Telemetry.Debug {
|
||||
zpages.Handle(mux, "/telemetry")
|
||||
}
|
||||
|
||||
// Enable ocsql metrics with OpenCensus
|
||||
ocsql.RegisterAllViews()
|
||||
|
||||
var mex view.Exporter
|
||||
var tex trace.Exporter
|
||||
|
||||
var mCloseFn, tCloseFn func()
|
||||
var err error
|
||||
|
||||
// Set up the metrics exporter
|
||||
switch conf.Telemetry.Metrics.Exporter {
|
||||
case "prometheus":
|
||||
ep := "/metrics"
|
||||
|
||||
if conf.Telemetry.Metrics.Endpoint != "" {
|
||||
ep = conf.Telemetry.Metrics.Endpoint
|
||||
}
|
||||
|
||||
ex, err1 := prometheus.NewExporter(prometheus.Options{Namespace: conf.Telemetry.Metrics.Namespace})
|
||||
if err == nil {
|
||||
mux.Handle(ep, ex)
|
||||
log.Printf("INF Prometheus exporter listening on: %s", ep)
|
||||
}
|
||||
mex, err = view.Exporter(ex), err1
|
||||
|
||||
case "stackdriver":
|
||||
mex, err = stackdriver.NewExporter(stackdriver.Options{ProjectID: conf.Telemetry.Metrics.Key})
|
||||
if err == nil {
|
||||
log.Println("INF Google Stackdriver exporter initialized")
|
||||
}
|
||||
|
||||
case "":
|
||||
log.Println("INF No OpenCensus metrics exporter initialized")
|
||||
|
||||
default:
|
||||
err = fmt.Errorf("invalid metrics exporter")
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("ERR OpenCensus: %s: %v", conf.Telemetry.Metrics, err)
|
||||
}
|
||||
|
||||
if mex != nil {
|
||||
// Register the exporter
|
||||
view.RegisterExporter(mex)
|
||||
}
|
||||
|
||||
// Set up the tracing exporter
|
||||
switch conf.Telemetry.Tracing.Exporter {
|
||||
case "xray", "aws":
|
||||
ex, err1 := aws.NewExporter(aws.WithVersion("latest"))
|
||||
if err == nil {
|
||||
tCloseFn = func() { ex.Flush() }
|
||||
log.Println("INF Amazon X-Ray exporter initialized")
|
||||
}
|
||||
tex, err = trace.Exporter(ex), err1
|
||||
|
||||
case "zipkin":
|
||||
// The local endpoint stores the name and address of the local service
|
||||
lep, err := stdzipkin.NewEndpoint(conf.AppName, conf.hostPort)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// The Zipkin reporter takes collected spans from the app and reports them to the backend
|
||||
// http://localhost:9411/api/v2/spans is the default for the Zipkin Span v2
|
||||
re := httpreporter.NewReporter(conf.Telemetry.Tracing.Endpoint)
|
||||
tCloseFn = func() { re.Close() }
|
||||
tex = zipkin.NewExporter(re, lep)
|
||||
|
||||
case "":
|
||||
log.Println("INF No OpenCensus tracing exporter initialized")
|
||||
|
||||
default:
|
||||
err = fmt.Errorf("invalid tracing exporter")
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("ERR OpenCensus: %s: %v", conf.Telemetry.Tracing, err)
|
||||
}
|
||||
|
||||
if tex != nil {
|
||||
trace.RegisterExporter(tex)
|
||||
sample := conf.Telemetry.Tracing.Sample
|
||||
|
||||
if sample == "always" {
|
||||
trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
|
||||
|
||||
} else {
|
||||
prob := 0.5
|
||||
if v, err := strconv.ParseFloat(sample, 10); err == nil {
|
||||
prob = v
|
||||
}
|
||||
trace.ApplyConfig(trace.Config{DefaultSampler: trace.ProbabilitySampler(prob)})
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
var closeOnce sync.Once
|
||||
|
||||
return func() {
|
||||
// Flush and shutdown the Zipkin HTTP reporter
|
||||
closeOnce.Do(func() {
|
||||
if mCloseFn != nil {
|
||||
mCloseFn()
|
||||
}
|
||||
if tCloseFn != nil {
|
||||
tCloseFn()
|
||||
}
|
||||
})
|
||||
}, err
|
||||
}
|
@ -69,6 +69,16 @@ cors_debug: false
|
||||
# person: people
|
||||
# sheep: sheep
|
||||
|
||||
# open opencensus tracing and metrics
|
||||
# telemetry:
|
||||
# debug: true
|
||||
# metrics:
|
||||
# exporter: "prometheus"
|
||||
# tracing:
|
||||
# exporter: "zipkin"
|
||||
# endpoint: "http://zipkin:9411/api/v2/spans"
|
||||
# sample: 0.6
|
||||
|
||||
auth:
|
||||
# Can be 'rails', 'jwt' or 'header'
|
||||
type: rails
|
||||
|
@ -68,6 +68,16 @@ reload_on_config_change: false
|
||||
# SG_AUTH_RAILS_REDIS_PASSWORD
|
||||
# SG_AUTH_JWT_PUBLIC_KEY_FILE
|
||||
|
||||
# open opencensus tracing and metrics
|
||||
# telemetry:
|
||||
# debug: false
|
||||
# metrics:
|
||||
# exporter: "prometheus"
|
||||
# tracing:
|
||||
# exporter: "zipkin"
|
||||
# endpoint: "http://zipkin:9411/api/v2/spans"
|
||||
# sample: 0.6
|
||||
|
||||
database:
|
||||
type: postgres
|
||||
host: db
|
||||
|
@ -3,6 +3,8 @@ package serv
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/brianvoe/gofakeit/v5"
|
||||
)
|
||||
|
||||
func TestGQLHash1(t *testing.T) {
|
||||
@ -229,3 +231,7 @@ func TestGQLHashWithVars2(t *testing.T) {
|
||||
t.Fatal("Hashes don't match they should")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGoFake(t *testing.T) {
|
||||
gofakeit.Person()
|
||||
}
|
||||
|
Reference in New Issue
Block a user