Factor out FIFO handling; reimplement as a linked list, not a circular buffer.

Update all the SDR implementation to use it.

This was getting pretty ugly with code getting copy&pasted in all the SDR
implementations. Unify it all and give it a simpler API. Linked list works out
much simpler than the circular buffer. Also, simplify copying the overlap region
around by just using a separate buffer (it's only a few hundred bytes long, so
the double copy is not a big deal).
This commit is contained in:
Oliver Jowett 2020-08-05 16:18:59 +08:00
parent aa2929196d
commit 74607b31ed
14 changed files with 561 additions and 349 deletions

View File

@ -29,7 +29,7 @@ CPPFLAGS += -DMODES_DUMP1090_VERSION=\"$(DUMP1090_VERSION)\" -DMODES_DUMP1090_VA
DIALECT = -std=c11 DIALECT = -std=c11
CFLAGS += $(DIALECT) -O3 -g -Wall -Wmissing-declarations -Werror -W -D_DEFAULT_SOURCE -fno-common CFLAGS += $(DIALECT) -O3 -g -Wall -Wmissing-declarations -Werror -W -D_DEFAULT_SOURCE -fno-common
LIBS = -lpthread -lm -lrt LIBS = -lpthread -lm -lrt
SDR_OBJ = sdr.o sdr_ifile.o SDR_OBJ = sdr.o fifo.o sdr_ifile.o
ifeq ($(RTLSDR), yes) ifeq ($(RTLSDR), yes)
SDR_OBJ += sdr_rtlsdr.o SDR_OBJ += sdr_rtlsdr.o

View File

