/*
 * Sigma Control API DUT (station/AP)
 * Copyright (c) 2010, Atheros Communications, Inc.
 * Copyright (c) 2011-2015, Qualcomm Atheros, Inc.
 * All Rights Reserved.
 * Licensed under the Clear BSD license. See README for more details.
 */

#include "sigma_dut.h"
#include "wpa_helpers.h"

/* Backlog value passed to listen() for TCP receiver streams */
#define TG_MAX_CLIENTS_CONNECTIONS 1


/**
 * cmd_traffic_agent_config - Handle the traffic_agent_config command
 * @dut: DUT context; the next free entry of dut->streams is (re)initialized
 * @conn: Control connection used for responses
 * @cmd: Parsed command parameters
 * Returns: 0 when a response (streamID or errorCode) has been sent with
 * send_resp(), -1 when a mandatory parameter is missing, a parameter has an
 * unsupported value, or memory allocation fails (no response is sent here;
 * presumably the command dispatcher reports it - confirm against the caller)
 *
 * Numeric parameters are converted with atoi() and are not range checked
 * here. The stream slot is only claimed (dut->num_streams incremented) once
 * every parameter has been accepted.
 */
static int cmd_traffic_agent_config(struct sigma_dut *dut,
				    struct sigma_conn *conn,
				    struct sigma_cmd *cmd)
{
	struct sigma_stream *s;
	const char *val;
	char buf[100];

	if (dut->num_streams == MAX_SIGMA_STREAMS) {
		send_resp(dut, conn, SIGMA_ERROR, "errorCode,No more "
			  "concurrent traffic streams supported");
		return 0;
	}

	/*
	 * The slot may hold leftovers from an earlier config attempt that
	 * failed after allocating stats; release them before clearing.
	 */
	s = &dut->streams[dut->num_streams];
	free(s->stats);
	memset(s, 0, sizeof(*s));
	s->sock = -1;
	s->no_timestamps = dut->no_timestamps;

	val = get_param(cmd, "profile");
	if (!val)
		return -1;
	if (strcasecmp(val, "File_Transfer") == 0)
		s->profile = SIGMA_PROFILE_FILE_TRANSFER;
	else if (strcasecmp(val, "Multicast") == 0)
		s->profile = SIGMA_PROFILE_MULTICAST;
	else if (strcasecmp(val, "IPTV") == 0)
		s->profile = SIGMA_PROFILE_IPTV;
	else if (strcasecmp(val, "Transaction") == 0)
		s->profile = SIGMA_PROFILE_TRANSACTION;
	else if (strcasecmp(val, "Start_Sync") == 0)
		s->profile = SIGMA_PROFILE_START_SYNC;
	else if (strcasecmp(val, "Uapsd") == 0)
		s->profile = SIGMA_PROFILE_UAPSD;
	else {
		send_resp(dut, conn, SIGMA_INVALID, "errorCode,Unsupported "
			  "profile");
		return 0;
	}

	val = get_param(cmd, "direction");
	if (!val)
		return -1;
	if (strcasecmp(val, "send") == 0)
		s->sender = 1;
	else if (strcasecmp(val, "receive") == 0)
		s->sender = 0;
	else
		return -1;

	val = get_param(cmd, "destination");
	if (val) {
		if (inet_aton(val, &s->dst) == 0)
			return -1;
	}

	val = get_param(cmd, "source");
	if (val) {
		if (inet_aton(val, &s->src) == 0)
			return -1;
	}

	val = get_param(cmd, "destinationPort");
	if (val)
		s->dst_port = atoi(val);

	val = get_param(cmd, "sourcePort");
	if (val)
		s->src_port = atoi(val);

	val = get_param(cmd, "frameRate");
	if (val)
		s->frame_rate = atoi(val);

	val = get_param(cmd, "duration");
	if (val)
		s->duration = atoi(val);

	val = get_param(cmd, "payloadSize");
	if (val)
		s->payload_size = atoi(val);

	val = get_param(cmd, "startDelay");
	if (val)
		s->start_delay = atoi(val);

	val = get_param(cmd, "maxCnt");
	if (val)
		s->max_cnt = atoi(val);

	val = get_param(cmd, "trafficClass");
	if (val) {
		if (strcasecmp(val, "Voice") == 0)
			s->tc = SIGMA_TC_VOICE;
		else if (strcasecmp(val, "Video") == 0)
			s->tc = SIGMA_TC_VIDEO;
		else if (strcasecmp(val, "Background") == 0)
			s->tc = SIGMA_TC_BACKGROUND;
		else if (strcasecmp(val, "BestEffort") == 0)
			s->tc = SIGMA_TC_BEST_EFFORT;
		else
			return -1;
	}

	val = get_param(cmd, "userpriority");
	if (val) {
		s->user_priority_set = 1;
		s->user_priority = atoi(val);
	}

	val = get_param(cmd, "tagName");
	if (val) {
		strncpy(s->test_name, val, sizeof(s->test_name));
		s->test_name[sizeof(s->test_name) - 1] = '\0';
		sigma_dut_print(dut, DUT_MSG_DEBUG,
				"Traffic agent: U-APSD console tagname %s",
				s->test_name);
	}

	/*
	 * A globally configured throughput packet size replaces the requested
	 * payload size, but only for unthrottled (frameRate 0) sender streams
	 * of the File_Transfer, IPTV, and Uapsd profiles.
	 */
	if (dut->throughput_pktsize && s->frame_rate == 0 && s->sender &&
	    dut->throughput_pktsize != s->payload_size &&
	    (s->profile == SIGMA_PROFILE_FILE_TRANSFER ||
	     s->profile == SIGMA_PROFILE_IPTV ||
	     s->profile == SIGMA_PROFILE_UAPSD)) {
		sigma_dut_print(dut, DUT_MSG_INFO,
				"Traffic agent: Override throughput test payload size %u -> %u",
				s->payload_size, dut->throughput_pktsize);
		s->payload_size = dut->throughput_pktsize;
	}

