Time Series Data Management with Amazon Timestream

Amazeng Technical Team
9 min read
AWS Timestream · Time Series Database · Industrial IoT · Data Analytics · Grafana · SQL

Introduction

Traditional relational databases (PostgreSQL, MySQL) are not optimized for time series data from industrial sensors. Amazon Timestream is a fully managed time series database offering roughly 10x faster writes at around a tenth of the cost (see the comparison below).

In this article, we examine how to efficiently store and analyze data from our ZMA Data Acquisition and GDT Digital Transmitter devices using Amazon Timestream.

What is Time Series Data?

Time series data is a sequence of data points recorded at successive, typically regular, time intervals. Each point pairs a timestamp with one or more measured values; a small code sketch follows the examples below.

Industrial Examples

  • Weight measurement: Tank weight every second
  • Temperature: Oven temperature every 5 seconds
  • Vibration: Motor vibration 1000 samples/second
  • Energy consumption: Power meter every 15 minutes
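
In code, each point is simply a timestamp paired with one or more measured values. A minimal sketch (the record layout is illustrative, not a required schema):

from datetime import datetime, timezone

# One sample from a tank scale: a timestamp plus the values measured at that instant
sample = {
    "device_id": "tank-1",
    "time": datetime.now(timezone.utc),
    "weight_kg": 4523.5,
    "temperature_c": 4.2,
}

# A time series is an ordered sequence of such samples
series = [sample]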

Relational DB vs Time Series DB

| Feature | PostgreSQL | Amazon Timestream |
|---|---|---|
| Write speed | ~10K/sec | ~100K/sec |
| Compression | ~30% | ~90% (10x better) |
| Time queries | Complex SQL | Native time functions |
| Auto-tiering | Manual partitioning | Automatic (Memory → Magnetic) |
| Cost (1 TB/month) | ~$200 | ~$20 |

System Architecture

┌─────────────────────────────────────────────────┐
│              Factory Network                    │
│                                                 │
│  ┌─────────┐    Modbus TCP   ┌──────────────┐  │
│  │ ZMA/GDT │◄────────────────►│  IoT Gateway │  │
│  │ Devices │                  │ (Python)     │  │
│  └─────────┘                  └──────┬───────┘  │
└─────────────────────────────────────┼───────────┘
                                      │ MQTT
                                      │ TLS 8883
                                      ▼
                         ┌────────────────────────┐
                         │      AWS Cloud         │
                         │                        │
                         │  ┌──────────────────┐  │
                         │  │   IoT Core       │  │
                         │  └────────┬─────────┘  │
                         │           │            │
                         │  ┌────────▼─────────┐  │
                         │  │   Timestream     │  │
                         │  │   Database       │  │
                         │  │                  │  │
                         │  │  Memory Store    │  │
                         │  │  (Recent data)   │  │
                         │  │        ▼         │  │
                         │  │  Magnetic Store  │  │
                         │  │  (Historical)    │  │
                         │  └────────┬─────────┘  │
                         │           │            │
                         │  ┌────────▼─────────┐  │
                         │  │    Grafana       │  │
                         │  │   Dashboards     │  │
                         │  └──────────────────┘  │
                         └────────────────────────┘
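
The gateway box in this diagram can be sketched in a few lines of Python. The snippet below is a simplified illustration (using the paho-mqtt 1.x client API), not our production gateway: read_tank() stands in for the Modbus TCP polling of the ZMA/GDT device, and the endpoint and certificate paths are placeholders for your own AWS IoT credentials.

import json
import ssl
import time

import paho.mqtt.client as mqtt

IOT_ENDPOINT = "xxxxxxxx-ats.iot.eu-west-1.amazonaws.com"  # placeholder endpoint
TOPIC = "amazeng/factory/tank/1/data"

def read_tank():
    """Stand-in for the Modbus TCP poll of a ZMA/GDT device."""
    return {"net_weight": 4523.5, "temperature": 4.2}

client = mqtt.Client()
# AWS IoT Core requires mutual TLS on port 8883
client.tls_set(ca_certs="AmazonRootCA1.pem",
               certfile="device.pem.crt",
               keyfile="private.pem.key",
               tls_version=ssl.PROTOCOL_TLSv1_2)
client.connect(IOT_ENDPOINT, 8883)
client.loop_start()

while True:
    payload = {
        "device_name": "tank-1",
        "values": read_tank(),
        "timestamp": int(time.time() * 1000),  # milliseconds, as the IoT rule expects
    }
    client.publish(TOPIC, json.dumps(payload), qos=1)
    time.sleep(1)  # 1-second polling interval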

Timestream Database Setup

1. Create Database and Table

# Create database
aws timestream-write create-database \
  --database-name FactoryData

# Create table
aws timestream-write create-table \
  --database-name FactoryData \
  --table-name TankWeights \
  --retention-properties '{
    "MemoryStoreRetentionPeriodInHours": 24,
    "MagneticStoreRetentionPeriodInDays": 90
  }'

Retention Policy:

  • Memory Store (24 hours): Fast queries for recent data
  • Magnetic Store (90 days): Cost-effective long-term storage
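
The same setup can also be scripted with boto3; a sketch equivalent to the CLI calls above:

import boto3

ts = boto3.client('timestream-write', region_name='eu-west-1')

ts.create_database(DatabaseName='FactoryData')

ts.create_table(
    DatabaseName='FactoryData',
    TableName='TankWeights',
    RetentionProperties={
        'MemoryStoreRetentionPeriodInHours': 24,   # fast queries on recent data
        'MagneticStoreRetentionPeriodInDays': 90   # cost-effective long-term storage
    }
)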

2. IoT Core Rule to Write to Timestream

IoT Rule SQL:

SELECT
  device_name as device_id,
  values.net_weight as weight,
  values.temperature as temperature,
  timestamp
FROM
  'amazeng/factory/tank/+/data'

Action: Timestream

{
  "databaseName": "FactoryData",
  "tableName": "TankWeights",
  "dimensions": [{ "name": "device_id", "value": "${device_id}" }],
  "timestamp": {
    "value": "${timestamp}",
    "unit": "MILLISECONDS"
  }
}
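
To verify the rule end to end, you can publish a test message to the topic from Python. A quick sketch (the payload fields mirror the rule's SELECT; region and values are illustrative):

import json
import time

import boto3

iot = boto3.client('iot-data', region_name='eu-west-1')

iot.publish(
    topic='amazeng/factory/tank/1/data',
    qos=1,
    payload=json.dumps({
        'device_name': 'tank-1',
        'values': {'net_weight': 4523.5, 'temperature': 4.2},
        'timestamp': int(time.time() * 1000)  # milliseconds, matching the action's unit
    })
)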

Data Ingestion: Python Example

Write Data to Timestream

import boto3
import time

class TimestreamWriter:
    def __init__(self, database_name, table_name):
        self.client = boto3.client('timestream-write', region_name='eu-west-1')
        self.database = database_name
        self.table = table_name

    def write_weight_data(self, tank_id, weight, temperature):
        """Write single measurement"""
        current_time = str(int(time.time() * 1000))  # Milliseconds

        records = [
            {
                'Dimensions': [
                    {'Name': 'tank_id', 'Value': str(tank_id)},
                    {'Name': 'location', 'Value': 'Istanbul-Factory'},
                ],
                'MeasureName': 'metrics',
                'MeasureValues': [
                    {'Name': 'weight', 'Value': str(weight), 'Type': 'DOUBLE'},
                    {'Name': 'temperature', 'Value': str(temperature), 'Type': 'DOUBLE'}
                ],
                'MeasureValueType': 'MULTI',
                'Time': current_time
            }
        ]

        try:
            result = self.client.write_records(
                DatabaseName=self.database,
                TableName=self.table,
                Records=records
            )
            print(f"✓ Written: Tank {tank_id} - {weight} kg")
            return result
        except Exception as e:
            print(f"✗ Error: {e}")
            return None

    def write_batch(self, data_list):
        """Write multiple measurements (Timestream accepts at most 100 records per request)"""
        records = []
        current_time = str(int(time.time() * 1000))

        for data in data_list:
            records.append({
                'Dimensions': [
                    {'Name': 'tank_id', 'Value': str(data['tank_id'])},
                ],
                'MeasureName': 'weight',
                'MeasureValue': str(data['weight']),
                'MeasureValueType': 'DOUBLE',
                'Time': current_time
            })

        try:
            return self.client.write_records(
                DatabaseName=self.database,
                TableName=self.table,
                Records=records
            )
        except self.client.exceptions.RejectedRecordsException as e:
            # Timestream rejects individual records (e.g. duplicates); surface the reason
            print(f"✗ Rejected records: {e.response['Error']['Message']}")
            return None

# Usage
writer = TimestreamWriter('FactoryData', 'TankWeights')

# Single write
writer.write_weight_data(tank_id=1, weight=4523.5, temperature=4.2)

# Batch write
batch_data = [
    {'tank_id': 1, 'weight': 4523.5},
    {'tank_id': 2, 'weight': 3890.2},
    {'tank_id': 3, 'weight': 4102.8}
]
writer.write_batch(batch_data)
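
When every record in a request shares fields, those fields can be moved into write_records' CommonAttributes parameter so they are sent once per request instead of once per record, trimming payload size. A sketch (dimension values and timestamps are illustrative):

import boto3

client = boto3.client('timestream-write', region_name='eu-west-1')

client.write_records(
    DatabaseName='FactoryData',
    TableName='TankWeights',
    # Attributes shared by all records in this request
    CommonAttributes={
        'Dimensions': [{'Name': 'location', 'Value': 'Istanbul-Factory'}],
        'MeasureName': 'weight',
        'MeasureValueType': 'DOUBLE',
        'TimeUnit': 'MILLISECONDS'
    },
    Records=[
        {'Dimensions': [{'Name': 'tank_id', 'Value': '1'}],
         'MeasureValue': '4523.5', 'Time': '1700000000000'},
        {'Dimensions': [{'Name': 'tank_id', 'Value': '2'}],
         'MeasureValue': '3890.2', 'Time': '1700000000000'},
    ]
)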

Querying Data with SQL

Basic Queries

Note: The queries below assume single-measure records (one measure_name per row), which is how the IoT Core rule above writes data. Multi-measure records, like the 'metrics' example in the Python writer, instead expose each measure as its own column (weight, temperature).

1. Latest Weight per Tank:

SELECT
  tank_id,
  max_by(measure_value::double, time) as latest_weight,
  max(time) as latest_time
FROM
  "FactoryData"."TankWeights"
WHERE
  time > ago(5m)
  AND measure_name = 'weight'
GROUP BY
  tank_id

2. Average Weight (Last Hour):

SELECT
  tank_id,
  AVG(measure_value::double) as avg_weight,
  COUNT(*) as sample_count
FROM
  "FactoryData"."TankWeights"
WHERE
  time > ago(1h)
  AND measure_name = 'weight'
GROUP BY
  tank_id

3. Fill Rate (kg/hour):

WITH binned_data AS (
  SELECT
    tank_id,
    bin(time, 1h) as hour_bin,
    AVG(measure_value::double) as avg_weight
  FROM
    "FactoryData"."TankWeights"
  WHERE
    time > ago(24h)
    AND measure_name = 'weight'
  GROUP BY
    tank_id, bin(time, 1h)
)
SELECT
  tank_id,
  hour_bin,
  avg_weight,
  avg_weight - LAG(avg_weight) OVER (
    PARTITION BY tank_id
    ORDER BY hour_bin
  ) as fill_rate_per_hour
FROM
  binned_data
ORDER BY
  tank_id, hour_bin DESC

Advanced Analytics

4. Anomaly Detection (Statistical):

This query flags readings more than three standard deviations from each tank's trailing 7-day baseline (the classic three-sigma rule):

WITH stats AS (
  SELECT
    tank_id,
    AVG(measure_value::double) as mean_weight,
    STDDEV(measure_value::double) as stddev_weight
  FROM
    "FactoryData"."TankWeights"
  WHERE
    time BETWEEN ago(7d) AND ago(1d)
    AND measure_name = 'weight'
  GROUP BY
    tank_id
)
SELECT
  t.tank_id,
  t.time,
  t.measure_value::double as weight,
  s.mean_weight,
  s.stddev_weight,
  ABS(t.measure_value::double - s.mean_weight) / s.stddev_weight as z_score,
  CASE
    WHEN ABS(t.measure_value::double - s.mean_weight) / s.stddev_weight > 3
    THEN 'ANOMALY'
    ELSE 'NORMAL'
  END as status
FROM
  "FactoryData"."TankWeights" t
JOIN
  stats s ON t.tank_id = s.tank_id
WHERE
  t.time > ago(1h)
  AND t.measure_name = 'weight'
ORDER BY
  z_score DESC

5. Data Interpolation (Missing Values):

SELECT
  tank_id,
  INTERPOLATE_LINEAR(
    CREATE_TIME_SERIES(time, measure_value::double),
    SEQUENCE(ago(1h), now(), 1m)
  ) as interpolated_weights
FROM
  "FactoryData"."TankWeights"
WHERE
  time > ago(1h)
  AND measure_name = 'weight'
GROUP BY
  tank_id
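
These queries can also be run programmatically through the timestream-query API. A minimal sketch using boto3's paginator to walk multi-page results:

import boto3

query_client = boto3.client('timestream-query', region_name='eu-west-1')

QUERY = """
SELECT tank_id, AVG(measure_value::double) AS avg_weight
FROM "FactoryData"."TankWeights"
WHERE time > ago(1h) AND measure_name = 'weight'
GROUP BY tank_id
"""

paginator = query_client.get_paginator('query')
for page in paginator.paginate(QueryString=QUERY):
    columns = [col['Name'] for col in page['ColumnInfo']]
    for row in page['Rows']:
        # Scalar results arrive as {'ScalarValue': '...'} strings
        values = [datum.get('ScalarValue') for datum in row['Data']]
        print(dict(zip(columns, values)))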

Grafana Integration

Add Timestream Data Source

Grafana Plugin:

grafana-cli plugins install grafana-timestream-datasource

Data Source Configuration:

{
  "name": "AWS Timestream",
  "type": "grafana-timestream-datasource",
  "access": "proxy",
  "jsonData": {
    "defaultRegion": "eu-west-1",
    "defaultDatabase": "FactoryData",
    "defaultTable": "TankWeights",
    "authType": "keys"
  },
  "secureJsonData": {
    "accessKey": "AKIA...",
    "secretKey": "..."
  }
}

Example Dashboard Query

Panel: Tank Weight Over Time

SELECT
  tank_id as metric,
  time,
  measure_value::double as weight
FROM
  $__database.$__table
WHERE
  $__timeFilter
  AND tank_id IN ($tank_ids)
ORDER BY
  time

Variables:

  • $tank_ids: Multi-select dropdown (1, 2, 3)
  • $__timeFilter: Grafana auto-generated time filter

Cost Optimization

Pricing (eu-west-1)

| Component | Price | Example Cost |
|---|---|---|
| Writes | $0.50 / million 1 KB writes | $0.05/day (100K writes/day) |
| Memory Store | $0.036 / GB-hour | $0.86/day (1 GB held for 24 h) |
| Magnetic Store | $0.03 / GB-month | $30/month (1 TB) |
| Queries | $0.01 / GB scanned | Variable |

Cost Optimization Strategies

1. Batch Writes:

# ❌ Bad: 1,000 separate requests — each small write is rounded up to a 1 KB write unit
for i in range(1000):
    write_single_record(data[i])

# ✅ Good: 10 batched requests (100 records each) consume far fewer 1 KB write units
def chunks(items, size):
    """Yield fixed-size slices of a list."""
    return (items[i:i + size] for i in range(0, len(items), size))

for batch in chunks(data, 100):
    write_batch(batch)

2. Shorter Retention:

# Reduce Memory Store to 12 hours instead of 24
aws timestream-write update-table \
  --database-name FactoryData \
  --table-name TankWeights \
  --retention-properties '{
    "MemoryStoreRetentionPeriodInHours": 12,
    "MagneticStoreRetentionPeriodInDays": 30
  }'

3. Query Optimization:

-- ❌ Bad: Scans entire table
SELECT * FROM TankWeights

-- ✅ Good: Limited time range + specific columns
SELECT tank_id, time, measure_value::double
FROM TankWeights
WHERE time > ago(1h)
  AND measure_name = 'weight'

Real-World Cost Example

Scenario: 10 tanks, 1-second polling, 90-day retention

| Item | Calculation | Monthly Cost |
|---|---|---|
| Writes | 10 devices × 86,400 writes/day × 30 days = 25.9M writes | $12.95 |
| Memory Store | ~1 day of data held in memory (≈0.043 GB) × 730 h × $0.036/GB-hour | ~$1.13 |
| Magnetic Store | 25.9M records × 50 bytes ≈ 1.3 GB/month | $0.04 |
| Queries | ~100 GB scanned/month | $1.00 |
| TOTAL | | ~$15/month |

Compare with self-hosted PostgreSQL: roughly $50-100/month (EC2 + EBS).

Conclusion

Amazon Timestream is the ideal solution for storing and analyzing data from industrial sensors:

  • 10x faster writes than relational databases
  • ~90% compression (roughly 10x storage savings)
  • Automatic lifecycle management (Memory → Magnetic)
  • Native time functions (interpolation, binning, windowing)
  • Grafana integration for real-time dashboards

Our ZMA and GDT devices can be easily integrated with Timestream via AWS IoT Core for complete cloud-based monitoring.