Zum Inhalt

Erweiterte Acknowledgments

Acknowledgments (ACKs) in Socket.IO ermöglichen die Implementierung von Request-Response-Mustern über bidirektionale Kommunikation. Dieser Leitfaden behandelt die erweiterte Verwendung von Acknowledgments.

Inhaltsverzeichnis

Grundlegende Konzepte

Was sind Acknowledgments?

Acknowledgments sind Callbacks, die es dem Sender ermöglichen, eine Antwort vom Empfänger zu erhalten:

// Client sendet und wartet auf Antwort
client.EmitWithAck("get-data", func(response ...interface{}) {
  fmt.Println("Server antwortete:", response[0])
}, "param1", "param2")

Richtung der Acknowledgments

ACKs können in beide Richtungen fließen:

Client → Server → Client:

// Client fordert Daten an
client.EmitWithAck("get-user", func(response ...interface{}) {
  user := response[0].(map[string]interface{})
  fmt.Println("Benutzer:", user["name"])
}, "user-123")

Server → Client → Server:

// Server fordert Bestätigung an, Client antwortet
client.On("confirm-action", func(data ...interface{}) {
  action := data[0].(string)

  // Letztes Argument ist der ACK-Callback
  if ack, ok := data[len(data)-1].(func(...interface{})); ok {
    // Aktion bestätigen
    ack(map[string]interface{}{
      "confirmed": true,
      "timestamp": time.Now().Unix(),
    })
  }
})

Verwendungsmuster

Muster: Einfaches Request-Response

Grundlegende Anfrage mit Antwort:

client.EmitWithAck("get-data", func(response ...interface{}) {
  if response == nil {
    fmt.Println("Timeout: keine Antwort erhalten")
    return
  }

  data := response[0]
  fmt.Printf("Daten empfangen: %v\n", data)
}, "data-id-123")

Muster: Datenvalidierung

Senden Sie Daten und validieren Sie, dass sie korrekt verarbeitet wurden:

func saveData(client *socketio.Socket, data map[string]interface{}) error {
  var result error
  done := make(chan struct{})

  client.EmitWithAck("save-data", func(response ...interface{}) {
    defer close(done)

    if response == nil {
      result = fmt.Errorf("Timeout beim Warten auf Bestätigung")
      return
    }

    resp := response[0].(map[string]interface{})
    if resp["success"].(bool) {
      fmt.Println("Daten erfolgreich gespeichert")
    } else {
      result = fmt.Errorf("Fehler: %s", resp["error"])
    }
  }, data)

  <-done
  return result
}

// Verwendung
err := saveData(client, map[string]interface{}{
  "name": "Max",
  "age": 30,
})
if err != nil {
  fmt.Println("Fehler:", err)
}

Muster: Mehrere parallele Requests

Führen Sie mehrere Requests parallel aus und warten Sie auf alle Antworten:

func fetchMultipleUsers(client *socketio.Socket, ids []string) map[string]interface{} {
  results := make(map[string]interface{})
  var mu sync.Mutex
  var wg sync.WaitGroup

  for _, id := range ids {
    wg.Add(1)
    go func(userId string) {
      defer wg.Done()

      client.EmitWithAck("get-user", func(response ...interface{}) {
        if response != nil {
          mu.Lock()
          results[userId] = response[0]
          mu.Unlock()
        }
      }, userId)
    }(id)
  }

  wg.Wait()
  return results
}

// Verwendung
users := fetchMultipleUsers(client, []string{"user-1", "user-2", "user-3"})
fmt.Printf("%d Benutzer erhalten\n", len(users))

Muster: Request mit Wiederholung

Wiederholen Sie den Request bei Fehler oder Timeout:

func requestWithRetry(client *socketio.Socket, event string, maxRetries int, data ...interface{}) (interface{}, error) {
  for attempt := 0; attempt <= maxRetries; attempt++ {
    resultChan := make(chan interface{}, 1)
    errorChan := make(chan error, 1)

    client.EmitWithAck(event, func(response ...interface{}) {
      if response == nil {
        errorChan <- fmt.Errorf("Timeout")
      } else {
        resultChan <- response[0]
      }
    }, data...)

    select {
    case result := <-resultChan:
      return result, nil
    case err := <-errorChan:
      if attempt < maxRetries {
        fmt.Printf("Versuch %d fehlgeschlagen: %v. Wiederhole...\n", attempt+1, err)
        time.Sleep(time.Second * time.Duration(attempt+1)) // Exponentielles Backoff
      } else {
        return nil, fmt.Errorf("nach %d Versuchen fehlgeschlagen: %v", maxRetries+1, err)
      }
    }
  }

  return nil, fmt.Errorf("nach %d Versuchen fehlgeschlagen", maxRetries+1)
}

