From: volpol Date: Fri, 7 Jun 2013 16:25:01 +0000 (+0000) Subject: Initial commitment X-Git-Url: https://git.packet-gain.de/?a=commitdiff_plain;h=4b60c661da172f29b828e852f8a863bc2d9c339e;p=hls.git Initial commitment --- 4b60c661da172f29b828e852f8a863bc2d9c339e diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..df3eb27 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,9 @@ +cmake_minimum_required(VERSION 2.6) +project(HLS) +include_directories(${HLS_SOURCE_DIR}) +add_definitions(-DUSE_TCP) +add_executable(hls curly.c itemq.c m3u8.c) +include(FindPkgConfig) +pkg_check_modules(CURL REQUIRED libcurl) +include_directories(${CURL_INCLUDE_DIR}) +target_link_libraries(hls ${CURL_LIBRARIES} ) diff --git a/curly.c b/curly.c new file mode 100644 index 0000000..8102010 --- /dev/null +++ b/curly.c @@ -0,0 +1,317 @@ +#include +#include +#include +#include +#include + +#include +#include "log.h" + +#ifdef USE_TCP + +#include +#include +#include +#include + +static int sock; +static int conn; +static int flag_conn; + +#endif + +static unsigned bufsize; +static unsigned delay; + +#define BSIZE 4096 + + +#ifdef USE_TCP + +static void simple_http(int sock) { + const char *RSP = + "HTTP/1.0 200 OK\r\nContent-Type: video/mp2t\r\nConnection: close\r\n\r\n\r\n"; + char drain[1024] = ""; + int err; + + WHOAMI; + //TODO POLL + do { + err = read(sock, drain, sizeof drain); + if (err <= 0) + return; + drain[err] = 0; + DPRINT("%s\n", drain); + } while (!strstr(drain, "\r\n\r\n")); + + send(sock, RSP, strlen(RSP), MSG_NOSIGNAL); + + DPRINT ("simple http response sent\n"); +} + +size_t fetch_send(void *ptr, size_t size, size_t nmemb, void *stream) { + size_t bytesize = size * nmemb; + + int sent; + + WHOAMI; + + if (conn < 0) + conn = accept(sock, NULL, NULL); + + if (conn >= 0 && !flag_conn) { + DPRINT("Connection accepted\n"); + flag_conn = 1; + simple_http(conn); + } + + if (flag_conn) { + sent = send(conn, ptr, bytesize, MSG_NOSIGNAL); + if (sent <= 0) { + DPRINT("Pious Teardown!\n"); + shutdown(conn, SHUT_RDWR); + close(conn); + conn = -1; + flag_conn = 0; + } + } + + if (!flag_conn) + usleep(delay); + //either limit speed further or burst at max speed filling clien't buffer + //send will block when it's full thus slowing download down +/* + else + if (sent==bufsize) usleep(delay/2); //burst to fill client's buffer +*/ + return bytesize; + +} + + +static int prepare_tcp_dream(const char *ip, unsigned short port) { + int err = -1; + struct sockaddr_in lob; + WHOAMI; + + sock = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP); + if (sock < 0) + goto ERR_SOCK; + + lob.sin_family = AF_INET; + lob.sin_port = htons(port); + lob.sin_addr.s_addr = inet_addr(ip); + + err = bind(sock, (const struct sockaddr*) &lob, sizeof lob); + + if (err < 0) + goto ERR_BIND; + + err = listen(sock, 5); + + if (err < 0) + goto ERR_LSTN; + + flag_conn = 0; + conn = -1; + err = 0; + + goto ERR_SUCC; + + ERR_LSTN: + ERR_BIND: + close(sock); + + ERR_SOCK: + ERR_SUCC: + return err; +} + +static void cleanup_tcp_dream(void){ + WHOAMI; + if (conn>=0){ + shutdown(conn, SHUT_RDWR); + close(conn); + conn = -1; + flag_conn = 0; + } + close(sock); +} + +#else + +static void cleanup_tcp_dream(void){} + +static void prepare_tcp_dream(void){} + +size_t fetch_send(void *ptr, size_t size, size_t nmemb, void *stream) { + return (size * nmemb); +} + +#endif + +int curly_init(void) { + CURLcode cc; + WHOAMI; + + cc = curl_global_init(CURL_GLOBAL_NOTHING); + if (cc != CURLE_OK) + return -1; + return 0; +} + +void curly_cleanup(void) { + WHOAMI; + curl_global_cleanup(); +} + +static CURL *sch; + +int curly_stream_init(const char *ip, unsigned short port) { + int err; + + WHOAMI; + + sch = curl_easy_init(); + bufsize = BSIZE; + +//TODO error handling + + err = prepare_tcp_dream(ip, port); + + return (sch != NULL && 0 == err); + +} + +void curly_stream_cleanup(void) { + WHOAMI; + + if (sch) + curl_easy_cleanup(sch); + + cleanup_tcp_dream(); + sch = NULL; +} + + +int curly_stream(const char *from_url, unsigned long duration) { + + CURLcode cc = CURLE_COULDNT_CONNECT; + int err = -1; + double abps, size; + + WHOAMI; + +#ifdef USE_TCP + //when we don't have a client don't actually stream anything + //but try to take just as long + time_t st = time(NULL); + + while (conn < 0 && time(NULL)-st +#include +#include +#include +#include +#include +#include +#include "curly.h" +#include "m3u8.h" +#include "log.h" + +#define PLS "/tmp/iq.m3u8" +#define QVC_MASTER_M3U8 "http://live1.qvc.jp/iPhone/QVC_PC.m3u8" +#define QVC_1500_M3U8 "http://live1.qvc.jp/iPhone/1501.m3u8" +#define QVC_800_M3U8 "http://live1.qvc.jp/iPhone/800.m3u8" + +struct item; + +static int in_game; +static struct item *head, *tail, *curr; +static int live; + +struct item { + char url[1024]; + unsigned int hash; + //will be set to 1 after streamed + int done; + // will be set to 1 when it can be purged from the list + // this is true when done is 1 and after the last refresh round + // this hash was no longer present in the m3u8 + int purge; + struct item *next; + unsigned long duration; + int is_streamlist; + int bandwidth; +}; + +static void *zalloc(size_t size) { + void *ptr; + + WHOAMI; + + ptr = malloc(size); + if (ptr) + memset(ptr, 0, size); + + return ptr; + +} + + +static void print_list(void) { + +#ifdef DEBUG + struct item *n = head; + int i = 0; + + WHOAMI; + + while (n != NULL) { + DPRINT("[%d] %s:%lu:%X:%d:%d\n", ++i, n->url, n->duration, n->hash, n->done, + n->purge); + n = n->next; + } +#endif + +} + +struct item *find_hash(int hash) { + + struct item *n = head; + + WHOAMI; + + while (n != NULL) { + if (n->hash == hash) + break; + n = n->next; + } + return n; +} + +static int compute_hash(const char *s) { + int hash = 0; + unsigned char bcc = 0; + + WHOAMI; + + while (*s) { + bcc ^= *s; + hash ^= bcc; + hash <<= 2; + s++; + } + return hash; +} + +static void purge_run(void) { + struct item *n; + + WHOAMI; + +//purge here + while (head && head->purge) { + DPRINT("Purging %s:%X:%d:%d\n", head->url, head->hash, head->done, + head->purge); + n = head; + head = head->next; + free(n); + } + +} + +static void purge_prepare(void) { + struct item *n; + + WHOAMI; + + //mark done entries as ok to purge + //don't mark curr as ok to purge because we might + //still need it to advance to its next in emulate_stream() + for (n = head; n && n->done && n!=curr; n = n->next){ + n->purge = 1; + } + +} + +void emulate_stream(void) { + + int err; + + WHOAMI; + + if (!curr) { + //this should never happen, no reasonable way to recover + fprintf(stderr,"curr is NULL, bailing out\n"); + in_game = 0; + return; + } + + if (curr->done){ + if (curr->next){ + curr = curr->next; + } else { + //we end up here if we are faster than the server can produce + if (!live) { + in_game = 0; + } else { + DPRINT("sleeping\n"); + sleep(1); + } + return; + } + } + + do { + printf("Streaming %s (%lu)\n", curr->url, curr->duration); +#if 1 + err = curly_stream(curr->url, curr->duration); +#else + err = -1; curly_fake_stream(); +#endif + + if (err){ + DPRINT("Streaming failed\n"); + } + + } while (0); + + curr->done = 1; + +} + +int reload_m3u8(const char *from_file, const char *urlbase) { + + FILE *f; + char buf[1024]; + struct item *n; + struct item *p; + unsigned int thash; + unsigned long lval; + + + int plist,pid,bw; + + const char *v; + MTAG m; + int urloff; + + WHOAMI; + + + plist = pid = bw = 0; + + DPRINT("Reloading M3U8\n"); + f = fopen(PLS, "r"); + if (!f) + return -1; + + while (fgets(buf, sizeof buf, f)) { + + if (buf[strlen(buf) - 1] == '\n') + buf[strlen(buf) - 1] = 0; + if (buf[strlen(buf) - 1] == '\r') + buf[strlen(buf) - 1] = 0; + + m = m3u8_parse(buf, &v); + + if (M_EXTINF == m){ + m3u8_get_property(P_DURATION, v, &lval); + } + if (M_X_ENDLIST == m){ + printf("This is not a live stream!\n"); + live = 0; + } + if (M_X_STREAM_INF == m){ + //playlist entry detected; + plist = 1; + m3u8_get_property(P_BANDWIDTH, v, &lval); + bw = lval; + m3u8_get_property(P_PROGRAM_ID, v, &lval); + pid = lval; + printf ("PID: %d BW:%d\n",pid, bw); + } + + if (M_URL != m) continue; + + /* + position to add will be determined by the hash of new entry vs hashes already in the list + that is...all new hashes will be added at tail unless this entry's hash is already in the list + the entry is not then not added but the next new one will be added after this one + this will in degenerate into adding at tail if m3u8 refresh interval is too long (old entries purged) + */ + + thash = compute_hash(buf); + + if ((p = find_hash(thash))) { + //element is still in the newest m3u8 + //hold onto it some longer + p->purge = 0; + continue; + } + + n = zalloc(sizeof(struct item)); + n->hash = thash; + n->is_streamlist = plist; + n->bandwidth = bw; + + urloff = 0; + + if (strncmp(buf,"http://",7)){ + strcpy(n->url,urlbase); + urloff = strlen(urlbase); + } + + strcpy(n->url+urloff, buf); + n->duration = lval; + + if (!head) { + //XXX curr is always first, which is not what we want in certain scenarios + head = tail = curr = n; + } else { + if (!p || p == tail) { //hash not found + tail->next = n; + tail = n; + } else { + n->next = p->next; + p->next = n; + } + } + } + fclose(f); + return 0; +} + +const char *choose_url(void){ + struct item *n; + const char *u; + int bw = 0; + + WHOAMI; + + n = head; + u = head->url; + while (n){ + //choose first matching url, >= to choose last, but could lead to problems, allow user selection! + if (n->bandwidth>bw) {u = n->url; bw=n->bandwidth; } + n->done = 1; + n->purge = 1; + n=n->next; + } + + printf ("Final BW: %d | URL: %s\n",bw, u); + return u; +} + +static void reset_base(char *base, char *url){ + + WHOAMI; + + strcpy(base,url); + //TODO risky dice + *(strrchr(base, '/')+1) = 0; +} + +int main(int argc, char *argv[]) { + + time_t last, now; + + int refreshed; + int refresh_period = 20; + char url[1024]; + char base[1024]; + + //url = QVC_MASTER_M3U8; + + if (argc>1) + strcpy(url, argv[1]); + else + strcpy(url, QVC_MASTER_M3U8); + + printf ("Master URL: %s\n",url); + + reset_base(base, url); + + curly_init(); + curly_stream_init("0.0.0.0",1234); + + + last = now = 0; + head = tail = curr = NULL; + live = in_game = 1; + + + + while (in_game) { + now = time(NULL); + if (live && now-last>=refresh_period) { + DPRINT("Refreshing M3U8 after %d real seconds\n",(int)(last?now-last:0)); + refreshed = (0 == curly_refresh_m3u8(url, PLS)); + if (refreshed) { + last = now = time(NULL); + purge_prepare(); + if (0 == reload_m3u8(PLS, base)) { + refreshed = 0; + purge_run(); + print_list(); + + if (head->is_streamlist){ + strcpy (url, choose_url()); + reset_base(base, url); + curr = NULL; //so purge will not spare it + purge_run(); + last = 0; + continue; + } + } + } else { + DPRINT ("Could not load M3U8 from %s!\n",url); + sleep (1); + } + } + + emulate_stream(); + + + } + + curly_stream_cleanup(); + curly_cleanup(); + return 0; +} diff --git a/log.h b/log.h new file mode 100644 index 0000000..a994bd1 --- /dev/null +++ b/log.h @@ -0,0 +1,19 @@ +/* + * log.h + * + * Created on: Jul 5, 2012 + * Author: volpol + */ + +#ifndef LOG_H_ +#define LOG_H_ + +#ifdef DEBUG + #define DPRINT(fmt, args...) do { fprintf (stderr, "%s,%d : "fmt, __FUNCTION__, __LINE__, ##args); } while (0) +#else + #define DPRINT(...) +#endif + +#define WHOAMI DPRINT("\n"); + +#endif /* LOG_H_ */ diff --git a/m3u8.c b/m3u8.c new file mode 100644 index 0000000..5321969 --- /dev/null +++ b/m3u8.c @@ -0,0 +1,150 @@ +/* + * m3u8.c + * + * Created on: Jul 4, 2012 + * Author: volpol + */ +#include +#include +#include +#include + +#include + +#include "m3u8.h" +#include "log.h" + +#define M_EXT "#EXT" + +static struct tagmatch { + const char *tagname; + int taglen; + MTAG tagval; +} tm[] = { + { + .tagname = "M3U", + .taglen = 3, + .tagval = M_M3U, + }, + { + .tagname = "-X-STREAM-INF", + .taglen = 13, + .tagval = M_X_STREAM_INF, + }, + { + .tagname = "-X-TARGETDURATION", + .taglen = 17, + .tagval = M_X_TARGETDURATION, + }, + { + .tagname = "INF", + .taglen = 3, + .tagval = M_EXTINF, + }, + { + .tagname = "-X-ENDLIST", + .taglen = 10, + .tagval = M_X_ENDLIST, + }, + { + .tagname = NULL, + .taglen = 0, + .tagval = M_UNSUPPORTED, + } +}; + +MTAG m3u8_parse (const char *buf, const char **val){ + MTAG tag = M_URL; + *val = buf; + + int i = 999; + + WHOAMI; + + if (!buf || !*buf || isspace(*buf)) return M_UNSUPPORTED; + + + DPRINT ("Parsing...-%s-\n",buf); + + if (strncmp(buf,M_EXT,strlen(M_EXT)) == 0){ + tag = M_UNSUPPORTED; + for (i = 0; tm[i].tagname; i++){ + if (strncmp(buf+strlen(M_EXT),tm[i].tagname, tm[i].taglen) == 0){ + tag = tm[i].tagval; + break; + } + } + + } + + if (M_UNSUPPORTED != tag && M_URL!=tag) + *val = buf + +strlen(M_EXT) + tm[i].taglen; + + DPRINT ("Tag: %d, Set *val to %s\n",tag, *val); + + return tag; +} + +static unsigned long get_long(const char *s, char sep, int *r){ + char *n; + n = strchr(s, sep); + unsigned long l = 0; + + WHOAMI; + + if (n) { + s = n+1; + l = strtoul(s, &n, 10); + if (s!=n) *r = 0; + } + + return l; +} + +int m3u8_get_property(PROP p, const char *s, void *v){ + int err = -1; + char *n; + + WHOAMI; + + if (s) { + switch (p) { + + case P_PROGRAM_ID: + + n = strstr(s,"PROGRAM-ID="); + if (n){ + *((unsigned long*) v) = get_long(n, '=', &err); + } + break; + + + case P_BANDWIDTH: + n = strstr(s,"BANDWIDTH="); + + if (n){ + *((unsigned long*) v) = get_long(n, '=', &err); + } + break; + + + case P_DURATION: + case P_SEQUENCE: + + *((unsigned long*) v) = get_long(s, ':', &err); + + break; + + case P_TITLE: + n = strchr(s, ','); + if (n) { + v = n + 1; + err = 0; + } + break; + default: + break; + } + } + return err; +} diff --git a/m3u8.h b/m3u8.h new file mode 100644 index 0000000..37c6c8f --- /dev/null +++ b/m3u8.h @@ -0,0 +1,38 @@ +/* + * m3u8.h + * + * Created on: Jul 4, 2012 + * Author: volpol + */ + +#ifndef M3U8_H_ +#define M3U8_H_ + +typedef enum _MTAG { + M_M3U, + M_URL, + M_X_STREAM_INF, + M_X_TARGETDURATION, + M_X_VERSION, + M_EXTINF, + M_X_MEDIASEQUENCE, + M_X_BYTERANGE, + M_X_ENDLIST, + M_UNSUPPORTED, +} MTAG; + +typedef enum _PROP { + P_NONE, + P_PROGRAM_ID, + P_BANDWIDTH, + P_VERSION, + P_DURATION, + P_SEQUENCE, + P_TITLE, +} PROP; + +MTAG m3u8_parse (const char *buf, const char **val); +int m3u8_get_property(PROP p, const char *s, void *v); + + +#endif /* M3U8_H_ */