/*
    en50221 encoder An implementation for libdvb
    an implementation for the en50221 transport layer

    Copyright (C) 2004, 2005 Manu Abraham
    Copyright (C) 2005 Julian Scheel (julian at jusst dot de)
    Copyright (C) 2006 Andrew de Quincey (adq_dvb@lidskialf.net)

    This library is free software; you can redistribute it and/or modify
    it under the terms of the GNU Lesser General Public License as
    published by the Free Software Foundation; either version 2.1 of
    the License, or (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public
    License along with this library; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
*/

// NOTE(review): the header names of the following system include directives
// are missing from this copy of the file and must be restored before it can
// compile. The code below uses malloc/free/memset/memcpy, printf, poll(),
// gettimeofday(), struct iovec and the pthread mutex API, plus the project
// helpers dvbca_link_read()/dvbca_link_write(), print() and time_after().
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "en50221_errno.h"
#include "en50221_transport.h"
#include "asn_1.h"

// these are the Transport Tags, like
// described in EN50221, Annex A.4.1.13 (pg70)
#define T_SB                                0x80	// sb                           primitive   h<--m
#define T_RCV                               0x81	// receive                      primitive   h-->m
#define T_CREATE_T_C                        0x82	// create transport connection  primitive   h-->m
#define T_C_T_C_REPLY                       0x83	// ctc reply                    primitive   h<--m
#define T_DELETE_T_C                        0x84	// delete tc                    primitive   h<->m
#define T_D_T_C_REPLY                       0x85	// dtc reply                    primitive   h<->m
#define T_REQUEST_T_C                       0x86	// request transport connection primitive   h<--m
#define T_NEW_T_C                           0x87	// new tc / reply to t_request  primitive   h-->m
#define T_T_C_ERROR                         0x77	// error creating tc            primitive   h-->m
#define T_DATA_LAST                         0xA0	// convey data from higher      constructed h<->m
							// layers
#define T_DATA_MORE                         0xA1	// convey data from higher      constructed h<->m
							// layers

// One outgoing TPDU waiting in a connection's send queue. The TPDU bytes are
// allocated inline, directly behind the header (see the malloc calls in the
// send functions).
struct en50221_message {
	struct en50221_message *next;	// next queued message, NULL at the tail
	uint32_t length;		// number of valid bytes in data[]
	uint8_t data[0];		// the encoded TPDU
};

// State of a single transport connection on a slot.
struct en50221_connection {
	uint32_t state;			// the current state: idle/in_delete/in_create/active
	struct timeval tx_time;		// time last request was sent from host->module, or 0 if ok
	struct timeval last_poll_time;	// time of last poll transmission
	uint8_t *chain_buffer;		// used to save parts of chained packets
	uint32_t buffer_length;		// NOTE(review): presumably the number of bytes held in chain_buffer - confirm

	struct en50221_message *send_queue;		// head of the outgoing message list
	struct en50221_message *send_queue_tail;	// last element of the outgoing message list
};

// One registered CAM slot; an entry is unused while ca_hndl == -1.
struct en50221_slot {
	int ca_hndl;			// handle passed to dvbca_link_read/write, -1 if the entry is free
	uint8_t slot;			// CAM slot
	struct en50221_connection *connections;	// array of max_connections_per_slot entries

	pthread_mutex_t slot_lock;	// taken around all accesses to this slot's state

	uint32_t response_timeout;	// handed to time_after() together with tx_time
	uint32_t poll_delay;		// handed to time_after() together with last_poll_time
};

// The transport layer instance handed out by en50221_tl_create().
struct en50221_transport_layer {
	uint8_t max_slots;			// number of entries in slots[] and slot_pollfds[]
	uint8_t max_connections_per_slot;	// number of entries in each slot's connections[]
	struct en50221_slot *slots;
	struct pollfd *slot_pollfds;		// one pollfd per slot, rebuilt by en50221_tl_poll()
	int slots_changed;			// set when slot_pollfds must be rebuilt

	pthread_mutex_t global_lock;		// taken while registering/destroying slots and rebuilding pollfds
	pthread_mutex_t setcallback_lock;	// protects callback and callback_arg

	int error;				// last EN50221ERR_* code recorded
	int error_slot;				// slot the last error was recorded for

	en50221_tl_callback callback;		// upper layer notification hook, may be NULL
	void *callback_arg;			// first argument passed to callback
};

// forward declarations of the file-local helpers
static int en50221_tl_process_data(struct en50221_transport_layer *tl,
				   uint8_t slot_id, uint8_t * data,
				   uint32_t data_length);
static int en50221_tl_poll_tc(struct en50221_transport_layer *tl,
			      uint8_t slot_id, uint8_t connection_id);
static int en50221_tl_alloc_new_tc(struct en50221_transport_layer *tl,
				   uint8_t slot_id);
static void queue_message(struct en50221_transport_layer *tl,
			  uint8_t slot_id, uint8_t connection_id,
			  struct en50221_message *msg);
static int en50221_tl_handle_create_tc_reply(struct en50221_transport_layer *tl,
					     uint8_t slot_id,
					     uint8_t connection_id);
static int en50221_tl_handle_delete_tc(struct en50221_transport_layer *tl,
				       uint8_t slot_id,
				       uint8_t connection_id);
static int en50221_tl_handle_delete_tc_reply(struct en50221_transport_layer *tl,
					     uint8_t slot_id,
					     uint8_t connection_id);
static int en50221_tl_handle_request_tc(struct en50221_transport_layer *tl,
					uint8_t slot_id,
					uint8_t connection_id);
static int en50221_tl_handle_data_more(struct en50221_transport_layer *tl,
				       uint8_t slot_id,
				       uint8_t connection_id,
				       uint8_t * data,
				       uint32_t data_length);
