# Generative AI in Industrial Data with AWS Bedrock

## Introduction

Generative AI is transforming not only chatbots and content generation but also industrial data analysis. AWS Bedrock provides serverless API access to advanced foundation models such as Claude, Llama, and Amazon Titan, with no model training required.

In this article, we examine how to analyze data from our ZMA Data Acquisition and GDT Digital Transmitter devices using AWS Bedrock for predictive maintenance and reporting.
## What is AWS Bedrock?

AWS Bedrock is a fully managed service that offers API access to foundation models (FMs) from leading AI companies.
### Available Models

| Model | Provider | Parameters | Strengths |
|---|---|---|---|
| Claude 3.5 Sonnet | Anthropic | Not disclosed | Complex reasoning, coding |
| Claude 3 Haiku | Anthropic | Not disclosed | Fast, cost-effective |
| Llama 3.1 | Meta | 8B-405B | Open source, multilingual |
| Amazon Titan | AWS | Not disclosed | Text generation, embeddings |
| Mistral Large | Mistral AI | 123B | Multilingual, specialized |
### Bedrock vs Alternatives

| Feature | AWS Bedrock | OpenAI API | Self-Hosted (Ollama) |
|---|---|---|---|
| Deployment | Serverless | Cloud API | Local infrastructure |
| Data Privacy | Stays in your AWS account | Data leaves facility | Fully local |
| Typical Latency | 200-500 ms | 300-800 ms | 50-200 ms |
| Cost | Pay-per-token | Pay-per-token | Hardware only |
| Model Selection | 10+ models | OpenAI models only | Limited by local hardware |
| Fine-tuning | ✅ Supported | ✅ Supported | ⚠️ Manual |
## System Architecture

```
┌───────────────────────────────────────────────┐
│                Factory Network                │
│                                               │
│  ┌─────────┐   Modbus   ┌──────────────┐      │
│  │ ZMA/GDT │───────────►│ IoT Gateway  │      │
│  │ Devices │            └──────┬───────┘      │
└────────────────────────────────┼──────────────┘
                                 │ MQTT/TLS
                                 ▼
                    ┌────────────────────────┐
                    │       AWS Cloud        │
                    │                        │
                    │  ┌──────────────────┐  │
                    │  │    IoT Core      │  │
                    │  └────────┬─────────┘  │
                    │           │            │
                    │  ┌────────▼─────────┐  │
                    │  │   Timestream     │  │
                    │  │   (Raw Data)     │  │
                    │  └────────┬─────────┘  │
                    │           │            │
                    │  ┌────────▼─────────┐  │
                    │  │     Lambda       │  │
                    │  │   (Processor)    │  │
                    │  └────────┬─────────┘  │
                    │           │            │
                    │  ┌────────▼─────────┐  │
                    │  │   AWS Bedrock    │  │
                    │  │                  │  │
                    │  │  • Claude 3.5    │  │
                    │  │  • Llama 3.1     │  │
                    │  │  • Titan         │  │
                    │  └────────┬─────────┘  │
                    │           │            │
                    │  ┌────────▼─────────┐  │
                    │  │    S3 Bucket     │  │
                    │  │    (Reports)     │  │
                    │  └──────────────────┘  │
                    └────────────────────────┘
```
## Application 1: Anomaly Detection with AI

### Scenario

Detect abnormal patterns in the GDT loadcell data that might indicate tank tilt or sensor failure.

### Lambda Function with Claude
```python
import boto3
import json

bedrock = boto3.client('bedrock-runtime', region_name='eu-west-1')
timestream = boto3.client('timestream-query', region_name='eu-west-1')

def lambda_handler(event, context):
    # Query the last hour of data from Timestream
    tank_id = event['tank_id']
    query = f"""
    SELECT time, measure_value::double AS weight
    FROM "FactoryData"."TankWeights"
    WHERE tank_id = '{tank_id}' AND time > ago(1h)
    ORDER BY time DESC
    """
    result = timestream.query(QueryString=query)
    weights = [float(row['Data'][1]['ScalarValue'])
               for row in result['Rows']]
    if not weights:
        return {'statusCode': 404, 'body': f'No data for tank {tank_id}'}

    mean = sum(weights) / len(weights)
    std_dev = (sum((x - mean) ** 2 for x in weights) / len(weights)) ** 0.5

    # Build the prompt for Claude
    prompt = f"""
You are an industrial automation expert analyzing loadcell data.

Tank {tank_id} weight measurements (last hour, newest first):
{json.dumps(weights[:20], indent=2)}

Statistical summary:
- Mean: {mean:.2f} kg
- Min: {min(weights):.2f} kg
- Max: {max(weights):.2f} kg
- Std Dev: {std_dev:.2f} kg

Analyze this data and respond in JSON format only:
{{
  "anomaly_detected": true/false,
  "anomaly_type": "sudden_drop|gradual_drift|oscillation|normal",
  "severity": "low|medium|high|critical",
  "probable_cause": "explanation",
  "recommended_action": "specific action",
  "confidence": 0.0-1.0
}}
"""
    # Call Claude 3 Haiku (cost-effective)
    response = bedrock.invoke_model(
        modelId='anthropic.claude-3-haiku-20240307-v1:0',
        contentType='application/json',
        accept='application/json',
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1000,
            "messages": [{"role": "user", "content": prompt}]
        })
    )

    # Parse the model response
    response_body = json.loads(response['body'].read())
    analysis = json.loads(response_body['content'][0]['text'])

    # Send an SNS alert for high/critical severities
    if analysis['severity'] in ('high', 'critical'):
        sns = boto3.client('sns')
        sns.publish(
            TopicArn='arn:aws:sns:eu-west-1:xxx:factory-alerts',
            Subject=f"Critical Anomaly: Tank {tank_id}",
            Message=json.dumps(analysis, indent=2)
        )

    return {'statusCode': 200, 'body': json.dumps(analysis)}
```
**Example output:**

```json
{
  "anomaly_detected": true,
  "anomaly_type": "sudden_drop",
  "severity": "high",
  "probable_cause": "Weight dropped 450 kg in 2 minutes, indicating possible valve opening or leak",
  "recommended_action": "Inspect Tank 2 outlet valve and piping for leaks. Check flow meter readings.",
  "confidence": 0.89
}
```
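LLM output is not guaranteed to match the requested schema, so it is worth validating the parsed JSON before alerting on it. A minimal sketch — the `validate_analysis` helper is our own addition; the field names follow the prompt above:

```python
VALID_SEVERITIES = {'low', 'medium', 'high', 'critical'}
VALID_TYPES = {'sudden_drop', 'gradual_drift', 'oscillation', 'normal'}

def validate_analysis(analysis):
    """Return True only if the model's JSON matches the schema we asked for."""
    try:
        return (
            isinstance(analysis['anomaly_detected'], bool)
            and analysis['anomaly_type'] in VALID_TYPES
            and analysis['severity'] in VALID_SEVERITIES
            and 0.0 <= float(analysis['confidence']) <= 1.0
        )
    except (KeyError, TypeError, ValueError):
        # Missing fields or wrong types: treat as invalid rather than crash
        return False
```

If validation fails, fall back to a plain statistical threshold instead of trusting the response.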
## Application 2: Natural Language Querying

### Scenario

An operations manager asks: "Which tank had the highest fill rate yesterday?"

### Text-to-SQL with Claude
```python
def natural_language_to_sql(question, table_schema):
    """Convert a natural-language question to a Timestream SQL query."""
    prompt = f"""
You are an expert in AWS Timestream SQL. Convert the user's question to a valid SQL query.

Database Schema:
{table_schema}

User Question: "{question}"

Requirements:
1. Use Timestream-specific functions (ago(), bin(), etc.)
2. Return only the SQL query, no explanations
3. Use appropriate time filters

SQL Query:
"""
    response = bedrock.invoke_model(
        modelId='anthropic.claude-3-sonnet-20240229-v1:0',
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 500,
            "messages": [{"role": "user", "content": prompt}]
        })
    )
    sql_query = json.loads(response['body'].read())['content'][0]['text']
    return sql_query.strip()

# Example usage
schema = """Table: FactoryData.TankWeights
Columns:
- tank_id (VARCHAR): Tank identifier (1, 2, 3, etc.)
- time (TIMESTAMP): Measurement timestamp
- measure_name (VARCHAR): Metric name
- measure_value (DOUBLE): Metric value"""

question = "Which tank had the highest fill rate yesterday?"
sql = natural_language_to_sql(question, schema)
print(sql)
# Output:
# WITH hourly_avg AS (
#   SELECT tank_id, bin(time, 1h) AS hour_bin,
#          AVG(measure_value::double) AS avg_weight
#   FROM "FactoryData"."TankWeights"
#   WHERE time BETWEEN ago(48h) AND ago(24h)
#   GROUP BY tank_id, bin(time, 1h)
# )
# SELECT tank_id, MAX(avg_weight) - MIN(avg_weight) AS total_fill
# FROM hourly_avg
# GROUP BY tank_id
# ORDER BY total_fill DESC
# LIMIT 1

# Execute the generated SQL
result = timestream.query(QueryString=sql)
```
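Executing model-generated SQL verbatim is risky. A lightweight guard — our own addition, not part of any AWS API — can strip markdown fences (models sometimes wrap SQL in them) and reject anything that is not a single read-only statement before it reaches Timestream:

```python
import re

def sanitize_generated_sql(text):
    """Clean LLM-generated SQL and reject non-read-only statements.

    Returns the cleaned query; raises ValueError on anything suspicious.
    """
    # Strip optional ```sql ... ``` fences around the query
    sql = re.sub(r'^```(?:sql)?\s*|\s*```$', '', text.strip()).strip()
    # A semicolon anywhere except the very end means multiple statements
    if ';' in sql.rstrip(';'):
        raise ValueError('Multiple statements are not allowed')
    first_word = sql.split(None, 1)[0].upper()
    if first_word not in ('SELECT', 'WITH'):
        raise ValueError(f'Only read-only queries allowed, got: {first_word}')
    return sql
```

A sketch like this is not a full SQL parser; for production, also restrict the Timestream IAM role to read-only actions so the database enforces the same rule.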
## Application 3: Automated Maintenance Reports

### Scenario

Generate a weekly maintenance report summarizing the status of all tanks.

### Report Generation with Claude
```python
def generate_maintenance_report(start_date, end_date):
    """Generate a comprehensive weekly report."""
    # Collect data from multiple sources (helper functions defined elsewhere)
    tank_data = get_tank_statistics(start_date, end_date)
    alarms = get_alarm_history(start_date, end_date)
    calibrations = get_calibration_log(start_date, end_date)

    prompt = f"""
Generate a professional weekly maintenance report for a milk processing facility.

Data Summary:
{json.dumps(tank_data, indent=2)}

Alarms:
{json.dumps(alarms, indent=2)}

Calibrations:
{json.dumps(calibrations, indent=2)}

Report Structure:
1. Executive Summary
2. Tank-by-Tank Status
3. Anomalies and Alarms
4. Maintenance Activities
5. Recommendations for Next Week

Format as HTML for email distribution.
"""
    response = bedrock.invoke_model(
        modelId='anthropic.claude-3-sonnet-20240229-v1:0',
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4000,
            "messages": [{"role": "user", "content": prompt}]
        })
    )
    report_html = json.loads(response['body'].read())['content'][0]['text']

    # Save the report to S3
    s3 = boto3.client('s3')
    s3.put_object(
        Bucket='factory-reports',
        Key=f'maintenance/weekly_{start_date}.html',
        Body=report_html,
        ContentType='text/html'
    )

    # Email the report via SES
    ses = boto3.client('ses')
    ses.send_email(
        Source='[email protected]',
        Destination={'ToAddresses': ['[email protected]']},
        Message={
            'Subject': {'Data': f'Weekly Report: {start_date}'},
            'Body': {'Html': {'Data': report_html}}
        }
    )
    return report_html
```
## Application 4: RAG for Operation Manuals

### Retrieval Augmented Generation (RAG)

Use Knowledge Bases for Amazon Bedrock to build a chatbot that answers questions from the operation manuals.
```python
def setup_knowledge_base():
    """Create a knowledge base from PDF manuals."""
    bedrock_agent = boto3.client('bedrock-agent')

    # Create the knowledge base backed by a vector store
    kb_response = bedrock_agent.create_knowledge_base(
        name='FactoryOperationManuals',
        roleArn='arn:aws:iam::xxx:role/BedrockKBRole',
        knowledgeBaseConfiguration={
            'type': 'VECTOR',
            'vectorKnowledgeBaseConfiguration': {
                'embeddingModelArn': 'arn:aws:bedrock:eu-west-1::foundation-model/amazon.titan-embed-text-v1'
            }
        },
        storageConfiguration={
            'type': 'OPENSEARCH_SERVERLESS',
            'opensearchServerlessConfiguration': {
                'collectionArn': 'arn:aws:aoss:eu-west-1:xxx:collection/factory-kb',
                'vectorIndexName': 'factory-docs',
                'fieldMapping': {
                    'vectorField': 'vector',
                    'textField': 'text',
                    'metadataField': 'metadata'
                }
            }
        }
    )
    kb_id = kb_response['knowledgeBase']['knowledgeBaseId']

    # Add a data source (S3 bucket with the PDFs)
    bedrock_agent.create_data_source(
        knowledgeBaseId=kb_id,
        name='OperationManualsPDF',
        dataSourceConfiguration={
            'type': 'S3',
            's3Configuration': {
                'bucketArn': 'arn:aws:s3:::factory-manuals'
            }
        }
    )
    return kb_id

def query_knowledge_base(kb_id, question):
    """Query the operation manuals."""
    bedrock_agent_runtime = boto3.client('bedrock-agent-runtime')
    response = bedrock_agent_runtime.retrieve_and_generate(
        input={'text': question},
        retrieveAndGenerateConfiguration={
            'type': 'KNOWLEDGE_BASE',
            'knowledgeBaseConfiguration': {
                'knowledgeBaseId': kb_id,
                'modelArn': 'arn:aws:bedrock:eu-west-1::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0'
            }
        }
    )
    return response['output']['text'], response['citations']

# Example
answer, sources = query_knowledge_base(
    kb_id='ABCD1234',
    question='How do I calibrate the GDT-410M loadcell transmitter?'
)
print(answer)
# "To calibrate the GDT-410M: 1) Enter calibration mode by pressing..."
# Sources: [GDT_Manual_v2.3.pdf, page 45-47]
```
## Cost Optimization

### Pricing (Claude 3 Models)

| Model | Input | Output | Use Case |
|---|---|---|---|
| Claude 3.5 Sonnet | $0.003/1K tokens | $0.015/1K tokens | Complex analysis |
| Claude 3 Haiku | $0.00025/1K tokens | $0.00125/1K tokens | Fast, simple tasks |
| Claude 3 Opus | $0.015/1K tokens | $0.075/1K tokens | Most capable |
### Cost Example

**Scenario:** hourly anomaly detection for 10 tanks

- Prompt: ~500 tokens (data + instructions)
- Response: ~200 tokens (JSON)
- Total: ~700 tokens per check

**Monthly cost (Claude 3 Haiku):**

```
10 tanks × 24 checks/day × 30 days = 7,200 checks
Input:  7,200 × 500 tokens = 3.6M tokens  × $0.00025/1K = $0.90
Output: 7,200 × 200 tokens = 1.44M tokens × $0.00125/1K = $1.80
TOTAL: ≈ $2.70/month
```

For comparison, a dedicated data scientist costs $5,000+/month.
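The same arithmetic generalizes to any fleet size or model. A small helper — our own, with per-1K-token prices hard-coded from the pricing table — makes it easy to re-run the estimate for different configurations:

```python
# Per-1K-token prices (USD) from the Claude 3 pricing table
PRICES = {
    'haiku':  {'input': 0.00025, 'output': 0.00125},
    'sonnet': {'input': 0.003,   'output': 0.015},
    'opus':   {'input': 0.015,   'output': 0.075},
}

def monthly_cost(tanks, checks_per_day, prompt_tokens, response_tokens, model='haiku'):
    """Estimated monthly Bedrock cost in USD for periodic checks (30-day month)."""
    checks = tanks * checks_per_day * 30
    input_cost = checks * prompt_tokens / 1000 * PRICES[model]['input']
    output_cost = checks * response_tokens / 1000 * PRICES[model]['output']
    return round(input_cost + output_cost, 2)

print(monthly_cost(10, 24, 500, 200))            # → 2.7  (Haiku)
print(monthly_cost(10, 24, 500, 200, 'sonnet'))  # → 32.4 (Sonnet, same workload)
```

The gap between models is a factor of ~12 here, which is why routing routine checks to Haiku and reserving Sonnet for complex analysis pays off.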
## Guardrails for Safety

### Preventing Harmful Outputs
In the boto3 API, guardrail settings are parameters of `invoke_model` itself, not part of the request body:

```python
response = bedrock.invoke_model(
    modelId='anthropic.claude-3-sonnet-20240229-v1:0',
    guardrailIdentifier='factory-ai-guardrails',
    guardrailVersion='1',
    trace='ENABLED',
    body=json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1000,
        "messages": [{"role": "user", "content": prompt}]
    })
)
```
**Guardrail rules:**

- Block PII in responses
- Filter toxic content
- Enforce the expected output format (JSON only)
- Contextual grounding checks to reduce hallucination
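Guardrails constrain content, but output format still needs enforcement at the application layer. One tolerant approach — the `extract_json` helper is our own addition — pulls the first JSON object out of the model's text even when it is wrapped in prose or markdown fences:

```python
import json
import re

def extract_json(text):
    """Extract the first JSON object from model output.

    Tolerates surrounding prose and ```json fences; raises ValueError
    if no object is found or it does not parse.
    """
    match = re.search(r'\{.*\}', text, re.DOTALL)
    if not match:
        raise ValueError('No JSON object in model output')
    return json.loads(match.group(0))
```

If parsing still fails, re-invoke the model once with the error message appended rather than silently dropping the check.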
## Conclusion

AWS Bedrock brings enterprise-grade AI to industrial applications without requiring in-house ML expertise:

- ✅ Anomaly detection without model training
- ✅ Natural-language SQL for business users
- ✅ Automated reporting that saves hours of manual work
- ✅ RAG chatbots for instant manual lookups

With models like Claude 3 Haiku, the running cost stays modest — under $3/month for the 10-device example above.

For data-privacy-sensitive applications, consider our On-Premise AI with Ollama solution.