// Verwendung
result, err := requestWithRetry(client, "get-data", 3, "param1", "param2")
if err != nil {
  fmt.Println("Fehler:", err)
} else {
  fmt.Println("Ergebnis:", result)
}

Muster: Request-Warteschlange

Warteschlange für Requests mit Priorität:

type Request struct {
  Event    string
  Data     []interface{}
  Callback func(...interface{})
  Priority int
}

type RequestQueue struct {
  client   *socketio.Socket
  queue    []*Request
  mu       sync.Mutex
  running  bool
}

func NewRequestQueue(client *socketio.Socket) *RequestQueue {
  return &RequestQueue{
    client: client,
    queue:  make([]*Request, 0),
  }
}

func (rq *RequestQueue) Add(req *Request) {
  rq.mu.Lock()
  defer rq.mu.Unlock()

  rq.queue = append(rq.queue, req)

  // Nach Priorität sortieren (höher zuerst)
  sort.Slice(rq.queue, func(i, j int) bool {
    return rq.queue[i].Priority > rq.queue[j].Priority
  })

  if !rq.running {
    go rq.process()
  }
}

func (rq *RequestQueue) process() {
  rq.running = true
  defer func() { rq.running = false }()

  for {
    rq.mu.Lock()
    if len(rq.queue) == 0 {
      rq.mu.Unlock()
      return
    }

    req := rq.queue[0]
    rq.queue = rq.queue[1:]
    rq.mu.Unlock()

    // Request ausführen
    rq.client.EmitWithAck(req.Event, req.Callback, req.Data...)

    // Kleine Verzögerung zwischen Requests
    time.Sleep(50 * time.Millisecond)
  }
}

// Verwendung
queue := NewRequestQueue(client)

// Request mit hoher Priorität
queue.Add(&Request{
  Event:    "urgent-data",
  Data:     []interface{}{"id-1"},
  Priority: 10,
  Callback: func(response ...interface{}) {
    fmt.Println("Dringende Antwort:", response[0])
  },
})

// Request mit normaler Priorität
queue.Add(&Request{
  Event:    "normal-data",
  Data:     []interface{}{"id-2"},
  Priority: 5,
  Callback: func(response ...interface{}) {
    fmt.Println("Normale Antwort:", response[0])
  },
})

Timeout-Verwaltung

Globales Timeout

Konfigurieren Sie ein globales Timeout für alle Acknowledgments:

client := socketio.New("ws://localhost:3000", socketio.Options{
  AckTimeout: 10 * time.Second, // Timeout von 10 Sekunden
})

client.EmitWithAck("slow-operation", func(response ...interface{}) {
  if response == nil {
    fmt.Println("⏱️  Operation dauerte länger als 10 Sekunden")
    return
  }
  fmt.Println("Abgeschlossen:", response[0])
}, "daten")

Timeout pro Request

Implementieren Sie ein benutzerdefiniertes Timeout pro Request:

func emitWithCustomTimeout(client *socketio.Socket, event string, timeout time.Duration, data ...interface{}) (interface{}, error) {
  resultChan := make(chan interface{}, 1)
  timeoutChan := time.After(timeout)

  client.EmitWithAck(event, func(response ...interface{}) {
    if response != nil {
      resultChan <- response[0]
    }
  }, data...)

  select {
  case result := <-resultChan:
    return result, nil
  case <-timeoutChan:
    return nil, fmt.Errorf("Timeout nach %v", timeout)
  }
}

// Verwendung mit kurzem Timeout
result, err := emitWithCustomTimeout(client, "quick-op", 2*time.Second, "data")
if err != nil {
  fmt.Println("Fehler:", err)
}

// Verwendung mit langem Timeout
result, err = emitWithCustomTimeout(client, "slow-op", 30*time.Second, "data")
if err != nil {
  fmt.Println("Fehler:", err)
}

Timeout-Erkennung

Unterscheiden Sie zwischen Timeout und Server-Fehler:

client.EmitWithAck("operation", func(response ...interface{}) {
  if response == nil {
    // Timeout: Server hat nicht geantwortet
    fmt.Println("⏱️  Timeout: Server hat nicht rechtzeitig geantwortet")
    handleTimeout()
    return
  }

  // Antwort erhalten, prüfen Sie auf Fehler
  if resp, ok := response[0].(map[string]interface{}); ok {
    if errorMsg, hasError := resp["error"]; hasError {
      // Server-Fehler
      fmt.Printf("❌ Server-Fehler: %s\n", errorMsg)
      handleServerError(errorMsg.(string))
      return
    }

    // Erfolg
    fmt.Println("✅ Operation erfolgreich:", resp["data"])
    handleSuccess(resp["data"])
  }
}, "daten")

