From 9191861cab22b70103df85e073429cd3e41bd822 Mon Sep 17 00:00:00 2001 From: sergystepanov Date: Sat, 22 Nov 2025 22:09:38 +0300 Subject: [PATCH] Use iterators in the custom map implementation --- pkg/com/map.go | 147 ++++++++++++++++++++++++++-------------- pkg/com/map_test.go | 8 ++- pkg/com/net.go | 4 +- pkg/coordinator/hub.go | 8 +-- pkg/worker/room/room.go | 23 +++++-- 5 files changed, 126 insertions(+), 64 deletions(-) diff --git a/pkg/com/map.go b/pkg/com/map.go index 6a4df33a..ce2c5cd5 100644 --- a/pkg/com/map.go +++ b/pkg/com/map.go @@ -2,6 +2,7 @@ package com import ( "fmt" + "iter" "sync" ) @@ -9,72 +10,118 @@ import ( // Keep in mind that the underlying map structure will grow indefinitely. type Map[K comparable, V any] struct { m map[K]V - mu sync.Mutex + mu sync.RWMutex } -func (m *Map[K, _]) Has(key K) bool { _, ok := m.Contains(key); return ok } -func (m *Map[_, _]) Len() int { m.mu.Lock(); defer m.mu.Unlock(); return len(m.m) } -func (m *Map[K, V]) Pop(key K) V { - m.mu.Lock() - v := m.m[key] - delete(m.m, key) - m.mu.Unlock() - return v +func (m *Map[K, _]) Len() int { + m.mu.RLock() + defer m.mu.RUnlock() + return len(m.m) } -func (m *Map[K, V]) Put(key K, v V) bool { - m.mu.Lock() + +func (m *Map[K, _]) Has(key K) bool { + m.mu.RLock() _, ok := m.m[key] - m.m[key] = v - m.mu.Unlock() + m.mu.RUnlock() return ok } -func (m *Map[K, _]) Remove(key K) { m.mu.Lock(); delete(m.m, key); m.mu.Unlock() } -func (m *Map[K, _]) RemoveL(key K) int { - m.mu.Lock() - delete(m.m, key) - k := len(m.m) - m.mu.Unlock() - return k -} -func (m *Map[K, V]) String() string { - m.mu.Lock() - s := fmt.Sprintf("%v", m.m) - m.mu.Unlock() - return s -} -// Contains returns the first value found and a boolean flag if its found or not. -func (m *Map[K, V]) Contains(key K) (v V, ok bool) { - m.mu.Lock() - defer m.mu.Unlock() - if vv, ok := m.m[key]; ok { - return vv, true - } - return v, false +// Get returns the value and exists flag (standard map comma-ok idiom). +func (m *Map[K, V]) Get(key K) (V, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + val, ok := m.m[key] + return val, ok } func (m *Map[K, V]) Find(key K) V { - v, _ := m.Contains(key) + v, _ := m.Get(key) return v } -// FindBy searches the first key-value with the provided predicate function. -func (m *Map[K, V]) FindBy(fn func(v V) bool) (v V, ok bool) { - m.mu.Lock() - defer m.mu.Unlock() - for _, vv := range m.m { - if fn(vv) { - return vv, true - } - } - return v, false +func (m *Map[K, V]) String() string { + m.mu.RLock() + defer m.mu.RUnlock() + return fmt.Sprintf("%v", m.m) } -// ForEach processes every element with the provided callback function. -func (m *Map[K, V]) ForEach(fn func(v V)) { +// FindBy searches for the first value satisfying the predicate. +// Note: This holds a Read Lock during iteration. +func (m *Map[K, V]) FindBy(predicate func(v V) bool) (V, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + for _, v := range m.m { + if predicate(v) { + return v, true + } + } + var zero V + return zero, false +} + +// Put sets the value and returns true if the key already existed. +func (m *Map[K, V]) Put(key K, v V) bool { m.mu.Lock() defer m.mu.Unlock() - for _, v := range m.m { - fn(v) + + if m.m == nil { + m.m = make(map[K]V) + } + + _, exists := m.m[key] + m.m[key] = v + return exists +} + +func (m *Map[K, V]) Remove(key K) { + m.mu.Lock() + delete(m.m, key) + m.mu.Unlock() +} + +// Pop returns the value and removes it from the map. +// Returns zero value if not found. +func (m *Map[K, V]) Pop(key K) V { + m.mu.Lock() + defer m.mu.Unlock() + + val, ok := m.m[key] + if ok { + delete(m.m, key) + } + return val +} + +// RemoveL removes the key and returns the new length of the map. +func (m *Map[K, _]) RemoveL(key K) int { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.m, key) + return len(m.m) +} + +// Clear empties the map. +func (m *Map[K, V]) Clear() { + m.mu.Lock() + m.m = make(map[K]V) + m.mu.Unlock() +} + +// Values returns an iterator for values only. +// +// Usage: for k, v := range m.Values() { ... } +// +// Warning: This holds a Read Lock (RLock) during iteration. +// Do not call Put/Remove on this map inside the loop (Deadlock). +func (m *Map[K, V]) Values() iter.Seq[V] { + return func(yield func(V) bool) { + m.mu.RLock() + defer m.mu.RUnlock() + + for _, v := range m.m { + if !yield(v) { + return + } + } } } diff --git a/pkg/com/map_test.go b/pkg/com/map_test.go index 4ebe1005..15af76c4 100644 --- a/pkg/com/map_test.go +++ b/pkg/com/map_test.go @@ -17,11 +17,11 @@ func TestMap_Base(t *testing.T) { if !m.Has(k) { t.Errorf("should have the key %v, %v", k, m.m) } - v, ok := m.Contains(k) + v, ok := m.Get(k) if v != 0 && !ok { t.Errorf("should have the key %v and ok, %v %v", k, ok, m.m) } - _, ok = m.Contains(k + 1) + _, ok = m.Get(k + 1) if ok { t.Errorf("should not find anything, %v %v", ok, m.m) } @@ -31,7 +31,9 @@ func TestMap_Base(t *testing.T) { t.Errorf("should have the key %v and ok, %v %v", 1, ok, m.m) } sum := 0 - m.ForEach(func(v int) { sum += v }) + for v := range m.Values() { + sum += v + } if sum != 1 { t.Errorf("shoud have exact sum of 1, but have %v", sum) } diff --git a/pkg/com/net.go b/pkg/com/net.go index 558c8148..04ed7e54 100644 --- a/pkg/com/net.go +++ b/pkg/com/net.go @@ -170,10 +170,10 @@ func (t *RPC[_, _]) callTimeout() time.Duration { func (t *RPC[_, _]) Cleanup() { // drain cancels all what's left in the task queue. - t.calls.ForEach(func(task *request) { + for task := range t.calls.Values() { if task.err == nil { task.err = errCanceled } close(task.done) - }) + } } diff --git a/pkg/coordinator/hub.go b/pkg/coordinator/hub.go index f4a1398c..490747df 100644 --- a/pkg/coordinator/hub.go +++ b/pkg/coordinator/hub.go @@ -155,7 +155,7 @@ func (h *Hub) handleWorkerConnection() http.HandlerFunc { func (h *Hub) GetServerList() (r []api.Server) { debug := h.conf.Coordinator.Debug - h.workers.ForEach(func(w *Worker) { + for w := range h.workers.Values() { server := api.Server{ Addr: w.Addr, Id: w.Id(), @@ -170,7 +170,7 @@ func (h *Hub) GetServerList() (r []api.Server) { server.Room = w.RoomId } r = append(r, server) - }) + } return } @@ -240,11 +240,11 @@ func (h *Hub) findWorkerByRoom(id string, region string) *Worker { func (h *Hub) getAvailableWorkers(region string) []*Worker { var workers []*Worker - h.workers.ForEach(func(w *Worker) { + for w := range h.workers.Values() { if w.HasSlot() && w.In(region) { workers = append(workers, w) } - }) + } return workers } diff --git a/pkg/worker/room/room.go b/pkg/worker/room/room.go index c52f091d..c2686bdc 100644 --- a/pkg/worker/room/room.go +++ b/pkg/worker/room/room.go @@ -1,6 +1,7 @@ package room import ( + "iter" "sync" "github.com/giongto35/cloud-game/v3/pkg/worker/caged/app" @@ -27,10 +28,10 @@ type SessionManager[T Session] interface { Add(T) bool Empty() bool Find(string) T - ForEach(func(T)) RemoveL(T) int // Reset used for proper cleanup of the resources if needed. Reset() + Values() iter.Seq[T] } type Session interface { @@ -65,13 +66,19 @@ func NewRoom[T Session](id string, app app.App, um SessionManager[T], media Medi func (r *Room[T]) InitAudio() { r.app.SetAudioCb(func(a app.Audio) { r.media.PushAudio(a.Data) }) - r.media.SetAudioCb(func(d []byte, l int32) { r.users.ForEach(func(u T) { u.SendAudio(d, l) }) }) + r.media.SetAudioCb(func(d []byte, l int32) { + for u := range r.users.Values() { + u.SendAudio(d, l) + } + }) } func (r *Room[T]) InitVideo() { r.app.SetVideoCb(func(v app.Video) { data := r.media.ProcessVideo(v) - r.users.ForEach(func(u T) { u.SendVideo(data, v.Duration) }) + for u := range r.users.Values() { + u.SendVideo(data, v.Duration) + } }) } @@ -81,7 +88,11 @@ func (r *Room[T]) Id() string { return r.id } func (r *Room[T]) SetApp(app app.App) { r.app = app } func (r *Room[T]) SetMedia(m MediaPipe) { r.media = m } func (r *Room[T]) StartApp() { r.app.Start() } -func (r *Room[T]) Send(data []byte) { r.users.ForEach(func(u T) { u.SendData(data) }) } +func (r *Room[T]) Send(data []byte) { + for u := range r.users.Values() { + u.SendData(data) + } +} func (r *Room[T]) Close() { if r == nil || r.closed { @@ -137,7 +148,9 @@ func (r *Router[T]) Reset() { r.room.Close() r.room = nil } - r.users.ForEach(func(u T) { u.Disconnect() }) + for u := range r.users.Values() { + u.Disconnect() + } r.users.Reset() r.mu.Unlock() }