static int
en50221_tl_handle_data_last(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id, uint8_t * data, uint32_t data_length); static int en50221_tl_handle_sb(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id, uint8_t * data, uint32_t data_length); struct en50221_transport_layer *en50221_tl_create(uint8_t max_slots, uint8_t max_connections_per_slot) { struct en50221_transport_layer *tl = NULL; int i; int j; // setup structure tl = (struct en50221_transport_layer *) malloc(sizeof(struct en50221_transport_layer)); if (tl == NULL) goto error_exit; tl->max_slots = max_slots; tl->max_connections_per_slot = max_connections_per_slot; tl->slots = NULL; tl->slot_pollfds = NULL; tl->slots_changed = 1; tl->callback = NULL; tl->callback_arg = NULL; tl->error_slot = 0; tl->error = 0; pthread_mutex_init(&tl->global_lock, NULL); pthread_mutex_init(&tl->setcallback_lock, NULL); // create the slots tl->slots = malloc(sizeof(struct en50221_slot) * max_slots); if (tl->slots == NULL) goto error_exit; // set them up for (i = 0; i < max_slots; i++) { tl->slots[i].ca_hndl = -1; // create the connections for this slot tl->slots[i].connections = malloc(sizeof(struct en50221_connection) * max_connections_per_slot); if (tl->slots[i].connections == NULL) goto error_exit; // create a mutex for the slot pthread_mutex_init(&tl->slots[i].slot_lock, NULL); // set them up for (j = 0; j < max_connections_per_slot; j++) { tl->slots[i].connections[j].state = T_STATE_IDLE; tl->slots[i].connections[j].tx_time.tv_sec = 0; tl->slots[i].connections[j].last_poll_time.tv_sec = 0; tl->slots[i].connections[j].last_poll_time.tv_usec = 0; tl->slots[i].connections[j].chain_buffer = NULL; tl->slots[i].connections[j].buffer_length = 0; tl->slots[i].connections[j].send_queue = NULL; tl->slots[i].connections[j].send_queue_tail = NULL; } } // create the pollfds tl->slot_pollfds = malloc(sizeof(struct pollfd) * max_slots); if (tl->slot_pollfds == NULL) { goto error_exit; } 
memset(tl->slot_pollfds, 0, sizeof(struct pollfd) * max_slots); return tl; error_exit: en50221_tl_destroy(tl); return NULL; } // Destroy an instance of the transport layer void en50221_tl_destroy(struct en50221_transport_layer *tl) { int i, j; if (tl) { if (tl->slots) { for (i = 0; i < tl->max_slots; i++) { if (tl->slots[i].connections) { for (j = 0; j < tl->max_connections_per_slot; j++) { if (tl->slots[i].connections[j].chain_buffer) { free(tl->slots[i].connections[j].chain_buffer); } struct en50221_message *cur_msg = tl->slots[i].connections[j].send_queue; while (cur_msg) { struct en50221_message *next_msg = cur_msg->next; free(cur_msg); cur_msg = next_msg; } tl->slots[i].connections[j].send_queue = NULL; tl->slots[i].connections[j].send_queue_tail = NULL; } free(tl->slots[i].connections); pthread_mutex_destroy(&tl->slots[i].slot_lock); } } free(tl->slots); } if (tl->slot_pollfds) { free(tl->slot_pollfds); } pthread_mutex_destroy(&tl->setcallback_lock); pthread_mutex_destroy(&tl->global_lock); free(tl); } } // this can be called from the user-space app to // register new slots that we should work with int en50221_tl_register_slot(struct en50221_transport_layer *tl, int ca_hndl, uint8_t slot, uint32_t response_timeout, uint32_t poll_delay) { // lock pthread_mutex_lock(&tl->global_lock); // we browse through the array of slots // to look for the first unused one int i; int16_t slot_id = -1; for (i = 0; i < tl->max_slots; i++) { if (tl->slots[i].ca_hndl == -1) { slot_id = i; break; } } if (slot_id == -1) { tl->error = EN50221ERR_OUTOFSLOTS; pthread_mutex_unlock(&tl->global_lock); return -1; } // set up the slot struct pthread_mutex_lock(&tl->slots[slot_id].slot_lock); tl->slots[slot_id].ca_hndl = ca_hndl; tl->slots[slot_id].slot = slot; tl->slots[slot_id].response_timeout = response_timeout; tl->slots[slot_id].poll_delay = poll_delay; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); tl->slots_changed = 1; pthread_mutex_unlock(&tl->global_lock); return slot_id; 
} void en50221_tl_destroy_slot(struct en50221_transport_layer *tl, uint8_t slot_id) { int i; if (slot_id >= tl->max_slots) return; // lock pthread_mutex_lock(&tl->global_lock); // clear the slot pthread_mutex_lock(&tl->slots[slot_id].slot_lock); tl->slots[slot_id].ca_hndl = -1; for (i = 0; i < tl->max_connections_per_slot; i++) { tl->slots[slot_id].connections[i].state = T_STATE_IDLE; tl->slots[slot_id].connections[i].tx_time.tv_sec = 0; tl->slots[slot_id].connections[i].last_poll_time.tv_sec = 0; tl->slots[slot_id].connections[i].last_poll_time.tv_usec = 0; if (tl->slots[slot_id].connections[i].chain_buffer) { free(tl->slots[slot_id].connections[i]. chain_buffer); } tl->slots[slot_id].connections[i].chain_buffer = NULL; tl->slots[slot_id].connections[i].buffer_length = 0; struct en50221_message *cur_msg = tl->slots[slot_id].connections[i].send_queue; while (cur_msg) { struct en50221_message *next_msg = cur_msg->next; free(cur_msg); cur_msg = next_msg; } tl->slots[slot_id].connections[i].send_queue = NULL; tl->slots[slot_id].connections[i].send_queue_tail = NULL; } pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); // tell upper layers pthread_mutex_lock(&tl->setcallback_lock); en50221_tl_callback cb = tl->callback; void *cb_arg = tl->callback_arg; pthread_mutex_unlock(&tl->setcallback_lock); if (cb) cb(cb_arg, T_CALLBACK_REASON_SLOTCLOSE, NULL, 0, slot_id, 0); tl->slots_changed = 1; pthread_mutex_unlock(&tl->global_lock); } int en50221_tl_poll(struct en50221_transport_layer *tl) { uint8_t data[4096]; int slot_id; int j; // make up pollfds if the slots have changed pthread_mutex_lock(&tl->global_lock); if (tl->slots_changed) { for (slot_id = 0; slot_id < tl->max_slots; slot_id++) { if (tl->slots[slot_id].ca_hndl != -1) { tl->slot_pollfds[slot_id].fd = tl->slots[slot_id].ca_hndl; tl->slot_pollfds[slot_id].events = POLLIN | POLLPRI | POLLERR; tl->slot_pollfds[slot_id].revents = 0; } else { tl->slot_pollfds[slot_id].fd = 0; tl->slot_pollfds[slot_id].events = 0; 
tl->slot_pollfds[slot_id].revents = 0; } } tl->slots_changed = 0; } pthread_mutex_unlock(&tl->global_lock); // anything happened? if (poll(tl->slot_pollfds, tl->max_slots, 10) < 0) { tl->error_slot = -1; tl->error = EN50221ERR_CAREAD; return -1; } // go through all slots (even though poll may not have reported any events for (slot_id = 0; slot_id < tl->max_slots; slot_id++) { // check if this slot is still used and get its handle pthread_mutex_lock(&tl->slots[slot_id].slot_lock); if (tl->slots[slot_id].ca_hndl == -1) { pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); continue; } int ca_hndl = tl->slots[slot_id].ca_hndl; if (tl->slot_pollfds[slot_id].revents & (POLLPRI | POLLIN)) { // read data uint8_t r_slot_id; uint8_t connection_id; int readcnt = dvbca_link_read(ca_hndl, &r_slot_id, &connection_id, data, sizeof(data)); if (readcnt < 0) { tl->error_slot = slot_id; tl->error = EN50221ERR_CAREAD; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } // process it if we got some if (readcnt > 0) { if (tl->slots[slot_id].slot != r_slot_id) { // this message is for an other CAM of the same CA int new_slot_id; for (new_slot_id = 0; new_slot_id < tl->max_slots; new_slot_id++) { if ((tl->slots[new_slot_id].ca_hndl == ca_hndl) && (tl->slots[new_slot_id].slot == r_slot_id)) break; } if (new_slot_id != tl->max_slots) { // we found the requested CAM pthread_mutex_lock(&tl->slots[new_slot_id].slot_lock); if (en50221_tl_process_data(tl, new_slot_id, data, readcnt)) { pthread_mutex_unlock(&tl->slots[new_slot_id].slot_lock); pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } pthread_mutex_unlock(&tl->slots[new_slot_id].slot_lock); } else { tl->error = EN50221ERR_BADSLOTID; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } } else if (en50221_tl_process_data(tl, slot_id, data, readcnt)) { pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } } } else if (tl->slot_pollfds[slot_id].revents & POLLERR) { // an error was 
reported tl->error_slot = slot_id; tl->error = EN50221ERR_CAREAD; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } // poll the connections on this slot + check for timeouts for (j = 0; j < tl->max_connections_per_slot; j++) { // ignore connection if idle if (tl->slots[slot_id].connections[j].state == T_STATE_IDLE) { continue; } // send queued data if (tl->slots[slot_id].connections[j].state & (T_STATE_IN_CREATION | T_STATE_ACTIVE | T_STATE_ACTIVE_DELETEQUEUED)) { // send data if there is some to go and we're not waiting for a response already if (tl->slots[slot_id].connections[j].send_queue && (tl->slots[slot_id].connections[j].tx_time.tv_sec == 0)) { // get the message struct en50221_message *msg = tl->slots[slot_id].connections[j].send_queue; if (msg->next != NULL) { tl->slots[slot_id].connections[j].send_queue = msg->next; } else { tl->slots[slot_id].connections[j].send_queue = NULL; tl->slots[slot_id].connections[j].send_queue_tail = NULL; } // send the message if (dvbca_link_write(tl->slots[slot_id].ca_hndl, tl->slots[slot_id].slot, j, msg->data, msg->length) < 0) { free(msg); pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); tl->error_slot = slot_id; tl->error = EN50221ERR_CAWRITE; print(LOG_LEVEL, ERROR, 1, "CAWrite failed"); return -1; } gettimeofday(&tl->slots[slot_id].connections[j].tx_time, 0); // fixup connection state for T_DELETE_T_C if (msg->length && (msg->data[0] == T_DELETE_T_C)) { tl->slots[slot_id].connections[j].state = T_STATE_IN_DELETION; if (tl->slots[slot_id].connections[j].chain_buffer) { free(tl->slots[slot_id].connections[j].chain_buffer); } tl->slots[slot_id].connections[j].chain_buffer = NULL; tl->slots[slot_id].connections[j].buffer_length = 0; } free(msg); } } // poll it if we're not expecting a reponse and the poll time has elapsed if (tl->slots[slot_id].connections[j].state & T_STATE_ACTIVE) { if ((tl->slots[slot_id].connections[j].tx_time.tv_sec == 0) && 
(time_after(tl->slots[slot_id].connections[j].last_poll_time, tl->slots[slot_id].poll_delay))) { gettimeofday(&tl->slots[slot_id].connections[j].last_poll_time, 0); if (en50221_tl_poll_tc(tl, slot_id, j)) { pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } } } // check for timeouts - in any state if (tl->slots[slot_id].connections[j].tx_time.tv_sec && (time_after(tl->slots[slot_id].connections[j].tx_time, tl->slots[slot_id].response_timeout))) { if (tl->slots[slot_id].connections[j].state & (T_STATE_IN_CREATION |T_STATE_IN_DELETION)) { tl->slots[slot_id].connections[j].state = T_STATE_IDLE; } else if (tl->slots[slot_id].connections[j].state & (T_STATE_ACTIVE | T_STATE_ACTIVE_DELETEQUEUED)) { tl->error_slot = slot_id; tl->error = EN50221ERR_TIMEOUT; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } } } pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); } return 0; } void en50221_tl_register_callback(struct en50221_transport_layer *tl, en50221_tl_callback callback, void *arg) { pthread_mutex_lock(&tl->setcallback_lock); tl->callback = callback; tl->callback_arg = arg; pthread_mutex_unlock(&tl->setcallback_lock); } int en50221_tl_get_error_slot(struct en50221_transport_layer *tl) { return tl->error_slot; } int en50221_tl_get_error(struct en50221_transport_layer *tl) { return tl->error; } int en50221_tl_send_data(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id, uint8_t * data, uint32_t data_size) { #ifdef DEBUG_TXDATA printf("[[[[[[[[[[[[[[[[[[[[\n"); uint32_t ii = 0; for (ii = 0; ii < data_size; ii++) { printf("%02x: %02x\n", ii, data[ii]); } printf("]]]]]]]]]]]]]]]]]]]]\n"); #endif if (slot_id >= tl->max_slots) { tl->error = EN50221ERR_BADSLOTID; return -1; } pthread_mutex_lock(&tl->slots[slot_id].slot_lock); if (tl->slots[slot_id].ca_hndl == -1) { tl->error = EN50221ERR_BADSLOTID; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } if (connection_id >= tl->max_connections_per_slot) { 
tl->error_slot = slot_id; tl->error = EN50221ERR_BADCONNECTIONID; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } if (tl->slots[slot_id].connections[connection_id].state != T_STATE_ACTIVE) { tl->error = EN50221ERR_BADCONNECTIONID; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } // allocate msg structure struct en50221_message *msg = malloc(sizeof(struct en50221_message) + data_size + 10); if (msg == NULL) { tl->error_slot = slot_id; tl->error = EN50221ERR_OUTOFMEMORY; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } // make up data to send int length_field_len; msg->data[0] = T_DATA_LAST; if ((length_field_len = asn_1_encode(data_size + 1, msg->data + 1, 3)) < 0) { free(msg); tl->error_slot = slot_id; tl->error = EN50221ERR_ASNENCODE; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } msg->data[1 + length_field_len] = connection_id; memcpy(msg->data + 1 + length_field_len + 1, data, data_size); msg->length = 1 + length_field_len + 1 + data_size; // queue it for transmission queue_message(tl, slot_id, connection_id, msg); pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return 0; } int en50221_tl_send_datav(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id, struct iovec *vector, int iov_count) { #ifdef DEBUG_TXDATA printf("[[[[[[[[[[[[[[[[[[[[\n"); uint32_t ii = 0; uint32_t iipos = 0; for (ii = 0; ii < (uint32_t) iov_count; ii++) { uint32_t jj; for (jj = 0; jj < vector[ii].iov_len; jj++) { printf("%02x: %02x\n", jj + iipos, *((uint8_t *) (vector[ii].iov_base) + jj)); } iipos += vector[ii].iov_len; } printf("]]]]]]]]]]]]]]]]]]]]\n"); #endif if (slot_id >= tl->max_slots) { tl->error = EN50221ERR_BADSLOTID; return -1; } pthread_mutex_lock(&tl->slots[slot_id].slot_lock); if (tl->slots[slot_id].ca_hndl == -1) { tl->error = EN50221ERR_BADSLOTID; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } if (connection_id >= tl->max_connections_per_slot) { tl->error_slot 
= slot_id; tl->error = EN50221ERR_BADCONNECTIONID; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } if (tl->slots[slot_id].connections[connection_id].state != T_STATE_ACTIVE) { tl->error = EN50221ERR_BADCONNECTIONID; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } // calculate the total length of the data to send uint32_t data_size = 0; int i; for (i = 0; i < iov_count; i++) { data_size += vector[i].iov_len; } // allocate msg structure struct en50221_message *msg = malloc(sizeof(struct en50221_message) + data_size + 10); if (msg == NULL) { tl->error_slot = slot_id; tl->error = EN50221ERR_OUTOFMEMORY; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } // make up data to send int length_field_len; msg->data[0] = T_DATA_LAST; if ((length_field_len = asn_1_encode(data_size + 1, msg->data + 1, 3)) < 0) { free(msg); tl->error_slot = slot_id; tl->error = EN50221ERR_ASNENCODE; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } msg->data[1 + length_field_len] = connection_id; msg->length = 1 + length_field_len + 1 + data_size; msg->next = NULL; // merge the iovecs uint32_t pos = 1 + length_field_len + 1; for (i = 0; i < iov_count; i++) { memcpy(msg->data + pos, vector[i].iov_base, vector[i].iov_len); pos += vector[i].iov_len; } // queue it for transmission queue_message(tl, slot_id, connection_id, msg); pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return 0; } int en50221_tl_new_tc(struct en50221_transport_layer *tl, uint8_t slot_id) { // check if (slot_id >= tl->max_slots) { tl->error = EN50221ERR_BADSLOTID; return -1; } pthread_mutex_lock(&tl->slots[slot_id].slot_lock); if (tl->slots[slot_id].ca_hndl == -1) { tl->error = EN50221ERR_BADSLOTID; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } // allocate a new connection if possible int conid = en50221_tl_alloc_new_tc(tl, slot_id); if (conid == -1) { tl->error_slot = slot_id; tl->error = EN50221ERR_OUTOFCONNECTIONS; 
pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } // allocate msg structure struct en50221_message *msg = malloc(sizeof(struct en50221_message) + 3); if (msg == NULL) { tl->error_slot = slot_id; tl->error = EN50221ERR_OUTOFMEMORY; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } // make up the data to send msg->data[0] = T_CREATE_T_C; msg->data[1] = 1; msg->data[2] = conid; msg->length = 3; msg->next = NULL; // queue it for transmission queue_message(tl, slot_id, conid, msg); // done pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return conid; } int en50221_tl_del_tc(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id) { // check if (slot_id >= tl->max_slots) { tl->error = EN50221ERR_BADSLOTID; return -1; } pthread_mutex_lock(&tl->slots[slot_id].slot_lock); if (tl->slots[slot_id].ca_hndl == -1) { tl->error = EN50221ERR_BADSLOTID; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } if (connection_id >= tl->max_connections_per_slot) { tl->error_slot = slot_id; tl->error = EN50221ERR_BADCONNECTIONID; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } if (!(tl->slots[slot_id].connections[connection_id].state & (T_STATE_ACTIVE | T_STATE_IN_DELETION))) { tl->error_slot = slot_id; tl->error = EN50221ERR_BADSTATE; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } // allocate msg structure struct en50221_message *msg = malloc(sizeof(struct en50221_message) + 3); if (msg == NULL) { tl->error_slot = slot_id; tl->error = EN50221ERR_OUTOFMEMORY; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } // make up the data to send msg->data[0] = T_DELETE_T_C; msg->data[1] = 1; msg->data[2] = connection_id; msg->length = 3; msg->next = NULL; // queue it for transmission queue_message(tl, slot_id, connection_id, msg); tl->slots[slot_id].connections[connection_id].state = T_STATE_ACTIVE_DELETEQUEUED; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return 0; } int 
en50221_tl_get_connection_state(struct en50221_transport_layer *tl,
				uint8_t slot_id, uint8_t connection_id)
{
	// Returns the current state value of the given connection, or -1 with
	// tl->error set if the slot id is out of range, the slot is not
	// registered, or the connection id is out of range.
	if (slot_id >= tl->max_slots) {
		tl->error = EN50221ERR_BADSLOTID;
		return -1;
	}

	pthread_mutex_lock(&tl->slots[slot_id].slot_lock);
	if (tl->slots[slot_id].ca_hndl == -1) {
		tl->error = EN50221ERR_BADSLOTID;
		pthread_mutex_unlock(&tl->slots[slot_id].slot_lock);
		return -1;
	}
	if (connection_id >= tl->max_connections_per_slot) {
		tl->error_slot = slot_id;
		tl->error = EN50221ERR_BADCONNECTIONID;
		pthread_mutex_unlock(&tl->slots[slot_id].slot_lock);
		return -1;
	}
	// take a snapshot under the lock; it may be stale once we return
	int state = tl->slots[slot_id].connections[connection_id].state;
	pthread_mutex_unlock(&tl->slots[slot_id].slot_lock);

	return state;
}




// ask the module for new data
//
// Writes an empty T_DATA_LAST TPDU on the connection and records the send
// time in tx_time, which marks the connection as waiting for a response.
// Takes no lock itself; en50221_tl_poll() calls it with the slot lock held.
// Returns 0 on success, -1 (tl->error = EN50221ERR_CAWRITE) if the write fails.
static int en50221_tl_poll_tc(struct en50221_transport_layer *tl,
			      uint8_t slot_id, uint8_t connection_id)
{
	// NOTE: the timestamp is taken before the write and is not reset if the
	// write fails
	gettimeofday(&tl->slots[slot_id].connections[connection_id].
		     tx_time, 0);

	// send command
	uint8_t hdr[3];
	hdr[0] = T_DATA_LAST;
	hdr[1] = 1;		// length: just the connection id
	hdr[2] = connection_id;
	if (dvbca_link_write(tl->slots[slot_id].ca_hndl,
			     tl->slots[slot_id].slot,
			     connection_id, hdr, 3) < 0) {
		tl->error_slot = slot_id;
		tl->error = EN50221ERR_CAWRITE;
		return -1;
	}
	return 0;
}

// handle incoming data
//
// Splits a buffer read from the module into its TPDUs and dispatches each one
// to the matching en50221_tl_handle_*() function. Every TPDU consists of a
// tag byte, an ASN.1 encoded length, the connection id and the payload.
// Takes no lock itself; en50221_tl_poll() calls it with the lock of slot_id
// held.
// Returns 0 once the whole buffer has been consumed, -1 with tl->error and
// tl->error_slot set as soon as a TPDU is malformed or a handler fails.
static int en50221_tl_process_data(struct en50221_transport_layer *tl,
				   uint8_t slot_id, uint8_t * data,
				   uint32_t data_length)
{
	int result;

#ifdef DEBUG_RXDATA
	printf("-------------------\n");
	uint32_t ii = 0;
	for (ii = 0; ii < data_length; ii++) {
		printf("%02x: %02x\n", ii, data[ii]);
	}
	printf("+++++++++++++++++++\n");
#endif

	// process the received data
	while (data_length) {
		// parse the header
		uint8_t tpdu_tag = data[0];
		uint16_t asn_data_length;
		int length_field_len;
		if ((length_field_len = asn_1_decode(&asn_data_length, data + 1, data_length - 1)) < 0) {
			print(LOG_LEVEL, ERROR, 1,
			      "Received data with invalid asn from module on slot %02x\n",
			      slot_id);
			tl->error_slot = slot_id;
			tl->error = EN50221ERR_BADCAMDATA;
			return -1;
		}
		// the length must cover at least the connection id and must not
		// run past the end of the buffer
		if ((asn_data_length < 1) ||
		    (asn_data_length > (data_length - (1 + length_field_len)))) {
			print(LOG_LEVEL, ERROR, 1,
			      "Received data with invalid length from module on slot %02x\n",
			      slot_id);
			tl->error_slot = slot_id;
			tl->error = EN50221ERR_BADCAMDATA;
			return -1;
		}
		uint8_t connection_id = data[1 + length_field_len];
		// step over tag, length field and connection id; what remains of
		// asn_data_length is the payload size
		data += 1 + length_field_len + 1;
		data_length -= (1 + length_field_len + 1);
		asn_data_length--;

		// check the connection_id
		if (connection_id >= tl->max_connections_per_slot) {
			print(LOG_LEVEL, ERROR, 1,
			      "Received bad connection id %02x from module on slot %02x\n",
			      connection_id, slot_id);
			tl->error_slot = slot_id;
			tl->error = EN50221ERR_BADCONNECTIONID;
			return -1;
		}
		// process the TPDUs
		switch (tpdu_tag) {
		case T_C_T_C_REPLY:
			if ((result = en50221_tl_handle_create_tc_reply(tl, slot_id, connection_id)) < 0) {
				return -1;
			}
			break;
		case T_DELETE_T_C:
			if ((result = en50221_tl_handle_delete_tc(tl, slot_id, connection_id)) < 0) {
				return -1;
			}
			break;
		case T_D_T_C_REPLY:
			if ((result = en50221_tl_handle_delete_tc_reply(tl, slot_id, connection_id)) < 0) {
				return -1;
			}
			break;
		case T_REQUEST_T_C:
			if ((result = en50221_tl_handle_request_tc(tl, slot_id, connection_id)) < 0) {
				return -1;
			}
			break;
		case T_DATA_MORE:
			if ((result = en50221_tl_handle_data_more(tl, slot_id, connection_id, data, asn_data_length)) < 0) {
				return -1;
			}
			break;
		case T_DATA_LAST:
			if ((result = en50221_tl_handle_data_last(tl, slot_id, connection_id, data, asn_data_length)) < 0) {
				return -1;
			}
			break;
		case T_SB:
			if ((result = en50221_tl_handle_sb(tl, slot_id, connection_id, data, asn_data_length)) < 0) {
				return -1;
			}
			break;
		default:
			print(LOG_LEVEL, ERROR, 1,
			      "Recieved unexpected TPDU tag %02x from module on slot %02x\n",
			      tpdu_tag, slot_id);
			tl->error_slot = slot_id;
			tl->error = EN50221ERR_BADCAMDATA;
			return -1;
		}

		// skip over the consumed data
		data += asn_data_length;
		data_length -= asn_data_length;
	}

	return 0;
}

