Factor out sdr thread CPU monitoring

This commit is contained in:
Oliver Jowett 2020-08-05 12:00:50 +08:00
parent f82b7b7a8c
commit 8b21104d66
11 changed files with 64 additions and 43 deletions

View File

@ -382,6 +382,9 @@ static void backgroundTasks(void) {
interactiveShowData();
}
// copy out reader CPU time and reset it
sdrUpdateCPUTime(&Modes.stats_current.reader_cpu);
// always update end time so it is current when requests arrive
Modes.stats_current.end = mstime();
@ -703,11 +706,6 @@ int main(int argc, char **argv) {
// Modes.data_mutex is locked, and possibly we have data.
// copy out reader CPU time and reset it
add_timespecs(&Modes.reader_cpu_accumulator, &Modes.stats_current.reader_cpu, &Modes.stats_current.reader_cpu);
Modes.reader_cpu_accumulator.tv_sec = 0;
Modes.reader_cpu_accumulator.tv_nsec = 0;
if (Modes.first_free_buffer != Modes.first_filled_buffer) {
// FIFO is not empty, process one buffer.

View File

@ -301,7 +301,9 @@ struct _Modes { // Internal state
struct mag_buf mag_buffers[MODES_MAG_BUFFERS]; // Converted magnitude buffers from RTL or file input
unsigned first_free_buffer; // Entry in mag_buffers that will next be filled with input.
unsigned first_filled_buffer; // Entry in mag_buffers that has valid data and will be demodulated next. If equal to next_free_buffer, there is no unprocessed data.
struct timespec reader_cpu_accumulator; // CPU time used by the reader thread, copied out and reset by the main thread under the mutex
pthread_mutex_t reader_cpu_mutex; // mutex protecting reader_cpu_accumulator
struct timespec reader_cpu_accumulator; // accumulated CPU time used by the reader thread
struct timespec reader_cpu_start; // start time for the last reader thread CPU measurement
unsigned trailing_samples; // extra trailing samples in magnitude buffers
double sample_rate; // actual sample rate in use (in hz)

29
sdr.c
View File

@ -168,16 +168,45 @@ static sdr_handler *current_handler()
bool sdrOpen()
{
pthread_mutex_init(&Modes.reader_cpu_mutex, NULL);
return current_handler()->open();
}
void sdrRun()
{
set_thread_name("dump1090-sdr");
pthread_mutex_lock(&Modes.reader_cpu_mutex);
Modes.reader_cpu_accumulator.tv_sec = 0;
Modes.reader_cpu_accumulator.tv_nsec = 0;
start_cpu_timing(&Modes.reader_cpu_start);
pthread_mutex_unlock(&Modes.reader_cpu_mutex);
current_handler()->run();
pthread_mutex_lock(&Modes.reader_cpu_mutex);
end_cpu_timing(&Modes.reader_cpu_start, &Modes.reader_cpu_accumulator);
pthread_mutex_unlock(&Modes.reader_cpu_mutex);
}
void sdrClose()
{
pthread_mutex_destroy(&Modes.reader_cpu_mutex);
current_handler()->close();
}
void sdrMonitor()
{
pthread_mutex_lock(&Modes.reader_cpu_mutex);
update_cpu_timing(&Modes.reader_cpu_start, &Modes.reader_cpu_accumulator);
pthread_mutex_unlock(&Modes.reader_cpu_mutex);
}
void sdrUpdateCPUTime(struct timespec *addTo)
{
pthread_mutex_lock(&Modes.reader_cpu_mutex);
add_timespecs(&Modes.reader_cpu_accumulator, addTo, addTo);
Modes.reader_cpu_accumulator.tv_sec = 0;
Modes.reader_cpu_accumulator.tv_nsec = 0;
pthread_mutex_unlock(&Modes.reader_cpu_mutex);
}

5
sdr.h
View File

@ -30,4 +30,9 @@ bool sdrOpen();
void sdrRun();
void sdrClose();
// Call periodically from the SDR read thread to update reader thread CPU stats:
void sdrMonitor();
// Retrieve CPU stats and add new CPU time to *addTo
void sdrUpdateCPUTime(struct timespec *addTo);
#endif

View File

@ -300,7 +300,6 @@ bool bladeRFOpen()
return false;
}
static struct timespec thread_cpu;
static unsigned timeouts = 0;
static void *handle_bladerf_samples(struct bladerf *dev,
@ -322,6 +321,8 @@ static void *handle_bladerf_samples(struct bladerf *dev,
// record initial time for later sys timestamp calculation
uint64_t entryTimestamp = mstime();
sdrMonitor();
pthread_mutex_lock(&Modes.data_mutex);
if (Modes.exit) {
pthread_mutex_unlock(&Modes.data_mutex);
@ -430,10 +431,6 @@ static void *handle_bladerf_samples(struct bladerf *dev,
// Push the new data to the demodulation thread
pthread_mutex_lock(&Modes.data_mutex);
// accumulate CPU while holding the mutex, and restart measurement
end_cpu_timing(&thread_cpu, &Modes.reader_cpu_accumulator);
start_cpu_timing(&thread_cpu);
Modes.mag_buffers[next_free_buffer].dropped = 0;
Modes.mag_buffers[next_free_buffer].length = 0; // just in case
Modes.first_free_buffer = next_free_buffer;
@ -482,8 +479,6 @@ void bladeRFRun()
goto out;
}
start_cpu_timing(&thread_cpu);
timeouts = 0; // reset to zero when we get a callback with some data
retry:
if ((status = bladerf_stream(stream, BLADERF_MODULE_RX)) < 0) {

View File

@ -195,8 +195,6 @@ bool hackRFOpen()
return true;
}
struct timespec thread_cpu;
static int handle_hackrf_samples(hackrf_transfer *transfer)
{
struct mag_buf *outbuf;
@ -211,6 +209,8 @@ static int handle_hackrf_samples(hackrf_transfer *transfer)
static int dropping = 0;
static uint64_t sampleCounter = 0;
sdrMonitor();
// Lock the data buffer variables before accessing them
pthread_mutex_lock(&Modes.data_mutex);
if (Modes.exit) {
@ -271,10 +271,6 @@ static int handle_hackrf_samples(hackrf_transfer *transfer)
Modes.mag_buffers[next_free_buffer].length = 0; // just in case
Modes.first_free_buffer = next_free_buffer;
// accumulate CPU while holding the mutex, and restart measurement
end_cpu_timing(&thread_cpu, &Modes.reader_cpu_accumulator);
start_cpu_timing(&thread_cpu);
pthread_cond_signal(&Modes.data_cond);
pthread_mutex_unlock(&Modes.data_mutex);
@ -289,8 +285,6 @@ void hackRFRun()
return;
}
start_cpu_timing(&thread_cpu);
int status = hackrf_start_rx(HackRF.device, &handle_hackrf_samples, NULL);
if (status != 0) {

View File

@ -179,9 +179,6 @@ void ifileRun()
int eof = 0;
struct timespec next_buffer_delivery;
struct timespec thread_cpu;
start_cpu_timing(&thread_cpu);
uint64_t sampleCounter = 0;
clock_gettime(CLOCK_MONOTONIC, &next_buffer_delivery);
@ -205,6 +202,8 @@ void ifileRun()
lastbuf = &Modes.mag_buffers[(Modes.first_free_buffer + MODES_MAG_BUFFERS - 1) % MODES_MAG_BUFFERS];
pthread_mutex_unlock(&Modes.data_mutex);
sdrMonitor();
// Compute the sample timestamp for the start of the block
outbuf->sampleTimestamp = sampleCounter * 12e6 / Modes.sample_rate;
sampleCounter += MODES_MAG_BUF_SAMPLES;
@ -253,9 +252,6 @@ void ifileRun()
// Push the new data to the main thread
pthread_mutex_lock(&Modes.data_mutex);
Modes.first_free_buffer = next_free_buffer;
// accumulate CPU while holding the mutex, and restart measurement
end_cpu_timing(&thread_cpu, &Modes.reader_cpu_accumulator);
start_cpu_timing(&thread_cpu);
pthread_cond_signal(&Modes.data_cond);
}

View File

@ -315,8 +315,6 @@ bool limesdrOpen(void)
return false;
}
static struct timespec limesdr_thread_cpu;
static void limesdrCallback(unsigned char *buf, uint32_t len, void *ctx)
{
struct mag_buf *outbuf;
@ -331,6 +329,8 @@ static void limesdrCallback(unsigned char *buf, uint32_t len, void *ctx)
MODES_NOTUSED(ctx);
sdrMonitor();
// Lock the data buffer variables before accessing them
pthread_mutex_lock(&Modes.data_mutex);
if (Modes.exit) {
@ -396,10 +396,6 @@ static void limesdrCallback(unsigned char *buf, uint32_t len, void *ctx)
Modes.mag_buffers[next_free_buffer].length = 0; // just in case
Modes.first_free_buffer = next_free_buffer;
// accumulate CPU while holding the mutex, and restart measurement
end_cpu_timing(&limesdr_thread_cpu, &Modes.reader_cpu_accumulator);
start_cpu_timing(&limesdr_thread_cpu);
pthread_cond_signal(&Modes.data_cond);
pthread_mutex_unlock(&Modes.data_mutex);
}
@ -414,8 +410,6 @@ void limesdrRun()
LMS_StartStream(&LimeSDR.stream);
start_cpu_timing(&limesdr_thread_cpu);
while (!LimeSDR.is_stop) {
int sampleCnt = LMS_RecvStream(&LimeSDR.stream, buffer, MODES_RTL_BUF_SIZE / LimeSDR.bytes_in_sample, NULL, 1000);
if (sampleCnt) {

View File

@ -265,8 +265,6 @@ bool rtlsdrOpen(void) {
return true;
}
static struct timespec rtlsdr_thread_cpu;
static void rtlsdrCallback(unsigned char *buf, uint32_t len, void *ctx) {
struct mag_buf *outbuf;
struct mag_buf *lastbuf;
@ -280,6 +278,8 @@ static void rtlsdrCallback(unsigned char *buf, uint32_t len, void *ctx) {
MODES_NOTUSED(ctx);
sdrMonitor();
// Lock the data buffer variables before accessing them
pthread_mutex_lock(&Modes.data_mutex);
if (Modes.exit) {
@ -346,10 +346,6 @@ static void rtlsdrCallback(unsigned char *buf, uint32_t len, void *ctx) {
Modes.mag_buffers[next_free_buffer].length = 0; // just in case
Modes.first_free_buffer = next_free_buffer;
// accumulate CPU while holding the mutex, and restart measurement
end_cpu_timing(&rtlsdr_thread_cpu, &Modes.reader_cpu_accumulator);
start_cpu_timing(&rtlsdr_thread_cpu);
pthread_cond_signal(&Modes.data_cond);
pthread_mutex_unlock(&Modes.data_mutex);
}
@ -360,8 +356,6 @@ void rtlsdrRun()
return;
}
start_cpu_timing(&rtlsdr_thread_cpu);
rtlsdr_read_async(RTLSDR.dev, rtlsdrCallback, NULL,
/* MODES_RTL_BUFFERS */ 4,
MODES_RTL_BUF_SIZE);

11
util.c
View File

@ -106,6 +106,17 @@ void end_cpu_timing(const struct timespec *start_time, struct timespec *add_to)
normalize_timespec(add_to);
}
/* add difference between start_time and the current CPU time to add_to; then store the current CPU time in start_time */
void update_cpu_timing(struct timespec *start_time, struct timespec *add_to)
{
struct timespec end_time;
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &end_time);
add_to->tv_sec += end_time.tv_sec - start_time->tv_sec;
add_to->tv_nsec += end_time.tv_nsec - start_time->tv_nsec;
normalize_timespec(add_to);
*start_time = end_time;
}
void set_thread_name(const char *name)
{
#if (__GLIBC__ > 2) || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 12)

3
util.h
View File

@ -51,6 +51,9 @@ void start_cpu_timing(struct timespec *start_time);
/* add difference between start_time and the current CPU time to add_to */
void end_cpu_timing(const struct timespec *start_time, struct timespec *add_to);
/* like end_cpu_timing followed by start_cpu_timing, but without a gap */
void update_cpu_timing(struct timespec *start_time, struct timespec *add_to);
/* set current thread name, if supported */
void set_thread_name(const char *name);