Fehlerbehandlung

Muster: Standard-Fehler-Response

Definieren Sie ein Standardformat für Fehlerantworten:

type AckResponse struct {
  Success bool                   `json:"success"`
  Data    interface{}            `json:"data,omitempty"`
  Error   string                 `json:"error,omitempty"`
  Code    int                    `json:"code,omitempty"`
}

func handleAckResponse(response ...interface{}) (*AckResponse, error) {
  if response == nil {
    return nil, fmt.Errorf("Timeout: keine Antwort vom Server")
  }

  if len(response) == 0 {
    return nil, fmt.Errorf("leere Antwort")
  }

  respMap, ok := response[0].(map[string]interface{})
  if !ok {
    return nil, fmt.Errorf("ungültiges Antwortformat")
  }

  ackResp := &AckResponse{}

  if success, ok := respMap["success"].(bool); ok {
    ackResp.Success = success
  }

  if data, ok := respMap["data"]; ok {
    ackResp.Data = data
  }

  if errorMsg, ok := respMap["error"].(string); ok {
    ackResp.Error = errorMsg
  }

  if code, ok := respMap["code"].(float64); ok {
    ackResp.Code = int(code)
  }

  if !ackResp.Success && ackResp.Error != "" {
    return ackResp, fmt.Errorf("Fehler %d: %s", ackResp.Code, ackResp.Error)
  }

  return ackResp, nil
}

// Verwendung
client.EmitWithAck("operation", func(response ...interface{}) {
  resp, err := handleAckResponse(response...)
  if err != nil {
    fmt.Println("Fehler:", err)
    return
  }

  fmt.Println("Daten:", resp.Data)
}, "params")

Muster: Graceful Degradation

Behandeln Sie Fehler elegant mit Standardwerten:

func getUserDataWithDefault(client *socketio.Socket, userId string) map[string]interface{} {
  defaultUser := map[string]interface{}{
    "id": userId,
    "name": "Unbekannter Benutzer",
    "status": "offline",
  }

  resultChan := make(chan map[string]interface{}, 1)

  client.EmitWithAck("get-user", func(response ...interface{}) {
    if response == nil {
      // Timeout: Standardwerte verwenden
      resultChan <- defaultUser
      return
    }

    if user, ok := response[0].(map[string]interface{}); ok {
      resultChan <- user
    } else {
      resultChan <- defaultUser
    }
  }, userId)

  // Warten mit zusätzlichem Timeout
  select {
  case user := <-resultChan:
    return user
  case <-time.After(6 * time.Second):
    // Zusätzliches Timeout: Standardwerte zurückgeben
    return defaultUser
  }
}

// Verwendung
user := getUserDataWithDefault(client, "user-123")
fmt.Printf("Benutzer: %s (%s)\n", user["name"], user["status"])

Erweiterte Muster

Muster: Promise-ähnliche API

Umschließen Sie Acknowledgments in eine Promise-ähnliche API:

type Promise struct {
  done   chan struct{}
  result interface{}
  err    error
}

func NewPromise(client *socketio.Socket, event string, data ...interface{}) *Promise {
  p := &Promise{
    done: make(chan struct{}),
  }

  client.EmitWithAck(event, func(response ...interface{}) {
    defer close(p.done)

    if response == nil {
      p.err = fmt.Errorf("Timeout")
      return
    }

    if resp, ok := response[0].(map[string]interface{}); ok {
      if errMsg, hasError := resp["error"]; hasError {
        p.err = fmt.Errorf("%s", errMsg)
        return
      }
      p.result = resp["data"]
    } else {
      p.result = response[0]
    }
  }, data...)

  return p
}

func (p *Promise) Then(callback func(interface{})) *Promise {
  go func() {
    <-p.done
    if p.err == nil {
      callback(p.result)
    }
  }()
  return p
}

func (p *Promise) Catch(callback func(error)) *Promise {
  go func() {
    <-p.done
    if p.err != nil {
      callback(p.err)
    }
  }()
  return p
}

func (p *Promise) Await() (interface{}, error) {
  <-p.done
  return p.result, p.err
}

// Promise-Stil Verwendung
NewPromise(client, "get-data", "id-123").
  Then(func(data interface{}) {
    fmt.Println("Daten:", data)
  }).
  Catch(func(err error) {
    fmt.Println("Fehler:", err)
  })