// Handle a T_C_T_C_REPLY: the module confirmed a connection we asked for.
// Moves the connection from T_STATE_IN_CREATION to T_STATE_ACTIVE, clears the
// pending-response timestamp and reports T_CALLBACK_REASON_CONNECTIONOPEN.
// Returns 0 on success, -1 (EN50221ERR_BADCAMDATA) if the connection was not
// being created.
static int en50221_tl_handle_create_tc_reply(struct en50221_transport_layer *tl,
					     uint8_t slot_id,
					     uint8_t connection_id)
{
	// set this connection to state active
	if (tl->slots[slot_id].connections[connection_id].state == T_STATE_IN_CREATION) {
		tl->slots[slot_id].connections[connection_id].state = T_STATE_ACTIVE;
		tl->slots[slot_id].connections[connection_id].tx_time.tv_sec = 0;

		// tell upper layers
		// (the callback pointer is copied under the lock and invoked
		// after setcallback_lock has been released)
		pthread_mutex_lock(&tl->setcallback_lock);
		en50221_tl_callback cb = tl->callback;
		void *cb_arg = tl->callback_arg;
		pthread_mutex_unlock(&tl->setcallback_lock);
		if (cb)
			cb(cb_arg, T_CALLBACK_REASON_CONNECTIONOPEN, NULL, 0, slot_id, connection_id);
	} else {
		print(LOG_LEVEL, ERROR, 1,
		      "Received T_C_T_C_REPLY for connection not in "
		      "T_STATE_IN_CREATION from module on slot %02x\n",
		      slot_id);
		tl->error_slot = slot_id;
		tl->error = EN50221ERR_BADCAMDATA;
		return -1;
	}

	return 0;
}

