Use iterators in the custom map implementation

This commit is contained in:
sergystepanov 2025-11-22 22:09:38 +03:00
parent c05e42f597
commit 9191861cab
5 changed files with 126 additions and 64 deletions

View file

@ -2,6 +2,7 @@ package com
import (
"fmt"
"iter"
"sync"
)
@ -9,72 +10,118 @@ import (
// Keep in mind that the underlying map structure will grow indefinitely.
type Map[K comparable, V any] struct {
m map[K]V
mu sync.Mutex
mu sync.RWMutex
}
func (m *Map[K, _]) Has(key K) bool { _, ok := m.Contains(key); return ok }
func (m *Map[_, _]) Len() int { m.mu.Lock(); defer m.mu.Unlock(); return len(m.m) }
func (m *Map[K, V]) Pop(key K) V {
m.mu.Lock()
v := m.m[key]
delete(m.m, key)
m.mu.Unlock()
return v
func (m *Map[K, _]) Len() int {
m.mu.RLock()
defer m.mu.RUnlock()
return len(m.m)
}
func (m *Map[K, V]) Put(key K, v V) bool {
m.mu.Lock()
func (m *Map[K, _]) Has(key K) bool {
m.mu.RLock()
_, ok := m.m[key]
m.m[key] = v
m.mu.Unlock()
m.mu.RUnlock()
return ok
}
func (m *Map[K, _]) Remove(key K) { m.mu.Lock(); delete(m.m, key); m.mu.Unlock() }
func (m *Map[K, _]) RemoveL(key K) int {
m.mu.Lock()
delete(m.m, key)
k := len(m.m)
m.mu.Unlock()
return k
}
func (m *Map[K, V]) String() string {
m.mu.Lock()
s := fmt.Sprintf("%v", m.m)
m.mu.Unlock()
return s
}
// Contains returns the first value found and a boolean flag if its found or not.
func (m *Map[K, V]) Contains(key K) (v V, ok bool) {
m.mu.Lock()
defer m.mu.Unlock()
if vv, ok := m.m[key]; ok {
return vv, true
}
return v, false
// Get returns the value and exists flag (standard map comma-ok idiom).
func (m *Map[K, V]) Get(key K) (V, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
val, ok := m.m[key]
return val, ok
}
func (m *Map[K, V]) Find(key K) V {
v, _ := m.Contains(key)
v, _ := m.Get(key)
return v
}
// FindBy searches the first key-value with the provided predicate function.
func (m *Map[K, V]) FindBy(fn func(v V) bool) (v V, ok bool) {
m.mu.Lock()
defer m.mu.Unlock()
for _, vv := range m.m {
if fn(vv) {
return vv, true
}
}
return v, false
func (m *Map[K, V]) String() string {
m.mu.RLock()
defer m.mu.RUnlock()
return fmt.Sprintf("%v", m.m)
}
// ForEach processes every element with the provided callback function.
func (m *Map[K, V]) ForEach(fn func(v V)) {
// FindBy searches for the first value satisfying the predicate.
// Note: This holds a Read Lock during iteration.
func (m *Map[K, V]) FindBy(predicate func(v V) bool) (V, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
for _, v := range m.m {
if predicate(v) {
return v, true
}
}
var zero V
return zero, false
}
// Put sets the value and returns true if the key already existed.
func (m *Map[K, V]) Put(key K, v V) bool {
m.mu.Lock()
defer m.mu.Unlock()
for _, v := range m.m {
fn(v)
if m.m == nil {
m.m = make(map[K]V)
}
_, exists := m.m[key]
m.m[key] = v
return exists
}
func (m *Map[K, V]) Remove(key K) {
m.mu.Lock()
delete(m.m, key)
m.mu.Unlock()
}
// Pop returns the value and removes it from the map.
// Returns zero value if not found.
func (m *Map[K, V]) Pop(key K) V {
m.mu.Lock()
defer m.mu.Unlock()
val, ok := m.m[key]
if ok {
delete(m.m, key)
}
return val
}
// RemoveL removes the key and returns the new length of the map.
func (m *Map[K, _]) RemoveL(key K) int {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.m, key)
return len(m.m)
}
// Clear empties the map.
func (m *Map[K, V]) Clear() {
m.mu.Lock()
m.m = make(map[K]V)
m.mu.Unlock()
}
// Values returns an iterator for values only.
//
// Usage: for k, v := range m.Values() { ... }
//
// Warning: This holds a Read Lock (RLock) during iteration.
// Do not call Put/Remove on this map inside the loop (Deadlock).
func (m *Map[K, V]) Values() iter.Seq[V] {
return func(yield func(V) bool) {
m.mu.RLock()
defer m.mu.RUnlock()
for _, v := range m.m {
if !yield(v) {
return
}
}
}
}

View file

@ -17,11 +17,11 @@ func TestMap_Base(t *testing.T) {
if !m.Has(k) {
t.Errorf("should have the key %v, %v", k, m.m)
}
v, ok := m.Contains(k)
v, ok := m.Get(k)
if v != 0 && !ok {
t.Errorf("should have the key %v and ok, %v %v", k, ok, m.m)
}
_, ok = m.Contains(k + 1)
_, ok = m.Get(k + 1)
if ok {
t.Errorf("should not find anything, %v %v", ok, m.m)
}
@ -31,7 +31,9 @@ func TestMap_Base(t *testing.T) {
t.Errorf("should have the key %v and ok, %v %v", 1, ok, m.m)
}
sum := 0
m.ForEach(func(v int) { sum += v })
for v := range m.Values() {
sum += v
}
if sum != 1 {
t.Errorf("shoud have exact sum of 1, but have %v", sum)
}

View file

@ -170,10 +170,10 @@ func (t *RPC[_, _]) callTimeout() time.Duration {
func (t *RPC[_, _]) Cleanup() {
// drain cancels all what's left in the task queue.
t.calls.ForEach(func(task *request) {
for task := range t.calls.Values() {
if task.err == nil {
task.err = errCanceled
}
close(task.done)
})
}
}

View file

@ -155,7 +155,7 @@ func (h *Hub) handleWorkerConnection() http.HandlerFunc {
func (h *Hub) GetServerList() (r []api.Server) {
debug := h.conf.Coordinator.Debug
h.workers.ForEach(func(w *Worker) {
for w := range h.workers.Values() {
server := api.Server{
Addr: w.Addr,
Id: w.Id(),
@ -170,7 +170,7 @@ func (h *Hub) GetServerList() (r []api.Server) {
server.Room = w.RoomId
}
r = append(r, server)
})
}
return
}
@ -240,11 +240,11 @@ func (h *Hub) findWorkerByRoom(id string, region string) *Worker {
func (h *Hub) getAvailableWorkers(region string) []*Worker {
var workers []*Worker
h.workers.ForEach(func(w *Worker) {
for w := range h.workers.Values() {
if w.HasSlot() && w.In(region) {
workers = append(workers, w)
}
})
}
return workers
}

View file

@ -1,6 +1,7 @@
package room
import (
"iter"
"sync"
"github.com/giongto35/cloud-game/v3/pkg/worker/caged/app"
@ -27,10 +28,10 @@ type SessionManager[T Session] interface {
Add(T) bool
Empty() bool
Find(string) T
ForEach(func(T))
RemoveL(T) int
// Reset used for proper cleanup of the resources if needed.
Reset()
Values() iter.Seq[T]
}
type Session interface {
@ -65,13 +66,19 @@ func NewRoom[T Session](id string, app app.App, um SessionManager[T], media Medi
func (r *Room[T]) InitAudio() {
r.app.SetAudioCb(func(a app.Audio) { r.media.PushAudio(a.Data) })
r.media.SetAudioCb(func(d []byte, l int32) { r.users.ForEach(func(u T) { u.SendAudio(d, l) }) })
r.media.SetAudioCb(func(d []byte, l int32) {
for u := range r.users.Values() {
u.SendAudio(d, l)
}
})
}
func (r *Room[T]) InitVideo() {
r.app.SetVideoCb(func(v app.Video) {
data := r.media.ProcessVideo(v)
r.users.ForEach(func(u T) { u.SendVideo(data, v.Duration) })
for u := range r.users.Values() {
u.SendVideo(data, v.Duration)
}
})
}
@ -81,7 +88,11 @@ func (r *Room[T]) Id() string { return r.id }
func (r *Room[T]) SetApp(app app.App) { r.app = app }
func (r *Room[T]) SetMedia(m MediaPipe) { r.media = m }
func (r *Room[T]) StartApp() { r.app.Start() }
func (r *Room[T]) Send(data []byte) { r.users.ForEach(func(u T) { u.SendData(data) }) }
func (r *Room[T]) Send(data []byte) {
for u := range r.users.Values() {
u.SendData(data)
}
}
func (r *Room[T]) Close() {
if r == nil || r.closed {
@ -137,7 +148,9 @@ func (r *Router[T]) Reset() {
r.room.Close()
r.room = nil
}
r.users.ForEach(func(u T) { u.Disconnect() })
for u := range r.users.Values() {
u.Disconnect()
}
r.users.Reset()
r.mu.Unlock()
}