// Copyright 2026 MarketAlly. All rights reserved. // SPDX-License-Identifier: MIT // Package operations provides tracking for long-running operations // with real-time progress updates via Server-Sent Events (SSE). package operations import ( "context" "fmt" "maps" "net/http" "sync" "time" "code.gitcaddy.com/server/v3/modules/json" "code.gitcaddy.com/server/v3/modules/log" ) // OperationType represents the type of long-running operation type OperationType string const ( OpChunkedUpload OperationType = "chunked_upload" OpRepoMirrorSync OperationType = "mirror_sync" OpRepoPush OperationType = "repo_push" OpRepoClone OperationType = "repo_clone" OpWikiGenerate OperationType = "wiki_generate" OpBatchOperation OperationType = "batch_operation" ) // OperationStatus represents the current status of an operation type OperationStatus string const ( StatusPending OperationStatus = "pending" StatusRunning OperationStatus = "running" StatusComplete OperationStatus = "complete" StatusFailed OperationStatus = "failed" StatusCancelled OperationStatus = "cancelled" ) // Phase represents a phase within an operation type Phase struct { Name string `json:"name"` Status OperationStatus `json:"status"` Progress int `json:"progress"` // 0-100 Message string `json:"message,omitempty"` } // ProgressUpdate represents a progress update event type ProgressUpdate struct { OperationID string `json:"operation_id"` Type OperationType `json:"type"` Status OperationStatus `json:"status"` CurrentPhase string `json:"current_phase,omitempty"` Phases []Phase `json:"phases,omitempty"` Progress int `json:"progress"` // Overall progress 0-100 BytesTotal int64 `json:"bytes_total,omitempty"` BytesDone int64 `json:"bytes_done,omitempty"` ItemsTotal int `json:"items_total,omitempty"` ItemsDone int `json:"items_done,omitempty"` Message string `json:"message,omitempty"` Error string `json:"error,omitempty"` StartedAt time.Time `json:"started_at"` UpdatedAt time.Time `json:"updated_at"` EstimatedETA *time.Time `json:"estimated_eta,omitempty"` SpeedBPS int64 `json:"speed_bps,omitempty"` // bytes per second Metadata map[string]any `json:"metadata,omitempty"` } // Operation tracks a long-running operation type Operation struct { mu sync.RWMutex id string opType OperationType userID int64 status OperationStatus phases []Phase currentPhase int progress int bytesTotal int64 bytesDone int64 itemsTotal int itemsDone int message string errorMsg string startedAt time.Time updatedAt time.Time metadata map[string]any subscribers map[chan ProgressUpdate]struct{} ctx context.Context cancel context.CancelFunc } // Manager handles operation tracking type Manager struct { mu sync.RWMutex operations map[string]*Operation ttl time.Duration } var ( defaultManager *Manager once sync.Once ) // GetManager returns the global operation manager func GetManager() *Manager { once.Do(func() { defaultManager = &Manager{ operations: make(map[string]*Operation), ttl: 30 * time.Minute, } go defaultManager.cleanup() }) return defaultManager } // cleanup periodically removes completed operations func (m *Manager) cleanup() { ticker := time.NewTicker(5 * time.Minute) defer ticker.Stop() for range ticker.C { m.mu.Lock() now := time.Now() for id, op := range m.operations { op.mu.RLock() isComplete := op.status == StatusComplete || op.status == StatusFailed || op.status == StatusCancelled age := now.Sub(op.updatedAt) op.mu.RUnlock() if isComplete && age > m.ttl { delete(m.operations, id) log.Debug("Cleaned up operation %s", id) } } m.mu.Unlock() } } // StartOperation creates and registers a new operation func (m *Manager) StartOperation(ctx context.Context, id string, opType OperationType, userID int64, phases []string) *Operation { opCtx, cancel := context.WithCancel(ctx) phaseList := make([]Phase, len(phases)) for i, name := range phases { status := StatusPending if i == 0 { status = StatusRunning } phaseList[i] = Phase{ Name: name, Status: status, } } op := &Operation{ id: id, opType: opType, userID: userID, status: StatusRunning, phases: phaseList, currentPhase: 0, startedAt: time.Now(), updatedAt: time.Now(), metadata: make(map[string]any), subscribers: make(map[chan ProgressUpdate]struct{}), ctx: opCtx, cancel: cancel, } m.mu.Lock() m.operations[id] = op m.mu.Unlock() op.broadcast() return op } // GetOperation retrieves an operation by ID func (m *Manager) GetOperation(id string) (*Operation, bool) { m.mu.RLock() defer m.mu.RUnlock() op, ok := m.operations[id] return op, ok } // Subscribe to operation progress updates func (op *Operation) Subscribe() <-chan ProgressUpdate { ch := make(chan ProgressUpdate, 10) op.mu.Lock() op.subscribers[ch] = struct{}{} op.mu.Unlock() // Send current state immediately go func() { select { case ch <- op.GetProgress(): case <-op.ctx.Done(): } }() return ch } // Unsubscribe from operation updates func (op *Operation) Unsubscribe(ch <-chan ProgressUpdate) { op.mu.Lock() defer op.mu.Unlock() for sub := range op.subscribers { if sub == ch { delete(op.subscribers, sub) close(sub) break } } } // broadcast sends update to all subscribers func (op *Operation) broadcast() { op.mu.RLock() update := op.buildUpdate() subscribers := make([]chan ProgressUpdate, 0, len(op.subscribers)) for ch := range op.subscribers { subscribers = append(subscribers, ch) } op.mu.RUnlock() for _, ch := range subscribers { select { case ch <- update: default: // Skip if channel is full } } } func (op *Operation) buildUpdate() ProgressUpdate { update := ProgressUpdate{ OperationID: op.id, Type: op.opType, Status: op.status, Phases: op.phases, Progress: op.progress, BytesTotal: op.bytesTotal, BytesDone: op.bytesDone, ItemsTotal: op.itemsTotal, ItemsDone: op.itemsDone, Message: op.message, Error: op.errorMsg, StartedAt: op.startedAt, UpdatedAt: op.updatedAt, Metadata: op.metadata, } if op.currentPhase < len(op.phases) { update.CurrentPhase = op.phases[op.currentPhase].Name } // Calculate speed and ETA elapsed := time.Since(op.startedAt).Seconds() if elapsed > 0 && op.bytesDone > 0 { update.SpeedBPS = int64(float64(op.bytesDone) / elapsed) if op.bytesTotal > 0 && update.SpeedBPS > 0 { remaining := op.bytesTotal - op.bytesDone etaSeconds := float64(remaining) / float64(update.SpeedBPS) eta := time.Now().Add(time.Duration(etaSeconds) * time.Second) update.EstimatedETA = &eta } } return update } // GetProgress returns current progress state func (op *Operation) GetProgress() ProgressUpdate { op.mu.RLock() defer op.mu.RUnlock() return op.buildUpdate() } // UpdateProgress updates the operation progress func (op *Operation) UpdateProgress(progress int, message string) { op.mu.Lock() op.progress = progress op.message = message op.updatedAt = time.Now() if op.currentPhase < len(op.phases) { op.phases[op.currentPhase].Progress = progress op.phases[op.currentPhase].Message = message } op.mu.Unlock() op.broadcast() } // UpdateBytes updates byte progress func (op *Operation) UpdateBytes(done, total int64) { op.mu.Lock() op.bytesDone = done op.bytesTotal = total if total > 0 { op.progress = int(float64(done) / float64(total) * 100) } op.updatedAt = time.Now() op.mu.Unlock() op.broadcast() } // UpdateItems updates item progress func (op *Operation) UpdateItems(done, total int) { op.mu.Lock() op.itemsDone = done op.itemsTotal = total if total > 0 { op.progress = done * 100 / total } op.updatedAt = time.Now() op.mu.Unlock() op.broadcast() } // NextPhase moves to the next phase func (op *Operation) NextPhase() { op.mu.Lock() if op.currentPhase < len(op.phases) { op.phases[op.currentPhase].Status = StatusComplete op.phases[op.currentPhase].Progress = 100 } op.currentPhase++ if op.currentPhase < len(op.phases) { op.phases[op.currentPhase].Status = StatusRunning } op.updatedAt = time.Now() op.mu.Unlock() op.broadcast() } // SetMetadata sets operation metadata func (op *Operation) SetMetadata(key string, value any) { op.mu.Lock() op.metadata[key] = value op.mu.Unlock() } // Complete marks the operation as complete func (op *Operation) Complete(result map[string]any) { op.mu.Lock() op.status = StatusComplete op.progress = 100 for i := range op.phases { op.phases[i].Status = StatusComplete op.phases[i].Progress = 100 } maps.Copy(op.metadata, result) op.updatedAt = time.Now() op.mu.Unlock() op.broadcast() op.closeSubscribers() } // Fail marks the operation as failed func (op *Operation) Fail(err error) { op.mu.Lock() op.status = StatusFailed op.errorMsg = err.Error() if op.currentPhase < len(op.phases) { op.phases[op.currentPhase].Status = StatusFailed } op.updatedAt = time.Now() op.mu.Unlock() op.broadcast() op.closeSubscribers() } // Cancel cancels the operation func (op *Operation) Cancel() { op.mu.Lock() op.status = StatusCancelled op.updatedAt = time.Now() op.mu.Unlock() op.cancel() op.broadcast() op.closeSubscribers() } func (op *Operation) closeSubscribers() { op.mu.Lock() for ch := range op.subscribers { close(ch) } op.subscribers = make(map[chan ProgressUpdate]struct{}) op.mu.Unlock() } // Context returns the operation's context func (op *Operation) Context() context.Context { return op.ctx } // ID returns the operation ID func (op *Operation) ID() string { return op.id } // ServeSSE serves Server-Sent Events for operation progress func ServeSSE(w http.ResponseWriter, r *http.Request, opID string) { manager := GetManager() op, ok := manager.GetOperation(opID) if !ok { http.Error(w, "Operation not found", http.StatusNotFound) return } // Set SSE headers w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") // Disable nginx buffering flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "Streaming not supported", http.StatusInternalServerError) return } // Subscribe to updates updates := op.Subscribe() defer op.Unsubscribe(updates) // Send initial connection event fmt.Fprintf(w, "event: connected\ndata: {\"operation_id\":\"%s\"}\n\n", opID) flusher.Flush() for { select { case update, ok := <-updates: if !ok { // Channel closed, operation complete fmt.Fprintf(w, "event: close\ndata: {}\n\n") flusher.Flush() return } data, err := json.Marshal(update) if err != nil { log.Error("Failed to marshal progress update: %v", err) continue } eventType := "progress" switch update.Status { case StatusComplete: eventType = "complete" case StatusFailed: eventType = "error" case StatusCancelled: eventType = "cancelled" } fmt.Fprintf(w, "event: %s\ndata: %s\n\n", eventType, data) flusher.Flush() // Close connection after terminal events if update.Status == StatusComplete || update.Status == StatusFailed || update.Status == StatusCancelled { return } case <-r.Context().Done(): return } } }