	/* transProtoType: "1" selects TCP, "0" or absent selects UDP */
	val = get_param(cmd, "transProtoType");
	if (val) {
		if (strcmp(val, "1") == 0)
			s->trans_proto = IPPROTO_TCP;
		else if (strcmp(val, "0") == 0)
			s->trans_proto = IPPROTO_UDP;
		else
			return -1;
	} else {
		s->trans_proto = IPPROTO_UDP;
	}

	/* Per-frame statistics are collected only by IPTV receivers */
	if (s->profile == SIGMA_PROFILE_IPTV && !s->sender &&
	    !s->no_timestamps) {
		s->stats = calloc(MAX_SIGMA_STATS,
				  sizeof(struct sigma_frame_stats));
		if (s->stats == NULL)
			return -1;
	}

	dut->stream_id++;
	dut->num_streams++;

	s->stream_id = dut->stream_id;
	snprintf(buf, sizeof(buf), "streamID,%d", s->stream_id);
	send_resp(dut, conn, SIGMA_COMPLETE, buf);
	return 0;
}


/**
 * stop_stream - Wait for a stream's worker thread and close its socket
 * @s: Stream to stop or %NULL (no-op)
 *
 * Does nothing unless s->started is set. This function only joins the thread;
 * callers that need the thread to exit early set s->stop beforehand.
 */
static void stop_stream(struct sigma_stream *s)
{
	if (!s || !s->started)
		return;

	pthread_join(s->thr, NULL);
	if (s->sock != -1) {
		close(s->sock);
		s->sock = -1;
	}
	s->started = 0;
}


/**
 * cmd_traffic_agent_reset - Handle the traffic_agent_reset command
 * @dut: DUT context
 * @conn: Control connection (unused)
 * @cmd: Parsed command parameters (unused)
 * Returns: 1 always (no response is sent from here; presumably the dispatcher
 * generates the completion - confirm against the caller)
 *
 * Requests every configured stream to stop, waits for started ones, and
 * clears the whole stream table.
 */
static int cmd_traffic_agent_reset(struct sigma_dut *dut,
				   struct sigma_conn *conn,
				   struct sigma_cmd *cmd)
{
	int i;

	for (i = 0; i < dut->num_streams; i++) {
		struct sigma_stream *s = &dut->streams[i];

		s->stop = 1;
		stop_stream(s);

		/*
		 * The memset() below would otherwise drop the only pointer to
		 * the statistics buffer allocated at config time.
		 */
		free(s->stats);
		s->stats = NULL;
	}
	dut->num_streams = 0;
	memset(&dut->streams, 0, sizeof(dut->streams));
	return 1;
}


/**
 * get_stream_id - Parse a space separated list of stream IDs
 * @str: Input string, e.g., "1 2 3"
 * @streams: Output array with room for MAX_SIGMA_STREAMS entries
 * Returns: Number of IDs stored, or -1 if an entry converts to 0 with atoi()
 * (which includes non-numeric text) or there are more than MAX_SIGMA_STREAMS
 * entries
 */
static int get_stream_id(const char *str, int streams[MAX_SIGMA_STREAMS])
{
	int count;

	count = 0;
	for (;;) {
		if (count == MAX_SIGMA_STREAMS)
			return -1;
		streams[count] = atoi(str);
		if (streams[count] == 0)
			return -1;
		count++;
		str = strchr(str, ' ');
		if (str == NULL)
			break;
		/* Tolerate runs of spaces between entries */
		while (*str == ' ')
			str++;
	}

	return count;
}


/**
 * open_socket_file_transfer - Create, bind, and connect/listen a socket
 * @dut: DUT context (used for logging)
 * @s: Stream; s->sock is set on success and left at -1 on failure after the
 *	socket was created
 * Returns: 0 on success, -1 on failure
 *
 * The socket is a UDP datagram socket when s->trans_proto is IPPROTO_UDP and
 * a stream socket otherwise. It is bound to the local port (source port for a
 * sender, destination port for a receiver) on the wildcard address. After
 * that:
 * - multicast receivers are done (no connect),
 * - TCP receivers start listening,
 * - everything else connects to the peer address/port.
 */
static int open_socket_file_transfer(struct sigma_dut *dut,
				     struct sigma_stream *s)
{
	struct sockaddr_in addr;
	int sock_opt_val = 1;

	s->sock = socket(PF_INET, IPPROTO_UDP == s->trans_proto ?
			 SOCK_DGRAM : SOCK_STREAM, s->trans_proto);
	if (s->sock < 0) {
		perror("socket");
		return -1;
	}

	if (setsockopt(s->sock, SOL_SOCKET, SO_REUSEADDR, &sock_opt_val,
		       sizeof(sock_opt_val)) < 0) {
		perror("setsockopt");
		close(s->sock);
		s->sock = -1;
		return -1;
	}

	memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_port = htons(s->sender ? s->src_port : s->dst_port);
	sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: sender=%d "
			"bind port %d", s->sender, ntohs(addr.sin_port));

	if (bind(s->sock, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
		perror("bind");
		close(s->sock);
		s->sock = -1;
		return -1;
	}

	if (s->profile == SIGMA_PROFILE_MULTICAST && !s->sender)
		return 0;

	if (s->trans_proto == IPPROTO_TCP && s->sender == 0) {
		if (listen(s->sock, TG_MAX_CLIENTS_CONNECTIONS) < 0) {
			sigma_dut_print(dut, DUT_MSG_INFO,
					"Listen failed with error %d: %s",
					errno, strerror(errno));
			close(s->sock);
			s->sock = -1;
			return -1;
		}
	} else {
		memset(&addr, 0, sizeof(addr));
		addr.sin_family = AF_INET;
		addr.sin_addr.s_addr = s->sender ? s->dst.s_addr :
			s->src.s_addr;
		addr.sin_port = htons(s->sender ? s->dst_port : s->src_port);
		sigma_dut_print(dut, DUT_MSG_DEBUG,
				"Traffic agent: connect %s:%d",
				inet_ntoa(addr.sin_addr),
				ntohs(addr.sin_port));
		if (connect(s->sock, (struct sockaddr *) &addr,
			    sizeof(addr)) < 0) {
			perror("connect");
			close(s->sock);
			s->sock = -1;
			return -1;
		}
	}

	return 0;
}


/**
 * open_socket_multicast - Open a socket for the Multicast profile
 * @dut: DUT context (used for logging)
 * @s: Stream
 * Returns: 0 on success, -1 if the underlying socket could not be opened
 *
 * Receivers additionally try to join the multicast group given by s->dst;
 * a failure to join is logged but not treated as an error.
 */
static int open_socket_multicast(struct sigma_dut *dut, struct sigma_stream *s)
{
	if (open_socket_file_transfer(dut, s) < 0)
		return -1;

	if (!s->sender) {
		struct ip_mreq mr;

		memset(&mr, 0, sizeof(mr));
		mr.imr_multiaddr.s_addr = s->dst.s_addr;
		mr.imr_interface.s_addr = htonl(INADDR_ANY);
		sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: "
				"IP_ADD_MEMBERSHIP %s", inet_ntoa(s->dst));
		if (setsockopt(s->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
			       (void *) &mr, sizeof(mr)) < 0) {
			sigma_dut_print(dut, DUT_MSG_INFO,
					"setsockopt[IP_ADD_MEMBERSHIP]: %s",
					strerror(errno));
			/*
			 * Continue anyway since this can happen, e.g., if the
			 * default route is missing. This is not critical for
			 * multicast RX testing.
			 */
		}
	}

	return 0;
}


/**
 * set_socket_prio - Set the IP TOS value of the stream socket
 * @s: Stream with an open socket
 * Returns: 0 on success, -1 if the user priority does not belong to the
 * configured traffic class or setsockopt() fails
 *
 * When a user priority was configured, only the two values accepted for the
 * traffic class are allowed; otherwise a per-class default TOS is used. A
 * traffic class value not listed in the switch leaves TOS at 0.
 */
static int set_socket_prio(struct sigma_stream *s)
{
	int tos = 0x00;

	switch (s->tc) {
	case SIGMA_TC_VOICE:
		if (s->user_priority_set) {
			if (s->user_priority == 6)
				tos = 48 << 2;
			else if (s->user_priority == 7)
				tos = 56 << 2;
			else
				return -1;
		} else
			tos = 0xe0; /* DSCP = 56 */
		break;
	case SIGMA_TC_VIDEO:
		if (s->user_priority_set) {
			if (s->user_priority == 4)
				tos = 32 << 2;
			else if (s->user_priority == 5)
				tos = 40 << 2;
			else
				return -1;
		} else
			tos = 0xa0; /* DSCP = 40 */
		break;
	case SIGMA_TC_BACKGROUND:
		if (s->user_priority_set) {
			if (s->user_priority == 1)
				tos = 8 << 2;
			else if (s->user_priority == 2)
				tos = 16 << 2;
			else
				return -1;
		} else
			tos = 0x20; /* DSCP = 8 */
		break;
	case SIGMA_TC_BEST_EFFORT:
		if (s->user_priority_set) {
			if (s->user_priority == 0)
				tos = 0 << 2;
			else if (s->user_priority == 3)
				tos = 20 << 2;
			else
				return -1;
		} else
			tos = 0x00; /* DSCP = 0 */
		break;
	}

	if (setsockopt(s->sock, IPPROTO_IP, IP_TOS, &tos, sizeof(tos)) < 0) {
		perror("setsockopt");
		return -1;
	}

	return 0;
}


/**
 * open_socket - Open the socket matching the stream profile
 * @dut: DUT context (used for logging)
 * @s: Stream
 * Returns: 0 on success, -1 on failure or for the unsupported Start_Sync
 * profile
 *
 * On failure no socket is left open in s->sock.
 */
static int open_socket(struct sigma_dut *dut, struct sigma_stream *s)
{
	switch (s->profile) {
	case SIGMA_PROFILE_FILE_TRANSFER:
		return open_socket_file_transfer(dut, s);
	case SIGMA_PROFILE_MULTICAST:
		return open_socket_multicast(dut, s);
	case SIGMA_PROFILE_IPTV:
		if (open_socket_file_transfer(dut, s) < 0)
			return -1;
		if (set_socket_prio(s) < 0) {
			/*
			 * Do not leave the descriptor behind; a later
			 * open_socket() call would overwrite s->sock and leak
			 * it.
			 */
			close(s->sock);
			s->sock = -1;
			return -1;
		}
		return 0;
	case SIGMA_PROFILE_TRANSACTION:
		return open_socket_file_transfer(dut, s);
	case SIGMA_PROFILE_UAPSD:
		return open_socket_file_transfer(dut, s);
	case SIGMA_PROFILE_START_SYNC:
		sigma_dut_print(dut, DUT_MSG_INFO, "Traffic stream profile %d "
				"not yet supported", s->profile);
		/* TODO */
		break;
	}

	return -1;
}


