Time Series Data Management with Amazon Timestream
Introduction
Traditional relational databases (PostgreSQL, MySQL) are not optimized for time series data from industrial sensors. Amazon Timestream is a fully managed, serverless time series database that AWS positions as offering roughly 10x faster writes and about 1/10 the cost of a relational database for this kind of workload.
In this article, we examine how to efficiently store and analyze data from our ZMA Data Acquisition and GDT Digital Transmitter devices using Amazon Timestream.
What is Time Series Data?
Time series data consists of data points recorded at sequential time intervals, each typically carrying a timestamp, one or more identifying dimensions (which device, which location), and the measured values.
Industrial Examples
- Weight measurement: Tank weight every second
- Temperature: Oven temperature every 5 seconds
- Vibration: Motor vibration 1000 samples/second
- Energy consumption: Power meter every 15 minutes
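Concretely, a single reading from any of these sources reduces to a timestamp, identifying dimensions, and measured values. A minimal sketch (field names are illustrative, not a fixed schema):

reading = {
    "device_id": "ZMA-01",           # dimension: which sensor
    "location": "Istanbul-Factory",  # dimension: where it sits
    "timestamp": 1718000000000,      # when (Unix epoch, milliseconds)
    "net_weight": 4523.5,            # measured value (kg)
    "temperature": 4.2,              # measured value (°C)
}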
Relational DB vs Time Series DB
| Feature | PostgreSQL | Amazon Timestream |
|---|---|---|
| Write Speed | ~10K/sec | ~100K/sec |
| Compression | ~30% | ~90% (≈10x smaller than raw) |
| Time Queries | Complex SQL | Native time functions |
| Auto-Tiering | Manual partitioning | Automatic (Memory→Magnetic) |
| Cost (1TB/month) | ~$200 | ~$20 |
System Architecture
┌─────────────────────────────────────────────────┐
│ Factory Network │
│ │
│ ┌─────────┐ Modbus TCP ┌──────────────┐ │
│ │ ZMA/GDT │◄────────────────►│ IoT Gateway │ │
│ │ Devices │ │ (Python) │ │
│ └─────────┘ └──────┬───────┘ │
└─────────────────────────────────────┼───────────┘
│ MQTT
│ TLS 8883
▼
┌────────────────────────┐
│ AWS Cloud │
│ │
│ ┌──────────────────┐ │
│ │ IoT Core │ │
│ └────────┬─────────┘ │
│ │ │
│ ┌────────▼─────────┐ │
│ │ Timestream │ │
│ │ Database │ │
│ │ │ │
│ │ Memory Store │ │
│ │ (Recent data) │ │
│ │ ▼ │ │
│ │ Magnetic Store │ │
│ │ (Historical) │ │
│ └────────┬─────────┘ │
│ │ │
│ ┌────────▼─────────┐ │
│ │ Grafana │ │
│ │ Dashboards │ │
│ └──────────────────┘ │
└────────────────────────┘
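On the gateway side, the bridge between Modbus and AWS is a small publisher. A minimal sketch using paho-mqtt; the endpoint, certificate paths, and the read_modbus_values helper are placeholders for illustration:

import json
import time
import paho.mqtt.client as mqtt

# Placeholders: substitute your own IoT Core endpoint and device certificates
ENDPOINT = "your-endpoint-ats.iot.eu-west-1.amazonaws.com"

client = mqtt.Client()  # paho-mqtt 1.x constructor; 2.x requires a CallbackAPIVersion argument
client.tls_set(ca_certs="AmazonRootCA1.pem",
               certfile="device.pem.crt",
               keyfile="private.pem.key")
client.connect(ENDPOINT, 8883)  # MQTT over TLS
client.loop_start()

while True:
    # read_modbus_values() is a hypothetical Modbus TCP poll of the ZMA/GDT registers
    payload = {
        "device_name": "ZMA-01",
        "values": read_modbus_values(),  # e.g. {"net_weight": 4523.5, "temperature": 4.2}
        "timestamp": int(time.time() * 1000),
    }
    client.publish("amazeng/factory/tank/1/data", json.dumps(payload))
    time.sleep(1)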
Timestream Database Setup
1. Create Database and Table
# Create database
aws timestream-write create-database \
    --database-name FactoryData

# Create table
aws timestream-write create-table \
    --database-name FactoryData \
    --table-name TankWeights \
    --retention-properties '{
        "MemoryStoreRetentionPeriodInHours": 24,
        "MagneticStoreRetentionPeriodInDays": 90
    }'
Retention Policy:
- Memory Store (24 hours): Fast queries for recent data
- Magnetic Store (90 days): Cost-effective long-term storage
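If you prefer boto3 over the CLI, the equivalent setup from Python looks like this (a sketch; the region mirrors the examples below):

import boto3

client = boto3.client('timestream-write', region_name='eu-west-1')
client.create_database(DatabaseName='FactoryData')
client.create_table(
    DatabaseName='FactoryData',
    TableName='TankWeights',
    RetentionProperties={
        'MemoryStoreRetentionPeriodInHours': 24,
        'MagneticStoreRetentionPeriodInDays': 90,
    },
)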
2. IoT Core Rule to Write to Timestream
IoT Rule SQL:
SELECT
    device_name as device_id,
    values.net_weight as weight,
    values.temperature as temperature,
    timestamp
FROM
    'amazeng/factory/tank/+/data'
Action: Timestream
{
  "databaseName": "FactoryData",
  "tableName": "TankWeights",
  "dimensions": [{ "name": "device_id", "value": "${device_id}" }],
  "timestamp": {
    "value": "${timestamp}",
    "unit": "MILLISECONDS"
  }
}
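For reference, a device payload published to amazeng/factory/tank/1/data that this rule would match (values are illustrative):

{
  "device_name": "ZMA-01",
  "values": {
    "net_weight": 4523.5,
    "temperature": 4.2
  },
  "timestamp": 1718000000000
}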
Data Ingestion: Python Example
Write Data to Timestream
import boto3
import time

class TimestreamWriter:
    def __init__(self, database_name, table_name):
        self.client = boto3.client('timestream-write', region_name='eu-west-1')
        self.database = database_name
        self.table = table_name

    def write_weight_data(self, tank_id, weight, temperature):
        """Write a single multi-measure record"""
        current_time = str(int(time.time() * 1000))  # Milliseconds
        records = [
            {
                'Dimensions': [
                    {'Name': 'tank_id', 'Value': str(tank_id)},
                    {'Name': 'location', 'Value': 'Istanbul-Factory'},
                ],
                'MeasureName': 'metrics',
                'MeasureValues': [
                    {'Name': 'weight', 'Value': str(weight), 'Type': 'DOUBLE'},
                    {'Name': 'temperature', 'Value': str(temperature), 'Type': 'DOUBLE'}
                ],
                'MeasureValueType': 'MULTI',
                'Time': current_time
            }
        ]
        try:
            result = self.client.write_records(
                DatabaseName=self.database,
                TableName=self.table,
                Records=records
            )
            print(f"✓ Written: Tank {tank_id} - {weight} kg")
            return result
        except Exception as e:
            print(f"✗ Error: {e}")
            return None

    def write_batch(self, data_list):
        """Write multiple single-measure records (Timestream accepts at most 100 per call)"""
        if len(data_list) > 100:
            raise ValueError("WriteRecords allows at most 100 records per request")
        records = []
        current_time = str(int(time.time() * 1000))
        for data in data_list:
            records.append({
                'Dimensions': [
                    {'Name': 'tank_id', 'Value': str(data['tank_id'])},
                ],
                'MeasureName': 'weight',
                'MeasureValue': str(data['weight']),
                'MeasureValueType': 'DOUBLE',
                'Time': current_time
            })
        return self.client.write_records(
            DatabaseName=self.database,
            TableName=self.table,
            Records=records
        )

# Usage
writer = TimestreamWriter('FactoryData', 'TankWeights')

# Single write
writer.write_weight_data(tank_id=1, weight=4523.5, temperature=4.2)

# Batch write
batch_data = [
    {'tank_id': 1, 'weight': 4523.5},
    {'tank_id': 2, 'weight': 3890.2},
    {'tank_id': 3, 'weight': 4102.8}
]
writer.write_batch(batch_data)
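Timestream rejects records with duplicate or out-of-range timestamps as partial failures rather than failing the whole request silently. A sketch of surfacing them, following the boto3 RejectedRecordsException response shape:

try:
    writer.write_batch(batch_data)
except writer.client.exceptions.RejectedRecordsException as err:
    # Each rejected entry reports the offending record index and a reason
    for rr in err.response["RejectedRecords"]:
        print(f"Rejected record {rr['RecordIndex']}: {rr['Reason']}")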
Querying Data with SQL
Basic Queries
These examples assume the single-measure layout written by write_batch (one row per measure, filtered by measure_name); with multi-measure records, weight and temperature appear as their own columns instead.
1. Latest Weight per Tank:
SELECT
    tank_id,
    max_by(measure_value::double, time) as latest_weight,
    max(time) as latest_time
FROM
    "FactoryData"."TankWeights"
WHERE
    time > ago(5m)
    AND measure_name = 'weight'
GROUP BY
    tank_id
2. Average Weight (Last Hour):
SELECT
    tank_id,
    AVG(measure_value::double) as avg_weight,
    COUNT(*) as sample_count
FROM
    "FactoryData"."TankWeights"
WHERE
    time > ago(1h)
    AND measure_name = 'weight'
GROUP BY
    tank_id
3. Fill Rate (kg/hour):
WITH binned_data AS (
    SELECT
        tank_id,
        bin(time, 1h) as hour_bin,
        AVG(measure_value::double) as avg_weight
    FROM
        "FactoryData"."TankWeights"
    WHERE
        time > ago(24h)
        AND measure_name = 'weight'
    GROUP BY
        tank_id, bin(time, 1h)
)
SELECT
    tank_id,
    hour_bin,
    avg_weight,
    avg_weight - LAG(avg_weight) OVER (
        PARTITION BY tank_id
        ORDER BY hour_bin
    ) as fill_rate_per_hour
FROM
    binned_data
ORDER BY
    tank_id, hour_bin DESC
Advanced Analytics
4. Anomaly Detection (Statistical):
WITH stats AS (
    SELECT
        tank_id,
        AVG(measure_value::double) as mean_weight,
        STDDEV(measure_value::double) as stddev_weight
    FROM
        "FactoryData"."TankWeights"
    WHERE
        time BETWEEN ago(7d) AND ago(1d)
        AND measure_name = 'weight'
    GROUP BY
        tank_id
)
SELECT
    t.tank_id,
    t.time,
    t.measure_value::double as weight,
    s.mean_weight,
    s.stddev_weight,
    ABS(t.measure_value::double - s.mean_weight) / s.stddev_weight as z_score,
    CASE
        WHEN ABS(t.measure_value::double - s.mean_weight) / s.stddev_weight > 3
            THEN 'ANOMALY'
        ELSE 'NORMAL'
    END as status
FROM
    "FactoryData"."TankWeights" t
JOIN
    stats s ON t.tank_id = s.tank_id
WHERE
    t.time > ago(1h)
    AND t.measure_name = 'weight'
ORDER BY
    z_score DESC
5. Data Interpolation (Missing Values):
SELECT
    tank_id,
    INTERPOLATE_LINEAR(
        CREATE_TIME_SERIES(time, measure_value::double),
        SEQUENCE(ago(1h), now(), 1m)
    ) as interpolated_weights
FROM
    "FactoryData"."TankWeights"
WHERE
    time > ago(1h)
    AND measure_name = 'weight'
GROUP BY
    tank_id
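The same queries can be run programmatically through the timestream-query client. A minimal sketch (result parsing is simplified to scalar values):

import boto3

query_client = boto3.client('timestream-query', region_name='eu-west-1')

query = """
    SELECT tank_id, AVG(measure_value::double) as avg_weight
    FROM "FactoryData"."TankWeights"
    WHERE time > ago(1h) AND measure_name = 'weight'
    GROUP BY tank_id
"""

# Large result sets come back in pages; the paginator handles NextToken
paginator = query_client.get_paginator('query')
for page in paginator.paginate(QueryString=query):
    for row in page['Rows']:
        print([col.get('ScalarValue') for col in row['Data']])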
Grafana Integration
Add Timestream Data Source
Grafana Plugin:
grafana-cli plugins install grafana-timestream-datasource
Data Source Configuration:
{
  "name": "AWS Timestream",
  "type": "grafana-timestream-datasource",
  "access": "proxy",
  "jsonData": {
    "defaultRegion": "eu-west-1",
    "defaultDatabase": "FactoryData",
    "defaultTable": "TankWeights",
    "authType": "keys"
  },
  "secureJsonData": {
    "accessKey": "AKIA...",
    "secretKey": "..."
  }
}
Example Dashboard Query
Panel: Tank Weight Over Time
SELECT
    tank_id as metric,
    time,
    measure_value::double as weight
FROM
    $__database.$__table
WHERE
    $__timeFilter
    AND tank_id IN ($tank_ids)
ORDER BY
    time
Variables:
- $tank_ids: Multi-select dropdown (1, 2, 3)
- $__timeFilter: Grafana auto-generated time filter
Cost Optimization
Pricing (eu-west-1)
| Component | Price | Example Cost |
|---|---|---|
| Writes | $0.50 / 1M writes (metered in 1KB units) | ~$0.05/day (100K writes/day) |
| Memory Store | $0.036 / GB-hour | $0.86/day (1GB, 24h) |
| Magnetic Store | $0.03 / GB-month | $30/month (1TB) |
| Queries (scanned) | $0.01 / GB scanned | Variable |
Cost Optimization Strategies
1. Batch Writes:
# ❌ Bad: 1M records written one request at a time
# → each small record is billed as one 1KB write → 1M writes ≈ $0.50
for record in data:
    write_single_record(record)

# ✅ Good: 100-record batches pack small records into shared 1KB write units
# (~50-byte records → ~5 KB per request → ~5 billed writes per 100 records ≈ $0.025 total)
for batch in chunks(data, 100):
    write_batch(batch)
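The chunks helper above is not defined in the snippet; a minimal version:

def chunks(items, size):
    """Yield successive fixed-size slices of a list."""
    for i in range(0, len(items), size):
        yield items[i:i + size]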
2. Shorter Retention:
Note that shrinking a retention window permanently removes data older than the new bound.
# Reduce Memory Store to 12 hours instead of 24
aws timestream-write update-table \
    --database-name FactoryData \
    --table-name TankWeights \
    --retention-properties '{
        "MemoryStoreRetentionPeriodInHours": 12,
        "MagneticStoreRetentionPeriodInDays": 30
    }'
3. Query Optimization:
-- ❌ Bad: scans the entire table
SELECT * FROM "FactoryData"."TankWeights"

-- ✅ Good: limited time range + specific columns
SELECT tank_id, time, measure_value::double
FROM "FactoryData"."TankWeights"
WHERE time > ago(1h)
    AND measure_name = 'weight'
Real-World Cost Example
Scenario: 10 tanks, 1-second polling, 90-day retention
| Item | Calculation | Monthly Cost |
|---|---|---|
| Writes | 10 devices × 86,400/day × 30 days = 25.9M writes | $12.95 |
| Memory Store | ~43 MB held in memory (24h window) ≈ 0.043 GB × 730 h × $0.036/GB-hour | $1.13 |
| Magnetic Store | 25.9M × 50 bytes ≈ 1.3 GB/month new data (~3.9 GB at 90-day steady state) | $0.12 |
| Queries | ~100 GB scanned/month | $1.00 |
| TOTAL | | ~$15/month |
Compare to self-hosted PostgreSQL: $50-100/month (EC2 + EBS)
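A quick sanity check of the table above (prices as listed; the 50-byte average record size is an assumption):

DEVICES, HZ, DAYS = 10, 1, 30
RECORD_BYTES = 50  # assumed average record size

writes = DEVICES * HZ * 86_400 * DAYS                  # 25.9M records/month
write_cost = writes / 1e6 * 0.50                       # unbatched: 1 record = 1 billed 1KB write

mem_gb = DEVICES * HZ * 86_400 * RECORD_BYTES / 1e9    # ~24h window held in memory
memory_cost = mem_gb * 730 * 0.036                     # GB × hours/month × $/GB-hour

magnetic_gb = writes * RECORD_BYTES / 1e9 * 3          # ~90-day steady state
magnetic_cost = magnetic_gb * 0.03                     # $/GB-month

print(f"writes ${write_cost:.2f}, memory ${memory_cost:.2f}, magnetic ${magnetic_cost:.2f}")
# → writes $12.96, memory $1.14, magnetic $0.12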
Conclusion
Amazon Timestream is the ideal solution for storing and analyzing data from industrial sensors:
✅ Roughly 10x faster writes than relational databases
✅ ~90% compression (≈10x storage savings)
✅ Automatic lifecycle management (Memory → Magnetic)
✅ Native time functions (interpolation, binning, windowing)
✅ Grafana integration for real-time dashboards
Our ZMA and GDT devices can be easily integrated with Timestream via AWS IoT Core for complete cloud-based monitoring.