* Added shared memory sanity checks to telehttpd and telelogger.
authorUrban Wallasch <urban.wallasch@freenet.de>
Sat, 3 Aug 2019 16:19:58 +0000 (18:19 +0200)
committerUrban Wallasch <urban.wallasch@freenet.de>
Sat, 3 Aug 2019 16:19:58 +0000 (18:19 +0200)
* Added command line options to allow telehttpd and telelogger to try to re-attach to shared memory.
* Added flags and version fields to telemetry struct.
* Improved error handling in several places.
* Some minor fixes and improvements.

dash.html
shmget.c
tele2json.c
telehttpd.c
telelogger.c
telemetry.h
teleshmem.cpp

index 3cdfc956fddddc441c3caac0fad2d76657900723..305f0bc5c56f48d9c55f31a98b4a44a43fe98a59 100644 (file)
--- a/dash.html
+++ b/dash.html
@@ -74,10 +74,12 @@ x-bar {
 .pause {
   background-color:#fd0;
   color : #000;
+  display: none;
 }
 .error {
   background-color:#a00;
   color : #fff;
+  display: none;
 }
 
 </style>
@@ -236,6 +238,22 @@ function update_cb() {
   if (this.readyState == 4 && this.status == 200) {
     var tele = JSON.parse(this.responseText);
 
+    //// pause and error bar status:
+
+    if ( !(tele.tele_flags & 1) ) {  // TELE_FLAG_ALIVE
+      pausebar.style.display = "none";
+      errbar.style.display = "block";
+      errbar.innerHTML = "Game offline";
+      return;
+    }
+    else if ( tele.tele_version !== 1 ) {  // TELE_VERSION
+      errbar.style.display = "block";
+      errbar.innerHTML = "Version mismatch";
+    } else {
+      errbar.style.display = "none";
+    }
+    pausebar.style.display = tele.paused ? "block" : "none";
+
     //// "speedo" box
 
     // speedometer:
@@ -322,11 +340,6 @@ function update_cb() {
     source.innerHTML = tele.job_isvalid ? tele.job_source_city : '-';
     destination.innerHTML = tele.job_isvalid ? tele.job_destination_city : '-';
     reward.innerHTML = tele.job_isvalid ? tele.job_income + '.-' : '-';
-
-    //// pause and error bar status:
-
-    pausebar.style.display = tele.paused ? "block" : "none";
-    errbar.style.display = "none";
   }
   else if (this.readyState == 4) {
     // Show error status:
index 5700c2d77a4a915154b86899421395da92226d21..243f32f575c6d82b073856c0f1bbe0c88c77c5ae 100644 (file)
--- a/shmget.c
+++ b/shmget.c
@@ -23,22 +23,25 @@ static void *init_shm_(int key, size_t size, int fl1, int fl2)
 
     shmid = shmget(key, size, fl1);
     if ( 0 > shmid ) {
-        EPRINT( "shmget failed\n" );
+        EPRINT( "shmget: %s\n", strerror( errno ) );
         return NULL;
     }
+    DPRINT( "shmget ID is %d\n", shmid );
     shmhnd = shmat(shmid, NULL, fl2);
     if ( (void *)-1 == shmhnd ) {
-        EPRINT( "shmat failed\n" );
+        EPRINT( "shmat: %s\n", strerror( errno ) );
         return NULL;
     }
     return shmhnd;
 }
 
+// Called by plugin to create the shared memory segment:
 void *init_shmput(int key, size_t size)
 {
     return init_shm_( key, size, IPC_CREAT | IPC_EXCL | 0600, 0 );
 }
 
+// Called by clients to attach to existing shared memory segment:
 void *init_shmget(int key, size_t size) {
     return init_shm_( key, size, 0, SHM_RDONLY );
 }
@@ -49,10 +52,11 @@ int release_shm(int key, void *p)
 
     shmid = shmget(key, 0,  0);
     if ( 0 > shmid ) {
-        EPRINT( "shmget failed\n" );
+        EPRINT( "shmget: %s\n", strerror( errno ) );
         return -1;
     }
-    shmdt( p );
+    DPRINT( "shmget ID is %d\n", shmid );
+    // RMID will only succeed if we got the id via init_shmput()!
     shmctl( shmid, IPC_RMID, NULL );
-    return 0;
+    return shmdt( p );
 }
index 4a2f1196e4fe4005ae3d4a64870b50c012ee53b5..1c1cf2cd6e53694994bfdcf3e456e4e5fdfdded9 100644 (file)
@@ -5,10 +5,16 @@
 size_t tele2json( char *buf, size_t size, const struct telemetry_state_t *tele ) {
     size_t n = 0;
 
+    if ( NULL == tele )
+        return 0;
+
 #define CHKSIZE do{ if ( n >= size ) return 0; }while(0)
     CHKSIZE;
     n += snprintf( buf+n, size-n, "{\n" ); CHKSIZE;
 
+    n += snprintf( buf+n, size-n, "  \"tele_version\": %"PRIu32",\n", tele->version ); CHKSIZE;
+    n += snprintf( buf+n, size-n, "  \"tele_flags\": %"PRIu32",\n", tele->flags ); CHKSIZE;
+
     n += snprintf( buf+n, size-n, "  \"game_id\": \"%s\",\n", tele->game_id ); CHKSIZE;
     n += snprintf( buf+n, size-n, "  \"game_major_ver\": %u,\n", tele->game_major_ver ); CHKSIZE;
     n += snprintf( buf+n, size-n, "  \"game_minor_ver\": %u,\n", tele->game_minor_ver ); CHKSIZE;
index ede748d82fb96b435d063b8b44ca0da209feefa4..d81d474d2d9d345d982a7678244e580179ce3c99 100644 (file)
@@ -22,6 +22,7 @@
 #include "tele2json.h"
 
 static struct telemetry_state_t *telemetry;
+static struct telemetry_state_t telemetry0;
 
 struct {
     int port;
@@ -29,12 +30,19 @@ struct {
     char *host;
     char *origin;
     char *idxfile;
+    bool retry;
 } cfg = {
     8837,
     "*",
     "localhost",
     "http://localhost",
     "index.html",
+    false,
+};
+
+struct thread_data_t {
+    int sock;
+    struct telemetry_state_t tele;
 };
 
 static volatile int force_quit;
@@ -49,17 +57,60 @@ static void handle_signal(int sig) {
     }
 }
 
-enum respond_code {
-    r_index,
-    r_json,
-    r_none
-};
+static int init_shm( bool retry ) {
+    do {
+        DPRINT( "Trying to attach to shared memory ...\n" );
+        telemetry = init_shmget( TELE_SHM_KEY, sizeof *telemetry );
+        if ( NULL == telemetry ) {
+            if ( !retry )
+                return -1;
+            sleep(1);
+        }
+        else if ( TELE_VERSION != telemetry->version ) {
+            EPRINT( "telemetry version mismatch: got %u, want %u\n",
+                    (unsigned)telemetry->version, TELE_VERSION );
+            release_shm( TELE_SHM_KEY, telemetry );
+            telemetry = NULL;
+            if ( !retry )
+                return -1;
+            sleep(1);
+        }
+        if ( force_quit )
+            return -1;
+    } while ( NULL == telemetry  );
+    DPRINT( "have telemetry version %u\n", (unsigned)telemetry->version );
+    return 0;
+}
 
-static void respond(int fd, const char *req, int code) {
+static int check_shm( bool retry ) {
+    if ( NULL == telemetry )
+        return init_shm( retry );
+    if ( !(telemetry->flags & TELE_FLAG_ALIVE) ) {
+        release_shm( TELE_SHM_KEY, telemetry );
+        telemetry = NULL;
+        return init_shm( retry );
+    }
+    return 0;
+}
+
+static void respond( struct thread_data_t *td, const char *req ) {
     char buf[4096];
     char origin[256];
     char host[256];
     const char *s;
+    int fd = td->sock;
+    enum respond_code {
+        r_index,
+        r_json,
+        r_none
+    } code;
+
+    if ( 0 == strncmp( req, "GET /json HTTP", 14 ) )
+        code = r_json;
+    else if ( 0 == strncmp( req, "GET / HTTP", 10 ) )
+        code = r_index;
+    else
+        code = r_none;
 
     switch (code) {
     case r_json:
@@ -86,7 +137,7 @@ static void respond(int fd, const char *req, int code) {
             write(fd, buf, sprintf(buf, "Access-Control-Allow-Origin: %s\r\n", origin));
             write(fd, buf, sprintf(buf, "Content-type: text/json\r\n"));
             write(fd, buf, sprintf(buf, "Connection: close\r\n\r\n"));
-            write( fd, buf, tele2json(buf, sizeof buf, telemetry) );
+            write( fd, buf, tele2json(buf, sizeof buf, &td->tele) );
         }
         else {
             struct file_info_t fi;
@@ -161,27 +212,22 @@ static int rcv_request(int sock, char *buf, size_t size, int timeout) {
 }
 
 static void *handle_conn( void *p ) {
-    int sock = *(int*)p;
+    struct thread_data_t *td = p;
     char buf[2048];
 
-    DPRINT( "sock: %d\n", sock );
-    if ( 0 == rcv_request(sock, buf, sizeof buf, 1000) ) {
+    DPRINT( "sock: %d\n", td->sock );
+    if ( 0 == rcv_request(td->sock, buf, sizeof buf, 1000) ) {
         DPRINT( "request:\n%s", buf);
-        if (0 == strncmp(buf, "GET /json HTTP", 10))
-            respond(sock, buf, r_json);
-        else if (0 == strncmp(buf, "GET / HTTP", 10))
-            respond(sock, buf, r_index);
-        else
-            respond(sock, NULL, r_none);
+        respond( td, buf );
     }
-    net_close( sock );
-    free( p );
+    net_close( td->sock );
+    free( td );
     return NULL;
 }
 
 static int serve_http(void) {
     int as, ss;
-    int *ps;
+    struct thread_data_t *td;
     pthread_t tid;
     pthread_attr_t attr;
 
@@ -192,11 +238,26 @@ static int serve_http(void) {
     }
 
     while ( !force_quit ) {
-        if ( 0 < (as = net_accept( ss )) && NULL != (ps = malloc( sizeof *ps )) ) {
-            *ps = as;
-            pthread_attr_init(&attr);
-            pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
-            pthread_create(&tid, &attr, handle_conn, ps);
+        as = net_accept( ss );
+        if ( 0 != check_shm( false ) ) {
+            if ( !cfg.retry )
+                return -1;
+        }
+        if ( 0 < as ) {
+            if ( NULL != (td = malloc( sizeof *td )) ) {
+                td->sock = as;
+                memcpy( &td->tele, telemetry ? telemetry : &telemetry0, sizeof td->tele );
+                pthread_attr_init( &attr );
+                pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED );
+                if ( 0 != pthread_create( &tid, &attr, handle_conn, td ) ) {
+                    net_close( td->sock );
+                    free( td );
+                }
+            }
+            else {
+                EPRINT( "malloc: %s\n", strerror(errno) );
+                net_close( as );
+            }
         }
     }
 
@@ -216,12 +277,13 @@ static void usage( const char *me )
         "  -i file  file to serve as index file; default: index.html\n"
         "  -o str   origin; default: http://localhost\n"
         "  -p N     TCP listen port; default: 8837\n"
+        "  -r       try to (re-)attach to shared memory\n"
     );
 }
 
 static int config( int argc, char *argv[] ) {
     int opt;
-    const char *ostr = "+b:h:i:o:p:";
+    const char *ostr = "+rb:h:i:o:p:";
 
     while ( -1 != ( opt = getopt( argc, argv, ostr ) ) ) {
         switch ( opt ) {
@@ -240,6 +302,9 @@ static int config( int argc, char *argv[] ) {
         case 'p':
             cfg.port = strtol(optarg, NULL, 10);
             break;
+        case 'r':
+            cfg.retry = true;
+            break;
         case ':':
             EPRINT( "Missing argument for option '%c'\n", optopt );
             usage( argv[0] );
@@ -259,13 +324,12 @@ static int config( int argc, char *argv[] ) {
 int main(int argc, char *argv[]) {
     signal(SIGTERM, handle_signal);
     signal(SIGINT,  handle_signal);
+    // We don't want do get killed by some thread writing to a stale socket:
+    signal(SIGPIPE, SIG_IGN);
 
     if ( 0 != config( argc, argv ) )
         exit( EXIT_FAILURE);
 
-    if ( NULL == ( telemetry = init_shmget( TELE_SHM_KEY, sizeof *telemetry ) ) )
-        exit( EXIT_FAILURE);
-
     if ( 0 != serve_http() )
         exit( EXIT_FAILURE);
 
index 2311ed467d5e9097013fe2346427c247f7a35046..721724e73e2e8cc38051bbc59c938c7b756f9fec 100644 (file)
@@ -20,10 +20,12 @@ struct {
     int count;
     int delay;
     bool pause;
+    bool retry;
 } cfg = {
     0,
     1000,
     true,
+    false,
 };
 
 static void msleep( int ms ) {
@@ -31,6 +33,40 @@ static void msleep( int ms ) {
         usleep( 999 );
 }
 
+static int init_shm( bool retry ) {
+    do {
+        DPRINT( "Trying to attach to shared memory ...\n" );
+        telemetry = init_shmget( TELE_SHM_KEY, sizeof *telemetry );
+        if ( NULL == telemetry ) {
+            if ( !retry )
+                return -1;
+            sleep(1);
+        }
+        else if ( TELE_VERSION != telemetry->version ) {
+            DPRINT( "telemetry version mismatch: got %u, want %u\n",
+                    (unsigned)telemetry->version, TELE_VERSION );
+            if ( !retry )
+                return -1;
+            release_shm( TELE_SHM_KEY, telemetry );
+            telemetry = NULL;
+            sleep(1);
+        }
+    } while ( NULL == telemetry );
+
+    return 0;
+}
+
+static int check_shm( bool retry ) {
+    if ( NULL == telemetry )
+        return init_shm( retry );
+    if ( !(telemetry->flags & TELE_FLAG_ALIVE) ) {
+        release_shm( TELE_SHM_KEY, telemetry );
+        telemetry = NULL;
+        return init_shm( retry );
+    }
+    return 0;
+}
+
 static int log_console( int cnt, int delay ) {
     char buf[4096];
     bool forever = !cnt;
@@ -38,17 +74,26 @@ static int log_console( int cnt, int delay ) {
     uint64_t last_timestamp = telemetry->timestamp;
 
     while ( forever || cnt-- > 0 ) {
+        if ( 0 != check_shm( cfg.retry ) )
+            return -1;
+
         if ( tele2json( buf, sizeof buf, telemetry ) > 0 )
             puts( buf );
 
         if ( forever || cnt > 0 ) {
-            if ( last_paused && cfg.pause ) {
-                while ( telemetry->paused )
+            if ( last_paused && telemetry->paused && cfg.pause ) {
+                while ( telemetry->paused ) {
+                    if ( 0 != check_shm( cfg.retry ) )
+                        return -1;
                     usleep( 10000 );
+                }
             }
-            else if ( last_timestamp == telemetry->timestamp  && cfg.pause ) {
-                while ( last_timestamp == telemetry->timestamp )
+            else if ( last_timestamp == telemetry->timestamp && cfg.pause ) {
+                while ( last_timestamp == telemetry->timestamp ) {
+                    if ( 0 != check_shm( cfg.retry ) )
+                        return -1;
                     msleep( delay );
+                }
             }
             else
                 msleep( delay );
@@ -71,12 +116,13 @@ static void usage( const char *me )
         "  -h      this help page\n"
         "  -n N    number of cycles; default: 0 (unlimited)\n"
         "  -p      ignore pause condition\n"
+        "  -r      try to (re-)attach to shared memory\n"
     );
 }
 
 static int config( int argc, char *argv[] ) {
     int opt;
-    const char *ostr = "+hpd:n:";
+    const char *ostr = "+hprd:n:";
 
     while ( -1 != ( opt = getopt( argc, argv, ostr ) ) ) {
         switch ( opt ) {
@@ -89,6 +135,9 @@ static int config( int argc, char *argv[] ) {
         case 'p':
             cfg.pause = false;
             break;
+        case 'r':
+            cfg.retry = true;
+            break;
         case ':':
             EPRINT( "Missing argument for option '%c'\n", optopt );
             usage( argv[0] );
@@ -109,7 +158,7 @@ static int config( int argc, char *argv[] ) {
 int main(int argc, char *argv[]) {
     if ( 0 != config( argc, argv ) )
         exit( EXIT_FAILURE);
-    if ( NULL == ( telemetry = init_shmget( TELE_SHM_KEY, sizeof *telemetry ) ) )
+    if ( 0 != init_shm( cfg.retry ) )
         exit( EXIT_FAILURE);
     if ( 0 != log_console( cfg.count, cfg.delay ) )
         exit( EXIT_FAILURE);
index 19286f56bfb502cb41baddd9e6ec2ef8535b33dc..90d558ad9b40cbeec42e629d7bf08a605609f2da 100644 (file)
@@ -8,7 +8,14 @@
 
 #define TELE_STRLEN 30
 
+#define TELE_VERSION    1
+
+#define TELE_FLAG_ALIVE 0x0001U
+
 struct telemetry_state_t {
+    uint32_t version;
+    uint32_t flags;
+
     char game_id[10];
     unsigned game_major_ver;
     unsigned game_minor_ver;
index 9d1fc55690928d2763f4dbe49040beb3295add20..10e7c90b26c7c13b6d6cfe558ca851e892c84d13 100644 (file)
@@ -48,13 +48,17 @@ static bool init_shm(void)
         p = init_shmput( TELE_SHM_KEY, sizeof *telemetry );
         telemetry = static_cast<struct telemetry_state_t *>(p);
     }
-    assert( NULL != telemetry );
+    if ( NULL != telemetry ) {
+        telemetry->version = TELE_VERSION;
+        telemetry->flags |= TELE_FLAG_ALIVE;
+    }
     return NULL != telemetry;
 }
 
 static void drop_shm(void)
 {
     if ( NULL != telemetry ) {
+        telemetry->flags &= ~TELE_FLAG_ALIVE;
         release_shm( TELE_SHM_KEY, telemetry );
         telemetry = NULL;
     }