@ -72,8 +72,11 @@ void demodulate2400(struct mag_buf *mag)
unsigned char *bestmsg; unsigned char *bestmsg;
int bestscore, bestphase; int bestscore, bestphase;
// maximum lookahead we use
assert(mag->overlap >= 19 + 1 + 269);
uint16_t *m = mag->data; uint16_t *m = mag->data;
uint32_t mlen = mag->length; uint32_t mlen = mag->validLength - mag->overlap;
uint64_t sum_scaled_signal_power = 0; uint64_t sum_scaled_signal_power = 0;
@ -368,8 +371,8 @@ void demodulate2400(struct mag_buf *mag)
/* update noise power */ /* update noise power */
{ {
double sum_signal_power = sum_scaled_signal_power / 65535.0 / 65535.0; double sum_signal_power = sum_scaled_signal_power / 65535.0 / 65535.0;
Modes.stats_current.noise_power_sum += (mag->mean_power * mag->length - sum_signal_power); Modes.stats_current.noise_power_sum += (mag->mean_power * mlen - sum_signal_power);
Modes.stats_current.noise_power_count += mag->length; Modes.stats_current.noise_power_count += mlen;
} }
} }
@ -471,7 +474,7 @@ void demodulate2400AC(struct mag_buf *mag)
{ {
struct modesMessage mm; struct modesMessage mm;
uint16_t *m = mag->data; uint16_t *m = mag->data;
uint32_t mlen = mag->length; uint32_t mlen = mag->validLength - mag->overlap;
unsigned f1_sample; unsigned f1_sample;
memset(&mm, 0, sizeof(mm)); memset(&mm, 0, sizeof(mm));
@ -552,7 +555,7 @@ void demodulate2400AC(struct mag_buf *mag)
// F2 is 20.3us / 14 bit periods after F1 // F2 is 20.3us / 14 bit periods after F1
unsigned f2_clock = f1_clock + (87 * 14); unsigned f2_clock = f1_clock + (87 * 14);
unsigned f2_sample = f2_clock / 25; unsigned f2_sample = f2_clock / 25;
assert(f2_sample < mlen + Modes.trailing_samples); assert(f2_sample < mlen + mag->overlap);
if (!(m[f2_sample-1] < m[f2_sample+0])) if (!(m[f2_sample-1] < m[f2_sample+0]))
continue; continue;

View File

@ -132,9 +132,6 @@ static void modesInitConfig(void) {
static void modesInit(void) { static void modesInit(void) {
int i; int i;
pthread_mutex_init(&Modes.data_mutex,NULL);
pthread_cond_init(&Modes.data_cond,NULL);
Modes.sample_rate = 2400000.0; Modes.sample_rate = 2400000.0;
// Allocate the various buffers used by Modes // Allocate the various buffers used by Modes
@ -146,15 +143,9 @@ static void modesInit(void) {
exit(1); exit(1);
} }
for (i = 0; i < MODES_MAG_BUFFERS; ++i) { if (!fifo_create(MODES_MAG_BUFFERS, MODES_MAG_BUF_SAMPLES + Modes.trailing_samples, Modes.trailing_samples)) {
if ( (Modes.mag_buffers[i].data = calloc(MODES_MAG_BUF_SAMPLES+Modes.trailing_samples, sizeof(uint16_t))) == NULL ) { fprintf(stderr, "Out of memory allocating FIFO\n");
fprintf(stderr, "Out of memory allocating magnitude buffer.\n"); exit(1);
exit(1);
}
Modes.mag_buffers[i].length = 0;
Modes.mag_buffers[i].dropped = 0;
Modes.mag_buffers[i].sampleTimestamp = 0;
} }
// Validate the users Lat/Lon home location inputs // Validate the users Lat/Lon home location inputs
@ -224,13 +215,10 @@ static void *readerThreadEntryPoint(void *arg)
sdrRun(); sdrRun();
// Wake the main thread (if it's still waiting)
pthread_mutex_lock(&Modes.data_mutex);
if (!Modes.exit) if (!Modes.exit)
Modes.exit = 2; // unexpected exit Modes.exit = 2; // unexpected exit
pthread_cond_signal(&Modes.data_cond);
pthread_mutex_unlock(&Modes.data_mutex);
fifo_halt(); // wakes the main thread, if it's still waiting
return NULL; return NULL;
} }
// //
@ -684,59 +672,34 @@ int main(int argc, char **argv) {
int watchdogCounter = 10; // about 1 second int watchdogCounter = 10; // about 1 second
// Create the thread that will read the data from the device. // Create the thread that will read the data from the device.
pthread_mutex_lock(&Modes.data_mutex);
pthread_create(&Modes.reader_thread, NULL, readerThreadEntryPoint, NULL); pthread_create(&Modes.reader_thread, NULL, readerThreadEntryPoint, NULL);
while (!Modes.exit) { while (!Modes.exit) {
// get the next sample buffer off the FIFO; wait only up to 100ms
// this is fairly aggressive as all our network I/O runs out of the background work!
struct mag_buf *buf = fifo_dequeue(100 /* milliseconds */);
struct timespec start_time; struct timespec start_time;
if (Modes.first_free_buffer == Modes.first_filled_buffer) { if (buf) {
/* wait for more data. // Process one buffer
* we should be getting data every 50-60ms. wait for max 100ms before we give up and do some background work.
* this is fairly aggressive as all our network I/O runs out of the background work!
*/
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_nsec += 100000000;
normalize_timespec(&ts);
pthread_cond_timedwait(&Modes.data_cond, &Modes.data_mutex, &ts); // This unlocks Modes.data_mutex, and waits for Modes.data_cond
}
// Modes.data_mutex is locked, and possibly we have data.
if (Modes.first_free_buffer != Modes.first_filled_buffer) {
// FIFO is not empty, process one buffer.
struct mag_buf *buf;
start_cpu_timing(&start_time); start_cpu_timing(&start_time);
buf = &Modes.mag_buffers[Modes.first_filled_buffer];
// Process data after releasing the lock, so that the capturing
// thread can read data while we perform computationally expensive
// stuff at the same time.
pthread_mutex_unlock(&Modes.data_mutex);
demodulate2400(buf); demodulate2400(buf);
if (Modes.mode_ac) { if (Modes.mode_ac) {
demodulate2400AC(buf); demodulate2400AC(buf);
} }
Modes.stats_current.samples_processed += buf->length; Modes.stats_current.samples_processed += buf->validLength;
Modes.stats_current.samples_dropped += buf->dropped; Modes.stats_current.samples_dropped += buf->dropped;
end_cpu_timing(&start_time, &Modes.stats_current.demod_cpu); end_cpu_timing(&start_time, &Modes.stats_current.demod_cpu);
// Mark the buffer we just processed as completed. // Return the buffer to the FIFO freelist for reuse
pthread_mutex_lock(&Modes.data_mutex); fifo_release(buf);
Modes.first_filled_buffer = (Modes.first_filled_buffer + 1) % MODES_MAG_BUFFERS;
pthread_cond_signal(&Modes.data_cond); // We got something so reset the watchdog
pthread_mutex_unlock(&Modes.data_mutex);
watchdogCounter = 10; watchdogCounter = 10;
} else { } else {
// Nothing to process this time around. // Nothing to process this time around.
pthread_mutex_unlock(&Modes.data_mutex);
if (--watchdogCounter <= 0) { if (--watchdogCounter <= 0) {
log_with_timestamp("No data received from the SDR for a long time, it may have wedged"); log_with_timestamp("No data received from the SDR for a long time, it may have wedged");
watchdogCounter = 600; watchdogCounter = 600;
@ -746,16 +709,11 @@ int main(int argc, char **argv) {
start_cpu_timing(&start_time); start_cpu_timing(&start_time);
backgroundTasks(); backgroundTasks();
end_cpu_timing(&start_time, &Modes.stats_current.background_cpu); end_cpu_timing(&start_time, &Modes.stats_current.background_cpu);
pthread_mutex_lock(&Modes.data_mutex);
} }
pthread_mutex_unlock(&Modes.data_mutex);
log_with_timestamp("Waiting for receive thread termination"); log_with_timestamp("Waiting for receive thread termination");
fifo_halt(); // Reader thread should do this anyway, but just in case..
pthread_join(Modes.reader_thread,NULL); // Wait on reader thread exit pthread_join(Modes.reader_thread,NULL); // Wait on reader thread exit
pthread_cond_destroy(&Modes.data_cond); // Thread cleanup - only after the reader thread is dead!
pthread_mutex_destroy(&Modes.data_mutex);
} }
interactiveCleanup(); interactiveCleanup();

View File

@ -273,6 +273,7 @@ typedef enum {
#include "icao_filter.h" #include "icao_filter.h"
#include "convert.h" #include "convert.h"
#include "sdr.h" #include "sdr.h"
#include "fifo.h"
//======================== structure declarations ========================= //======================== structure declarations =========================
@ -280,27 +281,10 @@ typedef enum {
SDR_NONE, SDR_IFILE, SDR_RTLSDR, SDR_BLADERF, SDR_HACKRF, SDR_LIMESDR SDR_NONE, SDR_IFILE, SDR_RTLSDR, SDR_BLADERF, SDR_HACKRF, SDR_LIMESDR
} sdr_type_t; } sdr_type_t;
// Structure representing one magnitude buffer
struct mag_buf {
uint16_t *data; // Magnitude data. Starts with Modes.trailing_samples worth of overlap from the previous block
unsigned length; // Number of valid samples _after_ overlap. Total buffer length is buf->length + Modes.trailing_samples.
uint64_t sampleTimestamp; // Clock timestamp of the start of this block, 12MHz clock
uint64_t sysTimestamp; // Estimated system time at start of block
uint32_t dropped; // Number of dropped samples preceding this buffer
double mean_level; // Mean of normalized (0..1) signal level
double mean_power; // Mean of normalized (0..1) power level
};
// Program global state // Program global state
struct _Modes { // Internal state struct _Modes { // Internal state
pthread_t reader_thread; pthread_t reader_thread;
pthread_mutex_t data_mutex; // Mutex to synchronize buffer access
pthread_cond_t data_cond; // Conditional variable associated
struct mag_buf mag_buffers[MODES_MAG_BUFFERS]; // Converted magnitude buffers from RTL or file input
unsigned first_free_buffer; // Entry in mag_buffers that will next be filled with input.
unsigned first_filled_buffer; // Entry in mag_buffers that has valid data and will be demodulated next. If equal to next_free_buffer, there is no unprocessed data.
pthread_mutex_t reader_cpu_mutex; // mutex protecting reader_cpu_accumulator pthread_mutex_t reader_cpu_mutex; // mutex protecting reader_cpu_accumulator
struct timespec reader_cpu_accumulator; // accumulated CPU time used by the reader thread struct timespec reader_cpu_accumulator; // accumulated CPU time used by the reader thread
struct timespec reader_cpu_start; // start time for the last reader thread CPU measurement struct timespec reader_cpu_start; // start time for the last reader thread CPU measurement

253
fifo.c Normal file
View File

@ -0,0 +1,253 @@
// Part of dump1090, a Mode S message decoder for RTLSDR devices.
//
// fifo.c: Cross-thread SDR to demodulator FIFO support
//
// Copyright (c) 2020 FlightAware LLC
//
// This file is free software: you may copy, redistribute and/or modify it
// under the terms of the GNU General Public License as published by the
// Free Software Foundation, either version 2 of the License, or (at your
// option) any later version.
//
// This file is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "fifo.h"
#include "util.h"
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <assert.h>
static pthread_mutex_t fifo_mutex = PTHREAD_MUTEX_INITIALIZER; // mutex protecting the queues
static pthread_cond_t fifo_notempty_cond = PTHREAD_COND_INITIALIZER; // condition used to signal FIFO-not-empty
static pthread_cond_t fifo_empty_cond = PTHREAD_COND_INITIALIZER; // condition used to signal FIFO-empty
static pthread_cond_t fifo_free_cond = PTHREAD_COND_INITIALIZER; // condition used to signal freelist-not-empty
static struct mag_buf *fifo_head; // head of queued buffers awaiting demodulation
static struct mag_buf *fifo_tail; // tail of queued buffers awaiting demodulation
static struct mag_buf *fifo_freelist; // freelist of preallocated buffers
static bool fifo_halted; // true if queue has been halted
static unsigned overlap_length; // desired overlap size in samples (size of overlap_buffer)
static uint16_t *overlap_buffer; // buffer used to save overlapping data
// Create the queue structures. Not threadsafe.
bool fifo_create(unsigned buffer_count, unsigned buffer_size, unsigned overlap)
{
if (!(overlap_buffer = calloc(overlap, sizeof(overlap_buffer[0]))))
goto nomem;
overlap_length = overlap;
for (unsigned i = 0; i < buffer_count; ++i) {
struct mag_buf *newbuf;
if (!(newbuf = calloc(1, sizeof(*newbuf)))) {
goto nomem;
}
if (!(newbuf->data = calloc(buffer_size, sizeof(newbuf->data[0])))) {
free(newbuf);
goto nomem;
}
newbuf->totalLength = buffer_size;
newbuf->next = fifo_freelist;
fifo_freelist = newbuf;
}
return true;
nomem:
fifo_destroy();
return false;
}
static void free_buffer_list(struct mag_buf *head)
{
while (head) {
struct mag_buf *next = head->next;
free(head->data);
free(head);
head = next;
}
}
void fifo_destroy()
{
free_buffer_list(fifo_head);
fifo_freelist = NULL;
free_buffer_list(fifo_freelist);
fifo_head = fifo_tail = NULL;
free(overlap_buffer);
overlap_buffer = NULL;
}
void fifo_drain()
{
pthread_mutex_lock(&fifo_mutex);
while (fifo_head && !fifo_halted) {
pthread_cond_wait(&fifo_empty_cond, &fifo_mutex);
}
pthread_mutex_unlock(&fifo_mutex);
}
void fifo_halt()
{
pthread_mutex_lock(&fifo_mutex);
// Drain all enqueued buffers to the freelist
while (fifo_head) {
struct mag_buf *freebuf = fifo_head;
fifo_head = freebuf->next;
freebuf->next = fifo_freelist;
fifo_freelist = freebuf;
}
fifo_tail = NULL;
fifo_halted = true;
// wake all waiters
pthread_cond_broadcast(&fifo_notempty_cond);
pthread_cond_broadcast(&fifo_empty_cond);
pthread_cond_broadcast(&fifo_free_cond);
pthread_mutex_unlock(&fifo_mutex);
}
struct mag_buf *fifo_acquire(uint32_t timeout_ms)
{
struct timespec deadline;
if (timeout_ms)
get_deadline(timeout_ms, &deadline);
pthread_mutex_lock(&fifo_mutex);
struct mag_buf *result = NULL;
while (!fifo_halted && !fifo_freelist) {
if (!timeout_ms) {
// Non-blocking
goto done;
}
// No free buffers, wait for one
if (pthread_cond_timedwait(&fifo_free_cond, &fifo_mutex, &deadline) < 0) {
if (errno != ETIMEDOUT)
return NULL; // unexpected error, mutex no longer held!
goto done; // timed out
}
}
if (!fifo_halted) {
result = fifo_freelist;
fifo_freelist = result->next;
result->overlap = overlap_length;
result->validLength = result->overlap;
result->sampleTimestamp = 0;
result->sysTimestamp = 0;
result->flags = 0;
result->next = NULL;
}
done:
pthread_mutex_unlock(&fifo_mutex);
return result;
}
void fifo_enqueue(struct mag_buf *buf)
{
assert(buf->validLength <= buf->totalLength);
assert(buf->validLength >= overlap_length);
pthread_mutex_lock(&fifo_mutex);
if (fifo_halted) {
// Shutting down, just return the buffer to the freelist.
buf->next = fifo_freelist;
fifo_freelist = buf;
goto done;
}
// Populate the overlap region
if (buf->flags & MAGBUF_DISCONTINUOUS) {
// This buffer is discontinuous to the previous, so the overlap region is not valid; zero it out
memset(buf->data, 0, overlap_length * sizeof(buf->data[0]));
} else {
memcpy(buf->data, overlap_buffer, overlap_length * sizeof(buf->data[0]));
}
// Save the tail of the buffer for next time
memcpy(overlap_buffer, &buf->data[buf->validLength - overlap_length], overlap_length * sizeof(overlap_buffer[0]));
// enqueue and tell the main thread
buf->next = NULL;
if (!fifo_head) {
fifo_head = fifo_tail = buf;
pthread_cond_signal(&fifo_notempty_cond);
} else {
fifo_tail->next = buf;
}
done:
pthread_mutex_unlock(&fifo_mutex);
}
struct mag_buf *fifo_dequeue(uint32_t timeout_ms)
{
struct timespec deadline;
if (timeout_ms)
get_deadline(timeout_ms, &deadline);
pthread_mutex_lock(&fifo_mutex);
struct mag_buf *result = NULL;
while (!fifo_head && !fifo_halted) {
if (!timeout_ms) {
// Non-blocking
goto done;
}
// No data pending, wait for some
if (pthread_cond_timedwait(&fifo_notempty_cond, &fifo_mutex, &deadline) < 0) {
if (errno != ETIMEDOUT)
return NULL; // unexpected error, mutex no longer held!
goto done; // timed out
}
}
if (!fifo_halted) {
result = fifo_head;
fifo_head = result->next;
result->next = NULL;
if (!fifo_head) {
fifo_tail = NULL;
pthread_cond_broadcast(&fifo_empty_cond);
}
}
done:
pthread_mutex_unlock(&fifo_mutex);
return result;
}
void fifo_release(struct mag_buf *buf)
{
pthread_mutex_lock(&fifo_mutex);
if (!fifo_freelist)
pthread_cond_signal(&fifo_free_cond);
buf->next = fifo_freelist;
fifo_freelist = buf;
pthread_mutex_unlock(&fifo_mutex);
}

119
fifo.h Normal file
View File

@ -0,0 +1,119 @@
// Part of dump1090, a Mode S message decoder for RTLSDR devices.
//
// fifo.h: Cross-thread SDR to demodulator FIFO support
//
// Copyright (c) 2020 FlightAware LLC
//
// This file is free software: you may copy, redistribute and/or modify it
// under the terms of the GNU General Public License as published by the
// Free Software Foundation, either version 2 of the License, or (at your
// option) any later version.
//
// This file is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
#ifndef FIFO_H
#define FIFO_H
#include <stdbool.h>
#include <stdint.h>
// Values for mag_buf.flags
typedef enum {
MAGBUF_DISCONTINUOUS = 1, // this buffer is discontinuous to the previous buffer
} mag_buf_flags;
// Structure representing one magnitude buffer
// The contained data looks like this:
//
// 0 overlap validLength-overlap validLength totalLength
// | | | | |
// | overlap data from | new sample data | new sample data that | optional |
// | previous buffer | | will be used as overlap | unused |
// | | | in the next buffer | space |
// | | | | |
// | | [this is the position of the]| |
// | | [last message that the ]| |
// | | [demodulator will decode ]| |
// | | | | |
// | | | [partial messages that start ] |
// | [copied here in ]| <---------------------- [after the cutoff will be copied] |
// | [the next buffer]| | [to the starting overlap of the ] |
// | | | [next buffer ] |
//
// The demodulator looks for signals starting at offsets 0 .. validLength-overlap-1,
// with the trailing overlap region allowing decoding of a maximally-sized message that starts
// at validLength-overlap-1. Signals that start after this point are not decoded, but they will
// be copied into the starting overlap of the next buffer and decoded on the next iteration.
struct mag_buf {
uint16_t *data; // Magnitude data, starting with overlap from the previous block
unsigned totalLength; // Maximum number of samples (allocated size of "data")
unsigned validLength; // Number of valid samples in "data", including overlap samples
unsigned overlap; // Number of leading overlap samples at the start of "data";
// also the number of trailing samples that will be preserved for next time
uint64_t sampleTimestamp; // Clock timestamp of the start of this block, 12MHz clock
uint64_t sysTimestamp; // Estimated system time at start of block
mag_buf_flags flags; // bitwise flags for this buffer
double mean_level; // Mean of normalized (0..1) signal level
double mean_power; // Mean of normalized (0..1) power level
unsigned dropped; // (approx) number of dropped samples, if flag MAGBUF_DISCONTINUOUS is set
struct mag_buf *next; // linked list forward link
};
// Create the queue structures. Not threadsafe. Returns true on success.
//
// buffer_count - the number of buffers to preallocate
// buffer_size - the size of each magnitude buffer, in samples, including overlap
// overlap - the number of samples to overlap between adjacent buffers
bool fifo_create(unsigned buffer_count, unsigned buffer_size, unsigned overlap);
// Destroy the fifo structures allocated in magbuf_fifo_create. Not threadsafe; ensure all FIFO users
// are done before calling.
void fifo_destroy();
// Block until the FIFO is empty.
void fifo_drain();
// Mark the FIFO as halted. Move any buffers in FIFO to the freelist immediately.
// Future calls to magbuf_acquire() will immediately return NULL.
// Future calls to magbuf_produce() will immediately put the produced buffer on the freelist.
// Future alls to magbuf_consume() will immediately return NULL; if there are
// existing calls waiting on data, they will be immediately awoken and return NULL.
void fifo_halt();
// Get an unused buffer from the freelist and return it.
// Block up to timeout_ms waiting for a free buffer. Return NULL if there are no
// free buffers available within the timeout, or if the FIFO is halted.
struct mag_buf *fifo_acquire(uint32_t timeout_ms);
// Put a filled buffer (previously obtained from fifo_acquire) onto the head of the FIFO.
// The caller should have filled:
// buf->validLength
// buf->data[buf->overlap .. buf->validLength-1]
// buf->sampleTimestamp
// buf->sysTimestamp
// buf->flags
// buf->mean_level (if flags & HAS_METRICS)
// buf->mean_power (if flags & HAS_METRICS)
// buf->dropped (if flags & DISCONTINUOUS)
void fifo_enqueue(struct mag_buf *buf);
// Get a buffer from the tail of the FIFO.
// If the FIFO is halted (or becomes halted), return NULL immediately.
// If the FIFO is empty, wait for up to "timeout_ms" milliseconds
// for more data; return NULL if no data arrives within the timeout.
struct mag_buf *fifo_dequeue(uint32_t timeout_ms);
// Release a buffer previously returned by fifo_acquire() or fifo_pop() back to the freelist.
void fifo_release(struct mag_buf *buf);
#endif

View File

@ -310,7 +310,6 @@ static void *handle_bladerf_samples(struct bladerf *dev,
void *user_data) void *user_data)
{ {
static uint64_t nextTimestamp = 0; static uint64_t nextTimestamp = 0;
static bool dropping = false;
MODES_NOTUSED(dev); MODES_NOTUSED(dev);
MODES_NOTUSED(stream); MODES_NOTUSED(stream);
@ -323,37 +322,20 @@ static void *handle_bladerf_samples(struct bladerf *dev,
sdrMonitor(); sdrMonitor();
pthread_mutex_lock(&Modes.data_mutex);
if (Modes.exit) { if (Modes.exit) {
pthread_mutex_unlock(&Modes.data_mutex);
return BLADERF_STREAM_SHUTDOWN; return BLADERF_STREAM_SHUTDOWN;
} }
unsigned next_free_buffer = (Modes.first_free_buffer + 1) % MODES_MAG_BUFFERS; struct mag_buf *outbuf = fifo_acquire(/* don't wait */ 0);
struct mag_buf *outbuf = &Modes.mag_buffers[Modes.first_free_buffer];
struct mag_buf *lastbuf = &Modes.mag_buffers[(Modes.first_free_buffer + MODES_MAG_BUFFERS - 1) % MODES_MAG_BUFFERS];
unsigned free_bufs = (Modes.first_filled_buffer - next_free_buffer + MODES_MAG_BUFFERS) % MODES_MAG_BUFFERS;
if (free_bufs == 0 || (dropping && free_bufs < MODES_MAG_BUFFERS/2)) { if (!outbuf) {
// FIFO is full. Drop this block. // FIFO is full. Drop this block.
dropping = true;
pthread_mutex_unlock(&Modes.data_mutex);
return samples; return samples;
} }
dropping = false;
pthread_mutex_unlock(&Modes.data_mutex);
// Copy trailing data from last block (or reset if not valid)
if (outbuf->dropped == 0) {
memcpy(outbuf->data, lastbuf->data + lastbuf->length, Modes.trailing_samples * sizeof(uint16_t));
} else {
memset(outbuf->data, 0, Modes.trailing_samples * sizeof(uint16_t));
}
// start handling metadata blocks // start handling metadata blocks
outbuf->dropped = 0; outbuf->dropped = 0;
outbuf->length = 0; outbuf->validLength = outbuf->overlap;
outbuf->mean_level = outbuf->mean_power = 0; outbuf->mean_level = outbuf->mean_power = 0;
unsigned blocks_processed = 0; unsigned blocks_processed = 0;
@ -392,8 +374,9 @@ static void *handle_bladerf_samples(struct bladerf *dev,
// dropped data or lost sync. start again. // dropped data or lost sync. start again.
if (metadata_timestamp > nextTimestamp) if (metadata_timestamp > nextTimestamp)
outbuf->dropped += (metadata_timestamp - nextTimestamp); outbuf->dropped += (metadata_timestamp - nextTimestamp);
outbuf->dropped += outbuf->length; outbuf->dropped += outbuf->validLength - outbuf->overlap;
outbuf->length = 0; outbuf->validLength = outbuf->overlap;
outbuf->flags |= MAGBUF_DISCONTINUOUS;
blocks_processed = 0; blocks_processed = 0;
outbuf->mean_level = outbuf->mean_power = 0; outbuf->mean_level = outbuf->mean_power = 0;
nextTimestamp = metadata_timestamp; nextTimestamp = metadata_timestamp;
@ -409,8 +392,8 @@ static void *handle_bladerf_samples(struct bladerf *dev,
// Convert a block of data // Convert a block of data
double mean_level, mean_power; double mean_level, mean_power;
BladeRF.converter(sample_data, &outbuf->data[Modes.trailing_samples + outbuf->length], samples_per_block, BladeRF.converter_state, &mean_level, &mean_power); BladeRF.converter(sample_data, &outbuf->data[outbuf->validLength], samples_per_block, BladeRF.converter_state, &mean_level, &mean_power);
outbuf->length += samples_per_block; outbuf->validLength += samples_per_block;
outbuf->mean_level += mean_level; outbuf->mean_level += mean_level;
outbuf->mean_power += mean_power; outbuf->mean_power += mean_power;
nextTimestamp += samples_per_block * BladeRF.decimation; nextTimestamp += samples_per_block * BladeRF.decimation;
@ -422,21 +405,14 @@ static void *handle_bladerf_samples(struct bladerf *dev,
if (blocks_processed) { if (blocks_processed) {
// Get the approx system time for the start of this block // Get the approx system time for the start of this block
unsigned block_duration = 1e3 * outbuf->length / Modes.sample_rate; unsigned block_duration = 1e3 * (outbuf->validLength - outbuf->overlap) / Modes.sample_rate;
outbuf->sysTimestamp = entryTimestamp - block_duration; outbuf->sysTimestamp = entryTimestamp - block_duration;
outbuf->mean_level /= blocks_processed; outbuf->mean_level /= blocks_processed;
outbuf->mean_power /= blocks_processed; outbuf->mean_power /= blocks_processed;
// Push the new data to the demodulation thread // Push the new data to the demodulation thread
pthread_mutex_lock(&Modes.data_mutex); fifo_enqueue(outbuf);
Modes.mag_buffers[next_free_buffer].dropped = 0;
Modes.mag_buffers[next_free_buffer].length = 0; // just in case
Modes.first_free_buffer = next_free_buffer;
pthread_cond_signal(&Modes.data_cond);
pthread_mutex_unlock(&Modes.data_mutex);
} }
return samples; return samples;

View File

@ -197,82 +197,63 @@ bool hackRFOpen()
static int handle_hackrf_samples(hackrf_transfer *transfer) static int handle_hackrf_samples(hackrf_transfer *transfer)
{ {
struct mag_buf *outbuf; static unsigned dropped = 0;
struct mag_buf *lastbuf;
unsigned char *buf;
int32_t len;
uint32_t slen;
unsigned next_free_buffer;
unsigned free_bufs;
unsigned block_duration;
static int dropping = 0;
static uint64_t sampleCounter = 0; static uint64_t sampleCounter = 0;
sdrMonitor(); sdrMonitor();
// Lock the data buffer variables before accessing them if (Modes.exit || transfer->valid_length < 0)
pthread_mutex_lock(&Modes.data_mutex);
if (Modes.exit) {
pthread_mutex_unlock(&Modes.data_mutex);
return -1; return -1;
}
next_free_buffer = (Modes.first_free_buffer + 1) % MODES_MAG_BUFFERS; uint8_t *buf = transfer->buffer;
outbuf = &Modes.mag_buffers[Modes.first_free_buffer]; unsigned len = transfer->valid_length;
lastbuf = &Modes.mag_buffers[(Modes.first_free_buffer + MODES_MAG_BUFFERS - 1) % MODES_MAG_BUFFERS];
free_bufs = (Modes.first_filled_buffer - next_free_buffer + MODES_MAG_BUFFERS) % MODES_MAG_BUFFERS;
buf = transfer->buffer;
len = transfer->buffer_length;
// Values returned by HackRF need conversion from signed to unsigned // Values returned by HackRF need conversion from signed to unsigned
for (int32_t i = 0; i < len; i++) { for (unsigned i = 0; i < len; i++) {
buf[i] ^= (uint8_t)0x80; buf[i] ^= (uint8_t)0x80;
} }
slen = len/2; // Drops any trailing odd sample, that's OK unsigned samples_read = len / 2; // Drops any trailing odd sample, that's OK
if (free_bufs == 0 || (dropping && free_bufs < MODES_MAG_BUFFERS/2)) { struct mag_buf *outbuf = fifo_acquire(0 /* don't wait */);
if (!outbuf) {
// FIFO is full. Drop this block. // FIFO is full. Drop this block.
dropping = 1; dropped += samples_read;
outbuf->dropped += slen; sampleCounter += samples_read;
sampleCounter += slen; return 0;
pthread_mutex_unlock(&Modes.data_mutex);
return -1;
} }
dropping = 0; outbuf->flags = 0;
pthread_mutex_unlock(&Modes.data_mutex);
if (dropped) {
// We previously dropped some samples due to no buffers being available
outbuf->flags |= MAGBUF_DISCONTINUOUS;
outbuf->dropped = dropped;
}
dropped = 0;
// Compute the sample timestamp and system timestamp for the start of the block // Compute the sample timestamp and system timestamp for the start of the block
outbuf->sampleTimestamp = sampleCounter * 12e6 / Modes.sample_rate; outbuf->sampleTimestamp = sampleCounter * 12e6 / Modes.sample_rate;
sampleCounter += slen; sampleCounter += samples_read;
// Get the approx system time for the start of this block // Get the approx system time for the start of this block
block_duration = 1e3 * slen / Modes.sample_rate; uint64_t block_duration = 1e3 * samples_read / Modes.sample_rate;
outbuf->sysTimestamp = mstime() - block_duration; outbuf->sysTimestamp = mstime() - block_duration;
// Copy trailing data from last block (or reset if not valid) // Convert the new data
if (outbuf->dropped == 0) { unsigned to_convert = samples_read;
memcpy(outbuf->data, lastbuf->data + lastbuf->length, Modes.trailing_samples * sizeof(uint16_t)); if (to_convert + outbuf->overlap > outbuf->totalLength) {
} else { // how did that happen?
memset(outbuf->data, 0, Modes.trailing_samples * sizeof(uint16_t)); to_convert = outbuf->totalLength - outbuf->overlap;
dropped = samples_read - to_convert;
} }
// Convert the new data HackRF.converter(buf, &outbuf->data[outbuf->overlap], to_convert, HackRF.converter_state, &outbuf->mean_level, &outbuf->mean_power);
outbuf->length = slen; outbuf->validLength = outbuf->overlap + to_convert;
HackRF.converter(buf, &outbuf->data[Modes.trailing_samples], slen, HackRF.converter_state, &outbuf->mean_level, &outbuf->mean_power);
// Push the new data to the demodulation thread // Push to the demodulation thread
pthread_mutex_lock(&Modes.data_mutex); fifo_enqueue(outbuf);
Modes.mag_buffers[next_free_buffer].dropped = 0;
Modes.mag_buffers[next_free_buffer].length = 0; // just in case
Modes.first_free_buffer = next_free_buffer;
pthread_cond_signal(&Modes.data_cond);
pthread_mutex_unlock(&Modes.data_mutex);
return 0; return 0;
} }

View File

@ -58,7 +58,8 @@ static struct {
int fd; int fd;
unsigned bytes_per_sample; unsigned bytes_per_sample;
void *readbuf; unsigned bufsize;
char *readbuf;
iq_convert_fn converter; iq_convert_fn converter;
struct converter_state *converter_state; struct converter_state *converter_state;
} ifile; } ifile;
@ -70,6 +71,7 @@ void ifileInitConfig(void)
ifile.throttle = false; ifile.throttle = false;
ifile.fd = -1; ifile.fd = -1;
ifile.bytes_per_sample = 0; ifile.bytes_per_sample = 0;
ifile.bufsize = 0;
ifile.readbuf = NULL; ifile.readbuf = NULL;
ifile.converter = NULL; ifile.converter = NULL;
ifile.converter_state = NULL; ifile.converter_state = NULL;
@ -152,7 +154,9 @@ bool ifileOpen(void)
return false; return false;
} }
if (!(ifile.readbuf = malloc(MODES_MAG_BUF_SAMPLES * ifile.bytes_per_sample))) { ifile.bufsize = ifile.bytes_per_sample * MODES_MAG_BUF_SAMPLES; /* ~1M samples, about half a second's worth */
if (!(ifile.readbuf = malloc(ifile.bufsize))) {
fprintf(stderr, "ifile: failed to allocate read buffer\n"); fprintf(stderr, "ifile: failed to allocate read buffer\n");
ifileClose(); ifileClose();
return false; return false;
@ -176,90 +180,68 @@ void ifileRun()
if (ifile.fd < 0) if (ifile.fd < 0)
return; return;
int eof = 0;
struct timespec next_buffer_delivery; struct timespec next_buffer_delivery;
uint64_t sampleCounter = 0;
clock_gettime(CLOCK_MONOTONIC, &next_buffer_delivery); clock_gettime(CLOCK_MONOTONIC, &next_buffer_delivery);
pthread_mutex_lock(&Modes.data_mutex); bool eof = false;
while (!Modes.exit && !eof) { uint64_t sampleCounter = 0;
ssize_t nread, toread;
void *r;
struct mag_buf *outbuf, *lastbuf;
unsigned next_free_buffer;
unsigned slen;
next_free_buffer = (Modes.first_free_buffer + 1) % MODES_MAG_BUFFERS; while (!Modes.exit && !eof) {
if (next_free_buffer == Modes.first_filled_buffer) { sdrMonitor();
// no space for output yet
pthread_cond_wait(&Modes.data_cond, &Modes.data_mutex); /* wait for up to 1000ms for a buffer */
struct mag_buf *outbuf = fifo_acquire(100 /* milliseconds */);
if (!outbuf) {
// maybe we're slow, maybe we halted
continue; continue;
} }
outbuf = &Modes.mag_buffers[Modes.first_free_buffer]; // Compute the sample timestamp and system time for the start of the block
lastbuf = &Modes.mag_buffers[(Modes.first_free_buffer + MODES_MAG_BUFFERS - 1) % MODES_MAG_BUFFERS];
pthread_mutex_unlock(&Modes.data_mutex);
sdrMonitor();
// Compute the sample timestamp for the start of the block
outbuf->sampleTimestamp = sampleCounter * 12e6 / Modes.sample_rate; outbuf->sampleTimestamp = sampleCounter * 12e6 / Modes.sample_rate;
sampleCounter += MODES_MAG_BUF_SAMPLES;
// Copy trailing data from last block (or reset if not valid)
if (lastbuf->length >= Modes.trailing_samples) {
memcpy(outbuf->data, lastbuf->data + lastbuf->length, Modes.trailing_samples * sizeof(uint16_t));
} else {
memset(outbuf->data, 0, Modes.trailing_samples * sizeof(uint16_t));
}
// Get the system time for the start of this block
outbuf->sysTimestamp = mstime(); outbuf->sysTimestamp = mstime();
toread = MODES_MAG_BUF_SAMPLES * ifile.bytes_per_sample; unsigned bytes_wanted = (outbuf->totalLength - outbuf->overlap) * ifile.bytes_per_sample;
r = ifile.readbuf; if (bytes_wanted > ifile.bufsize)
while (toread) { bytes_wanted = ifile.bufsize;
nread = read(ifile.fd, r, toread);
unsigned bytes_read = 0;
while (bytes_read < bytes_wanted) {
ssize_t nread = read(ifile.fd, ifile.readbuf + bytes_read, bytes_wanted - bytes_read);
if (nread <= 0) { if (nread <= 0) {
if (nread < 0) { if (nread < 0) {
fprintf(stderr, "ifile: error reading input file: %s\n", strerror(errno)); fprintf(stderr, "ifile: error reading input file: %s\n", strerror(errno));
} }
// Done. // Done.
eof = 1; eof = true;
break; break;
} }
r += nread; bytes_read += nread;
toread -= nread;
} }
slen = outbuf->length = MODES_MAG_BUF_SAMPLES - toread / ifile.bytes_per_sample; unsigned samples_read = bytes_read / ifile.bytes_per_sample;
// Convert the new data // Convert the new data
ifile.converter(ifile.readbuf, &outbuf->data[Modes.trailing_samples], slen, ifile.converter_state, &outbuf->mean_level, &outbuf->mean_power); ifile.converter(ifile.readbuf, &outbuf->data[outbuf->overlap], samples_read, ifile.converter_state, &outbuf->mean_level, &outbuf->mean_power);
outbuf->validLength = outbuf->overlap + samples_read;
outbuf->flags = 0;
if (ifile.throttle || Modes.interactive) { if (ifile.throttle || Modes.interactive) {
// Wait until we are allowed to release this buffer to the main thread // Wait until we are allowed to release this buffer to the FIFO
while (clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next_buffer_delivery, NULL) == EINTR) while (clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next_buffer_delivery, NULL) == EINTR)
; ;
// compute the time we can deliver the next buffer. // compute the time we can deliver the next buffer.
next_buffer_delivery.tv_nsec += outbuf->length * 1e9 / Modes.sample_rate; next_buffer_delivery.tv_nsec += samples_read * 1e9 / Modes.sample_rate;
normalize_timespec(&next_buffer_delivery); normalize_timespec(&next_buffer_delivery);
} }
// Push the new data to the main thread // Push the new data to the FIFO
pthread_mutex_lock(&Modes.data_mutex); fifo_enqueue(outbuf);
Modes.first_free_buffer = next_free_buffer; sampleCounter += samples_read;
pthread_cond_signal(&Modes.data_cond);
} }
// Wait for the main thread to consume all data // Wait for the FIFO to drain so we don't throw away trailing data
while (!Modes.exit && Modes.first_filled_buffer != Modes.first_free_buffer) fifo_drain();
pthread_cond_wait(&Modes.data_cond, &Modes.data_mutex);
pthread_mutex_unlock(&Modes.data_mutex);
} }
void ifileClose() void ifileClose()

View File

@ -317,87 +317,54 @@ bool limesdrOpen(void)
static void limesdrCallback(unsigned char *buf, uint32_t len, void *ctx) static void limesdrCallback(unsigned char *buf, uint32_t len, void *ctx)
{ {
struct mag_buf *outbuf; static int dropped = 0;
struct mag_buf *lastbuf;
uint32_t slen;
unsigned next_free_buffer;
unsigned free_bufs;
unsigned block_duration;
static int dropping = 0;
static uint64_t sampleCounter = 0; static uint64_t sampleCounter = 0;
MODES_NOTUSED(ctx); MODES_NOTUSED(ctx);
sdrMonitor(); sdrMonitor();
// Lock the data buffer variables before accessing them unsigned samples_read = len / LimeSDR.bytes_in_sample; // Drops any trailing odd sample, not much else we can do there
pthread_mutex_lock(&Modes.data_mutex);
if (Modes.exit) {
LimeSDR.is_stop = true; // ask our caller to exit
}
next_free_buffer = (Modes.first_free_buffer + 1) % MODES_MAG_BUFFERS; struct mag_buf *outbuf = fifo_acquire(0 /* don't wait */);
outbuf = &Modes.mag_buffers[Modes.first_free_buffer]; if (!outbuf) {
lastbuf = &Modes.mag_buffers[(Modes.first_free_buffer + MODES_MAG_BUFFERS - 1) % MODES_MAG_BUFFERS];
free_bufs = (Modes.first_filled_buffer - next_free_buffer + MODES_MAG_BUFFERS) % MODES_MAG_BUFFERS;
// Paranoia! Unlikely, but let's go for belt and suspenders here
if (len != MODES_RTL_BUF_SIZE) {
limesdrLogHandler(LMS_LOG_WARNING, "device gave us a block with an unusual size");
if (len > MODES_RTL_BUF_SIZE) {
// wat?! Discard the start.
unsigned discard = (len - MODES_RTL_BUF_SIZE + 1) / LimeSDR.bytes_in_sample;
outbuf->dropped += discard;
buf += discard * LimeSDR.bytes_in_sample;
len -= discard * LimeSDR.bytes_in_sample;
}
}
slen = len / LimeSDR.bytes_in_sample; // Drops any trailing odd sample, that's OK
if (free_bufs == 0 || (dropping && free_bufs < MODES_MAG_BUFFERS/2)) {
// FIFO is full. Drop this block. // FIFO is full. Drop this block.
dropping = 1; dropped += samples_read;
outbuf->dropped += slen; sampleCounter += samples_read;
sampleCounter += slen;
pthread_mutex_unlock(&Modes.data_mutex);
return; return;
} }
dropping = 0; outbuf->flags = 0;
pthread_mutex_unlock(&Modes.data_mutex);
if (dropped) {
// We previously dropped some samples due to no buffers being available
outbuf->flags |= MAGBUF_DISCONTINUOUS;
outbuf->dropped = dropped;
}
dropped = 0;
// Compute the sample timestamp and system timestamp for the start of the block // Compute the sample timestamp and system timestamp for the start of the block
outbuf->sampleTimestamp = sampleCounter * 12e6 / Modes.sample_rate; outbuf->sampleTimestamp = sampleCounter * 12e6 / Modes.sample_rate;
sampleCounter += slen; sampleCounter += samples_read;
// Get the approx system time for the start of this block // Get the approx system time for the start of this block
block_duration = 1e3 * slen / Modes.sample_rate; unsigned block_duration = 1e3 * samples_read / Modes.sample_rate;
outbuf->sysTimestamp = mstime() - block_duration; outbuf->sysTimestamp = mstime() - block_duration;
// Copy trailing data from last block (or reset if not valid) // Convert the new data
if (outbuf->dropped == 0) { unsigned to_convert = samples_read;
memcpy(outbuf->data, lastbuf->data + lastbuf->length, Modes.trailing_samples * sizeof(uint16_t)); if (to_convert + outbuf->overlap > outbuf->totalLength) {
} else { // how did that happen?
memset(outbuf->data, 0, Modes.trailing_samples * sizeof(uint16_t)); to_convert = outbuf->totalLength - outbuf->overlap;
dropped = samples_read - to_convert;
} }
// Convert the new data LimeSDR.converter(buf, &outbuf->data[outbuf->overlap], to_convert, LimeSDR.converter_state, &outbuf->mean_level, &outbuf->mean_power);
outbuf->length = slen; outbuf->validLength = outbuf->overlap + to_convert;
LimeSDR.converter(buf, &outbuf->data[Modes.trailing_samples], slen, LimeSDR.converter_state, &outbuf->mean_level, &outbuf->mean_power);
// Push the new data to the demodulation thread // Push to the demodulation thread
pthread_mutex_lock(&Modes.data_mutex); fifo_enqueue(outbuf);
Modes.mag_buffers[next_free_buffer].dropped = 0;
Modes.mag_buffers[next_free_buffer].length = 0; // just in case
Modes.first_free_buffer = next_free_buffer;
pthread_cond_signal(&Modes.data_cond);
pthread_mutex_unlock(&Modes.data_mutex);
} }
void limesdrRun() void limesdrRun()
@ -406,21 +373,26 @@ void limesdrRun()
return; return;
} }
int16_t *buffer = malloc(MODES_RTL_BUF_SIZE); int16_t *buffer = malloc(MODES_MAG_BUF_SAMPLES * LimeSDR.bytes_in_sample);
if (!buffer) {
limesdrLogHandler(LMS_LOG_ERROR, "out of memory allocating sample buffer");
return;
}
LMS_StartStream(&LimeSDR.stream); LMS_StartStream(&LimeSDR.stream);
while (!LimeSDR.is_stop) { while (!Modes.exit) {
int sampleCnt = LMS_RecvStream(&LimeSDR.stream, buffer, MODES_RTL_BUF_SIZE / LimeSDR.bytes_in_sample, NULL, 1000); int sampleCnt = LMS_RecvStream(&LimeSDR.stream, buffer, MODES_MAG_BUF_SAMPLES, NULL, 1000);
if (sampleCnt < 0) {
limesdrLogHandler(LMS_LOG_ERROR, "LMS_RecvStream failed");
break;
}
if (sampleCnt) { if (sampleCnt) {
limesdrCallback((unsigned char *)buffer, sampleCnt * LimeSDR.bytes_in_sample, NULL); limesdrCallback((unsigned char *)buffer, sampleCnt * LimeSDR.bytes_in_sample, NULL);
} }
} }
if (!Modes.exit) {
limesdrLogHandler(LMS_LOG_WARNING, "async read returned unexpectedly");
}
free(buffer); free(buffer);
LMS_StopStream(&LimeSDR.stream); LMS_StopStream(&LimeSDR.stream);
} }

View File

@ -265,89 +265,66 @@ bool rtlsdrOpen(void) {
return true; return true;
} }
static void rtlsdrCallback(unsigned char *buf, uint32_t len, void *ctx) { static void rtlsdrCallback(unsigned char *buf, uint32_t len, void *ctx)
struct mag_buf *outbuf; {
struct mag_buf *lastbuf; static unsigned dropped = 0;
uint32_t slen;
unsigned next_free_buffer;
unsigned free_bufs;
unsigned block_duration;
static int dropping = 0;
static uint64_t sampleCounter = 0; static uint64_t sampleCounter = 0;
MODES_NOTUSED(ctx); MODES_NOTUSED(ctx);
sdrMonitor(); sdrMonitor();
// Lock the data buffer variables before accessing them
pthread_mutex_lock(&Modes.data_mutex);
if (Modes.exit) { if (Modes.exit) {
rtlsdr_cancel_async(RTLSDR.dev); // ask our caller to exit rtlsdr_cancel_async(RTLSDR.dev); // ask our caller to exit
}
next_free_buffer = (Modes.first_free_buffer + 1) % MODES_MAG_BUFFERS;
outbuf = &Modes.mag_buffers[Modes.first_free_buffer];
lastbuf = &Modes.mag_buffers[(Modes.first_free_buffer + MODES_MAG_BUFFERS - 1) % MODES_MAG_BUFFERS];
free_bufs = (Modes.first_filled_buffer - next_free_buffer + MODES_MAG_BUFFERS) % MODES_MAG_BUFFERS;
// Paranoia! Unlikely, but let's go for belt and suspenders here
if (len != MODES_RTL_BUF_SIZE) {
fprintf(stderr, "weirdness: rtlsdr gave us a block with an unusual size (got %u bytes, expected %u bytes)\n",
(unsigned)len, (unsigned)MODES_RTL_BUF_SIZE);
if (len > MODES_RTL_BUF_SIZE) {
// wat?! Discard the start.
unsigned discard = (len - MODES_RTL_BUF_SIZE + 1) / 2;
outbuf->dropped += discard;
buf += discard*2;
len -= discard*2;
}
}
slen = len/2; // Drops any trailing odd sample, that's OK
if (free_bufs == 0 || (dropping && free_bufs < MODES_MAG_BUFFERS/2)) {
// FIFO is full. Drop this block.
dropping = 1;
outbuf->dropped += slen;
sampleCounter += slen;
pthread_mutex_unlock(&Modes.data_mutex);
return; return;
} }
dropping = 0; unsigned samples_read = len/2; // Drops any trailing odd sample, not much else we can do there
pthread_mutex_unlock(&Modes.data_mutex); if (!samples_read)
return; // that wasn't useful
struct mag_buf *outbuf = fifo_acquire(0 /* don't wait */);
if (!outbuf) {
// FIFO is full. Drop this block.
dropped += samples_read;
sampleCounter += samples_read;
return;
}
outbuf->flags = 0;
if (dropped) {
// We previously dropped some samples due to no buffers being available
outbuf->flags |= MAGBUF_DISCONTINUOUS;
outbuf->dropped = dropped;
}
dropped = 0;
// Compute the sample timestamp and system timestamp for the start of the block // Compute the sample timestamp and system timestamp for the start of the block
outbuf->sampleTimestamp = sampleCounter * 12e6 / Modes.sample_rate; outbuf->sampleTimestamp = sampleCounter * 12e6 / Modes.sample_rate;
sampleCounter += slen; sampleCounter += samples_read;
// Get the approx system time for the start of this block // Get the approx system time for the start of this block
block_duration = 1e3 * slen / Modes.sample_rate; uint64_t block_duration = 1e3 * samples_read / Modes.sample_rate;
outbuf->sysTimestamp = mstime() - block_duration; outbuf->sysTimestamp = mstime() - block_duration;
// Copy trailing data from last block (or reset if not valid) // Convert the new data
if (outbuf->dropped == 0) { unsigned to_convert = samples_read;
memcpy(outbuf->data, lastbuf->data + lastbuf->length, Modes.trailing_samples * sizeof(uint16_t)); if (to_convert + outbuf->overlap > outbuf->totalLength) {
} else { // how did that happen?
memset(outbuf->data, 0, Modes.trailing_samples * sizeof(uint16_t)); to_convert = outbuf->totalLength - outbuf->overlap;
dropped = samples_read - to_convert;
} }
// Convert the new data
outbuf->length = slen;
RTLSDR.converter(buf, &outbuf->data[Modes.trailing_samples], slen, RTLSDR.converter_state, &outbuf->mean_level, &outbuf->mean_power);
// Push the new data to the demodulation thread
pthread_mutex_lock(&Modes.data_mutex);
Modes.mag_buffers[next_free_buffer].dropped = 0; RTLSDR.converter(buf, &outbuf->data[outbuf->overlap], to_convert, RTLSDR.converter_state, &outbuf->mean_level, &outbuf->mean_power);
Modes.mag_buffers[next_free_buffer].length = 0; // just in case
Modes.first_free_buffer = next_free_buffer;
pthread_cond_signal(&Modes.data_cond); outbuf->validLength = outbuf->overlap + to_convert;
pthread_mutex_unlock(&Modes.data_mutex);
// Push to the demodulation thread
fifo_enqueue(outbuf);
} }
void rtlsdrRun() void rtlsdrRun()

7
util.c
View File

@ -90,6 +90,13 @@ void normalize_timespec(struct timespec *ts)
} }
} }
void get_deadline(uint32_t timeout_ms, struct timespec *ts)
{
clock_gettime(CLOCK_REALTIME, ts);
ts->tv_nsec += timeout_ms * 1000000;
normalize_timespec(ts);
}
/* record current CPU time in start_time */ /* record current CPU time in start_time */
void start_cpu_timing(struct timespec *start_time) void start_cpu_timing(struct timespec *start_time)
{ {

3
util.h
View File

@ -45,6 +45,9 @@ int64_t receiveclock_ms_elapsed(uint64_t t1, uint64_t t2);
struct timespec; struct timespec;
void normalize_timespec(struct timespec *ts); void normalize_timespec(struct timespec *ts);
/* Find the absolute system time that is `timeout_ms` milliseconds in the future, and store that in *ts */
void get_deadline(uint32_t timeout_ms, struct timespec *ts);
/* record current CPU time in start_time */ /* record current CPU time in start_time */
void start_cpu_timing(struct timespec *start_time); void start_cpu_timing(struct timespec *start_time);

View File

@ -66,9 +66,6 @@ static void view1090InitConfig(void) {
// //
static void view1090Init(void) { static void view1090Init(void) {
pthread_mutex_init(&Modes.data_mutex,NULL);
pthread_cond_init(&Modes.data_cond,NULL);
#ifdef _WIN32 #ifdef _WIN32
if ( (!Modes.wsaData.wVersion) if ( (!Modes.wsaData.wVersion)
&& (!Modes.wsaData.wHighVersion) ) { && (!Modes.wsaData.wHighVersion) ) {