/* en50221 encoder An implementation for libdvb an implementation for the en50221 transport layer Copyright (C) 2004, 2005 Manu Abraham <abraham.manu@gmail.com> Copyright (C) 2005 Julian Scheel (julian at jusst dot de) Copyright (C) 2006 Andrew de Quincey (adq_dvb@lidskialf.net) This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include <stdio.h> #include <unistd.h> #include <string.h> #include <pthread.h> #include <fcntl.h> #include <sys/ioctl.h> #include <sys/poll.h> #include <time.h> #include <libdvbmisc/dvbmisc.h> #include <libdvbapi/dvbca.h> #include "en50221_errno.h" #include "en50221_transport.h" #include "asn_1.h" // these are the Transport Tags, like // described in EN50221, Annex A.4.1.13 (pg70) #define T_SB 0x80 // sb primitive h<--m #define T_RCV 0x81 // receive primitive h-->m #define T_CREATE_T_C 0x82 // create transport connection primitive h-->m #define T_C_T_C_REPLY 0x83 // ctc reply primitive h<--m #define T_DELETE_T_C 0x84 // delete tc primitive h<->m #define T_D_T_C_REPLY 0x85 // dtc reply primitive h<->m #define T_REQUEST_T_C 0x86 // request transport connection primitive h<--m #define T_NEW_T_C 0x87 // new tc / reply to t_request primitive h-->m #define T_T_C_ERROR 0x77 // error creating tc primitive h-->m #define T_DATA_LAST 0xA0 // convey data from higher constructed h<->m // layers #define T_DATA_MORE 0xA1 // convey data from higher constructed h<->m // layers struct en50221_message { struct en50221_message *next; uint32_t length; uint8_t data[0]; }; struct en50221_connection { uint32_t state; // the current state: idle/in_delete/in_create/active struct timeval tx_time; // time last request was sent from host->module, or 0 if ok struct timeval last_poll_time; // time of last poll transmission uint8_t *chain_buffer; // used to save parts of chained packets uint32_t buffer_length; struct en50221_message *send_queue; struct en50221_message *send_queue_tail; }; struct en50221_slot { int ca_hndl; uint8_t slot; // CAM slot struct en50221_connection *connections; pthread_mutex_t slot_lock; uint32_t response_timeout; uint32_t poll_delay; }; struct en50221_transport_layer { uint8_t max_slots; uint8_t max_connections_per_slot; struct en50221_slot *slots; struct pollfd *slot_pollfds; int slots_changed; pthread_mutex_t global_lock; pthread_mutex_t setcallback_lock; int error; int error_slot; en50221_tl_callback callback; void *callback_arg; }; static int en50221_tl_process_data(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t * data, uint32_t data_length); static int en50221_tl_poll_tc(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id); static int en50221_tl_alloc_new_tc(struct en50221_transport_layer *tl, uint8_t slot_id); static void queue_message(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id, struct en50221_message *msg); static int en50221_tl_handle_create_tc_reply(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id); static int en50221_tl_handle_delete_tc(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id); static int en50221_tl_handle_delete_tc_reply(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id); static int en50221_tl_handle_request_tc(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id); static int en50221_tl_handle_data_more(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id, uint8_t * data, uint32_t data_length); static int en50221_tl_handle_data_last(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id, uint8_t * data, uint32_t data_length); static int en50221_tl_handle_sb(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id, uint8_t * data, uint32_t data_length); struct en50221_transport_layer *en50221_tl_create(uint8_t max_slots, uint8_t max_connections_per_slot) { struct en50221_transport_layer *tl = NULL; int i; int j; // setup structure tl = (struct en50221_transport_layer *) malloc(sizeof(struct en50221_transport_layer)); if (tl == NULL) goto error_exit; tl->max_slots = max_slots; tl->max_connections_per_slot = max_connections_per_slot; tl->slots = NULL; tl->slot_pollfds = NULL; tl->slots_changed = 1; tl->callback = NULL; tl->callback_arg = NULL; tl->error_slot = 0; tl->error = 0; pthread_mutex_init(&tl->global_lock, NULL); pthread_mutex_init(&tl->setcallback_lock, NULL); // create the slots tl->slots = malloc(sizeof(struct en50221_slot) * max_slots); if (tl->slots == NULL) goto error_exit; // set them up for (i = 0; i < max_slots; i++) { tl->slots[i].ca_hndl = -1; // create the connections for this slot tl->slots[i].connections = malloc(sizeof(struct en50221_connection) * max_connections_per_slot); if (tl->slots[i].connections == NULL) goto error_exit; // create a mutex for the slot pthread_mutex_init(&tl->slots[i].slot_lock, NULL); // set them up for (j = 0; j < max_connections_per_slot; j++) { tl->slots[i].connections[j].state = T_STATE_IDLE; tl->slots[i].connections[j].tx_time.tv_sec = 0; tl->slots[i].connections[j].last_poll_time.tv_sec = 0; tl->slots[i].connections[j].last_poll_time.tv_usec = 0; tl->slots[i].connections[j].chain_buffer = NULL; tl->slots[i].connections[j].buffer_length = 0; tl->slots[i].connections[j].send_queue = NULL; tl->slots[i].connections[j].send_queue_tail = NULL; } } // create the pollfds tl->slot_pollfds = malloc(sizeof(struct pollfd) * max_slots); if (tl->slot_pollfds == NULL) { goto error_exit; } memset(tl->slot_pollfds, 0, sizeof(struct pollfd) * max_slots); return tl; error_exit: en50221_tl_destroy(tl); return NULL; } // Destroy an instance of the transport layer void en50221_tl_destroy(struct en50221_transport_layer *tl) { int i, j; if (tl) { if (tl->slots) { for (i = 0; i < tl->max_slots; i++) { if (tl->slots[i].connections) { for (j = 0; j < tl->max_connections_per_slot; j++) { if (tl->slots[i].connections[j].chain_buffer) { free(tl->slots[i].connections[j].chain_buffer); } struct en50221_message *cur_msg = tl->slots[i].connections[j].send_queue; while (cur_msg) { struct en50221_message *next_msg = cur_msg->next; free(cur_msg); cur_msg = next_msg; } tl->slots[i].connections[j].send_queue = NULL; tl->slots[i].connections[j].send_queue_tail = NULL; } free(tl->slots[i].connections); pthread_mutex_destroy(&tl->slots[i].slot_lock); } } free(tl->slots); } if (tl->slot_pollfds) { free(tl->slot_pollfds); } pthread_mutex_destroy(&tl->setcallback_lock); pthread_mutex_destroy(&tl->global_lock); free(tl); } } // this can be called from the user-space app to // register new slots that we should work with int en50221_tl_register_slot(struct en50221_transport_layer *tl, int ca_hndl, uint8_t slot, uint32_t response_timeout, uint32_t poll_delay) { // lock pthread_mutex_lock(&tl->global_lock); // we browse through the array of slots // to look for the first unused one int i; int16_t slot_id = -1; for (i = 0; i < tl->max_slots; i++) { if (tl->slots[i].ca_hndl == -1) { slot_id = i; break; } } if (slot_id == -1) { tl->error = EN50221ERR_OUTOFSLOTS; pthread_mutex_unlock(&tl->global_lock); return -1; } // set up the slot struct pthread_mutex_lock(&tl->slots[slot_id].slot_lock); tl->slots[slot_id].ca_hndl = ca_hndl; tl->slots[slot_id].slot = slot; tl->slots[slot_id].response_timeout = response_timeout; tl->slots[slot_id].poll_delay = poll_delay; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); tl->slots_changed = 1; pthread_mutex_unlock(&tl->global_lock); return slot_id; } void en50221_tl_destroy_slot(struct en50221_transport_layer *tl, uint8_t slot_id) { int i; if (slot_id >= tl->max_slots) return; // lock pthread_mutex_lock(&tl->global_lock); // clear the slot pthread_mutex_lock(&tl->slots[slot_id].slot_lock); tl->slots[slot_id].ca_hndl = -1; for (i = 0; i < tl->max_connections_per_slot; i++) { tl->slots[slot_id].connections[i].state = T_STATE_IDLE; tl->slots[slot_id].connections[i].tx_time.tv_sec = 0; tl->slots[slot_id].connections[i].last_poll_time.tv_sec = 0; tl->slots[slot_id].connections[i].last_poll_time.tv_usec = 0; if (tl->slots[slot_id].connections[i].chain_buffer) { free(tl->slots[slot_id].connections[i]. chain_buffer); } tl->slots[slot_id].connections[i].chain_buffer = NULL; tl->slots[slot_id].connections[i].buffer_length = 0; struct en50221_message *cur_msg = tl->slots[slot_id].connections[i].send_queue; while (cur_msg) { struct en50221_message *next_msg = cur_msg->next; free(cur_msg); cur_msg = next_msg; } tl->slots[slot_id].connections[i].send_queue = NULL; tl->slots[slot_id].connections[i].send_queue_tail = NULL; } pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); // tell upper layers pthread_mutex_lock(&tl->setcallback_lock); en50221_tl_callback cb = tl->callback; void *cb_arg = tl->callback_arg; pthread_mutex_unlock(&tl->setcallback_lock); if (cb) cb(cb_arg, T_CALLBACK_REASON_SLOTCLOSE, NULL, 0, slot_id, 0); tl->slots_changed = 1; pthread_mutex_unlock(&tl->global_lock); } int en50221_tl_poll(struct en50221_transport_layer *tl) { uint8_t data[4096]; int slot_id; int j; // make up pollfds if the slots have changed pthread_mutex_lock(&tl->global_lock); if (tl->slots_changed) { for (slot_id = 0; slot_id < tl->max_slots; slot_id++) { if (tl->slots[slot_id].ca_hndl != -1) { tl->slot_pollfds[slot_id].fd = tl->slots[slot_id].ca_hndl; tl->slot_pollfds[slot_id].events = POLLIN | POLLPRI | POLLERR; tl->slot_pollfds[slot_id].revents = 0; } else { tl->slot_pollfds[slot_id].fd = 0; tl->slot_pollfds[slot_id].events = 0; tl->slot_pollfds[slot_id].revents = 0; } } tl->slots_changed = 0; } pthread_mutex_unlock(&tl->global_lock); // anything happened? if (poll(tl->slot_pollfds, tl->max_slots, 10) < 0) { tl->error_slot = -1; tl->error = EN50221ERR_CAREAD; return -1; } // go through all slots (even though poll may not have reported any events for (slot_id = 0; slot_id < tl->max_slots; slot_id++) { // check if this slot is still used and get its handle pthread_mutex_lock(&tl->slots[slot_id].slot_lock); if (tl->slots[slot_id].ca_hndl == -1) { pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); continue; } int ca_hndl = tl->slots[slot_id].ca_hndl; if (tl->slot_pollfds[slot_id].revents & (POLLPRI | POLLIN)) { // read data uint8_t r_slot_id; uint8_t connection_id; int readcnt = dvbca_link_read(ca_hndl, &r_slot_id, &connection_id, data, sizeof(data)); if (readcnt < 0) { tl->error_slot = slot_id; tl->error = EN50221ERR_CAREAD; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } // process it if we got some if (readcnt > 0) { if (tl->slots[slot_id].slot != r_slot_id) { // this message is for an other CAM of the same CA int new_slot_id; for (new_slot_id = 0; new_slot_id < tl->max_slots; new_slot_id++) { if ((tl->slots[new_slot_id].ca_hndl == ca_hndl) && (tl->slots[new_slot_id].slot == r_slot_id)) break; } if (new_slot_id != tl->max_slots) { // we found the requested CAM pthread_mutex_lock(&tl->slots[new_slot_id].slot_lock); if (en50221_tl_process_data(tl, new_slot_id, data, readcnt)) { pthread_mutex_unlock(&tl->slots[new_slot_id].slot_lock); pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } pthread_mutex_unlock(&tl->slots[new_slot_id].slot_lock); } else { tl->error = EN50221ERR_BADSLOTID; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } } else if (en50221_tl_process_data(tl, slot_id, data, readcnt)) { pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } } } else if (tl->slot_pollfds[slot_id].revents & POLLERR) { // an error was reported tl->error_slot = slot_id; tl->error = EN50221ERR_CAREAD; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } // poll the connections on this slot + check for timeouts for (j = 0; j < tl->max_connections_per_slot; j++) { // ignore connection if idle if (tl->slots[slot_id].connections[j].state == T_STATE_IDLE) { continue; } // send queued data if (tl->slots[slot_id].connections[j].state & (T_STATE_IN_CREATION | T_STATE_ACTIVE | T_STATE_ACTIVE_DELETEQUEUED)) { // send data if there is some to go and we're not waiting for a response already if (tl->slots[slot_id].connections[j].send_queue && (tl->slots[slot_id].connections[j].tx_time.tv_sec == 0)) { // get the message struct en50221_message *msg = tl->slots[slot_id].connections[j].send_queue; if (msg->next != NULL) { tl->slots[slot_id].connections[j].send_queue = msg->next; } else { tl->slots[slot_id].connections[j].send_queue = NULL; tl->slots[slot_id].connections[j].send_queue_tail = NULL; } // send the message if (dvbca_link_write(tl->slots[slot_id].ca_hndl, tl->slots[slot_id].slot, j, msg->data, msg->length) < 0) { free(msg); pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); tl->error_slot = slot_id; tl->error = EN50221ERR_CAWRITE; print(LOG_LEVEL, ERROR, 1, "CAWrite failed"); return -1; } gettimeofday(&tl->slots[slot_id].connections[j].tx_time, 0); // fixup connection state for T_DELETE_T_C if (msg->length && (msg->data[0] == T_DELETE_T_C)) { tl->slots[slot_id].connections[j].state = T_STATE_IN_DELETION; if (tl->slots[slot_id].connections[j].chain_buffer) { free(tl->slots[slot_id].connections[j].chain_buffer); } tl->slots[slot_id].connections[j].chain_buffer = NULL; tl->slots[slot_id].connections[j].buffer_length = 0; } free(msg); } } // poll it if we're not expecting a reponse and the poll time has elapsed if (tl->slots[slot_id].connections[j].state & T_STATE_ACTIVE) { if ((tl->slots[slot_id].connections[j].tx_time.tv_sec == 0) && (time_after(tl->slots[slot_id].connections[j].last_poll_time, tl->slots[slot_id].poll_delay))) { gettimeofday(&tl->slots[slot_id].connections[j].last_poll_time, 0); if (en50221_tl_poll_tc(tl, slot_id, j)) { pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } } } // check for timeouts - in any state if (tl->slots[slot_id].connections[j].tx_time.tv_sec && (time_after(tl->slots[slot_id].connections[j].tx_time, tl->slots[slot_id].response_timeout))) { if (tl->slots[slot_id].connections[j].state & (T_STATE_IN_CREATION |T_STATE_IN_DELETION)) { tl->slots[slot_id].connections[j].state = T_STATE_IDLE; } else if (tl->slots[slot_id].connections[j].state & (T_STATE_ACTIVE | T_STATE_ACTIVE_DELETEQUEUED)) { tl->error_slot = slot_id; tl->error = EN50221ERR_TIMEOUT; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } } } pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); } return 0; } void en50221_tl_register_callback(struct en50221_transport_layer *tl, en50221_tl_callback callback, void *arg) { pthread_mutex_lock(&tl->setcallback_lock); tl->callback = callback; tl->callback_arg = arg; pthread_mutex_unlock(&tl->setcallback_lock); } int en50221_tl_get_error_slot(struct en50221_transport_layer *tl) { return tl->error_slot; } int en50221_tl_get_error(struct en50221_transport_layer *tl) { return tl->error; } int en50221_tl_send_data(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id, uint8_t * data, uint32_t data_size) { #ifdef DEBUG_TXDATA printf("[[[[[[[[[[[[[[[[[[[[\n"); uint32_t ii = 0; for (ii = 0; ii < data_size; ii++) { printf("%02x: %02x\n", ii, data[ii]); } printf("]]]]]]]]]]]]]]]]]]]]\n"); #endif if (slot_id >= tl->max_slots) { tl->error = EN50221ERR_BADSLOTID; return -1; } pthread_mutex_lock(&tl->slots[slot_id].slot_lock); if (tl->slots[slot_id].ca_hndl == -1) { tl->error = EN50221ERR_BADSLOTID; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } if (connection_id >= tl->max_connections_per_slot) { tl->error_slot = slot_id; tl->error = EN50221ERR_BADCONNECTIONID; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } if (tl->slots[slot_id].connections[connection_id].state != T_STATE_ACTIVE) { tl->error = EN50221ERR_BADCONNECTIONID; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } // allocate msg structure struct en50221_message *msg = malloc(sizeof(struct en50221_message) + data_size + 10); if (msg == NULL) { tl->error_slot = slot_id; tl->error = EN50221ERR_OUTOFMEMORY; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } // make up data to send int length_field_len; msg->data[0] = T_DATA_LAST; if ((length_field_len = asn_1_encode(data_size + 1, msg->data + 1, 3)) < 0) { free(msg); tl->error_slot = slot_id; tl->error = EN50221ERR_ASNENCODE; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } msg->data[1 + length_field_len] = connection_id; memcpy(msg->data + 1 + length_field_len + 1, data, data_size); msg->length = 1 + length_field_len + 1 + data_size; // queue it for transmission queue_message(tl, slot_id, connection_id, msg); pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return 0; } int en50221_tl_send_datav(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id, struct iovec *vector, int iov_count) { #ifdef DEBUG_TXDATA printf("[[[[[[[[[[[[[[[[[[[[\n"); uint32_t ii = 0; uint32_t iipos = 0; for (ii = 0; ii < (uint32_t) iov_count; ii++) { uint32_t jj; for (jj = 0; jj < vector[ii].iov_len; jj++) { printf("%02x: %02x\n", jj + iipos, *((uint8_t *) (vector[ii].iov_base) + jj)); } iipos += vector[ii].iov_len; } printf("]]]]]]]]]]]]]]]]]]]]\n"); #endif if (slot_id >= tl->max_slots) { tl->error = EN50221ERR_BADSLOTID; return -1; } pthread_mutex_lock(&tl->slots[slot_id].slot_lock); if (tl->slots[slot_id].ca_hndl == -1) { tl->error = EN50221ERR_BADSLOTID; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } if (connection_id >= tl->max_connections_per_slot) { tl->error_slot = slot_id; tl->error = EN50221ERR_BADCONNECTIONID; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } if (tl->slots[slot_id].connections[connection_id].state != T_STATE_ACTIVE) { tl->error = EN50221ERR_BADCONNECTIONID; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } // calculate the total length of the data to send uint32_t data_size = 0; int i; for (i = 0; i < iov_count; i++) { data_size += vector[i].iov_len; } // allocate msg structure struct en50221_message *msg = malloc(sizeof(struct en50221_message) + data_size + 10); if (msg == NULL) { tl->error_slot = slot_id; tl->error = EN50221ERR_OUTOFMEMORY; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } // make up data to send int length_field_len; msg->data[0] = T_DATA_LAST; if ((length_field_len = asn_1_encode(data_size + 1, msg->data + 1, 3)) < 0) { free(msg); tl->error_slot = slot_id; tl->error = EN50221ERR_ASNENCODE; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } msg->data[1 + length_field_len] = connection_id; msg->length = 1 + length_field_len + 1 + data_size; msg->next = NULL; // merge the iovecs uint32_t pos = 1 + length_field_len + 1; for (i = 0; i < iov_count; i++) { memcpy(msg->data + pos, vector[i].iov_base, vector[i].iov_len); pos += vector[i].iov_len; } // queue it for transmission queue_message(tl, slot_id, connection_id, msg); pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return 0; } int en50221_tl_new_tc(struct en50221_transport_layer *tl, uint8_t slot_id) { // check if (slot_id >= tl->max_slots) { tl->error = EN50221ERR_BADSLOTID; return -1; } pthread_mutex_lock(&tl->slots[slot_id].slot_lock); if (tl->slots[slot_id].ca_hndl == -1) { tl->error = EN50221ERR_BADSLOTID; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } // allocate a new connection if possible int conid = en50221_tl_alloc_new_tc(tl, slot_id); if (conid == -1) { tl->error_slot = slot_id; tl->error = EN50221ERR_OUTOFCONNECTIONS; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } // allocate msg structure struct en50221_message *msg = malloc(sizeof(struct en50221_message) + 3); if (msg == NULL) { tl->error_slot = slot_id; tl->error = EN50221ERR_OUTOFMEMORY; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } // make up the data to send msg->data[0] = T_CREATE_T_C; msg->data[1] = 1; msg->data[2] = conid; msg->length = 3; msg->next = NULL; // queue it for transmission queue_message(tl, slot_id, conid, msg); // done pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return conid; } int en50221_tl_del_tc(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id) { // check if (slot_id >= tl->max_slots) { tl->error = EN50221ERR_BADSLOTID; return -1; } pthread_mutex_lock(&tl->slots[slot_id].slot_lock); if (tl->slots[slot_id].ca_hndl == -1) { tl->error = EN50221ERR_BADSLOTID; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } if (connection_id >= tl->max_connections_per_slot) { tl->error_slot = slot_id; tl->error = EN50221ERR_BADCONNECTIONID; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } if (!(tl->slots[slot_id].connections[connection_id].state & (T_STATE_ACTIVE | T_STATE_IN_DELETION))) { tl->error_slot = slot_id; tl->error = EN50221ERR_BADSTATE; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } // allocate msg structure struct en50221_message *msg = malloc(sizeof(struct en50221_message) + 3); if (msg == NULL) { tl->error_slot = slot_id; tl->error = EN50221ERR_OUTOFMEMORY; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } // make up the data to send msg->data[0] = T_DELETE_T_C; msg->data[1] = 1; msg->data[2] = connection_id; msg->length = 3; msg->next = NULL; // queue it for transmission queue_message(tl, slot_id, connection_id, msg); tl->slots[slot_id].connections[connection_id].state = T_STATE_ACTIVE_DELETEQUEUED; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return 0; } int en50221_tl_get_connection_state(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id) { if (slot_id >= tl->max_slots) { tl->error = EN50221ERR_BADSLOTID; return -1; } pthread_mutex_lock(&tl->slots[slot_id].slot_lock); if (tl->slots[slot_id].ca_hndl == -1) { tl->error = EN50221ERR_BADSLOTID; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } if (connection_id >= tl->max_connections_per_slot) { tl->error_slot = slot_id; tl->error = EN50221ERR_BADCONNECTIONID; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return -1; } int state = tl->slots[slot_id].connections[connection_id].state; pthread_mutex_unlock(&tl->slots[slot_id].slot_lock); return state; } // ask the module for new data static int en50221_tl_poll_tc(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id) { gettimeofday(&tl->slots[slot_id].connections[connection_id]. tx_time, 0); // send command uint8_t hdr[3]; hdr[0] = T_DATA_LAST; hdr[1] = 1; hdr[2] = connection_id; if (dvbca_link_write(tl->slots[slot_id].ca_hndl, tl->slots[slot_id].slot, connection_id, hdr, 3) < 0) { tl->error_slot = slot_id; tl->error = EN50221ERR_CAWRITE; return -1; } return 0; } // handle incoming data static int en50221_tl_process_data(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t * data, uint32_t data_length) { int result; #ifdef DEBUG_RXDATA printf("-------------------\n"); uint32_t ii = 0; for (ii = 0; ii < data_length; ii++) { printf("%02x: %02x\n", ii, data[ii]); } printf("+++++++++++++++++++\n"); #endif // process the received data while (data_length) { // parse the header uint8_t tpdu_tag = data[0]; uint16_t asn_data_length; int length_field_len; if ((length_field_len = asn_1_decode(&asn_data_length, data + 1, data_length - 1)) < 0) { print(LOG_LEVEL, ERROR, 1, "Received data with invalid asn from module on slot %02x\n", slot_id); tl->error_slot = slot_id; tl->error = EN50221ERR_BADCAMDATA; return -1; } if ((asn_data_length < 1) || (asn_data_length > (data_length - (1 + length_field_len)))) { print(LOG_LEVEL, ERROR, 1, "Received data with invalid length from module on slot %02x\n", slot_id); tl->error_slot = slot_id; tl->error = EN50221ERR_BADCAMDATA; return -1; } uint8_t connection_id = data[1 + length_field_len]; data += 1 + length_field_len + 1; data_length -= (1 + length_field_len + 1); asn_data_length--; // check the connection_id if (connection_id >= tl->max_connections_per_slot) { print(LOG_LEVEL, ERROR, 1, "Received bad connection id %02x from module on slot %02x\n", connection_id, slot_id); tl->error_slot = slot_id; tl->error = EN50221ERR_BADCONNECTIONID; return -1; } // process the TPDUs switch (tpdu_tag) { case T_C_T_C_REPLY: if ((result = en50221_tl_handle_create_tc_reply(tl, slot_id, connection_id)) < 0) { return -1; } break; case T_DELETE_T_C: if ((result = en50221_tl_handle_delete_tc(tl, slot_id, connection_id)) < 0) { return -1; } break; case T_D_T_C_REPLY: if ((result = en50221_tl_handle_delete_tc_reply(tl, slot_id, connection_id)) < 0) { return -1; } break; case T_REQUEST_T_C: if ((result = en50221_tl_handle_request_tc(tl, slot_id, connection_id)) < 0) { return -1; } break; case T_DATA_MORE: if ((result = en50221_tl_handle_data_more(tl, slot_id, connection_id, data, asn_data_length)) < 0) { return -1; } break; case T_DATA_LAST: if ((result = en50221_tl_handle_data_last(tl, slot_id, connection_id, data, asn_data_length)) < 0) { return -1; } break; case T_SB: if ((result = en50221_tl_handle_sb(tl, slot_id, connection_id, data, asn_data_length)) < 0) { return -1; } break; default: print(LOG_LEVEL, ERROR, 1, "Recieved unexpected TPDU tag %02x from module on slot %02x\n", tpdu_tag, slot_id); tl->error_slot = slot_id; tl->error = EN50221ERR_BADCAMDATA; return -1; } // skip over the consumed data data += asn_data_length; data_length -= asn_data_length; } return 0; } static int en50221_tl_handle_create_tc_reply(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id) { // set this connection to state active if (tl->slots[slot_id].connections[connection_id].state == T_STATE_IN_CREATION) { tl->slots[slot_id].connections[connection_id].state = T_STATE_ACTIVE; tl->slots[slot_id].connections[connection_id].tx_time.tv_sec = 0; // tell upper layers pthread_mutex_lock(&tl->setcallback_lock); en50221_tl_callback cb = tl->callback; void *cb_arg = tl->callback_arg; pthread_mutex_unlock(&tl->setcallback_lock); if (cb) cb(cb_arg, T_CALLBACK_REASON_CONNECTIONOPEN, NULL, 0, slot_id, connection_id); } else { print(LOG_LEVEL, ERROR, 1, "Received T_C_T_C_REPLY for connection not in " "T_STATE_IN_CREATION from module on slot %02x\n", slot_id); tl->error_slot = slot_id; tl->error = EN50221ERR_BADCAMDATA; return -1; } return 0; } static int en50221_tl_handle_delete_tc(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id) { // immediately delete this connection and send D_T_C_REPLY if (tl->slots[slot_id].connections[connection_id].state & (T_STATE_ACTIVE | T_STATE_IN_DELETION)) { // clear down the slot tl->slots[slot_id].connections[connection_id].state = T_STATE_IDLE; if (tl->slots[slot_id].connections[connection_id].chain_buffer) { free(tl->slots[slot_id].connections[connection_id].chain_buffer); } tl->slots[slot_id].connections[connection_id].chain_buffer = NULL; tl->slots[slot_id].connections[connection_id].buffer_length = 0; // send the reply uint8_t hdr[3]; hdr[0] = T_D_T_C_REPLY; hdr[1] = 1; hdr[2] = connection_id; if (dvbca_link_write(tl->slots[slot_id].ca_hndl, tl->slots[slot_id].slot, connection_id, hdr, 3) < 0) { tl->error_slot = slot_id; tl->error = EN50221ERR_CAWRITE; return -1; } // tell upper layers pthread_mutex_lock(&tl->setcallback_lock); en50221_tl_callback cb = tl->callback; void *cb_arg = tl->callback_arg; pthread_mutex_unlock(&tl->setcallback_lock); if (cb) cb(cb_arg, T_CALLBACK_REASON_CONNECTIONCLOSE, NULL, 0, slot_id, connection_id); } else { print(LOG_LEVEL, ERROR, 1, "Received T_DELETE_T_C for inactive connection from module on slot %02x\n", slot_id); tl->error_slot = slot_id; tl->error = EN50221ERR_BADCAMDATA; return -1; } return 0; } static int en50221_tl_handle_delete_tc_reply(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id) { // delete this connection, should be in T_STATE_IN_DELETION already if (tl->slots[slot_id].connections[connection_id].state == T_STATE_IN_DELETION) { tl->slots[slot_id].connections[connection_id].state = T_STATE_IDLE; } else { print(LOG_LEVEL, ERROR, 1, "Received T_D_T_C_REPLY received for connection not in " "T_STATE_IN_DELETION from module on slot %02x\n", slot_id); tl->error_slot = slot_id; tl->error = EN50221ERR_BADCAMDATA; return -1; } return 0; } static int en50221_tl_handle_request_tc(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id) { // allocate a new connection if possible int conid = en50221_tl_alloc_new_tc(tl, slot_id); int ca_hndl = tl->slots[slot_id].ca_hndl; if (conid == -1) { print(LOG_LEVEL, ERROR, 1, "Too many connections requested by module on slot %02x\n", slot_id); // send the error uint8_t hdr[4]; hdr[0] = T_T_C_ERROR; hdr[1] = 2; hdr[2] = connection_id; hdr[3] = 1; if (dvbca_link_write(ca_hndl, tl->slots[slot_id].slot, connection_id, hdr, 4) < 0) { tl->error_slot = slot_id; tl->error = EN50221ERR_CAWRITE; return -1; } tl->slots[slot_id].connections[connection_id].tx_time. tv_sec = 0; } else { // send the NEW_T_C on the connection we received it on uint8_t hdr[4]; hdr[0] = T_NEW_T_C; hdr[1] = 2; hdr[2] = connection_id; hdr[3] = conid; if (dvbca_link_write(ca_hndl, tl->slots[slot_id].slot, connection_id, hdr, 4) < 0) { tl->slots[slot_id].connections[conid].state = T_STATE_IDLE; tl->error_slot = slot_id; tl->error = EN50221ERR_CAWRITE; return -1; } tl->slots[slot_id].connections[connection_id].tx_time.tv_sec = 0; // send the CREATE_T_C on the new connnection hdr[0] = T_CREATE_T_C; hdr[1] = 1; hdr[2] = conid; if (dvbca_link_write(ca_hndl, tl->slots[slot_id].slot, conid, hdr, 3) < 0) { tl->slots[slot_id].connections[conid].state = T_STATE_IDLE; tl->error_slot = slot_id; tl->error = EN50221ERR_CAWRITE; return -1; } gettimeofday(&tl->slots[slot_id].connections[conid].tx_time, 0); // tell upper layers pthread_mutex_lock(&tl->setcallback_lock); en50221_tl_callback cb = tl->callback; void *cb_arg = tl->callback_arg; pthread_mutex_unlock(&tl->setcallback_lock); if (cb) cb(cb_arg, T_CALLBACK_REASON_CAMCONNECTIONOPEN, NULL, 0, slot_id, conid); } return 0; } static int en50221_tl_handle_data_more(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id, uint8_t * data, uint32_t data_length) { // connection in correct state? if (tl->slots[slot_id].connections[connection_id].state != T_STATE_ACTIVE) { print(LOG_LEVEL, ERROR, 1, "Received T_DATA_MORE for connection not in " "T_STATE_ACTIVE from module on slot %02x\n", slot_id); tl->error_slot = slot_id; tl->error = EN50221ERR_BADCAMDATA; return -1; } // a chained data packet is coming in, save // it to the buffer and wait for more tl->slots[slot_id].connections[connection_id].tx_time.tv_sec = 0; int new_data_length = tl->slots[slot_id].connections[connection_id].buffer_length + data_length; uint8_t *new_data_buffer = realloc(tl->slots[slot_id].connections[connection_id].chain_buffer, new_data_length); if (new_data_buffer == NULL) { tl->error_slot = slot_id; tl->error = EN50221ERR_OUTOFMEMORY; return -1; } tl->slots[slot_id].connections[connection_id].chain_buffer = new_data_buffer; memcpy(tl->slots[slot_id].connections[connection_id].chain_buffer + tl->slots[slot_id].connections[connection_id].buffer_length, data, data_length); tl->slots[slot_id].connections[connection_id].buffer_length = new_data_length; return 0; } static int en50221_tl_handle_data_last(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id, uint8_t * data, uint32_t data_length) { // connection in correct state? if (tl->slots[slot_id].connections[connection_id].state != T_STATE_ACTIVE) { print(LOG_LEVEL, ERROR, 1, "Received T_DATA_LAST received for connection not in " "T_STATE_ACTIVE from module on slot %02x\n", slot_id); tl->error_slot = slot_id; tl->error = EN50221ERR_BADCAMDATA; return -1; } // last package of a chain or single package comes in tl->slots[slot_id].connections[connection_id].tx_time.tv_sec = 0; if (tl->slots[slot_id].connections[connection_id].chain_buffer == NULL) { // single package => dispatch immediately pthread_mutex_lock(&tl->setcallback_lock); en50221_tl_callback cb = tl->callback; void *cb_arg = tl->callback_arg; pthread_mutex_unlock(&tl->setcallback_lock); if (cb && data_length) { pthread_mutex_unlock(&tl->slots[slot_id]. slot_lock); cb(cb_arg, T_CALLBACK_REASON_DATA, data, data_length, slot_id, connection_id); pthread_mutex_lock(&tl->slots[slot_id].slot_lock); } } else { int new_data_length = tl->slots[slot_id].connections[connection_id].buffer_length + data_length; uint8_t *new_data_buffer = realloc(tl->slots[slot_id].connections[connection_id].chain_buffer, new_data_length); if (new_data_buffer == NULL) { tl->error_slot = slot_id; tl->error = EN50221ERR_OUTOFMEMORY; return -1; } memcpy(new_data_buffer + tl->slots[slot_id].connections[connection_id]. buffer_length, data, data_length); // clean the buffer position tl->slots[slot_id].connections[connection_id].chain_buffer = NULL; tl->slots[slot_id].connections[connection_id].buffer_length = 0; // tell the upper layers pthread_mutex_lock(&tl->setcallback_lock); en50221_tl_callback cb = tl->callback; void *cb_arg = tl->callback_arg; pthread_mutex_unlock(&tl->setcallback_lock); if (cb && data_length) { pthread_mutex_unlock(&tl->slots[slot_id]. slot_lock); cb(cb_arg, T_CALLBACK_REASON_DATA, new_data_buffer, new_data_length, slot_id, connection_id); pthread_mutex_lock(&tl->slots[slot_id].slot_lock); } free(new_data_buffer); } return 0; } static int en50221_tl_handle_sb(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id, uint8_t * data, uint32_t data_length) { // is the connection id ok? if (tl->slots[slot_id].connections[connection_id].state != T_STATE_ACTIVE) { print(LOG_LEVEL, ERROR, 1, "Received T_SB for connection not in T_STATE_ACTIVE from module on slot %02x\n", slot_id); tl->error_slot = slot_id; tl->error = EN50221ERR_BADCAMDATA; return -1; } // did we get enough data in the T_SB? if (data_length != 1) { print(LOG_LEVEL, ERROR, 1, "Recieved T_SB with invalid length from module on slot %02x\n", slot_id); tl->error_slot = slot_id; tl->error = EN50221ERR_BADCAMDATA; return -1; } // tell it to send the data if it says there is some if (data[0] & 0x80) { int ca_hndl = tl->slots[slot_id].ca_hndl; // send the RCV uint8_t hdr[3]; hdr[0] = T_RCV; hdr[1] = 1; hdr[2] = connection_id; if (dvbca_link_write(ca_hndl, tl->slots[slot_id].slot, connection_id, hdr, 3) < 0) { tl->error_slot = slot_id; tl->error = EN50221ERR_CAWRITE; return -1; } gettimeofday(&tl->slots[slot_id].connections[connection_id].tx_time, 0); } else { // no data - indicate not waiting for anything now tl->slots[slot_id].connections[connection_id].tx_time.tv_sec = 0; } return 0; } static int en50221_tl_alloc_new_tc(struct en50221_transport_layer *tl, uint8_t slot_id) { // we browse through the array of connection // types, to look for the first unused one int i, conid = -1; for (i = 1; i < tl->max_connections_per_slot; i++) { if (tl->slots[slot_id].connections[i].state == T_STATE_IDLE) { conid = i; break; } } if (conid == -1) { print(LOG_LEVEL, ERROR, 1, "CREATE_T_C failed: no more connections available\n"); return -1; } // set up the connection struct tl->slots[slot_id].connections[conid].state = T_STATE_IN_CREATION; tl->slots[slot_id].connections[conid].chain_buffer = NULL; tl->slots[slot_id].connections[conid].buffer_length = 0; return conid; } static void queue_message(struct en50221_transport_layer *tl, uint8_t slot_id, uint8_t connection_id, struct en50221_message *msg) { msg->next = NULL; if (tl->slots[slot_id].connections[connection_id].send_queue_tail) { tl->slots[slot_id].connections[connection_id].send_queue_tail->next = msg; tl->slots[slot_id].connections[connection_id].send_queue_tail = msg; } else { tl->slots[slot_id].connections[connection_id].send_queue = msg; tl->slots[slot_id].connections[connection_id].send_queue_tail = msg; } }