Try atomic-based locks in the same-thread execution loop instead of a bunch of mutexes.

sergystepanov 2025-12-14 13:18:34 +03:00
parent 84ad0a4cac
commit 460c466053
2 changed files with 196 additions and 129 deletions
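
For readers unfamiliar with the pattern, the idea is a single atomic state flag stepping through 0=idle -> 1=pending -> 2=done, which both threads poll instead of blocking on mutexes and condition variables. Below is a minimal standalone sketch of that handshake; the names (demo_worker, demo_call, hello) are invented for illustration and are not the project's code.

// Minimal sketch (invented names): a worker thread and a caller coordinating
// through one atomic state flag: 0 = idle, 1 = pending, 2 = done.
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

static atomic_int state = 0;          // 0 = idle, 1 = pending, 2 = done
static atomic_int running = 1;        // worker keeps spinning while set
static void (*pending_fn)(void);      // published before state becomes 1

static void *demo_worker(void *arg) {
    (void)arg;
    while (atomic_load_explicit(&running, memory_order_acquire)) {
        if (atomic_load_explicit(&state, memory_order_acquire) == 1) {
            pending_fn();                                            // run the requested call
            atomic_store_explicit(&state, 2, memory_order_release);  // publish completion
        }
    }
    return NULL;
}

static void demo_call(void (*fn)(void)) {
    pending_fn = fn;                                                 // set up the call
    atomic_store_explicit(&state, 1, memory_order_release);         // mark it pending
    while (atomic_load_explicit(&state, memory_order_acquire) != 2)
        ;                                                            // spin until the worker is done
    atomic_store_explicit(&state, 0, memory_order_release);         // back to idle
}

static void hello(void) { puts("ran on the worker thread"); }

int main(void) {
    pthread_t t;
    pthread_create(&t, NULL, demo_worker, NULL);
    demo_call(hello);
    atomic_store_explicit(&running, 0, memory_order_release);
    pthread_join(t, NULL);
    return 0;
}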


@@ -1,30 +1,44 @@
#include "libretro.h"
#include <pthread.h>
#include <stdbool.h>
#include <stdarg.h>
+#include <stdatomic.h>
#include <stdio.h>
#include <string.h>
#define RETRO_ENVIRONMENT_GET_CLEAR_ALL_THREAD_WAITS_CB (3 | 0x800000)
-int initialized = 0;
-typedef struct {
-int type;
-void* fn;
-void* arg1;
-void* arg2;
-void* result;
-} call_def_t;
-call_def_t call;
+// ============================================================================
+// Call types for same_thread operations
+// ============================================================================
enum call_type {
-CALL_VOID = -1,
+CALL_VOID = 0,
CALL_SERIALIZE = 1,
CALL_UNSERIALIZE = 2,
};
-void *same_thread_with_args(void *f, int type, ...);
+// ============================================================================
+// Lock-free call structure
+// ============================================================================
+typedef struct {
+atomic_int state; // 0=idle, 1=pending, 2=done
+int type;
+void *fn;
+void *arg1;
+size_t arg2;
+bool result;
+} lf_call_t;
+static lf_call_t lf_call = {0};
+static atomic_int thread_running = 0;
+static pthread_t worker_thread;
+// ============================================================================
+// Logging
+// ============================================================================
void core_log_cgo(enum retro_log_level level, const char *fmt, ...) {
char msg[2048] = {0};
@@ -36,6 +50,10 @@ void core_log_cgo(enum retro_log_level level, const char *fmt, ...) {
coreLog(level, msg);
}
+// ============================================================================
+// Bridge functions for calling libretro core
+// ============================================================================
void bridge_call(void *f) {
((void (*)(void)) f)();
}
@@ -92,49 +110,55 @@ void bridge_retro_set_controller_port_device(void *f, unsigned port, unsigned de
((void (*)(unsigned, unsigned)) f)(port, device);
}
+void bridge_retro_keyboard_callback(void *cb, bool down, unsigned keycode, uint32_t character, uint16_t keyModifiers) {
+(*(retro_keyboard_event_t *) cb)(down, keycode, character, keyModifiers);
+}
+void bridge_context_reset(retro_hw_context_reset_t f) {
+f();
+}
+// ============================================================================
+// Environment callback
+// ============================================================================
static bool clear_all_thread_waits_cb(unsigned v, void *data) {
core_log_cgo(RETRO_LOG_DEBUG, "CLEAR_ALL_THREAD_WAITS_CB (%d)\n", v);
return true;
}
-void bridge_retro_keyboard_callback(void *cb, bool down, unsigned keycode, uint32_t character, uint16_t keyModifiers) {
-(*(retro_keyboard_event_t *) cb)(down, keycode, character, keyModifiers);
-}
bool core_environment_cgo(unsigned cmd, void *data) {
bool coreEnvironment(unsigned, void *);
switch (cmd)
{
case RETRO_ENVIRONMENT_GET_VARIABLE_UPDATE:
return false;
-break;
case RETRO_ENVIRONMENT_GET_AUDIO_VIDEO_ENABLE:
return false;
-break;
case RETRO_ENVIRONMENT_GET_CLEAR_ALL_THREAD_WAITS_CB:
*(retro_environment_t *)data = clear_all_thread_waits_cb;
return true;
-break;
case RETRO_ENVIRONMENT_GET_INPUT_MAX_USERS:
*(unsigned *)data = 4;
core_log_cgo(RETRO_LOG_DEBUG, "Set max users: %d\n", 4);
return true;
-break;
case RETRO_ENVIRONMENT_GET_INPUT_BITMASKS:
return false;
case RETRO_ENVIRONMENT_SHUTDOWN:
return false;
-break;
case RETRO_ENVIRONMENT_GET_SAVESTATE_CONTEXT:
if (data != NULL) *(int *)data = RETRO_SAVESTATE_CONTEXT_NORMAL;
return true;
-break;
}
return coreEnvironment(cmd, data);
}
+// ============================================================================
+// Core callbacks
+// ============================================================================
void core_video_refresh_cgo(void *data, unsigned width, unsigned height, size_t pitch) {
void coreVideoRefresh(void *, unsigned, unsigned, size_t);
coreVideoRefresh(data, width, height, pitch);
@@ -168,9 +192,9 @@ retro_proc_address_t core_get_proc_address_cgo(const char *sym) {
return coreGetProcAddress(sym);
}
-void bridge_context_reset(retro_hw_context_reset_t f) {
-f();
-}
+// ============================================================================
+// Video init/deinit
+// ============================================================================
void init_video_cgo() {
void initVideo();
@@ -182,106 +206,151 @@ void deinit_video_cgo() {
deinitVideo();
}
-typedef struct {
-pthread_mutex_t m;
-pthread_cond_t cond;
-} mutex_t;
-void mutex_init(mutex_t *m) {
-pthread_mutex_init(&m->m, NULL);
-pthread_cond_init(&m->cond, NULL);
-}
+// ============================================================================
+// CPU pause hints for spin loops
+// ============================================================================
+static inline void cpu_relax(void) {
+#if defined(__x86_64__) || defined(_M_X64) || defined(__i386__) || defined(_M_IX86)
+__asm__ volatile("pause" ::: "memory");
+#elif defined(__aarch64__)
+__asm__ volatile("isb" ::: "memory");
+#elif defined(__arm__)
+__asm__ volatile("yield" ::: "memory");
+#else
+// Generic fallback - compiler barrier
+__asm__ volatile("" ::: "memory");
+#endif
+}
-void mutex_destroy(mutex_t *m) {
-pthread_mutex_trylock(&m->m);
-pthread_mutex_unlock(&m->m);
-pthread_mutex_destroy(&m->m);
-pthread_cond_signal(&m->cond);
-pthread_cond_destroy(&m->cond);
-}
-void mutex_lock(mutex_t *m) { pthread_mutex_lock(&m->m); }
-void mutex_wait(mutex_t *m) { pthread_cond_wait(&m->cond, &m->m); }
-void mutex_unlock(mutex_t *m) { pthread_mutex_unlock(&m->m); }
-void mutex_signal(mutex_t *m) { pthread_cond_signal(&m->cond); }
-static pthread_t thread;
-mutex_t run_mutex, done_mutex;
-void *run_loop(void *unused) {
-core_log_cgo(RETRO_LOG_DEBUG, "UnLibCo run loop start\n");
-mutex_lock(&done_mutex);
-mutex_lock(&run_mutex);
-mutex_signal(&done_mutex);
-mutex_unlock(&done_mutex);
-while (initialized) {
-mutex_wait(&run_mutex);
-switch (call.type) {
-case CALL_SERIALIZE:
-case CALL_UNSERIALIZE:
-*(bool*)call.result = ((bool (*)(void*, size_t))call.fn)(call.arg1, *(size_t*)call.arg2);
-break;
-default:
-((void (*)(void)) call.fn)();
-}
-mutex_lock(&done_mutex);
-mutex_signal(&done_mutex);
-mutex_unlock(&done_mutex);
-}
-mutex_destroy(&run_mutex);
-mutex_destroy(&done_mutex);
-pthread_detach(thread);
-core_log_cgo(RETRO_LOG_DEBUG, "UnLibCo run loop stop\n");
-pthread_exit(NULL);
-}
+// ============================================================================
+// Lock-free same_thread implementation.
+// Needed due to C/Go stack grow issues (libco).
+// ============================================================================
+static void *run_loop_fast(void *unused) {
+core_log_cgo(RETRO_LOG_DEBUG, "Worker thread started\n");
+while (atomic_load_explicit(&thread_running, memory_order_acquire)) {
+// Check if there's a pending call
+int state = atomic_load_explicit(&lf_call.state, memory_order_acquire);
+if (state == 1) {
+// Execute the call
+switch (lf_call.type) {
+case CALL_SERIALIZE:
+lf_call.result = ((bool (*)(void*, size_t))lf_call.fn)(
+lf_call.arg1, lf_call.arg2);
+break;
+case CALL_UNSERIALIZE:
+lf_call.result = ((bool (*)(void*, size_t))lf_call.fn)(
+lf_call.arg1, lf_call.arg2);
+break;
+case CALL_VOID:
+default:
+((void (*)(void))lf_call.fn)();
+break;
+}
+// Mark as done
+atomic_store_explicit(&lf_call.state, 2, memory_order_release);
+} else {
+// Spin with CPU hint to reduce power consumption
+cpu_relax();
+}
+}
+core_log_cgo(RETRO_LOG_DEBUG, "Worker thread stopped\n");
+return NULL;
+}
-void same_thread_stop() {
-initialized = 0;
-}
-void *same_thread_with_args(void *f, int type, ...) {
-if (!initialized) {
-initialized = 1;
-mutex_init(&run_mutex);
-mutex_init(&done_mutex);
-mutex_lock(&done_mutex);
-pthread_create(&thread, NULL, run_loop, NULL);
-mutex_wait(&done_mutex);
-mutex_unlock(&done_mutex);
-}
-mutex_lock(&run_mutex);
-mutex_lock(&done_mutex);
-call.type = type;
-call.fn = f;
-if (type != CALL_VOID) {
-va_list args;
-va_start(args, type);
-switch (type) {
-case CALL_SERIALIZE:
-case CALL_UNSERIALIZE:
-call.arg1 = va_arg(args, void*);
-size_t size;
-size = va_arg(args, size_t);
-call.arg2 = &size;
-bool result;
-call.result = &result;
-break;
-}
-va_end(args);
-}
-mutex_signal(&run_mutex);
-mutex_unlock(&run_mutex);
-mutex_wait(&done_mutex);
-mutex_unlock(&done_mutex);
-return call.result;
-}
-void *same_thread_with_args2(void *f, int type, void *arg1, void *arg2) {
-return same_thread_with_args(f, type, arg1, arg2);
-}
+// Initialize the worker thread if not already running
+static void same_thread_ensure_init(void) {
+int expected = 0;
+if (atomic_compare_exchange_strong_explicit(
+&thread_running, &expected, 1,
+memory_order_acq_rel, memory_order_acquire)) {
+// We won the race to initialize
+atomic_store_explicit(&lf_call.state, 0, memory_order_release);
+pthread_create(&worker_thread, NULL, run_loop_fast, NULL);
+core_log_cgo(RETRO_LOG_DEBUG, "Worker thread initialized\n");
+}
+}
+// Stop the worker thread
+void same_thread_stop(void) {
+if (atomic_load_explicit(&thread_running, memory_order_acquire)) {
+atomic_store_explicit(&thread_running, 0, memory_order_release);
+pthread_join(worker_thread, NULL);
+core_log_cgo(RETRO_LOG_DEBUG, "Worker thread stopped\n");
+}
+}
+// Execute a void function on the worker thread
void same_thread(void *f) {
-same_thread_with_args(f, CALL_VOID);
+same_thread_ensure_init();
+// Wait for any previous call to complete
+while (atomic_load_explicit(&lf_call.state, memory_order_acquire) != 0) {
+cpu_relax();
+}
+// Set up the call
+lf_call.fn = f;
+lf_call.type = CALL_VOID;
+// Signal that a call is pending
+atomic_store_explicit(&lf_call.state, 1, memory_order_release);
+// Wait for completion
+while (atomic_load_explicit(&lf_call.state, memory_order_acquire) != 2) {
+cpu_relax();
+}
+// Reset to idle
+atomic_store_explicit(&lf_call.state, 0, memory_order_release);
}
+// Execute a serialize/unserialize function on the worker thread
+// Returns the result of the call (copied out of lf_call.result)
+bool same_thread_serialize(void *f, int type, void *data, size_t size) {
+same_thread_ensure_init();
+// Wait for any previous call to complete
+while (atomic_load_explicit(&lf_call.state, memory_order_acquire) != 0) {
+cpu_relax();
+}
+// Set up the call - store values directly, not pointers to locals!
+lf_call.fn = f;
+lf_call.type = type;
+lf_call.arg1 = data;
+lf_call.arg2 = size;
+lf_call.result = false;
+// Signal that a call is pending
+atomic_store_explicit(&lf_call.state, 1, memory_order_release);
+// Wait for completion
+while (atomic_load_explicit(&lf_call.state, memory_order_acquire) != 2) {
+cpu_relax();
+}
+// Get result before resetting
+bool result = lf_call.result;
+// Reset to idle
+atomic_store_explicit(&lf_call.state, 0, memory_order_release);
+return result;
+}
+// Execute functions on the same thread.
+void *same_thread_with_args2(void *f, int type, void *arg1, void *arg2) {
+size_t size = *(size_t*)arg2;
+static _Thread_local bool result_storage;
+result_storage = same_thread_serialize(f, type, arg1, size);
+return &result_storage;
+}
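
For a sense of how the new helpers are meant to be driven from C, here is a small usage sketch. It assumes it is compiled alongside the file above, which defines same_thread(), same_thread_serialize(), and the CALL_* constants; the wrapper names and the idea of receiving core function pointers resolved elsewhere (e.g. via dlsym) are illustrative assumptions, not code from this commit.

// Usage sketch only (hypothetical wrappers), built together with the file above.
#include <stdbool.h>
#include <stddef.h>

enum { SKETCH_CALL_SERIALIZE = 1 };   // mirrors CALL_SERIALIZE from the file above
void same_thread(void *f);
bool same_thread_serialize(void *f, int type, void *data, size_t size);

// Run retro_run() (passed in as a function pointer) on the dedicated worker thread.
void run_frame_on_worker(void *retro_run_fn) {
    same_thread(retro_run_fn);
}

// Serialize the core state into buf on the worker thread; the calling thread
// spin-waits until the worker publishes the result.
bool save_state_on_worker(void *retro_serialize_fn, void *buf, size_t size) {
    return same_thread_serialize(retro_serialize_fn, SKETCH_CALL_SERIALIZE, buf, size);
}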


@@ -504,9 +504,8 @@ func SaveState() (State, error) {
rez := false
if Nan0.LibCo && !Nan0.hackSkipSameThreadSave {
-rez = *(*bool)(C.same_thread_with_args2(retroSerialize, C.int(CallSerialize), unsafe.Pointer(&data[0]), unsafe.Pointer(&size)))
-} else {
-rez = bool(C.bridge_retro_serialize(retroSerialize, unsafe.Pointer(&data[0]), size))
+rez = *(*bool)(C.same_thread_with_args2(retroSerialize, C.int(CallSerialize),
+unsafe.Pointer(&data[0]), unsafe.Pointer(&size)))
}
if !rez {
@@ -526,9 +525,8 @@ func RestoreSaveState(st State) error {
rez := false
if Nan0.LibCo {
-rez = *(*bool)(C.same_thread_with_args2(retroUnserialize, C.int(CallUnserialize), unsafe.Pointer(&st[0]), unsafe.Pointer(&size)))
-} else {
-rez = bool(C.bridge_retro_unserialize(retroUnserialize, unsafe.Pointer(&st[0]), size))
+rez = *(*bool)(C.same_thread_with_args2(retroUnserialize, C.int(CallUnserialize),
+unsafe.Pointer(&st[0]), unsafe.Pointer(&size)))
}
if !rez {