(Re)implement the FlightAware TSV service.

This commit is contained in:
Karl Lehenbauer 2014-03-19 12:50:29 +00:00
parent 807c1aba7f
commit ed17d9629e
5 changed files with 222 additions and 13 deletions

View File

@ -76,6 +76,7 @@ void modesInitConfig(void) {
Modes.net_output_beast_port = MODES_NET_OUTPUT_BEAST_PORT;
Modes.net_input_beast_port = MODES_NET_INPUT_BEAST_PORT;
Modes.net_http_port = MODES_NET_HTTP_PORT;
Modes.net_fatsv_port = MODES_NET_OUTPUT_FA_TSV_PORT;
Modes.interactive_rows = getTermRows();
Modes.interactive_delete_ttl = MODES_INTERACTIVE_DELETE_TTL;
Modes.interactive_display_ttl = MODES_INTERACTIVE_DISPLAY_TTL;
@ -405,6 +406,7 @@ void showHelp(void) {
"--modeac Enable decoding of SSR Modes 3/A & 3/C\n"
"--net-beast TCP raw output in Beast binary format\n"
"--net-only Enable just networking, no RTL device or file used\n"
"--net-fatsv-port <port> FlightAware TSV output port (default: 10001)\n"
"--net-http-port <port> HTTP server port (default: 8080)\n"
"--net-ri-port <port> TCP raw input listen port (default: 30001)\n"
"--net-ro-port <port> TCP raw output listen port (default: 30002)\n"
@ -450,6 +452,7 @@ void showHelp(void) {
void backgroundTasks(void) {
if (Modes.net) {
modesReadFromClients();
showFlightsFATSV();
}
// If Modes.aircrafts is not NULL, remove any stale aircraft
@ -524,6 +527,8 @@ int main(int argc, char **argv) {
Modes.net_input_beast_port = atoi(argv[++j]);
} else if (!strcmp(argv[j],"--net-http-port") && more) {
Modes.net_http_port = atoi(argv[++j]);
} else if (!strcmp(argv[j],"--net-fatsv-port") && more) {
Modes.net_fatsv_port = atoi(argv[++j]);
} else if (!strcmp(argv[j],"--net-sbs-port") && more) {
Modes.net_output_sbs_port = atoi(argv[++j]);
} else if (!strcmp(argv[j],"--onlyaddr")) {
@ -719,3 +724,5 @@ int main(int argc, char **argv) {
//
//=========================================================================
//
// vim: set ts=4 sw=4 sts=4 expandtab :

View File

@ -165,7 +165,7 @@
#define MODES_NET_HEARTBEAT_RATE 900 // Each block is approx 65mS - default is > 1 min
#define MODES_NET_SERVICES_NUM 6
#define MODES_NET_SERVICES_NUM 7
#define MODES_NET_MAX_FD 1024
#define MODES_NET_INPUT_RAW_PORT 30001
#define MODES_NET_OUTPUT_RAW_PORT 30002
@ -173,6 +173,7 @@
#define MODES_NET_INPUT_BEAST_PORT 30004
#define MODES_NET_OUTPUT_BEAST_PORT 30005
#define MODES_NET_HTTP_PORT 8080
#define MODES_NET_OUTPUT_FA_TSV_PORT 10001
#define MODES_CLIENT_BUF_SIZE 1024
#define MODES_NET_SNDBUF_SIZE (1024*64)
@ -190,6 +191,7 @@ struct client {
int service; // TCP port the client is connected to
int buflen; // Amount of data on buffer
char buf[MODES_CLIENT_BUF_SIZE+1]; // Read buffer
char tsvVerbatim[MODES_CLIENT_BUF_SIZE+1]; // data to be quoted in TSV out
};
// Structure used to describe an aircraft in iteractive mode
@ -212,6 +214,10 @@ struct aircraft {
long modeCcount; // Mode C Altitude hit Count
int modeACflags; // Flags for mode A/C recognition
int fatsv_emitted_altitude; // last FA emitted altitude
int fatsv_emitted_track; // last FA emitted angle of flight
time_t fatsv_last_emitted; // time aircraft was last FA emitted
// Encoded latitude and longitude as extracted by odd and even CPR encoded messages
int odd_cprlat;
int odd_cprlon;
@ -221,6 +227,7 @@ struct aircraft {
uint64_t even_cprtime;
double lat, lon; // Coordinated obtained from CPR encoded data
int bFlags; // Flags related to valid fields in this structure
struct client *tsvClient; // client that was last source for this
struct aircraft *next; // Next aircraft in our linked list
};
@ -258,6 +265,7 @@ struct { // Internal state
char aneterr[ANET_ERR_LEN];
struct client *clients[MODES_NET_MAX_FD]; // Our clients
int maxfd; // Greatest fd currently active
int fatsvos; // FlightAware TSV listening socket
int sbsos; // SBS output listening socket
int ros; // Raw output listening socket
int ris; // Raw input listening socket
@ -294,6 +302,7 @@ struct { // Internal state
int net_output_beast_port; // Beast output TCP port
int net_input_beast_port; // Beast input TCP port
int net_http_port; // HTTP port
int net_fatsv_port; // FlightAware TSV port
int quiet; // Suppress stdout
int interactive; // Interactive mode
int interactive_rows; // Interactive mode: max number of rows
@ -330,6 +339,7 @@ struct { // Internal state
unsigned int stat_http_requests;
unsigned int stat_sbs_connections;
unsigned int stat_fatsv_connections;
unsigned int stat_raw_connections;
unsigned int stat_beast_connections;
unsigned int stat_out_of_phase;
@ -412,7 +422,7 @@ int ModeAToModeC (unsigned int ModeA);
void detectModeS (uint16_t *m, uint32_t mlen);
void decodeModesMessage (struct modesMessage *mm, unsigned char *msg);
void displayModesMessage(struct modesMessage *mm);
void useModesMessage (struct modesMessage *mm);
void useModesMessage (struct modesMessage *mm, struct client *c);
void computeMagnitudeVector(uint16_t *pData);
void decodeCPR (struct aircraft *a, int fflag, int surface);
int decodeCPRrelative (struct aircraft *a, int fflag, int surface);
@ -420,7 +430,7 @@ void modesInitErrorInfo ();
//
// Functions exported from interactive.c
//
struct aircraft* interactiveReceiveData(struct modesMessage *mm);
struct aircraft* interactiveReceiveData(struct modesMessage *mm, struct client *c);
void interactiveShowData(void);
void interactiveRemoveStaleAircrafts(void);
int decodeBinMessage (struct client *c, char *p);
@ -433,9 +443,12 @@ void modesReadFromClients (void);
void modesSendAllClients (int service, void *msg, int len);
void modesQueueOutput (struct modesMessage *mm);
void modesReadFromClient(struct client *c, char *sep, int(*handler)(struct client *, char *));
void showFlightsFATSV(void);
#ifdef __cplusplus
}
#endif
#endif // __DUMP1090_H
// vim: set ts=4 sw=4 sts=4 expandtab :

View File

@ -175,7 +175,7 @@ void interactiveUpdateAircraftModeS() {
//
// Receive new messages and populate the interactive mode with more info
//
struct aircraft *interactiveReceiveData(struct modesMessage *mm) {
struct aircraft *interactiveReceiveData(struct modesMessage *mm, struct client *c) {
struct aircraft *a, *aux;
// Return if (checking crc) AND (not crcok) AND (not fixed)
@ -211,6 +211,7 @@ struct aircraft *interactiveReceiveData(struct modesMessage *mm) {
a->seen = time(NULL);
a->timestamp = mm->timestampMsg;
a->messages++;
a->tsvClient = c;
// If a (new) CALLSIGN has been received, copy it to the aircraft structure
if (mm->bFlags & MODES_ACFLAGS_CALLSIGN_VALID) {
@ -470,3 +471,5 @@ void interactiveRemoveStaleAircrafts(void) {
//
//=========================================================================
//
// vim: set ts=4 sw=4 sts=4 expandtab :

View File

@ -1514,7 +1514,7 @@ void detectModeS(uint16_t *m, uint32_t mlen) {
decodeModeAMessage(&mm, ModeA);
// Pass data to the next layer
useModesMessage(&mm);
useModesMessage(&mm, NULL);
j += MODEAC_MSG_SAMPLES;
Modes.stat_ModeAC++;
@ -1778,7 +1778,7 @@ void detectModeS(uint16_t *m, uint32_t mlen) {
}
// Pass data to the next layer
useModesMessage(&mm);
useModesMessage(&mm, NULL);
} else {
if (Modes.debug & MODES_DEBUG_DEMODERR && use_correction) {
@ -1842,11 +1842,11 @@ void detectModeS(uint16_t *m, uint32_t mlen) {
// Basically this function passes a raw message to the upper layers for further
// processing and visualization
//
void useModesMessage(struct modesMessage *mm) {
void useModesMessage(struct modesMessage *mm, struct client *c) {
if ((Modes.check_crc == 0) || (mm->crcok) || (mm->correctedbits)) { // not checking, ok or fixed
// Always track aircraft
interactiveReceiveData(mm);
interactiveReceiveData(mm, c);
// In non-interactive non-quiet mode, display messages on standard output
if (!Modes.interactive && !Modes.quiet) {
@ -2092,4 +2092,6 @@ int decodeCPRrelative(struct aircraft *a, int fflag, int surface) {
}
//
// ===================== Mode S detection and decoding ===================
//
//
// vim: set ts=4 sw=4 sts=4 expandtab :

192
net_io.c
View File

@ -57,7 +57,8 @@ void modesInitNet(void) {
{"Beast TCP output", &Modes.bos, Modes.net_output_beast_port},
{"Beast TCP input", &Modes.bis, Modes.net_input_beast_port},
{"HTTP server", &Modes.https, Modes.net_http_port},
{"Basestation TCP output", &Modes.sbsos, Modes.net_output_sbs_port}
{"Basestation TCP output", &Modes.sbsos, Modes.net_output_sbs_port},
{"FlightAware TSV output", &Modes.fatsvos, MODES_NET_OUTPUT_FA_TSV_PORT}
};
int j;
@ -87,7 +88,7 @@ void modesAcceptClients(void) {
int fd, port;
unsigned int j;
struct client *c;
int services[6];
int services[MODES_NET_SERVICES_NUM];
services[0] = Modes.ros;
services[1] = Modes.ris;
@ -95,6 +96,7 @@ void modesAcceptClients(void) {
services[3] = Modes.bis;
services[4] = Modes.https;
services[5] = Modes.sbsos;
services[6] = Modes.fatsvos;
for (j = 0; j < MODES_NET_SERVICES_NUM; j++) {
fd = anetTcpAccept(Modes.aneterr, services[j], NULL, &port);
@ -110,11 +112,13 @@ void modesAcceptClients(void) {
c->service = services[j];
c->fd = fd;
c->buflen = 0;
c->tsvVerbatim[0] = '\0';
Modes.clients[fd] = c;
anetSetSendBuffer(Modes.aneterr,fd,MODES_NET_SNDBUF_SIZE);
if (Modes.maxfd < fd) Modes.maxfd = fd;
if (services[j] == Modes.sbsos) Modes.stat_sbs_connections++;
if (services[j] == Modes.fatsvos) Modes.stat_fatsv_connections++;
if (services[j] == Modes.ros) Modes.stat_raw_connections++;
if (services[j] == Modes.bos) Modes.stat_beast_connections++;
@ -134,6 +138,9 @@ void modesFreeClient(int fd) {
if (Modes.clients[fd]->service == Modes.sbsos) {
if (Modes.stat_sbs_connections) Modes.stat_sbs_connections--;
}
else if (Modes.clients[fd]->service == Modes.fatsvos) {
if (Modes.stat_fatsv_connections) Modes.stat_fatsv_connections--;
}
else if (Modes.clients[fd]->service == Modes.ros) {
if (Modes.stat_raw_connections) Modes.stat_raw_connections--;
}
@ -473,7 +480,7 @@ int decodeBinMessage(struct client *c, char *p) {
decodeModesMessage(&mm, msg);
}
useModesMessage(&mm);
useModesMessage(&mm, c);
}
return (0);
}
@ -510,6 +517,8 @@ int decodeHexMessage(struct client *c, char *hex) {
MODES_NOTUSED(c);
memset(&mm, 0, sizeof(mm));
// printf("Decode hex message:%s\n", hex);
// Mark messages received over the internet as remote so that we don't try to
// pass them off as being received by this instance when forwarding them
mm.remote = 1;
@ -519,6 +528,7 @@ int decodeHexMessage(struct client *c, char *hex) {
while(l && isspace(hex[l-1])) {
hex[l-1] = '\0'; l--;
}
while(isspace(*hex)) {
hex++; l--;
}
@ -534,6 +544,11 @@ int decodeHexMessage(struct client *c, char *hex) {
hex += 15; l -= 16; // Skip <, timestamp and siglevel, and ;
break;}
// if the preamble is #, copy verbatim string
case '#': {
strncpy (c->tsvVerbatim, hex+1, l-1);
break;}
case '@': // No CRC check
case '%': { // CRC is OK
hex += 13; l -= 14; // Skip @,%, and timestamp, and ;
@ -572,7 +587,7 @@ int decodeHexMessage(struct client *c, char *hex) {
decodeModesMessage(&mm, msg);
}
useModesMessage(&mm);
useModesMessage(&mm, c);
return (0);
}
//
@ -905,6 +920,175 @@ void modesReadFromClients(void) {
modesReadFromClient(c,"\r\n\r\n",handleHTTPRequest);
}
}
#define TSV_BUFFER_SIZE 8192
#define TSV_MAX_PACKET_SIZE 160
void showFlightsFATSV(void) {
struct aircraft *a = Modes.aircrafts;
time_t now = time(NULL);
int age, emittedSecondsAgo;
static time_t lastTime = 0;
char msg[TSV_BUFFER_SIZE], *p = msg;
int nCombined = 0;
// don't do anything if there are no FA TSV connections
if (Modes.stat_fatsv_connections == 0) {
return;
}
if (now - lastTime < 5) {
return;
}
for(a = Modes.aircrafts; a; a = a->next) {
int altValid = 0;
int alt = 0;
int groundValid = 0;
int ground = 0;
int latlonValid = 0;
if (0 && a->modeACflags & MODEAC_MSG_FLAG) { // skip any fudged ICAO records Mode A/C
a = a->next;
continue;
}
age = (int)(now - a->seen);
emittedSecondsAgo = (int)(now - a->fatsv_last_emitted);
// don't emit if it hasn't updated since last time
if (age > emittedSecondsAgo) {
continue;
}
// don't emit more than once every five seconds
if (emittedSecondsAgo < 5) {
continue;
}
if (a->bFlags & MODES_ACFLAGS_ALTITUDE_VALID) {
altValid = 1;
alt = a->altitude;
}
if (a->bFlags & MODES_ACFLAGS_AOG_VALID) {
groundValid = 1;
if (a->bFlags & MODES_ACFLAGS_AOG) {
alt = 0;
ground = 1;
}
}
if (a->bFlags & MODES_ACFLAGS_LATLON_VALID) {
latlonValid = 1;
}
// if it's over 10,000 feet, don't emit more than once every 10 seconds
if (alt > 10000 && emittedSecondsAgo < 10) {
continue;
}
// disable if you want only ads-b
// also don't send mode S very often
if (!latlonValid) {
if (emittedSecondsAgo < 30) {
continue;
}
} else {
// if it hasn't changed altitude very much and it hasn't changed
// heading very much, don't update real often
if (abs(a->track - a->fatsv_emitted_track) < 2 && abs(alt - a->fatsv_emitted_altitude) < 50) {
if (alt < 10000) {
// it hasn't changed much but we're below 10,000 feet
// so update more frequently
if (emittedSecondsAgo < 10) {
continue;
}
} else {
// above 10,000 feet, don't update so often when it
// hasn't changed much
if (emittedSecondsAgo < 30) {
continue;
}
}
}
}
p += sprintf(p, "clock\t%ld\thexid\t%06X", a->seen, a->addr);
// if ((a->bFlags & MODES_ACFLAGS_CALLSIGN_VALID) && *a->flight != '\0') {
if (*a->flight != '\0') {
p += sprintf(p, "\tident\t%s", a->flight);
}
if (a->bFlags & MODES_ACFLAGS_SQUAWK_VALID) {
p += sprintf(p, "\tsquawk\t%04x", a->modeA);
}
if (altValid) {
p += sprintf(p, "\talt\t%d", alt);
}
if (a->bFlags & MODES_ACFLAGS_SPEED_VALID) {
p += sprintf(p, "\tspeed\t%d", a->speed);
}
if (groundValid) {
if (ground) {
p += sprintf(p, "\tairGround\tG");
} else {
p += sprintf(p, "\tairGround\tA");
}
}
if (a->lat != 0.0 || a->lon != 0.0) {
p += sprintf(p, "\tlat\t%.5f\tlon\t%.5f", a->lat, a->lon);
}
if (a->bFlags & MODES_ACFLAGS_HEADING_VALID) {
p += sprintf(p, "\theading\t%d", a->track);
}
// if there's a verbatim string and it's not null, append it to the
// message
if (a->tsvClient && a->tsvClient->tsvVerbatim && *a->tsvClient->tsvVerbatim != '\0') {
p += sprintf(p, "\t%s", a->tsvClient->tsvVerbatim);
}
p += sprintf(p, "\n");
a->fatsv_last_emitted = now;
a->fatsv_emitted_altitude = alt;
a->fatsv_emitted_track = a->track;
nCombined++;
// we are aggregating multiple messages into the buffer...
// if the buffer is getting pretty full, send what we've assembled
if (p - msg > TSV_BUFFER_SIZE - TSV_MAX_PACKET_SIZE) {
modesSendAllClients(Modes.fatsvos, msg, p-msg);
p = msg;
// printf("combined %d updates into one %ld-byte packet\n", nCombined, p-msg);
nCombined = 0;
}
}
// we've looked at all the candidate aircraft,
// if there's anything in the buffer, send it
if (p != msg) {
modesSendAllClients(Modes.fatsvos, msg, p-msg);
// printf("combined %d updates into one %ld-byte packet\n", nCombined, p-msg);
}
lastTime = now;
}
//
// =============================== Network IO ===========================
//
// vim: set ts=4 sw=4 sts=4 expandtab :