Added inital version of standard UDP streamer and updated README

This commit is contained in:
2025-09-17 03:45:51 +01:00
parent ffc84cdff1
commit 70d115f23e
2 changed files with 618 additions and 1 deletions

612
Networking/udpStreamer.c Normal file
View File

@ -0,0 +1,612 @@
/*============================================================================*/
//
// Title: udpStreamer.c
// Author: RH
// Last modified: 15/09/2025
//
//============================================================================
//
// Description:
// ------------
// Tool that takes a source file path, ip address, port, payload size, and
// target data rate. The tool then reads data from the file and sends it to
// the target IP address and port in chunks specified by payload size,
// attempting to meet the target data rate.
//
// Compilation:
// ------------
// gcc -Wall -Wextra ../Logging/logging_lib.c udpStreamer.c -o udpstreamer
//
/*============================================================================*/
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <stdint.h>
#include <signal.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include <time.h>
#include "../Logging/logging_lib.h"
#define MAX_PAYLOAD_SIZE 65507
#define DEFAULT_PAYLOAD_SIZE 8972
#define SI_SIZE_G 1000000000
#define SI_SIZE_M 1000000
#define SI_SIZE_K 1000
// max value for nanosleep fields
#define NSLEEP_MAX_VALUE 999999999
// type to help converting decimal time into whole integer + unit
typedef enum
{
seconds,
milliseconds,
microseconds,
nanoseconds
} TimeUnit_T;
// optional args can appear in any order before the required arguments
// and have short and long formats, so there is a bit of variance to
// account for
// the solution is to use a lookup table to map each valid argument to
// a value and call on each arg in the optional range within a switch
// statement
typedef enum
{
help_flag,
verbose_flag,
data_rate,
payload_size,
invalid_arg
} ArgOpts_T;
typedef struct
{
char *option;
ArgOpts_T value;
} ArgMap_T;
static ArgMap_T arg_lut[] =
{
{"-h", help_flag} , {"--help", help_flag} ,
{"-v", verbose_flag}, {"--verbose", verbose_flag} ,
{"-d", data_rate} , {"--data-rate", data_rate} ,
{"-s", payload_size}, {"--size", payload_size}
};
// the number of elements in the option arguments lookup table
#define NUM_OPT_ARG_KEYS ( (int) ( sizeof(arg_lut) / sizeof(ArgMap_T) ) )
// program, verbose (optional), data rate (optional), size (optional)
// file path, ip address, port
#define NUM_REQ_ARGS 4 // req args inc program name
#define MIN_NUM_ARGS 2 // program -h
#define MAX_NUM_ARGS 7 // program + all args (excluding -h)
// determine the number of supplied optional arguments
#define NUM_OPT_ARGS (argc - NUM_REQ_ARGS)
// ----------------------------------------------------------------------------
// print usage information to the terminal to provide information on how the
// program expects to be run
// ----------------------------------------------------------------------------
//TODO: add ramp option?
//TODO: a 'user trigger' option might also be useful?
void usage (void)
{
printf("\n");
printf("---------------------------------------------------------------------------------------------\n");
printf(" Usage:\n");
printf(" ./udpStreamer [options] <file path> <IP address> <port>\n");
printf("---------------------------------------------------------------------------------------------\n");
printf(" Required Arguments:\n");
printf(" <file path>:\n");
printf(" \tThe path to the file containing the binary data to send over UDP.\n");
printf(" <IP address>:\n");
printf(" \tThe destination IP address in 'numbers and dots' notation.\n");
printf(" <port>:\n");
printf(" \tThe port on the destination to send the UDP datagrams.\n");
printf("\n");
printf(" Optional Arguments:\n");
printf(" -h, --help\n");
printf(" \tDisplay this usage information then exit.\n");
printf(" -v, --verbose\n");
printf(" \tVerbose flag to enable additional debug output messages - at high data rate this can\n");
printf(" \tbe very noisy and so is not recommended for operational use.\n");
printf(" -d, --data-rate\n");
printf(" \tThe data rate (bits per second) to attempt to send the UDP datagrams at. This is used to\n");
printf(" \tcalculate the time to put the transmission thread to sleep based on how many packets of\n");
printf(" \t<size> need to be sent per second to meet the target rate. If no rate is\n");
printf(" \tspecified, or a value of 0 is specified, the data will be sent as fast as possible.\n");
printf(" \tValue supplied must be a positive integer. [Integer Range: 1 to 2^31 - 1 (2147483647)]\n");
printf(" \t[Units allowed: k,M,G (metric SI units 1000/1000000/1000000000 bits per second)]\n");
printf(" -s, --size\n");
printf(" \tThe size (in bytes) of data to read from the source file and send in one UDP datagram.\n");
printf(" \tThis is the size of the datagram payload, so an MTU of 9000 (jumbo frames) means a maximum\n");
printf(" \tsuggested capture size of 8972 bytes to prevent fragmenting.\n");
printf(" \tJumbo frames are recommended for high-thoughput scenarios. [Range: 1 to 65507.]\n");
printf("---------------------------------------------------------------------------------------------\n");
printf(" Example:\n");
printf(" ./udpStreamer ~/udp_data/ramp_250k.bin 192.168.1.50 10800\n");
printf(" ./udpStreamer -d 200M -s 1472 ~/udp_data/rndm_5M.txt 127.0.0.1 60000\n");
printf("---------------------------------------------------------------------------------------------\n");
}
// ----------------------------------------------------------------------------
// function to retrieve value from LUT when processing args within a switch
// statment (valid args are mapped to a value)
// ----------------------------------------------------------------------------
ArgOpts_T lookupArg(char *supplied_arg)
{
for (int i=0; i < NUM_OPT_ARG_KEYS; i++)
{
ArgMap_T *currArgMap = &arg_lut[i];
if (strcmp(currArgMap->option, supplied_arg) == 0)
{
return currArgMap->value;
}
}
return invalid_arg;
}
// ----------------------------------------------------------------------------
// signal handler to allow application to exit cleanly
// ----------------------------------------------------------------------------
static int run = 1;
// the handler just sets 'run' to 0, which allows the main loop to exit
// and the cleanup code after the main loop to run
static void SignalHandler ( int dummy )
{
dummy++ ;
LOGMSG_INFO(">>>>>>>>>>>> Signal received, terminating...")
run = 0;
}
// ----------------------------------------------------------------------------
// main program - parse arguments, open output file, create listener
// socket, then enter a loop waiting for datagrams and writing them to file
// ----------------------------------------------------------------------------
int main(int argc, char **argv)
{
// args
int Verbose = 0;
int throughput = 0;
int payloadSize = DEFAULT_PAYLOAD_SIZE; // default
char *pFileName = NULL;
char *dstIpAddr = NULL;
int dstPort = 0;
uint64_t rateUnitScalar = 1;
// initiliase logger to print messages to stdout
LOGGING_INIT_STDOUT
// signal handlers to ensure program exits cleanly
(void) signal(SIGINT , SignalHandler) ;
(void) signal(SIGTERM, SignalHandler) ;
// -------------------------------------------------------------------------
// parse input arguments
// -------------------------------------------------------------------------
// require at least one arg (the help arg)
if (argc < MIN_NUM_ARGS)
{
LOGMSG_ERR("Error: incorrect number of arguments supplied. Received %d, minimum %d -> maximum %d",
argc, MIN_NUM_ARGS, MAX_NUM_ARGS)
usage();
return -1;
}
int currArg = 1;
// special case for help flag
if (lookupArg( argv[currArg] ) == help_flag)
{
usage();
return 0;
}
// optional arguments
if (argc > NUM_REQ_ARGS)
{
int numOptionArgs = NUM_OPT_ARGS;
// quite a few options, so loop over args
while (currArg < numOptionArgs+1)
{
switch (lookupArg( argv[currArg] ))
{
// just in case...
case help_flag:
usage();
return 0;
// breaks out of switch
case verbose_flag:
Verbose = 1;
currArg++;
LOGMSG_DBG("Running in verbose mode")
break;
case data_rate:
currArg++; // move over the flag
// check for unit
uint32_t unitOffset = strlen(argv[currArg]) - 1;
char rateValue[40] = {'\0'}; // used in value conversion to int
char rateChar = argv[currArg][unitOffset];
// RH: a better approach might be to iterate over each char and check
// each is a valid character
if ( (rateChar != '0') && (rateChar != '1')
&& (rateChar != '2') && (rateChar != '3')
&& (rateChar != '4') && (rateChar != '5')
&& (rateChar != '6') && (rateChar != '7')
&& (rateChar != '8') && (rateChar != '9') )
{
// it must be a letter
if (rateChar == 'k')
{
rateUnitScalar = SI_SIZE_K;
}
else if (rateChar == 'M')
{
rateUnitScalar = SI_SIZE_M;
}
else if (rateChar == 'G')
{
rateUnitScalar = SI_SIZE_G;
}
else
{
LOGMSG_ERR("Error: data rate contains an invalid character: %s -> %c",
argv[currArg], rateChar)
usage();
return -1;
}
// copy up to the unit
strncpy(rateValue, argv[currArg], unitOffset);
}
else // no unit supplied
{
rateUnitScalar = 1;
rateChar = ' ';
// copy the whole thing
strncpy(rateValue, argv[currArg], unitOffset + 1);
}
// get value of data rate
throughput = atoi(rateValue);
currArg++; // move over the value
if ( throughput < 0 )
{
LOGMSG_WARN("Warning: target data rate [%d] is negative - ignoring",
throughput)
throughput = 0;
}
LOGMSG_DBG("Target data rate: %d %cb/s", throughput, rateChar)
break;
case payload_size:
payloadSize = atoi(argv[++currArg]); //move over the flag
currArg ++; // move over the value
if ( payloadSize > MAX_PAYLOAD_SIZE )
{
LOGMSG_WARN("Warning: supplied payload size [%d] is greater than than the"
" maximum possible UDP datagram payload size [%d] - clipping to maximum size",
payloadSize, MAX_PAYLOAD_SIZE)
payloadSize = MAX_PAYLOAD_SIZE;
}
if ( payloadSize == 0 )
{
LOGMSG_ERR("Error: supplied payload size [%d] must be greater than 0",
payloadSize)
usage();
return -1;
}
LOGMSG_DBG("Payload size (not including UDP/IP header): %d", payloadSize)
break;
case invalid_arg:
LOGMSG_ERR("Error: Unrecognised argument \"%s\"", argv[currArg])
usage();
return -1;
// breaks out of switch
default:
LOGMSG_ERR("Error: Panic - invalid state! Exiting...")
return -1;
// breaks out of switch
}
}
}
// required args
if ( ((argc - currArg) != (NUM_REQ_ARGS - 1)) ) // -1 to exclude program name from NUM_REQ_ARGS
{
LOGMSG_ERR("Error: incorrect number of required arguments supplied. Received %d, minimum %d -> maximum %d",
argc, NUM_REQ_ARGS, MAX_NUM_ARGS)
usage();
return -1;
}
pFileName = argv[currArg++];
dstIpAddr = argv[currArg++];
dstPort = atoi(argv[currArg]);
LOGMSG_DBG("Args: [Verbose: %s] [Data Rate: %d] [Payload Size: %d]"
" [Source File: %s] [Destination Address: %s] [Destination Port: %d]",
(Verbose == 1) ? "Yes" : "No", throughput, payloadSize,
pFileName , dstIpAddr, dstPort)
// -------------------------------------------------------------------------
// open the file to read the binary data from
// -------------------------------------------------------------------------
FILE *pSrcFile = NULL;
ssize_t bytes_read = 0;
pSrcFile = fopen(pFileName, "r");
if (pSrcFile == NULL)
{
LOGMSG_ERR("Error: unable to open the requested file %s. Errno: %s [%d]",
pFileName, strerror(errno), errno)
return -1;
}
else
{
LOGMSG_DBG("Opened file for reading: %s", pFileName)
}
// -------------------------------------------------------------------------
// create the socket to send the UDP datagrams to the destination IP and
// port
// -------------------------------------------------------------------------
// the buffer to send the datagram payload from, size set by program argument
char udpSendBuffer[payloadSize];
// socket and interface information
int tx_fd = 0;
ssize_t bytes_sent = 0;
// create socket file descriptor
if ( (tx_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 )
{
LOGMSG_ERR("Error: unable to create UDP transmit socket. Errno: %s [%d]",
strerror(errno), errno)
fclose(pSrcFile);
return -1;
}
// set the socket REUSEADDR option
int option = 1 ;
if ( setsockopt(tx_fd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option)) < 0 )
{
LOGMSG_ERR("Error: unable to set socket options. Errno: %s [%d]",
strerror(errno), errno)
fclose(pSrcFile);
close(tx_fd);
return -1;
}
struct sockaddr_in tx_addr ;
// clear local_addr
memset(&tx_addr, '\0', sizeof(tx_addr));
// populate information about the remote and the port to listen on
tx_addr.sin_family = AF_INET;
tx_addr.sin_port = htons(dstPort);
tx_addr.sin_addr.s_addr = inet_addr(dstIpAddr);
LOGMSG_DBG("Socket %d opened for transmit to IP: %s Port: %d",
tx_fd, dstIpAddr, dstPort)
// -------------------------------------------------------------------------
// set up timing variables for target data rate
// -------------------------------------------------------------------------
// for measuring approximate throughput
struct timespec ts_now, ts_start;
uint64_t delta_us = 0;
uint64_t totalBytesSent = 0;
uint64_t bitsSentPerSecond = 0;
uint64_t appxThroughput = 0;
char appxUnit = ' ';
if ( clock_gettime(CLOCK_REALTIME, &ts_start) < 0)
{
LOGMSG_ERR("Error: Unable to get system time [errno: %d (%s)]",
errno, strerror(errno))
}
// work out time to sleep in micro and nano seconds - we can be lazy as
// we are not intending to be super accurate
double totalBitsPerSec = 0.;
double timeToSleepSeconds = 0.;
uint64_t timeToSleep_us = 0;
uint64_t timeToSleep_ns = 0;
// only calculate the sleep timer if throughput is non-zero
if ( throughput > 0 )
{
totalBitsPerSec = throughput * rateUnitScalar;
// throughput in bits, payload in bytes
timeToSleepSeconds = ( (double) payloadSize * 8) / totalBitsPerSec;
timeToSleep_ns = ( timeToSleepSeconds * SI_SIZE_G );
timeToSleep_ns = timeToSleep_ns % 1000;
timeToSleep_us = ( timeToSleepSeconds * SI_SIZE_M );
}
// set up sleep timer
// there isn't really much point in waiting for ns in an OS that is not real-time
// and 1 us sleep is still really fast, but usleep is deprecated so we will use
// nanosleep even though we will not get nanosleep precision
struct timespec ts;
ts.tv_sec = ( timeToSleepSeconds > NSLEEP_MAX_VALUE ) ? NSLEEP_MAX_VALUE : timeToSleepSeconds;
ts.tv_nsec = ( timeToSleep_ns > NSLEEP_MAX_VALUE ) ? NSLEEP_MAX_VALUE : timeToSleep_ns;
// for nicer logging
const uint64_t adjustedBitsPerSec = (totalBitsPerSec < SI_SIZE_K) ? totalBitsPerSec :
(totalBitsPerSec < SI_SIZE_M) ? totalBitsPerSec / SI_SIZE_K :
(totalBitsPerSec < SI_SIZE_G) ? totalBitsPerSec / SI_SIZE_M :
totalBitsPerSec / SI_SIZE_G ;
const char adjustedUnit = (totalBitsPerSec < SI_SIZE_K) ? ' ':
(totalBitsPerSec < SI_SIZE_M) ? 'k' :
(totalBitsPerSec < SI_SIZE_G) ? 'M' :
'G' ;
LOGMSG_DBG("Time interval for target data rate: %ld us %ld ns - payload size: %d bits, target rate: %ld%cb/s",
timeToSleep_us, timeToSleep_ns, payloadSize, adjustedBitsPerSec, adjustedUnit)
// -------------------------------------------------------------------------
// main loop
// -------------------------------------------------------------------------
LOGMSG_INFO("Running udpStreamer: [Verbose: %s] [Data Rate: %ld%cb/s] [Payload Size: %d]"
" [Source File: %s] [Destination Address: %s] [Destination Port: %d]",
(Verbose == 1) ? "Yes" : "No", adjustedBitsPerSec, adjustedUnit, payloadSize,
pFileName , dstIpAddr, dstPort)
while(run == 1)
{
// -------------------------------------------------------------------------
// read the next payload size chunk of data in btyes from the source file
// and store in the transmit buffer
// -------------------------------------------------------------------------
bytes_read = fread(udpSendBuffer, sizeof(char), (size_t) payloadSize, pSrcFile);
if (bytes_read == (ssize_t) payloadSize)
{
LOGMSG_DBG("Successfully read %d bytes from input source %s",
bytes_read, pFileName)
}
else
{
if ( feof(pSrcFile) ) // no more of input to read
{
LOGMSG_INFO("Reached end of source file %s", pFileName)
run = 0; // exit
if ( bytes_read == 0 )
{
break;
}
}
else if ( ferror(pSrcFile) ) // error conditions
{
LOGMSG_ERR("Error: Problem reading from %s. Errno: %s [%d]",
pFileName, strerror(errno), errno)
}
}
// -------------------------------------------------------------------------
// wait until our target time period before transmitting to destination
// -------------------------------------------------------------------------
if ( throughput > 0 )
{
if ( nanosleep(&ts, NULL) < 0 )
{
// EINVAL shouldn't happen due to range checks at initilisation of ts
// so only really covering off EFAULT
if (errno == EINTR) // system interrupt (like ctrl + c)
{
LOGMSG_DBG("Leaving nanosleep due to system interrupt. Errno: %s [%d]",
strerror(errno), errno)
}
else
{
LOGMSG_ERR("Error: Problem with nanosleep. Errno: %s [%d]",
strerror(errno), errno)
}
}
}
// -------------------------------------------------------------------------
// transmit the bytes read from the source file
// -------------------------------------------------------------------------
bytes_sent = sendto(tx_fd, udpSendBuffer, bytes_read, 0 /*flags*/, (struct sockaddr *) &tx_addr, sizeof(tx_addr));
// time of transmission
if ( clock_gettime(CLOCK_REALTIME, &ts_now) < 0)
{
LOGMSG_ERR("Error: Unable to get system time [errno: %d (%s)]",
errno, strerror(errno))
}
// verify the transmission worked correctly on the transmit side
if ( bytes_sent == bytes_read )
{
LOGMSG_DBG("Successfully sent %d bytes (buffer size %d) (read in from file %d)",
bytes_sent, payloadSize, bytes_read)
delta_us = ( (ts_now.tv_sec - ts_start.tv_sec) * SI_SIZE_M ) +
( (ts_now.tv_nsec - ts_start.tv_nsec) / SI_SIZE_K );
totalBytesSent += bytes_sent;
bitsSentPerSecond = (uint64_t) ( ( (double) totalBytesSent / (double) delta_us ) * SI_SIZE_M * 8);
appxThroughput = (bitsSentPerSecond < SI_SIZE_K) ? bitsSentPerSecond :
(bitsSentPerSecond < SI_SIZE_M) ? bitsSentPerSecond / SI_SIZE_K :
(bitsSentPerSecond < SI_SIZE_G) ? bitsSentPerSecond / SI_SIZE_M :
bitsSentPerSecond / SI_SIZE_G ;
appxUnit = (bitsSentPerSecond < SI_SIZE_K) ? ' ':
(bitsSentPerSecond < SI_SIZE_M) ? 'k' :
(bitsSentPerSecond < SI_SIZE_G) ? 'M' :
'G' ;
LOGMSG_DBG("Target throughput: %d%cb/s | Approximate throughput: %d%cb/s | Total Bytes: %ld | Total Time: %ld us",
adjustedBitsPerSec, adjustedUnit, appxThroughput, appxUnit, totalBytesSent, delta_us)
}
else
{
if (bytes_sent > 0)
{
LOGMSG_ERR("Error: Failed to send all bytes: Errno: %s [%d] Bytes read: %d, Bytes sent: %d",
strerror(errno), errno, bytes_read, bytes_sent)
}
else
{
LOGMSG_ERR("Error: Failed to send any bytes: Errno: %s [%d] Bytes read: %d, Bytes sent: %d",
strerror(errno), errno, bytes_read, bytes_sent)
}
}
}
// -------------------------------------------------------------------------
// clean up
// -------------------------------------------------------------------------
LOGMSG_INFO("Finished transmission: [Total Bytes Sent: %ld] [Total Time: %lf s] [Approximate throughput: %d%cb/s]",
totalBytesSent, ( (double) delta_us / SI_SIZE_M), appxThroughput, appxUnit)
fclose(pSrcFile);
close(tx_fd);
//TODO: this should be something added to the logging lib
LOGMSG_INFO(
"\n######################################################################\n"
"\t\t\tProgram Exit\n"
"######################################################################")
return 0;
}

View File

@ -10,4 +10,9 @@ This is used by most applications I build, certainly all within this repository.
## Networking ## Networking
Contains a standard UDP listener implementation that writes to a file. We have tcpdump at home...
Plenty of useful things in here, and generally reflects code I write elsewhere, so is a good sanity test for when something new I have written isn't working.
Contains:
* a standard UDP listener implementation that writes to a file.
* a standard UDP streamer that can read from a file and send at a target rate or as fast as possible. Also allows for varied transmission sizes for path fragmenting/MTU checks.