// Handle a T_DELETE_T_C: the module wants to close a connection.
// Resets the connection to T_STATE_IDLE, frees its chain buffer, answers with
// T_D_T_C_REPLY and reports T_CALLBACK_REASON_CONNECTIONCLOSE.
// Returns 0 on success, -1 if the reply could not be written
// (EN50221ERR_CAWRITE) or the connection was not active/in deletion
// (EN50221ERR_BADCAMDATA).
// NOTE(review): messages still in the connection's send queue are not
// released here - confirm whether that is intended.
static int en50221_tl_handle_delete_tc(struct en50221_transport_layer *tl,
				       uint8_t slot_id,
				       uint8_t connection_id)
{
	// immediately delete this connection and send D_T_C_REPLY
	if (tl->slots[slot_id].connections[connection_id].state & (T_STATE_ACTIVE | T_STATE_IN_DELETION)) {
		// clear down the slot
		tl->slots[slot_id].connections[connection_id].state = T_STATE_IDLE;
		if (tl->slots[slot_id].connections[connection_id].chain_buffer) {
			free(tl->slots[slot_id].connections[connection_id].chain_buffer);
		}
		tl->slots[slot_id].connections[connection_id].chain_buffer = NULL;
		tl->slots[slot_id].connections[connection_id].buffer_length = 0;

		// send the reply
		uint8_t hdr[3];
		hdr[0] = T_D_T_C_REPLY;
		hdr[1] = 1;
		hdr[2] = connection_id;
		if (dvbca_link_write(tl->slots[slot_id].ca_hndl,
				     tl->slots[slot_id].slot,
				     connection_id, hdr, 3) < 0) {
			tl->error_slot = slot_id;
			tl->error = EN50221ERR_CAWRITE;
			return -1;
		}
		// tell upper layers
		pthread_mutex_lock(&tl->setcallback_lock);
		en50221_tl_callback cb = tl->callback;
		void *cb_arg = tl->callback_arg;
		pthread_mutex_unlock(&tl->setcallback_lock);
		if (cb)
			cb(cb_arg, T_CALLBACK_REASON_CONNECTIONCLOSE, NULL, 0, slot_id, connection_id);
	} else {
		print(LOG_LEVEL, ERROR, 1,
		      "Received T_DELETE_T_C for inactive connection from module on slot %02x\n",
		      slot_id);
		tl->error_slot = slot_id;
		tl->error = EN50221ERR_BADCAMDATA;
		return -1;
	}

	return 0;
}

// Handle a T_D_T_C_REPLY: the module acknowledged a deletion we requested.
// Returns 0 after moving the connection from T_STATE_IN_DELETION to
// T_STATE_IDLE, -1 (EN50221ERR_BADCAMDATA) if it was in any other state.
static int en50221_tl_handle_delete_tc_reply(struct en50221_transport_layer *tl,
					     uint8_t slot_id,
					     uint8_t connection_id)
{
	// delete this connection, should be in T_STATE_IN_DELETION already
	if (tl->slots[slot_id].connections[connection_id].state == T_STATE_IN_DELETION) {
		tl->slots[slot_id].connections[connection_id].state = T_STATE_IDLE;
	} else {
		print(LOG_LEVEL, ERROR, 1,
		      "Received T_D_T_C_REPLY received for connection not in "
		      "T_STATE_IN_DELETION from module on slot %02x\n",
		      slot_id);
		tl->error_slot = slot_id;
		tl->error = EN50221ERR_BADCAMDATA;
		return -1;
	}

	return 0;
}

// Handle a T_REQUEST_T_C: the module asks the host to open a new connection.
// If no connection is free, T_T_C_ERROR is sent on the requesting connection.
// Otherwise T_NEW_T_C is sent on the requesting connection, T_CREATE_T_C on
// the new one, and T_CALLBACK_REASON_CAMCONNECTIONOPEN is reported.
// Returns 0 on success (including the "no free connection" case), -1
// (EN50221ERR_CAWRITE) if a write fails.
static int en50221_tl_handle_request_tc(struct en50221_transport_layer *tl,
					uint8_t slot_id,
					uint8_t connection_id)
{
	// allocate a new connection if possible
	int conid = en50221_tl_alloc_new_tc(tl, slot_id);
	int ca_hndl = tl->slots[slot_id].ca_hndl;
	if (conid == -1) {
		print(LOG_LEVEL, ERROR, 1,
		      "Too many connections requested by module on slot %02x\n",
		      slot_id);

		// send the error
		uint8_t hdr[4];
		hdr[0] = T_T_C_ERROR;
		hdr[1] = 2;
		hdr[2] = connection_id;
		hdr[3] = 1;
		if (dvbca_link_write(ca_hndl, tl->slots[slot_id].slot, connection_id, hdr, 4) < 0) {
			tl->error_slot = slot_id;
			tl->error = EN50221ERR_CAWRITE;
			return -1;
		}
		// the request has been answered: no response outstanding on the
		// requesting connection
		tl->slots[slot_id].connections[connection_id].tx_time.
			tv_sec = 0;
	} else {
		// send the NEW_T_C on the connection we received it on
		uint8_t hdr[4];
		hdr[0] = T_NEW_T_C;
		hdr[1] = 2;
		hdr[2] = connection_id;
		hdr[3] = conid;
		if (dvbca_link_write(ca_hndl, tl->slots[slot_id].slot, connection_id, hdr, 4) < 0) {
			// give the freshly allocated connection back
			tl->slots[slot_id].connections[conid].state = T_STATE_IDLE;
			tl->error_slot = slot_id;
			tl->error = EN50221ERR_CAWRITE;
			return -1;
		}
		tl->slots[slot_id].connections[connection_id].tx_time.tv_sec = 0;

		// send the CREATE_T_C on the new connnection
		hdr[0] = T_CREATE_T_C;
		hdr[1] = 1;
		hdr[2] = conid;
		if (dvbca_link_write(ca_hndl, tl->slots[slot_id].slot, conid, hdr, 3) < 0) {
			// give the freshly allocated connection back
			tl->slots[slot_id].connections[conid].state = T_STATE_IDLE;
			tl->error_slot = slot_id;
			tl->error = EN50221ERR_CAWRITE;
			return -1;
		}
		// now waiting for the module's reply on the new connection
		gettimeofday(&tl->slots[slot_id].connections[conid].tx_time, 0);

		// tell upper layers
		pthread_mutex_lock(&tl->setcallback_lock);
		en50221_tl_callback cb = tl->callback;
		void *cb_arg = tl->callback_arg;
		pthread_mutex_unlock(&tl->setcallback_lock);
		if (cb)
			cb(cb_arg, T_CALLBACK_REASON_CAMCONNECTIONOPEN, NULL, 0, slot_id, conid);
	}

	return 0;
}

