From a0bf837eda16c5421a9029122d01c402a74d3f0c Mon Sep 17 00:00:00 2001 From: Urban Wallasch Date: Sun, 18 Aug 2019 22:38:08 +0200 Subject: [PATCH] * Moved shared memory read lock handling from tele2json.c to individual clients. * Fixed shared memory rwlock initialization to use appropriate set of attributes. * Fixed not releasing the write lock when plugin initialization failed. * Changed order of telemetry data initialization (e.g. raise alive flag only on success). * Declared shared memory telemetry objects volatile in the clients. --- tele2json.c | 14 ++------------ telehttpd.c | 21 ++++++++++++++++----- telelogger.c | 18 ++++++++++++++---- teleshmem.cpp | 42 ++++++++++++++++++++++++++++++------------ 4 files changed, 62 insertions(+), 33 deletions(-) diff --git a/tele2json.c b/tele2json.c index aee5bbd..7a40294 100644 --- a/tele2json.c +++ b/tele2json.c @@ -7,22 +7,12 @@ #include "tele2json.h" #include "log.h" -size_t tele2json( char *buf, size_t size, struct telemetry_state_t *ptele ) { - int e; +size_t tele2json( char *buf, size_t size, struct telemetry_state_t *tele ) { size_t n = 0; - struct telemetry_state_t t_, *tele = &t_; - if ( NULL == ptele ) + if ( NULL == tele ) return 0; - e = pthread_rwlock_rdlock( &ptele->rwlock ); - if ( 0 != e ) { - EPRINT( "pthread_rwlock_rdlock: %s\n", strerror( e ) ); - return 0; - } - memcpy( tele, ptele, sizeof *tele ); - pthread_rwlock_unlock( &ptele->rwlock ); - #define CHKSIZE do{ if ( n >= size ) return 0; }while(0) CHKSIZE; n += snprintf( buf+n, size-n, "{\n" ); CHKSIZE; diff --git a/telehttpd.c b/telehttpd.c index a3c31f5..1c9571b 100644 --- a/telehttpd.c +++ b/telehttpd.c @@ -42,8 +42,7 @@ struct { false, }; -static struct telemetry_state_t *telemetry; -static struct telemetry_state_t telemetry0; +static volatile struct telemetry_state_t *telemetry; static volatile sig_atomic_t force_quit; @@ -69,7 +68,7 @@ static int init_shm( bool retry ) { else if ( TELE_VERSION != telemetry->version ) { EPRINT( "telemetry version mismatch: got %u, want %u\n", (unsigned)telemetry->version, TELE_VERSION ); - release_shm( TELE_SHM_KEY, telemetry ); + release_shm( TELE_SHM_KEY, (void *)telemetry ); telemetry = NULL; if ( !retry ) return -1; @@ -86,7 +85,7 @@ static int check_shm( bool retry ) { if ( NULL == telemetry ) return init_shm( retry ); if ( !(telemetry->flags & TELE_FLAG_ALIVE) ) { - release_shm( TELE_SHM_KEY, telemetry ); + release_shm( TELE_SHM_KEY, (void *)telemetry ); telemetry = NULL; return init_shm( retry ); } @@ -277,7 +276,19 @@ static int serve_http(void) { if ( 0 < as ) { if ( NULL != (td = malloc( sizeof *td )) ) { td->sock = as; - memcpy( &td->tele, telemetry ? telemetry : &telemetry0, sizeof td->tele ); + if ( telemetry ) { + int e = pthread_rwlock_rdlock( (pthread_rwlock_t *)&telemetry->rwlock ); + if ( 0 != e ) { + EPRINT( "pthread_rwlock_rdlock: %s\n", strerror( e ) ); + return -1; + } + memcpy( &td->tele, (void *)telemetry, sizeof td->tele ); + pthread_rwlock_unlock( (pthread_rwlock_t *)&telemetry->rwlock ); + } + else { + DPRINT( "No telemetry, serving dummy data\n" ); + memset( &td->tele, 0, sizeof td->tele ); + } pthread_attr_init( &attr ); pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED ); if ( 0 != pthread_create( &tid, &attr, handle_conn, td ) ) { diff --git a/telelogger.c b/telelogger.c index 0fb02b2..8223e8d 100644 --- a/telelogger.c +++ b/telelogger.c @@ -15,7 +15,7 @@ #include "telemetry.h" #include "tele2json.h" -static struct telemetry_state_t *telemetry; +static volatile struct telemetry_state_t *telemetry; static bool last_paused; static uint64_t last_timestamp; @@ -51,7 +51,7 @@ static int init_shm( bool retry ) { (unsigned)telemetry->version, TELE_VERSION ); if ( !retry ) return -1; - release_shm( TELE_SHM_KEY, telemetry ); + release_shm( TELE_SHM_KEY, (void *)telemetry ); telemetry = NULL; sleep(1); } @@ -66,7 +66,7 @@ static int check_shm( bool retry ) { if ( NULL == telemetry ) return init_shm( retry ); if ( !(telemetry->flags & TELE_FLAG_ALIVE) ) { - release_shm( TELE_SHM_KEY, telemetry ); + release_shm( TELE_SHM_KEY, (void *)telemetry ); telemetry = NULL; return init_shm( retry ); } @@ -74,15 +74,25 @@ static int check_shm( bool retry ) { } static int log_console( int cnt, int delay ) { + int e; char buf[4096]; bool forever = !cnt; + struct telemetry_state_t tele; last_timestamp = telemetry->timestamp; while ( forever || cnt-- > 0 ) { if ( 0 != check_shm( cfg.retry ) ) return -1; - if ( tele2json( buf, sizeof buf, telemetry ) > 0 ) + e = pthread_rwlock_rdlock( (pthread_rwlock_t *)&telemetry->rwlock ); + if ( 0 != e ) { + EPRINT( "pthread_rwlock_rdlock: %s\n", strerror( e ) ); + return -1; + } + memcpy( &tele, (void *)telemetry, sizeof tele ); + pthread_rwlock_unlock( (pthread_rwlock_t *)&telemetry->rwlock ); + + if ( tele2json( buf, sizeof buf, &tele ) > 0 ) puts( buf ); if ( forever || cnt > 0 ) { diff --git a/teleshmem.cpp b/teleshmem.cpp index 46f2482..e00e63f 100644 --- a/teleshmem.cpp +++ b/teleshmem.cpp @@ -207,16 +207,31 @@ static bool init_shm(void) void *p; p = init_shmput( TELE_SHM_KEY, sizeof *telemetry ); telemetry = static_cast(p); - if ( NULL != telemetry ) - pthread_rwlock_init( &telemetry->rwlock, NULL ); - } - if ( NULL != telemetry ) { - lock_shm(); - telemetry->version = TELE_VERSION; - telemetry->flags |= TELE_FLAG_ALIVE; - unlock_shm(); + if ( NULL != telemetry ) { + int e; + pthread_rwlockattr_t attr; + + e = pthread_rwlockattr_init( &attr ); + if ( 0 != e ) { + log_print( "ERROR: pthread_rwlockattr_init: %s\n", strerror(e) ); + goto RWLOCK_ERR; + } + pthread_rwlockattr_setkind_np( &attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP ); + pthread_rwlockattr_setpshared( &attr, PTHREAD_PROCESS_SHARED ); + e = pthread_rwlock_init( &telemetry->rwlock, &attr ); + pthread_rwlockattr_destroy( &attr ); + if ( 0 != e ) { + log_print( "ERROR: pthread_rwlock_init: %s\n", strerror(e) ); + goto RWLOCK_ERR; + } + } } return NULL != telemetry; + + RWLOCK_ERR: + release_shm( TELE_SHM_KEY, telemetry ); + telemetry = NULL; + return false; } static void drop_shm(void) @@ -565,6 +580,8 @@ SCSAPI_RESULT scs_telemetry_init(const scs_u32_t version, const scs_telemetry_in return SCS_RESULT_generic_error; } lock_shm(); + telemetry->version = TELE_VERSION; + telemetry->paused = true; // Check application version. Note that this example uses fairly basic channels which are likely to be supported // by any future SCS trucking game however more advanced application might want to at least warn the user if there @@ -632,6 +649,7 @@ SCSAPI_RESULT scs_telemetry_init(const scs_u32_t version, const scs_telemetry_in // cleared automatically so we can simply exit. log_print("FATAL: Unable to register event callbacks\n"); version_params->common.log(SCS_LOG_TYPE_error, "[teleshmem] Unable to register telemetry event callbacks."); + unlock_shm(); return SCS_RESULT_generic_error; } log_print("Registered event callbacks\n"); @@ -665,16 +683,16 @@ SCSAPI_RESULT scs_telemetry_init(const scs_u32_t version, const scs_telemetry_in if ( !channels_registered) { log_print("FATAL: Unable to register channel callbacks\n"); version_params->common.log(SCS_LOG_TYPE_error, "[teleshmem] Unable to register telemetry channel callbacks."); + unlock_shm(); return SCS_RESULT_generic_error; } log_print("Registered channel callbacks\n"); version_params->common.log(SCS_LOG_TYPE_message, "[teleshmem] Registered telemetry channel callbacks."); - // Set the structure with defaults. - last_timestamp = static_cast(-1); - // Initially the game is paused. - telemetry->paused = true; + // Last not least raise the alive flag: + telemetry->flags |= TELE_FLAG_ALIVE; unlock_shm(); + last_timestamp = static_cast(-1); return SCS_RESULT_ok; } -- 2.30.2