diff --git a/Makefile b/Makefile
index 68ab4e0..ff64997 100644
--- a/Makefile
+++ b/Makefile
@@ -29,7 +29,7 @@ CPPFLAGS += -DMODES_DUMP1090_VERSION=\"$(DUMP1090_VERSION)\" -DMODES_DUMP1090_VA
DIALECT = -std=c11
CFLAGS += $(DIALECT) -O3 -g -Wall -Wmissing-declarations -Werror -W -D_DEFAULT_SOURCE -fno-common
LIBS = -lpthread -lm -lrt
-SDR_OBJ = sdr.o sdr_ifile.o
+SDR_OBJ = sdr.o fifo.o sdr_ifile.o
ifeq ($(RTLSDR), yes)
SDR_OBJ += sdr_rtlsdr.o
diff --git a/demod_2400.c b/demod_2400.c
index 6c0fcbd..9cd7cd5 100644
--- a/demod_2400.c
+++ b/demod_2400.c
@@ -72,8 +72,11 @@ void demodulate2400(struct mag_buf *mag)
unsigned char *bestmsg;
int bestscore, bestphase;
+ // maximum lookahead we use
+ assert(mag->overlap >= 19 + 1 + 269);
+
uint16_t *m = mag->data;
- uint32_t mlen = mag->length;
+ uint32_t mlen = mag->validLength - mag->overlap;
uint64_t sum_scaled_signal_power = 0;
@@ -368,8 +371,8 @@ void demodulate2400(struct mag_buf *mag)
/* update noise power */
{
double sum_signal_power = sum_scaled_signal_power / 65535.0 / 65535.0;
- Modes.stats_current.noise_power_sum += (mag->mean_power * mag->length - sum_signal_power);
- Modes.stats_current.noise_power_count += mag->length;
+ Modes.stats_current.noise_power_sum += (mag->mean_power * mlen - sum_signal_power);
+ Modes.stats_current.noise_power_count += mlen;
}
}
@@ -471,7 +474,7 @@ void demodulate2400AC(struct mag_buf *mag)
{
struct modesMessage mm;
uint16_t *m = mag->data;
- uint32_t mlen = mag->length;
+ uint32_t mlen = mag->validLength - mag->overlap;
unsigned f1_sample;
memset(&mm, 0, sizeof(mm));
@@ -552,7 +555,7 @@ void demodulate2400AC(struct mag_buf *mag)
// F2 is 20.3us / 14 bit periods after F1
unsigned f2_clock = f1_clock + (87 * 14);
unsigned f2_sample = f2_clock / 25;
- assert(f2_sample < mlen + Modes.trailing_samples);
+ assert(f2_sample < mlen + mag->overlap);
if (!(m[f2_sample-1] < m[f2_sample+0]))
continue;
diff --git a/dump1090.c b/dump1090.c
index dfbabf1..19fe064 100644
--- a/dump1090.c
+++ b/dump1090.c
@@ -132,9 +132,6 @@ static void modesInitConfig(void) {
static void modesInit(void) {
int i;
- pthread_mutex_init(&Modes.data_mutex,NULL);
- pthread_cond_init(&Modes.data_cond,NULL);
-
Modes.sample_rate = 2400000.0;
// Allocate the various buffers used by Modes
@@ -146,15 +143,9 @@ static void modesInit(void) {
exit(1);
}
- for (i = 0; i < MODES_MAG_BUFFERS; ++i) {
- if ( (Modes.mag_buffers[i].data = calloc(MODES_MAG_BUF_SAMPLES+Modes.trailing_samples, sizeof(uint16_t))) == NULL ) {
- fprintf(stderr, "Out of memory allocating magnitude buffer.\n");
- exit(1);
- }
-
- Modes.mag_buffers[i].length = 0;
- Modes.mag_buffers[i].dropped = 0;
- Modes.mag_buffers[i].sampleTimestamp = 0;
+ if (!fifo_create(MODES_MAG_BUFFERS, MODES_MAG_BUF_SAMPLES + Modes.trailing_samples, Modes.trailing_samples)) {
+ fprintf(stderr, "Out of memory allocating FIFO\n");
+ exit(1);
}
// Validate the users Lat/Lon home location inputs
@@ -224,13 +215,10 @@ static void *readerThreadEntryPoint(void *arg)
sdrRun();
- // Wake the main thread (if it's still waiting)
- pthread_mutex_lock(&Modes.data_mutex);
if (!Modes.exit)
Modes.exit = 2; // unexpected exit
- pthread_cond_signal(&Modes.data_cond);
- pthread_mutex_unlock(&Modes.data_mutex);
+ fifo_halt(); // wakes the main thread, if it's still waiting
return NULL;
}
//
@@ -684,59 +672,34 @@ int main(int argc, char **argv) {
int watchdogCounter = 10; // about 1 second
// Create the thread that will read the data from the device.
- pthread_mutex_lock(&Modes.data_mutex);
pthread_create(&Modes.reader_thread, NULL, readerThreadEntryPoint, NULL);
while (!Modes.exit) {
+ // get the next sample buffer off the FIFO; wait only up to 100ms
+ // this is fairly aggressive as all our network I/O runs out of the background work!
+ struct mag_buf *buf = fifo_dequeue(100 /* milliseconds */);
struct timespec start_time;
- if (Modes.first_free_buffer == Modes.first_filled_buffer) {
- /* wait for more data.
- * we should be getting data every 50-60ms. wait for max 100ms before we give up and do some background work.
- * this is fairly aggressive as all our network I/O runs out of the background work!
- */
-
- struct timespec ts;
- clock_gettime(CLOCK_REALTIME, &ts);
- ts.tv_nsec += 100000000;
- normalize_timespec(&ts);
-
- pthread_cond_timedwait(&Modes.data_cond, &Modes.data_mutex, &ts); // This unlocks Modes.data_mutex, and waits for Modes.data_cond
- }
-
- // Modes.data_mutex is locked, and possibly we have data.
-
- if (Modes.first_free_buffer != Modes.first_filled_buffer) {
- // FIFO is not empty, process one buffer.
-
- struct mag_buf *buf;
+ if (buf) {
+ // Process one buffer
start_cpu_timing(&start_time);
- buf = &Modes.mag_buffers[Modes.first_filled_buffer];
-
- // Process data after releasing the lock, so that the capturing
- // thread can read data while we perform computationally expensive
- // stuff at the same time.
- pthread_mutex_unlock(&Modes.data_mutex);
-
demodulate2400(buf);
if (Modes.mode_ac) {
demodulate2400AC(buf);
}
- Modes.stats_current.samples_processed += buf->length;
+ Modes.stats_current.samples_processed += buf->validLength;
Modes.stats_current.samples_dropped += buf->dropped;
end_cpu_timing(&start_time, &Modes.stats_current.demod_cpu);
- // Mark the buffer we just processed as completed.
- pthread_mutex_lock(&Modes.data_mutex);
- Modes.first_filled_buffer = (Modes.first_filled_buffer + 1) % MODES_MAG_BUFFERS;
- pthread_cond_signal(&Modes.data_cond);
- pthread_mutex_unlock(&Modes.data_mutex);
+ // Return the buffer to the FIFO freelist for reuse
+ fifo_release(buf);
+
+ // We got something so reset the watchdog
watchdogCounter = 10;
} else {
// Nothing to process this time around.
- pthread_mutex_unlock(&Modes.data_mutex);
if (--watchdogCounter <= 0) {
log_with_timestamp("No data received from the SDR for a long time, it may have wedged");
watchdogCounter = 600;
@@ -746,16 +709,11 @@ int main(int argc, char **argv) {
start_cpu_timing(&start_time);
backgroundTasks();
end_cpu_timing(&start_time, &Modes.stats_current.background_cpu);
-
- pthread_mutex_lock(&Modes.data_mutex);
}
- pthread_mutex_unlock(&Modes.data_mutex);
-
log_with_timestamp("Waiting for receive thread termination");
+ fifo_halt(); // Reader thread should do this anyway, but just in case..
pthread_join(Modes.reader_thread,NULL); // Wait on reader thread exit
- pthread_cond_destroy(&Modes.data_cond); // Thread cleanup - only after the reader thread is dead!
- pthread_mutex_destroy(&Modes.data_mutex);
}
interactiveCleanup();
diff --git a/dump1090.h b/dump1090.h
index 3f0c8d3..2ed3339 100644
--- a/dump1090.h
+++ b/dump1090.h
@@ -273,6 +273,7 @@ typedef enum {
#include "icao_filter.h"
#include "convert.h"
#include "sdr.h"
+#include "fifo.h"
//======================== structure declarations =========================
@@ -280,27 +281,10 @@ typedef enum {
SDR_NONE, SDR_IFILE, SDR_RTLSDR, SDR_BLADERF, SDR_HACKRF, SDR_LIMESDR
} sdr_type_t;
-// Structure representing one magnitude buffer
-struct mag_buf {
- uint16_t *data; // Magnitude data. Starts with Modes.trailing_samples worth of overlap from the previous block
- unsigned length; // Number of valid samples _after_ overlap. Total buffer length is buf->length + Modes.trailing_samples.
- uint64_t sampleTimestamp; // Clock timestamp of the start of this block, 12MHz clock
- uint64_t sysTimestamp; // Estimated system time at start of block
- uint32_t dropped; // Number of dropped samples preceding this buffer
- double mean_level; // Mean of normalized (0..1) signal level
- double mean_power; // Mean of normalized (0..1) power level
-};
-
// Program global state
struct _Modes { // Internal state
pthread_t reader_thread;
- pthread_mutex_t data_mutex; // Mutex to synchronize buffer access
- pthread_cond_t data_cond; // Conditional variable associated
-
- struct mag_buf mag_buffers[MODES_MAG_BUFFERS]; // Converted magnitude buffers from RTL or file input
- unsigned first_free_buffer; // Entry in mag_buffers that will next be filled with input.
- unsigned first_filled_buffer; // Entry in mag_buffers that has valid data and will be demodulated next. If equal to next_free_buffer, there is no unprocessed data.
pthread_mutex_t reader_cpu_mutex; // mutex protecting reader_cpu_accumulator
struct timespec reader_cpu_accumulator; // accumulated CPU time used by the reader thread
struct timespec reader_cpu_start; // start time for the last reader thread CPU measurement
diff --git a/fifo.c b/fifo.c
new file mode 100644
index 0000000..15f4f78
--- /dev/null
+++ b/fifo.c
@@ -0,0 +1,253 @@
+// Part of dump1090, a Mode S message decoder for RTLSDR devices.
+//
+// fifo.c: Cross-thread SDR to demodulator FIFO support
+//
+// Copyright (c) 2020 FlightAware LLC
+//
+// This file is free software: you may copy, redistribute and/or modify it
+// under the terms of the GNU General Public License as published by the
+// Free Software Foundation, either version 2 of the License, or (at your
+// option) any later version.
+//
+// This file is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+#include "fifo.h"
+#include "util.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+static pthread_mutex_t fifo_mutex = PTHREAD_MUTEX_INITIALIZER; // mutex protecting the queues
+static pthread_cond_t fifo_notempty_cond = PTHREAD_COND_INITIALIZER; // condition used to signal FIFO-not-empty
+static pthread_cond_t fifo_empty_cond = PTHREAD_COND_INITIALIZER; // condition used to signal FIFO-empty
+static pthread_cond_t fifo_free_cond = PTHREAD_COND_INITIALIZER; // condition used to signal freelist-not-empty
+static struct mag_buf *fifo_head; // head of queued buffers awaiting demodulation
+static struct mag_buf *fifo_tail; // tail of queued buffers awaiting demodulation
+static struct mag_buf *fifo_freelist; // freelist of preallocated buffers
+static bool fifo_halted; // true if queue has been halted
+
+static unsigned overlap_length; // desired overlap size in samples (size of overlap_buffer)
+static uint16_t *overlap_buffer; // buffer used to save overlapping data
+
+// Create the queue structures. Not threadsafe.
+bool fifo_create(unsigned buffer_count, unsigned buffer_size, unsigned overlap)
+{
+ if (!(overlap_buffer = calloc(overlap, sizeof(overlap_buffer[0]))))
+ goto nomem;
+
+ overlap_length = overlap;
+
+ for (unsigned i = 0; i < buffer_count; ++i) {
+ struct mag_buf *newbuf;
+ if (!(newbuf = calloc(1, sizeof(*newbuf)))) {
+ goto nomem;
+ }
+
+ if (!(newbuf->data = calloc(buffer_size, sizeof(newbuf->data[0])))) {
+ free(newbuf);
+ goto nomem;
+ }
+
+ newbuf->totalLength = buffer_size;
+ newbuf->next = fifo_freelist;
+ fifo_freelist = newbuf;
+ }
+
+ return true;
+
+ nomem:
+ fifo_destroy();
+ return false;
+}
+
+static void free_buffer_list(struct mag_buf *head)
+{
+ while (head) {
+ struct mag_buf *next = head->next;
+ free(head->data);
+ free(head);
+ head = next;
+ }
+}
+
+void fifo_destroy()
+{
+ free_buffer_list(fifo_head);
+ fifo_freelist = NULL;
+
+ free_buffer_list(fifo_freelist);
+ fifo_head = fifo_tail = NULL;
+
+ free(overlap_buffer);
+ overlap_buffer = NULL;
+}
+
+void fifo_drain()
+{
+ pthread_mutex_lock(&fifo_mutex);
+ while (fifo_head && !fifo_halted) {
+ pthread_cond_wait(&fifo_empty_cond, &fifo_mutex);
+ }
+ pthread_mutex_unlock(&fifo_mutex);
+}
+
+void fifo_halt()
+{
+ pthread_mutex_lock(&fifo_mutex);
+
+ // Drain all enqueued buffers to the freelist
+ while (fifo_head) {
+ struct mag_buf *freebuf = fifo_head;
+ fifo_head = freebuf->next;
+
+ freebuf->next = fifo_freelist;
+ fifo_freelist = freebuf;
+ }
+
+ fifo_tail = NULL;
+ fifo_halted = true;
+
+ // wake all waiters
+ pthread_cond_broadcast(&fifo_notempty_cond);
+ pthread_cond_broadcast(&fifo_empty_cond);
+ pthread_cond_broadcast(&fifo_free_cond);
+ pthread_mutex_unlock(&fifo_mutex);
+}
+
+struct mag_buf *fifo_acquire(uint32_t timeout_ms)
+{
+ struct timespec deadline;
+ if (timeout_ms)
+ get_deadline(timeout_ms, &deadline);
+
+ pthread_mutex_lock(&fifo_mutex);
+
+ struct mag_buf *result = NULL;
+ while (!fifo_halted && !fifo_freelist) {
+ if (!timeout_ms) {
+ // Non-blocking
+ goto done;
+ }
+
+ // No free buffers, wait for one
+ if (pthread_cond_timedwait(&fifo_free_cond, &fifo_mutex, &deadline) < 0) {
+ if (errno != ETIMEDOUT)
+ return NULL; // unexpected error, mutex no longer held!
+
+ goto done; // timed out
+ }
+ }
+
+ if (!fifo_halted) {
+ result = fifo_freelist;
+ fifo_freelist = result->next;
+
+ result->overlap = overlap_length;
+ result->validLength = result->overlap;
+ result->sampleTimestamp = 0;
+ result->sysTimestamp = 0;
+ result->flags = 0;
+ result->next = NULL;
+ }
+
+ done:
+ pthread_mutex_unlock(&fifo_mutex);
+ return result;
+}
+
+void fifo_enqueue(struct mag_buf *buf)
+{
+ assert(buf->validLength <= buf->totalLength);
+ assert(buf->validLength >= overlap_length);
+
+ pthread_mutex_lock(&fifo_mutex);
+
+ if (fifo_halted) {
+ // Shutting down, just return the buffer to the freelist.
+ buf->next = fifo_freelist;
+ fifo_freelist = buf;
+ goto done;
+ }
+
+ // Populate the overlap region
+ if (buf->flags & MAGBUF_DISCONTINUOUS) {
+ // This buffer is discontinuous to the previous, so the overlap region is not valid; zero it out
+ memset(buf->data, 0, overlap_length * sizeof(buf->data[0]));
+ } else {
+ memcpy(buf->data, overlap_buffer, overlap_length * sizeof(buf->data[0]));
+ }
+
+ // Save the tail of the buffer for next time
+ memcpy(overlap_buffer, &buf->data[buf->validLength - overlap_length], overlap_length * sizeof(overlap_buffer[0]));
+
+ // enqueue and tell the main thread
+ buf->next = NULL;
+ if (!fifo_head) {
+ fifo_head = fifo_tail = buf;
+ pthread_cond_signal(&fifo_notempty_cond);
+ } else {
+ fifo_tail->next = buf;
+ }
+
+ done:
+ pthread_mutex_unlock(&fifo_mutex);
+}
+
+struct mag_buf *fifo_dequeue(uint32_t timeout_ms)
+{
+ struct timespec deadline;
+ if (timeout_ms)
+ get_deadline(timeout_ms, &deadline);
+
+ pthread_mutex_lock(&fifo_mutex);
+
+ struct mag_buf *result = NULL;
+ while (!fifo_head && !fifo_halted) {
+ if (!timeout_ms) {
+ // Non-blocking
+ goto done;
+ }
+
+ // No data pending, wait for some
+ if (pthread_cond_timedwait(&fifo_notempty_cond, &fifo_mutex, &deadline) < 0) {
+ if (errno != ETIMEDOUT)
+ return NULL; // unexpected error, mutex no longer held!
+
+ goto done; // timed out
+ }
+ }
+
+ if (!fifo_halted) {
+ result = fifo_head;
+ fifo_head = result->next;
+ result->next = NULL;
+ if (!fifo_head) {
+ fifo_tail = NULL;
+ pthread_cond_broadcast(&fifo_empty_cond);
+ }
+ }
+
+ done:
+ pthread_mutex_unlock(&fifo_mutex);
+ return result;
+}
+
+void fifo_release(struct mag_buf *buf)
+{
+ pthread_mutex_lock(&fifo_mutex);
+ if (!fifo_freelist)
+ pthread_cond_signal(&fifo_free_cond);
+ buf->next = fifo_freelist;
+ fifo_freelist = buf;
+ pthread_mutex_unlock(&fifo_mutex);
+}
diff --git a/fifo.h b/fifo.h
new file mode 100644
index 0000000..27d5993
--- /dev/null
+++ b/fifo.h
@@ -0,0 +1,119 @@
+// Part of dump1090, a Mode S message decoder for RTLSDR devices.
+//
+// fifo.h: Cross-thread SDR to demodulator FIFO support
+//
+// Copyright (c) 2020 FlightAware LLC
+//
+// This file is free software: you may copy, redistribute and/or modify it
+// under the terms of the GNU General Public License as published by the
+// Free Software Foundation, either version 2 of the License, or (at your
+// option) any later version.
+//
+// This file is distributed in the hope that it will be useful, but
+// WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+// General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+#ifndef FIFO_H
+#define FIFO_H
+
+#include
+#include
+
+// Values for mag_buf.flags
+typedef enum {
+ MAGBUF_DISCONTINUOUS = 1, // this buffer is discontinuous to the previous buffer
+} mag_buf_flags;
+
+// Structure representing one magnitude buffer
+// The contained data looks like this:
+//
+// 0 overlap validLength-overlap validLength totalLength
+// | | | | |
+// | overlap data from | new sample data | new sample data that | optional |
+// | previous buffer | | will be used as overlap | unused |
+// | | | in the next buffer | space |
+// | | | | |
+// | | [this is the position of the]| |
+// | | [last message that the ]| |
+// | | [demodulator will decode ]| |
+// | | | | |
+// | | | [partial messages that start ] |
+// | [copied here in ]| <---------------------- [after the cutoff will be copied] |
+// | [the next buffer]| | [to the starting overlap of the ] |
+// | | | [next buffer ] |
+//
+// The demodulator looks for signals starting at offsets 0 .. validLength-overlap-1,
+// with the trailing overlap region allowing decoding of a maximally-sized message that starts
+// at validLength-overlap-1. Signals that start after this point are not decoded, but they will
+// be copied into the starting overlap of the next buffer and decoded on the next iteration.
+
+struct mag_buf {
+ uint16_t *data; // Magnitude data, starting with overlap from the previous block
+ unsigned totalLength; // Maximum number of samples (allocated size of "data")
+ unsigned validLength; // Number of valid samples in "data", including overlap samples
+ unsigned overlap; // Number of leading overlap samples at the start of "data";
+ // also the number of trailing samples that will be preserved for next time
+
+ uint64_t sampleTimestamp; // Clock timestamp of the start of this block, 12MHz clock
+ uint64_t sysTimestamp; // Estimated system time at start of block
+
+ mag_buf_flags flags; // bitwise flags for this buffer
+ double mean_level; // Mean of normalized (0..1) signal level
+ double mean_power; // Mean of normalized (0..1) power level
+ unsigned dropped; // (approx) number of dropped samples, if flag MAGBUF_DISCONTINUOUS is set
+
+ struct mag_buf *next; // linked list forward link
+};
+
+// Create the queue structures. Not threadsafe. Returns true on success.
+//
+// buffer_count - the number of buffers to preallocate
+// buffer_size - the size of each magnitude buffer, in samples, including overlap
+// overlap - the number of samples to overlap between adjacent buffers
+bool fifo_create(unsigned buffer_count, unsigned buffer_size, unsigned overlap);
+
+// Destroy the fifo structures allocated in magbuf_fifo_create. Not threadsafe; ensure all FIFO users
+// are done before calling.
+void fifo_destroy();
+
+// Block until the FIFO is empty.
+void fifo_drain();
+
+// Mark the FIFO as halted. Move any buffers in FIFO to the freelist immediately.
+// Future calls to magbuf_acquire() will immediately return NULL.
+// Future calls to magbuf_produce() will immediately put the produced buffer on the freelist.
+// Future alls to magbuf_consume() will immediately return NULL; if there are
+// existing calls waiting on data, they will be immediately awoken and return NULL.
+void fifo_halt();
+
+// Get an unused buffer from the freelist and return it.
+// Block up to timeout_ms waiting for a free buffer. Return NULL if there are no
+// free buffers available within the timeout, or if the FIFO is halted.
+struct mag_buf *fifo_acquire(uint32_t timeout_ms);
+
+// Put a filled buffer (previously obtained from fifo_acquire) onto the head of the FIFO.
+// The caller should have filled:
+// buf->validLength
+// buf->data[buf->overlap .. buf->validLength-1]
+// buf->sampleTimestamp
+// buf->sysTimestamp
+// buf->flags
+// buf->mean_level (if flags & HAS_METRICS)
+// buf->mean_power (if flags & HAS_METRICS)
+// buf->dropped (if flags & DISCONTINUOUS)
+void fifo_enqueue(struct mag_buf *buf);
+
+// Get a buffer from the tail of the FIFO.
+// If the FIFO is halted (or becomes halted), return NULL immediately.
+// If the FIFO is empty, wait for up to "timeout_ms" milliseconds
+// for more data; return NULL if no data arrives within the timeout.
+struct mag_buf *fifo_dequeue(uint32_t timeout_ms);
+
+// Release a buffer previously returned by fifo_acquire() or fifo_pop() back to the freelist.
+void fifo_release(struct mag_buf *buf);
+
+#endif
diff --git a/sdr_bladerf.c b/sdr_bladerf.c
index 673183f..76c163a 100644
--- a/sdr_bladerf.c
+++ b/sdr_bladerf.c
@@ -310,7 +310,6 @@ static void *handle_bladerf_samples(struct bladerf *dev,
void *user_data)
{
static uint64_t nextTimestamp = 0;
- static bool dropping = false;
MODES_NOTUSED(dev);
MODES_NOTUSED(stream);
@@ -323,37 +322,20 @@ static void *handle_bladerf_samples(struct bladerf *dev,
sdrMonitor();
- pthread_mutex_lock(&Modes.data_mutex);
if (Modes.exit) {
- pthread_mutex_unlock(&Modes.data_mutex);
return BLADERF_STREAM_SHUTDOWN;
}
- unsigned next_free_buffer = (Modes.first_free_buffer + 1) % MODES_MAG_BUFFERS;
- struct mag_buf *outbuf = &Modes.mag_buffers[Modes.first_free_buffer];
- struct mag_buf *lastbuf = &Modes.mag_buffers[(Modes.first_free_buffer + MODES_MAG_BUFFERS - 1) % MODES_MAG_BUFFERS];
- unsigned free_bufs = (Modes.first_filled_buffer - next_free_buffer + MODES_MAG_BUFFERS) % MODES_MAG_BUFFERS;
+ struct mag_buf *outbuf = fifo_acquire(/* don't wait */ 0);
- if (free_bufs == 0 || (dropping && free_bufs < MODES_MAG_BUFFERS/2)) {
+ if (!outbuf) {
// FIFO is full. Drop this block.
- dropping = true;
- pthread_mutex_unlock(&Modes.data_mutex);
return samples;
}
- dropping = false;
- pthread_mutex_unlock(&Modes.data_mutex);
-
- // Copy trailing data from last block (or reset if not valid)
- if (outbuf->dropped == 0) {
- memcpy(outbuf->data, lastbuf->data + lastbuf->length, Modes.trailing_samples * sizeof(uint16_t));
- } else {
- memset(outbuf->data, 0, Modes.trailing_samples * sizeof(uint16_t));
- }
-
// start handling metadata blocks
outbuf->dropped = 0;
- outbuf->length = 0;
+ outbuf->validLength = outbuf->overlap;
outbuf->mean_level = outbuf->mean_power = 0;
unsigned blocks_processed = 0;
@@ -392,8 +374,9 @@ static void *handle_bladerf_samples(struct bladerf *dev,
// dropped data or lost sync. start again.
if (metadata_timestamp > nextTimestamp)
outbuf->dropped += (metadata_timestamp - nextTimestamp);
- outbuf->dropped += outbuf->length;
- outbuf->length = 0;
+ outbuf->dropped += outbuf->validLength - outbuf->overlap;
+ outbuf->validLength = outbuf->overlap;
+ outbuf->flags |= MAGBUF_DISCONTINUOUS;
blocks_processed = 0;
outbuf->mean_level = outbuf->mean_power = 0;
nextTimestamp = metadata_timestamp;
@@ -409,8 +392,8 @@ static void *handle_bladerf_samples(struct bladerf *dev,
// Convert a block of data
double mean_level, mean_power;
- BladeRF.converter(sample_data, &outbuf->data[Modes.trailing_samples + outbuf->length], samples_per_block, BladeRF.converter_state, &mean_level, &mean_power);
- outbuf->length += samples_per_block;
+ BladeRF.converter(sample_data, &outbuf->data[outbuf->validLength], samples_per_block, BladeRF.converter_state, &mean_level, &mean_power);
+ outbuf->validLength += samples_per_block;
outbuf->mean_level += mean_level;
outbuf->mean_power += mean_power;
nextTimestamp += samples_per_block * BladeRF.decimation;
@@ -422,21 +405,14 @@ static void *handle_bladerf_samples(struct bladerf *dev,
if (blocks_processed) {
// Get the approx system time for the start of this block
- unsigned block_duration = 1e3 * outbuf->length / Modes.sample_rate;
+ unsigned block_duration = 1e3 * (outbuf->validLength - outbuf->overlap) / Modes.sample_rate;
outbuf->sysTimestamp = entryTimestamp - block_duration;
outbuf->mean_level /= blocks_processed;
outbuf->mean_power /= blocks_processed;
// Push the new data to the demodulation thread
- pthread_mutex_lock(&Modes.data_mutex);
-
- Modes.mag_buffers[next_free_buffer].dropped = 0;
- Modes.mag_buffers[next_free_buffer].length = 0; // just in case
- Modes.first_free_buffer = next_free_buffer;
-
- pthread_cond_signal(&Modes.data_cond);
- pthread_mutex_unlock(&Modes.data_mutex);
+ fifo_enqueue(outbuf);
}
return samples;
diff --git a/sdr_hackrf.c b/sdr_hackrf.c
index d25a1ba..66c5bcb 100644
--- a/sdr_hackrf.c
+++ b/sdr_hackrf.c
@@ -197,82 +197,63 @@ bool hackRFOpen()
static int handle_hackrf_samples(hackrf_transfer *transfer)
{
- struct mag_buf *outbuf;
- struct mag_buf *lastbuf;
- unsigned char *buf;
- int32_t len;
- uint32_t slen;
- unsigned next_free_buffer;
- unsigned free_bufs;
- unsigned block_duration;
-
- static int dropping = 0;
+ static unsigned dropped = 0;
static uint64_t sampleCounter = 0;
sdrMonitor();
- // Lock the data buffer variables before accessing them
- pthread_mutex_lock(&Modes.data_mutex);
- if (Modes.exit) {
- pthread_mutex_unlock(&Modes.data_mutex);
+ if (Modes.exit || transfer->valid_length < 0)
return -1;
- }
- next_free_buffer = (Modes.first_free_buffer + 1) % MODES_MAG_BUFFERS;
- outbuf = &Modes.mag_buffers[Modes.first_free_buffer];
- lastbuf = &Modes.mag_buffers[(Modes.first_free_buffer + MODES_MAG_BUFFERS - 1) % MODES_MAG_BUFFERS];
- free_bufs = (Modes.first_filled_buffer - next_free_buffer + MODES_MAG_BUFFERS) % MODES_MAG_BUFFERS;
-
- buf = transfer->buffer;
- len = transfer->buffer_length;
+ uint8_t *buf = transfer->buffer;
+ unsigned len = transfer->valid_length;
// Values returned by HackRF need conversion from signed to unsigned
- for (int32_t i = 0; i < len; i++) {
+ for (unsigned i = 0; i < len; i++) {
buf[i] ^= (uint8_t)0x80;
}
- slen = len/2; // Drops any trailing odd sample, that's OK
+ unsigned samples_read = len / 2; // Drops any trailing odd sample, that's OK
- if (free_bufs == 0 || (dropping && free_bufs < MODES_MAG_BUFFERS/2)) {
+ struct mag_buf *outbuf = fifo_acquire(0 /* don't wait */);
+ if (!outbuf) {
// FIFO is full. Drop this block.
- dropping = 1;
- outbuf->dropped += slen;
- sampleCounter += slen;
- pthread_mutex_unlock(&Modes.data_mutex);
- return -1;
+ dropped += samples_read;
+ sampleCounter += samples_read;
+ return 0;
}
- dropping = 0;
- pthread_mutex_unlock(&Modes.data_mutex);
+ outbuf->flags = 0;
+
+ if (dropped) {
+ // We previously dropped some samples due to no buffers being available
+ outbuf->flags |= MAGBUF_DISCONTINUOUS;
+ outbuf->dropped = dropped;
+ }
+
+ dropped = 0;
// Compute the sample timestamp and system timestamp for the start of the block
outbuf->sampleTimestamp = sampleCounter * 12e6 / Modes.sample_rate;
- sampleCounter += slen;
+ sampleCounter += samples_read;
// Get the approx system time for the start of this block
- block_duration = 1e3 * slen / Modes.sample_rate;
+ uint64_t block_duration = 1e3 * samples_read / Modes.sample_rate;
outbuf->sysTimestamp = mstime() - block_duration;
- // Copy trailing data from last block (or reset if not valid)
- if (outbuf->dropped == 0) {
- memcpy(outbuf->data, lastbuf->data + lastbuf->length, Modes.trailing_samples * sizeof(uint16_t));
- } else {
- memset(outbuf->data, 0, Modes.trailing_samples * sizeof(uint16_t));
+ // Convert the new data
+ unsigned to_convert = samples_read;
+ if (to_convert + outbuf->overlap > outbuf->totalLength) {
+ // how did that happen?
+ to_convert = outbuf->totalLength - outbuf->overlap;
+ dropped = samples_read - to_convert;
}
- // Convert the new data
- outbuf->length = slen;
- HackRF.converter(buf, &outbuf->data[Modes.trailing_samples], slen, HackRF.converter_state, &outbuf->mean_level, &outbuf->mean_power);
+ HackRF.converter(buf, &outbuf->data[outbuf->overlap], to_convert, HackRF.converter_state, &outbuf->mean_level, &outbuf->mean_power);
+ outbuf->validLength = outbuf->overlap + to_convert;
- // Push the new data to the demodulation thread
- pthread_mutex_lock(&Modes.data_mutex);
-
- Modes.mag_buffers[next_free_buffer].dropped = 0;
- Modes.mag_buffers[next_free_buffer].length = 0; // just in case
- Modes.first_free_buffer = next_free_buffer;
-
- pthread_cond_signal(&Modes.data_cond);
- pthread_mutex_unlock(&Modes.data_mutex);
+ // Push to the demodulation thread
+ fifo_enqueue(outbuf);
return 0;
}
diff --git a/sdr_ifile.c b/sdr_ifile.c
index d7ee0fc..fb49a42 100644
--- a/sdr_ifile.c
+++ b/sdr_ifile.c
@@ -58,7 +58,8 @@ static struct {
int fd;
unsigned bytes_per_sample;
- void *readbuf;
+ unsigned bufsize;
+ char *readbuf;
iq_convert_fn converter;
struct converter_state *converter_state;
} ifile;
@@ -70,6 +71,7 @@ void ifileInitConfig(void)
ifile.throttle = false;
ifile.fd = -1;
ifile.bytes_per_sample = 0;
+ ifile.bufsize = 0;
ifile.readbuf = NULL;
ifile.converter = NULL;
ifile.converter_state = NULL;
@@ -152,7 +154,9 @@ bool ifileOpen(void)
return false;
}
- if (!(ifile.readbuf = malloc(MODES_MAG_BUF_SAMPLES * ifile.bytes_per_sample))) {
+ ifile.bufsize = ifile.bytes_per_sample * MODES_MAG_BUF_SAMPLES; /* ~1M samples, about half a second's worth */
+
+ if (!(ifile.readbuf = malloc(ifile.bufsize))) {
fprintf(stderr, "ifile: failed to allocate read buffer\n");
ifileClose();
return false;
@@ -176,90 +180,68 @@ void ifileRun()
if (ifile.fd < 0)
return;
- int eof = 0;
struct timespec next_buffer_delivery;
-
- uint64_t sampleCounter = 0;
-
clock_gettime(CLOCK_MONOTONIC, &next_buffer_delivery);
- pthread_mutex_lock(&Modes.data_mutex);
- while (!Modes.exit && !eof) {
- ssize_t nread, toread;
- void *r;
- struct mag_buf *outbuf, *lastbuf;
- unsigned next_free_buffer;
- unsigned slen;
+ bool eof = false;
+ uint64_t sampleCounter = 0;
- next_free_buffer = (Modes.first_free_buffer + 1) % MODES_MAG_BUFFERS;
- if (next_free_buffer == Modes.first_filled_buffer) {
- // no space for output yet
- pthread_cond_wait(&Modes.data_cond, &Modes.data_mutex);
+ while (!Modes.exit && !eof) {
+ sdrMonitor();
+
+ /* wait for up to 1000ms for a buffer */
+ struct mag_buf *outbuf = fifo_acquire(100 /* milliseconds */);
+ if (!outbuf) {
+ // maybe we're slow, maybe we halted
continue;
}
- outbuf = &Modes.mag_buffers[Modes.first_free_buffer];
- lastbuf = &Modes.mag_buffers[(Modes.first_free_buffer + MODES_MAG_BUFFERS - 1) % MODES_MAG_BUFFERS];
- pthread_mutex_unlock(&Modes.data_mutex);
-
- sdrMonitor();
-
- // Compute the sample timestamp for the start of the block
+ // Compute the sample timestamp and system time for the start of the block
outbuf->sampleTimestamp = sampleCounter * 12e6 / Modes.sample_rate;
- sampleCounter += MODES_MAG_BUF_SAMPLES;
-
- // Copy trailing data from last block (or reset if not valid)
- if (lastbuf->length >= Modes.trailing_samples) {
- memcpy(outbuf->data, lastbuf->data + lastbuf->length, Modes.trailing_samples * sizeof(uint16_t));
- } else {
- memset(outbuf->data, 0, Modes.trailing_samples * sizeof(uint16_t));
- }
-
- // Get the system time for the start of this block
outbuf->sysTimestamp = mstime();
- toread = MODES_MAG_BUF_SAMPLES * ifile.bytes_per_sample;
- r = ifile.readbuf;
- while (toread) {
- nread = read(ifile.fd, r, toread);
+ unsigned bytes_wanted = (outbuf->totalLength - outbuf->overlap) * ifile.bytes_per_sample;
+ if (bytes_wanted > ifile.bufsize)
+ bytes_wanted = ifile.bufsize;
+
+ unsigned bytes_read = 0;
+ while (bytes_read < bytes_wanted) {
+ ssize_t nread = read(ifile.fd, ifile.readbuf + bytes_read, bytes_wanted - bytes_read);
if (nread <= 0) {
if (nread < 0) {
fprintf(stderr, "ifile: error reading input file: %s\n", strerror(errno));
}
// Done.
- eof = 1;
+ eof = true;
break;
}
- r += nread;
- toread -= nread;
+ bytes_read += nread;
}
- slen = outbuf->length = MODES_MAG_BUF_SAMPLES - toread / ifile.bytes_per_sample;
+ unsigned samples_read = bytes_read / ifile.bytes_per_sample;
// Convert the new data
- ifile.converter(ifile.readbuf, &outbuf->data[Modes.trailing_samples], slen, ifile.converter_state, &outbuf->mean_level, &outbuf->mean_power);
+ ifile.converter(ifile.readbuf, &outbuf->data[outbuf->overlap], samples_read, ifile.converter_state, &outbuf->mean_level, &outbuf->mean_power);
+ outbuf->validLength = outbuf->overlap + samples_read;
+ outbuf->flags = 0;
if (ifile.throttle || Modes.interactive) {
- // Wait until we are allowed to release this buffer to the main thread
+ // Wait until we are allowed to release this buffer to the FIFO
while (clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next_buffer_delivery, NULL) == EINTR)
;
// compute the time we can deliver the next buffer.
- next_buffer_delivery.tv_nsec += outbuf->length * 1e9 / Modes.sample_rate;
+ next_buffer_delivery.tv_nsec += samples_read * 1e9 / Modes.sample_rate;
normalize_timespec(&next_buffer_delivery);
}
- // Push the new data to the main thread
- pthread_mutex_lock(&Modes.data_mutex);
- Modes.first_free_buffer = next_free_buffer;
- pthread_cond_signal(&Modes.data_cond);
+ // Push the new data to the FIFO
+ fifo_enqueue(outbuf);
+ sampleCounter += samples_read;
}
- // Wait for the main thread to consume all data
- while (!Modes.exit && Modes.first_filled_buffer != Modes.first_free_buffer)
- pthread_cond_wait(&Modes.data_cond, &Modes.data_mutex);
-
- pthread_mutex_unlock(&Modes.data_mutex);
+ // Wait for the FIFO to drain so we don't throw away trailing data
+ fifo_drain();
}
void ifileClose()
diff --git a/sdr_limesdr.c b/sdr_limesdr.c
index 13b1b03..8c5107b 100644
--- a/sdr_limesdr.c
+++ b/sdr_limesdr.c
@@ -317,87 +317,54 @@ bool limesdrOpen(void)
static void limesdrCallback(unsigned char *buf, uint32_t len, void *ctx)
{
- struct mag_buf *outbuf;
- struct mag_buf *lastbuf;
- uint32_t slen;
- unsigned next_free_buffer;
- unsigned free_bufs;
- unsigned block_duration;
-
- static int dropping = 0;
+ static int dropped = 0;
static uint64_t sampleCounter = 0;
MODES_NOTUSED(ctx);
sdrMonitor();
- // Lock the data buffer variables before accessing them
- pthread_mutex_lock(&Modes.data_mutex);
- if (Modes.exit) {
- LimeSDR.is_stop = true; // ask our caller to exit
- }
+ unsigned samples_read = len / LimeSDR.bytes_in_sample; // Drops any trailing odd sample, not much else we can do there
- next_free_buffer = (Modes.first_free_buffer + 1) % MODES_MAG_BUFFERS;
- outbuf = &Modes.mag_buffers[Modes.first_free_buffer];
- lastbuf = &Modes.mag_buffers[(Modes.first_free_buffer + MODES_MAG_BUFFERS - 1) % MODES_MAG_BUFFERS];
- free_bufs = (Modes.first_filled_buffer - next_free_buffer + MODES_MAG_BUFFERS) % MODES_MAG_BUFFERS;
-
- // Paranoia! Unlikely, but let's go for belt and suspenders here
-
- if (len != MODES_RTL_BUF_SIZE) {
- limesdrLogHandler(LMS_LOG_WARNING, "device gave us a block with an unusual size");
-
- if (len > MODES_RTL_BUF_SIZE) {
- // wat?! Discard the start.
- unsigned discard = (len - MODES_RTL_BUF_SIZE + 1) / LimeSDR.bytes_in_sample;
- outbuf->dropped += discard;
- buf += discard * LimeSDR.bytes_in_sample;
- len -= discard * LimeSDR.bytes_in_sample;
- }
- }
-
- slen = len / LimeSDR.bytes_in_sample; // Drops any trailing odd sample, that's OK
-
- if (free_bufs == 0 || (dropping && free_bufs < MODES_MAG_BUFFERS/2)) {
+ struct mag_buf *outbuf = fifo_acquire(0 /* don't wait */);
+ if (!outbuf) {
// FIFO is full. Drop this block.
- dropping = 1;
- outbuf->dropped += slen;
- sampleCounter += slen;
- pthread_mutex_unlock(&Modes.data_mutex);
+ dropped += samples_read;
+ sampleCounter += samples_read;
return;
}
- dropping = 0;
- pthread_mutex_unlock(&Modes.data_mutex);
+ outbuf->flags = 0;
+
+ if (dropped) {
+ // We previously dropped some samples due to no buffers being available
+ outbuf->flags |= MAGBUF_DISCONTINUOUS;
+ outbuf->dropped = dropped;
+ }
+
+ dropped = 0;
// Compute the sample timestamp and system timestamp for the start of the block
outbuf->sampleTimestamp = sampleCounter * 12e6 / Modes.sample_rate;
- sampleCounter += slen;
+ sampleCounter += samples_read;
// Get the approx system time for the start of this block
- block_duration = 1e3 * slen / Modes.sample_rate;
+ unsigned block_duration = 1e3 * samples_read / Modes.sample_rate;
outbuf->sysTimestamp = mstime() - block_duration;
- // Copy trailing data from last block (or reset if not valid)
- if (outbuf->dropped == 0) {
- memcpy(outbuf->data, lastbuf->data + lastbuf->length, Modes.trailing_samples * sizeof(uint16_t));
- } else {
- memset(outbuf->data, 0, Modes.trailing_samples * sizeof(uint16_t));
+ // Convert the new data
+ unsigned to_convert = samples_read;
+ if (to_convert + outbuf->overlap > outbuf->totalLength) {
+ // how did that happen?
+ to_convert = outbuf->totalLength - outbuf->overlap;
+ dropped = samples_read - to_convert;
}
- // Convert the new data
- outbuf->length = slen;
- LimeSDR.converter(buf, &outbuf->data[Modes.trailing_samples], slen, LimeSDR.converter_state, &outbuf->mean_level, &outbuf->mean_power);
+ LimeSDR.converter(buf, &outbuf->data[outbuf->overlap], to_convert, LimeSDR.converter_state, &outbuf->mean_level, &outbuf->mean_power);
+ outbuf->validLength = outbuf->overlap + to_convert;
- // Push the new data to the demodulation thread
- pthread_mutex_lock(&Modes.data_mutex);
-
- Modes.mag_buffers[next_free_buffer].dropped = 0;
- Modes.mag_buffers[next_free_buffer].length = 0; // just in case
- Modes.first_free_buffer = next_free_buffer;
-
- pthread_cond_signal(&Modes.data_cond);
- pthread_mutex_unlock(&Modes.data_mutex);
+ // Push to the demodulation thread
+ fifo_enqueue(outbuf);
}
void limesdrRun()
@@ -406,21 +373,26 @@ void limesdrRun()
return;
}
- int16_t *buffer = malloc(MODES_RTL_BUF_SIZE);
+ int16_t *buffer = malloc(MODES_MAG_BUF_SAMPLES * LimeSDR.bytes_in_sample);
+ if (!buffer) {
+ limesdrLogHandler(LMS_LOG_ERROR, "out of memory allocating sample buffer");
+ return;
+ }
LMS_StartStream(&LimeSDR.stream);
- while (!LimeSDR.is_stop) {
- int sampleCnt = LMS_RecvStream(&LimeSDR.stream, buffer, MODES_RTL_BUF_SIZE / LimeSDR.bytes_in_sample, NULL, 1000);
+ while (!Modes.exit) {
+ int sampleCnt = LMS_RecvStream(&LimeSDR.stream, buffer, MODES_MAG_BUF_SAMPLES, NULL, 1000);
+ if (sampleCnt < 0) {
+ limesdrLogHandler(LMS_LOG_ERROR, "LMS_RecvStream failed");
+ break;
+ }
+
if (sampleCnt) {
limesdrCallback((unsigned char *)buffer, sampleCnt * LimeSDR.bytes_in_sample, NULL);
}
}
- if (!Modes.exit) {
- limesdrLogHandler(LMS_LOG_WARNING, "async read returned unexpectedly");
- }
-
free(buffer);
LMS_StopStream(&LimeSDR.stream);
}
diff --git a/sdr_rtlsdr.c b/sdr_rtlsdr.c
index 7050eff..ec9a7a7 100644
--- a/sdr_rtlsdr.c
+++ b/sdr_rtlsdr.c
@@ -265,89 +265,66 @@ bool rtlsdrOpen(void) {
return true;
}
-static void rtlsdrCallback(unsigned char *buf, uint32_t len, void *ctx) {
- struct mag_buf *outbuf;
- struct mag_buf *lastbuf;
- uint32_t slen;
- unsigned next_free_buffer;
- unsigned free_bufs;
- unsigned block_duration;
-
- static int dropping = 0;
+static void rtlsdrCallback(unsigned char *buf, uint32_t len, void *ctx)
+{
+ static unsigned dropped = 0;
static uint64_t sampleCounter = 0;
MODES_NOTUSED(ctx);
sdrMonitor();
- // Lock the data buffer variables before accessing them
- pthread_mutex_lock(&Modes.data_mutex);
if (Modes.exit) {
rtlsdr_cancel_async(RTLSDR.dev); // ask our caller to exit
- }
-
- next_free_buffer = (Modes.first_free_buffer + 1) % MODES_MAG_BUFFERS;
- outbuf = &Modes.mag_buffers[Modes.first_free_buffer];
- lastbuf = &Modes.mag_buffers[(Modes.first_free_buffer + MODES_MAG_BUFFERS - 1) % MODES_MAG_BUFFERS];
- free_bufs = (Modes.first_filled_buffer - next_free_buffer + MODES_MAG_BUFFERS) % MODES_MAG_BUFFERS;
-
- // Paranoia! Unlikely, but let's go for belt and suspenders here
-
- if (len != MODES_RTL_BUF_SIZE) {
- fprintf(stderr, "weirdness: rtlsdr gave us a block with an unusual size (got %u bytes, expected %u bytes)\n",
- (unsigned)len, (unsigned)MODES_RTL_BUF_SIZE);
-
- if (len > MODES_RTL_BUF_SIZE) {
- // wat?! Discard the start.
- unsigned discard = (len - MODES_RTL_BUF_SIZE + 1) / 2;
- outbuf->dropped += discard;
- buf += discard*2;
- len -= discard*2;
- }
- }
-
- slen = len/2; // Drops any trailing odd sample, that's OK
-
- if (free_bufs == 0 || (dropping && free_bufs < MODES_MAG_BUFFERS/2)) {
- // FIFO is full. Drop this block.
- dropping = 1;
- outbuf->dropped += slen;
- sampleCounter += slen;
- pthread_mutex_unlock(&Modes.data_mutex);
return;
}
- dropping = 0;
- pthread_mutex_unlock(&Modes.data_mutex);
+ unsigned samples_read = len/2; // Drops any trailing odd sample, not much else we can do there
+ if (!samples_read)
+ return; // that wasn't useful
+
+ struct mag_buf *outbuf = fifo_acquire(0 /* don't wait */);
+ if (!outbuf) {
+ // FIFO is full. Drop this block.
+ dropped += samples_read;
+ sampleCounter += samples_read;
+ return;
+ }
+
+ outbuf->flags = 0;
+
+ if (dropped) {
+ // We previously dropped some samples due to no buffers being available
+ outbuf->flags |= MAGBUF_DISCONTINUOUS;
+ outbuf->dropped = dropped;
+ }
+
+ dropped = 0;
// Compute the sample timestamp and system timestamp for the start of the block
outbuf->sampleTimestamp = sampleCounter * 12e6 / Modes.sample_rate;
- sampleCounter += slen;
+ sampleCounter += samples_read;
// Get the approx system time for the start of this block
- block_duration = 1e3 * slen / Modes.sample_rate;
+ uint64_t block_duration = 1e3 * samples_read / Modes.sample_rate;
outbuf->sysTimestamp = mstime() - block_duration;
- // Copy trailing data from last block (or reset if not valid)
- if (outbuf->dropped == 0) {
- memcpy(outbuf->data, lastbuf->data + lastbuf->length, Modes.trailing_samples * sizeof(uint16_t));
- } else {
- memset(outbuf->data, 0, Modes.trailing_samples * sizeof(uint16_t));
+ // Convert the new data
+ unsigned to_convert = samples_read;
+ if (to_convert + outbuf->overlap > outbuf->totalLength) {
+ // how did that happen?
+ to_convert = outbuf->totalLength - outbuf->overlap;
+ dropped = samples_read - to_convert;
}
- // Convert the new data
- outbuf->length = slen;
- RTLSDR.converter(buf, &outbuf->data[Modes.trailing_samples], slen, RTLSDR.converter_state, &outbuf->mean_level, &outbuf->mean_power);
- // Push the new data to the demodulation thread
- pthread_mutex_lock(&Modes.data_mutex);
- Modes.mag_buffers[next_free_buffer].dropped = 0;
- Modes.mag_buffers[next_free_buffer].length = 0; // just in case
- Modes.first_free_buffer = next_free_buffer;
+ RTLSDR.converter(buf, &outbuf->data[outbuf->overlap], to_convert, RTLSDR.converter_state, &outbuf->mean_level, &outbuf->mean_power);
- pthread_cond_signal(&Modes.data_cond);
- pthread_mutex_unlock(&Modes.data_mutex);
+ outbuf->validLength = outbuf->overlap + to_convert;
+
+ // Push to the demodulation thread
+ fifo_enqueue(outbuf);
}
void rtlsdrRun()
diff --git a/util.c b/util.c
index bdaa38f..43a1674 100644
--- a/util.c
+++ b/util.c
@@ -90,6 +90,13 @@ void normalize_timespec(struct timespec *ts)
}
}
+void get_deadline(uint32_t timeout_ms, struct timespec *ts)
+{
+ clock_gettime(CLOCK_REALTIME, ts);
+ ts->tv_nsec += timeout_ms * 1000000;
+ normalize_timespec(ts);
+}
+
/* record current CPU time in start_time */
void start_cpu_timing(struct timespec *start_time)
{
diff --git a/util.h b/util.h
index be2fe35..fa2db26 100644
--- a/util.h
+++ b/util.h
@@ -45,6 +45,9 @@ int64_t receiveclock_ms_elapsed(uint64_t t1, uint64_t t2);
struct timespec;
void normalize_timespec(struct timespec *ts);
+/* Find the absolute system time that is `timeout_ms` milliseconds in the future, and store that in *ts */
+void get_deadline(uint32_t timeout_ms, struct timespec *ts);
+
/* record current CPU time in start_time */
void start_cpu_timing(struct timespec *start_time);
diff --git a/view1090.c b/view1090.c
index 02df179..c27a47f 100644
--- a/view1090.c
+++ b/view1090.c
@@ -66,9 +66,6 @@ static void view1090InitConfig(void) {
//
static void view1090Init(void) {
- pthread_mutex_init(&Modes.data_mutex,NULL);
- pthread_cond_init(&Modes.data_cond,NULL);
-
#ifdef _WIN32
if ( (!Modes.wsaData.wVersion)
&& (!Modes.wsaData.wHighVersion) ) {