diff options
Diffstat (limited to 'dict')
-rw-r--r-- | dict/pdict-impl.h | 38 | ||||
-rw-r--r-- | dict/pdict.c | 500 | ||||
-rw-r--r-- | dict/pdict.h | 70 | ||||
-rw-r--r-- | dict/pdictclient.c | 1490 | ||||
-rw-r--r-- | dict/pdictclient.h | 128 |
5 files changed, 2226 insertions, 0 deletions
diff --git a/dict/pdict-impl.h b/dict/pdict-impl.h new file mode 100644 index 0000000..2d1d92b --- /dev/null +++ b/dict/pdict-impl.h @@ -0,0 +1,38 @@ +#ifndef _PDICT_IMPL_H_ +#define _PDICT_IMPL_H_ + +#include "pdict.h" + +struct pdict_listener; + +/* + * Represents a dictionary entry. + * key and value strings, with a plist of listeners + */ +struct pdict_ent { + const char *pde_key; + const char *pde_val; + plist_node_t *pde_listeners; +}; + +typedef struct pdict_listener { + pdl_notify_func_t pdl_notify; + void *pdl_arg; +} pdict_listener_t; + +typedef struct pdict_persistent_listener { + pdict_listener_t pdpl_l; + regex_t pdpl_regex; + int pdpl_new; +} pdict_persistent_listener_t; + +/* + * A Phidget Dictionary + * contains a ptree of entries and a plist of persistent listeners + */ +struct pdict { + ptree_node_t *pd_ents; + plist_node_t *pd_persistent_listeners; +}; + +#endif diff --git a/dict/pdict.c b/dict/pdict.c new file mode 100644 index 0000000..04ac84c --- /dev/null +++ b/dict/pdict.c @@ -0,0 +1,500 @@ +#include "../stdafx.h" +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <assert.h> +#include <regex.h> +#if defined(_WINDOWS) && !defined(_CYGWIN) +#include "wincompat.h" +#endif +#include "utils.h" +#include "plist.h" +#include "ptree.h" +#include "pdict-impl.h" + +typedef struct pdict_ent pdict_ent_t; +typedef int (*pdict_walk_int_func_t)(pdict_ent_t *pde, void *arg); + +static int _pdict_ent_add_persistent_change_listeners(pdict_t *pd, + pdict_ent_t *pde); +static int _pdict_ent_add_persistent_change_listener(pdict_ent_t *pde, + pdict_persistent_listener_t *pl); +static int _pdict_ent_remove_persistent_change_listener(pdict_ent_t *pde, + pdict_persistent_listener_t *pl); +static void _pdict_ent_notify(pdict_ent_t *pde, int reason, const char *ov); +static int _pdict_ent_add_change_listener(pdict_ent_t *pde, + pdl_notify_func_t notify, void *arg); +static int _pdict_walk_int(pdict_t *pd, pdict_walk_int_func_t w, void *arg); +static int _pdict_ent_remove_change_listener(pdict_ent_t *pde, + pdl_notify_func_t notify, void *a); + +pdict_t * +pdict_alloc(void) +{ + pdict_t *pd; + + if (!(pd = malloc(sizeof (*pd)))) + return 0; + memset(pd, 0, sizeof (*pd)); + + return pd; +} + +static int +pdecmp(const void *sv, const void *dv) +{ + return strcmp(*(char **)sv, *(char **)dv); +} + +static int +pdict_ent_remove_change_listeners_cb(const void *v, const void *v0, void *a) +{ + free((void *)v); v = NULL; + return (1); +} + +static int +pdict_ent_remove_change_listeners(pdict_ent_t *pde) +{ + plist_walk(pde->pde_listeners, pdict_ent_remove_change_listeners_cb, 0); + plist_clear(&pde->pde_listeners); + return 1; +} + +static int +pdict_ent_listeners_copy_cb(const void *v, const void *v0, void *a) +{ + pdict_listener_t *pdl = (pdict_listener_t *)v; + pdict_ent_t *pde_ent_copy = a; + + return _pdict_ent_add_change_listener(pde_ent_copy, pdl->pdl_notify, pdl->pdl_arg); +} + +static void +_pdict_ent_listeners_copy(pdict_ent_t *pde, pdict_ent_t *pde_copy) +{ + plist_walk(pde->pde_listeners, pdict_ent_listeners_copy_cb, pde_copy); +} + +/* + * Sets or resets an entry to the dictionary, returning 0 on failure. + */ +int +pdict_add(pdict_t *pd, const char *k, const char *v, const char **ovp) +{ + pdict_ent_t *n; + pdict_ent_t n_copy; + const char *ov; + + if (!(k = strdup(k))) + return 0; + if (!(v = strdup(v))) { + free((void *)k); k = NULL; + return 0; + } + memset(&n_copy, 0, sizeof(pdict_ent_t)); + if (ptree_contains((void *)&k, pd->pd_ents, pdecmp, (void *)&n)) { + free((void *)k); k = NULL; + ov = n->pde_val; + n->pde_val = v; + + if (ovp) + *ovp = ov; + else { + free((void *)ov); ov = NULL; + } + + //We copy n so that it's safe if it gets removed during a callback + if(n->pde_listeners) + { + n_copy.pde_key = strdup(n->pde_key); + n_copy.pde_val = strdup(n->pde_val); + _pdict_ent_listeners_copy(n, &n_copy); + _pdict_ent_notify(&n_copy, PDR_VALUE_CHANGED, ov); + pdict_ent_remove_change_listeners(&n_copy); + free((char *)n_copy.pde_key); + free((char *)n_copy.pde_val); + } + return 1; + } + if (!(n = malloc(sizeof (*n)))) { + free((void *)k); k = NULL; + free((void *)v); v = NULL; + return (0); + } + memset(n, 0, sizeof (*n)); + n->pde_key = k; + n->pde_val = v; + + //Add dict listeners to the dict entry object + if (!_pdict_ent_add_persistent_change_listeners(pd, n)) { + free((void *)k); k = NULL; + free((void *)v); v = NULL; + free(n); n = NULL; + return (0); + } + //Add the dict entry to the dict (replacing if there is a matching key). + if (!ptree_replace(n, &pd->pd_ents, pdecmp, NULL)) { + pdict_ent_remove_change_listeners(n); + free((void *)k); k = NULL; + free((void *)v); v = NULL; + free(n); n = NULL; + return (0); + } + //notify the listeners + if(n->pde_listeners) + { + n_copy.pde_key = strdup(n->pde_key); + n_copy.pde_val = strdup(n->pde_val); + _pdict_ent_listeners_copy(n, &n_copy); + _pdict_ent_notify(&n_copy, PDR_ENTRY_ADDED, n_copy.pde_val); + pdict_ent_remove_change_listeners(&n_copy); + free((char *)n_copy.pde_key); + free((char *)n_copy.pde_val); + } + + if (ovp) + *ovp = NULL; + + return 1; +} + +int +pdict_ent_remove(pdict_t *pd, const char *k, char **ovp) +{ + pdict_ent_t *n; + + pu_log(PUL_VERB, 0, "Removing in key pdict_ent_remove: %s", k); + + if (!ptree_remove((void *)&k, &pd->pd_ents, pdecmp, (void *)&n)) + { + //pu_log(PUL_INFO, 0, "Failed to remove key in pdict_ent_remove: %s", k); + return 0; + } + + _pdict_ent_notify(n, PDR_ENTRY_REMOVING, n->pde_val); + + if (ovp) + *ovp = (char *)n->pde_val; + else { + free((void *)n->pde_val); + } + free((void *)n->pde_key); + pdict_ent_remove_change_listeners(n); + free(n); + return 1; +} + +static int +pdict_ent_remove_persistent_change_listener_cb(const void *pl, const void *v, + void *a) +{ + _pdict_ent_remove_persistent_change_listener((pdict_ent_t *)a, + (pdict_persistent_listener_t *)pl); + return 1; +} + +static int +pdict_ent_add_persistent_change_listener_cb(const void *lid, const void *pl, + void *a) +{ + return _pdict_ent_add_persistent_change_listener((pdict_ent_t *)a, + (pdict_persistent_listener_t *)pl); +} + +static int +_pdict_ent_add_persistent_change_listeners(pdict_t *pd, pdict_ent_t *pde) +{ + if (!plist_walk(pd->pd_persistent_listeners, pdict_ent_add_persistent_change_listener_cb, pde)) { + plist_walk(pd->pd_persistent_listeners, pdict_ent_remove_persistent_change_listener_cb, pde); + pu_log(PUL_WARN, 0, "Failed to add persistent change listener in _pdict_ent_add_persistent_change_listeners."); + return 0; + } + return 1; +} + +static int +pdict_ent_remove_persistent_change_listener_dcb(pdict_ent_t *pde, void *pl) +{ + return _pdict_ent_remove_persistent_change_listener(pde, + (pdict_persistent_listener_t *)pl); +} + +static int +pdict_ent_add_persistent_change_listener_dcb(pdict_ent_t *pde, void *pl) +{ + return _pdict_ent_add_persistent_change_listener(pde, + (pdict_persistent_listener_t *)pl); +} + +static int +_pdict_ent_add_persistent_change_listener(pdict_ent_t *pde, + pdict_persistent_listener_t *pdpl) +{ + int res; + + if ((res = regexec(&pdpl->pdpl_regex, pde->pde_key, 0, NULL, 0)) != 0) + { + return res == REG_NOMATCH; + } + if (!_pdict_ent_add_change_listener(pde, pdpl->pdpl_l.pdl_notify, pdpl->pdpl_l.pdl_arg)) + { + pu_log(PUL_WARN, 0, "Failed to add persistent change listener in _pdict_ent_add_persistent_change_listener."); + return 0; + } + if (pdpl->pdpl_new) + pdpl->pdpl_l.pdl_notify(pde->pde_key, pde->pde_val, PDR_CURRENT_VALUE, NULL, pdpl->pdpl_l.pdl_arg); + return 1; +} + +static int +_pdict_ent_remove_persistent_change_listener(pdict_ent_t *pde, + pdict_persistent_listener_t *pl) +{ + _pdict_ent_remove_change_listener(pde, pl->pdpl_l.pdl_notify, + pl->pdpl_l.pdl_arg); + return 1; +} + +int +pdict_remove_persistent_change_listener(pdict_t *pd, int id) +{ + pdict_persistent_listener_t *pdpl; + + if (!plist_remove((void *)(size_t)id, &pd->pd_persistent_listeners, (void **)&pdpl) || !pdpl) + { + pu_log(PUL_WARN, 0, "Failed plist_remove in pdict_remove_persistent_change_listener."); + return 0; + } + if (!_pdict_walk_int(pd, pdict_ent_remove_persistent_change_listener_dcb, pdpl)) + { + pu_log(PUL_WARN, 0, "Failed _pdict_walk_int in pdict_remove_persistent_change_listener."); + return 0; + } + regfree(&pdpl->pdpl_regex); + free(pdpl); pdpl = NULL; + return 1; +} + +int +pdict_add_persistent_change_listener(pdict_t *pd, const char *kpat, + pdl_notify_func_t notify, void *arg) +{ + pdict_persistent_listener_t *pl; + static int lid = 1; + + if (!(pl = malloc(sizeof (*pl)))) + return 0; + memset(pl, 0, sizeof (*pl)); + pl->pdpl_l.pdl_notify = notify; + pl->pdpl_l.pdl_arg = arg; + if (regcomp(&pl->pdpl_regex, kpat, REG_EXTENDED | REG_NOSUB) != 0) { + // XXX todo: communicate error context is not libc + free(pl); pl = NULL; + pu_log(PUL_WARN, 0, "Failed regcomp in pdict_add_persistent_change_listener."); + return 0; + } + + plist_add((void *)(size_t)lid, pl, &pd->pd_persistent_listeners); + + pl->pdpl_new = 1; + if (!_pdict_walk_int(pd, + pdict_ent_add_persistent_change_listener_dcb, pl)) { + _pdict_walk_int(pd, + pdict_ent_remove_persistent_change_listener_dcb, pl); + plist_remove((void *)(size_t)lid, &pd->pd_persistent_listeners, NULL); + regfree(&pl->pdpl_regex); + free(pl); pl = NULL; + pu_log(PUL_WARN, 0, "Failed _pdict_walk_int in pdict_add_persistent_change_listener."); + return 0; + } + pl->pdpl_new = 0; + return lid++; +} + +/* + * Return whether a given key is in the dictionary. If the given + * entry pointer is non-NULL, set it to the entry. + */ +static int +_pdict_ent_lookup(pdict_t *pd, const char *k, pdict_ent_t **e) +{ + return ptree_contains((void *)&k, pd->pd_ents, pdecmp, (void **)e); +} + +int +pdict_ent_lookup(pdict_t *pd, const char *k, const char **v) +{ + pdict_ent_t *pde; + + if (_pdict_ent_lookup(pd, k, &pde)) { + if (v) + *v = strdup(pde->pde_val); + return 1; + } + return 0; +} + +static int +pdict_ent_remove_change_listener_cb(const void *k, const void *v, void *a) +{ + pdict_listener_t *l = (pdict_listener_t *)k; + void **arg = (void **)a; + + if (l->pdl_notify == (pdl_notify_func_t)arg[0] && l->pdl_arg == arg[1]) { + arg[2] = l; + return 0; + } + return 1; +} + +static int +_pdict_ent_remove_change_listener(pdict_ent_t *pde, pdl_notify_func_t notify, + void *a) +{ + void *arg[3]; + + arg[0] = (void *)notify; + arg[1] = a; + arg[2] = NULL; + plist_walk(pde->pde_listeners, pdict_ent_remove_change_listener_cb, arg); + if (arg[2]) { + plist_remove(arg[2], &pde->pde_listeners, NULL); + free(arg[2]); arg[2] = NULL; + return 1; + } + return 0; +} + +static int +_pdict_ent_add_change_listener(pdict_ent_t *pde, pdl_notify_func_t notify, + void *arg) +{ + pdict_listener_t *l; + + if (!(l = malloc(sizeof (*l)))) + return 0; + memset(l, 0, sizeof (*l)); + l->pdl_notify = notify; + l->pdl_arg = arg; + if (!plist_add(l, 0, &pde->pde_listeners)) { + free(l); l = NULL; + pu_log(PUL_WARN, 0, "Failed plist_add in _pdict_ent_add_change_listener."); + return 0; + } + + return 1; +} + +int +pdict_ent_add_change_listener(pdict_t *pd, const char *k, + pdl_notify_func_t notify, void *arg) +{ + pdict_ent_t *pde; + + if (!_pdict_ent_lookup(pd, k, &pde)) + return 0; + return _pdict_ent_add_change_listener(pde, notify, arg); +} + +typedef struct { + pdict_ent_t *penca_pde; + pdict_reason_t penca_reason; + const char *penca_ov; +} pdict_ent_notify_cb_args_t; + +static int +pdict_ent_notify_cb(const void *v, const void *v0, void *a) +{ + pdict_listener_t *pdl = (pdict_listener_t *)v; + pdict_ent_notify_cb_args_t *penca = a; + + pdl->pdl_notify(penca->penca_pde->pde_key, penca->penca_pde->pde_val, + penca->penca_reason, penca->penca_ov, pdl->pdl_arg); + return 1; +} + +static void +_pdict_ent_notify(pdict_ent_t *pde, int reason, const char *ov) +{ + pdict_ent_notify_cb_args_t penca = { pde, reason, ov }; + + plist_walk(pde->pde_listeners, pdict_ent_notify_cb, &penca); +} + +static ptree_walk_res_t +pdict_walk_int_cb(const void *v, int level, void *a, void *pwra) +{ + void **args = a; + + return ((pdict_walk_int_func_t)args[0])((pdict_ent_t *)v, args[1]); +} + +static ptree_walk_res_t +pdict_walk_cb(const void *v, int level, void *a, void *pwra) +{ + pdict_ent_t *pde = (pdict_ent_t *)v; + void **args = a; + + return ((pdict_walk_func_t)args[0])(pde->pde_key, pde->pde_val, + args[1]); +} + +static int +_pdict_walk_int(pdict_t *pd, pdict_walk_int_func_t w, void *arg) +{ + void *args[2] = { (void *)w, arg }; + + return ptree_walk(pd->pd_ents, PTREE_INORDER, pdict_walk_int_cb, pdecmp, args); +} + +int +pdict_walk(pdict_t *pd, pdict_walk_func_t w, void *arg) +{ + void *args[2] = { (void *)w, arg }; + + return ptree_walk(pd->pd_ents, PTREE_INORDER, pdict_walk_cb, NULL, args); +} + +int +pdict_ent_remove_change_listener(pdict_t *pd, const char *k, + pdl_notify_func_t nf, void *arg) +{ + pdict_ent_t *e; + int res; + + if (!(res = _pdict_ent_lookup(pd, k, &e))) + return 0; + return _pdict_ent_remove_change_listener(e, nf, arg); +} + +const char * +pdict_reason_str(pdict_reason_t r) +{ + switch (r) { + case PDR_VALUE_CHANGED: + return "changed"; + case PDR_ENTRY_ADDED: + return "added"; + case PDR_ENTRY_REMOVING: + return "removing"; + case PDR_CURRENT_VALUE: + return "current"; + default: + return "?"; + } +} + +pdict_reason_t +pdict_reason_from_str(const char *s) +{ + if (strcmp(s, "changed") == 0) + return PDR_VALUE_CHANGED; + if (strcmp(s, "current") == 0) + return PDR_CURRENT_VALUE; + if (strcmp(s, "added") == 0) + return PDR_ENTRY_ADDED; + if (strcmp(s, "removing") == 0) + return PDR_ENTRY_REMOVING; + return 0; +} diff --git a/dict/pdict.h b/dict/pdict.h new file mode 100644 index 0000000..bdd8065 --- /dev/null +++ b/dict/pdict.h @@ -0,0 +1,70 @@ +#ifndef _PDICT_H_ +#define _PDICT_H_ + +typedef enum { + PDR_VALUE_CHANGED = 1, + PDR_ENTRY_ADDED, + PDR_ENTRY_REMOVING, + PDR_CURRENT_VALUE +} pdict_reason_t; + +/* The _t stands for typedef? */ +typedef struct pdict pdict_t; + +typedef void (*pdl_notify_func_t)(const char *k, + const char *v, + pdict_reason_t r, + const char *pde_oldval, + void *arg); + +typedef int (*pdict_walk_func_t)(const char *k, + const char *v, + void *arg); + +pdict_t *pdict_alloc(void); + +int pdict_add(pdict_t *pd, + const char *k, + const char *v, + const char **ovp); + +int pdict_add_persistent_change_listener(pdict_t *pd, + const char *kpat, + pdl_notify_func_t, + void *); + +int pdict_walk(pdict_t *pd, + pdict_walk_func_t, + void *arg); + +/* + * Return whether a given key is in the dictionary. If v is set, it + * will be set to point to a copy of the value, which must be freed by + * the caller. + */ +int pdict_ent_lookup(pdict_t *pd, + const char *k, + const char **v); + +int pdict_ent_add_change_listener(pdict_t *pd, + const char *k, + pdl_notify_func_t, + void *); + +int pdict_ent_remove_change_listener(pdict_t *pd, + const char *k, + pdl_notify_func_t, + void *); + +int pdict_remove_persistent_change_listener(pdict_t *pd, + int id); + +int pdict_ent_remove(pdict_t *pd, + const char *k, + char **ovp); + +const char *pdict_reason_str(pdict_reason_t); + +pdict_reason_t pdict_reason_from_str(const char *); + +#endif diff --git a/dict/pdictclient.c b/dict/pdictclient.c new file mode 100644 index 0000000..987d6bc --- /dev/null +++ b/dict/pdictclient.c @@ -0,0 +1,1490 @@ +#include "../stdafx.h" +#include <ctype.h> +#include <stdlib.h> +#include <stdio.h> +#ifndef _MSC_EXTENSIONS +#include <unistd.h> +#endif +#include <assert.h> +#if !defined(_WINDOWS) || defined(_CYGWIN) +#include <pthread.h> +#include <signal.h> +#else +#include "wincompat.h" +#endif +#include "pdictclient.h" +#include "ptree.h" +#include "utils.h" +#include "md5.h" + +/* + * TODO XXX: + * - notification arguments and reason + */ + +#ifdef DEBUG_PROTOCOL +int debug_protocol = 0; +#endif + +#define ABORT() ((*(int *)0 = 1), abort()) +#define PENDING_PATTERN "report 200-lid([0-9]*) is pending, key (.*)" \ + " latest value \".*\" \\((.*)\\)" + +extern const char *ws_protocol_ver; + +typedef struct result { + int r_tag; + void (*r_notify)(pdc_session_t *, struct result *, int code, + int final, const char *line); + void *r_arg; +} result_t; + +struct pdc_session { + int pdcs_rfd; + int pdcs_wfd; + int (*pdcs_read)(int, void *, unsigned int, char *, int); + int (*pdcs_write)(int, const void *, unsigned int, char *, int); + int (*pdcs_close)(int, char *, int); + void (*pdcs_cleanup)(void *); + void *pdcs_cleanup_ptr; + ptree_node_t *pdcs_listeners; + char pdcs_readbuf[2048]; + int pdcs_bufcur; + int pdcs_buflen; + pthread_mutex_t pdcs_listeners_lock; + pthread_mutex_t pdcs_pending_lock; + ptree_node_t *pdcs_pending; + pthread_t pdcs_resultreader; + char pdcs_resultreader_errdesc[2048]; +}; + +typedef struct { + int l_id; + void (*l_cb)(const char *key, const char *val, unsigned int len, + pdict_reason_t, void *); + void *l_arg; +} listener_t; + +typedef struct { + pthread_mutex_t grcba_lock; + pthread_cond_t grcba_cv; + int grcba_code; + int grcba_desired; + char *grcba_line; +} getresult_cb_arg_t; + +typedef struct { + pthread_mutex_t grcba_lock; + pthread_cond_t grcba_cv; + int grcba_code; + int grcba_reply_len; + char *grcba_reply; + char *grcba_reply_ptr; +} jgetresult_cb_arg_t; + +static int events; +regex_t pendingex; +static int initialized; + +static void *read_results(void *); +static pdict_reason_t _pdict_reason_from_str(const char *s); +static int cmd_timeout = 10000; //ms + +/* + * Stops the read thread and frees the pdc session + */ +void pdc_session_free(pdc_session_t *pdcs) +{ + if(pdcs) + { + //make sure the thread is not running... + pdc_readthread_join(pdcs, NULL); + + //free + pthread_mutex_destroy(&pdcs->pdcs_pending_lock); + pthread_mutex_destroy(&pdcs->pdcs_listeners_lock); + free(pdcs); pdcs = NULL; + } + return; +} + +/* + * Allocates a new pdc session and starts a thread reading on readfd + */ +pdc_session_t * CCONV +pdc_session_alloc(int readfd, int(*readfunc)(int, void *, unsigned int, char *, + int), int writefd, int(*writefunc)(int, const void *, unsigned int, char *, + int), int (*closefunc)(int, char *, int), void *cleanupPtr, void (*cleanupFunc)(void *)) +{ + pdc_session_t *pdcs; +#ifndef _WINDOWS + sigset_t new, old; +#endif +#ifdef _WINDOWS + int opt = 1; + int ssoret; +#endif + + if (!initialized) + pdc_init(); + + if (!(pdcs = malloc(sizeof (*pdcs)))) + return NULL; + memset(pdcs, 0, sizeof (*pdcs)); + pdcs->pdcs_rfd = readfd; + pdcs->pdcs_read = readfunc; + pdcs->pdcs_wfd = writefd; + pdcs->pdcs_write = writefunc; + pdcs->pdcs_close = closefunc; + pdcs->pdcs_cleanup_ptr = cleanupPtr; + pdcs->pdcs_cleanup = cleanupFunc; + if (pthread_mutex_init(&pdcs->pdcs_pending_lock, 0) != 0 + || pthread_mutex_init(&pdcs->pdcs_listeners_lock, 0) != 0) { + free(pdcs); + return NULL; + } +#ifndef _WINDOWS + sigfillset(&new); + pthread_sigmask(SIG_BLOCK, &new, &old); +#endif + +#ifdef _WINDOWS + //Disable Nagle + opt = TRUE; + ssoret = setsockopt(readfd, IPPROTO_TCP, TCP_NODELAY, (void *)&opt, sizeof (opt)); + //set send buffer to 0 (send data right away) + opt = 0; + ssoret = setsockopt(readfd, SOL_SOCKET, SO_SNDBUF, (void *)&opt, sizeof (opt)); +#endif + + if (pthread_create(&pdcs->pdcs_resultreader, 0, read_results, pdcs) != + 0) { + pthread_mutex_destroy(&pdcs->pdcs_pending_lock); + pthread_mutex_destroy(&pdcs->pdcs_listeners_lock); + free(pdcs); pdcs = NULL; + return NULL; + } +#ifdef _WINDOWS + //pthread_up_priority(pdcs->pdcs_resultreader); +#endif +#ifndef _WINDOWS + pthread_sigmask(SIG_SETMASK, &old, NULL); +#endif + + return pdcs; +} + +/* + * Sets up regex for PENDING_PATTERN + */ +int CCONV +pdc_init(void) +{ + int res; + + if ((res = regcomp(&pendingex, PENDING_PATTERN, REG_EXTENDED)) != 0) { + LOG_STDERR(PHIDGET_LOG_CRITICAL, "pending report pattern compilation error %d", res); + ABORT(); + } + initialized = 1; + return 1; +} + +/* + * Compares 2 listener IDs + */ +static int +lcmp(const void *sv, const void *tv) +{ + return ((const listener_t *)sv)->l_id - ((const listener_t *)tv)->l_id; +} + +/* + * Compares 2 tag IDs + */ +static int +tagcmp(const void *sv, const void *tv) +{ + return ((result_t *)sv)->r_tag - ((result_t *)tv)->r_tag; +} + +/* + * Handles a report from the read thread + */ +static void +handle_report(pdc_session_t *pdcs, char *line) +{ + regmatch_t pmatch[7]; + listener_t *lp; + listener_t l; + char *idstr = NULL; + char *key = NULL; + char *val = NULL; + char *eoval = NULL; //end of val + char *soval = NULL; //start of val + char *reason_str = NULL; + pdict_reason_t r; + int res; + //char err[1024]; + + events++; + +//Disable this, I think it's causing bad problems! +#if 0 +#ifdef _WINDOWS + //Send something back if we're on Windows, so this report gets ACKed right away + // otherwise, we face 200ms delayed ack -> clumps of events every 200ms + pdcs->pdcs_write(pdcs->pdcs_wfd, "report ack\n", strlen("report ack\n"), err, sizeof (err)); +#endif +#endif + + /* handle value without regex, due to crappy regex implementations */ + /* val is bounded by double quotes */ + + if (!(val = strchr(line, '\"'))) + goto ignore; + soval = ++val; + if (!(eoval = strchr(val, '\"'))) + goto ignore; + + /* end of val \" = NULL */ + *eoval = '\0'; + + /* make sure there are no more double quotes */ + if (strchr(eoval + 1, '\"') != NULL) + goto ignore; + + /* create a copy of val, or undo my changes if there is not enough memory */ + if (!(val = strdup(val))) { + *eoval = '\"'; + pu_log(PUL_WARN, pdcs->pdcs_rfd, + "report dropped due to low memory"); + goto ignore; + } + + /* put back what I changed and shift anything after val to the start of val */ + *eoval = '\"'; + memmove(soval, eoval, strlen(eoval) + 1); + + if ((res = regexec(&pendingex, line, 6, pmatch, 0)) != 0) { +ignore: +#ifdef DEBUG_PROTOCOL + pu_log(PUL_DEBUG, pdcs->pdcs_rfd, + "ignoring invalid report (see next line):"); + pu_log(PUL_DEBUG, pdcs->pdcs_rfd, line); +#endif + goto end; + } + if (!getmatchsub(line, &idstr, pmatch, 1) || !idstr) + { + goto end; + } + if (!getmatchsub(line, &key, pmatch, 2) || !key) + { + goto end; + } + if (!getmatchsub(line, &reason_str, pmatch, 3) || !reason_str || + !(r = _pdict_reason_from_str(reason_str))) + { + goto end; + } + free(reason_str); reason_str = NULL; + + l.l_id = atoi(idstr); + pthread_mutex_lock(&pdcs->pdcs_listeners_lock); + if (ptree_contains(&l, (ptree_node_t *)pdcs->pdcs_listeners, lcmp, + (void **)&lp) && lp->l_cb) { + char *ueval; + unsigned int uevlen; + + pthread_mutex_unlock(&pdcs->pdcs_listeners_lock); + if (!unescape(val, &ueval, &uevlen)) { + /* XXX log */ + goto end; + } + lp->l_cb(key, ueval, uevlen, r, lp->l_arg); + free(ueval); + } + else + { + pthread_mutex_unlock(&pdcs->pdcs_listeners_lock); + LOG(PHIDGET_LOG_WARNING, "No callback found for: %s",line); + } +end: + free(val); + free(idstr); + free(key); +} + +/* + * Called by read thread for any pending commands that didn't complete before the socket was closed. + */ +static ptree_walk_res_t +finish_pending_async(const void *node, int level, void *arg, void *pwra) +{ + pdc_session_t *pdcs = arg; + result_t *r; + char msg[1024]; + + ptree_inorder_walk_remove(&pdcs->pdcs_pending, (void **)&r, pwra, tagcmp); + assert(node == r); + + snprintf(msg, 1024, "Socket was closed before command T%d finished.",r->r_tag); + + //notify frees r + if(r->r_notify) + r->r_notify(pdcs, r, 500, PTRUE, msg); + + //free(r); + + return PTREE_WALK_CONTINUE; +} +static ptree_walk_res_t +free_node(const void *node, int level, void *arg, void *pwra) +{ + pdc_session_t *pdcs = arg; + void *ov; + ptree_inorder_walk_remove(&pdcs->pdcs_listeners, &ov, pwra, lcmp); + assert(node == ov); + free(ov); + return PTREE_WALK_CONTINUE; +} + +/* + * Runs in a thread started by pdc_session_alloc + */ +static void * +read_results(void *arg) +{ + pdc_session_t *pdcs = arg; + char *line = 0; + result_t *r; + result_t rk; + char *startp; + int linelen; + int final; + int tag; + +next: + if (line) { + free(line); line = NULL; + } + pdcs->pdcs_resultreader_errdesc[0] = 0; + if (!pd_getline((char *)pdcs->pdcs_readbuf, sizeof (pdcs->pdcs_readbuf), + &pdcs->pdcs_bufcur, &pdcs->pdcs_buflen, pdcs->pdcs_read, pdcs->pdcs_close, + pdcs->pdcs_rfd, &line, pdcs->pdcs_resultreader_errdesc, + sizeof (pdcs->pdcs_resultreader_errdesc))) + { + /* socket closed */ + free(line); line=NULL; + + //respond for any pending commands (failed)... + pthread_mutex_lock(&pdcs->pdcs_pending_lock); + ptree_walk(pdcs->pdcs_pending, PTREE_POSTORDER, finish_pending_async, tagcmp, pdcs); + assert(!pdcs->pdcs_pending); + //ptree_clear(&pdcs->pdcs_pending); + //pdcs->pdcs_pending = NULL; + pthread_mutex_unlock(&pdcs->pdcs_pending_lock); + + //remove listeners + pthread_mutex_lock(&pdcs->pdcs_listeners_lock); + ptree_walk(pdcs->pdcs_listeners, PTREE_POSTORDER, free_node, lcmp, pdcs); + assert(!pdcs->pdcs_listeners); + pthread_mutex_unlock(&pdcs->pdcs_listeners_lock); + //ptree_clear(&pdcs->pdcs_listeners); + + //do cleanup - this handles stuff outside on the dictionary code, etc. + if (pdcs->pdcs_cleanup) + pdcs->pdcs_cleanup(pdcs->pdcs_cleanup_ptr); + return (void *)(size_t)(-1 * errno); // XXX pdcs errno function + } + + //LOG(PHIDGET_LOG_DEBUG, "read_results: %s",line); + + if (line[0] == 'r') { + handle_report(pdcs, line); + goto next; + } else if (line[0] == 'T') { + tag = atoi(line + 1); + startp = strchr(line, ' '); + while (startp && *startp) { + if (isdigit(*startp)) + goto notify; + startp++; + } + // XXX log malformed line + goto next; + } else { + startp = line; + tag = 0; + } +notify: + if ((linelen = (int)strlen(line)) < 4) { + // XXX log malformed line + goto next; + } + final = (startp[3] == ' '); + pthread_mutex_lock(&pdcs->pdcs_pending_lock); + rk.r_tag = tag; + if (final) { + if (!ptree_remove(&rk, &pdcs->pdcs_pending, tagcmp, + (void **)&r)) { + //ABORT(); + pthread_mutex_unlock(&pdcs->pdcs_pending_lock); + // XXX log unanticipated result + goto next; + } + } else { + if (!ptree_contains(&rk, pdcs->pdcs_pending, tagcmp, + (void **)&r)) { + //ABORT(); + pthread_mutex_unlock(&pdcs->pdcs_pending_lock); + // XXX log unanticipated result + goto next; + } + } + pthread_mutex_unlock(&pdcs->pdcs_pending_lock); + r->r_notify(pdcs, r, atoi(startp), final, startp + 4); + r = NULL; + goto next; +// return (void *)(0); +} + +/* + * Callback from cmd + */ +static void +getresult_cb(pdc_session_t *pdcs, result_t *r, int code, int final, + const char *line) +{ + getresult_cb_arg_t *grcba = r->r_arg; + + if (final) { + if (pthread_mutex_lock(&grcba->grcba_lock)) + abort(); + grcba->grcba_code = code; + if (grcba->grcba_code != grcba->grcba_desired) + grcba->grcba_line = strdup(line); + if (pthread_mutex_unlock(&grcba->grcba_lock)) + abort(); + if (pthread_cond_signal(&grcba->grcba_cv)) + abort(); + } +} + +/* + * Callback from jcmd + */ +static void +jgetresult_cb(pdc_session_t *pdcs, result_t *r, int code, int final, + const char *line) +{ + jgetresult_cb_arg_t *grcba = r->r_arg; + int spaceleft = grcba->grcba_reply_len - (grcba->grcba_reply_ptr-grcba->grcba_reply); + + if (pthread_mutex_lock(&grcba->grcba_lock)) + abort(); + + if (final) + { + grcba->grcba_code = code; + } + + //add more space if needed + if((size_t)spaceleft < strlen(line)+1) + { + int offset = grcba->grcba_reply_ptr-grcba->grcba_reply; + if(!(grcba->grcba_reply = (char *)realloc(grcba->grcba_reply, strlen(line) + offset + 100))) + { + LOG(PHIDGET_LOG_WARNING,"Couldn't realloc!"); + return; + } + grcba->grcba_reply_len = strlen(line) + offset + 100; + grcba->grcba_reply_ptr = grcba->grcba_reply + offset; + spaceleft = grcba->grcba_reply_len - (grcba->grcba_reply_ptr-grcba->grcba_reply); + } + + strncpy(grcba->grcba_reply_ptr, line, + spaceleft - 1); + if(!final) + grcba->grcba_reply_ptr[strlen(line)] = '\n'; + grcba->grcba_reply_ptr+=(strlen(line)+1); + + if (pthread_mutex_unlock(&grcba->grcba_lock)) + abort(); + + if (final) + { + if (pthread_cond_signal(&grcba->grcba_cv)) + abort(); + } +} + +static int wait_timeout(pthread_mutex_t *grcba_lock, pthread_cond_t *grcba_cv, int timeout_ms) +{ + int retval; +#ifdef _WINDOWS + int timeout = timeout_ms; +#else + struct timespec timeout; + struct timeval now; + + // Lock the mutex. + //pthread_mutex_lock(&grcba->grcba_lock); + + gettimeofday(&now,0); + timeout.tv_sec = now.tv_sec + timeout_ms/1000; + timeout.tv_nsec = now.tv_usec*1000 + (timeout_ms%1000 * 1000000); + if(timeout.tv_nsec >= 1000000000) + { + timeout.tv_sec++; + timeout.tv_nsec -= 1000000000; + } +#endif + retval = pthread_cond_timedwait(grcba_cv, grcba_lock, &timeout); + //pthread_mutex_unlock(&grcba->grcba_lock); + return retval; +} + +static int +cmd(pdc_session_t *pdcs, + int desired, + const char *cmd, + char *errdesc, + int errlen) +{ + int len = (int)strlen(cmd); + getresult_cb_arg_t *grcba; + result_t r; + result_t *or; + + if (!(grcba = malloc(sizeof (*grcba)))) { + if (errdesc) + (void) snprintf(errdesc, errlen, "%s", strerror(errno)); + return 0; + } + r.r_tag = 0; + r.r_notify = getresult_cb; + r.r_arg = grcba; + memset(grcba, 0, sizeof (*grcba)); + grcba->grcba_code = -1; + grcba->grcba_desired = desired; + + if (pthread_mutex_init(&grcba->grcba_lock, NULL)) + abort(); + if (pthread_cond_init(&grcba->grcba_cv, 0)) + abort(); + if (pthread_mutex_lock(&pdcs->pdcs_pending_lock)) + abort(); + if (!ptree_replace(&r, &pdcs->pdcs_pending, tagcmp, (void **)&or)) { + pthread_mutex_destroy(&grcba->grcba_lock); + pthread_cond_destroy(&grcba->grcba_cv); + pthread_mutex_unlock(&pdcs->pdcs_pending_lock); + free(grcba); grcba = NULL; + if (errdesc) + (void) snprintf(errdesc, errlen, + "result replacement failure"); + return 0; + } + + // if or != NULL then pdcs_pending already contained a node with tag == r.r_tag + assert(!or); + if (pthread_mutex_lock(&grcba->grcba_lock)) + abort(); + if (pthread_mutex_unlock(&pdcs->pdcs_pending_lock)) + abort(); + + if (!pdcs->pdcs_write(pdcs->pdcs_wfd, cmd, len, errdesc, errlen)) { + pthread_mutex_unlock(&grcba->grcba_lock); + pthread_mutex_lock(&pdcs->pdcs_pending_lock); + ptree_remove(&r, &pdcs->pdcs_pending, tagcmp, (void **)&or); + pthread_mutex_unlock(&pdcs->pdcs_pending_lock); + pthread_mutex_destroy(&grcba->grcba_lock); + pthread_cond_destroy(&grcba->grcba_cv); + free(grcba); + return 0; + } + + while (grcba->grcba_code == -1) + { + int retval; + //5 second timeout + if((retval = wait_timeout(&grcba->grcba_lock, &grcba->grcba_cv, cmd_timeout)) != 0) + { + switch (retval) + { + case ETIMEDOUT: + default: + if (errdesc) + (void) snprintf(errdesc, errlen,"cmd: \"%s\" Timed out after %dms.",cmd,cmd_timeout); + pthread_mutex_lock(&pdcs->pdcs_pending_lock); + ptree_remove(&r, &pdcs->pdcs_pending, tagcmp, (void **)&or); + pthread_mutex_unlock(&pdcs->pdcs_pending_lock); + pthread_mutex_destroy(&grcba->grcba_lock); + pthread_cond_destroy(&grcba->grcba_cv); + if (grcba->grcba_line) + free(grcba->grcba_line); + free(grcba); + return 0; //Timeout + } + } + } + + pthread_mutex_destroy(&grcba->grcba_lock); + pthread_cond_destroy(&grcba->grcba_cv); + + if (grcba->grcba_code != desired) + { + if (errdesc) + (void) snprintf(errdesc, errlen, "protocol error: %d%s%s", grcba->grcba_code, grcba->grcba_line ? " " : "", grcba->grcba_line ? grcba->grcba_line : ""); + if (grcba->grcba_line) + free(grcba->grcba_line); + free(grcba); + return 0; + } + + if (grcba->grcba_line) + free(grcba->grcba_line); + free(grcba); + return 1; +} + +/* + * jcmd is for a command that returns results, like getKey + */ +static int +jcmd(pdc_session_t *pdcs, + const char *cmd, + int desired, + char **reply, + int replylen, + char *errdesc, + int errlen) +{ + int len = (int)strlen(cmd); + jgetresult_cb_arg_t *grcba; + result_t r; + result_t *or; + + if (!(grcba = malloc(sizeof (*grcba)))) { + if (errdesc) + (void) snprintf(errdesc, errlen, "%s", strerror(errno)); + return 0; + } + r.r_tag = 0; + r.r_notify = jgetresult_cb; + r.r_arg = grcba; + memset(grcba, 0, sizeof (*grcba)); + grcba->grcba_code = -1; + grcba->grcba_reply = *reply; + grcba->grcba_reply_ptr = *reply; + grcba->grcba_reply_len = replylen; + + if (pthread_mutex_init(&grcba->grcba_lock, NULL)) + abort(); + if (pthread_cond_init(&grcba->grcba_cv, 0)) + abort(); + if (pthread_mutex_lock(&pdcs->pdcs_pending_lock)) + abort(); + if (!ptree_replace(&r, &pdcs->pdcs_pending, tagcmp, (void **)&or)) { + pthread_mutex_destroy(&grcba->grcba_lock); + pthread_cond_destroy(&grcba->grcba_cv); + pthread_mutex_unlock(&pdcs->pdcs_pending_lock); + free(grcba); grcba = NULL; + if (errdesc) + (void) snprintf(errdesc, errlen, + "result replacement failure"); + return 0; + } + // if or != NULL then pdcs_pending already contained a node with tag == r.r_tag + assert(!or); + if (pthread_mutex_lock(&grcba->grcba_lock)) + abort(); + if (pthread_mutex_unlock(&pdcs->pdcs_pending_lock)) + abort(); + +#ifndef _MSC_EXTENSIONS +#ifdef DEBUG_PROTOCOL + if (debug_protocol) { + write(1, "L: ", 3); + write(1, cmd, len); + } +#endif +#endif + if (!pdcs->pdcs_write(pdcs->pdcs_wfd, cmd, len, errdesc, errlen)) { + pthread_mutex_lock(&pdcs->pdcs_pending_lock); + ptree_remove(&r, &pdcs->pdcs_pending, tagcmp, (void **)&or); + pthread_mutex_unlock(&pdcs->pdcs_pending_lock); + pthread_mutex_destroy(&grcba->grcba_lock); + pthread_cond_destroy(&grcba->grcba_cv); + free(grcba); grcba = NULL; + return 0; + } + while (grcba->grcba_code == -1) + { + int retval; + //5 second timeout + if((retval = wait_timeout(&grcba->grcba_lock, &grcba->grcba_cv, cmd_timeout)) != 0) + { + switch (retval) + { + case ETIMEDOUT: + default: + if (errdesc) + (void) snprintf(errdesc, errlen,"jcmd: \"%s\" Timed out after %dms.",cmd,cmd_timeout); + pthread_mutex_lock(&pdcs->pdcs_pending_lock); + ptree_remove(&r, &pdcs->pdcs_pending, tagcmp, (void **)&or); + pthread_mutex_unlock(&pdcs->pdcs_pending_lock); + pthread_mutex_destroy(&grcba->grcba_lock); + pthread_cond_destroy(&grcba->grcba_cv); + free(grcba); grcba = NULL; + return 0; + } + } + //if (pthread_cond_wait(&grcba->grcba_cv, &grcba->grcba_lock)) + // abort(); + } + pthread_mutex_destroy(&grcba->grcba_lock); + pthread_cond_destroy(&grcba->grcba_cv); + if (grcba->grcba_code != desired) { + if (errdesc) + (void) snprintf(errdesc, errlen, + "protocol error: %d%s%s", grcba->grcba_code, + grcba->grcba_reply ? " " : "", grcba->grcba_reply ? + grcba->grcba_reply : ""); + *reply = grcba->grcba_reply; + free(grcba); grcba = NULL; + return 0; + } + *reply = grcba->grcba_reply; + free(grcba); grcba = NULL; + return 1; +} + +//int CCONV +//pdc_set(pdc_session_t *pdcs, const char *key, const char *val, +// int len, int remove_on_close, char *errdesc, int errlen) +//{ +// char *escval; +// char *buf; +// int res; +// +// if(val[0] == '\0') +// { +// if (!escape("\001", len, &escval)) { +// if (errdesc) +// (void) snprintf(errdesc, errlen, "%s", strerror(errno)); +// return 0; +// } +// } +// else +// { +// if (!escape(val, len, &escval)) { +// if (errdesc) +// (void) snprintf(errdesc, errlen, "%s", strerror(errno)); +// return 0; +// } +// } +// if ((len = pasprintf(&buf, "set %s=\"%s\"%s\n", key, escval, +// remove_on_close ? " for session" : "")) < 0) { +// free((void *)escval); escval = NULL; +// if (errdesc) +// (void) snprintf(errdesc, errlen, "%s", strerror(errno)); +// return 0; +// } +// res = cmd(pdcs, 200, buf, errdesc, errlen); +// free((void *)escval); escval = NULL; +// free(buf); buf = NULL; +// return res; +//} + +/* + * Returns a nonzero listen ID on success, 0 otherwise. + */ +pdc_listen_id_t CCONV +pdc_listen(pdc_session_t *pdcs, const char *pattern, + void (*cb)(const char *, const char *, unsigned int, pdict_reason_t, + void *), void *arg, char *errdesc, int errlen) +{ + listener_t *l; + static int lid = 1; + char *buf; + int len; + + if(!pdcs) + return 0; + + if (!(l = malloc(sizeof (*l)))) { + if (errdesc) + (void) snprintf(errdesc, errlen, "%s", strerror(errno)); + return 0; + } + l->l_id = lid; + l->l_cb = cb; + l->l_arg = arg; + if ((len = pasprintf(&buf, "listen \"%s\" lid%d\n", pattern, lid++)) < 0) + { + if (errdesc) + (void) snprintf(errdesc, errlen, "%s", strerror(errno)); + return 0; + } + + pthread_mutex_lock(&pdcs->pdcs_listeners_lock); + if (!ptree_replace(l, (ptree_node_t **)&pdcs->pdcs_listeners, lcmp, NULL)) { + free(buf); buf = NULL; + free(l); l = NULL; + if (errdesc) + (void) snprintf(errdesc, errlen, "%s", strerror(errno)); + pthread_mutex_unlock(&pdcs->pdcs_listeners_lock); + return 0; + } + pthread_mutex_unlock(&pdcs->pdcs_listeners_lock); + + if (!cmd(pdcs, 200, buf, errdesc, errlen)) { + free(buf); buf = NULL; + return 0; + } + + free(buf); buf = NULL; + return l->l_id; +} + +int CCONV +pdc_enable_periodic_reports(pdc_session_t *pdcs, int periodms, + char *errdesc, int errlen) +{ + char *buf; + int res; + if(!pdcs) + return 0; + + if (periodms <= 0) { + if (errdesc) + (void) snprintf(errdesc, errlen, "invalid period"); + return 0; + } + if (pasprintf(&buf, "report %d report\n", periodms) < 0) { + if (errdesc) + (void) snprintf(errdesc, errlen, "%s", strerror(errno)); + return 0; + } + res = cmd(pdcs, 200, buf, errdesc, errlen); + free(buf); buf = NULL; + return res; +} + +int CCONV +pdc_disable_periodic_reports(pdc_session_t *pdcs, char *errdesc, + int errlen) +{ + if(!pdcs) + return 0; + return cmd(pdcs, 200, "report 0 report\n", errdesc, errlen); +} + +int CCONV +pdc_flush(pdc_session_t *pdc, char *errdesc, int errlen) +{ + if(!pdc) + return 0; + return cmd(pdc, 200, "flush\n", errdesc, errlen); +} + +int CCONV +pdc_ignore(pdc_session_t *pdcs, pdc_listen_id_t id, char *errdesc, + int errlen) +{ + char *buf; + int res; + + if(!pdcs) + return 0; + + if (pasprintf(&buf, "ignore lid%d\n", id) < 0) { + if (errdesc) + (void) snprintf(errdesc, errlen, "%s", strerror(errno)); + return 0; + } + res = cmd(pdcs, 200, buf, errdesc, errlen); + free(buf); buf = NULL; + return res; +} + +//Async stuff starts here +static void +async_cmd_callback(pdc_session_t *pdcs, const char *cmd, + void (*r_notify)(pdc_session_t *, struct result *, int code,int final, const char *line), void *r_arg, + void (*error)(const char *errdesc, void *arg), void *arg); + +typedef struct { + int aca_desired; + void (*aca_error)(const char *errdesc, void *arg); + void *aca_error_arg; +} async_cmd_arg_t; + +/* + * Default callback for async commands. + * If the code matches what we wanted, then we don't hear anything about it. + * Otherwise, we get the error callback. + */ +static void +async_cmd_cb(pdc_session_t *pdcs, result_t *r, int code, int final, + const char *line) +{ + async_cmd_arg_t *aca; + char *buf; + + assert(final); + aca = (async_cmd_arg_t *)r->r_arg; + if (aca->aca_desired != code) { + if (pasprintf(&buf, "protocol error: %s", line) > 0) { + if(aca->aca_error) + aca->aca_error(buf, aca->aca_error_arg); + free(buf); buf = NULL; + } else { + /* XXX warn about low-memory condition */ + if(aca->aca_error) + aca->aca_error("protocol error (insufficient memory" + " to describe)", aca->aca_error_arg); + } + } + free(aca); aca = NULL; + free(r); r = NULL; +} + +typedef struct { + void (*aaa_success)(void *arg, void (*error)(const char *errdesc, void *arg)); + void (*aaa_error)(const char *errdesc, void *arg); + void *aaa_arg; + char *aaa_pass; +} async_auth_arg_t; + +static void +async_auth_cb(pdc_session_t *pdcs, result_t *r, int code, int final, + const char *line) +{ + async_auth_arg_t *aaa; + + assert(final); + aaa = (async_auth_arg_t *)r->r_arg; + + switch(code) + { + //Authentication needed + case 999: + { + int randomHash; + char buf[100]; + md5_state_t state; + md5_byte_t digest[16]; + + int di; + + randomHash = strtol(line, NULL, 0); + + LOG(PHIDGET_LOG_DEBUG,"Got hash: %d",randomHash); + + if(aaa->aaa_pass) + { + snprintf(buf,100,"%d%s",randomHash,aaa->aaa_pass); + } + + md5_init(&state); + md5_append(&state, (const md5_byte_t *)buf, (int)strlen(buf)); + md5_finish(&state, digest); + + memset(buf, 0, sizeof(buf)); + + snprintf(buf, 5, "997 "); + + for (di = 0; di < 16; ++di) + snprintf((buf+4) + di * 2, 3, "%02x", digest[di]); + + buf[strlen(buf)] = '\n'; + + //send out the next async command + async_cmd_callback(pdcs, buf, async_auth_cb, aaa, aaa->aaa_error, aaa->aaa_arg); + + goto exit_pending; + } + // Authentication failed + case 998: + LOG(PHIDGET_LOG_INFO,"Authentication Failed - bad password"); + //send out the error event + if(aaa->aaa_error) + aaa->aaa_error("Authentication Failed - bad password", aaa->aaa_arg); + goto exit_done; + // Authentication is good, or not needed + case 996: + //make sure there is a verison - otherwise this is a really old webservice + { + char *versionstring = strstr(line, "version="); + if(!versionstring) + { + LOG(PHIDGET_LOG_ERROR,"Trying to connect to an old webservice. Webservice needs to be updated."); + //send out an error event + if(aaa->aaa_error) + aaa->aaa_error("Trying to connect to an old webservice. Webservice needs to be updated.", aaa->aaa_arg); + goto exit_done; + } + //It's good - this is the end of authentication + else + { + //send out auth_finished event + if(aaa->aaa_success) + aaa->aaa_success(aaa->aaa_arg, aaa->aaa_error); + } + } + goto exit_done; + // Version mismatch + case 994: + { + char buf[100]; + snprintf(buf, sizeof(buf), "Version Mismatch; webservice:%s, expecting:%s",strlen(line)>24?(line+24):line,ws_protocol_ver); + LOG(PHIDGET_LOG_ERROR, buf); + //send out error event + if(aaa->aaa_error) + aaa->aaa_error(buf, aaa->aaa_arg); + } + goto exit_done; + case 500: + { + //callback an error + char buf[100]; + snprintf(buf, sizeof(buf), "Network Error (probably a timeout occured): %s", line); + LOG(PHIDGET_LOG_ERROR, buf); + //send out error event + if(aaa->aaa_error) + aaa->aaa_error(buf, aaa->aaa_arg); + } + goto exit_done; + default: + //callback an error + if(aaa->aaa_error) + aaa->aaa_error("Unexpected response during authorization. Probably a version mismatch - upgrade the Webservice.", aaa->aaa_arg); + goto exit_done; + } + +exit_done: + free(aaa); aaa = NULL; +exit_pending: + free(r); r = NULL; +} + +/* + * Sends out an async command. Response comes back on the r_notify callback. + */ +static void +async_cmd_callback(pdc_session_t *pdcs, const char *cmd, + void (*r_notify)(pdc_session_t *, struct result *, int code,int final, const char *line), void *r_arg, + void (*error)(const char *errdesc, void *arg), void *arg) +{ + static int tag = 1; + char errdesc[256]; + result_t *r; + result_t *or; + char *buf; + int len; + + if ((len = pasprintf(&buf, "T%d %s", tag, cmd)) < 0) { + if(error) + error(strerror(errno), arg); + return; + } + if (!(r = malloc(sizeof (*r)))) { + if(error) + error(strerror(errno), arg); + return; + } + /* add pending */ + pthread_mutex_lock(&pdcs->pdcs_pending_lock); + r->r_tag = tag++; + r->r_notify = r_notify; + r->r_arg = r_arg; + ptree_replace(r, &pdcs->pdcs_pending, tagcmp, (void **)&or); + // if or != NULL then pdcs_pending already contained a node with tag == r.r_tag + assert(!or); + + if (!pdcs->pdcs_write(pdcs->pdcs_wfd, buf, (int)strlen(buf), errdesc, + (unsigned int)sizeof (errdesc))) { + if(error) + error(errdesc, arg); + ptree_remove(r, &pdcs->pdcs_pending, tagcmp, (void **)&or); + if(r == or && r) + { + if(r->r_arg) + free(r->r_arg); r->r_arg = NULL; + free(r); r = NULL; + } + pthread_mutex_unlock(&pdcs->pdcs_pending_lock); + return; + } + + pthread_mutex_unlock(&pdcs->pdcs_pending_lock); + free(buf); buf = NULL; +} + +/* + * Sends out an async command. Errors come back on the error callback if desired does not match the response. + */ +static void +async_cmd(pdc_session_t *pdcs, int desired, const char *cmd, + void (*error)(const char *errdesc, void *arg), void *arg) +{ + async_cmd_arg_t *aca; + + if (!(aca = malloc(sizeof (*aca)))) { + if(error) + error(strerror(errno), arg); + return; + } + + aca->aca_desired = desired; + aca->aca_error = error; + aca->aca_error_arg = arg; + + async_cmd_callback(pdcs, cmd, async_cmd_cb, aca, error, arg); +} + +typedef struct _pdc_session_and_arg_t{ + pdc_session_t *pdcs; + void *r_arg; +} pdc_session_and_arg_t; + +static ptree_walk_res_t +clean_pending_async(const void *node, int level, void *arg, void *pwra) +{ + result_t *rk = (result_t *)node; + pdc_session_and_arg_t *a = (pdc_session_and_arg_t *)arg; + result_t *r; + char msg[1024]; + + if(rk->r_notify == &async_cmd_cb && ((async_cmd_arg_t *)rk->r_arg)->aca_error_arg == a->r_arg) + { + ptree_inorder_walk_remove(&a->pdcs->pdcs_pending, (void **)&r, pwra, tagcmp); + assert(rk == r); + + snprintf(msg, 1024, "Socket was closed before command T%d finished.",r->r_tag); + + //notify frees r + if(r->r_notify) + r->r_notify(a->pdcs, r, 500, PTRUE, msg); + } + + return PTREE_WALK_CONTINUE; +} + +static int +async_pending_cmp(const void *arg, const void *tv) +{ + result_t *rk = (result_t *)tv; + + if(rk->r_notify == &async_cmd_cb && ((async_cmd_arg_t *)rk->r_arg)->aca_error_arg == arg) + return 0; + return -1; +} + +//If there are pending keys for this arg, then 1st wait up to 1 second for them to finish, then report the errors +void +cleanup_pending(pdc_session_t *pdcs, void *arg) +{ + int timeout = 500; //ms + pdc_session_and_arg_t a = {pdcs, arg}; + + pthread_mutex_lock(&pdcs->pdcs_pending_lock); + //TODO: Do this differently - can't use a different compare then the one used to add to the list! + while(ptree_contains(arg, pdcs->pdcs_pending, async_pending_cmp, NULL) && timeout>0) + { + pthread_mutex_unlock(&pdcs->pdcs_pending_lock); + SLEEP(10); + timeout-=10; + pthread_mutex_lock(&pdcs->pdcs_pending_lock); + } + if(timeout<=0) + { + ptree_walk(pdcs->pdcs_pending, PTREE_POSTORDER, clean_pending_async, tagcmp, &a); + } + pthread_mutex_unlock(&pdcs->pdcs_pending_lock); +} + +//loop so long as there are any pending +void +wait_pending(pdc_session_t *pdcs) +{ + int timeout = 500; //ms + + pthread_mutex_lock(&pdcs->pdcs_pending_lock); + while(pdcs->pdcs_pending && timeout>0) + { + pthread_mutex_unlock(&pdcs->pdcs_pending_lock); + SLEEP(10); + timeout-=10; + pthread_mutex_lock(&pdcs->pdcs_pending_lock); + } + pthread_mutex_unlock(&pdcs->pdcs_pending_lock); +} + +void CCONV +pdc_async_enable_periodic_reports(pdc_session_t *pdcs, int periodms, + void (*error)(const char *errdesc, void *arg), + void *arg) +{ + char *buf; + + if(!pdcs) + return; + + if (periodms <= 0) { + if (error) + error("invalid period", arg); + return; + } + if (pasprintf(&buf, "report %d report\n", periodms) < 0) { + if (error) + error(strerror(errno), arg); + return; + } + async_cmd(pdcs, 200, buf, error, arg); + free(buf); buf = NULL; +} + +void CCONV +pdc_async_disable_periodic_reports(pdc_session_t *pdcs, void (*error)(const char *errdesc, void *arg), + void *arg) +{ + if(!pdcs) + return; + async_cmd(pdcs, 200, "report 0 report\n", error, arg); +} + +void CCONV +pdc_async_set(pdc_session_t *pdcs, const char *key, const char *val, + int len, int remove_on_close, void (*error)(const char *errdesc, void *arg), + void *arg) +{ + char *escval; + char *buf; + if(!pdcs) + return; + + if(val[0] == '\0') + { + if (!escape("\001", len, &escval)) { + if (error) + error(strerror(errno), arg); + return; + } + } + else + { + if (!escape(val, len, &escval)) { + if (error) + error(strerror(errno), arg); + return; + } + } + if (pasprintf(&buf, "set %s=\"%s\"%s\n", key, escval, + remove_on_close ? " for session" : "") < 0) { + free((void *)escval); escval = NULL; + if (error) + error(strerror(errno), arg); + return; + } + async_cmd(pdcs, 200, buf, error, arg); + free(buf); buf = NULL; + free((void *)escval); escval = NULL; +} + +void CCONV +pdc_async_ignore(pdc_session_t *pdcs, pdc_listen_id_t id, + void (*error)(const char *errdesc, void *arg), void *arg) +{ + char *buf; + if(!pdcs) + return; + + if (pasprintf(&buf, "ignore lid%d\n", id) < 0) { + if (error) + error(strerror(errno), arg); + return; + } + async_cmd(pdcs, 200, buf, error, arg); + free(buf); buf = NULL; +} + +void CCONV +pdc_async_authorize(pdc_session_t *pdcs, const char *version, char *password, + void (*success) (void *arg, void (*error)(const char *errdesc, void *arg)), + void (*error)(const char *errdesc, void *arg), void *arg) +{ + char *buf; + async_auth_arg_t *aaa; + + if(!pdcs) + return; + + if (!(aaa = malloc(sizeof (*aaa)))) { + if(error) + error(strerror(errno), arg); + return; + } + + aaa->aaa_success = success; + aaa->aaa_error = error; + aaa->aaa_arg = arg; + aaa->aaa_pass = password; + + if (pasprintf(&buf, "995 authenticate, version=%s\n", version) < 0) { + if (error) + error(strerror(errno), arg); + return; + } + + //200 is bogus - I want to get the callback on any response + async_cmd_callback(pdcs, buf, async_auth_cb, aaa, error, arg); + free(buf); buf = NULL; +} + +void CCONV +pdc_async_remove(pdc_session_t *pdcs, const char *pattern, void (*error)(const char *errdesc, void *arg), void *arg) +{ + char *buf; + + if(!pdcs) + return; + + if (pasprintf(&buf, "remove %s\n", pattern) < 0) { + if (error) + error(strerror(errno), arg); + return; + } + async_cmd(pdcs, 200, buf, error, arg); + free(buf); buf = NULL; +} + +int CCONV +pdc_quit(pdc_session_t *pdc, char *errdesc, int errlen) +{ + return cmd(pdc, 200, "quit\n", errdesc, errlen); +} + +int CCONV +pdc_remove(pdc_session_t *pdcs, const char *pattern, char *errdesc, int errlen) +{ + char *buf; + int res; + + if(!pdcs) + return 0; + + if (pasprintf(&buf, "remove %s\n", pattern) < 0) { + if (errdesc) + (void) snprintf(errdesc, errlen, "%s", strerror(errno)); + return 0; + } + res = cmd(pdcs, 200, buf, errdesc, errlen); + free(buf); buf = NULL; + return res; +} + +int CCONV +pdc_get(pdc_session_t *pdcs, const char *key, char *val, int vallen, char *errdesc, int errlen) +{ + char *buf, *results; + int res, resultsize; + + if(!pdcs) + return 0; + + resultsize = vallen + 30; + + results = (char *)malloc(resultsize); + + if (pasprintf(&buf, "get %s\n", key) < 0) { + if (errdesc) + (void) snprintf(errdesc, errlen, "%s", strerror(errno)); + free(results); + return 0; + } + res = jcmd(pdcs, buf, 200, &results, resultsize, errdesc, errlen); + if(res) + { + char *ueval, *eol, *resultval; + unsigned int uevlen; + + if ((eol = strchr(results, '\n'))) + { + eol[0] = '\0'; + } + if(!(resultval = strstr(results, "value "))) + { + val[0] = 0; + goto end; + } + resultval += 6; + + if (!unescape(resultval, &ueval, &uevlen)) { + /* XXX log */ + goto end; + } + strncpy(val, ueval, vallen-1); + val[vallen-1]='\0'; + free(ueval); + } + +end: + free(buf); + free(results); + return res; +} + +static pdict_reason_t +_pdict_reason_from_str(const char *s) +{ + if (strcmp(s, "changed") == 0) + return PDR_VALUE_CHANGED; + if (strcmp(s, "current") == 0) + return PDR_CURRENT_VALUE; + if (strcmp(s, "added") == 0) + return PDR_ENTRY_ADDED; + if (strcmp(s, "removing") == 0) + return PDR_ENTRY_REMOVING; + return 0; +} + +int CCONV +pdc_get_server_session_id(pdc_session_t *pdc, int *id, char *errdesc, + int errlen) +{ + char reply[80]; + char *buf; + int res; + + if(!pdc) + return 0; + + if (pasprintf(&buf, "get session id\n") < 0) { + if (errdesc) + (void) snprintf(errdesc, errlen, "%s", strerror(errno)); + return 0; + } + res = jcmd(pdc, buf, 200, (char **)&reply, sizeof (reply), errdesc, errlen); + free(buf); buf = NULL; + + if (res && id) + *id = atoi(reply); + return res; +} + +int CCONV +pdc_readthread_join(pdc_session_t *pdcs, void **status) +{ + int res=0; + if(pdcs) + { + if(pdcs->pdcs_resultreader) + res = pthread_join(pdcs->pdcs_resultreader, status); + pdcs->pdcs_resultreader = 0; + } + return res; +} diff --git a/dict/pdictclient.h b/dict/pdictclient.h new file mode 100644 index 0000000..1fa60d9 --- /dev/null +++ b/dict/pdictclient.h @@ -0,0 +1,128 @@ +#ifndef _PDICTCLIENT_H_ +#define _PDICTCLIENT_H_ + +#include "pdict.h" + +typedef struct pdc_session pdc_session_t; +typedef int pdc_listen_id_t; + +int CCONV pdc_init(void); + +pdc_session_t * CCONV +pdc_session_alloc(int readfd, + int(*readfunc)(int, void *, unsigned int, char *, int), + int writefd, + int(*writefunc)(int, const void *, unsigned int, char *, int), + int(*closefunc)(int, char *, int), + void *cleanupPtr, + void (*cleanupFunc)(void *)); + +void +pdc_session_free(pdc_session_t *pdcs); + +//int CCONV pdc_set(pdc_session_t *pdcs, const char *key, const char *val, int len, int remove_on_close, char *errdesc, int errlen); +void CCONV pdc_async_set(pdc_session_t *pdcs, + const char *key, + const char *val, + int len, + int remove_on_close, + void (*error)(const char *errdesc, void *arg), + void *arg); + +pdc_listen_id_t CCONV +pdc_listen(pdc_session_t *pdcs, + const char *pattern, + void (*cb)(const char *, const char *, unsigned int, pdict_reason_t, void *), + void *ptr, + char *errdesc, + int errlen); + +int CCONV +pdc_disable_periodic_reports(pdc_session_t *pdc, + char *errdesc, + int errlen); + +void CCONV +pdc_async_disable_periodic_reports(pdc_session_t *pdc, + void (*error)(const char *errdesc, void *arg), + void *arg); + +int CCONV +pdc_enable_periodic_reports(pdc_session_t *pdc, + int periodms, + char *errdesc, + int errlen); + +void CCONV +pdc_async_enable_periodic_reports(pdc_session_t *pdc, + int periodms, + void (*error)(const char *errdesc, void *arg), + void *arg); + +int CCONV +pdc_ignore(pdc_session_t *pdcs, + pdc_listen_id_t id, + char *errdesc, + int errlen); + +void CCONV +pdc_async_ignore(pdc_session_t *pdcs, + pdc_listen_id_t id, + void (*error)(const char *errdesc, void *arg), + void *arg); + +int CCONV +pdc_flush(pdc_session_t *pdc, + char *errdesc, + int errlen); + +int CCONV +pdc_quit(pdc_session_t *pdc, + char *errdesc, + int errlen); + +int CCONV +pdc_remove(pdc_session_t *pdc, + const char *pattern, + char *errdesc, + int errlen); + +void CCONV +pdc_async_remove(pdc_session_t *pdcs, + const char *pattern, + void (*error)(const char *errdesc, void *arg), + void *arg); + +int CCONV +pdc_get(pdc_session_t *pdcs, + const char *pattern, + char *results, + int resultslen, + char *errdesc, + int errlen); + +int CCONV +pdc_get_server_session_id(pdc_session_t *pdc, + int *id, + char *errdesc, + int errlen); + +int CCONV +pdc_readthread_join(pdc_session_t *pdcs, + void **status); + +void CCONV +pdc_async_authorize(pdc_session_t *pdcs, + const char *version, + char *password, + void (*success) (void *arg, void (*error)(const char *errdesc, void *arg)), + void (*error)(const char *errdesc, void *arg), + void *arg); + +void +cleanup_pending(pdc_session_t *pdcs, void *arg); + +void +wait_pending(pdc_session_t *pdcs); + +#endif |