/**
 * send_file_fast - Transmit without rate limiting or timestamps
 * @s: Stream with a connected socket
 * @pkt: Prepared packet buffer of at least s->payload_size bytes
 *
 * Sends back to back until s->stop is set or s->duration seconds have passed.
 * The running counter is stored at offset 8 of each packet. To keep the loop
 * cheap, the clock is sampled only on every 16th packet.
 */
static void send_file_fast(struct sigma_stream *s, char *pkt)
{
	struct timeval stop, now;
	int res;
	unsigned int counter = 0;

	gettimeofday(&stop, NULL);
	stop.tv_sec += s->duration;

	while (!s->stop) {
		counter++;
		WPA_PUT_BE32(&pkt[8], counter);

		if ((counter & 0xf) == 0) {
			gettimeofday(&now, NULL);
			if (now.tv_sec > stop.tv_sec ||
			    (now.tv_sec == stop.tv_sec &&
			     now.tv_usec >= stop.tv_usec))
				break;
		}

		s->tx_act_frames++;
		res = send(s->sock, pkt, s->payload_size, 0);
		if (res >= 0) {
			s->tx_frames++;
			s->tx_payload_bytes += res;
		} else {
			switch (errno) {
			case EAGAIN:
			case ENOBUFS:
				/* Transient; back off briefly and retry */
				usleep(1000);
				break;
			case ECONNRESET:
			case EPIPE:
				/* Peer is gone; give up on this stream */
				s->stop = 1;
				break;
			default:
				perror("send");
				break;
			}
		}
	}
}


/**
 * send_file - Transmit a stream of fixed size packets
 * @s: Stream with a connected socket
 *
 * Returns without sending if duration is not positive, frame rate is
 * negative, or payload size is below 20 bytes (the header written into each
 * packet occupies offsets 0..19: tag, counter at 8, seconds at 12,
 * microseconds at 16).
 *
 * With frame rate 0 packets are sent back to back; send_file_fast() is used
 * for that when timestamps are disabled. With a non-zero frame rate the loop
 * stops after duration * frame_rate successfully sent packets and paces
 * itself by sleeping between packets, recalculating the sleep time from the
 * elapsed time and the number of frames sent so far.
 */
static void send_file(struct sigma_stream *s)
{
	char *pkt;
	struct timeval stop, now, start;
	int res;
	unsigned int counter = 0, total_sleep_usec = 0, total_pkts;
	int sleep_usec = 0;

	if (s->duration <= 0 || s->frame_rate < 0 || s->payload_size < 20)
		return;

	pkt = malloc(s->payload_size);
	if (pkt == NULL)
		return;
	/*
	 * NOTE(review): strncpy() pads the remainder of the buffer with zero
	 * bytes, so the 0x01 fill from memset() does not survive. Kept as-is
	 * since changing it would alter the payload on the wire.
	 */
	memset(pkt, 1, s->payload_size);
	strncpy(pkt, "1345678", s->payload_size);

	if (s->frame_rate == 0 && s->no_timestamps) {
		send_file_fast(s, pkt);
		free(pkt);
		return;
	}

	gettimeofday(&stop, NULL);
	stop.tv_sec += s->duration;

	total_pkts = s->duration * s->frame_rate;

	gettimeofday(&start, NULL);

	while (!s->stop) {
		counter++;
		WPA_PUT_BE32(&pkt[8], counter);

		if (sleep_usec) {
			usleep(sleep_usec);
			total_sleep_usec += sleep_usec;
		}

		gettimeofday(&now, NULL);
		if (now.tv_sec > stop.tv_sec ||
		    (now.tv_sec == stop.tv_sec && now.tv_usec >= stop.tv_usec))
			break;

		if (s->frame_rate && (unsigned int) s->tx_frames >= total_pkts)
			break;

		if (s->frame_rate == 0 || s->tx_frames == 0)
			sleep_usec = 0;
		else if (sleep_usec || s->frame_rate < 10 ||
			 counter % (s->frame_rate / 10) == 0) {
			/* Recalculate sleep_usec for every 100 ms approximately */
			struct timeval tmp;
			int diff, duration;

			timersub(&now, &start, &tmp);
			diff = tmp.tv_sec * 1000000 + tmp.tv_usec;

			/* Time the frames sent so far should have taken */
			duration = (1000000 / s->frame_rate) * s->tx_frames;

			if (duration > diff)
				sleep_usec = (total_sleep_usec +
					      (duration - diff)) / s->tx_frames;
			else
				sleep_usec = 0;
		}

		WPA_PUT_BE32(&pkt[12], now.tv_sec);
		WPA_PUT_BE32(&pkt[16], now.tv_usec);

		s->tx_act_frames++;
		res = send(s->sock, pkt, s->payload_size, 0);
		if (res >= 0) {
			s->tx_frames++;
			s->tx_payload_bytes += res;
		} else {
			switch (errno) {
			case EAGAIN:
			case ENOBUFS:
				usleep(1000);
				break;
			case ECONNRESET:
			case EPIPE:
				s->stop = 1;
				break;
			default:
				perror("send");
				break;
			}
		}
	}

	sigma_dut_print(s->dut, DUT_MSG_DEBUG,
			"send_file: counter %u s->tx_frames %d total_sleep_usec %u",
			counter, s->tx_frames, total_sleep_usec);

	free(pkt);
}


/**
 * send_transaction - Send requests and wait for the matching responses
 * @s: Stream with a connected socket
 *
 * Returns without sending if duration or frame rate is not positive or the
 * payload size is below 20 bytes. Otherwise sends one packet per iteration
 * and waits up to 1/frame_rate seconds for a response before sending the next
 * one. The loop ends when s->stop is set, the duration has elapsed, max_cnt
 * (if non-zero) packets have been attempted, or select()/recv() fails.
 */