static int en50221_tl_handle_data_more(struct en50221_transport_layer *tl,
				       uint8_t slot_id,
				       uint8_t connection_id,
				       uint8_t * data,
				       uint32_t data_length)
{
	// connection in correct state?
if (tl->slots[slot_id].connections[connection_id].state != T_STATE_ACTIVE) { print(LOG_LEVEL, ERROR, 1, "Received T_DATA_MORE for connection not in " "T_STATE_ACTIVE from module on slot %02x\n", slot_id); tl->error_slot = slot_id; tl->error = EN50221ERR_BADCAMDATA; return -1; } // a chained data packet is coming in, save // it to the buffer and wait for more tl->slots[slot_id].connections[connection_id].tx_time.tv_sec = 0; int new_data_length = tl->slots[slot_id].connections[connection_id].buffer_length + data_length; uint8_t *new_data_buffer = realloc(tl->slots[slot_id].connections[connection_id].chain_buffer, new_data_length); if (new_data_buffer == NULL) { tl->error_slot = slot_id; tl->error = EN50221ERR_OUTOFMEMORY; return -1; } tl->slots[slot_id].connections[connection_id].chain_buffer = new_data_buffer; memcpy(tl->slots[slot_id].connections[connection_id].chain_buffer + tl->slots[slot_id].connections[connection_id].buffer_length, data, data_length); tl->slots[slot_id].connections[connection_id].buffer_length = new_data_length; return 0; } static int en50221_tl_handle_data_last(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id, uint8_t * data, uint32_t data_length) { // connection in correct state? if (tl->slots[slot_id].connections[connection_id].state != T_STATE_ACTIVE) { print(LOG_LEVEL, ERROR, 1, "Received T_DATA_LAST received for connection not in " "T_STATE_ACTIVE from module on slot %02x\n", slot_id); tl->error_slot = slot_id; tl->error = EN50221ERR_BADCAMDATA; return -1; } // last package of a chain or single package comes in tl->slots[slot_id].connections[connection_id].tx_time.tv_sec = 0; if (tl->slots[slot_id].connections[connection_id].chain_buffer == NULL) { // single package => dispatch immediately pthread_mutex_lock(&tl->setcallback_lock); en50221_tl_callback cb = tl->callback; void *cb_arg = tl->callback_arg; pthread_mutex_unlock(&tl->setcallback_lock); if (cb && data_length) { pthread_mutex_unlock(&tl->slots[slot_id]. 
slot_lock); cb(cb_arg, T_CALLBACK_REASON_DATA, data, data_length, slot_id, connection_id); pthread_mutex_lock(&tl->slots[slot_id].slot_lock); } } else { int new_data_length = tl->slots[slot_id].connections[connection_id].buffer_length + data_length; uint8_t *new_data_buffer = realloc(tl->slots[slot_id].connections[connection_id].chain_buffer, new_data_length); if (new_data_buffer == NULL) { tl->error_slot = slot_id; tl->error = EN50221ERR_OUTOFMEMORY; return -1; } memcpy(new_data_buffer + tl->slots[slot_id].connections[connection_id]. buffer_length, data, data_length); // clean the buffer position tl->slots[slot_id].connections[connection_id].chain_buffer = NULL; tl->slots[slot_id].connections[connection_id].buffer_length = 0; // tell the upper layers pthread_mutex_lock(&tl->setcallback_lock); en50221_tl_callback cb = tl->callback; void *cb_arg = tl->callback_arg; pthread_mutex_unlock(&tl->setcallback_lock); if (cb && data_length) { pthread_mutex_unlock(&tl->slots[slot_id]. slot_lock); cb(cb_arg, T_CALLBACK_REASON_DATA, new_data_buffer, new_data_length, slot_id, connection_id); pthread_mutex_lock(&tl->slots[slot_id].slot_lock); } free(new_data_buffer); } return 0; } static int en50221_tl_handle_sb(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id, uint8_t * data, uint32_t data_length) { // is the connection id ok? if (tl->slots[slot_id].connections[connection_id].state != T_STATE_ACTIVE) { print(LOG_LEVEL, ERROR, 1, "Received T_SB for connection not in T_STATE_ACTIVE from module on slot %02x\n", slot_id); tl->error_slot = slot_id; tl->error = EN50221ERR_BADCAMDATA; return -1; } // did we get enough data in the T_SB? 
if (data_length != 1) { print(LOG_LEVEL, ERROR, 1, "Recieved T_SB with invalid length from module on slot %02x\n", slot_id); tl->error_slot = slot_id; tl->error = EN50221ERR_BADCAMDATA; return -1; } // tell it to send the data if it says there is some if (data[0] & 0x80) { int ca_hndl = tl->slots[slot_id].ca_hndl; // send the RCV uint8_t hdr[3]; hdr[0] = T_RCV; hdr[1] = 1; hdr[2] = connection_id; if (dvbca_link_write(ca_hndl, tl->slots[slot_id].slot, connection_id, hdr, 3) < 0) { tl->error_slot = slot_id; tl->error = EN50221ERR_CAWRITE; return -1; } gettimeofday(&tl->slots[slot_id].connections[connection_id].tx_time, 0); } else { // no data - indicate not waiting for anything now tl->slots[slot_id].connections[connection_id].tx_time.tv_sec = 0; } return 0; } static int en50221_tl_alloc_new_tc(struct en50221_transport_layer *tl, uint8_t slot_id) { // we browse through the array of connection // types, to look for the first unused one int i, conid = -1; for (i = 1; i < tl->max_connections_per_slot; i++) { if (tl->slots[slot_id].connections[i].state == T_STATE_IDLE) { conid = i; break; } } if (conid == -1) { print(LOG_LEVEL, ERROR, 1, "CREATE_T_C failed: no more connections available\n"); return -1; } // set up the connection struct tl->slots[slot_id].connections[conid].state = T_STATE_IN_CREATION; tl->slots[slot_id].connections[conid].chain_buffer = NULL; tl->slots[slot_id].connections[conid].buffer_length = 0; return conid; } static void queue_message(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id, struct en50221_message *msg) { msg->next = NULL; if (tl->slots[slot_id].connections[connection_id].send_queue_tail) { tl->slots[slot_id].connections[connection_id].send_queue_tail->next = msg; tl->slots[slot_id].connections[connection_id].send_queue_tail = msg; } else { tl->slots[slot_id].connections[connection_id].send_queue = msg; tl->slots[slot_id].connections[connection_id].send_queue_tail = msg; } }