* Synchronized shared memory access with pthread read write lock.
authorUrban Wallasch <urban.wallasch@freenet.de>
Sun, 18 Aug 2019 13:06:36 +0000 (15:06 +0200)
committerUrban Wallasch <urban.wallasch@freenet.de>
Sun, 18 Aug 2019 13:06:36 +0000 (15:06 +0200)
* Bumped TELE_VERSION to 2.

Makefile
dash.html
index.html
shmget.c
tele2json.c
tele2json.h
telehttpd.c
telelogger.c
telemetry.h
teleshmem.cpp

index 1b868caabd9226eb2f2e37ddf2f37ecbc284f358..61ab0249bec3cb0c10a2149d056b76675c6f5be9 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -13,8 +13,8 @@ SDK_INCLUDES=\
   -Isdk/include/amtrucks/ \
   -Isdk/include/eurotrucks2
 
-CFLAGS=-Wall -Wextra -std=c99 -O2 -DDEBUG -I.
-CXXFLAGS=-Wall -O2 -DLOGGING -I.
+CFLAGS=-Wall -Wextra -std=c99 -O2 -DDEBUG -I. -pthread
+CXXFLAGS=-Wall -O2 -DLOGGING -I. -pthread
 
 UNAME:= $(shell uname -s)
 
@@ -40,7 +40,7 @@ telehttpd: $(HTTPD_OBJ) $(COMMON_HDR) $(SELF)
        $(CC) $(LDFLAGS) -o $@ -pthread $(HTTPD_OBJ)
 
 telelogger: $(LOGGER_OBJ) $(COMMON_HDR) $(SELF)
-       $(CC) $(LDFLAGS) -o $@ $(LOGGER_OBJ)
+       $(CC) $(LDFLAGS) -o $@ -pthread $(LOGGER_OBJ)
 
 %.o: %.c $(SELF)
        $(CC) -c $(CFLAGS) -o $*.o $*.c
index 9eb46080cded0493baf1f012838205104a52dcf2..ebd43f9bc94a2cce2cb9f5d6cf6e22b0b18e6f3c 100644 (file)
--- a/dash.html
+++ b/dash.html
@@ -319,7 +319,7 @@ function loadDoc() {
 }
 
 // Version and flag constants:
-const TELE_VERSION = 1;
+const TELE_VERSION = 2;
 const TELE_FLAG_ALIVE = 1;
 const MINUS_1_U32 = 4294967295;   // minus one as 32 bit unsigned int
 
index 3b8a81e345d41fbd525a648f17c634687b63060e..dfb98148978be15803ef2b4dd8c272f3c344840b 100644 (file)
@@ -269,7 +269,7 @@ function update_cb() {
       errbar.innerHTML = "Game offline";
       return;
     }
