M7350/wlan/utils/sigma-dut/traffic_agent.c
2024-09-09 08:57:42 +00:00

1435 lines
32 KiB
C

/*
* Sigma Control API DUT (station/AP)
* Copyright (c) 2010, Atheros Communications, Inc.
* Copyright (c) 2011-2015, Qualcomm Atheros, Inc.
* All Rights Reserved.
* Licensed under the Clear BSD license. See README for more details.
*/
#include "sigma_dut.h"
#include "wpa_helpers.h"
#define TG_MAX_CLIENTS_CONNECTIONS 1
/*
 * cmd_traffic_agent_config - Handle the traffic_agent_config command
 * @dut: DUT context that owns the stream table
 * @conn: Control connection on which the response is sent
 * @cmd: Parsed command parameters
 * Returns: 0 when a response has been sent from here, -1 when a mandatory
 * parameter is missing, a parameter value is not recognized, or the frame
 * statistics buffer cannot be allocated
 *
 * The next free entry of dut->streams is reset and filled from the command
 * parameters. The entry only becomes visible (dut->num_streams is incremented
 * and a new stream ID is assigned) once every parameter has been accepted.
 */
static int cmd_traffic_agent_config(struct sigma_dut *dut,
				    struct sigma_conn *conn,
				    struct sigma_cmd *cmd)
{
	struct sigma_stream *stream;
	const char *param;
	char resp[100];

	if (dut->num_streams == MAX_SIGMA_STREAMS) {
		send_resp(dut, conn, SIGMA_ERROR,
			  "errorCode,No more concurrent traffic streams supported");
		return 0;
	}

	/* Recycle the slot: drop any stale statistics buffer, then clear it */
	stream = &dut->streams[dut->num_streams];
	free(stream->stats);
	memset(stream, 0, sizeof(*stream));
	stream->sock = -1;
	stream->no_timestamps = dut->no_timestamps;

	/* Mandatory: traffic profile */
	param = get_param(cmd, "profile");
	if (param == NULL)
		return -1;
	if (!strcasecmp(param, "File_Transfer")) {
		stream->profile = SIGMA_PROFILE_FILE_TRANSFER;
	} else if (!strcasecmp(param, "Multicast")) {
		stream->profile = SIGMA_PROFILE_MULTICAST;
	} else if (!strcasecmp(param, "IPTV")) {
		stream->profile = SIGMA_PROFILE_IPTV;
	} else if (!strcasecmp(param, "Transaction")) {
		stream->profile = SIGMA_PROFILE_TRANSACTION;
	} else if (!strcasecmp(param, "Start_Sync")) {
		stream->profile = SIGMA_PROFILE_START_SYNC;
	} else if (!strcasecmp(param, "Uapsd")) {
		stream->profile = SIGMA_PROFILE_UAPSD;
	} else {
		send_resp(dut, conn, SIGMA_INVALID,
			  "errorCode,Unsupported profile");
		return 0;
	}

	/* Mandatory: direction */
	param = get_param(cmd, "direction");
	if (param == NULL)
		return -1;
	if (!strcasecmp(param, "send"))
		stream->sender = 1;
	else if (!strcasecmp(param, "receive"))
		stream->sender = 0;
	else
		return -1;

	/* Optional addresses; a value that does not parse is an error */
	param = get_param(cmd, "destination");
	if (param != NULL && inet_aton(param, &stream->dst) == 0)
		return -1;

	param = get_param(cmd, "source");
	if (param != NULL && inet_aton(param, &stream->src) == 0)
		return -1;

	/* Optional numeric parameters; absent ones stay zero from memset() */
	param = get_param(cmd, "destinationPort");
	if (param != NULL)
		stream->dst_port = atoi(param);

	param = get_param(cmd, "sourcePort");
	if (param != NULL)
		stream->src_port = atoi(param);

	param = get_param(cmd, "frameRate");
	if (param != NULL)
		stream->frame_rate = atoi(param);

	param = get_param(cmd, "duration");
	if (param != NULL)
		stream->duration = atoi(param);

	param = get_param(cmd, "payloadSize");
	if (param != NULL)
		stream->payload_size = atoi(param);

	param = get_param(cmd, "startDelay");
	if (param != NULL)
		stream->start_delay = atoi(param);

	param = get_param(cmd, "maxCnt");
	if (param != NULL)
		stream->max_cnt = atoi(param);

	param = get_param(cmd, "trafficClass");
	if (param != NULL) {
		if (!strcasecmp(param, "Voice"))
			stream->tc = SIGMA_TC_VOICE;
		else if (!strcasecmp(param, "Video"))
			stream->tc = SIGMA_TC_VIDEO;
		else if (!strcasecmp(param, "Background"))
			stream->tc = SIGMA_TC_BACKGROUND;
		else if (!strcasecmp(param, "BestEffort"))
			stream->tc = SIGMA_TC_BEST_EFFORT;
		else
			return -1;
	}

	param = get_param(cmd, "userpriority");
	if (param != NULL) {
		stream->user_priority_set = 1;
		stream->user_priority = atoi(param);
	}

	param = get_param(cmd, "tagName");
	if (param != NULL) {
		strncpy(stream->test_name, param, sizeof(stream->test_name));
		stream->test_name[sizeof(stream->test_name) - 1] = '\0';
		sigma_dut_print(dut, DUT_MSG_DEBUG,
				"Traffic agent: U-APSD console tagname %s",
				stream->test_name);
	}

	/*
	 * A DUT-wide throughput packet size replaces the requested payload
	 * size for unpaced (frameRate 0) senders of the listed profiles.
	 */
	if (dut->throughput_pktsize && stream->frame_rate == 0 &&
	    stream->sender &&
	    dut->throughput_pktsize != stream->payload_size &&
	    (stream->profile == SIGMA_PROFILE_FILE_TRANSFER ||
	     stream->profile == SIGMA_PROFILE_IPTV ||
	     stream->profile == SIGMA_PROFILE_UAPSD)) {
		sigma_dut_print(dut, DUT_MSG_INFO,
				"Traffic agent: Override throughput test payload size %u -> %u",
				stream->payload_size, dut->throughput_pktsize);
		stream->payload_size = dut->throughput_pktsize;
	}

	/* Transport protocol: "1" selects TCP, "0" or no parameter UDP */
	param = get_param(cmd, "transProtoType");
	if (param == NULL || strcmp(param, "0") == 0)
		stream->trans_proto = IPPROTO_UDP;
	else if (strcmp(param, "1") == 0)
		stream->trans_proto = IPPROTO_TCP;
	else
		return -1;

	/* Per-frame statistics are only collected by timestamped IPTV RX */
	if (stream->profile == SIGMA_PROFILE_IPTV && !stream->sender &&
	    !stream->no_timestamps) {
		stream->stats = calloc(MAX_SIGMA_STATS,
				       sizeof(struct sigma_frame_stats));
		if (stream->stats == NULL)
			return -1;
	}

	/* Commit the entry and report the freshly assigned stream ID */
	dut->stream_id++;
	dut->num_streams++;
	stream->stream_id = dut->stream_id;
	snprintf(resp, sizeof(resp), "streamID,%d", stream->stream_id);
	send_resp(dut, conn, SIGMA_COMPLETE, resp);
	return 0;
}
/*
 * stop_stream - Wait for a stream's worker thread and release its socket
 * @s: Stream to stop, or NULL
 *
 * Does nothing for NULL or for a stream that is not marked as started.
 * Otherwise the worker thread is joined, the socket (if any) is closed and
 * the started flag is cleared. The caller is the one that sets s->stop when
 * the thread needs to be asked to exit; this function only waits.
 */
static void stop_stream(struct sigma_stream *s)
{
	if (s == NULL || !s->started)
		return;

	pthread_join(s->thr, NULL);

	if (s->sock != -1) {
		close(s->sock);
		s->sock = -1;
	}

	s->started = 0;
}
/*
 * cmd_traffic_agent_reset - Handle the traffic_agent_reset command
 * @dut: DUT context that owns the stream table
 * @conn: Control connection (unused here)
 * @cmd: Parsed command parameters (unused here)
 * Returns: 1
 *
 * Every configured stream is asked to stop and its worker thread is joined.
 * The per-stream frame statistics buffer is released before the table is
 * wiped; without that, a buffer allocated by traffic_agent_config for a
 * stream that never went through traffic_agent_receive_stop would be lost
 * when the memset() below overwrites the only pointer to it.
 */
static int cmd_traffic_agent_reset(struct sigma_dut *dut,
				   struct sigma_conn *conn,
				   struct sigma_cmd *cmd)
{
	int i;

	for (i = 0; i < dut->num_streams; i++) {
		struct sigma_stream *s = &dut->streams[i];

		s->stop = 1;
		stop_stream(s);

		/* free(NULL) is a no-op, so no need to test the pointer */
		free(s->stats);
		s->stats = NULL;
	}

	dut->num_streams = 0;
	memset(&dut->streams, 0, sizeof(dut->streams));
	return 1;
}
/*
 * get_stream_id - Parse a space separated list of stream IDs
 * @str: String holding one or more decimal IDs separated by spaces
 * @streams: Output array with room for MAX_SIGMA_STREAMS entries
 * Returns: Number of IDs stored in @streams, or -1 if the list holds more
 * than MAX_SIGMA_STREAMS entries or any entry converts to 0
 *
 * Since atoi() yields 0 for text that does not start with a number, an
 * empty token (e.g., after trailing spaces) is rejected as well.
 */
static int get_stream_id(const char *str, int streams[MAX_SIGMA_STREAMS])
{
	const char *cursor = str;
	int found = 0;

	while (cursor != NULL) {
		if (found == MAX_SIGMA_STREAMS)
			return -1;

		streams[found] = atoi(cursor);
		if (streams[found] == 0)
			return -1;
		found++;

		/* Advance to the start of the next token, if there is one */
		cursor = strchr(cursor, ' ');
		if (cursor != NULL) {
			while (*cursor == ' ')
				cursor++;
		}
	}

	return found;
}
/*
 * open_socket_file_transfer - Create and set up the socket for a stream
 * @dut: DUT context (used for logging)
 * @s: Stream whose socket is to be opened; s->sock is set on success
 * Returns: 0 on success, -1 on failure (s->sock is left at -1 once the
 * socket had been created and a later step failed)
 *
 * The socket type follows s->trans_proto (datagram for UDP, stream
 * otherwise). It is bound to the local port of the stream's role: the source
 * port for a sender, the destination port for a receiver. A multicast
 * receiver stops after bind(); a TCP receiver starts listening; every other
 * combination connects to the peer address/port.
 */
static int open_socket_file_transfer(struct sigma_dut *dut,
				     struct sigma_stream *s)
{
	struct sockaddr_in local, peer;
	int reuse = 1;
	int sock_type;

	sock_type = s->trans_proto == IPPROTO_UDP ? SOCK_DGRAM : SOCK_STREAM;
	s->sock = socket(PF_INET, sock_type, s->trans_proto);
	if (s->sock < 0) {
		perror("socket");
		return -1;
	}

	if (setsockopt(s->sock, SOL_SOCKET, SO_REUSEADDR, &reuse,
		       sizeof(reuse)) < 0) {
		perror("setsockopt");
		goto fail;
	}

	memset(&local, 0, sizeof(local));
	local.sin_family = AF_INET;
	local.sin_port = htons(s->sender ? s->src_port : s->dst_port);
	sigma_dut_print(dut, DUT_MSG_DEBUG,
			"Traffic agent: sender=%d bind port %d",
			s->sender, ntohs(local.sin_port));
	if (bind(s->sock, (struct sockaddr *) &local, sizeof(local)) < 0) {
		perror("bind");
		goto fail;
	}

	/* Multicast receivers only need the bound socket */
	if (s->profile == SIGMA_PROFILE_MULTICAST && !s->sender)
		return 0;

	/* TCP receivers wait for the peer to connect */
	if (s->trans_proto == IPPROTO_TCP && s->sender == 0) {
		if (listen(s->sock, TG_MAX_CLIENTS_CONNECTIONS) < 0) {
			sigma_dut_print(dut, DUT_MSG_INFO,
					"Listen failed with error %d: %s",
					errno, strerror(errno));
			goto fail;
		}
		return 0;
	}

	/* Everything else connects to the remote end of the stream */
	memset(&peer, 0, sizeof(peer));
	peer.sin_family = AF_INET;
	peer.sin_addr.s_addr = s->sender ? s->dst.s_addr : s->src.s_addr;
	peer.sin_port = htons(s->sender ? s->dst_port : s->src_port);
	sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: connect %s:%d",
			inet_ntoa(peer.sin_addr), ntohs(peer.sin_port));
	if (connect(s->sock, (struct sockaddr *) &peer, sizeof(peer)) < 0) {
		perror("connect");
		goto fail;
	}

	return 0;

fail:
	close(s->sock);
	s->sock = -1;
	return -1;
}
static int open_socket_multicast(struct sigma_dut *dut, struct sigma_stream *s)
{
if (open_socket_file_transfer(dut, s) < 0)
return -1;
if (!s->sender) {
struct ip_mreq mr;
memset(&mr, 0, sizeof(mr));
mr.imr_multiaddr.s_addr = s->dst.s_addr;
mr.imr_interface.s_addr = htonl(INADDR_ANY);
sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: "
"IP_ADD_MEMBERSHIP %s", inet_ntoa(s->dst));
if (setsockopt(s->sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
(void *) &mr, sizeof(mr)) < 0) {
sigma_dut_print(dut, DUT_MSG_INFO,
"setsockopt[IP_ADD_MEMBERSHIP]: %s",
strerror(errno));
/*
* Continue anyway since this can happen, e.g., if the
* default route is missing. This is not critical for
* multicast RX testing.
*/
}
}
return 0;
}
/*
 * set_socket_prio - Set the IP TOS value of a stream's socket
 * @s: Stream with an open socket
 * Returns: 0 on success, -1 if the configured user priority does not belong
 * to the stream's traffic class or setsockopt() fails
 *
 * Without an explicit user priority each traffic class uses a fixed TOS
 * value. With one, only the two priorities listed for the class are accepted
 * and the TOS byte is built by shifting the matching value left by two bits.
 * A traffic class not handled below leaves the TOS at zero.
 */
static int set_socket_prio(struct sigma_stream *s)
{
	int tos = 0x00;

	switch (s->tc) {
	case SIGMA_TC_VOICE:
		if (!s->user_priority_set)
			tos = 0xe0; /* DSCP = 56 */
		else if (s->user_priority == 6)
			tos = 48 << 2;
		else if (s->user_priority == 7)
			tos = 56 << 2;
		else
			return -1;
		break;
	case SIGMA_TC_VIDEO:
		if (!s->user_priority_set)
			tos = 0xa0; /* DSCP = 40 */
		else if (s->user_priority == 4)
			tos = 32 << 2;
		else if (s->user_priority == 5)
			tos = 40 << 2;
		else
			return -1;
		break;
	case SIGMA_TC_BACKGROUND:
		if (!s->user_priority_set)
			tos = 0x20; /* DSCP = 8 */
		else if (s->user_priority == 1)
			tos = 8 << 2;
		else if (s->user_priority == 2)
			tos = 16 << 2;
		else
			return -1;
		break;
	case SIGMA_TC_BEST_EFFORT:
		if (!s->user_priority_set)
			tos = 0x00; /* DSCP = 0 */
		else if (s->user_priority == 0)
			tos = 0 << 2;
		else if (s->user_priority == 3)
			tos = 20 << 2;
		else
			return -1;
		break;
	}

	if (setsockopt(s->sock, IPPROTO_IP, IP_TOS, &tos, sizeof(tos)) < 0) {
		perror("setsockopt");
		return -1;
	}

	return 0;
}
/*
 * open_socket - Open the socket matching a stream's traffic profile
 * @dut: DUT context (used for logging)
 * @s: Stream to open the socket for
 * Returns: 0 on success, -1 on failure or for a profile without socket
 * support
 */
static int open_socket(struct sigma_dut *dut, struct sigma_stream *s)
{
	switch (s->profile) {
	case SIGMA_PROFILE_FILE_TRANSFER:
	case SIGMA_PROFILE_TRANSACTION:
	case SIGMA_PROFILE_UAPSD:
		/* Plain socket, no extra options */
		return open_socket_file_transfer(dut, s);
	case SIGMA_PROFILE_MULTICAST:
		return open_socket_multicast(dut, s);
	case SIGMA_PROFILE_IPTV:
		/* IPTV additionally gets its TOS value configured */
		if (open_socket_file_transfer(dut, s) < 0)
			return -1;
		return set_socket_prio(s);
	case SIGMA_PROFILE_START_SYNC:
		sigma_dut_print(dut, DUT_MSG_INFO,
				"Traffic stream profile %d not yet supported",
				s->profile);
		/* TODO */
		return -1;
	}

	return -1;
}
/*
 * send_file_fast - Send frames back to back without per-frame timestamps
 * @s: Stream with a connected socket
 * @pkt: Payload buffer of s->payload_size bytes owned by the caller
 *
 * Frames are sent as fast as send() accepts them until s->stop is set or
 * s->duration seconds have passed. The running frame counter is written
 * big endian at offset 8 of the payload. To keep the loop cheap, the clock is
 * only read on every 16th frame.
 */
static void send_file_fast(struct sigma_stream *s, char *pkt)
{
	struct timeval deadline, cur;
	unsigned int seq = 0;
	int sent;

	gettimeofday(&deadline, NULL);
	deadline.tv_sec += s->duration;

	while (!s->stop) {
		seq++;
		WPA_PUT_BE32(&pkt[8], seq);

		if (!(seq & 0xf)) {
			gettimeofday(&cur, NULL);
			if (cur.tv_sec > deadline.tv_sec ||
			    (cur.tv_sec == deadline.tv_sec &&
			     cur.tv_usec >= deadline.tv_usec))
				break;
		}

		/* Attempts are counted separately from successful sends */
		s->tx_act_frames++;
		sent = send(s->sock, pkt, s->payload_size, 0);
		if (sent >= 0) {
			s->tx_frames++;
			s->tx_payload_bytes += sent;
			continue;
		}

		if (errno == EAGAIN || errno == ENOBUFS) {
			/* Transmit path is full; back off briefly */
			usleep(1000);
		} else if (errno == ECONNRESET || errno == EPIPE) {
			/* Peer is gone; no point in continuing */
			s->stop = 1;
		} else {
			perror("send");
		}
	}
}
/*
 * send_file - Send frames at the configured frame rate for the configured
 * duration
 * @s: Stream with an open socket; duration, frame_rate and payload_size are
 * read from it and the tx counters are updated in it
 *
 * Each frame carries the running frame counter at payload offset 8 and the
 * send time (seconds at offset 12, microseconds at offset 16), all written
 * with WPA_PUT_BE32(). With frame_rate 0 frames are sent without pacing; if
 * in addition s->no_timestamps is set, the work is handed to
 * send_file_fast(). Otherwise the loop sleeps between frames, re-deriving
 * the sleep time from how far the number of sent frames is ahead of the
 * nominal schedule.
 */
static void send_file(struct sigma_stream *s)
{
char *pkt;
struct timeval stop, now, start;
int res;
unsigned int counter = 0, total_sleep_usec = 0, total_pkts;
int sleep_usec = 0;
/*
 * Nothing to do without a positive duration. The header fields written
 * below reach up to byte 19, hence the minimum payload size of 20.
 */
if (s->duration <= 0 || s->frame_rate < 0 || s->payload_size < 20)
return;
pkt = malloc(s->payload_size);
if (pkt == NULL)
return;
/*
 * strncpy() pads the rest of the destination with NUL bytes, so after these
 * two calls the buffer holds "1345678" followed by zeros; the 0x01 fill from
 * memset() does not survive.
 */
memset(pkt, 1, s->payload_size);
strncpy(pkt, "1345678", s->payload_size);
if (s->frame_rate == 0 && s->no_timestamps) {
send_file_fast(s, pkt);
free(pkt);
return;
}
/* stop is the wall-clock deadline; total_pkts is the frame budget */
gettimeofday(&stop, NULL);
stop.tv_sec += s->duration;
total_pkts = s->duration * s ->frame_rate;
gettimeofday(&start, NULL);
while (!s->stop) {
counter++;
WPA_PUT_BE32(&pkt[8], counter);
if (sleep_usec) {
usleep(sleep_usec);
total_sleep_usec += sleep_usec;
}
gettimeofday(&now, NULL);
if (now.tv_sec > stop.tv_sec ||
(now.tv_sec == stop.tv_sec && now.tv_usec >= stop.tv_usec))
break;
/* For paced streams, also stop once the frame budget has been sent */
if (s->frame_rate && (unsigned int) s->tx_frames >= total_pkts)
break;
if (s->frame_rate == 0 || s->tx_frames == 0)
sleep_usec = 0;
else if (sleep_usec || s->frame_rate < 10 ||
counter % (s->frame_rate / 10) == 0) {
/*
 * Recalculate sleep_usec roughly every 100 ms (every frame_rate / 10
 * frames), or on every frame while a sleep is in effect or the rate is
 * below 10. The frame_rate < 10 test is evaluated first, so the modulo
 * never sees a zero divisor.
 */
struct timeval tmp;
int diff, duration;
timersub(&now, &start, &tmp);
/* diff: time actually elapsed; duration: time the frames sent so far
 * should have taken at the nominal rate (both in microseconds) */
diff = tmp.tv_sec * 1000000 + tmp.tv_usec;
duration = (1000000 / s->frame_rate) * s->tx_frames;
/* NOTE(review): both values are plain int; confirm long runs cannot
 * overflow them. */
if (duration > diff)
sleep_usec = (total_sleep_usec +
(duration - diff)) / s->tx_frames;
else
sleep_usec = 0;
}
WPA_PUT_BE32(&pkt[12], now.tv_sec);
WPA_PUT_BE32(&pkt[16], now.tv_usec);
/* tx_act_frames counts send attempts, tx_frames only successful ones */
s->tx_act_frames++;
res = send(s->sock, pkt, s->payload_size, 0);
if (res >= 0) {
s->tx_frames++;
s->tx_payload_bytes += res;
} else {
switch (errno) {
case EAGAIN:
case ENOBUFS:
/* Transmit path is full; back off briefly and keep going */
usleep(1000);
break;
case ECONNRESET:
case EPIPE:
/* Peer is gone; end the loop */
s->stop = 1;
break;
default:
perror("send");
break;
}
}
}
sigma_dut_print(s->dut, DUT_MSG_DEBUG,
"send_file: counter %u s->tx_frames %d total_sleep_usec %u",
counter, s->tx_frames, total_sleep_usec);
free(pkt);
}
/*
 * send_transaction - Run the sending side of a request/response exchange
 * @s: Stream with an open socket; duration, frame_rate, payload_size and
 * max_cnt are read from it and the tx/rx counters are updated in it
 *
 * A request is sent, then the function waits up to one frame interval
 * (1000000 / frame_rate microseconds) for a response on the same socket
 * before sending the next request. The loop ends when s->stop is set, the
 * duration has elapsed, max_cnt (if non-zero) requests have been issued, or
 * select()/recv() fails.
 */
static void send_transaction(struct sigma_stream *s)
{
char *pkt, *rpkt;
struct timeval stop, now;
int res;
unsigned int counter = 0, rcounter;
int wait_time;
fd_set rfds;
struct timeval tv;
/*
 * frame_rate must be positive here since it is used as a divisor below; the
 * header fields reach up to byte 19, hence the minimum payload size of 20.
 */
if (s->duration <= 0 || s->frame_rate <= 0 || s->payload_size < 20)
return;
/* pkt is the request buffer, rpkt receives the response */
pkt = malloc(s->payload_size);
if (pkt == NULL)
return;
rpkt = malloc(s->payload_size);
if (rpkt == NULL) {
free(pkt);
return;
}
/*
 * strncpy() pads the rest of the destination with NUL bytes, so the request
 * ends up as "1345678" followed by zeros.
 */
memset(pkt, 1, s->payload_size);
strncpy(pkt, "1345678", s->payload_size);
gettimeofday(&stop, NULL);
stop.tv_sec += s->duration;
wait_time = 1000000 / s->frame_rate;
while (!s->stop) {
counter++;
if (s->max_cnt && (int) counter > s->max_cnt)
break;
/* Request header: counter at offset 8, send time at offsets 12 and 16 */
WPA_PUT_BE32(&pkt[8], counter);
gettimeofday(&now, NULL);
if (now.tv_sec > stop.tv_sec ||
(now.tv_sec == stop.tv_sec && now.tv_usec >= stop.tv_usec))
break;
WPA_PUT_BE32(&pkt[12], now.tv_sec);
WPA_PUT_BE32(&pkt[16], now.tv_usec);
res = send(s->sock, pkt, s->payload_size, 0);
if (res >= 0) {
s->tx_frames++;
s->tx_payload_bytes += res;
} else {
switch (errno) {
case EAGAIN:
case ENOBUFS:
usleep(1000);
break;
case ECONNRESET:
case EPIPE:
s->stop = 1;
break;
default:
perror("send");
break;
}
}
/* Wait for response */
tv.tv_sec = 0;
tv.tv_usec = wait_time;
FD_ZERO(&rfds);
FD_SET(s->sock, &rfds);
res = select(s->sock + 1, &rfds, NULL, NULL, &tv);
if (res < 0) {
if (errno == EINTR)
continue;
perror("select");
break;
}
if (res == 0) {
/* timeout */
continue;
}
if (FD_ISSET(s->sock, &rfds)) {
/* response received */
res = recv(s->sock, rpkt, s->payload_size, 0);
if (res < 0) {
perror("recv");
break;
}
/*
 * A response whose counter differs from the request just sent is counted
 * as out of sequence.
 * NOTE(review): rpkt is not initialized and res is not checked against the
 * header size, so a response shorter than 12 bytes is compared using bytes
 * that were not received — confirm whether that needs a guard.
 */
rcounter = WPA_GET_BE32(&rpkt[8]);
if (rcounter != counter)
s->out_of_seq_frames++;
s->rx_frames++;
s->rx_payload_bytes += res;
}
}
free(pkt);
free(rpkt);
}
/*
 * send_thread - Thread entry point for a sending stream
 * @ctx: Pointer to the struct sigma_stream to run
 * Returns: NULL
 *
 * Sleeps for the stream's start delay and then runs the send routine that
 * matches the stream's profile. The Start_Sync profile sends nothing.
 */
static void * send_thread(void *ctx)
{
	struct sigma_stream *stream = ctx;

	sleep(stream->start_delay);

	switch (stream->profile) {
	case SIGMA_PROFILE_FILE_TRANSFER:
	case SIGMA_PROFILE_MULTICAST:
	case SIGMA_PROFILE_IPTV:
		send_file(stream);
		break;
	case SIGMA_PROFILE_TRANSACTION:
		send_transaction(stream);
		break;
	case SIGMA_PROFILE_UAPSD:
		send_uapsd_console(stream);
		break;
	case SIGMA_PROFILE_START_SYNC:
		break;
	}

	return NULL;
}
/*
 * struct traffic_agent_send_data - Context handed to send_report_thread()
 *
 * Allocated in cmd_traffic_agent_send() and freed by send_report_thread()
 * once it has finished with the response.
 */
struct traffic_agent_send_data {
/* DUT context the streams belong to */
struct sigma_dut *dut;
/* Control connection on which the final response is sent */
struct sigma_conn *conn;
/* Stream IDs named in the traffic_agent_send command */
int streams[MAX_SIGMA_STREAMS];
/* Number of valid entries in streams[] */
int count;
};
static struct sigma_stream * get_stream(struct sigma_dut *dut, int id)
{
int i;
for (i = 0; i < dut->num_streams; i++) {
if ((unsigned int) id == dut->streams[i].stream_id)
return &dut->streams[i];
}
return NULL;
}
/*
 * send_report_thread - Wait for sending streams to finish and report results
 * @ctx: Pointer to a struct traffic_agent_send_data; freed by this function
 * Returns: NULL
 *
 * Joins the worker thread of every listed stream, builds the response
 * "streamID,<ids>[,txActFrames,<n>],txFrames,<n>,rxFrames,<n>,
 * txPayloadBytes,<n>,rxPayloadBytes,<n>,outOfSequenceFrames,<n>" with the
 * per-stream values separated by spaces, and sends it on the control
 * connection unless that connection has been closed in the meantime.
 */
static void * send_report_thread(void *ctx)
{
struct traffic_agent_send_data *data = ctx;
struct sigma_dut *dut = data->dut;
struct sigma_conn *conn = data->conn;
int i, ret;
char buf[100 + MAX_SIGMA_STREAMS * 60], *pos;
/* stop_stream() joins the stream thread, i.e., blocks until it is done */
for (i = 0; i < data->count; i++) {
sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: waiting "
"for stream %d send to complete",
data->streams[i]);
stop_stream(get_stream(dut, data->streams[i]));
}
/*
 * Build the response field by field. Per-value writes stop the current
 * field on truncation.
 * NOTE(review): the return values of the snprintf() calls that write the
 * field labels are added to pos without a truncation check — confirm that
 * buf is always large enough for pos to stay inside it.
 */
buf[0] = '\0';
pos = buf;
pos += snprintf(pos, buf + sizeof(buf) - pos, "streamID,");
for (i = 0; i < data->count; i++) {
ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
i > 0 ? " " : "", data->streams[i]);
if (ret < 0 || ret >= buf + sizeof(buf) - pos)
break;
pos += ret;
}
/* The count of attempted transmissions is only reported for 60 GHz */
if (dut->program == PROGRAM_60GHZ) {
sigma_dut_print(dut, DUT_MSG_INFO, "reporting tx_act_frames");
pos += snprintf(pos, buf + sizeof(buf) - pos, ",txActFrames,");
for (i = 0; i < data->count; i++) {
struct sigma_stream *s;
s = get_stream(dut, data->streams[i]);
if (!s)
continue;
ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
i > 0 ? " " : "", s->tx_act_frames);
if (ret < 0 || ret >= buf + sizeof(buf) - pos)
break;
pos += ret;
}
}
pos += snprintf(pos, buf + sizeof(buf) - pos, ",txFrames,");
for (i = 0; i < data->count; i++) {
struct sigma_stream *s = get_stream(dut, data->streams[i]);
if (!s)
continue;
ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
i > 0 ? " " : "", s->tx_frames);
if (ret < 0 || ret >= buf + sizeof(buf) - pos)
break;
pos += ret;
}
pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxFrames,");
for (i = 0; i < data->count; i++) {
struct sigma_stream *s = get_stream(dut, data->streams[i]);
if (!s)
continue;
ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
i > 0 ? " " : "", s->rx_frames);
if (ret < 0 || ret >= buf + sizeof(buf) - pos)
break;
pos += ret;
}
pos += snprintf(pos, buf + sizeof(buf) - pos, ",txPayloadBytes,");
for (i = 0; i < data->count; i++) {
struct sigma_stream *s = get_stream(dut, data->streams[i]);
if (!s)
continue;
ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu",
i > 0 ? " " : "", s->tx_payload_bytes);
if (ret < 0 || ret >= buf + sizeof(buf) - pos)
break;
pos += ret;
}
pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxPayloadBytes,");
for (i = 0; i < data->count; i++) {
struct sigma_stream *s = get_stream(dut, data->streams[i]);
if (!s)
continue;
ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu",
i > 0 ? " " : "", s->rx_payload_bytes);
if (ret < 0 || ret >= buf + sizeof(buf) - pos)
break;
pos += ret;
}
pos += snprintf(pos, buf + sizeof(buf) - pos, ",outOfSequenceFrames,");
for (i = 0; i < data->count; i++) {
struct sigma_stream *s = get_stream(dut, data->streams[i]);
if (!s)
continue;
ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
i > 0 ? " " : "", s->out_of_seq_frames);
if (ret < 0 || ret >= buf + sizeof(buf) - pos)
break;
pos += ret;
}
/* Allow a new traffic_agent_send on these streams from now on */
for (i = 0; i < data->count; i++) {
struct sigma_stream *s = get_stream(dut, data->streams[i]);
if (!s)
continue;
s->ta_send_in_progress = 0;
if (s->trans_proto == IPPROTO_TCP) {
/*
 * Close the socket to make sure client side close the
 * network before the server. Otherwise, the server
 * might get "Address already in use" when trying to
 * reuse the port.
 */
/*
 * NOTE(review): stop_stream() above closes the socket of a started stream
 * and sets s->sock to -1, so this close() may be handed -1 — confirm this
 * is intended.
 */
close(s->sock);
s->sock = -1;
sigma_dut_print(dut, DUT_MSG_DEBUG,
"Closed the sender socket");
}
}
buf[sizeof(buf) - 1] = '\0';
/* A negative conn->s means the control socket is gone; only log then */
if (conn->s < 0)
sigma_dut_print(dut, DUT_MSG_INFO, "Cannot send traffic_agent response since control socket has already been closed");
else
send_resp(dut, conn, SIGMA_COMPLETE, buf);
conn->waiting_completion = 0;
free(data);
return NULL;
}
/*
 * cmd_traffic_agent_send - Handle the traffic_agent_send command
 * @dut: DUT context that owns the stream table
 * @conn: Control connection on which the response is sent
 * @cmd: Parsed command parameters; "streamID" lists the streams to start
 * Returns: 0 when the command was handled here (an error response was sent
 * or completion was handed to send_report_thread()), -1 on a missing or
 * malformed stream list (including a repeated stream ID), -2 if a socket or
 * thread could not be set up
 *
 * All listed streams are validated first, then their sockets are opened,
 * then one send thread per stream is started, and finally a report thread is
 * started that waits for the send threads and sends the response.
 *
 * Ownership of the request context passes to send_report_thread() as soon as
 * that thread has been created: it frees the context when it is done, so
 * nothing in this function touches the context after a successful
 * pthread_create(). For the same reason the streams are flagged as busy
 * before the report thread is started; the report thread clears the flag on
 * completion and must not be able to do so before it has been set.
 */
static int cmd_traffic_agent_send(struct sigma_dut *dut,
				  struct sigma_conn *conn,
				  struct sigma_cmd *cmd)
{
	const char *val;
	int i, j, res;
	char buf[100];
	struct traffic_agent_send_data *data;

	val = get_param(cmd, "streamID");
	if (val == NULL)
		return -1;

	data = calloc(1, sizeof(*data));
	if (data == NULL)
		return -1;
	data->dut = dut;
	data->conn = conn;

	data->count = get_stream_id(val, data->streams);
	if (data->count < 0) {
		free(data);
		return -1;
	}

	/* Validate every stream before any of them is touched */
	for (i = 0; i < data->count; i++) {
		struct sigma_stream *s = get_stream(dut, data->streams[i]);

		if (!s) {
			snprintf(buf, sizeof(buf), "errorCode,StreamID %d "
				 "not configured", data->streams[i]);
			send_resp(dut, conn, SIGMA_INVALID, buf);
			free(data);
			return 0;
		}

		for (j = 0; j < i; j++) {
			if (data->streams[i] == data->streams[j]) {
				/* Same ID listed twice; do not leak data */
				free(data);
				return -1;
			}
		}

		if (!s->sender) {
			snprintf(buf, sizeof(buf), "errorCode,Not configured "
				 "as sender for streamID %d", data->streams[i]);
			send_resp(dut, conn, SIGMA_INVALID, buf);
			free(data);
			return 0;
		}

		if (s->ta_send_in_progress) {
			send_resp(dut, conn, SIGMA_ERROR,
				  "errorCode,Multiple concurrent send cmds on same streamID not supported");
			free(data);
			return 0;
		}
	}

	for (i = 0; i < data->count; i++) {
		struct sigma_stream *s = get_stream(dut, data->streams[i]);

		if (!s)
			continue;
		sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: open "
				"socket for send stream %d", data->streams[i]);
		if (open_socket(dut, s) < 0) {
			free(data);
			return -2;
		}
	}

	for (i = 0; i < data->count; i++) {
		struct sigma_stream *s = get_stream(dut, data->streams[i]);

		if (!s)
			continue;

		/*
		 * Provide dut context to the thread to support debugging and
		 * returning of error messages.
		 */
		s->dut = dut;

		sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: start "
				"send for stream %d", data->streams[i]);
		res = pthread_create(&s->thr, NULL, send_thread, s);
		if (res) {
			sigma_dut_print(dut, DUT_MSG_INFO, "pthread_create "
					"failed: %d", res);
			free(data);
			return -2;
		}
		s->started = 1;
	}

	/*
	 * Flag the streams as busy while data is still exclusively ours. Once
	 * the report thread exists it may finish, clear the flags and free
	 * data at any time.
	 */
	for (i = 0; i < data->count; i++) {
		struct sigma_stream *s = get_stream(dut, data->streams[i]);

		if (s)
			s->ta_send_in_progress = 1;
	}

	sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: start a thread to track sending streams");
	conn->waiting_completion = 1;
	res = pthread_create(&dut->thr, NULL, send_report_thread, data);
	if (res) {
		sigma_dut_print(dut, DUT_MSG_INFO, "pthread_create failed: %d",
				res);
		/* No report thread will clear the flags; undo them here */
		for (i = 0; i < data->count; i++) {
			struct sigma_stream *s;

			s = get_stream(dut, data->streams[i]);
			if (s)
				s->ta_send_in_progress = 0;
		}
		free(data);
		conn->waiting_completion = 0;
		return -2;
	}

	/* Command will be completed in send_report_thread() */
	return 0;
}
/*
 * receive_file - Receive frames on a stream's socket until told to stop
 * @s: Stream with an open socket; the rx counters and, if allocated, the
 * per-frame statistics are updated in it
 *
 * The socket is polled with a 300 ms timeout so that s->stop is noticed
 * promptly. Every received frame is counted. The sender's frame counter
 * (big endian at payload offset 8) is used to count frames that arrive with
 * a lower counter than the previous one; it is only read from frames long
 * enough to contain it. Frames of at least 20 bytes additionally have their
 * counter, local receive time and remote send time (offsets 12 and 16)
 * recorded while there is room in s->stats.
 *
 * On a TCP stream recv() returning 0 means the peer has closed the
 * connection. The socket then stays readable forever, so the loop ends
 * instead of spinning and counting an empty frame on every pass.
 */
static void receive_file(struct sigma_stream *s)
{
	struct timeval tv, now;
	fd_set rfds;
	int res;
	char *pkt;
	int pktlen;
	unsigned int last_rx = 0, counter = 0;

	pktlen = 65536 + 1;
	pkt = malloc(pktlen);
	if (pkt == NULL)
		return;

	while (!s->stop) {
		FD_ZERO(&rfds);
		FD_SET(s->sock, &rfds);
		tv.tv_sec = 0;
		tv.tv_usec = 300000;
		res = select(s->sock + 1, &rfds, NULL, NULL, &tv);
		if (res < 0) {
			perror("select");
			usleep(10000);
			continue;
		}
		if (!FD_ISSET(s->sock, &rfds))
			continue; /* timeout; re-check s->stop */

		res = recv(s->sock, pkt, pktlen, 0);
		if (res < 0) {
			perror("recv");
			break;
		}
		if (res == 0 && s->trans_proto == IPPROTO_TCP)
			break; /* orderly shutdown by the peer */

		s->rx_frames++;
		s->rx_payload_bytes += res;

		/* The counter occupies bytes 8..11 of the payload */
		if (res >= 12) {
			counter = WPA_GET_BE32(&pkt[8]);
			if (counter < last_rx)
				s->out_of_seq_frames++;
			last_rx = counter;
		}

		if (res >= 20 && s->stats &&
		    s->num_stats < MAX_SIGMA_STATS) {
			struct sigma_frame_stats *stats;

			stats = &s->stats[s->num_stats];
			s->num_stats++;
			gettimeofday(&now, NULL);
			stats->seqnum = counter;
			stats->local_sec = now.tv_sec;
			stats->local_usec = now.tv_usec;
			stats->remote_sec = WPA_GET_BE32(&pkt[12]);
			stats->remote_usec = WPA_GET_BE32(&pkt[16]);
		}
	}

	free(pkt);
}
/*
 * receive_transaction - Run the responding side of a request/response
 * exchange
 * @s: Stream with an open socket; the rx/tx counters are updated in it
 *
 * The socket is polled with a 300 ms timeout so that s->stop is noticed
 * promptly. Each received request is counted and echoed back to the address
 * it came from.
 *
 * The receive buffer is zero-initialized so that an echo longer than the
 * request never carries uninitialized heap content. When a payload size is
 * configured the echo has that size; when none is configured the buffer is
 * 65537 bytes, which cannot be sent as one UDP datagram, so the echo is
 * limited to the number of bytes actually received.
 *
 * The sender's frame counter (big endian at payload offset 8) is only read
 * from requests long enough to contain it. On a TCP stream recv returning 0
 * means the peer has closed the connection and ends the loop.
 */
static void receive_transaction(struct sigma_stream *s)
{
	struct timeval tv;
	fd_set rfds;
	int res;
	char *pkt;
	int pktlen, resp_len;
	unsigned int last_rx = 0, counter;
	struct sockaddr_in addr;
	socklen_t addrlen;

	if (s->payload_size)
		pktlen = s->payload_size;
	else
		pktlen = 65536 + 1;
	pkt = calloc(1, pktlen);
	if (pkt == NULL)
		return;

	while (!s->stop) {
		FD_ZERO(&rfds);
		FD_SET(s->sock, &rfds);
		tv.tv_sec = 0;
		tv.tv_usec = 300000;
		res = select(s->sock + 1, &rfds, NULL, NULL, &tv);
		if (res < 0) {
			perror("select");
			usleep(10000);
			continue;
		}
		if (!FD_ISSET(s->sock, &rfds))
			continue; /* timeout; re-check s->stop */

		addrlen = sizeof(addr);
		res = recvfrom(s->sock, pkt, pktlen, 0,
			       (struct sockaddr *) &addr, &addrlen);
		if (res < 0) {
			perror("recv");
			break;
		}
		if (res == 0 && s->trans_proto == IPPROTO_TCP)
			break; /* orderly shutdown by the peer */

		s->rx_frames++;
		s->rx_payload_bytes += res;

		/* The counter occupies bytes 8..11 of the payload */
		if (res >= 12) {
			counter = WPA_GET_BE32(&pkt[8]);
			if (counter < last_rx)
				s->out_of_seq_frames++;
			last_rx = counter;
		}

		/* send response */
		resp_len = s->payload_size ? pktlen : res;
		res = sendto(s->sock, pkt, resp_len, 0,
			     (struct sockaddr *) &addr, addrlen);
		if (res < 0) {
			perror("sendto");
		} else {
			s->tx_frames++;
			s->tx_payload_bytes += res;
		}
	}

	free(pkt);
}
/*
 * receive_thread - Thread entry point for a receiving stream
 * @ctx: Pointer to the struct sigma_stream to run
 * Returns: NULL
 *
 * For TCP the thread first accepts one connection on the listening socket,
 * closes the listening socket and continues on the accepted one. It then
 * runs the receive routine that matches the stream's profile. The
 * Start_Sync profile receives nothing.
 */
static void * receive_thread(void *ctx)
{
	struct sigma_stream *stream = ctx;

	if (stream->trans_proto == IPPROTO_TCP) {
		/* Wait for socket to be accepted */
		struct sockaddr_in peer;
		socklen_t peer_len = sizeof(peer);
		int data_sock; /* returned from accept on sock */

		sigma_dut_print(stream->dut, DUT_MSG_DEBUG,
				"Traffic agent: Waiting on accept");
		data_sock = accept(stream->sock, (struct sockaddr *) &peer,
				   &peer_len);
		if (data_sock < 0) {
			sigma_dut_print(stream->dut, DUT_MSG_ERROR,
					"Traffic agent: Failed to accept: %s",
					strerror(errno));
			return NULL;
		}

		sigma_dut_print(stream->dut, DUT_MSG_DEBUG,
				"Traffic agent: Accepted client closing parent socket and talk over connected sock.");
		close(stream->sock);
		stream->sock = data_sock;
	}

	switch (stream->profile) {
	case SIGMA_PROFILE_FILE_TRANSFER:
	case SIGMA_PROFILE_MULTICAST:
	case SIGMA_PROFILE_IPTV:
		receive_file(stream);
		break;
	case SIGMA_PROFILE_TRANSACTION:
		receive_transaction(stream);
		break;
	case SIGMA_PROFILE_UAPSD:
		receive_uapsd(stream);
		break;
	case SIGMA_PROFILE_START_SYNC:
		break;
	}

	return NULL;
}
/*
 * cmd_traffic_agent_receive_start - Handle traffic_agent_receive_start
 * @dut: DUT context that owns the stream table
 * @conn: Control connection on which an error response is sent
 * @cmd: Parsed command parameters; "streamID" lists the streams to start and
 * "Interface" optionally names the interface to record for each stream
 * Returns: 1 once all receive threads have been started, 0 when an error
 * response has been sent from here, -1 on a missing or malformed stream list
 * (including a repeated stream ID), -2 if a socket or thread could not be
 * set up
 *
 * The work is done in three passes over the list: validate every stream,
 * open every socket, start every receive thread.
 */
static int cmd_traffic_agent_receive_start(struct sigma_dut *dut,
					   struct sigma_conn *conn,
					   struct sigma_cmd *cmd)
{
	const char *val;
	int ids[MAX_SIGMA_STREAMS];
	int idx, prev, num_ids;
	char resp[100];

	val = get_param(cmd, "streamID");
	if (val == NULL)
		return -1;

	num_ids = get_stream_id(val, ids);
	if (num_ids < 0)
		return -1;

	/* Pass 1: every ID must be configured, unique and a receiver */
	for (idx = 0; idx < num_ids; idx++) {
		struct sigma_stream *s = get_stream(dut, ids[idx]);

		if (s == NULL) {
			snprintf(resp, sizeof(resp),
				 "errorCode,StreamID %d not configured",
				 ids[idx]);
			send_resp(dut, conn, SIGMA_INVALID, resp);
			return 0;
		}

		for (prev = 0; prev < idx; prev++) {
			if (ids[idx] == ids[prev])
				return -1;
		}

		if (s->sender) {
			snprintf(resp, sizeof(resp),
				 "errorCode,Not configured as receiver for streamID %d",
				 ids[idx]);
			send_resp(dut, conn, SIGMA_INVALID, resp);
			return 0;
		}
	}

	/* Pass 2: open the sockets */
	for (idx = 0; idx < num_ids; idx++) {
		struct sigma_stream *s = get_stream(dut, ids[idx]);

		if (s == NULL)
			continue;
		sigma_dut_print(dut, DUT_MSG_DEBUG,
				"Traffic agent: open receive socket for stream %d",
				ids[idx]);
		if (open_socket(dut, s) < 0)
			return -2;
	}

	/* Pass 3: start the receive threads */
	for (idx = 0; idx < num_ids; idx++) {
		struct sigma_stream *s = get_stream(dut, ids[idx]);
		int err;

		if (s == NULL)
			continue;

		/*
		 * The thread needs the dut context for debugging and error
		 * messages, and the interface name for power save mode
		 * configuration in the Uapsd cases. When no Interface
		 * parameter was given, the name comes from
		 * get_station_ifname().
		 */
		s->dut = dut;
		val = get_param(cmd, "Interface");
		strncpy(s->ifname, (val ? val : get_station_ifname()),
			sizeof(s->ifname));
		s->ifname[sizeof(s->ifname) - 1] = '\0';

		sigma_dut_print(dut, DUT_MSG_DEBUG,
				"Traffic agent: start receive for stream %d",
				ids[idx]);
		err = pthread_create(&s->thr, NULL, receive_thread, s);
		if (err) {
			sigma_dut_print(dut, DUT_MSG_INFO,
					"pthread_create failed: %d", err);
			return -2;
		}
		s->started = 1;
	}

	return 1;
}
/*
 * write_frame_stats - Dump a stream's per-frame statistics to a text file
 * @dut: DUT context (used for logging)
 * @s: Stream whose s->stats entries (s->num_stats of them) are written
 * @id: Stream ID, used as part of the file name
 *
 * The file is created as SIGMA_TMPDIR "/e2e<unix time>-<id>.txt". It starts
 * with a header line naming the columns, followed by one colon separated
 * line per recorded frame. If the file cannot be opened, that is logged and
 * nothing is written.
 */
static void write_frame_stats(struct sigma_dut *dut, struct sigma_stream *s,
			      int id)
{
	char path[128];
	FILE *out;
	unsigned int idx;

	snprintf(path, sizeof(path), SIGMA_TMPDIR "/e2e%u-%d.txt",
		 (unsigned int) time(NULL), id);

	out = fopen(path, "w");
	if (out == NULL) {
		sigma_dut_print(dut, DUT_MSG_INFO, "Could not write %s",
				path);
		return;
	}

	fprintf(out, "seqnum:local_sec:local_usec:remote_sec:remote_usec\n");
	sigma_dut_print(dut, DUT_MSG_DEBUG, "Writing frame stats to %s",
			path);

	idx = 0;
	while (idx < s->num_stats) {
		struct sigma_frame_stats *entry = &s->stats[idx];

		fprintf(out, "%u:%u:%u:%u:%u\n", entry->seqnum,
			entry->local_sec, entry->local_usec,
			entry->remote_sec, entry->remote_usec);
		idx++;
	}

	fclose(out);
}
/*
 * cmd_traffic_agent_receive_stop - Handle traffic_agent_receive_stop
 * @dut: DUT context that owns the stream table
 * @conn: Control connection on which the response is sent
 * @cmd: Parsed command parameters; "streamID" lists the streams to stop
 * Returns: 0 when a response (results or error) has been sent from here, -1
 * on a missing or malformed stream list (including a repeated stream ID)
 *
 * After validating the list, all listed streams are flagged to stop and
 * their threads are joined. The response has the form
 * "streamID,<ids>[,txActFrames,<n>],txFrames,<n>,rxFrames,<n>,
 * txPayloadBytes,<n>,rxPayloadBytes,<n>,outOfSequenceFrames,<n>" with the
 * per-stream values separated by spaces. Finally, collected per-frame
 * statistics are optionally written to a file and then released.
 */
static int cmd_traffic_agent_receive_stop(struct sigma_dut *dut,
struct sigma_conn *conn,
struct sigma_cmd *cmd)
{
const char *val;
int streams[MAX_SIGMA_STREAMS];
int i, j, ret, count;
char buf[100 + MAX_SIGMA_STREAMS * 60], *pos;
val = get_param(cmd, "streamID");
if (val == NULL)
return -1;
count = get_stream_id(val, streams);
if (count < 0)
return -1;
/* Every ID must be configured, listed only once, and already started */
for (i = 0; i < count; i++) {
struct sigma_stream *s = get_stream(dut, streams[i]);
if (!s) {
snprintf(buf, sizeof(buf), "errorCode,StreamID %d "
"not configured", streams[i]);
send_resp(dut, conn, SIGMA_INVALID, buf);
return 0;
}
for (j = 0; j < i; j++)
if (streams[i] == streams[j])
return -1;
if (!s->started) {
snprintf(buf, sizeof(buf), "errorCode,Receive not "
"started for streamID %d", streams[i]);
send_resp(dut, conn, SIGMA_INVALID, buf);
return 0;
}
}
/* Flag all streams first so that their threads can wind down in parallel */
for (i = 0; i < count; i++) {
struct sigma_stream *s = get_stream(dut, streams[i]);
if (s)
s->stop = 1;
}
/* stop_stream() joins each thread and closes its socket */
for (i = 0; i < count; i++) {
struct sigma_stream *s = get_stream(dut, streams[i]);
if (!s)
continue;
sigma_dut_print(dut, DUT_MSG_DEBUG, "Traffic agent: stop "
"receive for stream %d", streams[i]);
stop_stream(s);
}
/*
 * Build the response field by field. Per-value writes stop the current
 * field on truncation.
 * NOTE(review): the return values of the snprintf() calls that write the
 * field labels are added to pos without a truncation check — confirm that
 * buf is always large enough for pos to stay inside it.
 */
buf[0] = '\0';
pos = buf;
pos += snprintf(pos, buf + sizeof(buf) - pos, "streamID,");
for (i = 0; i < count; i++) {
ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
i > 0 ? " " : "", streams[i]);
if (ret < 0 || ret >= buf + sizeof(buf) - pos)
break;
pos += ret;
}
/* The count of attempted transmissions is only reported for 60 GHz */
if (dut->program == PROGRAM_60GHZ) {
pos += snprintf(pos, buf + sizeof(buf) - pos, ",txActFrames,");
for (i = 0; i < count; i++) {
struct sigma_stream *s = get_stream(dut, streams[i]);
if (!s)
continue;
ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
i > 0 ? " " : "", s->tx_act_frames);
if (ret < 0 || ret >= buf + sizeof(buf) - pos)
break;
pos += ret;
}
}
pos += snprintf(pos, buf + sizeof(buf) - pos, ",txFrames,");
for (i = 0; i < count; i++) {
struct sigma_stream *s = get_stream(dut, streams[i]);
if (!s)
continue;
ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
i > 0 ? " " : "", s->tx_frames);
if (ret < 0 || ret >= buf + sizeof(buf) - pos)
break;
pos += ret;
}
pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxFrames,");
for (i = 0; i < count; i++) {
struct sigma_stream *s = get_stream(dut, streams[i]);
if (!s)
continue;
ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
i > 0 ? " " : "", s->rx_frames);
if (ret < 0 || ret >= buf + sizeof(buf) - pos)
break;
pos += ret;
}
pos += snprintf(pos, buf + sizeof(buf) - pos, ",txPayloadBytes,");
for (i = 0; i < count; i++) {
struct sigma_stream *s = get_stream(dut, streams[i]);
if (!s)
continue;
ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu",
i > 0 ? " " : "", s->tx_payload_bytes);
if (ret < 0 || ret >= buf + sizeof(buf) - pos)
break;
pos += ret;
}
pos += snprintf(pos, buf + sizeof(buf) - pos, ",rxPayloadBytes,");
for (i = 0; i < count; i++) {
struct sigma_stream *s = get_stream(dut, streams[i]);
if (!s)
continue;
ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%llu",
i > 0 ? " " : "", s->rx_payload_bytes);
if (ret < 0 || ret >= buf + sizeof(buf) - pos)
break;
pos += ret;
}
pos += snprintf(pos, buf + sizeof(buf) - pos, ",outOfSequenceFrames,");
for (i = 0; i < count; i++) {
struct sigma_stream *s = get_stream(dut, streams[i]);
if (!s)
continue;
ret = snprintf(pos, buf + sizeof(buf) - pos, "%s%d",
i > 0 ? " " : "", s->out_of_seq_frames);
if (ret < 0 || ret >= buf + sizeof(buf) - pos)
break;
pos += ret;
}
buf[sizeof(buf) - 1] = '\0';
send_resp(dut, conn, SIGMA_COMPLETE, buf);
/*
 * The response has already gone out. Now write the per-frame statistics of
 * IPTV streams if dut->write_stats is set, then release the statistics
 * buffer of every listed stream.
 */
for (i = 0; i < count; i++) {
struct sigma_stream *s = get_stream(dut, streams[i]);
if (!s)
continue;
if (s->profile == SIGMA_PROFILE_IPTV && s->num_stats > 0 &&
dut->write_stats)
write_frame_stats(dut, s, streams[i]);
free(s->stats);
s->stats = NULL;
s->num_stats = 0;
}
return 0;
}
/*
 * cmd_traffic_agent_version - Handle the traffic_agent_version command
 * @dut: DUT context
 * @conn: Control connection on which the response is sent
 * @cmd: Parsed command parameters (not used)
 * Returns: 0 (the response has been sent from here)
 */
static int cmd_traffic_agent_version(struct sigma_dut *dut,
				     struct sigma_conn *conn,
				     struct sigma_cmd *cmd)
{
	(void) cmd;

	send_resp(dut, conn, SIGMA_COMPLETE, "version,1.0");

	return 0;
}
/*
 * traffic_agent_register_cmds - Register all traffic_agent_* commands
 *
 * Each command "traffic_agent_<name>" is registered with its handler
 * cmd_traffic_agent_<name>() and with NULL as the second argument of
 * sigma_dut_reg_cmd().
 */
void traffic_agent_register_cmds(void)
{
/* Builds both the command string and the handler name from one token */
#define TA_REG_CMD(name) \
	sigma_dut_reg_cmd("traffic_agent_" #name, NULL, \
			  cmd_traffic_agent_ ## name)

	TA_REG_CMD(config);
	TA_REG_CMD(reset);
	TA_REG_CMD(send);
	TA_REG_CMD(receive_start);
	TA_REG_CMD(receive_stop);
	TA_REG_CMD(version);

#undef TA_REG_CMD
}