The CachedDataServer accepts timestamped field:value data, holds it in an in-memory cache, and serves it to clients over WebSockets. It also accepts plain HTTP GET requests on the same port for simple one-off queries. It is used to feed display widgets, to provide intermediate caching for derived-data transforms, and as a general-purpose data bus between OpenRVDAS components.

In the default OpenRVDAS installation a CachedDataServer is already running and listening on port 8766.


Running the Server

Default installation

The default installation uses supervisord to start and maintain the server:

server/cached_data_server.py --port 8766 \
    --disk_cache /var/tmp/openrvdas/disk_cache \
    --max_records 86400 -v

This serves WebSocket connections on port 8766, retains at most 86400 records per field (one day at 1 Hz), and maintains a disk cache at /var/tmp/openrvdas/disk_cache that is used to warm the in-memory cache on restart. It does not listen on a UDP port — all data arrives via WebSocket publish messages.

Stdout and stderr are written to /var/log/openrvdas/cached_data_server.std{out,err}. The full supervisord spec is in /etc/supervisor/conf.d/openrvdas.conf (Ubuntu) or /etc/supervisord.d/openrvdas.ini (CentOS/Redhat).

To start, stop, or restart the server, use the supervisord web interface at http://openrvdas:9001 or the command line:

root@openrvdas:~# supervisorctl
cached_data_server               RUNNING   pid 5641, uptime 1:35:54
logger_manager                   RUNNING   pid 5646, uptime 1:35:53

supervisor> stop cached_data_server
cached_data_server: stopped

supervisor> start cached_data_server
cached_data_server: started

supervisor> exit

Manual invocation

You can also run the server directly. If you are running a LoggerManager, pass --start_data_server to have it start its own CachedDataServer automatically.

A typical standalone invocation:

server/cached_data_server.py \
  --port 8766 \
  --disk_cache /var/tmp/openrvdas/disk_cache \
  --back_seconds 3600 \
  --cleanup_interval 60 \
  -v

Server command-line flags

Flag Default Description
--port (required) WebSocket port to serve clients on
--udp (none) (deprecated) Comma-separated UDP port(s) to listen for incoming data on. Prefix with a multicast group to use multicast, e.g. 239.0.0.1:6225
--disk_cache (none) Directory for the disk-backed cache. On restart, data is reloaded from here to warm the in-memory cache
--back_seconds 86400 Maximum age (seconds) of data to retain
--max_records 2880 Maximum number of records to retain per field. Set to 0 for unlimited
--min_back_records 64 Minimum number of records to keep per field even when purging old data
--cleanup_interval 60 How often (seconds) to purge old data and flush the disk cache
--interval 0.5 How often (seconds) the server pushes updates to subscribed clients

The WebSocket Protocol

All interaction with the CachedDataServer — reading and writing — happens over a single WebSocket connection. Connect to ws://host:8766 and exchange JSON messages. Every message you send has a "type" field; every response from the server has "type", "status" (HTTP-style, e.g. 200), and "data" fields.

fields — list available fields

{"type": "fields"}

Returns a list of all field names currently held in the cache:

{"type": "data", "status": 200, "data": ["S330CourseTrue", "S330SpeedKt", ...]}

describe — get field metadata

{"type": "describe", "fields": ["S330CourseTrue", "S330SpeedKt"]}

Returns a dict of metadata (units, description, device, etc.) for each named field. Omit "fields" to get metadata for every field in the cache:

{
  "type": "data", "status": 200,
  "data": {
    "S330CourseTrue": {"description": "True course", "units": "degrees", ...},
    "S330SpeedKt":   {"description": "Speed in knots", "units": "kt", ...}
  }
}

subscribe — stream field updates

Subscribing is a two-step loop: send a subscribe message once to register interest, then send ready repeatedly to receive successive batches of updates.

{
  "type": "subscribe",
  "fields": {
    "S330CourseTrue": {"seconds": 30},
    "S330SpeedKt":    {"seconds": 0},
    "S330HeadingTrue":{"seconds": -1}
  }
}

The "seconds" value controls how much historical data is returned in the first response:

seconds value Meaning
0 Only new values that arrive after the subscription
-1 The single most recent value, then all future new values
N (positive) Up to N seconds of historical data, then all future new values

If "seconds" is omitted, 0 is used.

back_records (optional, per-field): guarantee a minimum number of historical records regardless of the seconds window. Useful when data arrives irregularly:

{"S330CourseTrue": {"seconds": 60, "back_records": 10}}

interval (optional, top-level): override the server’s default push interval (seconds). Useful for low-bandwidth clients or slow-changing data:

