diff --git a/Networking/udpStreamer.c b/Networking/udpStreamer.c new file mode 100644 index 0000000..053c774 --- /dev/null +++ b/Networking/udpStreamer.c @@ -0,0 +1,612 @@ +/*============================================================================*/ +// +// Title: udpStreamer.c +// Author: RH +// Last modified: 15/09/2025 +// +//============================================================================ +// +// Description: +// ------------ +// Tool that takes a source file path, ip address, port, payload size, and +// target data rate. The tool then reads data from the file and sends it to +// the target IP address and port in chunks specified by payload size, +// attempting to meet the target data rate. +// +// Compilation: +// ------------ +// gcc -Wall -Wextra ../Logging/logging_lib.c udpStreamer.c -o udpstreamer +// +/*============================================================================*/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "../Logging/logging_lib.h" + + +#define MAX_PAYLOAD_SIZE 65507 +#define DEFAULT_PAYLOAD_SIZE 8972 + +#define SI_SIZE_G 1000000000 +#define SI_SIZE_M 1000000 +#define SI_SIZE_K 1000 + +// max value for nanosleep fields +#define NSLEEP_MAX_VALUE 999999999 + +// type to help converting decimal time into whole integer + unit +typedef enum +{ + seconds, + milliseconds, + microseconds, + nanoseconds +} TimeUnit_T; + +// optional args can appear in any order before the required arguments +// and have short and long formats, so there is a bit of variance to +// account for +// the solution is to use a lookup table to map each valid argument to +// a value and call on each arg in the optional range within a switch +// statement + +typedef enum +{ + help_flag, + verbose_flag, + data_rate, + payload_size, + invalid_arg +} ArgOpts_T; + +typedef struct +{ + char *option; + ArgOpts_T value; +} ArgMap_T; + +static ArgMap_T arg_lut[] = +{ + {"-h", help_flag} , {"--help", help_flag} , + {"-v", verbose_flag}, {"--verbose", verbose_flag} , + {"-d", data_rate} , {"--data-rate", data_rate} , + {"-s", payload_size}, {"--size", payload_size} +}; + +// the number of elements in the option arguments lookup table +#define NUM_OPT_ARG_KEYS ( (int) ( sizeof(arg_lut) / sizeof(ArgMap_T) ) ) + +// program, verbose (optional), data rate (optional), size (optional) +// file path, ip address, port +#define NUM_REQ_ARGS 4 // req args inc program name +#define MIN_NUM_ARGS 2 // program -h +#define MAX_NUM_ARGS 7 // program + all args (excluding -h) + +// determine the number of supplied optional arguments +#define NUM_OPT_ARGS (argc - NUM_REQ_ARGS) + +// ---------------------------------------------------------------------------- +// print usage information to the terminal to provide information on how the +// program expects to be run +// ---------------------------------------------------------------------------- +//TODO: add ramp option? +//TODO: a 'user trigger' option might also be useful? +void usage (void) +{ + printf("\n"); + printf("---------------------------------------------------------------------------------------------\n"); + printf(" Usage:\n"); + printf(" ./udpStreamer [options] \n"); + printf("---------------------------------------------------------------------------------------------\n"); + printf(" Required Arguments:\n"); + printf(" :\n"); + printf(" \tThe path to the file containing the binary data to send over UDP.\n"); + printf(" :\n"); + printf(" \tThe destination IP address in 'numbers and dots' notation.\n"); + printf(" :\n"); + printf(" \tThe port on the destination to send the UDP datagrams.\n"); + printf("\n"); + printf(" Optional Arguments:\n"); + printf(" -h, --help\n"); + printf(" \tDisplay this usage information then exit.\n"); + printf(" -v, --verbose\n"); + printf(" \tVerbose flag to enable additional debug output messages - at high data rate this can\n"); + printf(" \tbe very noisy and so is not recommended for operational use.\n"); + printf(" -d, --data-rate\n"); + printf(" \tThe data rate (bits per second) to attempt to send the UDP datagrams at. This is used to\n"); + printf(" \tcalculate the time to put the transmission thread to sleep based on how many packets of\n"); + printf(" \t need to be sent per second to meet the target rate. If no rate is\n"); + printf(" \tspecified, or a value of 0 is specified, the data will be sent as fast as possible.\n"); + printf(" \tValue supplied must be a positive integer. [Integer Range: 1 to 2^31 - 1 (2147483647)]\n"); + printf(" \t[Units allowed: k,M,G (metric SI units 1000/1000000/1000000000 bits per second)]\n"); + printf(" -s, --size\n"); + printf(" \tThe size (in bytes) of data to read from the source file and send in one UDP datagram.\n"); + printf(" \tThis is the size of the datagram payload, so an MTU of 9000 (jumbo frames) means a maximum\n"); + printf(" \tsuggested capture size of 8972 bytes to prevent fragmenting.\n"); + printf(" \tJumbo frames are recommended for high-thoughput scenarios. [Range: 1 to 65507.]\n"); + printf("---------------------------------------------------------------------------------------------\n"); + printf(" Example:\n"); + printf(" ./udpStreamer ~/udp_data/ramp_250k.bin 192.168.1.50 10800\n"); + printf(" ./udpStreamer -d 200M -s 1472 ~/udp_data/rndm_5M.txt 127.0.0.1 60000\n"); + printf("---------------------------------------------------------------------------------------------\n"); +} + +// ---------------------------------------------------------------------------- +// function to retrieve value from LUT when processing args within a switch +// statment (valid args are mapped to a value) +// ---------------------------------------------------------------------------- + +ArgOpts_T lookupArg(char *supplied_arg) +{ + for (int i=0; i < NUM_OPT_ARG_KEYS; i++) + { + ArgMap_T *currArgMap = &arg_lut[i]; + if (strcmp(currArgMap->option, supplied_arg) == 0) + { + return currArgMap->value; + } + } + return invalid_arg; +} + +// ---------------------------------------------------------------------------- +// signal handler to allow application to exit cleanly +// ---------------------------------------------------------------------------- + +static int run = 1; + +// the handler just sets 'run' to 0, which allows the main loop to exit +// and the cleanup code after the main loop to run +static void SignalHandler ( int dummy ) +{ + dummy++ ; + LOGMSG_INFO(">>>>>>>>>>>> Signal received, terminating...") + run = 0; +} + +// ---------------------------------------------------------------------------- +// main program - parse arguments, open output file, create listener +// socket, then enter a loop waiting for datagrams and writing them to file +// ---------------------------------------------------------------------------- + +int main(int argc, char **argv) +{ + // args + int Verbose = 0; + int throughput = 0; + int payloadSize = DEFAULT_PAYLOAD_SIZE; // default + char *pFileName = NULL; + char *dstIpAddr = NULL; + int dstPort = 0; + uint64_t rateUnitScalar = 1; + + // initiliase logger to print messages to stdout + LOGGING_INIT_STDOUT + + // signal handlers to ensure program exits cleanly + (void) signal(SIGINT , SignalHandler) ; + (void) signal(SIGTERM, SignalHandler) ; + + // ------------------------------------------------------------------------- + // parse input arguments + // ------------------------------------------------------------------------- + + // require at least one arg (the help arg) + if (argc < MIN_NUM_ARGS) + { + LOGMSG_ERR("Error: incorrect number of arguments supplied. Received %d, minimum %d -> maximum %d", + argc, MIN_NUM_ARGS, MAX_NUM_ARGS) + usage(); + return -1; + } + + int currArg = 1; + // special case for help flag + if (lookupArg( argv[currArg] ) == help_flag) + { + usage(); + return 0; + } + + // optional arguments + if (argc > NUM_REQ_ARGS) + { + int numOptionArgs = NUM_OPT_ARGS; + + // quite a few options, so loop over args + while (currArg < numOptionArgs+1) + { + switch (lookupArg( argv[currArg] )) + { + // just in case... + case help_flag: + + usage(); + return 0; + // breaks out of switch + + case verbose_flag: + + Verbose = 1; + currArg++; + LOGMSG_DBG("Running in verbose mode") + break; + + case data_rate: + + currArg++; // move over the flag + + // check for unit + uint32_t unitOffset = strlen(argv[currArg]) - 1; + char rateValue[40] = {'\0'}; // used in value conversion to int + char rateChar = argv[currArg][unitOffset]; + // RH: a better approach might be to iterate over each char and check + // each is a valid character + if ( (rateChar != '0') && (rateChar != '1') + && (rateChar != '2') && (rateChar != '3') + && (rateChar != '4') && (rateChar != '5') + && (rateChar != '6') && (rateChar != '7') + && (rateChar != '8') && (rateChar != '9') ) + { + // it must be a letter + if (rateChar == 'k') + { + rateUnitScalar = SI_SIZE_K; + } + else if (rateChar == 'M') + { + rateUnitScalar = SI_SIZE_M; + } + else if (rateChar == 'G') + { + rateUnitScalar = SI_SIZE_G; + } + else + { + LOGMSG_ERR("Error: data rate contains an invalid character: %s -> %c", + argv[currArg], rateChar) + usage(); + return -1; + } + // copy up to the unit + strncpy(rateValue, argv[currArg], unitOffset); + } + else // no unit supplied + { + rateUnitScalar = 1; + rateChar = ' '; + // copy the whole thing + strncpy(rateValue, argv[currArg], unitOffset + 1); + } + + // get value of data rate + throughput = atoi(rateValue); + currArg++; // move over the value + + if ( throughput < 0 ) + { + LOGMSG_WARN("Warning: target data rate [%d] is negative - ignoring", + throughput) + throughput = 0; + } + LOGMSG_DBG("Target data rate: %d %cb/s", throughput, rateChar) + break; + + case payload_size: + + payloadSize = atoi(argv[++currArg]); //move over the flag + currArg ++; // move over the value + if ( payloadSize > MAX_PAYLOAD_SIZE ) + { + LOGMSG_WARN("Warning: supplied payload size [%d] is greater than than the" + " maximum possible UDP datagram payload size [%d] - clipping to maximum size", + payloadSize, MAX_PAYLOAD_SIZE) + payloadSize = MAX_PAYLOAD_SIZE; + } + if ( payloadSize == 0 ) + { + LOGMSG_ERR("Error: supplied payload size [%d] must be greater than 0", + payloadSize) + usage(); + return -1; + } + LOGMSG_DBG("Payload size (not including UDP/IP header): %d", payloadSize) + break; + + case invalid_arg: + + LOGMSG_ERR("Error: Unrecognised argument \"%s\"", argv[currArg]) + usage(); + return -1; + // breaks out of switch + + default: + + LOGMSG_ERR("Error: Panic - invalid state! Exiting...") + return -1; + // breaks out of switch + } + } + } + + // required args + if ( ((argc - currArg) != (NUM_REQ_ARGS - 1)) ) // -1 to exclude program name from NUM_REQ_ARGS + { + LOGMSG_ERR("Error: incorrect number of required arguments supplied. Received %d, minimum %d -> maximum %d", + argc, NUM_REQ_ARGS, MAX_NUM_ARGS) + usage(); + return -1; + } + + pFileName = argv[currArg++]; + dstIpAddr = argv[currArg++]; + dstPort = atoi(argv[currArg]); + + LOGMSG_DBG("Args: [Verbose: %s] [Data Rate: %d] [Payload Size: %d]" + " [Source File: %s] [Destination Address: %s] [Destination Port: %d]", + (Verbose == 1) ? "Yes" : "No", throughput, payloadSize, + pFileName , dstIpAddr, dstPort) + + // ------------------------------------------------------------------------- + // open the file to read the binary data from + // ------------------------------------------------------------------------- + FILE *pSrcFile = NULL; + ssize_t bytes_read = 0; + pSrcFile = fopen(pFileName, "r"); + if (pSrcFile == NULL) + { + LOGMSG_ERR("Error: unable to open the requested file %s. Errno: %s [%d]", + pFileName, strerror(errno), errno) + return -1; + } + else + { + LOGMSG_DBG("Opened file for reading: %s", pFileName) + } + + // ------------------------------------------------------------------------- + // create the socket to send the UDP datagrams to the destination IP and + // port + // ------------------------------------------------------------------------- + + // the buffer to send the datagram payload from, size set by program argument + char udpSendBuffer[payloadSize]; + + // socket and interface information + int tx_fd = 0; + ssize_t bytes_sent = 0; + + // create socket file descriptor + if ( (tx_fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 ) + { + LOGMSG_ERR("Error: unable to create UDP transmit socket. Errno: %s [%d]", + strerror(errno), errno) + fclose(pSrcFile); + return -1; + } + + // set the socket REUSEADDR option + int option = 1 ; + if ( setsockopt(tx_fd, SOL_SOCKET, SO_REUSEADDR, &option, sizeof(option)) < 0 ) + { + LOGMSG_ERR("Error: unable to set socket options. Errno: %s [%d]", + strerror(errno), errno) + fclose(pSrcFile); + close(tx_fd); + return -1; + } + + struct sockaddr_in tx_addr ; + + // clear local_addr + memset(&tx_addr, '\0', sizeof(tx_addr)); + + // populate information about the remote and the port to listen on + tx_addr.sin_family = AF_INET; + tx_addr.sin_port = htons(dstPort); + tx_addr.sin_addr.s_addr = inet_addr(dstIpAddr); + + LOGMSG_DBG("Socket %d opened for transmit to IP: %s Port: %d", + tx_fd, dstIpAddr, dstPort) + + // ------------------------------------------------------------------------- + // set up timing variables for target data rate + // ------------------------------------------------------------------------- + + // for measuring approximate throughput + struct timespec ts_now, ts_start; + uint64_t delta_us = 0; + uint64_t totalBytesSent = 0; + uint64_t bitsSentPerSecond = 0; + uint64_t appxThroughput = 0; + char appxUnit = ' '; + + if ( clock_gettime(CLOCK_REALTIME, &ts_start) < 0) + { + LOGMSG_ERR("Error: Unable to get system time [errno: %d (%s)]", + errno, strerror(errno)) + } + + // work out time to sleep in micro and nano seconds - we can be lazy as + // we are not intending to be super accurate + double totalBitsPerSec = 0.; + double timeToSleepSeconds = 0.; + uint64_t timeToSleep_us = 0; + uint64_t timeToSleep_ns = 0; + + // only calculate the sleep timer if throughput is non-zero + if ( throughput > 0 ) + { + totalBitsPerSec = throughput * rateUnitScalar; + // throughput in bits, payload in bytes + timeToSleepSeconds = ( (double) payloadSize * 8) / totalBitsPerSec; + + timeToSleep_ns = ( timeToSleepSeconds * SI_SIZE_G ); + timeToSleep_ns = timeToSleep_ns % 1000; + + timeToSleep_us = ( timeToSleepSeconds * SI_SIZE_M ); + } + + + // set up sleep timer + // there isn't really much point in waiting for ns in an OS that is not real-time + // and 1 us sleep is still really fast, but usleep is deprecated so we will use + // nanosleep even though we will not get nanosleep precision + struct timespec ts; + ts.tv_sec = ( timeToSleepSeconds > NSLEEP_MAX_VALUE ) ? NSLEEP_MAX_VALUE : timeToSleepSeconds; + ts.tv_nsec = ( timeToSleep_ns > NSLEEP_MAX_VALUE ) ? NSLEEP_MAX_VALUE : timeToSleep_ns; + + // for nicer logging + const uint64_t adjustedBitsPerSec = (totalBitsPerSec < SI_SIZE_K) ? totalBitsPerSec : + (totalBitsPerSec < SI_SIZE_M) ? totalBitsPerSec / SI_SIZE_K : + (totalBitsPerSec < SI_SIZE_G) ? totalBitsPerSec / SI_SIZE_M : + totalBitsPerSec / SI_SIZE_G ; + const char adjustedUnit = (totalBitsPerSec < SI_SIZE_K) ? ' ': + (totalBitsPerSec < SI_SIZE_M) ? 'k' : + (totalBitsPerSec < SI_SIZE_G) ? 'M' : + 'G' ; + + LOGMSG_DBG("Time interval for target data rate: %ld us %ld ns - payload size: %d bits, target rate: %ld%cb/s", + timeToSleep_us, timeToSleep_ns, payloadSize, adjustedBitsPerSec, adjustedUnit) + + // ------------------------------------------------------------------------- + // main loop + // ------------------------------------------------------------------------- + + LOGMSG_INFO("Running udpStreamer: [Verbose: %s] [Data Rate: %ld%cb/s] [Payload Size: %d]" + " [Source File: %s] [Destination Address: %s] [Destination Port: %d]", + (Verbose == 1) ? "Yes" : "No", adjustedBitsPerSec, adjustedUnit, payloadSize, + pFileName , dstIpAddr, dstPort) + + while(run == 1) + { + // ------------------------------------------------------------------------- + // read the next payload size chunk of data in btyes from the source file + // and store in the transmit buffer + // ------------------------------------------------------------------------- + bytes_read = fread(udpSendBuffer, sizeof(char), (size_t) payloadSize, pSrcFile); + if (bytes_read == (ssize_t) payloadSize) + { + LOGMSG_DBG("Successfully read %d bytes from input source %s", + bytes_read, pFileName) + } + else + { + if ( feof(pSrcFile) ) // no more of input to read + { + LOGMSG_INFO("Reached end of source file %s", pFileName) + run = 0; // exit + if ( bytes_read == 0 ) + { + break; + } + } + else if ( ferror(pSrcFile) ) // error conditions + { + LOGMSG_ERR("Error: Problem reading from %s. Errno: %s [%d]", + pFileName, strerror(errno), errno) + } + } + + // ------------------------------------------------------------------------- + // wait until our target time period before transmitting to destination + // ------------------------------------------------------------------------- + if ( throughput > 0 ) + { + if ( nanosleep(&ts, NULL) < 0 ) + { + // EINVAL shouldn't happen due to range checks at initilisation of ts + // so only really covering off EFAULT + if (errno == EINTR) // system interrupt (like ctrl + c) + { + LOGMSG_DBG("Leaving nanosleep due to system interrupt. Errno: %s [%d]", + strerror(errno), errno) + } + else + { + LOGMSG_ERR("Error: Problem with nanosleep. Errno: %s [%d]", + strerror(errno), errno) + } + } + } + + // ------------------------------------------------------------------------- + // transmit the bytes read from the source file + // ------------------------------------------------------------------------- + bytes_sent = sendto(tx_fd, udpSendBuffer, bytes_read, 0 /*flags*/, (struct sockaddr *) &tx_addr, sizeof(tx_addr)); + + // time of transmission + if ( clock_gettime(CLOCK_REALTIME, &ts_now) < 0) + { + LOGMSG_ERR("Error: Unable to get system time [errno: %d (%s)]", + errno, strerror(errno)) + } + + // verify the transmission worked correctly on the transmit side + if ( bytes_sent == bytes_read ) + { + LOGMSG_DBG("Successfully sent %d bytes (buffer size %d) (read in from file %d)", + bytes_sent, payloadSize, bytes_read) + + delta_us = ( (ts_now.tv_sec - ts_start.tv_sec) * SI_SIZE_M ) + + ( (ts_now.tv_nsec - ts_start.tv_nsec) / SI_SIZE_K ); + + totalBytesSent += bytes_sent; + bitsSentPerSecond = (uint64_t) ( ( (double) totalBytesSent / (double) delta_us ) * SI_SIZE_M * 8); + + appxThroughput = (bitsSentPerSecond < SI_SIZE_K) ? bitsSentPerSecond : + (bitsSentPerSecond < SI_SIZE_M) ? bitsSentPerSecond / SI_SIZE_K : + (bitsSentPerSecond < SI_SIZE_G) ? bitsSentPerSecond / SI_SIZE_M : + bitsSentPerSecond / SI_SIZE_G ; + appxUnit = (bitsSentPerSecond < SI_SIZE_K) ? ' ': + (bitsSentPerSecond < SI_SIZE_M) ? 'k' : + (bitsSentPerSecond < SI_SIZE_G) ? 'M' : + 'G' ; + + LOGMSG_DBG("Target throughput: %d%cb/s | Approximate throughput: %d%cb/s | Total Bytes: %ld | Total Time: %ld us", + adjustedBitsPerSec, adjustedUnit, appxThroughput, appxUnit, totalBytesSent, delta_us) + } + else + { + if (bytes_sent > 0) + { + LOGMSG_ERR("Error: Failed to send all bytes: Errno: %s [%d] Bytes read: %d, Bytes sent: %d", + strerror(errno), errno, bytes_read, bytes_sent) + } + else + { + LOGMSG_ERR("Error: Failed to send any bytes: Errno: %s [%d] Bytes read: %d, Bytes sent: %d", + strerror(errno), errno, bytes_read, bytes_sent) + } + } + } + + // ------------------------------------------------------------------------- + // clean up + // ------------------------------------------------------------------------- + LOGMSG_INFO("Finished transmission: [Total Bytes Sent: %ld] [Total Time: %lf s] [Approximate throughput: %d%cb/s]", + totalBytesSent, ( (double) delta_us / SI_SIZE_M), appxThroughput, appxUnit) + + fclose(pSrcFile); + close(tx_fd); + + //TODO: this should be something added to the logging lib + LOGMSG_INFO( + "\n######################################################################\n" + "\t\t\tProgram Exit\n" + "######################################################################") + + return 0; +} \ No newline at end of file diff --git a/README.md b/README.md index f99fe32..2e82606 100644 --- a/README.md +++ b/README.md @@ -10,4 +10,9 @@ This is used by most applications I build, certainly all within this repository. ## Networking -Contains a standard UDP listener implementation that writes to a file. \ No newline at end of file +We have tcpdump at home... +Plenty of useful things in here, and generally reflects code I write elsewhere, so is a good sanity test for when something new I have written isn't working. + +Contains: +* a standard UDP listener implementation that writes to a file. +* a standard UDP streamer that can read from a file and send at a target rate or as fast as possible. Also allows for varied transmission sizes for path fragmenting/MTU checks. \ No newline at end of file