From: Urban Wallasch Date: Mon, 19 Aug 2019 10:05:08 +0000 (+0200) Subject: * Added telehelper module providing locked read access to shared memory. X-Git-Tag: v0.1.0~17 X-Git-Url: https://git.packet-gain.de/?a=commitdiff_plain;h=919281d5ad4382f651866c2ac2a745d3d1e18ed2;p=ets2_tele.git * Added telehelper module providing locked read access to shared memory. --- diff --git a/Makefile b/Makefile index 61ab024..d70e3ed 100644 --- a/Makefile +++ b/Makefile @@ -26,8 +26,8 @@ endif COMMON_HDR := telemetry.h PLUGIN_SRC := teleshmem.cpp shmget.c -HTTPD_OBJ := telehttpd.o shmget.o net.o fserv.o tele2json.o -LOGGER_OBJ := telelogger.o shmget.o tele2json.o +HTTPD_OBJ := telehttpd.o shmget.o net.o fserv.o tele2json.o telehelper.o +LOGGER_OBJ := telelogger.o shmget.o tele2json.o telehelper.o .PHONY: all clean diff --git a/telehelper.c b/telehelper.c new file mode 100644 index 0000000..c7eb790 --- /dev/null +++ b/telehelper.c @@ -0,0 +1,80 @@ +// Needed for pthread_rwlock* +#define _XOPEN_SOURCE 500 + +#include + +#include "log.h" +#include "telehelper.h" + + +uint32_t tele_version( volatile struct telemetry_state_t *tele ) { + int e; + uint32_t ver = -1; + + e = pthread_rwlock_rdlock( (pthread_rwlock_t *)&tele->rwlock ); + if ( 0 == e ) { + ver = tele->version; + pthread_rwlock_unlock( (pthread_rwlock_t *)&tele->rwlock ); + } + else + EPRINT( "pthread_rwlock_rdlock: %s\n", strerror( e ) ); + return ver; +} + +uint32_t tele_flags( volatile struct telemetry_state_t *tele ) { + int e; + uint32_t flags = 0; + + e = pthread_rwlock_rdlock( (pthread_rwlock_t *)&tele->rwlock ); + if ( 0 == e ) { + flags = tele->flags; + pthread_rwlock_unlock( (pthread_rwlock_t *)&tele->rwlock ); + } + else + EPRINT( "pthread_rwlock_rdlock: %s\n", strerror( e ) ); + return flags; +} + +uint64_t tele_timestamp( volatile struct telemetry_state_t *tele ) { + int e; + uint64_t ts = -1ULL; + + e = pthread_rwlock_rdlock( (pthread_rwlock_t *)&tele->rwlock ); + if ( 0 == e ) { + ts = tele->timestamp; + pthread_rwlock_unlock( (pthread_rwlock_t *)&tele->rwlock ); + } + else + EPRINT( "pthread_rwlock_rdlock: %s\n", strerror( e ) ); + return ts; +} + +bool tele_paused( volatile struct telemetry_state_t *tele ) { + int e; + bool paused = true; + + e = pthread_rwlock_rdlock( (pthread_rwlock_t *)&tele->rwlock ); + if ( 0 == e ) { + paused = tele->paused; + pthread_rwlock_unlock( (pthread_rwlock_t *)&tele->rwlock ); + } + else + EPRINT( "pthread_rwlock_rdlock: %s\n", strerror( e ) ); + return paused; +} + +int tele_cpy( void *dst, volatile struct telemetry_state_t *tele ) { + if ( tele ) { + int e; + e = pthread_rwlock_rdlock( (pthread_rwlock_t *)&tele->rwlock ); + if ( 0 == e ) { + memcpy( dst, (void *)tele, sizeof *tele ); + pthread_rwlock_unlock( (pthread_rwlock_t *)&tele->rwlock ); + return 0; + } + EPRINT( "pthread_rwlock_rdlock: %s\n", strerror( e ) ); + } + memset( dst, 0, sizeof *tele ); + return -1; +} + diff --git a/telehelper.h b/telehelper.h new file mode 100644 index 0000000..aa16765 --- /dev/null +++ b/telehelper.h @@ -0,0 +1,29 @@ +#ifndef TELEHELPER_H_ +#define TELEHELPER_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include + +#include "telemetry.h" + + +extern uint32_t tele_version( volatile struct telemetry_state_t *tele ); + +extern uint32_t tele_flags( volatile struct telemetry_state_t *tele ); + +extern uint64_t tele_timestamp( volatile struct telemetry_state_t *tele ); + +extern bool tele_paused( volatile struct telemetry_state_t *tele ); + +extern int tele_cpy( void *dst, volatile struct telemetry_state_t *tele ); + + +#ifdef __cplusplus +} +#endif + +#endif /* TELEHELPER_H_ */ diff --git a/telehttpd.c b/telehttpd.c index 1c9571b..6f9e843 100644 --- a/telehttpd.c +++ b/telehttpd.c @@ -20,6 +20,7 @@ #include "shmget.h" #include "telemetry.h" #include "tele2json.h" +#include "telehelper.h" struct thread_data_t { int sock; @@ -57,6 +58,8 @@ static void handle_signal(int sig) { } static int init_shm( bool retry ) { + uint32_t ver; + do { DPRINT( "Trying to attach to shared memory ...\n" ); telemetry = init_shmget( TELE_SHM_KEY, sizeof *telemetry ); @@ -65,9 +68,9 @@ static int init_shm( bool retry ) { return -1; sleep(1); } - else if ( TELE_VERSION != telemetry->version ) { + else if ( TELE_VERSION != ( ver = tele_version( telemetry ) ) ) { EPRINT( "telemetry version mismatch: got %u, want %u\n", - (unsigned)telemetry->version, TELE_VERSION ); + (unsigned)ver, TELE_VERSION ); release_shm( TELE_SHM_KEY, (void *)telemetry ); telemetry = NULL; if ( !retry ) @@ -76,7 +79,7 @@ static int init_shm( bool retry ) { } if ( force_quit ) return -1; - } while ( NULL == telemetry ); + } while ( NULL == telemetry ); DPRINT( "have telemetry version %u\n", (unsigned)telemetry->version ); return 0; } @@ -84,7 +87,7 @@ static int init_shm( bool retry ) { static int check_shm( bool retry ) { if ( NULL == telemetry ) return init_shm( retry ); - if ( !(telemetry->flags & TELE_FLAG_ALIVE) ) { + if ( !( tele_flags( telemetry ) & TELE_FLAG_ALIVE ) ) { release_shm( TELE_SHM_KEY, (void *)telemetry ); telemetry = NULL; return init_shm( retry ); @@ -276,19 +279,8 @@ static int serve_http(void) { if ( 0 < as ) { if ( NULL != (td = malloc( sizeof *td )) ) { td->sock = as; - if ( telemetry ) { - int e = pthread_rwlock_rdlock( (pthread_rwlock_t *)&telemetry->rwlock ); - if ( 0 != e ) { - EPRINT( "pthread_rwlock_rdlock: %s\n", strerror( e ) ); - return -1; - } - memcpy( &td->tele, (void *)telemetry, sizeof td->tele ); - pthread_rwlock_unlock( (pthread_rwlock_t *)&telemetry->rwlock ); - } - else { + if ( 0 != tele_cpy( &td->tele, telemetry ) ) DPRINT( "No telemetry, serving dummy data\n" ); - memset( &td->tele, 0, sizeof td->tele ); - } pthread_attr_init( &attr ); pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED ); if ( 0 != pthread_create( &tid, &attr, handle_conn, td ) ) { diff --git a/telelogger.c b/telelogger.c index 8223e8d..8cb523c 100644 --- a/telelogger.c +++ b/telelogger.c @@ -14,6 +14,7 @@ #include "shmget.h" #include "telemetry.h" #include "tele2json.h" +#include "telehelper.h" static volatile struct telemetry_state_t *telemetry; @@ -38,6 +39,8 @@ static void msleep( int ms ) { } static int init_shm( bool retry ) { + uint32_t ver; + do { DPRINT( "Trying to attach to shared memory ...\n" ); telemetry = init_shmget( TELE_SHM_KEY, sizeof *telemetry ); @@ -46,9 +49,9 @@ static int init_shm( bool retry ) { return -1; sleep(1); } - else if ( TELE_VERSION != telemetry->version ) { + else if ( TELE_VERSION != ( ver = tele_version( telemetry ) ) ) { DPRINT( "telemetry version mismatch: got %u, want %u\n", - (unsigned)telemetry->version, TELE_VERSION ); + (unsigned)ver, TELE_VERSION ); if ( !retry ) return -1; release_shm( TELE_SHM_KEY, (void *)telemetry ); @@ -65,7 +68,7 @@ static int init_shm( bool retry ) { static int check_shm( bool retry ) { if ( NULL == telemetry ) return init_shm( retry ); - if ( !(telemetry->flags & TELE_FLAG_ALIVE) ) { + if ( !( tele_flags( telemetry ) & TELE_FLAG_ALIVE ) ) { release_shm( TELE_SHM_KEY, (void *)telemetry ); telemetry = NULL; return init_shm( retry ); @@ -74,37 +77,29 @@ static int check_shm( bool retry ) { } static int log_console( int cnt, int delay ) { - int e; char buf[4096]; bool forever = !cnt; struct telemetry_state_t tele; - last_timestamp = telemetry->timestamp; + last_timestamp = tele_timestamp( telemetry ); while ( forever || cnt-- > 0 ) { if ( 0 != check_shm( cfg.retry ) ) return -1; - - e = pthread_rwlock_rdlock( (pthread_rwlock_t *)&telemetry->rwlock ); - if ( 0 != e ) { - EPRINT( "pthread_rwlock_rdlock: %s\n", strerror( e ) ); + if ( 0 != tele_cpy( &tele, telemetry ) ) return -1; - } - memcpy( &tele, (void *)telemetry, sizeof tele ); - pthread_rwlock_unlock( (pthread_rwlock_t *)&telemetry->rwlock ); - if ( tele2json( buf, sizeof buf, &tele ) > 0 ) puts( buf ); if ( forever || cnt > 0 ) { - if ( last_paused && telemetry->paused && cfg.pause ) { - while ( last_paused && telemetry->paused ) { + if ( last_paused && tele_paused( telemetry ) && cfg.pause ) { + while ( last_paused && tele_paused( telemetry ) ) { if ( 0 != check_shm( cfg.retry ) ) return -1; usleep( 10000 ); } } - else if ( last_timestamp == telemetry->timestamp && cfg.pause ) { - while ( last_timestamp == telemetry->timestamp ) { + else if ( last_timestamp == tele_timestamp( telemetry ) && cfg.pause ) { + while ( last_timestamp == tele_timestamp( telemetry ) ) { if ( 0 != check_shm( cfg.retry ) ) return -1; msleep( delay ); @@ -112,8 +107,8 @@ static int log_console( int cnt, int delay ) { } else msleep( delay ); - last_paused = telemetry->paused; - last_timestamp = telemetry->timestamp; + last_paused = tele_paused( telemetry ); + last_timestamp = tele_timestamp( telemetry ); } } return 0;