Generative AI in Industrial Data with AWS Bedrock

Amazeng Technical Team
10 min read
AWS Bedrock · Generative AI · Claude · Llama · Industrial AI · Foundation Models · LLM

Introduction

Generative AI is revolutionizing not only chatbots and content generation but also industrial data analysis. AWS Bedrock provides serverless access to advanced foundation models like Claude, Llama, and Amazon Titan without training or hosting your own models.

In this article, we examine how to analyze data from our ZMA Data Acquisition and GDT Digital Transmitter devices using AWS Bedrock for predictive maintenance and reporting.

What is AWS Bedrock?

AWS Bedrock is a fully managed service offering access to foundation models (FMs) from leading AI companies via API.
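Every model invocation in this article follows the same pattern: build a JSON request body, call `invoke_model`, parse the JSON response. As a quick orientation, here is a sketch of the body construction for Claude models (`build_claude_body` is our own helper, not part of boto3):

```python
import json

def build_claude_body(prompt: str, max_tokens: int = 1000) -> str:
    """Assemble the request body for a Claude model on Bedrock.
    All Claude invocations in this article use this Messages API format."""
    return json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": max_tokens,
        "messages": [{"role": "user", "content": prompt}],
    })

# This string is what gets passed as `body=` to bedrock.invoke_model(...)
body = build_claude_body("Summarize the last hour of tank data.")
```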

Available Models

| Model | Provider | Parameters | Strengths |
|---|---|---|---|
| Claude 3.5 Sonnet | Anthropic | Undisclosed | Complex reasoning, coding |
| Claude 3 Haiku | Anthropic | Undisclosed | Fast, cost-effective |
| Llama 3.1 | Meta | 8B-405B | Open source, multilingual |
| Amazon Titan | AWS | - | Text generation, embeddings |
| Mistral Large | Mistral AI | 123B | Multilingual, specialized |

Bedrock vs Alternatives

| Feature | AWS Bedrock | OpenAI API | Self-Hosted (Ollama) |
|---|---|---|---|
| Deployment | Serverless | Cloud API | Local infrastructure |
| Data Privacy | AWS VPC | Data leaves facility | Fully local |
| Latency | 200-500ms | 300-800ms | 50-200ms |
| Cost | Pay-per-token | Pay-per-token | Hardware only |
| Model Selection | 10+ models | 5 models | Limited (7B-13B) |
| Fine-tuning | ✅ Supported | ✅ Supported | ⚠️ Manual |

System Architecture

┌───────────────────────────────────────────────┐
│           Factory Network                     │
│                                               │
│  ┌─────────┐   Modbus   ┌──────────────┐     │
│  │ ZMA/GDT │───────────►│  IoT Gateway │     │
│  │ Devices │            └──────┬───────┘     │
└────────────────────────────────┼─────────────┘
                                 │ MQTT/TLS
                                 ▼
                    ┌────────────────────────┐
                    │      AWS Cloud         │
                    │                        │
                    │  ┌──────────────────┐  │
                    │  │   IoT Core       │  │
                    │  └────────┬─────────┘  │
                    │           │            │
                    │  ┌────────▼─────────┐  │
                    │  │   Timestream     │  │
                    │  │   (Raw Data)     │  │
                    │  └────────┬─────────┘  │
                    │           │            │
                    │  ┌────────▼─────────┐  │
                    │  │   Lambda         │  │
                    │  │   (Processor)    │  │
                    │  └────────┬─────────┘  │
                    │           │            │
                    │  ┌────────▼─────────┐  │
                    │  │  AWS Bedrock     │  │
                    │  │                  │  │
                    │  │  • Claude 3.5    │  │
                    │  │  • Llama 3.1     │  │
                    │  │  • Titan         │  │
                    │  └────────┬─────────┘  │
                    │           │            │
                    │  ┌────────▼─────────┐  │
                    │  │   S3 Bucket      │  │
                    │  │   (Reports)      │  │
                    │  └──────────────────┘  │
                    └────────────────────────┘
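On the gateway side, each Modbus reading is republished as an MQTT message before it reaches IoT Core. A minimal sketch of the payload construction; the topic layout and the 0.1 kg/count register scale are illustrative assumptions, not fixed by the devices:

```python
import json
import time

def build_telemetry(tank_id: str, raw_weight: int, scale: float = 0.1) -> tuple:
    """Turn a raw Modbus register value from a GDT transmitter into an
    MQTT topic and JSON payload for AWS IoT Core. The topic layout and
    the 0.1 kg/count register scale are assumptions for this sketch."""
    topic = f"factory/tanks/{tank_id}/weight"
    payload = json.dumps({
        "tank_id": tank_id,
        "weight_kg": round(raw_weight * scale, 1),
        "timestamp": int(time.time()),
    })
    return topic, payload

topic, payload = build_telemetry("2", 48315)  # topic: "factory/tanks/2/weight"
```

An MQTT client on the gateway (e.g. paho-mqtt over TLS) would then publish `payload` to `topic`, where an IoT Core rule routes it into the Timestream table used below.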

Application 1: Anomaly Detection with AI

Scenario

Detect abnormal patterns in loadcell data from GDT that might indicate tank tilt or sensor failure.

Lambda Function with Claude

import boto3
import json
from datetime import datetime, timedelta

bedrock = boto3.client('bedrock-runtime', region_name='eu-west-1')
timestream = boto3.client('timestream-query', region_name='eu-west-1')

def lambda_handler(event, context):
    # Query last hour's data from Timestream
    tank_id = event['tank_id']
    query = f"""
        SELECT
            time,
            measure_value::double as weight
        FROM
            "FactoryData"."TankWeights"
        WHERE
            tank_id = '{tank_id}'
            AND time > ago(1h)
        ORDER BY
            time DESC
    """

    result = timestream.query(QueryString=query)
    weights = [float(row['Data'][1]['ScalarValue'])
               for row in result['Rows']]

    if not weights:
        return {'statusCode': 404, 'body': 'No data for this tank in the last hour'}

    mean = sum(weights) / len(weights)
    std_dev = (sum((x - mean) ** 2 for x in weights) / len(weights)) ** 0.5

    # Create prompt for Claude
    prompt = f"""
You are an industrial automation expert analyzing loadcell data.

Tank {tank_id} weight measurements (last hour, newest first):
{json.dumps(weights[:20], indent=2)}

Statistical summary:
- Mean: {mean:.2f} kg
- Min: {min(weights):.2f} kg
- Max: {max(weights):.2f} kg
- Std Dev: {std_dev:.2f} kg

Analyze this data and respond in JSON format:
{{
  "anomaly_detected": true/false,
  "anomaly_type": "sudden_drop|gradual_drift|oscillation|normal",
  "severity": "low|medium|high|critical",
  "probable_cause": "explanation",
  "recommended_action": "specific action",
  "confidence": 0.0-1.0
}}
"""

    # Call Claude 3 Haiku (cost-effective)
    response = bedrock.invoke_model(
        modelId='anthropic.claude-3-haiku-20240307-v1:0',
        contentType='application/json',
        accept='application/json',
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 1000,
            "messages": [
                {
                    "role": "user",
                    "content": prompt
                }
            ]
        })
    )

    # Parse response
    response_body = json.loads(response['body'].read())
    analysis = json.loads(response_body['content'][0]['text'])

    # Send SNS alert if critical
    if analysis['severity'] in ['high', 'critical']:
        sns = boto3.client('sns')
        sns.publish(
            TopicArn='arn:aws:sns:eu-west-1:xxx:factory-alerts',
            Subject=f"Critical Anomaly: Tank {tank_id}",
            Message=json.dumps(analysis, indent=2)
        )

    return {
        'statusCode': 200,
        'body': json.dumps(analysis)
    }

Example Output:

{
  "anomaly_detected": true,
  "anomaly_type": "sudden_drop",
  "severity": "high",
  "probable_cause": "Weight dropped 450 kg in 2 minutes, indicating possible valve opening or leak",
  "recommended_action": "Inspect Tank 2 outlet valve and piping for leaks. Check flow meter readings.",
  "confidence": 0.89
}
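In practice, models occasionally wrap the JSON in explanatory prose, which would break the bare `json.loads` call in the Lambda above. A small defensive parser (our own sketch, not part of any library):

```python
import json

def extract_json(text: str) -> dict:
    """Pull the first JSON object out of a model response, tolerating
    prose before or after the braces (models occasionally add both)."""
    start = text.find("{")
    end = text.rfind("}")
    if start == -1 or end <= start:
        raise ValueError("no JSON object found in model response")
    return json.loads(text[start:end + 1])

analysis = extract_json('Here is my analysis:\n{"anomaly_detected": false, "severity": "low"}')
```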

Application 2: Natural Language Querying

Scenario

Operations manager asks: "Which tank had the highest fill rate yesterday?"

Text-to-SQL with Claude

def natural_language_to_sql(question, table_schema):
    """Convert a natural-language question into Timestream SQL"""

    prompt = f"""
You are an expert in AWS Timestream SQL. Convert the user's question to a valid SQL query.

Database Schema:
{table_schema}

User Question: "{question}"

Requirements:
1. Use Timestream-specific functions (ago(), bin(), etc.)
2. Return only the SQL query, no explanations
3. Use appropriate time filters

SQL Query:
"""

    response = bedrock.invoke_model(
        modelId='anthropic.claude-3-sonnet-20240229-v1:0',
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 500,
            "messages": [{"role": "user", "content": prompt}]
        })
    )

    sql_query = json.loads(response['body'].read())['content'][0]['text']
    return sql_query.strip()

# Example usage
schema = """Table: FactoryData.TankWeights
Columns:
- tank_id (VARCHAR): Tank identifier (1, 2, 3, etc.)
- time (TIMESTAMP): Measurement timestamp
- measure_name (VARCHAR): Metric name
- measure_value (DOUBLE): Metric value"""

question = "Which tank had the highest fill rate yesterday?"
sql = natural_language_to_sql(question, schema)

print(sql)
# Output:
# WITH hourly_avg AS (
#   SELECT
#     tank_id,
#     bin(time, 1h) as hour_bin,
#     AVG(measure_value::double) as avg_weight
#   FROM "FactoryData"."TankWeights"
#   WHERE time BETWEEN ago(48h) AND ago(24h)
#   GROUP BY tank_id, bin(time, 1h)
# )
# SELECT
#   tank_id,
#   MAX(avg_weight) - MIN(avg_weight) as total_fill
# FROM hourly_avg
# GROUP BY tank_id
# ORDER BY total_fill DESC
# LIMIT 1

# Execute the generated SQL
result = timestream.query(QueryString=sql)
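Executing model-generated SQL verbatim is risky. A minimal read-only gate before the `timestream.query` call, sketched here as a keyword blocklist (adapt the forbidden-keyword list to your own schema and threat model):

```python
import re

# Keywords that should never appear in a generated analytics query
FORBIDDEN = re.compile(r"\b(INSERT|UPDATE|DELETE|DROP|ALTER|CREATE|GRANT)\b",
                       re.IGNORECASE)

def is_safe_query(sql: str) -> bool:
    """Allow only single, read-only statements before handing
    model-generated SQL to Timestream."""
    stripped = sql.strip().rstrip(";")
    if ";" in stripped:  # reject multi-statement payloads
        return False
    if FORBIDDEN.search(stripped):
        return False
    return stripped.upper().startswith(("SELECT", "WITH"))
```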

Application 3: Automated Maintenance Reports

Scenario

Generate weekly maintenance report summarizing all tanks' status.

Report Generation with Claude

def generate_maintenance_report(start_date, end_date):
    """Generate comprehensive weekly report"""

    # Collect data from multiple sources (helper implementations omitted here)
    tank_data = get_tank_statistics(start_date, end_date)
    alarms = get_alarm_history(start_date, end_date)
    calibrations = get_calibration_log(start_date, end_date)

    prompt = f"""
Generate a professional weekly maintenance report for a milk processing facility.

Data Summary:
{json.dumps(tank_data, indent=2)}

Alarms:
{json.dumps(alarms, indent=2)}

Calibrations:
{json.dumps(calibrations, indent=2)}

Report Structure:
1. Executive Summary
2. Tank-by-Tank Status
3. Anomalies and Alarms
4. Maintenance Activities
5. Recommendations for Next Week

Format as HTML for email distribution.
"""

    response = bedrock.invoke_model(
        modelId='anthropic.claude-3-sonnet-20240229-v1:0',
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "max_tokens": 4000,
            "messages": [{"role": "user", "content": prompt}]
        })
    )

    report_html = json.loads(response['body'].read())['content'][0]['text']

    # Save to S3
    s3 = boto3.client('s3')
    s3.put_object(
        Bucket='factory-reports',
        Key=f'maintenance/weekly_{start_date}.html',
        Body=report_html,
        ContentType='text/html'
    )

    # Email report
    ses = boto3.client('ses')
    ses.send_email(
        Source='[email protected]',
        Destination={'ToAddresses': ['[email protected]']},
        Message={
            'Subject': {'Data': f'Weekly Report: {start_date}'},
            'Body': {'Html': {'Data': report_html}}
        }
    )

    return report_html

Application 4: RAG for Operation Manuals

Retrieval Augmented Generation (RAG)

Use Knowledge Bases for Amazon Bedrock to create a chatbot answering questions from operation manuals.

def setup_knowledge_base():
    """Create knowledge base from PDF manuals"""

    bedrock_agent = boto3.client('bedrock-agent')

    # Create knowledge base
    kb_response = bedrock_agent.create_knowledge_base(
        name='FactoryOperationManuals',
        roleArn='arn:aws:iam::xxx:role/BedrockKBRole',
        knowledgeBaseConfiguration={
            'type': 'VECTOR',
            'vectorKnowledgeBaseConfiguration': {
                'embeddingModelArn': 'arn:aws:bedrock:eu-west-1::foundation-model/amazon.titan-embed-text-v1'
            }
        },
        storageConfiguration={
            'type': 'OPENSEARCH_SERVERLESS',
            'opensearchServerlessConfiguration': {
                'collectionArn': 'arn:aws:aoss:eu-west-1:xxx:collection/factory-kb',
                'vectorIndexName': 'factory-docs',
                'fieldMapping': {
                    'vectorField': 'vector',
                    'textField': 'text',
                    'metadataField': 'metadata'
                }
            }
        }
    )

    kb_id = kb_response['knowledgeBase']['knowledgeBaseId']

    # Add data source (S3 bucket with PDFs)
    bedrock_agent.create_data_source(
        knowledgeBaseId=kb_id,
        name='OperationManualsPDF',
        dataSourceConfiguration={
            'type': 'S3',
            's3Configuration': {
                'bucketArn': 'arn:aws:s3:::factory-manuals'
            }
        }
    )

    return kb_id

def query_knowledge_base(kb_id, question):
    """Query operation manuals"""

    bedrock_agent_runtime = boto3.client('bedrock-agent-runtime')

    response = bedrock_agent_runtime.retrieve_and_generate(
        input={'text': question},
        retrieveAndGenerateConfiguration={
            'type': 'KNOWLEDGE_BASE',
            'knowledgeBaseConfiguration': {
                'knowledgeBaseId': kb_id,
                'modelArn': 'arn:aws:bedrock:eu-west-1::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0'
            }
        }
    )

    answer = response['output']['text']
    sources = response['citations']

    return answer, sources

# Example
answer, sources = query_knowledge_base(
    kb_id='ABCD1234',
    question='How do I calibrate the GDT-410M loadcell transmitter?'
)

print(answer)
# "To calibrate the GDT-410M: 1) Enter calibration mode by pressing..."
# Sources: [GDT_Manual_v2.3.pdf, page 45-47]
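The `citations` structure can be flattened for display next to the answer. The field names below follow the documented `retrieve_and_generate` response shape, but treat them as assumptions and verify against your boto3 version:

```python
def format_sources(citations: list) -> list:
    """Flatten knowledge-base citations into S3 URIs for display
    alongside the generated answer."""
    sources = []
    for citation in citations:
        for ref in citation.get("retrievedReferences", []):
            uri = ref.get("location", {}).get("s3Location", {}).get("uri", "unknown")
            sources.append(uri)
    return sources

citations = [{"retrievedReferences": [
    {"location": {"s3Location": {"uri": "s3://factory-manuals/GDT_Manual_v2.3.pdf"}}}
]}]
sources = format_sources(citations)  # ["s3://factory-manuals/GDT_Manual_v2.3.pdf"]
```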

Cost Optimization

Pricing (Claude 3 Models)

| Model | Input | Output | Use Case |
|---|---|---|---|
| Claude 3.5 Sonnet | $0.003/1K tokens | $0.015/1K tokens | Complex analysis |
| Claude 3 Haiku | $0.00025/1K tokens | $0.00125/1K tokens | Fast, simple tasks |
| Claude 3 Opus | $0.015/1K tokens | $0.075/1K tokens | Most advanced |

Cost Example

Scenario: Hourly anomaly detection for 10 tanks

  • Prompt: ~500 tokens (data + instructions)
  • Response: ~200 tokens (JSON)
  • Total: 700 tokens/check

Monthly Cost:

10 tanks × 24 checks/day × 30 days = 7,200 checks
7,200 × 700 tokens = 5,040,000 tokens (~5M total)

Using Claude 3 Haiku:
Input:  7,200 × 500 = 3.6M tokens × $0.00025/1K = $0.90
Output: 7,200 × 200 = 1.44M tokens × $0.00125/1K = $1.80
TOTAL: ~$2.70/month

Compare to hiring a data scientist: $5,000+/month
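The arithmetic above can be packaged into a small estimator (prices in USD per 1K tokens, taken from the pricing table):

```python
def monthly_cost(checks_per_day, days, prompt_tokens, response_tokens,
                 input_price_per_1k, output_price_per_1k):
    """Estimate monthly Bedrock spend in USD for a periodic job."""
    checks = checks_per_day * days
    input_cost = checks * prompt_tokens / 1000 * input_price_per_1k
    output_cost = checks * response_tokens / 1000 * output_price_per_1k
    return round(input_cost + output_cost, 2)

# 10 tanks checked hourly for 30 days, on Claude 3 Haiku pricing:
# 7,200 checks -> $0.90 input + $1.80 output = $2.70/month
cost = monthly_cost(10 * 24, 30, 500, 200, 0.00025, 0.00125)
```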

Guardrails for Safety

Prevent Harmful Outputs

bedrock_config = {
    "guardrailIdentifier": "factory-ai-guardrails",
    "guardrailVersion": "1",
    "trace": "ENABLED"
}

# Guardrail parameters are passed to invoke_model itself, not inside the request body
response = bedrock.invoke_model(
    modelId='anthropic.claude-3-sonnet-20240229-v1:0',
    body=json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1000,
        "messages": [{"role": "user", "content": prompt}]
    }),
    **bedrock_config
)

Guardrail Rules:

  • Block PII data in responses
  • Filter toxic content
  • Enforce output format (JSON only)
  • Contextual grounding checks to flag ungrounded claims
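Guardrails filter content but do not guarantee the JSON shape your downstream code expects. A schema check, sketched against the anomaly-analysis format from Application 1, catches malformed output before it can trigger an alert:

```python
REQUIRED_FIELDS = {
    "anomaly_detected": bool,
    "anomaly_type": str,
    "severity": str,
    "probable_cause": str,
    "recommended_action": str,
    "confidence": (int, float),
}
VALID_SEVERITIES = {"low", "medium", "high", "critical"}

def validate_analysis(analysis: dict) -> bool:
    """Reject model output that is missing fields, mistyped, or out of
    range before it reaches the SNS alert path."""
    for field, expected in REQUIRED_FIELDS.items():
        if not isinstance(analysis.get(field), expected):
            return False
    if analysis["severity"] not in VALID_SEVERITIES:
        return False
    return 0.0 <= analysis["confidence"] <= 1.0
```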

Conclusion

AWS Bedrock brings enterprise-grade AI to industrial applications without requiring in-house ML expertise:

  • Anomaly detection without model training
  • Natural language SQL for business users
  • Automated reporting that saves hours of manual work
  • RAG chatbots for instant manual lookups

Cost-effective with models like Claude 3 Haiku (roughly $3/month for 10 devices).

For data privacy-sensitive applications, consider our On-Premise AI with Ollama solution.