static void send_transaction(struct sigma_stream *s)
{
	char *pkt, *rpkt;
	struct timeval stop, now;
	int res;
	unsigned int counter = 0, rcounter;
	int wait_time;
	fd_set rfds;
	struct timeval tv;

	if (s->duration <= 0 || s->frame_rate <= 0 || s->payload_size < 20)
		return;

	pkt = malloc(s->payload_size);
	if (pkt == NULL)
		return;
	rpkt = malloc(s->payload_size);
	if (rpkt == NULL) {
		free(pkt);
		return;
	}
	memset(pkt, 1, s->payload_size);
	strncpy(pkt, "1345678", s->payload_size);

	gettimeofday(&stop, NULL);
	stop.tv_sec += s->duration;

	/* Response timeout in microseconds */
	wait_time = 1000000 / s->frame_rate;

	while (!s->stop) {
		counter++;
		if (s->max_cnt && (int) counter > s->max_cnt)
			break;
		WPA_PUT_BE32(&pkt[8], counter);

		gettimeofday(&now, NULL);
		if (now.tv_sec > stop.tv_sec ||
		    (now.tv_sec == stop.tv_sec && now.tv_usec >= stop.tv_usec))
			break;
		WPA_PUT_BE32(&pkt[12], now.tv_sec);
		WPA_PUT_BE32(&pkt[16], now.tv_usec);

		res = send(s->sock, pkt, s->payload_size, 0);
		if (res >= 0) {
			s->tx_frames++;
			s->tx_payload_bytes += res;
		} else {
			switch (errno) {
			case EAGAIN:
			case ENOBUFS:
				usleep(1000);
				break;
			case ECONNRESET:
			case EPIPE:
				s->stop = 1;
				break;
			default:
				perror("send");
				break;
			}
		}

		/* Wait for response */
		tv.tv_sec = 0;
		tv.tv_usec = wait_time;
		FD_ZERO(&rfds);
		FD_SET(s->sock, &rfds);
		res = select(s->sock + 1, &rfds, NULL, NULL, &tv);
		if (res < 0) {
			if (errno == EINTR)
				continue;
			perror("select");
			break;
		}

		if (res == 0) {
			/* timeout */
			continue;
		}

		if (FD_ISSET(s->sock, &rfds)) {
			/* response received */
			res = recv(s->sock, rpkt, s->payload_size, 0);
			if (res < 0) {
				perror("recv");
				break;
			}
			/*
			 * NOTE(review): the counter is read without checking
			 * that at least 12 bytes were received; a shorter
			 * response is compared using whatever rpkt holds.
			 */
			rcounter = WPA_GET_BE32(&rpkt[8]);
			if (rcounter != counter)
				s->out_of_seq_frames++;
			s->rx_frames++;
			s->rx_payload_bytes += res;
		}
	}

	free(pkt);
	free(rpkt);
}
static void * send_thread(void *ctx) { struct sigma_stream *s = ctx; sleep(s->start_delay); switch (s->profile) { case SIGMA_PROFILE_FILE_TRANSFER: send_file(s); break; case SIGMA_PROFILE_MULTICAST: send_file(s); break; case SIGMA_PROFILE_IPTV: send_file(s); break; case SIGMA_PROFILE_TRANSACTION: send_transaction(s); break; case SIGMA_PROFILE_START_SYNC: break; case SIGMA_PROFILE_UAPSD: send_uapsd_console(s); break; } return NULL; } struct traffic_agent_send_data { struct sigma_dut *dut; struct sigma_conn *conn; int streams[MAX_SIGMA_STREAMS]; int count; }; static struct sigma_stream * get_stream(struct sigma_dut *dut, int id) { int i; for (i = 0; i < dut->num_streams; i++) { if ((unsigned int) id == dut->streams[i].stream_id) return &dut->streams[i]; } return NULL; } static void * send_report_thread(void *ctx) { struct traffic_agent_send_data *data = ctx; struct sigma_dut *dut = data->dut; struct sigma_conn *conn = data->conn; int i, ret; char buf[100 + MAX_SIGMA_STREAMS * 60], *pos; for (i = 0; i < data->count; i++) { sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: waiting " "for stream %d send to complete", data->streams[i]); stop_stream(get_stream(dut, data->streams[i])); } buf[0] = '\0'; pos = buf; pos += snprintf(pos, buf + sizeof(buf) - pos, "streamID,"); for (i = 0; i < data->count; i++) { ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d", i > 0 ? " " : "", data->streams[i]); if (ret < 0 || ret >= buf + sizeof(buf) - pos) break; pos += ret; } if (dut->program == PROGRAM_60GHZ) { sigma_dut_print(dut, DUT_MSG_INFO, "reporting tx_act_frames"); pos += snprintf(pos, buf + sizeof(buf) - pos, ",txActFrames,"); for (i = 0; i < data->count; i++) { struct sigma_stream *s; s = get_stream(dut, data->streams[i]); if (!s) continue; ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d", i > 0 ? 
" " : "", s->tx_act_frames); if (ret < 0 || ret >= buf + sizeof(buf) - pos) break; pos += ret; } } pos += snprintf(pos, buf + sizeof(buf) - pos, ",txFrames,"); for (i = 0; i < data->count; i++) { struct sigma_stream *s = get_stream(dut, data->streams[i]); if (!s) continue; ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d", i > 0 ? " " : "", s->tx_frames); if (ret < 0 || ret >= buf + sizeof(buf) - pos) break; pos += ret; } pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxFrames,"); for (i = 0; i < data->count; i++) { struct sigma_stream *s = get_stream(dut, data->streams[i]); if (!s) continue; ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d", i > 0 ? " " : "", s->rx_frames); if (ret < 0 || ret >= buf + sizeof(buf) - pos) break; pos += ret; } pos += snprintf(pos, buf + sizeof(buf) - pos, ",txPayloadBytes,"); for (i = 0; i < data->count; i++) { struct sigma_stream *s = get_stream(dut, data->streams[i]); if (!s) continue; ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu", i > 0 ? " " : "", s->tx_payload_bytes); if (ret < 0 || ret >= buf + sizeof(buf) - pos) break; pos += ret; } pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxPayloadBytes,"); for (i = 0; i < data->count; i++) { struct sigma_stream *s = get_stream(dut, data->streams[i]); if (!s) continue; ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu", i > 0 ? " " : "", s->rx_payload_bytes); if (ret < 0 || ret >= buf + sizeof(buf) - pos) break; pos += ret; } pos += snprintf(pos, buf + sizeof(buf) - pos, ",outOfSequenceFrames,"); for (i = 0; i < data->count; i++) { struct sigma_stream *s = get_stream(dut, data->streams[i]); if (!s) continue; ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d", i > 0 ? 
" " : "", s->out_of_seq_frames); if (ret < 0 || ret >= buf + sizeof(buf) - pos) break; pos += ret; } for (i = 0; i < data->count; i++) { struct sigma_stream *s = get_stream(dut, data->streams[i]); if (!s) continue; s->ta_send_in_progress = 0; if (s->trans_proto == IPPROTO_TCP) { /* * Close the socket to make sure client side close the * network before the server. Otherwise, the server * might get "Address already in use" when trying to * reuse the port. */ close(s->sock); s->sock = -1; sigma_dut_print(dut, DUT_MSG_DEBUG, "Closed the sender socket"); } } buf[sizeof(buf) - 1] = '\0'; if (conn->s < 0) sigma_dut_print(dut, DUT_MSG_INFO, "Cannot send traffic_agent response since control socket has already been closed"); else send_resp(dut, conn, SIGMA_COMPLETE, buf); conn->waiting_completion = 0; free(data); return NULL; } static int cmd_traffic_agent_send(struct sigma_dut *dut, struct sigma_conn *conn, struct sigma_cmd *cmd) { const char *val; int i, j, res; char buf[100]; struct traffic_agent_send_data *data; val = get_param(cmd, "streamID"); if (val == NULL) return -1; data = calloc(1, sizeof(*data)); if (data == NULL) return -1; data->dut = dut; data->conn = conn; data->count = get_stream_id(val, data->streams); if (data->count < 0) { free(data); return -1; } for (i = 0; i < data->count; i++) { struct sigma_stream *s = get_stream(dut, data->streams[i]); if (!s) { snprintf(buf, sizeof(buf), "errorCode,StreamID %d " "not configured", data->streams[i]); send_resp(dut, conn, SIGMA_INVALID, buf); free(data); return 0; } for (j = 0; j < i; j++) if (data->streams[i] == data->streams[j]) return -1; if (!s->sender) { snprintf(buf, sizeof(buf), "errorCode,Not configured " "as sender for streamID %d", data->streams[i]); send_resp(dut, conn, SIGMA_INVALID, buf); free(data); return 0; } if (s->ta_send_in_progress) { send_resp(dut, conn, SIGMA_ERROR, "errorCode,Multiple concurrent send cmds on same streamID not supported"); free(data); return 0; } } for (i = 0; i < data->count; 
i++) { struct sigma_stream *s = get_stream(dut, data->streams[i]); if (!s) continue; sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: open " "socket for send stream %d", data->streams[i]); if (open_socket(dut, s) < 0) { free(data); return -2; } } for (i = 0; i < data->count; i++) { struct sigma_stream *s = get_stream(dut, data->streams[i]); if (!s) continue; /* * Provide dut context to the thread to support debugging and * returning of error messages. */ s->dut = dut; sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: start " "send for stream %d", data->streams[i]); res = pthread_create(&s->thr, NULL, send_thread, s); if (res) { sigma_dut_print(dut, DUT_MSG_INFO, "pthread_create " "failed: %d", res); free(data); return -2; } s->started = 1; } sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: start a thread to track sending streams"); conn->waiting_completion = 1; res = pthread_create(&dut->thr, NULL, send_report_thread, data); if (res) { sigma_dut_print(dut, DUT_MSG_INFO, "pthread_create failed: %d", res); free(data); conn->waiting_completion = 0; return -2; } for (i = 0; i < data->count; i++) { struct sigma_stream *s = get_stream(dut, data->streams[i]); if (s) s->ta_send_in_progress = 1; } /* Command will be completed in send_report_thread() */ return 0; } static void receive_file(struct sigma_stream *s) { struct timeval tv, now; fd_set rfds; int res; char *pkt; int pktlen; unsigned int last_rx = 0, counter; pktlen = 65536 + 1; pkt = malloc(pktlen); if (pkt == NULL) return; while (!s->stop) { FD_ZERO(&rfds); FD_SET(s->sock, &rfds); tv.tv_sec = 0; tv.tv_usec = 300000; res = select(s->sock + 1, &rfds, NULL, NULL, &tv); if (res < 0) { perror("select"); usleep(10000); } else if (FD_ISSET(s->sock, &rfds)) { res = recv(s->sock, pkt, pktlen, 0); if (res >= 0) { s->rx_frames++; s->rx_payload_bytes += res; counter = WPA_GET_BE32(&pkt[8]); if (counter < last_rx) s->out_of_seq_frames++; last_rx = counter; } else { perror("recv"); break; } if (res >= 20 && s->stats && 
s->num_stats < MAX_SIGMA_STATS) { struct sigma_frame_stats *stats; stats = &s->stats[s->num_stats]; s->num_stats++; gettimeofday(&now, NULL); stats->seqnum = counter; stats->local_sec = now.tv_sec; stats->local_usec = now.tv_usec; stats->remote_sec = WPA_GET_BE32(&pkt[12]); stats->remote_usec = WPA_GET_BE32(&pkt[16]); } } } free(pkt); } static void receive_transaction(struct sigma_stream *s) { struct timeval tv; fd_set rfds; int res; char *pkt; int pktlen; unsigned int last_rx = 0, counter; struct sockaddr_in addr; socklen_t addrlen; if (s->payload_size) pktlen = s->payload_size; else pktlen = 65536 + 1; pkt = malloc(pktlen); if (pkt == NULL) return; while (!s->stop) { FD_ZERO(&rfds); FD_SET(s->sock, &rfds); tv.tv_sec = 0; tv.tv_usec = 300000; res = select(s->sock + 1, &rfds, NULL, NULL, &tv); if (res < 0) { perror("select"); usleep(10000); } else if (FD_ISSET(s->sock, &rfds)) { addrlen = sizeof(addr); res = recvfrom(s->sock, pkt, pktlen, 0, (struct sockaddr *) &addr, &addrlen); if (res < 0) { perror("recv"); break; } s->rx_frames++; s->rx_payload_bytes += res; counter = WPA_GET_BE32(&pkt[8]); if (counter < last_rx) s->out_of_seq_frames++; last_rx = counter; /* send response */ res = sendto(s->sock, pkt, pktlen, 0, (struct sockaddr *) &addr, addrlen); if (res < 0) { perror("sendto"); } else { s->tx_frames++; s->tx_payload_bytes += res; } } } free(pkt); } static void * receive_thread(void *ctx) { struct sigma_stream *s = ctx; if (s->trans_proto == IPPROTO_TCP) { /* Wait for socket to be accepted */ struct sockaddr_in connected_addr; int connected_sock; /* returned from accept on sock */ socklen_t connected_addr_len = sizeof(connected_addr); sigma_dut_print(s->dut, DUT_MSG_DEBUG, "Traffic agent: Waiting on accept"); connected_sock = accept(s->sock, (struct sockaddr *) &connected_addr, &connected_addr_len); if (connected_sock < 0) { sigma_dut_print(s->dut, DUT_MSG_ERROR, "Traffic agent: Failed to accept: %s", strerror(errno)); return NULL; } sigma_dut_print(s->dut, 
DUT_MSG_DEBUG, "Traffic agent: Accepted client closing parent socket and talk over connected sock."); close(s->sock); s->sock = connected_sock; } switch (s->profile) { case SIGMA_PROFILE_FILE_TRANSFER: receive_file(s); break; case SIGMA_PROFILE_MULTICAST: receive_file(s); break; case SIGMA_PROFILE_IPTV: receive_file(s); break; case SIGMA_PROFILE_TRANSACTION: receive_transaction(s); break; case SIGMA_PROFILE_START_SYNC: break; case SIGMA_PROFILE_UAPSD: receive_uapsd(s); break; } return NULL; } static int cmd_traffic_agent_receive_start(struct sigma_dut *dut, struct sigma_conn *conn, struct sigma_cmd *cmd) { const char *val; int streams[MAX_SIGMA_STREAMS]; int i, j, count; char buf[100]; val = get_param(cmd, "streamID"); if (val == NULL) return -1; count = get_stream_id(val, streams); if (count < 0) return -1; for (i = 0; i < count; i++) { struct sigma_stream *s = get_stream(dut, streams[i]); if (!s) { snprintf(buf, sizeof(buf), "errorCode,StreamID %d " "not configured", streams[i]); send_resp(dut, conn, SIGMA_INVALID, buf); return 0; } for (j = 0; j < i; j++) if (streams[i] == streams[j]) return -1; if (s->sender) { snprintf(buf, sizeof(buf), "errorCode,Not configured " "as receiver for streamID %d", streams[i]); send_resp(dut, conn, SIGMA_INVALID, buf); return 0; } } for (i = 0; i < count; i++) { struct sigma_stream *s = get_stream(dut, streams[i]); if (!s) continue; sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: open " "receive socket for stream %d", streams[i]); if (open_socket(dut, s) < 0) return -2; } for (i = 0; i < count; i++) { struct sigma_stream *s = get_stream(dut, streams[i]); int res; if (!s) continue; /* * Provide dut context to the thread to support debugging and * returning of error messages. Similarly, provide interface * information to the thread. If the Interface parameter is not * passed, get it from get_station_ifname() since the interface * name is needed for power save mode configuration for Uapsd * cases. 
*/ s->dut = dut; val = get_param(cmd, "Interface"); strncpy(s->ifname, (val ? val : get_station_ifname()), sizeof(s->ifname)); s->ifname[sizeof(s->ifname) - 1] = '\0'; sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: start " "receive for stream %d", streams[i]); res = pthread_create(&s->thr, NULL, receive_thread, s); if (res) { sigma_dut_print(dut, DUT_MSG_INFO, "pthread_create " "failed: %d", res); return -2; } s->started = 1; } return 1; } static void write_frame_stats(struct sigma_dut *dut, struct sigma_stream *s, int id) { char fname[128]; FILE *f; unsigned int i; snprintf(fname, sizeof(fname), SIGMA_TMPDIR "/e2e%u-%d.txt", (unsigned int) time(NULL), id); f = fopen(fname, "w"); if (f == NULL) { sigma_dut_print(dut, DUT_MSG_INFO, "Could not write %s", fname); return; } fprintf(f, "seqnum:local_sec:local_usec:remote_sec:remote_usec\n"); sigma_dut_print(dut, DUT_MSG_DEBUG, "Writing frame stats to %s", fname); for (i = 0; i < s->num_stats; i++) { struct sigma_frame_stats *stats = &s->stats[i]; fprintf(f, "%u:%u:%u:%u:%u\n", stats->seqnum, stats->local_sec, stats->local_usec, stats->remote_sec, stats->remote_usec); } fclose(f); } static int cmd_traffic_agent_receive_stop(struct sigma_dut *dut, struct sigma_conn *conn, struct sigma_cmd *cmd) { const char *val; int streams[MAX_SIGMA_STREAMS]; int i, j, ret, count; char buf[100 + MAX_SIGMA_STREAMS * 60], *pos; val = get_param(cmd, "streamID"); if (val == NULL) return -1; count = get_stream_id(val, streams); if (count < 0) return -1; for (i = 0; i < count; i++) { struct sigma_stream *s = get_stream(dut, streams[i]); if (!s) { snprintf(buf, sizeof(buf), "errorCode,StreamID %d " "not configured", streams[i]); send_resp(dut, conn, SIGMA_INVALID, buf); return 0; } for (j = 0; j < i; j++) if (streams[i] == streams[j]) return -1; if (!s->started) { snprintf(buf, sizeof(buf), "errorCode,Receive not " "started for streamID %d", streams[i]); send_resp(dut, conn, SIGMA_INVALID, buf); return 0; } } for (i = 0; i < count; i++) { 
struct sigma_stream *s = get_stream(dut, streams[i]); if (s) s->stop = 1; } for (i = 0; i < count; i++) { struct sigma_stream *s = get_stream(dut, streams[i]); if (!s) continue; sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: stop " "receive for stream %d", streams[i]); stop_stream(s); } buf[0] = '\0'; pos = buf; pos += snprintf(pos, buf + sizeof(buf) - pos, "streamID,"); for (i = 0; i < count; i++) { ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d", i > 0 ? " " : "", streams[i]); if (ret < 0 || ret >= buf + sizeof(buf) - pos) break; pos += ret; } if (dut->program == PROGRAM_60GHZ) { pos += snprintf(pos, buf + sizeof(buf) - pos, ",txActFrames,"); for (i = 0; i < count; i++) { struct sigma_stream *s = get_stream(dut, streams[i]); if (!s) continue; ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d", i > 0 ? " " : "", s->tx_act_frames); if (ret < 0 || ret >= buf + sizeof(buf) - pos) break; pos += ret; } } pos += snprintf(pos, buf + sizeof(buf) - pos, ",txFrames,"); for (i = 0; i < count; i++) { struct sigma_stream *s = get_stream(dut, streams[i]); if (!s) continue; ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d", i > 0 ? " " : "", s->tx_frames); if (ret < 0 || ret >= buf + sizeof(buf) - pos) break; pos += ret; } pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxFrames,"); for (i = 0; i < count; i++) { struct sigma_stream *s = get_stream(dut, streams[i]); if (!s) continue; ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d", i > 0 ? " " : "", s->rx_frames); if (ret < 0 || ret >= buf + sizeof(buf) - pos) break; pos += ret; } pos += snprintf(pos, buf + sizeof(buf) - pos, ",txPayloadBytes,"); for (i = 0; i < count; i++) { struct sigma_stream *s = get_stream(dut, streams[i]); if (!s) continue; ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu", i > 0 ? 
" " : "", s->tx_payload_bytes); if (ret < 0 || ret >= buf + sizeof(buf) - pos) break; pos += ret; } pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxPayloadBytes,"); for (i = 0; i < count; i++) { struct sigma_stream *s = get_stream(dut, streams[i]); if (!s) continue; ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu", i > 0 ? " " : "", s->rx_payload_bytes); if (ret < 0 || ret >= buf + sizeof(buf) - pos) break; pos += ret; } pos += snprintf(pos, buf + sizeof(buf) - pos, ",outOfSequenceFrames,"); for (i = 0; i < count; i++) { struct sigma_stream *s = get_stream(dut, streams[i]); if (!s) continue; ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d", i > 0 ? " " : "", s->out_of_seq_frames); if (ret < 0 || ret >= buf + sizeof(buf) - pos) break; pos += ret; } buf[sizeof(buf) - 1] = '\0'; send_resp(dut, conn, SIGMA_COMPLETE, buf); for (i = 0; i < count; i++) { struct sigma_stream *s = get_stream(dut, streams[i]); if (!s) continue; if (s->profile == SIGMA_PROFILE_IPTV && s->num_stats > 0 && dut->write_stats) write_frame_stats(dut, s, streams[i]); free(s->stats); s->stats = NULL; s->num_stats = 0; } return 0; } static int cmd_traffic_agent_version(struct sigma_dut *dut, struct sigma_conn *conn, struct sigma_cmd *cmd) { send_resp(dut, conn, SIGMA_COMPLETE, "version,1.0"); return 0; } void traffic_agent_register_cmds(void) { sigma_dut_reg_cmd("traffic_agent_config", NULL, cmd_traffic_agent_config); sigma_dut_reg_cmd("traffic_agent_reset", NULL, cmd_traffic_agent_reset); sigma_dut_reg_cmd("traffic_agent_send", NULL, cmd_traffic_agent_send); sigma_dut_reg_cmd("traffic_agent_receive_start", NULL, cmd_traffic_agent_receive_start); sigma_dut_reg_cmd("traffic_agent_receive_stop", NULL, cmd_traffic_agent_receive_stop); sigma_dut_reg_cmd("traffic_agent_version", NULL, cmd_traffic_agent_version); }