-    else if ( tele.tele_version !== 1 ) {  // TELE_VERSION
+    else if ( tele.tele_version !== 2 ) {  // TELE_VERSION
       errbar.style.display = "block";
       errbar.innerHTML = "Version mismatch";
     } else {
index 2500f1bd9d8f3b48c47e1849f1b9b3142d16718e..ccab5896b6684d245a1304e64847138031a48484 100644 (file)
--- a/shmget.c
+++ b/shmget.c
@@ -41,7 +41,7 @@ void *init_shmput(int key, size_t size) {
 
 // Called by clients to attach to existing shared memory segment:
 void *init_shmget(int key, size_t size) {
-    return init_shm_( key, size, 0, SHM_RDONLY );
+    return init_shm_( key, size, 0, 0 /*SHM_RDONLY*/ );
 }
 
 int release_shm(int key, void *p) {
index 752fbfeaa2ce6f131317e746bba41d5b0edcfb42..aee5bbde5257317b04f5a7261f18286db5f5d2ac 100644 (file)
@@ -1,12 +1,27 @@
+// Needed for pthread_rwlock*
+#define _POSIX_C_SOURCE 200809L
+
+#include <string.h>
 #include <inttypes.h>
 
 #include "tele2json.h"
+#include "log.h"
 
-size_t tele2json( char *buf, size_t size, const struct telemetry_state_t *tele ) {
+size_t tele2json( char *buf, size_t size, struct telemetry_state_t *ptele ) {
+    int e;
     size_t n = 0;
+    struct telemetry_state_t t_, *tele = &t_;
+
+    if ( NULL == ptele )
+        return 0;
 
-    if ( NULL == tele )
+    e = pthread_rwlock_rdlock( &ptele->rwlock );
+    if ( 0 != e ) {
+        EPRINT( "pthread_rwlock_rdlock: %s\n", strerror( e ) );
         return 0;
+    }
+    memcpy( tele, ptele, sizeof *tele );
+    pthread_rwlock_unlock( &ptele->rwlock );
 
 #define CHKSIZE do{ if ( n >= size ) return 0; }while(0)
     CHKSIZE;
index b36cc70557f93642a67fd8f7d3fb8a6fe8b516a4..38a9f2004900e7199b374bd8b7d1654b7168ae03 100644 (file)
@@ -9,7 +9,7 @@ extern "C" {
 
 #include "telemetry.h"
 
-extern size_t tele2json( char *buf, size_t size, const struct telemetry_state_t *tele );
+extern size_t tele2json( char *buf, size_t size, struct telemetry_state_t *tele );
 
 #ifdef __cplusplus
 }
index 9d31190b8f0668c101c77f11e9a37b0d59bf9128..a3c31f52656ce3614ae4164638fc89f01c458fa6 100644 (file)
@@ -1,5 +1,5 @@
-// Needed for getopt():
-#define _XOPEN_SOURCE
+// Needed for getopt() and pthread_rwlock*
+#define _XOPEN_SOURCE 500
 
 #include <stdio.h>
 #include <stdlib.h>
index 06927316a8cf02d2be8647794a19453913843b94..0fb02b2b47ca1dc98dc797a50c9d9ab7655b10fb 100644 (file)
@@ -1,4 +1,5 @@
-#define _XOPEN_SOURCE 500  // for usleep()
+// Needed for usleep(), getopt() and pthread_rwlock*
+#define _XOPEN_SOURCE 500
 
 #include <stdio.h>
 #include <stdlib.h>
index 0f626350c1710c16fa877fa77f179c70ab019ee1..9bf166f9c8ddf25c43df6bfd19627fda13b59ed9 100644 (file)
@@ -4,17 +4,21 @@
 #include <stdbool.h>
 #include <stdint.h>
 
+#include <pthread.h>
+
 #define TELE_SHM_KEY 0xecc11
 
 #define TELE_STRLEN 30
 
-#define TELE_VERSION    1
+#define TELE_VERSION    2
 
 #define TELE_FLAG_ALIVE 0x0001U
 
 typedef uint8_t bool_t;
 
 struct telemetry_state_t {
+    pthread_rwlock_t rwlock;
+
     uint32_t version;
     uint32_t flags;
 
index dac73ce3f4dfbfe4ef7ebf9a5283e5519d874d13..46f24823ff181f67a5478efc17d40f73eb161742 100644 (file)
@@ -17,6 +17,8 @@
 #include <assert.h>
 #include <string.h>
 
+#include <pthread.h>
+
 // SDK
 
 #include "scssdk_telemetry.h"
 
 #define UNUSED(x)
 
-/**
- * @brief Combined telemetry data and shared memory management.
- */
-
-#include "telemetry.h"
-#include "shmget.h"
-
-static struct telemetry_state_t *telemetry = NULL;
-
-static bool init_shm(void)
-{
-    if ( NULL == telemetry ) {
-        void *p;
-        p = init_shmput( TELE_SHM_KEY, sizeof *telemetry );
-        telemetry = static_cast<struct telemetry_state_t *>(p);
-    }
-    if ( NULL != telemetry ) {
-        telemetry->version = TELE_VERSION;
-        telemetry->flags |= TELE_FLAG_ALIVE;
-    }
-    return NULL != telemetry;
-}
-
-static void drop_shm(void)
-{
-    if ( NULL != telemetry ) {
-        telemetry->flags &= ~TELE_FLAG_ALIVE;
-        release_shm( TELE_SHM_KEY, telemetry );
-        telemetry = NULL;
-    }
-}
-
-/**
- * @brief Last timestamp we received.
- */
-scs_timestamp_t last_timestamp = static_cast<scs_timestamp_t>(-1);
-
-
 /**
  * @brief Logging to external file.
  */
@@ -212,6 +176,66 @@ static void log_named_value( const scs_named_value_t *nv ) {
 #endif // def LOGGING
 
 
+/**
+ * @brief Combined telemetry data and shared memory management.
+ */
+
+#include "telemetry.h"
+#include "shmget.h"
+
+static struct telemetry_state_t *telemetry = NULL;
+
+static inline int lock_shm(void)
+{
+    int e = pthread_rwlock_wrlock( &telemetry->rwlock );
+    if ( 0 != e )
+        log_print( "ERROR: Unable to acquire shared memory write lock\n" );
+    return e;
+}
+
+static inline int unlock_shm(void)
+{
+    int e = pthread_rwlock_unlock( &telemetry->rwlock );
+    if ( 0 != e )
+        log_print( "ERROR: Unable to release shared memory write lock\n" );
+    return e;
+}
+
+static bool init_shm(void)
+{
+    if ( NULL == telemetry ) {
+        void *p;
+        p = init_shmput( TELE_SHM_KEY, sizeof *telemetry );
+        telemetry = static_cast<struct telemetry_state_t *>(p);
+        if ( NULL != telemetry )
+            pthread_rwlock_init( &telemetry->rwlock, NULL );
+    }
+    if ( NULL != telemetry ) {
+        lock_shm();
+        telemetry->version = TELE_VERSION;
+        telemetry->flags |= TELE_FLAG_ALIVE;
+        unlock_shm();
+    }
+    return NULL != telemetry;
+}
+
+static void drop_shm(void)
+{
+    if ( NULL != telemetry ) {
+        lock_shm();
+        telemetry->flags &= ~TELE_FLAG_ALIVE;
+        unlock_shm();
+        release_shm( TELE_SHM_KEY, telemetry );
+        telemetry = NULL;
+    }
+}
+
+/**
+ * @brief Last timestamp we received.
+ */
+scs_timestamp_t last_timestamp = static_cast<scs_timestamp_t>(-1);
+
+
 /**
  * @brief Handling of individual events.
  */
@@ -237,16 +261,18 @@ SCSAPI_VOID telemetry_frame_start(const scs_event_t UNUSED(event), const void *c
         last_timestamp = 0;
     }
 
-    // Advance the timestamp by delta since last frame.
+    lock_shm();
 
+    // Advance the timestamp by delta since last frame.
     telemetry->timestamp += (info->paused_simulation_time - last_timestamp);
     last_timestamp = info->paused_simulation_time;
 
     // The raw values.
-
     telemetry->raw_rendering_timestamp = info->render_time;
     telemetry->raw_simulation_timestamp = info->simulation_time;
     telemetry->raw_paused_simulation_timestamp = info->paused_simulation_time;
+
+    unlock_shm();
 }
 
 SCSAPI_VOID telemetry_frame_end(const scs_event_t UNUSED(event), const void *const UNUSED(event_info), const scs_context_t UNUSED(context))
@@ -255,7 +281,9 @@ SCSAPI_VOID telemetry_frame_end(const scs_event_t UNUSED(event), const void *con
 
 SCSAPI_VOID telemetry_pause(const scs_event_t event, const void *const UNUSED(event_info), const scs_context_t UNUSED(context))
 {
+    lock_shm();
     telemetry->paused = (event == SCS_TELEMETRY_EVENT_paused);
+    unlock_shm();
 }
 
 SCSAPI_VOID telemetry_gameplay(const scs_event_t event, const void *const event_info, const scs_context_t UNUSED(context))
@@ -264,7 +292,9 @@ SCSAPI_VOID telemetry_gameplay(const scs_event_t event, const void *const event_
 
     if ( 0 == strcmp( info->id, SCS_TELEMETRY_GAMEPLAY_EVENT_job_cancelled )
       || 0 == strcmp( info->id, SCS_TELEMETRY_GAMEPLAY_EVENT_job_delivered ) ) {
+        lock_shm();
         telemetry->job_isvalid = false;
+        unlock_shm();
     }
 
     // Log the gameplay event.
@@ -286,6 +316,7 @@ SCSAPI_VOID telemetry_configuration(const scs_event_t event, const void *const e
 {
     const struct scs_telemetry_configuration_t *const info = static_cast<const scs_telemetry_configuration_t *>(event_info);
 
+    lock_shm();
     if ( 0 == strcmp( info->id, SCS_TELEMETRY_CONFIG_truck ) ) {
         /*
         * @li brand_id
@@ -392,6 +423,7 @@ SCSAPI_VOID telemetry_configuration(const scs_event_t event, const void *const e
             }
         }
     }
+    unlock_shm();
 
     // Log the configuration info.
     log_print("<%u> Configuration: %s\n", telemetry->game_time, info->id);
@@ -420,6 +452,7 @@ SCSAPI_VOID telemetry_store_dplacement(const scs_string_t name, const scs_u32_t
         tele->placement_isvalid = false;
         return;
     }
+    lock_shm();
     tele->placement_isvalid = true;
     tele->x = value->value_dplacement.position.x;
     tele->y = value->value_dplacement.position.y;
@@ -427,6 +460,7 @@ SCSAPI_VOID telemetry_store_dplacement(const scs_string_t name, const scs_u32_t
     tele->heading = value->value_dplacement.orientation.heading;
     tele->pitch = value->value_dplacement.orientation.pitch;
     tele->roll = value->value_dplacement.orientation.roll;
+    unlock_shm();
 }
 
 SCSAPI_VOID telemetry_store_double(const scs_string_t name, const scs_u32_t index, const scs_value_t *const value, const scs_context_t context)
@@ -437,7 +471,9 @@ SCSAPI_VOID telemetry_store_double(const scs_string_t name, const scs_u32_t inde
     assert(value);
     assert(value->type == SCS_VALUE_TYPE_double);
     assert(context);
+    lock_shm();
     *static_cast<double *>(context) = value->value_double.value;
+    unlock_shm();
 }
 
 SCSAPI_VOID telemetry_store_double_nz(const scs_string_t name, const scs_u32_t index, const scs_value_t *const value, const scs_context_t context)
@@ -448,8 +484,11 @@ SCSAPI_VOID telemetry_store_double_nz(const scs_string_t name, const scs_u32_t i
     assert(value);
     assert(value->type == SCS_VALUE_TYPE_double);
     assert(context);
-    if (value->value_double.value)
+    if (value->value_double.value) {
+        lock_shm();
         *static_cast<double *>(context) = value->value_double.value;
+        unlock_shm();
+    }
 }
 
 SCSAPI_VOID telemetry_store_s32(const scs_string_t name, const scs_u32_t index, const scs_value_t *const value, const scs_context_t context)
@@ -460,7 +499,9 @@ SCSAPI_VOID telemetry_store_s32(const scs_string_t name, const scs_u32_t index,
     assert(value);
     assert(value->type == SCS_VALUE_TYPE_s32);
     assert(context);
+    lock_shm();
     *static_cast<int32_t *>(context) = value->value_s32.value;
+    unlock_shm();
 }
 
 SCSAPI_VOID telemetry_store_u32(const scs_string_t name, const scs_u32_t index, const scs_value_t *const value, const scs_context_t context)
@@ -471,7 +512,9 @@ SCSAPI_VOID telemetry_store_u32(const scs_string_t name, const scs_u32_t index,
     assert(value);
     assert(value->type == SCS_VALUE_TYPE_u32);
     assert(context);
+    lock_shm();
     *static_cast<uint32_t *>(context) = value->value_u32.value;
+    unlock_shm();
 }
 
 SCSAPI_VOID telemetry_store_bool(const scs_string_t name, const scs_u32_t index, const scs_value_t *const value, const scs_context_t context)
@@ -482,7 +525,9 @@ SCSAPI_VOID telemetry_store_bool(const scs_string_t name, const scs_u32_t index,
     assert(value);
     assert(value->type == SCS_VALUE_TYPE_bool);
     assert(context);
+    lock_shm();
     *static_cast<bool *>(context) = value->value_bool.value;
+    unlock_shm();
 }
 
 /**
@@ -519,6 +564,7 @@ SCSAPI_RESULT scs_telemetry_init(const scs_u32_t version, const scs_telemetry_in
         version_params->common.log(SCS_LOG_TYPE_error, "[teleshmem] Unable to initialize shared memory");
         return SCS_RESULT_generic_error;
     }
+    lock_shm();
 
     // Check application version. Note that this example uses fairly basic channels which are likely to be supported
     // by any future SCS trucking game however more advanced application might want to at least warn the user if there
@@ -628,6 +674,7 @@ SCSAPI_RESULT scs_telemetry_init(const scs_u32_t version, const scs_telemetry_in
     last_timestamp = static_cast<scs_timestamp_t>(-1);
     // Initially the game is paused.
     telemetry->paused = true;
+    unlock_shm();
     return SCS_RESULT_ok;
 }