{
  "type": "subscribe",
  "fields": {"S330CourseTrue": {"seconds": 0}},
  "interval": 15
}

Wildcards: field names may contain * to match multiple fields:

{"type": "subscribe", "fields": {"S330*": {"seconds": -1}}}

format (optional, top-level): controls the shape of the data payload in each response.

"field_dict" (default) — a dict mapping each field name to a list of [timestamp, value] pairs:

{
  "S330CourseTrue": [[1714000000.0, 219.6], [1714000001.0, 219.7], ...],
  "S330SpeedKt":   [[1714000000.0, 8.9],   [1714000001.0, 8.9],   ...]
}

"record_list" — collated by timestamp into a list of DASRecord-like dicts. Useful when processing records in time order across multiple fields:

{
  "type": "subscribe",
  "fields": {"S330CourseTrue": {"seconds": 30}, "S330SpeedKt": {"seconds": 30}},
  "format": "record_list"
}

Response data:

[
  {"timestamp": 1714000000.0, "fields": {"S330CourseTrue": 219.6, "S330SpeedKt": 8.9}},
  {"timestamp": 1714000001.0, "fields": {"S330CourseTrue": 219.7, "S330SpeedKt": 8.9}},
  ...
]

ready — acknowledge and receive the next batch

After the initial subscribe response, send ready each time you are prepared to receive the next update:

{"type": "ready"}

The server responds with a data message containing all field values that have arrived since the previous ready. This back-pressure mechanism prevents a slow client from being overwhelmed with buffered data.

publish — write data into the cache

Any WebSocket client can push data into the cache using a publish message:

{
  "type": "publish",
  "data": {
    "timestamp": 1555468528.452,
    "fields": {
      "field_1": "value_1",
      "field_2": "value_2"
    }
  }
}

This is the mechanism used by the CachedDataWriter component to feed data into the server.


HTTP GET API

Requires websockets >= 12.

In addition to the WebSocket protocol, the server accepts plain HTTP GET requests on the same port. This lets shell scripts, elog systems, and other simple tools retrieve the latest cached values without implementing a WebSocket handshake.

GET /fields — list available fields

GET http://localhost:8766/fields

Returns a JSON object listing all field names currently in the cache:

{"fields": ["S330CourseTrue", "S330SpeedKt", "MwxAirTemp", ...]}

Example:

curl http://localhost:8766/fields

GET /latest/<fields> — get latest value(s)

GET http://localhost:8766/latest/<field_1>[,<field_2>,...]

Returns the most recent cached timestamp and value for each requested field:

{
  "S330CourseTrue": {"timestamp": 1555468528.452, "value": 219.61},
  "S330SpeedKt":    {"timestamp": 1555468530.001, "value": 8.9},
  "MwxAirTemp":     null
}

Fields not present in the cache are returned as null. All other HTTP requests receive 400 Bad Request.

Examples:

# Single field
curl http://localhost:8766/latest/S330CourseTrue

# Multiple fields (comma-separated, no spaces)
curl http://localhost:8766/latest/S330CourseTrue,S330SpeedKt,MwxAirTemp

When to use HTTP GET vs WebSocket

HTTP GET is the right choice for occasional, on-demand queries — populating an elog entry, a status check, or a one-shot script. For anything that needs continuous or frequent updates, use the WebSocket subscription API instead.

HTTP GET requests run inside the same asyncio event loop as all WebSocket connections. High-frequency polling (multiple requests per second) will delay WebSocket data delivery to connected clients.


Reading from the Server

listen.py

logger/listener/listen.py accepts a --cached_data argument that subscribes to one or more fields and prints received records to stdout. Useful for quick inspection and for piping CDS data into other command-line tools.

logger/listener/listen.py --cached_data field_1,field_2,field_3

Connects to localhost:8766 by default. To target a different host or port, append @host:port:

logger/listener/listen.py --cached_data S330CourseTrue,S330SpeedKt@192.168.1.10:8766

All subscribed fields use seconds: 0 — only values that arrive after the subscription starts are returned. The --cached_data flag can be combined with other listen.py transforms and writers in the usual way:

logger/listener/listen.py \
  --cached_data S330CourseTrue,S330SpeedKt \
  --transform_prefix vessel \
  --write_logfile /var/tmp/log/s330

Interactive exploration

For ad-hoc queries or protocol debugging, any interactive WebSocket client works. Two popular command-line options are wscat (Node.js) and websocat (Rust).

List all cached fields:

$ wscat -c ws://localhost:8766
Connected (press CTRL+C to quit)
> {"type":"fields"}
< {"type": "data", "status": 200, "data": ["S330CourseTrue", "S330SpeedKt", ...]}

