Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,16 @@ func TestAsyncProcessorEnqueueAndProcess(t *testing.T) {
t.Fatal("timed out waiting for async processing")
}

if got := testutil.ToFloat64(asyncProcessedEventsCounter.WithLabelValues("workflow_run")); got != 1 {
t.Fatalf("expected processed counter to be 1, got %v", got)
deadline := time.Now().Add(2 * time.Second)
for {
if got := testutil.ToFloat64(asyncProcessedEventsCounter.WithLabelValues("workflow_run")); got == 1 {
break
}
if time.Now().After(deadline) {
got := testutil.ToFloat64(asyncProcessedEventsCounter.WithLabelValues("workflow_run"))
t.Fatalf("expected processed counter to be 1, got %v", got)
}
time.Sleep(10 * time.Millisecond)
}
}

Expand Down
190 changes: 140 additions & 50 deletions src/main.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
package main

import (
"bufio"
"context"
"encoding/json"
"errors"
"net"
"net/http"
"net/http/pprof"
"os"
"os/signal"
"strings"
"syscall"
"time"

"github.com/gorilla/mux"
Expand All @@ -22,7 +28,13 @@ type HealthCheckResposne struct {

// statusRecorder wraps an http.ResponseWriter and remembers the status code
// written through it, so that middleware can report the outcome of a request
// after the wrapped handler has returned.
type statusRecorder struct {
	http.ResponseWriter

	// status is the recorded response code. apiHandler seeds it with
	// http.StatusOK before the handler runs.
	status int

	// wroteHeader reports whether a response header has already been written
	// through this recorder.
	wroteHeader bool
}

// serviceMetrics groups the Prometheus collectors that apiHandler updates for
// every HTTP request. Instances are built by newServiceMetrics.
type serviceMetrics struct {
// apiCallsCounter is promgithub_api_calls_total, labelled by
// status, method and path (in that order).
apiCallsCounter *prometheus.CounterVec
// requestDurationHistogram is promgithub_request_duration_seconds, labelled
// by path and method (in that order).
requestDurationHistogram *prometheus.HistogramVec
}

var (
Expand All @@ -32,60 +44,112 @@ var (
enableDebug string // Compile time flag to enable debug mode
debug bool

apiCallsCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "promgithub_api_calls_total",
Help: "Number of API calls",
},
[]string{"status", "method", "path"},
)

requestDurationHistogram = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "promgithub_request_duration_seconds",
Help: "Request duration in seconds",
Buckets: prometheus.DefBuckets,
},
[]string{"path", "method"},
)
defaultServiceMetrics = newServiceMetrics(prometheus.DefaultRegisterer)
)

func apiHandler(logger *zap.Logger) func(http.Handler) http.Handler {
// newServiceMetrics creates the request counter and the request-duration
// histogram through a promauto factory bound to registerer, and returns them
// bundled in a serviceMetrics value.
//
// The counter is created before the histogram; label order is part of the
// contract because callers use WithLabelValues positionally.
func newServiceMetrics(registerer prometheus.Registerer) *serviceMetrics {
	auto := promauto.With(registerer)

	// Labels: status, method, path.
	calls := auto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "promgithub_api_calls_total",
			Help: "Number of API calls",
		},
		[]string{"status", "method", "path"},
	)

	// Labels: path, method.
	durations := auto.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "promgithub_request_duration_seconds",
			Help:    "Request duration in seconds",
			Buckets: prometheus.DefBuckets,
		},
		[]string{"path", "method"},
	)

	return &serviceMetrics{
		apiCallsCounter:          calls,
		requestDurationHistogram: durations,
	}
}

// WriteHeader records the response status and forwards the call to the
// wrapped ResponseWriter.
//
// Only the first final status is recorded. Later calls are still forwarded
// (so the wrapped writer can diagnose them) but do not overwrite the recorded
// value, because the status that actually reached the client is the first
// one. 1xx informational responses other than 101 Switching Protocols are
// forwarded without being recorded, since a final status is still to come.
func (r *statusRecorder) WriteHeader(status int) {
	informational := status >= 100 && status < 200 && status != http.StatusSwitchingProtocols
	if !r.wroteHeader && !informational {
		r.status = status
		r.wroteHeader = true
	}
	r.ResponseWriter.WriteHeader(status)
}

// Write forwards body to the wrapped ResponseWriter. When no header has been
// written through the recorder yet, it first calls WriteHeader with
// http.StatusOK so the recorder's own bookkeeping sees the implicit 200.
func (r *statusRecorder) Write(body []byte) (int, error) {
	if r.wroteHeader {
		return r.ResponseWriter.Write(body)
	}

	r.WriteHeader(http.StatusOK)
	return r.ResponseWriter.Write(body)
}

// Flush implements http.Flusher by delegating to the wrapped ResponseWriter.
// It does nothing when the wrapped writer cannot flush. Otherwise it makes
// sure a header has gone through the recorder (defaulting to http.StatusOK)
// before flushing.
func (r *statusRecorder) Flush() {
	flusher, ok := r.ResponseWriter.(http.Flusher)
	if !ok {
		return
	}

	if !r.wroteHeader {
		r.WriteHeader(http.StatusOK)
	}
	flusher.Flush()
}

// Hijack implements http.Hijacker by delegating to the wrapped
// ResponseWriter. If the wrapped writer is not an http.Hijacker, it returns
// nil values and an error.
func (r *statusRecorder) Hijack() (net.Conn, *bufio.ReadWriter, error) {
	if hijacker, ok := r.ResponseWriter.(http.Hijacker); ok {
		return hijacker.Hijack()
	}
	return nil, nil, errors.New("response writer does not support hijacking")
}

// Push implements http.Pusher by delegating to the wrapped ResponseWriter.
// It returns http.ErrNotSupported when the wrapped writer is not an
// http.Pusher.
func (r *statusRecorder) Push(target string, opts *http.PushOptions) error {
	if pusher, ok := r.ResponseWriter.(http.Pusher); ok {
		return pusher.Push(target, opts)
	}
	return http.ErrNotSupported
}

// Unwrap returns the ResponseWriter wrapped by the recorder.
// NOTE(review): presumably present so http.ResponseController (and similar
// helpers that look for an Unwrap method) can reach the original writer —
// confirm against the callers.
func (r *statusRecorder) Unwrap() http.ResponseWriter {
return r.ResponseWriter
}

// apiHandler returns middleware that records metrics and a completion log
// entry for every request it wraps.
//
// For each request it runs next with a statusRecorder, then:
//   - increments metrics.apiCallsCounter with (status text, method, path);
//     a status code with no standard text is reported as "UNKNOWN";
//   - observes the elapsed time in seconds on
//     metrics.requestDurationHistogram with (path, method);
//   - logs "Request completed" at Error for status >= 500, Warn for
//     status >= 400, and Debug otherwise.
//
// NOTE(review): the raw request path is used as a label value; confirm the
// set of routes reaching this middleware is bounded.
func apiHandler(logger *zap.Logger, metrics *serviceMetrics) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			start := time.Now()

			// Default to 200 for handlers that never write a header or body.
			rec := statusRecorder{ResponseWriter: w, status: http.StatusOK}

			next.ServeHTTP(&rec, r)

			duration := time.Since(start).Seconds()
			statusText := http.StatusText(rec.status)
			if statusText == "" {
				statusText = "UNKNOWN"
			}

			metrics.apiCallsCounter.WithLabelValues(statusText, r.Method, r.URL.Path).Inc()
			metrics.requestDurationHistogram.WithLabelValues(r.URL.Path, r.Method).Observe(duration)

			fields := []zap.Field{
				zap.String("method", r.Method),
				zap.String("path", r.URL.Path),
				zap.Int("status", rec.status),
				zap.Float64("durationSeconds", duration),
			}

			// Log level follows the severity of the response class.
			switch {
			case rec.status >= http.StatusInternalServerError:
				logger.Error("Request completed", fields...)
			case rec.status >= http.StatusBadRequest:
				logger.Warn("Request completed", fields...)
			default:
				logger.Debug("Request completed", fields...)
			}
		})
	}
}

func healthCheck(w http.ResponseWriter, _ *http.Request) {
response := HealthCheckResposne{Status: "ok", Version: Version}
err := json.NewEncoder(w).Encode(response)
if err != nil {
if err := json.NewEncoder(w).Encode(response); err != nil {
http.Error(w, "Failed to encode response", http.StatusInternalServerError)
return
}
Expand All @@ -108,22 +172,19 @@ func init() {

defer func() {
if err := logger.Sync(); err != nil {
// Logger sync errors on program exit are typically not critical
// and often occur when stdout/stderr are closed before sync
_ = err // Explicitly ignore the error
_ = err
}
}()
}

func setupRouter(logger *zap.Logger) *mux.Router {
func setupRouter(logger *zap.Logger, metrics *serviceMetrics, gatherer prometheus.Gatherer) *mux.Router {
r := mux.NewRouter()
r.Use(apiHandler(logger))
r.Use(apiHandler(logger, metrics))

r.HandleFunc("/health", healthCheck).Methods("GET")
r.Handle("/metrics", promhttp.Handler())
r.Handle("/metrics", promhttp.HandlerFor(gatherer, promhttp.HandlerOpts{}))
r.HandleFunc("/webhook", githubEventsHandler).Methods("POST")

// Profiling endpoints
if debug {
r.HandleFunc("/debug/pprof/", pprof.Index)
r.HandleFunc("/debug/pprof/allocs", pprof.Handler("allocs").ServeHTTP)
Expand All @@ -141,7 +202,37 @@ func setupRouter(logger *zap.Logger) *mux.Router {
return r
}

// runServer starts server with ListenAndServe and blocks until it stops.
//
// It returns:
//   - the ListenAndServe error if the server fails on its own
//     (http.ErrServerClosed is treated as a clean stop and reported as nil);
//   - after ctx is cancelled, the result of a graceful Shutdown bounded by a
//     15-second timeout. If Shutdown fails, the server is force-closed so no
//     listener or connection outlives this call, and the Shutdown error is
//     returned.
func runServer(ctx context.Context, server *http.Server, logger *zap.Logger) error {
	const shutdownTimeout = 15 * time.Second

	// Buffered so the serving goroutine can always deliver its result and
	// exit, even when this function has already returned.
	errCh := make(chan error, 1)
	go func() {
		err := server.ListenAndServe()
		if errors.Is(err, http.ErrServerClosed) {
			// Expected outcome of Shutdown/Close; not a failure.
			err = nil
		}
		errCh <- err
	}()

	select {
	case err := <-errCh:
		return err
	case <-ctx.Done():
		// ctx is already cancelled, so the drain deadline needs a fresh parent.
		shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
		defer cancel()

		logger.Info("Shutting down server")
		if err := server.Shutdown(shutdownCtx); err != nil {
			// Graceful drain failed or timed out: release the listener and
			// any remaining connections before reporting the failure.
			if closeErr := server.Close(); closeErr != nil {
				logger.Warn("Forced close after failed graceful shutdown also failed", zap.Error(closeErr))
			}
			return err
		}

		return <-errCh
	}
}

func main() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

port := strings.TrimSpace(os.Getenv("PROMGITHUB_SERVICE_PORT"))
if port == "" {
port = "8080"
Expand Down Expand Up @@ -182,19 +273,18 @@ func main() {
zap.Int("queueSize", asyncConfig.QueueSize),
)

r := setupRouter(logger)

r := setupRouter(logger, defaultServiceMetrics, prometheus.DefaultGatherer)
server := &http.Server{
Addr: ":" + port,
Handler: r,
ReadTimeout: 15 * time.Second,
WriteTimeout: 15 * time.Second,
IdleTimeout: 60 * time.Second,
MaxHeaderBytes: 1 << 20, // 1 MB
MaxHeaderBytes: 1 << 20,
}

logger.Info("Starting server", zap.String("port", port))
if err := server.ListenAndServe(); err != nil {
logger.Fatal("Error starting server", zap.Error(err))
if err := runServer(ctx, server, logger); err != nil {
logger.Fatal("Server exited with error", zap.Error(err))
}
}
Loading
Loading