From f91cb260f2eb20b5283591adfc7658223364bdbc Mon Sep 17 00:00:00 2001 From: Rostislav Dugin Date: Mon, 26 Jan 2026 14:18:53 +0300 Subject: [PATCH] FEATURE (logs): Add Victora Logs --- backend/.env.development.example | 2 + backend/.gitignore | 3 +- backend/cmd/main.go | 3 + backend/docker-compose.yml.example | 14 ++ backend/internal/util/logger/logger.go | 113 ++++++++-- backend/internal/util/logger/multi_handler.go | 59 +++++ .../util/logger/victorialogs_writer.go | 201 ++++++++++++++++++ 7 files changed, 379 insertions(+), 16 deletions(-) create mode 100644 backend/internal/util/logger/multi_handler.go create mode 100644 backend/internal/util/logger/victorialogs_writer.go diff --git a/backend/.env.development.example b/backend/.env.development.example index 801378e..5446a68 100644 --- a/backend/.env.development.example +++ b/backend/.env.development.example @@ -6,6 +6,8 @@ DEV_DB_PASSWORD=Q1234567 ENV_MODE=development # logging SHOW_DB_INSTALLATION_VERIFICATION_LOGS=true +VICTORIA_LOGS_URL=http://localhost:9428 +VICTORIA_LOGS_PASSWORD=devpassword # tests TEST_LOCALHOST=localhost IS_SKIP_EXTERNAL_RESOURCES_TESTS=false diff --git a/backend/.gitignore b/backend/.gitignore index 5f9cc95..da0b84b 100644 --- a/backend/.gitignore +++ b/backend/.gitignore @@ -18,4 +18,5 @@ pgdata-for-restore/ temp/ cmd.exe temp/ -valkey-data/ \ No newline at end of file +valkey-data/ +victoria-logs-data/ \ No newline at end of file diff --git a/backend/cmd/main.go b/backend/cmd/main.go index d56730e..0fa9a27 100644 --- a/backend/cmd/main.go +++ b/backend/cmd/main.go @@ -185,6 +185,9 @@ func startServerWithGracefulShutdown(log *slog.Logger, app *gin.Engine) { <-quit log.Info("Shutdown signal received") + // Gracefully shutdown VictoriaLogs writer + logger.ShutdownVictoriaLogs(5 * time.Second) + // The context is used to inform the server it has 10 seconds to finish // the request it is currently handling ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) diff --git a/backend/docker-compose.yml.example b/backend/docker-compose.yml.example index e59e64e..e7e3b24 100644 --- a/backend/docker-compose.yml.example +++ b/backend/docker-compose.yml.example @@ -34,6 +34,20 @@ services: retries: 5 start_period: 20s + # VictoriaLogs for external logging + victoria-logs: + image: victoriametrics/victoria-logs:latest + container_name: victoria-logs + ports: + - "9428:9428" + command: + - -storageDataPath=/victoria-logs-data + - -retentionPeriod=7d + - -httpAuth.password=devpassword + volumes: + - ./victoria-logs-data:/victoria-logs-data + restart: unless-stopped + # Test MinIO container test-minio: image: minio/minio:latest diff --git a/backend/internal/util/logger/logger.go b/backend/internal/util/logger/logger.go index 6b898c9..873748b 100644 --- a/backend/internal/util/logger/logger.go +++ b/backend/internal/util/logger/logger.go @@ -1,48 +1,131 @@ package logger import ( + "fmt" "log/slog" "os" + "path/filepath" "sync" "time" + + "github.com/joho/godotenv" ) var ( - loggerInstance *slog.Logger - once sync.Once + loggerInstance *slog.Logger + victoriaLogsWriter *VictoriaLogsWriter + once sync.Once + shutdownOnce sync.Once + envLoadOnce sync.Once ) // GetLogger returns a singleton slog.Logger that logs to the console func GetLogger() *slog.Logger { once.Do(func() { - handler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + // Create stdout handler + stdoutHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ Level: slog.LevelInfo, ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr { if a.Key == slog.TimeKey { a.Value = slog.StringValue(time.Now().Format("2006/01/02 15:04:05")) } - - if a.Key == slog.MessageKey { - // Format the message to match the desired output format - return slog.Attr{ - Key: slog.MessageKey, - Value: slog.StringValue(a.Value.String()), - } - } - - // Remove level and other attributes to get clean output if a.Key == slog.LevelKey { return slog.Attr{} } - return a }, }) - loggerInstance = slog.New(handler) + // Try to initialize VictoriaLogs writer if configured + // Note: This will be called before config is fully loaded in some cases, + // so we need to handle that gracefully + victoriaLogsWriter = tryInitVictoriaLogs() + + // Create multi-handler + multiHandler := NewMultiHandler(stdoutHandler, victoriaLogsWriter) + loggerInstance = slog.New(multiHandler) loggerInstance.Info("Text structured logger initialized") + if victoriaLogsWriter != nil { + loggerInstance.Info("VictoriaLogs enabled") + } else { + loggerInstance.Info("VictoriaLogs disabled") + } }) return loggerInstance } + +// ShutdownVictoriaLogs gracefully shuts down the VictoriaLogs writer +func ShutdownVictoriaLogs(timeout time.Duration) { + shutdownOnce.Do(func() { + if victoriaLogsWriter != nil { + victoriaLogsWriter.Shutdown(timeout) + } + }) +} + +func tryInitVictoriaLogs() *VictoriaLogsWriter { + // Ensure .env is loaded before reading environment variables + ensureEnvLoaded() + + // Try to get config - this may fail early in startup + url := getVictoriaLogsURL() + password := getVictoriaLogsPassword() + + if url == "" { + fmt.Println("VictoriaLogs URL is not set") + return nil + } + + return NewVictoriaLogsWriter(url, password) +} + +func ensureEnvLoaded() { + envLoadOnce.Do(func() { + // Get current working directory + cwd, err := os.Getwd() + if err != nil { + fmt.Printf("Warning: could not get current working directory: %v\n", err) + cwd = "." + } + + // Find backend root by looking for go.mod + backendRoot := cwd + for { + if _, err := os.Stat(filepath.Join(backendRoot, "go.mod")); err == nil { + break + } + + parent := filepath.Dir(backendRoot) + if parent == backendRoot { + break + } + + backendRoot = parent + } + + // Try to load .env from various locations + envPaths := []string{ + filepath.Join(cwd, ".env"), + filepath.Join(backendRoot, ".env"), + } + + for _, path := range envPaths { + if err := godotenv.Load(path); err == nil { + fmt.Printf("Logger: loaded .env from %s\n", path) + return + } + } + + fmt.Println("Logger: .env file not found, using existing environment variables") + }) +} + +func getVictoriaLogsURL() string { + return os.Getenv("VICTORIA_LOGS_URL") +} + +func getVictoriaLogsPassword() string { + return os.Getenv("VICTORIA_LOGS_PASSWORD") +} diff --git a/backend/internal/util/logger/multi_handler.go b/backend/internal/util/logger/multi_handler.go new file mode 100644 index 0000000..8d48a5f --- /dev/null +++ b/backend/internal/util/logger/multi_handler.go @@ -0,0 +1,59 @@ +package logger + +import ( + "context" + "log/slog" +) + +type MultiHandler struct { + stdoutHandler slog.Handler + victoriaLogsWriter *VictoriaLogsWriter +} + +func NewMultiHandler( + stdoutHandler slog.Handler, + victoriaLogsWriter *VictoriaLogsWriter, +) *MultiHandler { + return &MultiHandler{ + stdoutHandler: stdoutHandler, + victoriaLogsWriter: victoriaLogsWriter, + } +} + +func (h *MultiHandler) Enabled(ctx context.Context, level slog.Level) bool { + return h.stdoutHandler.Enabled(ctx, level) +} + +func (h *MultiHandler) Handle(ctx context.Context, record slog.Record) error { + // Send to stdout handler + if err := h.stdoutHandler.Handle(ctx, record); err != nil { + return err + } + + // Send to VictoriaLogs if configured + if h.victoriaLogsWriter != nil { + attrs := make(map[string]interface{}) + record.Attrs(func(a slog.Attr) bool { + attrs[a.Key] = a.Value.Any() + return true + }) + + h.victoriaLogsWriter.Write(record.Level.String(), record.Message, attrs) + } + + return nil +} + +func (h *MultiHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + return &MultiHandler{ + stdoutHandler: h.stdoutHandler.WithAttrs(attrs), + victoriaLogsWriter: h.victoriaLogsWriter, + } +} + +func (h *MultiHandler) WithGroup(name string) slog.Handler { + return &MultiHandler{ + stdoutHandler: h.stdoutHandler.WithGroup(name), + victoriaLogsWriter: h.victoriaLogsWriter, + } +} diff --git a/backend/internal/util/logger/victorialogs_writer.go b/backend/internal/util/logger/victorialogs_writer.go new file mode 100644 index 0000000..0ca13e2 --- /dev/null +++ b/backend/internal/util/logger/victorialogs_writer.go @@ -0,0 +1,201 @@ +package logger + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "os" + "sync" + "time" +) + +type logEntry struct { + Time string `json:"_time"` + Message string `json:"_msg"` + Level string `json:"level"` + Attrs map[string]any `json:",inline"` +} + +type VictoriaLogsWriter struct { + url string + password string + httpClient *http.Client + logChannel chan logEntry + wg sync.WaitGroup + once sync.Once + ctx context.Context + cancel context.CancelFunc + logger *slog.Logger +} + +func NewVictoriaLogsWriter(url, password string) *VictoriaLogsWriter { + ctx, cancel := context.WithCancel(context.Background()) + + writer := &VictoriaLogsWriter{ + url: url, + password: password, + httpClient: &http.Client{ + Timeout: 10 * time.Second, + }, + logChannel: make(chan logEntry, 1000), + ctx: ctx, + cancel: cancel, + logger: slog.New(slog.NewTextHandler(os.Stdout, nil)), + } + + // Start 3 worker goroutines + for range 3 { + writer.wg.Add(1) + go writer.worker() + } + + return writer +} + +func (w *VictoriaLogsWriter) Write(level, message string, attrs map[string]interface{}) { + entry := logEntry{ + Time: time.Now().UTC().Format(time.RFC3339Nano), + Message: message, + Level: level, + Attrs: attrs, + } + + select { + case w.logChannel <- entry: + // Successfully queued + default: + // Channel is full, drop log with warning + w.logger.Warn("VictoriaLogs channel buffer full, dropping log entry") + } +} + +func (w *VictoriaLogsWriter) worker() { + defer w.wg.Done() + + batch := make([]logEntry, 0, 100) + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-w.ctx.Done(): + w.flushBatch(batch) + return + + case entry, ok := <-w.logChannel: + if !ok { + w.flushBatch(batch) + return + } + + batch = append(batch, entry) + + // Send batch if it reaches 100 entries + if len(batch) >= 100 { + w.sendBatch(batch) + batch = make([]logEntry, 0, 100) + } + + case <-ticker.C: + if len(batch) > 0 { + w.sendBatch(batch) + batch = make([]logEntry, 0, 100) + } + } + } +} + +func (w *VictoriaLogsWriter) sendBatch(entries []logEntry) { + backoffs := []time.Duration{0, 5 * time.Second, 30 * time.Second, 1 * time.Minute} + + for attempt := range 4 { + if backoffs[attempt] > 0 { + time.Sleep(backoffs[attempt]) + } + + if err := w.sendHTTP(entries); err == nil { + return + } else if attempt == 3 { + w.logger.Error("VictoriaLogs failed to send logs after 4 attempts", + "error", err, + "entries_count", len(entries)) + } + } +} + +func (w *VictoriaLogsWriter) sendHTTP(entries []logEntry) error { + // Build JSON Lines payload + var buf bytes.Buffer + encoder := json.NewEncoder(&buf) + + for _, entry := range entries { + if err := encoder.Encode(entry); err != nil { + return fmt.Errorf("failed to encode log entry: %w", err) + } + } + + // Build request + url := fmt.Sprintf("%s/insert/jsonline?_stream_fields=level&_msg_field=_msg", w.url) + req, err := http.NewRequestWithContext(w.ctx, "POST", url, &buf) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + // Set headers + req.Header.Set("Content-Type", "application/x-ndjson") + + // Set Basic Auth (password as username, empty password) + if w.password != "" { + auth := base64.StdEncoding.EncodeToString([]byte(w.password + ":")) + req.Header.Set("Authorization", "Basic "+auth) + } + + // Send request + resp, err := w.httpClient.Do(req) + if err != nil { + return fmt.Errorf("failed to send request: %w", err) + } + defer func() { + _ = resp.Body.Close() + }() + + // Check response + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("VictoriaLogs returned status %d: %s", resp.StatusCode, string(body)) + } + + return nil +} + +func (w *VictoriaLogsWriter) flushBatch(batch []logEntry) { + if len(batch) > 0 { + w.sendBatch(batch) + } +} + +func (w *VictoriaLogsWriter) Shutdown(timeout time.Duration) { + w.once.Do(func() { + // Stop accepting new logs + w.cancel() + + // Wait for workers to finish with timeout + done := make(chan struct{}) + go func() { + w.wg.Wait() + close(done) + }() + + select { + case <-done: + w.logger.Info("VictoriaLogs writer shutdown gracefully") + case <-time.After(timeout): + w.logger.Warn("VictoriaLogs writer shutdown timeout, some logs may be lost") + } + }) +}