From f708fce1122f115c5e12f61d3c8790ba3313c09b Mon Sep 17 00:00:00 2001 From: sergystepanov Date: Sun, 14 Dec 2025 13:30:45 +0300 Subject: [PATCH] Revert "Try atomic-based locks in the same thread execution loop instead of a bunch of mutexes." This reverts commit 460c4660530cb816ddf4e1a5d2387e8561322dd2. --- pkg/worker/caged/libretro/nanoarch/nanoarch.c | 315 +++++++----------- .../caged/libretro/nanoarch/nanoarch.go | 10 +- 2 files changed, 129 insertions(+), 196 deletions(-) diff --git a/pkg/worker/caged/libretro/nanoarch/nanoarch.c b/pkg/worker/caged/libretro/nanoarch/nanoarch.c index 23bf8543..6290150c 100644 --- a/pkg/worker/caged/libretro/nanoarch/nanoarch.c +++ b/pkg/worker/caged/libretro/nanoarch/nanoarch.c @@ -1,44 +1,30 @@ #include "libretro.h" - #include #include #include -#include #include -#include #define RETRO_ENVIRONMENT_GET_CLEAR_ALL_THREAD_WAITS_CB (3 | 0x800000) -// ============================================================================ -// Call types for same_thread operations -// ============================================================================ +int initialized = 0; + +typedef struct { + int type; + void* fn; + void* arg1; + void* arg2; + void* result; +} call_def_t; + +call_def_t call; enum call_type { - CALL_VOID = 0, + CALL_VOID = -1, CALL_SERIALIZE = 1, CALL_UNSERIALIZE = 2, }; -// ============================================================================ -// Lock-free call structure -// ============================================================================ - -typedef struct { - atomic_int state; // 0=idle, 1=pending, 2=done - int type; - void *fn; - void *arg1; - size_t arg2; - bool result; -} lf_call_t; - -static lf_call_t lf_call = {0}; -static atomic_int thread_running = 0; -static pthread_t worker_thread; - -// ============================================================================ -// Logging -// ============================================================================ +void *same_thread_with_args(void *f, int type, ...); void core_log_cgo(enum retro_log_level level, const char *fmt, ...) { char msg[2048] = {0}; @@ -50,10 +36,6 @@ void core_log_cgo(enum retro_log_level level, const char *fmt, ...) { coreLog(level, msg); } -// ============================================================================ -// Bridge functions for calling libretro core -// ============================================================================ - void bridge_call(void *f) { ((void (*)(void)) f)(); } @@ -110,55 +92,49 @@ void bridge_retro_set_controller_port_device(void *f, unsigned port, unsigned de ((void (*)(unsigned, unsigned)) f)(port, device); } -void bridge_retro_keyboard_callback(void *cb, bool down, unsigned keycode, uint32_t character, uint16_t keyModifiers) { - (*(retro_keyboard_event_t *) cb)(down, keycode, character, keyModifiers); -} - -void bridge_context_reset(retro_hw_context_reset_t f) { - f(); -} - -// ============================================================================ -// Environment callback -// ============================================================================ - static bool clear_all_thread_waits_cb(unsigned v, void *data) { core_log_cgo(RETRO_LOG_DEBUG, "CLEAR_ALL_THREAD_WAITS_CB (%d)\n", v); return true; } +void bridge_retro_keyboard_callback(void *cb, bool down, unsigned keycode, uint32_t character, uint16_t keyModifiers) { + (*(retro_keyboard_event_t *) cb)(down, keycode, character, keyModifiers); +} + bool core_environment_cgo(unsigned cmd, void *data) { bool coreEnvironment(unsigned, void *); switch (cmd) { case RETRO_ENVIRONMENT_GET_VARIABLE_UPDATE: - return false; + return false; + break; case RETRO_ENVIRONMENT_GET_AUDIO_VIDEO_ENABLE: - return false; + return false; + break; case RETRO_ENVIRONMENT_GET_CLEAR_ALL_THREAD_WAITS_CB: - *(retro_environment_t *)data = clear_all_thread_waits_cb; - return true; + *(retro_environment_t *)data = clear_all_thread_waits_cb; + return true; + break; case RETRO_ENVIRONMENT_GET_INPUT_MAX_USERS: - *(unsigned *)data = 4; - core_log_cgo(RETRO_LOG_DEBUG, "Set max users: %d\n", 4); - return true; + *(unsigned *)data = 4; + core_log_cgo(RETRO_LOG_DEBUG, "Set max users: %d\n", 4); + return true; + break; case RETRO_ENVIRONMENT_GET_INPUT_BITMASKS: - return false; + return false; case RETRO_ENVIRONMENT_SHUTDOWN: - return false; + return false; + break; case RETRO_ENVIRONMENT_GET_SAVESTATE_CONTEXT: - if (data != NULL) *(int *)data = RETRO_SAVESTATE_CONTEXT_NORMAL; - return true; + if (data != NULL) *(int *)data = RETRO_SAVESTATE_CONTEXT_NORMAL; + return true; + break; } return coreEnvironment(cmd, data); } -// ============================================================================ -// Core callbacks -// ============================================================================ - void core_video_refresh_cgo(void *data, unsigned width, unsigned height, size_t pitch) { void coreVideoRefresh(void *, unsigned, unsigned, size_t); coreVideoRefresh(data, width, height, pitch); @@ -192,9 +168,9 @@ retro_proc_address_t core_get_proc_address_cgo(const char *sym) { return coreGetProcAddress(sym); } -// ============================================================================ -// Video init/deinit -// ============================================================================ +void bridge_context_reset(retro_hw_context_reset_t f) { + f(); +} void init_video_cgo() { void initVideo(); @@ -206,151 +182,106 @@ void deinit_video_cgo() { deinitVideo(); } -// ============================================================================ -// CPU pause hints for spin loops -// ============================================================================ +typedef struct { + pthread_mutex_t m; + pthread_cond_t cond; +} mutex_t; -static inline void cpu_relax(void) { -#if defined(__x86_64__) || defined(_M_X64) || defined(__i386__) || defined(_M_IX86) - __asm__ volatile("pause" ::: "memory"); -#elif defined(__aarch64__) - __asm__ volatile("isb" ::: "memory"); -#elif defined(__arm__) - __asm__ volatile("yield" ::: "memory"); -#else - // Generic fallback - compiler barrier - __asm__ volatile("" ::: "memory"); -#endif +void mutex_init(mutex_t *m) { + pthread_mutex_init(&m->m, NULL); + pthread_cond_init(&m->cond, NULL); } -// ============================================================================ -// Lock-free same_thread implementation. -// Needed due to C/Go stack grow issues (libco). -// ============================================================================ +void mutex_destroy(mutex_t *m) { + pthread_mutex_trylock(&m->m); + pthread_mutex_unlock(&m->m); + pthread_mutex_destroy(&m->m); + pthread_cond_signal(&m->cond); + pthread_cond_destroy(&m->cond); +} -static void *run_loop_fast(void *unused) { - core_log_cgo(RETRO_LOG_DEBUG, "Worker thread started\n"); +void mutex_lock(mutex_t *m) { pthread_mutex_lock(&m->m); } +void mutex_wait(mutex_t *m) { pthread_cond_wait(&m->cond, &m->m); } +void mutex_unlock(mutex_t *m) { pthread_mutex_unlock(&m->m); } +void mutex_signal(mutex_t *m) { pthread_cond_signal(&m->cond); } - while (atomic_load_explicit(&thread_running, memory_order_acquire)) { - // Check if there's a pending call - int state = atomic_load_explicit(&lf_call.state, memory_order_acquire); +static pthread_t thread; +mutex_t run_mutex, done_mutex; - if (state == 1) { - // Execute the call - switch (lf_call.type) { - case CALL_SERIALIZE: - lf_call.result = ((bool (*)(void*, size_t))lf_call.fn)( - lf_call.arg1, lf_call.arg2); - break; - case CALL_UNSERIALIZE: - lf_call.result = ((bool (*)(void*, size_t))lf_call.fn)( - lf_call.arg1, lf_call.arg2); - break; - case CALL_VOID: - default: - ((void (*)(void))lf_call.fn)(); - break; - } - - // Mark as done - atomic_store_explicit(&lf_call.state, 2, memory_order_release); - } else { - // Spin with CPU hint to reduce power consumption - cpu_relax(); +void *run_loop(void *unused) { + core_log_cgo(RETRO_LOG_DEBUG, "UnLibCo run loop start\n"); + mutex_lock(&done_mutex); + mutex_lock(&run_mutex); + mutex_signal(&done_mutex); + mutex_unlock(&done_mutex); + while (initialized) { + mutex_wait(&run_mutex); + switch (call.type) { + case CALL_SERIALIZE: + case CALL_UNSERIALIZE: + *(bool*)call.result = ((bool (*)(void*, size_t))call.fn)(call.arg1, *(size_t*)call.arg2); + break; + default: + ((void (*)(void)) call.fn)(); } + mutex_lock(&done_mutex); + mutex_signal(&done_mutex); + mutex_unlock(&done_mutex); } - - core_log_cgo(RETRO_LOG_DEBUG, "Worker thread stopped\n"); - return NULL; + mutex_destroy(&run_mutex); + mutex_destroy(&done_mutex); + pthread_detach(thread); + core_log_cgo(RETRO_LOG_DEBUG, "UnLibCo run loop stop\n"); + pthread_exit(NULL); } -// Initialize the worker thread if not already running -static void same_thread_ensure_init(void) { - int expected = 0; - if (atomic_compare_exchange_strong_explicit( - &thread_running, &expected, 1, - memory_order_acq_rel, memory_order_acquire)) { - // We won the race to initialize - atomic_store_explicit(&lf_call.state, 0, memory_order_release); - pthread_create(&worker_thread, NULL, run_loop_fast, NULL); - core_log_cgo(RETRO_LOG_DEBUG, "Worker thread initialized\n"); - } +void same_thread_stop() { + initialized = 0; } -// Stop the worker thread -void same_thread_stop(void) { - if (atomic_load_explicit(&thread_running, memory_order_acquire)) { - atomic_store_explicit(&thread_running, 0, memory_order_release); - pthread_join(worker_thread, NULL); - core_log_cgo(RETRO_LOG_DEBUG, "Worker thread stopped\n"); +void *same_thread_with_args(void *f, int type, ...) { + if (!initialized) { + initialized = 1; + mutex_init(&run_mutex); + mutex_init(&done_mutex); + mutex_lock(&done_mutex); + pthread_create(&thread, NULL, run_loop, NULL); + mutex_wait(&done_mutex); + mutex_unlock(&done_mutex); } + mutex_lock(&run_mutex); + mutex_lock(&done_mutex); + + call.type = type; + call.fn = f; + + if (type != CALL_VOID) { + va_list args; + va_start(args, type); + switch (type) { + case CALL_SERIALIZE: + case CALL_UNSERIALIZE: + call.arg1 = va_arg(args, void*); + size_t size; + size = va_arg(args, size_t); + call.arg2 = &size; + bool result; + call.result = &result; + break; + } + va_end(args); + } + mutex_signal(&run_mutex); + mutex_unlock(&run_mutex); + mutex_wait(&done_mutex); + mutex_unlock(&done_mutex); + return call.result; } -// Execute a void function on the worker thread -void same_thread(void *f) { - same_thread_ensure_init(); - - // Wait for any previous call to complete - while (atomic_load_explicit(&lf_call.state, memory_order_acquire) != 0) { - cpu_relax(); - } - - // Set up the call - lf_call.fn = f; - lf_call.type = CALL_VOID; - - // Signal that a call is pending - atomic_store_explicit(&lf_call.state, 1, memory_order_release); - - // Wait for completion - while (atomic_load_explicit(&lf_call.state, memory_order_acquire) != 2) { - cpu_relax(); - } - - // Reset to idle - atomic_store_explicit(&lf_call.state, 0, memory_order_release); -} - -// Execute a serialize/unserialize function on the worker thread -// Returns pointer to the result (stored in lf_call.result) -bool same_thread_serialize(void *f, int type, void *data, size_t size) { - same_thread_ensure_init(); - - // Wait for any previous call to complete - while (atomic_load_explicit(&lf_call.state, memory_order_acquire) != 0) { - cpu_relax(); - } - - // Set up the call - store values directly, not pointers to locals! - lf_call.fn = f; - lf_call.type = type; - lf_call.arg1 = data; - lf_call.arg2 = size; - lf_call.result = false; - - // Signal that a call is pending - atomic_store_explicit(&lf_call.state, 1, memory_order_release); - - // Wait for completion - while (atomic_load_explicit(&lf_call.state, memory_order_acquire) != 2) { - cpu_relax(); - } - - // Get result before resetting - bool result = lf_call.result; - - // Reset to idle - atomic_store_explicit(&lf_call.state, 0, memory_order_release); - - return result; -} - -// Execute functions on the same thread. void *same_thread_with_args2(void *f, int type, void *arg1, void *arg2) { - size_t size = *(size_t*)arg2; + return same_thread_with_args(f, type, arg1, arg2); +} - static _Thread_local bool result_storage; - result_storage = same_thread_serialize(f, type, arg1, size); - - return &result_storage; -} \ No newline at end of file +void same_thread(void *f) { + same_thread_with_args(f, CALL_VOID); +} diff --git a/pkg/worker/caged/libretro/nanoarch/nanoarch.go b/pkg/worker/caged/libretro/nanoarch/nanoarch.go index 710e0aec..7c05d962 100644 --- a/pkg/worker/caged/libretro/nanoarch/nanoarch.go +++ b/pkg/worker/caged/libretro/nanoarch/nanoarch.go @@ -504,8 +504,9 @@ func SaveState() (State, error) { rez := false if Nan0.LibCo && !Nan0.hackSkipSameThreadSave { - rez = *(*bool)(C.same_thread_with_args2(retroSerialize, C.int(CallSerialize), - unsafe.Pointer(&data[0]), unsafe.Pointer(&size))) + rez = *(*bool)(C.same_thread_with_args2(retroSerialize, C.int(CallSerialize), unsafe.Pointer(&data[0]), unsafe.Pointer(&size))) + } else { + rez = bool(C.bridge_retro_serialize(retroSerialize, unsafe.Pointer(&data[0]), size)) } if !rez { @@ -525,8 +526,9 @@ func RestoreSaveState(st State) error { rez := false if Nan0.LibCo { - rez = *(*bool)(C.same_thread_with_args2(retroUnserialize, C.int(CallUnserialize), - unsafe.Pointer(&st[0]), unsafe.Pointer(&size))) + rez = *(*bool)(C.same_thread_with_args2(retroUnserialize, C.int(CallUnserialize), unsafe.Pointer(&st[0]), unsafe.Pointer(&size))) + } else { + rez = bool(C.bridge_retro_unserialize(retroUnserialize, unsafe.Pointer(&st[0]), size)) } if !rez {