samples: net: zperf: Rewrite download part to use sockets
Rewrite the TCP/UDP download part of the zperf sample to use socket API.
For UDP, performance impact is negligible (< 1 Mbps), for TCP it's
noticable, but still throughputs can be considered satisfactory (up to
~75 Mbps).
Signed-off-by: Robert Lubos <robert.lubos@nordicsemi.no>
diff --git a/samples/net/zperf/src/zperf_session.c b/samples/net/zperf/src/zperf_session.c
index 8cf5f35..bc643ec 100644
--- a/samples/net/zperf/src/zperf_session.c
+++ b/samples/net/zperf/src/zperf_session.c
@@ -19,64 +19,44 @@
static struct session sessions[SESSION_PROTO_END][SESSION_MAX];
/* Get session from a given packet */
-struct session *get_session(struct net_pkt *pkt,
- union net_ip_header *ip_hdr,
- union net_proto_header *proto_hdr,
+struct session *get_session(const struct sockaddr *addr,
enum session_proto proto)
{
struct session *active = NULL;
struct session *free = NULL;
- struct in6_addr ipv6 = { };
- struct in_addr ipv4 = { };
- struct net_udp_hdr *udp_hdr;
int i = 0;
- uint16_t port;
-
- if (!pkt) {
- printk("Error! null pkt detected.\n");
- return NULL;
- }
+ const struct sockaddr_in *addr4 = (const struct sockaddr_in *)addr;
+ const struct sockaddr_in6 *addr6 = (const struct sockaddr_in6 *)addr;
if (proto != SESSION_TCP && proto != SESSION_UDP) {
printk("Error! unsupported proto.\n");
return NULL;
}
- udp_hdr = proto_hdr->udp;
-
- /* Get tuple of the remote connection */
- port = udp_hdr->src_port;
-
- if (net_pkt_family(pkt) == AF_INET6) {
- net_ipv6_addr_copy_raw((uint8_t *)&ipv6, ip_hdr->ipv6->src);
- } else if (net_pkt_family(pkt) == AF_INET) {
- net_ipv4_addr_copy_raw((uint8_t *)&ipv4, ip_hdr->ipv4->src);
- } else {
- printk("Error! unsupported protocol %d\n",
- net_pkt_family(pkt));
- return NULL;
- }
-
/* Check whether we already have an active session */
while (!active && i < SESSION_MAX) {
struct session *ptr = &sessions[proto][i];
-#if defined(CONFIG_NET_IPV4)
- if (ptr->port == port &&
- net_pkt_family(pkt) == AF_INET &&
- net_ipv4_addr_cmp(&ptr->ip.in_addr, &ipv4)) {
+ if (IS_ENABLED(CONFIG_NET_IPV4) &&
+ addr->sa_family == AF_INET &&
+ ptr->ip.family == AF_INET &&
+ ptr->port == addr4->sin_port &&
+ net_ipv4_addr_cmp(&ptr->ip.in_addr, &addr4->sin_addr)) {
/* We found an active session */
active = ptr;
- } else
-#endif
-#if defined(CONFIG_NET_IPV6)
- if (ptr->port == port &&
- net_pkt_family(pkt) == AF_INET6 &&
- net_ipv6_addr_cmp(&ptr->ip.in6_addr, &ipv6)) {
+ break;
+ }
+
+ if (IS_ENABLED(CONFIG_NET_IPV6) &&
+ addr->sa_family == AF_INET6 &&
+ ptr->ip.family == AF_INET6 &&
+ ptr->port == addr6->sin6_port &&
+ net_ipv6_addr_cmp(&ptr->ip.in6_addr, &addr6->sin6_addr)) {
/* We found an active session */
active = ptr;
- } else
-#endif
+ break;
+ }
+
if (!free && (ptr->state == STATE_NULL ||
ptr->state == STATE_COMPLETED)) {
/* We found a free slot - just in case */
@@ -89,30 +69,30 @@
/* If no active session then create a new one */
if (!active && free) {
active = free;
- active->port = port;
-#if defined(CONFIG_NET_IPV6)
- if (net_pkt_family(pkt) == AF_INET6) {
- net_ipaddr_copy(&active->ip.in6_addr, &ipv6);
+ if (IS_ENABLED(CONFIG_NET_IPV4) && addr->sa_family == AF_INET) {
+ active->port = addr4->sin_port;
+ active->ip.family = AF_INET;
+ net_ipaddr_copy(&active->ip.in_addr, &addr4->sin_addr);
+ } else if (IS_ENABLED(CONFIG_NET_IPV6) &&
+ addr->sa_family == AF_INET6) {
+ active->port = addr6->sin6_port;
+ active->ip.family = AF_INET6;
+ net_ipaddr_copy(&active->ip.in6_addr, &addr6->sin6_addr);
}
-#endif
-#if defined(CONFIG_NET_IPV4)
- if (net_pkt_family(pkt) == AF_INET) {
- net_ipaddr_copy(&active->ip.in_addr, &ipv4);
- }
-#endif
}
return active;
}
-struct session *get_tcp_session(struct net_context *ctx)
+/* TODO Unify session handling */
+struct session *get_tcp_session(int sock)
{
struct session *free = NULL;
int i = 0;
- if (!ctx) {
- printk("Error! null context detected.\n");
+ if (sock < 0) {
+ printk("Error! Invalid socket.\n");
return NULL;
}
@@ -120,7 +100,7 @@
while (i < SESSION_MAX) {
struct session *ptr = &sessions[SESSION_TCP][i];
- if (ptr->ctx == ctx) {
+ if (ptr->sock == sock) {
return ptr;
}
@@ -135,7 +115,7 @@
}
if (free) {
- free->ctx = ctx;
+ free->sock = sock;
}
return free;
diff --git a/samples/net/zperf/src/zperf_session.h b/samples/net/zperf/src/zperf_session.h
index 79900df..71c5dbb 100644
--- a/samples/net/zperf/src/zperf_session.h
+++ b/samples/net/zperf/src/zperf_session.h
@@ -39,7 +39,7 @@
struct net_addr ip;
/* TCP session */
- struct net_context *ctx;
+ int sock;
enum state state;
@@ -58,11 +58,9 @@
struct zperf_server_hdr stat;
};
-struct session *get_session(struct net_pkt *pkt,
- union net_ip_header *ip_hdr,
- union net_proto_header *proto_hdr,
+struct session *get_session(const struct sockaddr *addr,
enum session_proto proto);
-struct session *get_tcp_session(struct net_context *ctx);
+struct session *get_tcp_session(int sock);
void zperf_session_init(void);
void zperf_reset_session_stats(struct session *session);
diff --git a/samples/net/zperf/src/zperf_tcp_receiver.c b/samples/net/zperf/src/zperf_tcp_receiver.c
index 86e7cf8..d97652b 100644
--- a/samples/net/zperf/src/zperf_tcp_receiver.c
+++ b/samples/net/zperf/src/zperf_tcp_receiver.c
@@ -14,9 +14,7 @@
#include <zephyr/sys/printk.h>
-#include <zephyr/net/net_core.h>
-#include <zephyr/net/net_ip.h>
-#include <zephyr/net/net_pkt.h>
+#include <zephyr/net/socket.h>
#include "zperf.h"
#include "zperf_internal.h"
@@ -30,28 +28,35 @@
static struct sockaddr_in6 *in6_addr_my;
static struct sockaddr_in *in4_addr_my;
-const struct shell *tcp_shell;
+static bool init_done;
-static void tcp_received(struct net_context *context,
- struct net_pkt *pkt,
- union net_ip_header *ip_hdr,
- union net_proto_header *proto_hdr,
- int status,
- void *user_data)
+#if IS_ENABLED(CONFIG_NET_TC_THREAD_COOPERATIVE)
+#define TCP_RECEIVER_THREAD_PRIORITY K_PRIO_COOP(8)
+#else
+#define TCP_RECEIVER_THREAD_PRIORITY K_PRIO_PREEMPT(8)
+#endif
+
+#define TCP_RECEIVER_STACK_SIZE 2048
+
+#define SOCK_ID_IPV4_LISTEN 0
+#define SOCK_ID_IPV4_DATA 1
+#define SOCK_ID_IPV6_LISTEN 2
+#define SOCK_ID_IPV6_DATA 3
+#define SOCK_ID_MAX 4
+
+#define TCP_RECEIVER_BUF_SIZE 1500
+
+K_THREAD_STACK_DEFINE(tcp_receiver_stack_area, TCP_RECEIVER_STACK_SIZE);
+struct k_thread tcp_receiver_thread_data;
+
+static void tcp_received(const struct shell *shell, int sock, size_t datalen)
{
- const struct shell *shell = tcp_shell;
struct session *session;
int64_t time;
- int len = 0;
-
- if (!shell) {
- printk("Shell is not set!\n");
- return;
- }
time = k_uptime_ticks();
- session = get_tcp_session(context);
+ session = get_tcp_session(sock);
if (!session) {
shell_fprintf(shell, SHELL_WARNING, "Cannot get a session!\n");
return;
@@ -69,13 +74,9 @@
__fallthrough;
case STATE_ONGOING:
session->counter++;
+ session->length += datalen;
- if (pkt) {
- len = net_pkt_remaining_data(pkt);
- session->length += len;
- }
-
- if (pkt == NULL && status == 0) { /* EOF */
+ if (datalen == 0) { /* EOF */
uint32_t rate_in_kbps;
uint32_t duration;
@@ -108,13 +109,9 @@
zperf_tcp_stopped();
- net_context_unref(context);
session->state = STATE_NULL;
}
- if (pkt) {
- (void)net_context_update_recv_wnd(context, len);
- }
break;
case STATE_LAST_PACKET_RECEIVED:
@@ -122,60 +119,33 @@
default:
shell_fprintf(shell, SHELL_WARNING, "Unsupported case\n");
}
-
- if (pkt) {
- net_pkt_unref(pkt);
- }
}
-static void tcp_accepted(struct net_context *context,
- struct sockaddr *addr,
- socklen_t addrlen,
- int error,
- void *user_data)
+void tcp_receiver_thread(void *ptr1, void *ptr2, void *ptr3)
{
- const struct shell *shell = user_data;
+ ARG_UNUSED(ptr3);
+
+ static uint8_t buf[TCP_RECEIVER_BUF_SIZE];
+ const struct shell *shell = ptr1;
+ int port = POINTER_TO_INT(ptr2);
+ struct pollfd fds[SOCK_ID_MAX] = { 0 };
int ret;
- ret = net_context_recv(context, tcp_received, K_NO_WAIT, user_data);
- if (ret < 0) {
- shell_fprintf(shell, SHELL_WARNING,
- "Cannot receive TCP packet (family %d)",
- net_context_get_family(context));
- }
-}
-
-void zperf_tcp_receiver_init(const struct shell *shell, int port)
-{
- static bool init_done;
- struct net_context *context4 = NULL;
- struct net_context *context6 = NULL;
- const struct in_addr *in4_addr = NULL;
- const struct in6_addr *in6_addr = NULL;
- int ret;
-
- if (init_done) {
- zperf_tcp_started();
- return;
- }
-
- tcp_shell = shell;
-
- if (IS_ENABLED(CONFIG_NET_IPV6)) {
- in6_addr_my = zperf_get_sin6();
+ for (int i = 0; i < ARRAY_SIZE(fds); i++) {
+ fds[i].fd = -1;
}
if (IS_ENABLED(CONFIG_NET_IPV4)) {
+ const struct in_addr *in4_addr = NULL;
+
in4_addr_my = zperf_get_sin();
- }
- if (IS_ENABLED(CONFIG_NET_IPV4)) {
- ret = net_context_get(AF_INET, SOCK_STREAM, IPPROTO_TCP,
- &context4);
- if (ret < 0) {
+ fds[SOCK_ID_IPV4_LISTEN].fd = socket(AF_INET, SOCK_STREAM,
+ IPPROTO_TCP);
+ if (fds[SOCK_ID_IPV4_LISTEN].fd < 0) {
shell_fprintf(shell, SHELL_WARNING,
- "Cannot get IPv4 TCP network context.\n");
- return;
+ "Cannot create IPv4 network socket.\n");
+ goto cleanup;
}
if (MY_IP4ADDR && strlen(MY_IP4ADDR)) {
@@ -200,19 +170,43 @@
sizeof(struct in_addr));
}
+ in4_addr_my->sin_port = htons(port);
+
shell_fprintf(shell, SHELL_NORMAL, "Binding to %s\n",
net_sprint_ipv4_addr(&in4_addr_my->sin_addr));
- in4_addr_my->sin_port = htons(port);
+ ret = bind(fds[SOCK_ID_IPV4_LISTEN].fd,
+ (struct sockaddr *)in4_addr_my,
+ sizeof(struct sockaddr_in));
+ if (ret < 0) {
+ shell_fprintf(shell, SHELL_WARNING,
+ "Cannot bind IPv4 UDP port %d (%d)\n",
+ ntohs(in4_addr_my->sin_port),
+ errno);
+ goto cleanup;
+ }
+
+ ret = listen(fds[SOCK_ID_IPV4_LISTEN].fd, 1);
+ if (ret < 0) {
+ shell_fprintf(shell, SHELL_WARNING,
+ "Cannot listen IPv4 TCP (%d)", errno);
+ goto cleanup;
+ }
+
+ fds[SOCK_ID_IPV4_LISTEN].events = POLLIN;
}
if (IS_ENABLED(CONFIG_NET_IPV6)) {
- ret = net_context_get(AF_INET6, SOCK_STREAM, IPPROTO_TCP,
- &context6);
- if (ret < 0) {
+ const struct in6_addr *in6_addr = NULL;
+
+ in6_addr_my = zperf_get_sin6();
+
+ fds[SOCK_ID_IPV6_LISTEN].fd = socket(AF_INET6, SOCK_STREAM,
+ IPPROTO_TCP);
+ if (fds[SOCK_ID_IPV6_LISTEN].fd < 0) {
shell_fprintf(shell, SHELL_WARNING,
- "Cannot get IPv6 TCP network context.\n");
- return;
+ "Cannot create IPv6 network socket.\n");
+ goto cleanup;
}
if (MY_IP6ADDR && strlen(MY_IP6ADDR)) {
@@ -238,71 +232,141 @@
sizeof(struct in6_addr));
}
+ in6_addr_my->sin6_port = htons(port);
+
shell_fprintf(shell, SHELL_NORMAL, "Binding to %s\n",
net_sprint_ipv6_addr(&in6_addr_my->sin6_addr));
- in6_addr_my->sin6_port = htons(port);
- }
-
- if (IS_ENABLED(CONFIG_NET_IPV6) && context6) {
- ret = net_context_bind(context6,
- (struct sockaddr *)in6_addr_my,
- sizeof(struct sockaddr_in6));
+ ret = bind(fds[SOCK_ID_IPV6_LISTEN].fd,
+ (struct sockaddr *)in6_addr_my,
+ sizeof(struct sockaddr_in6));
if (ret < 0) {
shell_fprintf(shell, SHELL_WARNING,
- "Cannot bind IPv6 TCP port %d (%d)\n",
- ntohs(in6_addr_my->sin6_port), ret);
- return;
+ "Cannot bind IPv6 UDP port %d (%d)\n",
+ ntohs(in6_addr_my->sin6_port),
+ errno);
+ goto cleanup;
}
- ret = net_context_listen(context6, 0);
+ ret = listen(fds[SOCK_ID_IPV6_LISTEN].fd, 1);
if (ret < 0) {
shell_fprintf(shell, SHELL_WARNING,
- "Cannot listen IPv6 TCP (%d)", ret);
- return;
+ "Cannot listen IPv6 TCP (%d)", errno);
+ goto cleanup;
}
- ret = net_context_accept(context6, tcp_accepted, K_NO_WAIT,
- NULL);
- if (ret < 0) {
- shell_fprintf(shell, SHELL_WARNING,
- "Cannot receive IPv6 TCP packets (%d)",
- ret);
- return;
- }
- }
-
- if (IS_ENABLED(CONFIG_NET_IPV4) && context4) {
- ret = net_context_bind(context4,
- (struct sockaddr *)in4_addr_my,
- sizeof(struct sockaddr_in));
- if (ret < 0) {
- shell_fprintf(shell, SHELL_WARNING,
- "Cannot bind IPv4 TCP port %d (%d)\n",
- ntohs(in4_addr_my->sin_port), ret);
- return;
- }
-
- ret = net_context_listen(context4, 0);
- if (ret < 0) {
- shell_fprintf(shell, SHELL_WARNING,
- "Cannot listen IPv4 TCP (%d)", ret);
- return;
- }
-
- ret = net_context_accept(context4, tcp_accepted, K_NO_WAIT,
- (void *)shell);
- if (ret < 0) {
- shell_fprintf(shell, SHELL_WARNING,
- "Cannot receive IPv4 TCP packets (%d)",
- ret);
- return;
- }
+ fds[SOCK_ID_IPV6_LISTEN].events = POLLIN;
}
shell_fprintf(shell, SHELL_NORMAL,
"Listening on port %d\n", port);
+ /* TODO Investigate started/stopped logic */
zperf_tcp_started();
init_done = true;
+
+ while (true) {
+ ret = poll(fds, ARRAY_SIZE(fds), -1);
+ if (ret < 0) {
+ shell_fprintf(shell, SHELL_WARNING,
+ "TCP receiver poll error (%d)\n",
+ errno);
+ goto cleanup;
+ }
+
+ for (int i = 0; i < ARRAY_SIZE(fds); i++) {
+ struct sockaddr addr;
+ socklen_t addrlen = sizeof(addr);
+
+ if ((fds[i].revents & POLLERR) ||
+ (fds[i].revents & POLLNVAL)) {
+ shell_fprintf(
+ shell, SHELL_WARNING,
+ "TCP receiver IPv%d socket error\n",
+ (i <= SOCK_ID_IPV4_DATA) ? 4 : 6);
+ goto cleanup;
+ }
+
+ if (!(fds[i].revents & POLLIN)) {
+ continue;
+ }
+
+ switch (i) {
+ case SOCK_ID_IPV4_LISTEN:
+ case SOCK_ID_IPV6_LISTEN:{
+ int sock = accept(fds[i].fd, &addr, &addrlen);
+
+ if (sock < 0) {
+ shell_fprintf(
+ shell, SHELL_WARNING,
+ "TCP receiver IPv%d accept error\n",
+ (i <= SOCK_ID_IPV4_DATA) ? 4 : 6);
+ goto cleanup;
+ }
+
+ if (i == SOCK_ID_IPV4_LISTEN &&
+ fds[SOCK_ID_IPV4_DATA].fd < 0) {
+ fds[SOCK_ID_IPV4_DATA].fd = sock;
+ fds[SOCK_ID_IPV4_DATA].events = POLLIN;
+ } else if (i == SOCK_ID_IPV6_LISTEN &&
+ fds[SOCK_ID_IPV6_DATA].fd < 0) {
+ fds[SOCK_ID_IPV6_DATA].fd = sock;
+ fds[SOCK_ID_IPV6_DATA].events = POLLIN;
+ } else {
+ /* Too many connections. */
+ close(sock);
+ break;
+ }
+
+ break;
+ }
+
+ case SOCK_ID_IPV4_DATA:
+ case SOCK_ID_IPV6_DATA:
+ ret = recv(fds[i].fd, buf, sizeof(buf), 0);
+ if (ret < 0) {
+ shell_fprintf(
+ shell, SHELL_WARNING,
+ "recv failed on IPv%d socket (%d)\n",
+ (i <= SOCK_ID_IPV4_DATA) ? 4 : 6,
+ errno);
+ goto cleanup;
+ }
+
+ tcp_received(shell, fds[i].fd, ret);
+
+ if (ret == 0) {
+ close(fds[i].fd);
+ fds[i].fd = -1;
+ }
+
+ break;
+ }
+ }
+ }
+
+cleanup:
+ for (int i = 0; i < ARRAY_SIZE(fds); i++) {
+ if (fds[i].fd >= 0) {
+ close(fds[i].fd);
+ }
+ }
+}
+
+void zperf_tcp_receiver_init(const struct shell *shell, int port)
+{
+ if (init_done) {
+ zperf_tcp_started();
+ return;
+ }
+
+ k_thread_create(&tcp_receiver_thread_data,
+ tcp_receiver_stack_area,
+ K_THREAD_STACK_SIZEOF(tcp_receiver_stack_area),
+ tcp_receiver_thread,
+ (void *)shell, INT_TO_POINTER(port), NULL,
+ TCP_RECEIVER_THREAD_PRIORITY,
+ IS_ENABLED(CONFIG_USERSPACE) ? K_USER |
+ K_INHERIT_PERMS : 0,
+ K_NO_WAIT);
}
diff --git a/samples/net/zperf/src/zperf_udp_receiver.c b/samples/net/zperf/src/zperf_udp_receiver.c
index 35e9c55..c47f701 100644
--- a/samples/net/zperf/src/zperf_udp_receiver.c
+++ b/samples/net/zperf/src/zperf_udp_receiver.c
@@ -13,9 +13,7 @@
#include <zephyr/zephyr.h>
#include <zephyr/sys/printk.h>
-#include <zephyr/net/net_core.h>
-#include <zephyr/net/net_pkt.h>
-#include <zephyr/net/udp.h>
+#include <zephyr/net/socket.h>
#include "zperf.h"
#include "zperf_internal.h"
@@ -29,27 +27,22 @@
static struct sockaddr_in6 *in6_addr_my;
static struct sockaddr_in *in4_addr_my;
-static inline void set_dst_addr(const struct shell *shell,
- sa_family_t family,
- struct net_pkt *pkt,
- union net_ip_header *ip_hdr,
- struct net_udp_hdr *udp_hdr,
- struct sockaddr *dst_addr)
-{
- if (IS_ENABLED(CONFIG_NET_IPV6) && family == AF_INET6) {
- net_ipv6_addr_copy_raw((uint8_t *)&net_sin6(dst_addr)->sin6_addr,
- ip_hdr->ipv6->src);
- net_sin6(dst_addr)->sin6_family = AF_INET6;
- net_sin6(dst_addr)->sin6_port = udp_hdr->src_port;
- }
+#if IS_ENABLED(CONFIG_NET_TC_THREAD_COOPERATIVE)
+#define UDP_RECEIVER_THREAD_PRIORITY K_PRIO_COOP(8)
+#else
+#define UDP_RECEIVER_THREAD_PRIORITY K_PRIO_PREEMPT(8)
+#endif
- if (IS_ENABLED(CONFIG_NET_IPV4) && family == AF_INET) {
- net_ipv4_addr_copy_raw((uint8_t *)&net_sin(dst_addr)->sin_addr,
- ip_hdr->ipv4->src);
- net_sin(dst_addr)->sin_family = AF_INET;
- net_sin(dst_addr)->sin_port = udp_hdr->src_port;
- }
-}
+#define UDP_RECEIVER_STACK_SIZE 2048
+
+#define SOCK_ID_IPV4 0
+#define SOCK_ID_IPV6 1
+#define SOCK_ID_MAX 2
+
+#define UDP_RECEIVER_BUF_SIZE 1500
+
+K_THREAD_STACK_DEFINE(udp_receiver_stack_area, UDP_RECEIVER_STACK_SIZE);
+struct k_thread udp_receiver_thread_data;
static inline void build_reply(struct zperf_udp_datagram *hdr,
struct zperf_server_hdr *stat,
@@ -80,72 +73,51 @@
sizeof(struct zperf_server_hdr)
static int zperf_receiver_send_stat(const struct shell *shell,
- struct net_context *context,
- struct net_pkt *pkt,
- union net_ip_header *ip_hdr,
- struct net_udp_hdr *udp_hdr,
+ int sock, const struct sockaddr *addr,
struct zperf_udp_datagram *hdr,
struct zperf_server_hdr *stat)
{
uint8_t reply[BUF_SIZE];
- struct sockaddr dst_addr;
int ret;
- shell_fprintf(shell, SHELL_NORMAL,
- "Received %d bytes\n", net_pkt_remaining_data(pkt));
-
- set_dst_addr(shell, net_pkt_family(pkt),
- pkt, ip_hdr, udp_hdr, &dst_addr);
-
build_reply(hdr, stat, reply);
- ret = net_context_sendto(context, reply, BUF_SIZE, &dst_addr,
- net_pkt_family(pkt) == AF_INET6 ?
- sizeof(struct sockaddr_in6) :
- sizeof(struct sockaddr_in),
- NULL, K_NO_WAIT, NULL);
+ ret = sendto(sock, reply, sizeof(reply), 0, addr,
+ addr->sa_family == AF_INET6 ?
+ sizeof(struct sockaddr_in6) :
+ sizeof(struct sockaddr_in));
if (ret < 0) {
shell_fprintf(shell, SHELL_WARNING,
- " Cannot send data to peer (%d)", ret);
+ " Cannot send data to peer (%d)", errno);
}
return ret;
}
-static void udp_received(struct net_context *context,
- struct net_pkt *pkt,
- union net_ip_header *ip_hdr,
- union net_proto_header *proto_hdr,
- int status,
- void *user_data)
+static void udp_received(const struct shell *shell, int sock,
+ const struct sockaddr *addr, uint8_t *data,
+ size_t datalen)
{
- NET_PKT_DATA_ACCESS_DEFINE(zperf, struct zperf_udp_datagram);
- struct net_udp_hdr *udp_hdr = proto_hdr->udp;
- const struct shell *shell = user_data;
struct zperf_udp_datagram *hdr;
struct session *session;
int32_t transit_time;
int64_t time;
int32_t id;
- if (!pkt) {
+ if (datalen < sizeof(struct zperf_udp_datagram)) {
+ shell_fprintf(shell, SHELL_WARNING,
+ "Short iperf packet!\n");
return;
}
- hdr = (struct zperf_udp_datagram *)net_pkt_get_data(pkt, &zperf);
- if (!hdr) {
- shell_fprintf(shell, SHELL_WARNING,
- "Short iperf packet!\n");
- goto out;
- }
-
+ hdr = (struct zperf_udp_datagram *)data;
time = k_uptime_ticks();
- session = get_session(pkt, ip_hdr, proto_hdr, SESSION_UDP);
+ session = get_session(addr, SESSION_UDP);
if (!session) {
shell_fprintf(shell, SHELL_WARNING,
"Cannot get a session!\n");
- goto out;
+ return;
}
id = ntohl(hdr->id);
@@ -157,8 +129,7 @@
/* Session is already completed: Resend the stat packet
* and continue
*/
- if (zperf_receiver_send_stat(shell, context, pkt,
- ip_hdr, udp_hdr, hdr,
+ if (zperf_receiver_send_stat(shell, sock, addr, hdr,
&session->stat) < 0) {
shell_fprintf(shell, SHELL_WARNING,
"Failed to send the packet\n");
@@ -209,8 +180,7 @@
session->stat.jitter1 = 0;
session->stat.jitter2 = session->jitter;
- if (zperf_receiver_send_stat(shell, context, pkt,
- ip_hdr, udp_hdr, hdr,
+ if (zperf_receiver_send_stat(shell, sock, addr, hdr,
&session->stat) < 0) {
shell_fprintf(shell, SHELL_WARNING,
"Failed to send the packet\n");
@@ -244,7 +214,7 @@
} else {
/* Update counter */
session->counter++;
- session->length += net_pkt_remaining_data(pkt);
+ session->length += datalen;
/* Compute jitter */
transit_time = time_delta(
@@ -281,35 +251,32 @@
default:
break;
}
-
-out:
- net_pkt_unref(pkt);
}
-void zperf_udp_receiver_init(const struct shell *shell, int port)
+void udp_receiver_thread(void *ptr1, void *ptr2, void *ptr3)
{
- struct net_context *context4 = NULL;
- struct net_context *context6 = NULL;
- const struct in_addr *in4_addr = NULL;
- const struct in6_addr *in6_addr = NULL;
+ ARG_UNUSED(ptr3);
+
+ static uint8_t buf[UDP_RECEIVER_BUF_SIZE];
+ const struct shell *shell = ptr1;
+ int port = POINTER_TO_INT(ptr2);
+ struct pollfd fds[SOCK_ID_MAX] = { 0 };
int ret;
- if (IS_ENABLED(CONFIG_NET_IPV6)) {
- in6_addr_my = zperf_get_sin6();
+ for (int i = 0; i < ARRAY_SIZE(fds); i++) {
+ fds[i].fd = -1;
}
if (IS_ENABLED(CONFIG_NET_IPV4)) {
+ const struct in_addr *in4_addr = NULL;
+
in4_addr_my = zperf_get_sin();
- }
-
- if (IS_ENABLED(CONFIG_NET_IPV4)) {
- ret = net_context_get(AF_INET, SOCK_DGRAM, IPPROTO_UDP,
- &context4);
- if (ret < 0) {
+ fds[SOCK_ID_IPV4].fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
+ if (fds[SOCK_ID_IPV4].fd < 0) {
shell_fprintf(shell, SHELL_WARNING,
- "Cannot get IPv4 network context.\n");
- return;
+ "Cannot create IPv4 network socket.\n");
+ goto cleanup;
}
if (MY_IP4ADDR && strlen(MY_IP4ADDR)) {
@@ -328,7 +295,7 @@
if (!in4_addr) {
shell_fprintf(shell, SHELL_WARNING,
"Unable to get IPv4 by default\n");
- return;
+ goto cleanup;
}
memcpy(&in4_addr_my->sin_addr, in4_addr,
sizeof(struct in_addr));
@@ -339,27 +306,30 @@
in4_addr_my->sin_port = htons(port);
- if (context4) {
- ret = net_context_bind(context4,
- (struct sockaddr *)in4_addr_my,
- sizeof(struct sockaddr_in));
- if (ret < 0) {
- shell_fprintf(shell, SHELL_WARNING,
- "Cannot bind IPv4 UDP port %d (%d)\n",
- ntohs(in4_addr_my->sin_port),
- ret);
- return;
- }
+ ret = bind(fds[SOCK_ID_IPV4].fd,
+ (struct sockaddr *)in4_addr_my,
+ sizeof(struct sockaddr_in));
+ if (ret < 0) {
+ shell_fprintf(shell, SHELL_WARNING,
+ "Cannot bind IPv4 UDP port %d (%d)\n",
+ ntohs(in4_addr_my->sin_port),
+ errno);
+ goto cleanup;
}
+
+ fds[SOCK_ID_IPV4].events = POLLIN;
}
if (IS_ENABLED(CONFIG_NET_IPV6)) {
- ret = net_context_get(AF_INET6, SOCK_DGRAM, IPPROTO_UDP,
- &context6);
- if (ret < 0) {
+ const struct in6_addr *in6_addr = NULL;
+
+ in6_addr_my = zperf_get_sin6();
+
+ fds[SOCK_ID_IPV6].fd = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP);
+ if (fds[SOCK_ID_IPV6].fd < 0) {
shell_fprintf(shell, SHELL_WARNING,
- "Cannot get IPv6 network context.\n");
- return;
+ "Cannot create IPv4 network socket.\n");
+ goto cleanup;
}
if (MY_IP6ADDR && strlen(MY_IP6ADDR)) {
@@ -379,7 +349,7 @@
if (!in6_addr) {
shell_fprintf(shell, SHELL_WARNING,
"Unable to get IPv4 by default\n");
- return;
+ goto cleanup;
}
memcpy(&in6_addr_my->sin6_addr, in6_addr,
sizeof(struct in6_addr));
@@ -390,40 +360,80 @@
in6_addr_my->sin6_port = htons(port);
- if (context6) {
- ret = net_context_bind(context6,
- (struct sockaddr *)in6_addr_my,
- sizeof(struct sockaddr_in6));
- if (ret < 0) {
- shell_fprintf(shell, SHELL_WARNING,
- "Cannot bind IPv6 UDP port %d (%d)\n",
- ntohs(in6_addr_my->sin6_port),
- ret);
- return;
- }
- }
- }
-
- if (IS_ENABLED(CONFIG_NET_IPV6)) {
- ret = net_context_recv(context6, udp_received, K_NO_WAIT,
- (void *)shell);
+ ret = bind(fds[SOCK_ID_IPV6].fd,
+ (struct sockaddr *)in6_addr_my,
+ sizeof(struct sockaddr_in6));
if (ret < 0) {
shell_fprintf(shell, SHELL_WARNING,
- "Cannot receive IPv6 UDP packets\n");
- return;
+ "Cannot bind IPv6 UDP port %d (%d)\n",
+ ntohs(in6_addr_my->sin6_port),
+ ret);
+ goto cleanup;
}
- }
- if (IS_ENABLED(CONFIG_NET_IPV4)) {
- ret = net_context_recv(context4, udp_received, K_NO_WAIT,
- (void *)shell);
- if (ret < 0) {
- shell_fprintf(shell, SHELL_WARNING,
- "Cannot receive IPv4 UDP packets\n");
- return;
- }
+ fds[SOCK_ID_IPV6].events = POLLIN;
}
shell_fprintf(shell, SHELL_NORMAL,
"Listening on port %d\n", port);
+
+ while (true) {
+ ret = poll(fds, ARRAY_SIZE(fds), -1);
+ if (ret < 0) {
+ shell_fprintf(shell, SHELL_WARNING,
+ "UDP receiver poll error (%d)\n",
+ errno);
+ goto cleanup;
+ }
+
+ for (int i = 0; i < ARRAY_SIZE(fds); i++) {
+ struct sockaddr addr;
+ socklen_t addrlen = sizeof(addr);
+
+ if ((fds[i].revents & POLLERR) ||
+ (fds[i].revents & POLLNVAL)) {
+ shell_fprintf(
+ shell, SHELL_WARNING,
+ "UDP receiver IPv%d socket error\n",
+ (i == SOCK_ID_IPV4) ? 4 : 6);
+ goto cleanup;
+ }
+
+ if (!(fds[i].revents & POLLIN)) {
+ continue;
+ }
+
+ ret = recvfrom(fds[i].fd, buf, sizeof(buf), 0, &addr,
+ &addrlen);
+ if (ret < 0) {
+ shell_fprintf(
+ shell, SHELL_WARNING,
+ "recv failed on IPv%d socket (%d)\n",
+ (i == SOCK_ID_IPV4) ? 4 : 6, errno);
+ goto cleanup;
+ }
+
+ udp_received(shell, fds[i].fd, &addr, buf, ret);
+ }
+ }
+
+cleanup:
+ for (int i = 0; i < ARRAY_SIZE(fds); i++) {
+ if (fds[i].fd >= 0) {
+ close(fds[i].fd);
+ }
+ }
+}
+
+void zperf_udp_receiver_init(const struct shell *shell, int port)
+{
+ k_thread_create(&udp_receiver_thread_data,
+ udp_receiver_stack_area,
+ K_THREAD_STACK_SIZEOF(udp_receiver_stack_area),
+ udp_receiver_thread,
+ (void *)shell, INT_TO_POINTER(port), NULL,
+ UDP_RECEIVER_THREAD_PRIORITY,
+ IS_ENABLED(CONFIG_USERSPACE) ? K_USER |
+ K_INHERIT_PERMS : 0,
+ K_NO_WAIT);
}