// Await-Stil Verwendung
data, err := NewPromise(client, "get-data", "id-123").Await()
if err != nil {
  fmt.Println("Fehler:", err)
} else {
  fmt.Println("Daten:", data)
}

Muster: Batch-Requests

Senden Sie mehrere Requests in einem einzelnen Acknowledgment:

type BatchRequest struct {
  ID   string        `json:"id"`
  Data interface{}   `json:"data"`
}

type BatchResponse struct {
  ID     string      `json:"id"`
  Result interface{} `json:"result"`
  Error  string      `json:"error,omitempty"`
}

func batchRequest(client *socketio.Socket, requests []BatchRequest) (map[string]interface{}, error) {
  results := make(map[string]interface{})
  done := make(chan struct{})

  client.EmitWithAck("batch", func(response ...interface{}) {
    defer close(done)

    if response == nil {
      return
    }

    if responses, ok := response[0].([]interface{}); ok {
      for _, r := range responses {
        resp := r.(map[string]interface{})
        id := resp["id"].(string)

        if errMsg, hasError := resp["error"]; hasError && errMsg != "" {
          results[id] = fmt.Errorf("%s", errMsg)
        } else {
          results[id] = resp["result"]
        }
      }
    }
  }, requests)

  <-done

  if len(results) == 0 {
    return nil, fmt.Errorf("Timeout oder keine Antworten")
  }

  return results, nil
}

// Verwendung
requests := []BatchRequest{
  {ID: "req-1", Data: "user-1"},
  {ID: "req-2", Data: "user-2"},
  {ID: "req-3", Data: "user-3"},
}

results, err := batchRequest(client, requests)
if err != nil {
  fmt.Println("Fehler:", err)
  return
}

for id, result := range results {
  if err, isError := result.(error); isError {
    fmt.Printf("%s: Fehler - %v\n", id, err)
  } else {
    fmt.Printf("%s: Erfolg - %v\n", id, result)
  }
}

Muster: Streaming mit Acknowledgments

Implementieren Sie Daten-Streaming mit inkrementellen Bestätigungen:

func streamDataWithAcks(client *socketio.Socket, chunks [][]byte) error {
  for i, chunk := range chunks {
    done := make(chan error, 1)

    client.EmitBinaryWithAck("stream-chunk", func(response ...interface{}) {
      if response == nil {
        done <- fmt.Errorf("Timeout bei Chunk %d", i)
        return
      }

      resp := response[0].(map[string]interface{})
      if resp["success"].(bool) {
        done <- nil
      } else {
        done <- fmt.Errorf("Fehler bei Chunk %d: %s", i, resp["error"])
      }
    }, chunk, map[string]interface{}{
      "index": i,
      "total": len(chunks),
      "last": i == len(chunks)-1,
    })

    if err := <-done; err != nil {
      return err
    }

    fmt.Printf("Chunk %d/%d gesendet\n", i+1, len(chunks))
  }

  return nil
}

// Verwendung
data, _ := os.ReadFile("large-file.bin")
chunks := splitIntoChunks(data, 1024*64) // 64KB Chunks

if err := streamDataWithAcks(client, chunks); err != nil {
  fmt.Println("Fehler beim Streaming:", err)
} else {
  fmt.Println("Datei vollständig gesendet")
}

Best Practices

1. Immer auf nil prüfen

// ✅ Gut
client.EmitWithAck("event", func(response ...interface{}) {
  if response == nil {
    fmt.Println("Timeout")
    return
  }
  // Antwort verarbeiten
}, data)

// ❌ Schlecht
client.EmitWithAck("event", func(response ...interface{}) {
  data := response[0] // Panic wenn response == nil!
}, data)

2. Sichere Type Assertions verwenden

// ✅ Gut
if user, ok := response[0].(map[string]interface{}); ok {
  fmt.Println(user["name"])
} else {
  fmt.Println("Unerwartetes Format")
}

// ❌ Schlecht
user := response[0].(map[string]interface{}) // Panic bei falschem Typ!

3. Angemessene Timeouts konfigurieren

// ✅ Gut: Timeout je nach Operationstyp
fastOps := socketio.Options{AckTimeout: 2 * time.Second}
slowOps := socketio.Options{AckTimeout: 30 * time.Second}

// ❌ Schlecht: gleiches Timeout für alles
opts := socketio.Options{AckTimeout: 5 * time.Second} // Kann zu kurz oder zu lang sein

4. Antwortformat dokumentieren

// ✅ Gut: dokumentieren, was erwartet wird
// Erwartete Antwort: {success: bool, data: User, error: string}
client.EmitWithAck("get-user", func(response ...interface{}) {
  // ...
}, userId)

Siehe auch