From 4b60c661da172f29b828e852f8a863bc2d9c339e Mon Sep 17 00:00:00 2001
From: volpol
Date: Fri, 7 Jun 2013 16:25:01 +0000
Subject: [PATCH 1/1] Initial commitment

---
 CMakeLists.txt |   9 ++
 curly.c        | 317 +++++++++++++++++++++++++++++++++++++++++
 curly.h        |  20 +++
 distclean.sh   |   7 +
 itemq.c        | 372 +++++++++++++++++++++++++++++++++++++++++++++++++
 log.h          |  19 +++
 m3u8.c         | 150 ++++++++++++++++++++
 m3u8.h         |  38 +++++
 8 files changed, 932 insertions(+)
 create mode 100644 CMakeLists.txt
 create mode 100644 curly.c
 create mode 100644 curly.h
 create mode 100755 distclean.sh
 create mode 100644 itemq.c
 create mode 100644 log.h
 create mode 100644 m3u8.c
 create mode 100644 m3u8.h

diff --git a/CMakeLists.txt b/CMakeLists.txt
new file mode 100644
index 0000000..df3eb27
--- /dev/null
+++ b/CMakeLists.txt
@@ -0,0 +1,9 @@
+cmake_minimum_required(VERSION 2.6)
+project(HLS)
+include_directories(${HLS_SOURCE_DIR})
+add_definitions(-DUSE_TCP)
+add_executable(hls curly.c itemq.c m3u8.c)
+include(FindPkgConfig)
+pkg_check_modules(CURL REQUIRED libcurl)
+include_directories(${CURL_INCLUDE_DIRS})
+target_link_libraries(hls ${CURL_LIBRARIES} )
diff --git a/curly.c b/curly.c
new file mode 100644
index 0000000..8102010
--- /dev/null
+++ b/curly.c
@@ -0,0 +1,317 @@
+/* NOTE(review): the header names below were lost in this copy of the patch
+ * and have been restored from the functions this file calls - confirm. */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+
+#include <curl/curl.h>
+#include "log.h"
+
+#ifdef USE_TCP
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+/*
+ * Listening socket, accepted client socket and "client is being served" flag.
+ * Both descriptors start out as -1 so that cleanup after a failed or skipped
+ * setup never touches descriptor 0.
+ */
+static int sock = -1;
+static int conn = -1;
+static int flag_conn;
+
+#endif
+
+static unsigned bufsize;
+static unsigned delay;
+
+#define BSIZE 4096
+
+
+#ifdef USE_TCP
+
+/*
+ * Read and discard the client's HTTP request, then answer with a fixed
+ * "200 OK" video/mp2t header so the stream data can follow on the same socket.
+ * Returns without answering if the client closes or read() fails.
+ */
+static void simple_http(int sock) {
+    const char *RSP =
+        "HTTP/1.0 200 OK\r\nContent-Type: video/mp2t\r\nConnection: close\r\n\r\n\r\n";
+    char drain[1024] = "";
+    ssize_t err;
+
+    WHOAMI;
+    //TODO POLL
+    do {
+        /* leave one byte free for the terminator written below */
+        err = read(sock, drain, sizeof drain - 1);
+        if (err <= 0)
+            return;
+        drain[err] = 0;
+        DPRINT("%s\n", drain);
+        /* NOTE: an end-of-headers marker split across two reads goes unnoticed */
+    } while (!strstr(drain, "\r\n\r\n"));
+
+    send(sock, RSP, strlen(RSP), MSG_NOSIGNAL);
+
+    DPRINT ("simple http response sent\n");
+}
+
+/*
+ * Data sink with the signature of a libcurl write callback: forwards the
+ * size * nmemb bytes at ptr to the connected TCP client, accepting a pending
+ * client first if none is connected. stream is unused.
+ *
+ * Always returns size * nmemb, whether or not a client received the data.
+ */
+size_t fetch_send(void *ptr, size_t size, size_t nmemb, void *stream) {
+    size_t bytesize = size * nmemb;
+    const char *out = ptr;
+    size_t left = bytesize;
+
+    (void) stream;
+
+    WHOAMI;
+
+    if (conn < 0)
+        conn = accept(sock, NULL, NULL);
+
+    if (conn >= 0 && !flag_conn) {
+        DPRINT("Connection accepted\n");
+        flag_conn = 1;
+        simple_http(conn);
+    }
+
+    /* send() may take only part of the chunk, so loop until all of it is out */
+    while (flag_conn && left > 0) {
+        ssize_t sent = send(conn, out, left, MSG_NOSIGNAL);
+
+        if (sent <= 0) {
+            DPRINT("Pious Teardown!\n");
+            shutdown(conn, SHUT_RDWR);
+            close(conn);
+            conn = -1;
+            flag_conn = 0;
+            break;
+        }
+        out += sent;
+        left -= (size_t) sent;
+    }
+
+    if (!flag_conn)
+        usleep(delay);
+    //either limit speed further or burst at max speed filling client's buffer
+    //send will block when it's full thus slowing download down
+    return bytesize;
+
+}
+
+
+/*
+ * Create a non-blocking TCP listening socket bound to ip:port.
+ * Returns 0 on success and -1 on failure; on failure no socket is left open.
+ */
+static int prepare_tcp_dream(const char *ip, unsigned short port) {
+    struct sockaddr_in lob;
+
+    WHOAMI;
+
+    flag_conn = 0;
+    conn = -1;
+
+    sock = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);
+    if (sock < 0)
+        return -1;
+
+    memset(&lob, 0, sizeof lob);
+    lob.sin_family = AF_INET;
+    lob.sin_port = htons(port);
+    lob.sin_addr.s_addr = inet_addr(ip);
+
+    if (bind(sock, (const struct sockaddr*) &lob, sizeof lob) < 0
+            || listen(sock, 5) < 0) {
+        close(sock);
+        /* remember that it is gone so cleanup does not close it again */
+        sock = -1;
+        return -1;
+    }
+
+    return 0;
+}
+
+/* Close the client connection and the listening socket, if open. */
+static void cleanup_tcp_dream(void){
+    WHOAMI;
+    if (conn>=0){
+        shutdown(conn, SHUT_RDWR);
+        close(conn);
+        conn = -1;
+        flag_conn = 0;
+    }
+    if (sock>=0){
+        close(sock);
+        sock = -1;
+    }
+}
+
+#else
+
+/* Builds without USE_TCP have no listener; these stubs keep callers unchanged. */
+static void cleanup_tcp_dream(void){}
+
+static int prepare_tcp_dream(const char *ip, unsigned short port){
+    (void) ip;
+    (void) port;
+    return 0;
+}
+
+size_t fetch_send(void *ptr, size_t size, size_t nmemb, void *stream) {
+    (void) ptr;
+    (void) stream;
+    return (size * nmemb);
+}
+
+#endif
+
+/* Initialise libcurl globally. Returns 0 on success, -1 on failure. */
+int curly_init(void) {
+    CURLcode cc;
+    WHOAMI;
+
+    cc = curl_global_init(CURL_GLOBAL_NOTHING);
+    if (cc != CURLE_OK)
+        return -1;
+    return 0;
+}
+
+/* Release what curly_init() set up. */
+void curly_cleanup(void) {
+    WHOAMI;
+    curl_global_cleanup();
+}
+
+static CURL *sch;
+
+/*
+ * Create the curl handle used for streaming and, with USE_TCP, the listening
+ * socket on ip:port.
+ *
+ * Returns non-zero on success and 0 on failure - the opposite convention to
+ * curly_init(). After a failure curly_stream_cleanup() releases whatever was
+ * set up.
+ */
+int curly_stream_init(const char *ip, unsigned short port) {
+    int err;
+
+    WHOAMI;
+
+    sch = curl_easy_init();
+    bufsize = BSIZE;
+
+    err = prepare_tcp_dream(ip, port);
+
+    return (sch != NULL && 0 == err);
+
+}
+
+/* Release the streaming curl handle and the TCP sockets. */
+void curly_stream_cleanup(void) {
+    WHOAMI;
+
+    if (sch)
+        curl_easy_cleanup(sch);
+
+    cleanup_tcp_dream();
+    sch = NULL;
+}
+
+
+int curly_stream(const char *from_url, unsigned long duration) {
+
+    CURLcode cc = CURLE_COULDNT_CONNECT;
+    int err = -1;
+    double abps, size;
+
+    WHOAMI;
+
+#ifdef USE_TCP
+    //when we don't have a client don't actually stream anything
+    //but try to take just as long
+    time_t st = time(NULL);
+
+    while (conn < 0 && time(NULL)-st
[... gap in this copy of the patch: the rest of curly.c, the diffs of curly.h and distclean.sh, and the diff header and leading #include lines of itemq.c are missing; what follows is presumably itemq.c ...]
+/* NOTE(review): the system header names were lost in this copy of the patch
+ * and have been restored from the functions this file calls - confirm. */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+#include "curly.h"
+#include "m3u8.h"
+#include "log.h"
+
+#define PLS "/tmp/iq.m3u8"
+#define QVC_MASTER_M3U8 "http://live1.qvc.jp/iPhone/QVC_PC.m3u8"
+#define QVC_1500_M3U8 "http://live1.qvc.jp/iPhone/1501.m3u8"
+#define QVC_800_M3U8 "http://live1.qvc.jp/iPhone/800.m3u8"
+
+struct item;
+
+/* main loop keeps running while in_game is non-zero */
+static int in_game;
+/* singly linked queue of playlist entries; curr is the entry being streamed */
+static struct item *head, *tail, *curr;
+/* cleared once the playlist announces its end (#EXT-X-ENDLIST) */
+static int live;
+
+struct item {
+    char url[1024];
+    unsigned int hash;
+    //will be set to 1 after streamed
+    int done;
+    // will be set to 1 when it can be purged from the list
+    // this is true when done is 1 and after the last refresh round
+    // this hash was no longer present in the m3u8
+    int purge;
+    struct item *next;
+    unsigned long duration;
+    int is_streamlist;
+    int bandwidth;
+};
+
+/* Allocate size bytes of zero-filled memory; returns NULL on failure. */
+static void *zalloc(size_t size) {
+    WHOAMI;
+
+    return calloc(1, size);
+}
+
+
+/* Dump the queue through DPRINT; does nothing unless built with DEBUG. */
+static void print_list(void) {
+
+#ifdef DEBUG
+    struct item *n = head;
+    int i = 0;
+
+    WHOAMI;
+
+    while (n != NULL) {
+        DPRINT("[%d] %s:%lu:%X:%d:%d\n", ++i, n->url, n->duration, n->hash, n->done,
+                n->purge);
+        n = n->next;
+    }
+#endif
+
+}
+
+/* Return the first queued item whose hash equals hash, or NULL if none. */
+struct item *find_hash(int hash) {
+    struct item *n;
+
+    WHOAMI;
+
+    for (n = head; n != NULL; n = n->next) {
+        if (n->hash == (unsigned int) hash)
+            break;
+    }
+    return n;
+}
+
+/*
+ * Cheap, non-cryptographic hash used to recognise a playlist line that is
+ * already queued; different lines can collide. Computed in unsigned
+ * arithmetic because overflowing a signed left shift is undefined.
+ */
+static unsigned int compute_hash(const char *s) {
+    unsigned int hash = 0;
+    unsigned char bcc = 0;
+
+    WHOAMI;
+
+    while (*s) {
+        bcc ^= (unsigned char) *s;
+        hash ^= bcc;
+        hash <<= 2;
+        s++;
+    }
+    return hash;
+}
+
+/* Free every leading queue entry that is flagged for purging. */
+static void purge_run(void) {
+    struct item *n;
+
+    WHOAMI;
+
+    while (head && head->purge) {
+        DPRINT("Purging %s:%X:%d:%d\n", head->url, head->hash, head->done,
+                head->purge);
+        n = head;
+        head = head->next;
+        free(n);
+    }
+
+    /* the queue ran empty: do not leave tail pointing at freed memory */
+    if (!head)
+        tail = NULL;
+
+}
+
+/*
+ * Flag the leading run of streamed entries as purgeable. reload_m3u8() clears
+ * the flag again on every entry the fresh playlist still lists.
+ */
+static void purge_prepare(void) {
+    struct item *n;
+
+    WHOAMI;
+
+    //mark done entries as ok to purge
+    //don't mark curr as ok to purge because we might
+    //still need it to advance to its next in emulate_stream()
+    for (n = head; n && n->done && n!=curr; n = n->next){
+        n->purge = 1;
+    }
+
+}
+
+/*
+ * Stream one queue entry: advance past curr if it has been streamed already,
+ * then hand the entry to curly_stream() and mark it done.
+ *
+ * When no further entry is queued this sleeps one second on a live stream
+ * and ends the main loop otherwise. Also ends the main loop if curr is NULL.
+ */
+void emulate_stream(void) {
+
+    int err;
+
+    WHOAMI;
+
+    if (!curr) {
+        //this should never happen, no reasonable way to recover
+        fprintf(stderr,"curr is NULL, bailing out\n");
+        in_game = 0;
+        return;
+    }
+
+    if (curr->done){
+        if (curr->next){
+            curr = curr->next;
+        } else {
+            //we end up here if we are faster than the server can produce
+            if (!live) {
+                in_game = 0;
+            } else {
+                DPRINT("sleeping\n");
+                sleep(1);
+            }
+            return;
+        }
+    }
+
+    printf("Streaming %s (%lu)\n", curr->url, curr->duration);
+    err = curly_stream(curr->url, curr->duration);
+    if (err){
+        DPRINT("Streaming failed\n");
+    }
+
+    /* a failed entry is skipped, not retried */
+    curr->done = 1;
+
+}
+
+/*
+ * Merge the playlist stored in from_file into the queue.
+ *
+ * Every URL line not queued yet is appended at the tail, prefixed with
+ * urlbase unless it is an absolute http:// or https:// URL. An entry that is
+ * queued already only has its purge flag cleared. Entries read after an
+ * #EXT-X-STREAM-INF tag are marked as stream lists and carry its bandwidth.
+ * #EXT-X-ENDLIST clears the global live flag.
+ *
+ * Returns 0 on success, -1 if the file cannot be opened or memory runs out.
+ */
+int reload_m3u8(const char *from_file, const char *urlbase) {
+
+    FILE *f;
+    char buf[1024];
+    struct item *n;
+    unsigned int thash;
+    unsigned long lval;
+    unsigned long duration = 0;
+    size_t len;
+    int written;
+
+    int plist,pid,bw;
+
+    const char *v;
+    MTAG m;
+
+    WHOAMI;
+
+    plist = pid = bw = 0;
+
+    DPRINT("Reloading M3U8\n");
+    f = fopen(from_file, "r");
+    if (!f)
+        return -1;
+
+    while (fgets(buf, sizeof buf, f)) {
+
+        /* strip a trailing "\n" or "\r\n"; len guards against an empty string */
+        len = strlen(buf);
+        if (len > 0 && buf[len - 1] == '\n')
+            buf[--len] = 0;
+        if (len > 0 && buf[len - 1] == '\r')
+            buf[--len] = 0;
+
+        m = m3u8_parse(buf, &v);
+
+        if (M_EXTINF == m){
+            m3u8_get_property(P_DURATION, v, &duration);
+        }
+        if (M_X_ENDLIST == m){
+            printf("This is not a live stream!\n");
+            live = 0;
+        }
+        if (M_X_STREAM_INF == m){
+            //playlist entry detected;
+            plist = 1;
+            /* reset first: a missing attribute leaves lval untouched */
+            lval = 0;
+            m3u8_get_property(P_BANDWIDTH, v, &lval);
+            bw = (int) lval;
+            lval = 0;
+            m3u8_get_property(P_PROGRAM_ID, v, &lval);
+            pid = (int) lval;
+            printf ("PID: %d BW:%d\n",pid, bw);
+        }
+
+        if (M_URL != m) continue;
+
+        thash = compute_hash(buf);
+
+        n = find_hash((int) thash);
+        if (n) {
+            //element is still in the newest m3u8
+            //hold onto it some longer
+            n->purge = 0;
+            continue;
+        }
+
+        n = zalloc(sizeof *n);
+        if (!n) {
+            fclose(f);
+            return -1;
+        }
+
+        if (strncmp(buf,"http://",7) == 0 || strncmp(buf,"https://",8) == 0)
+            written = snprintf(n->url, sizeof n->url, "%s", buf);
+        else
+            written = snprintf(n->url, sizeof n->url, "%s%s", urlbase, buf);
+
+        if (written < 0 || (size_t) written >= sizeof n->url) {
+            fprintf(stderr, "Skipping over-long URL %s\n", buf);
+            free(n);
+            continue;
+        }
+
+        n->hash = thash;
+        n->is_streamlist = plist;
+        n->bandwidth = bw;
+        n->duration = duration;
+
+        if (!head) {
+            //XXX curr is always first, which is not what we want in certain scenarios
+            head = tail = curr = n;
+        } else {
+            tail->next = n;
+            tail = n;
+        }
+    }
+    fclose(f);
+    return 0;
+}
+
+/*
+ * Pick the queued URL with the highest bandwidth (the first one on a tie) and
+ * flag every entry as done and purgeable.
+ *
+ * Returns a pointer into the chosen entry, valid until the next purge_run(),
+ * or NULL when the queue is empty.
+ */
+const char *choose_url(void){
+    struct item *n;
+    const char *u;
+    int bw = 0;
+
+    WHOAMI;
+
+    if (!head)
+        return NULL;
+
+    u = head->url;
+    for (n = head; n; n = n->next){
+        //choose first matching url, >= to choose last, but could lead to problems, allow user selection!
+        if (n->bandwidth>bw) {u = n->url; bw=n->bandwidth; }
+        n->done = 1;
+        n->purge = 1;
+    }
+
+    printf ("Final BW: %d | URL: %s\n",bw, u);
+    return u;
+}
+
+/*
+ * Copy url into base and cut it off after its last '/', leaving the prefix
+ * that relative playlist entries are resolved against. base must be at least
+ * as large as url; it is set to the empty string if url holds no '/'.
+ */
+static void reset_base(char *base, const char *url){
+    char *slash;
+
+    WHOAMI;
+
+    strcpy(base,url);
+    slash = strrchr(base, '/');
+    if (slash)
+        slash[1] = 0;
+    else
+        base[0] = 0;
+}
+
+/*
+ * Usage: hls [master-or-media-playlist-url]
+ *
+ * Refreshes the playlist every refresh_period seconds while the stream is
+ * live and streams the queued entries in between. A master playlist is
+ * replaced by its highest-bandwidth variant.
+ */
+int main(int argc, char *argv[]) {
+
+    time_t last, now;
+
+    int refresh_period = 20;
+    int written;
+    char url[1024];
+    char base[1024];
+
+    written = snprintf(url, sizeof url, "%s", argc>1 ? argv[1] : QVC_MASTER_M3U8);
+    if (written < 0 || (size_t) written >= sizeof url) {
+        fprintf(stderr, "URL too long\n");
+        return 1;
+    }
+
+    printf ("Master URL: %s\n",url);
+
+    reset_base(base, url);
+
+    if (curly_init() != 0) {
+        fprintf(stderr, "Could not initialise libcurl\n");
+        return 1;
+    }
+    /* curly_stream_init() reports success as non-zero */
+    if (!curly_stream_init("0.0.0.0",1234)) {
+        fprintf(stderr, "Could not set up streaming on port 1234\n");
+        curly_stream_cleanup();
+        curly_cleanup();
+        return 1;
+    }
+
+
+    last = now = 0;
+    head = tail = curr = NULL;
+    live = in_game = 1;
+
+
+
+    while (in_game) {
+        now = time(NULL);
+        if (live && now-last>=refresh_period) {
+            DPRINT("Refreshing M3U8 after %d real seconds\n",(int)(last?now-last:0));
+            if (0 == curly_refresh_m3u8(url, PLS)) {
+                last = now = time(NULL);
+                purge_prepare();
+                if (0 == reload_m3u8(PLS, base)) {
+                    purge_run();
+                    print_list();
+
+                    /* head is NULL when the playlist held no entries */
+                    if (head && head->is_streamlist){
+                        strcpy (url, choose_url());
+                        reset_base(base, url);
+                        curr = NULL; //so purge will not spare it
+                        purge_run();
+                        last = 0;
+                        continue;
+                    }
+                }
+            } else {
+                DPRINT ("Could not load M3U8 from %s!\n",url);
+                sleep (1);
+                /* nothing queued yet: retry instead of bailing out below */
+                if (!curr)
+                    continue;
+            }
+        }
+
+        emulate_stream();
+
+
+    }
+
+    curly_stream_cleanup();
+    curly_cleanup();
+    return 0;
+}
diff --git a/log.h b/log.h
new file mode 100644
index 0000000..a994bd1
--- /dev/null
+++ b/log.h
@@ -0,0 +1,19 @@
+/*
+ * log.h
+ *
+ * Created on: Jul 5, 2012
+ * Author: volpol
+ */
+
+#ifndef LOG_H_
+#define LOG_H_
+
+#ifdef DEBUG
+    #define DPRINT(fmt, args...) do { fprintf (stderr, "%s,%d : "fmt, __FUNCTION__, __LINE__, ##args); } while (0)
+#else
+    #define DPRINT(...)
+#endif
+
+#define WHOAMI DPRINT("\n");
+
+#endif /* LOG_H_ */
diff --git a/m3u8.c b/m3u8.c
new file mode 100644
index 0000000..5321969
--- /dev/null
+++ b/m3u8.c
@@ -0,0 +1,165 @@
+/*
+ * m3u8.c
+ *
+ * Minimal line-oriented parser for M3U8 (HTTP Live Streaming) playlists.
+ *
+ * Created on: Jul 4, 2012
+ * Author: volpol
+ */
+#include <ctype.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "m3u8.h"
+#include "log.h"
+
+#define M_EXT "#EXT"
+
+/*
+ * Tags recognised after the common "#EXT" prefix. Lookup is a prefix match in
+ * table order; the entry with a NULL tagname terminates the table.
+ */
+static const struct tagmatch {
+    const char *tagname;
+    size_t taglen;
+    MTAG tagval;
+} tm[] = {
+    { .tagname = "M3U",               .taglen = 3,  .tagval = M_M3U },
+    { .tagname = "-X-STREAM-INF",     .taglen = 13, .tagval = M_X_STREAM_INF },
+    { .tagname = "-X-TARGETDURATION", .taglen = 17, .tagval = M_X_TARGETDURATION },
+    { .tagname = "INF",               .taglen = 3,  .tagval = M_EXTINF },
+    { .tagname = "-X-ENDLIST",        .taglen = 10, .tagval = M_X_ENDLIST },
+    { .tagname = NULL,                .taglen = 0,  .tagval = M_UNSUPPORTED },
+};
+
+/*
+ * Classify one playlist line whose line terminator has been stripped.
+ *
+ * buf: NUL-terminated line, may be NULL.
+ * val: must not be NULL; receives buf, or for a recognised tag a pointer just
+ *      past the tag name (e.g. at ":10," for "#EXTINF:10,").
+ *
+ * Returns the matched tag, M_URL for a line that does not start with '#', and
+ * M_UNSUPPORTED for a NULL or empty line, a line starting with whitespace, a
+ * comment, or an "#EXT" tag missing from the table.
+ */
+MTAG m3u8_parse (const char *buf, const char **val){
+    const struct tagmatch *t;
+
+    WHOAMI;
+
+    *val = buf;
+
+    /* <ctype.h> functions need a value representable as unsigned char */
+    if (!buf || !*buf || isspace((unsigned char) *buf))
+        return M_UNSUPPORTED;
+
+    DPRINT ("Parsing...-%s-\n",buf);
+
+    /* any other '#' line is a comment, not a URL */
+    if (strncmp(buf, M_EXT, strlen(M_EXT)) != 0)
+        return ('#' == *buf) ? M_UNSUPPORTED : M_URL;
+
+    for (t = tm; t->tagname; t++){
+        if (strncmp(buf + strlen(M_EXT), t->tagname, t->taglen) == 0){
+            *val = buf + strlen(M_EXT) + t->taglen;
+            DPRINT ("Tag: %d, Set *val to %s\n", t->tagval, *val);
+            return t->tagval;
+        }
+    }
+
+    return M_UNSUPPORTED;
+}
+
+/*
+ * Parse the unsigned decimal number following the first sep in s.
+ * Returns the number and stores 0 in *r on success; returns 0 and leaves *r
+ * untouched when sep is absent or no digits follow it.
+ */
+static unsigned long get_long(const char *s, char sep, int *r){
+    const char *start;
+    char *end;
+    unsigned long l = 0;
+
+    WHOAMI;
+
+    start = strchr(s, sep);
+    if (start) {
+        start++;
+        l = strtoul(start, &end, 10);
+        if (end != start)
+            *r = 0;
+    }
+
+    return l;
+}
+
+/*
+ * Find attribute name (spelled with its trailing '=') in attribute list s.
+ * Only a match at the start of an attribute counts, so that "BANDWIDTH=" is
+ * not found inside "AVERAGE-BANDWIDTH=". Returns NULL when absent.
+ */
+static const char *find_attr(const char *s, const char *name){
+    const char *n = s;
+
+    while ((n = strstr(n, name)) != NULL) {
+        if (n == s || ':' == n[-1] || ',' == n[-1] || ' ' == n[-1])
+            return n;
+        n++;
+    }
+
+    return NULL;
+}
+
+/*
+ * Extract property p from tag value s, as delivered by m3u8_parse().
+ *
+ * v points to an unsigned long for P_PROGRAM_ID, P_BANDWIDTH, P_DURATION and
+ * P_SEQUENCE, and to a const char * (set to point into s) for P_TITLE.
+ *
+ * Returns 0 on success and -1 otherwise. A numeric *v may have been set to 0
+ * even when -1 is returned.
+ */
+int m3u8_get_property(PROP p, const char *s, void *v){
+    int err = -1;
+    const char *n;
+
+    WHOAMI;
+
+    if (!s || !v)
+        return err;
+
+    switch (p) {
+    case P_PROGRAM_ID:
+        n = find_attr(s, "PROGRAM-ID=");
+        if (n)
+            *((unsigned long*) v) = get_long(n, '=', &err);
+        break;
+
+    case P_BANDWIDTH:
+        n = find_attr(s, "BANDWIDTH=");
+        if (n)
+            *((unsigned long*) v) = get_long(n, '=', &err);
+        break;
+
+    case P_DURATION:
+    case P_SEQUENCE:
+        *((unsigned long*) v) = get_long(s, ':', &err);
+        break;
+
+    case P_TITLE:
+        n = strchr(s, ',');
+        if (n) {
+            /* store through v; assigning to v itself never reached the caller */
+            *((const char **) v) = n + 1;
+            err = 0;
+        }
+        break;
+
+    default:
+        break;
+    }
+
+    return err;
+}
diff --git a/m3u8.h b/m3u8.h
new file mode 100644
index 0000000..37c6c8f
--- /dev/null
+++ b/m3u8.h
@@ -0,0 +1,45 @@
+/*
+ * m3u8.h
+ *
+ * Created on: Jul 4, 2012
+ * Author: volpol
+ */
+
+#ifndef M3U8_H_
+#define M3U8_H_
+
+/* Line classification returned by m3u8_parse(). */
+typedef enum mtag {
+    M_M3U,
+    M_URL,
+    M_X_STREAM_INF,
+    M_X_TARGETDURATION,
+    M_X_VERSION,
+    M_EXTINF,
+    M_X_MEDIASEQUENCE,
+    M_X_BYTERANGE,
+    M_X_ENDLIST,
+    M_UNSUPPORTED,
+} MTAG;
+
+/* Property selector for m3u8_get_property(). */
+typedef enum prop {
+    P_NONE,
+    P_PROGRAM_ID,
+    P_BANDWIDTH,
+    P_VERSION,
+    P_DURATION,
+    P_SEQUENCE,
+    P_TITLE,
+} PROP;
+
+/* Classify one playlist line; *val is set to the tag's value (or to buf). */
+MTAG m3u8_parse (const char *buf, const char **val);
+
+/*
+ * Read property p from tag value s into *v (unsigned long *, or
+ * const char ** for P_TITLE). Returns 0 on success, -1 otherwise.
+ */
+int m3u8_get_property(PROP p, const char *s, void *v);
+
+#endif /* M3U8_H_ */
-- 
2.30.2