Get the most recent value of a field and exit:

$ echo '{"type":"subscribe","fields":{"S330CourseTrue":{"seconds":-1}}}' \
  | websocat ws://localhost:8766

CachedDataReader in a YAML config file

CachedDataReader is the standard component for pulling data from the CDS inside a logger configuration.

readers:
  class: CachedDataReader
  kwargs:
    data_server: localhost:8766
    subscription:
      fields:
        S330CourseTrue:
          seconds: 0
        S330SpeedKt:
          seconds: -1
        S330HeadingTrue:
          seconds: 60

All subscription options from the WebSocket protocol — wildcards, back_records, interval, format — are available inside the subscription dict.

As a convenience, fields may be given as a list instead of a dict; all fields are then subscribed with seconds: 0:

readers:
  class: CachedDataReader
  kwargs:
    data_server: localhost:8766
    subscription:
      fields:
        - S330CourseTrue
        - S330SpeedKt
        - S330HeadingTrue

CachedDataReader parameters:

Parameter Default Description
data_server localhost:8766 Host and port of the CachedDataServer
subscription (required) Subscription dict (see above)
bundle_seconds 0 If > 0, accumulate records for this many seconds and return them as a list
return_das_record False If True, wrap results in DASRecord objects
data_id None data_id to assign to returned DASRecord objects (requires return_das_record=True)
use_wss False Connect using secure WebSockets (wss://)
check_cert False Verify the server’s TLS certificate; may be a path to a .pem file

Example — read two fields and write them to a logfile:

readers:
  class: CachedDataReader
  kwargs:
    data_server: localhost:8766
    subscription:
      fields:
        S330CourseTrue:
          seconds: 0
        S330SpeedKt:
          seconds: 0
writers:
  class: LogfileWriter
  kwargs:
    filebase: /var/tmp/log/s330_derived

CachedDataReader in Python

from logger.readers.cached_data_reader import CachedDataReader

subscription = {
    'fields': {
        'S330CourseTrue': {'seconds': 30},
        'S330SpeedKt':    {'seconds': -1},
    }
}

reader = CachedDataReader(subscription=subscription, data_server='localhost:8766')

while True:
    record = reader.read()  # blocks until data arrives
    print(record)
    # {'timestamp': ..., 'fields': {'S330CourseTrue': ..., 'S330SpeedKt': ...}}

To return DASRecord objects instead of plain dicts:

reader = CachedDataReader(
    subscription=subscription,
    data_server='localhost:8766',
    return_das_record=True,
    data_id='s330_consumer',
)
record = reader.read()  # returns a DASRecord

To accumulate a time window of records before returning:

reader = CachedDataReader(
    subscription=subscription,
    data_server='localhost:8766',
    bundle_seconds=5,
)
records = reader.read()  # returns a list of dicts

External Python (asyncio + websockets)

Any Python program can talk to the CachedDataServer directly using the websockets library:

import asyncio
import json
import websockets

async def read_fields():
    async with websockets.connect('ws://localhost:8766') as ws:
        # Subscribe
        await ws.send(json.dumps({
            'type': 'subscribe',
            'fields': {
                'S330CourseTrue': {'seconds': 60},
                'S330SpeedKt':    {'seconds': -1},
            }
        }))
        await ws.recv()  # discard subscribe acknowledgement

        # Poll for updates
        while True:
            await ws.send(json.dumps({'type': 'ready'}))
            response = json.loads(await ws.recv())
            if response.get('status') == 200:
                for field, values in response.get('data', {}).items():
                    for timestamp, value in values:
                        print(f'{field}: {value} @ {timestamp}')
            await asyncio.sleep(1)

asyncio.run(read_fields())

One-shot query for the most recent value of all fields matching a pattern:

async def latest_values(pattern='S330*'):
    async with websockets.connect('ws://localhost:8766') as ws:
        await ws.send(json.dumps({
            'type': 'subscribe',
            'fields': {pattern: {'seconds': -1}},
        }))
        await ws.recv()  # discard subscribe acknowledgement
        await ws.send(json.dumps({'type': 'ready'}))
        response = json.loads(await ws.recv())
        return response.get('data', {})

data = asyncio.run(latest_values())

JavaScript (browser)

For browser pages that connect directly to the CDS, the standard browser WebSocket API works with the subscribe/ready protocol described above:

const ws = new WebSocket('ws://localhost:8766');

ws.onopen = () => {
  ws.send(JSON.stringify({
    type: 'subscribe',
    fields: {
      S330CourseTrue: {seconds: 60},
      S330SpeedKt:    {seconds: -1},
    },
    format: 'record_list',
  }));
};

ws.onmessage = (event) => {
  const msg = JSON.parse(event.data);
  if (msg.type === 'data' && msg.status === 200) {
    // msg.data is an array of {timestamp, fields} objects
    console.log(msg.data);
  }
  ws.send(JSON.stringify({type: 'ready'}));
};

Writing to the Server

CachedDataWriter in a YAML config file

CachedDataWriter is the standard component for pushing data into the CDS from a logger pipeline:

writers:
  class: CachedDataWriter
  kwargs:
    data_server: localhost:8766

It accepts DASRecord or dict-format records and forwards them to the server via WebSocket publish messages. If the connection is lost, it buffers up to max_backup records (default: 86400) locally until it can reconnect.

Direct Python API

Code that instantiates a CachedDataServer object directly (rather than connecting to a running server) can call cache_record() directly:

from server.cached_data_server import CachedDataServer

server = CachedDataServer(port=8766)
server.cache_record({
    'timestamp': time.time(),
    'fields': {'field_1': 'value_1', 'field_2': 'value_2'}
})

UDP input (deprecated)

Deprecated. Use CachedDataWriter in a logger pipeline instead. UDP input remains functional but is no longer enabled by default and may be removed in a future release.

If the server is started with one or more --udp ports, it will also accept JSON-encoded records broadcast on those ports:

server/cached_data_server.py --port 8766 --udp 6225

The default supervisord installation does not enable UDP input. To enable it, uncomment the relevant line in scripts/start_openrvdas.sh:

#DATA_SERVER_LISTEN_ON_UDP='--udp $DATA_SERVER_UDP_PORT'

Input Data Formats

Whether data arrives via UDP, WebSocket publish, or direct cache_record() call, the server expects records in one of the following formats.

Standard format — a dict with an optional data_id and timestamp, and a mandatory fields key:

{
  "data_id": "s330",
  "timestamp": 1555468528.452,
  "fields": {
    "S330CourseMag":  244.29,
    "S330CourseTrue": 219.61,
    "S330SpeedKt":    8.9
  }
}

data_id is optional. timestamp is optional — time.time() is used if absent.

Pre-timestamped format — field values may themselves be lists of (timestamp, value) pairs, in which case the top-level timestamp is ignored:

{
  "fields": {
    "S330CourseMag":  [[1555468527.1, 244.1], [1555468528.4, 244.29]],
    "S330CourseTrue": [[1555468527.1, 219.5], [1555468528.4, 219.61]]
  }
}

With metadata — a record may include a metadata key. The server extracts the fields dict inside it and caches it as per-field metadata, which is then returned by describe requests. This metadata is emitted at intervals by RecordParser/ParseTransform when metadata_interval is set:

{
  "data_id": "s330",
  "fields": {"S330CourseMag": 244.29, "S330CourseTrue": 219.61},
  "metadata": {
    "fields": {
      "S330CourseMag":  {"description": "Magnetic course", "units": "degrees", ...},
      "S330CourseTrue": {"description": "True course",     "units": "degrees", ...}
    }
  }
}

Quick Reference

Reading:

Method When to use
listen.py --cached_data Command-line inspection; piping into other tools
curl / HTTP GET One-off queries from shell scripts, elog systems, or tools that can’t use WebSockets
wscat / websocat Interactive protocol debugging
CachedDataReader in YAML Reading CDS data inside a logger or derived-data pipeline
CachedDataReader in Python Scripted consumers within the OpenRVDAS codebase
asyncio + websockets External Python programs or services needing continuous updates
Raw WebSocket (JS/other) Custom browser pages or other language bindings

Writing:

Method When to use
CachedDataWriter in YAML Feeding the CDS from a logger pipeline
WebSocket publish message Any WebSocket client pushing data directly
cache_record() in Python In-process use when directly instantiating a CachedDataServer
--udp port (deprecated) Legacy UDP broadcast

Deprecated

JavaScript WidgetServer

Deprecated. The WidgetServer / widget framework (display/js/widgets/) is no longer the recommended approach for browser-based displays. Use Grafana/InfluxDB-based displays instead. The raw browser WebSocket API (shown in the JavaScript section above) continues to work and is supported.

The OpenRVDAS WidgetServer class in display/js/widgets/widget_server.js handles the subscribe/ready loop automatically and dispatches incoming data to a list of display widgets:

var widgets = [
  new TextWidget('course_div', {'S330CourseTrue': {'seconds': 0}}, 'Degrees'),
  new TextWidget('speed_div',  {'S330SpeedKt':    {'seconds': 0}}, 'kt'),
];

var server = new WidgetServer(widgets, 'ws://localhost:8766');
server.serve();

Updated: