From 0ae4a8769ac72629d200ebe03ca883b15d53ccdb Mon Sep 17 00:00:00 2001 From: Dennis Zollo Date: Fri, 22 Aug 2025 12:36:10 -0400 Subject: [PATCH 1/2] filter out relayback to ntrip_client to be ONLY GGA --- src/streamsvr.c | 177 ++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 171 insertions(+), 6 deletions(-) diff --git a/src/streamsvr.c b/src/streamsvr.c index 850abe7b2..16a5d1d52 100644 --- a/src/streamsvr.c +++ b/src/streamsvr.c @@ -33,6 +33,155 @@ * use integer types in stdint.h *-----------------------------------------------------------------------------*/ #include "rtklib.h" +#include + +/* GGA filter constants and types */ +#define MAX_NMEA_LEN 1024 + +/* NMEA sentence parser state machine */ +typedef enum { + GGA_STATE_SEARCH_START, /* Looking for '$' */ + GGA_STATE_COLLECT_SENTENCE, /* Collecting sentence until \r\n */ + GGA_STATE_VALIDATE /* Validating complete sentence */ +} gga_parser_state_t; + +/* GGA filter state for input stream */ +typedef struct { + gga_parser_state_t state; + char sentence[MAX_NMEA_LEN]; + int sentence_len; +} gga_filter_t; + +/* Calculate NMEA checksum */ +static unsigned char calc_nmea_checksum(const char *sentence, int len) +{ + unsigned char checksum=0; + int i; + + /* Start after '$', stop before '*' */ + for (i=1; ilen) return 0; + + /* Validate checksum format (two hex digits) */ + if (!isxdigit((unsigned char)sentence[star_pos+1])|| + !isxdigit((unsigned char)sentence[star_pos+2])) { + return 0; + } + + /* Calculate and compare checksum */ + calc_cs=calc_nmea_checksum(sentence,star_pos); + if (sscanf(&sentence[star_pos+1],"%2x",&recv_cs_val)!=1) { + return 0; + } + + return (calc_cs==(unsigned char)recv_cs_val); +} + +/* Check if sentence contains GGA */ +static int is_gga_sentence(const char *sentence, int len) +{ + /* Look for "GGA" after talker ID (positions 3-5 typically) */ + if (len>=6) { + return (strncmp(&sentence[3],"GGA",3)==0|| + strncmp(&sentence[2],"GGA",3)==0); /* Handle 2-char talker IDs */ + } + return 0; +} + +/* Initialize GGA filter */ +static void init_gga_filter(gga_filter_t *filter) +{ + filter->state=GGA_STATE_SEARCH_START; + filter->sentence_len=0; +} + +/* Process data through GGA filter, return filtered data length */ +static int process_gga_filter(gga_filter_t *filter, const uint8_t *input, int input_len, + uint8_t *output, int output_max) +{ + int output_len=0; + int i; + + for (i=0;istate) { + case GGA_STATE_SEARCH_START: + if (byte=='$') { + filter->sentence[0]='$'; + filter->sentence_len=1; + filter->state=GGA_STATE_COLLECT_SENTENCE; + } + break; + + case GGA_STATE_COLLECT_SENTENCE: + if (filter->sentence_lensentence[filter->sentence_len++]=byte; + } + + /* Check for end of sentence */ + if (byte=='\n' && filter->sentence_len>=2&& + filter->sentence[filter->sentence_len-2]=='\r') { + filter->sentence[filter->sentence_len]='\0'; + filter->state=GGA_STATE_VALIDATE; + } + /* Handle sentences that are too long */ + else if (filter->sentence_len>=MAX_NMEA_LEN-1) { + filter->state=GGA_STATE_SEARCH_START; + filter->sentence_len=0; + } + break; + + case GGA_STATE_VALIDATE: + /* We have a complete sentence, validate and check for GGA */ + if (validate_nmea_sentence(filter->sentence,filter->sentence_len)&& + is_gga_sentence(filter->sentence,filter->sentence_len)) { + /* Copy GGA sentence to output if there's space */ + if (output_len+filter->sentence_len<=output_max) { + memcpy(output+output_len,filter->sentence,filter->sentence_len); + output_len+=filter->sentence_len; + } + } + + /* Reset for next sentence - but check if current byte starts a new one */ + filter->state=GGA_STATE_SEARCH_START; + filter->sentence_len=0; + + /* Don't lose the current byte - process it again */ + i--; /* Will be incremented by for loop */ + break; + } + } + + return output_len; +} + +/* Global GGA filter state for input stream */ +static gga_filter_t gga_filter; /* test observation data message ---------------------------------------------*/ static int is_obsmsg(int msg) @@ -500,14 +649,28 @@ static void *strsvrthread(void *arg) /* read data from input stream */ while ((n=strread(svr->stream,svr->buff,svr->buffsize))>0&&svr->state) { + uint8_t *data_to_write=svr->buff; + int data_len=n; + uint8_t filtered_buff[4096]; + + /* Apply GGA filtering if input is NTRIP and relayback is enabled */ + if (svr->relayback>0 && svr->stream[0].type==STR_NTRIPCLI) { + + /* Filter through GGA filter for input stream */ + data_len=process_gga_filter(&gga_filter,svr->buff,n, + filtered_buff,sizeof(filtered_buff)); + data_to_write=filtered_buff; + } /* write data to output streams */ - for (i=1;instr;i++) { - if (svr->conv[i-1]) { - strconv(svr->stream+i,svr->conv[i-1],svr->buff,n); - } - else { - strwrite(svr->stream+i,svr->buff,n); + if (data_len>0) { + for (i=1;instr;i++) { + if (svr->conv[i-1]) { + strconv(svr->stream+i,svr->conv[i-1],data_to_write,data_len); + } + else { + strwrite(svr->stream+i,data_to_write,data_len); + } } } /* write data to log stream */ @@ -580,6 +743,8 @@ extern void strsvrinit(strsvr_t *svr, int nout) for (i=0;istrlog+i); svr->nstr=i; for (i=0;i<16;i++) svr->conv[i]=NULL; + /* Initialize GGA filter for input stream */ + init_gga_filter(&gga_filter); svr->thread=0; initlock(&svr->lock); } From c3db2e52e8a04191508b23d7b7346aaf06532aca Mon Sep 17 00:00:00 2001 From: Dennis Zollo Date: Mon, 25 Aug 2025 14:15:31 -0400 Subject: [PATCH 2/2] Update; I had the filtering semantics backwards --- src/streamsvr.c | 39 ++++++++++++++++++--------------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/src/streamsvr.c b/src/streamsvr.c index 16a5d1d52..524d99ad6 100644 --- a/src/streamsvr.c +++ b/src/streamsvr.c @@ -649,28 +649,14 @@ static void *strsvrthread(void *arg) /* read data from input stream */ while ((n=strread(svr->stream,svr->buff,svr->buffsize))>0&&svr->state) { - uint8_t *data_to_write=svr->buff; - int data_len=n; - uint8_t filtered_buff[4096]; - - /* Apply GGA filtering if input is NTRIP and relayback is enabled */ - if (svr->relayback>0 && svr->stream[0].type==STR_NTRIPCLI) { - - /* Filter through GGA filter for input stream */ - data_len=process_gga_filter(&gga_filter,svr->buff,n, - filtered_buff,sizeof(filtered_buff)); - data_to_write=filtered_buff; - } /* write data to output streams */ - if (data_len>0) { - for (i=1;instr;i++) { - if (svr->conv[i-1]) { - strconv(svr->stream+i,svr->conv[i-1],data_to_write,data_len); - } - else { - strwrite(svr->stream+i,data_to_write,data_len); - } + for (i=1;instr;i++) { + if (svr->conv[i-1]) { + strconv(svr->stream+i,svr->conv[i-1],svr->buff,n); + } + else { + strwrite(svr->stream+i,svr->buff,n); } } /* write data to log stream */ @@ -689,7 +675,18 @@ static void *strsvrthread(void *arg) /* relay back message from output stream to input stream */ if (i==svr->relayback) { - strwrite(svr->stream,buff,n); + /* Apply GGA filtering if relaying back to NTRIP client */ + if (svr->stream[0].type==STR_NTRIPCLI) { + uint8_t filtered_buff[4096]; + int filtered_len=process_gga_filter(&gga_filter,buff,n, + filtered_buff,sizeof(filtered_buff)); + if (filtered_len>0) { + strwrite(svr->stream,filtered_buff,filtered_len); + } + } + else { + strwrite(svr->stream,buff,n); + } } /* write data to log stream */ strwrite(svr->strlog+i,buff,n);