
Detect equipment anomalies on the factory floor with Amazon Lookout for Equipment, Amazon Timestream, and AWS IoT Core

The last decade of the Industry 4.0 revolution has shown the value and importance of machine learning (ML) across verticals and environments, with arguably more impact on manufacturing than on any other application. Organizations implementing a more automated, reliable, and cost-effective Operational Technology (OT) strategy have led the way, recognizing the benefits of ML in predicting assembly line failures to avoid costly and unplanned downtime. Still, teams of all sizes struggle to demonstrate the value of ML-based anomaly detection quickly and with little effort, which makes it hard to persuade management and finance owners to allocate the budget required to implement these new technologies. Without access to data scientists for model training, or ML specialists to deploy solutions at the local level, adoption has seemed out of reach for teams on the factory floor.
Now, teams that collect sensor data signals from machines in the factory can unlock the power of services like Amazon Timestream, Amazon Lookout for Equipment, and AWS IoT Core to easily spin up and test a fully production-ready system at the local edge to help avoid catastrophic downtime events. Lookout for Equipment uses your unique ML model to analyze incoming sensor data in real time and accurately identify early warning signs that could lead to machine failures. This means you can detect equipment abnormalities with speed and precision, quickly diagnose issues, take action to reduce expensive downtime, and reduce false alerts. Response teams can be alerted with specific pinpoints to which sensors are indicating the issue, and the magnitude of impact on the detected event.
In this post, we show you how you can set up a system to simulate events on your factory floor with a trained model and detect abnormal behavior using Timestream, Lookout for Equipment, and AWS Lambda functions. The steps in this post emphasize the AWS Management Console UI, showing how technical people without a developer background or strong coding skills can build a prototype. Using simulated sensor signals will allow you to test your system and gain confidence before cutting over to production. Lastly, in this example, we use Amazon Simple Notification Service (Amazon SNS) to show how teams can receive notifications of predicted events and respond to avoid catastrophic effects of assembly line failures. Additionally, teams can use Amazon QuickSight for further analysis and dashboards for reporting.
To get started, we first collect a historical dataset from your factory sensor readings, ingest the data, and train the model. With the trained model, we then set up IoT Device Simulator to publish MQTT signals to a topic that will allow testing of the system to identify desired production settings before production data is used, keeping costs low.
The following diagram illustrates our solution architecture.

The workflow contains the following steps:
1. Upload historical sensor data and label data to Amazon S3 and ingest it into Lookout for Equipment to train a model.
2. Use IoT Device Simulator to publish simulated sensor signals to an MQTT topic.
3. An AWS IoT Core rule routes the MQTT payloads to an Amazon Timestream table.
4. A Lambda function, run every 5 minutes by an EventBridge rule, queries Timestream, formats the data as CSV, and writes it to an S3 input folder.
5. A Lookout for Equipment inference scheduler reads the CSV files and writes prediction results back to Amazon S3.
6. A second Lambda function parses the prediction results and sends notifications for predicted anomalies through Amazon SNS.
7. Optionally, crawl the results with AWS Glue and build reporting dashboards with Athena and QuickSight.
You need access to an AWS account to set up the environment for anomaly detection.
To set up your data and ingestion configuration, complete the following steps:
On the Amazon S3 console, create a bucket (for this post, l4e-training-data), using the default configuration options.
Upload the historical sensor data to a folder called /training-data and the label data to a folder called /labels.
Next, you create the ML model to be trained with the data from the S3 bucket. To do this, you first need to create a project on the Lookout for Equipment console.

Ingestion takes a few minutes to complete.


With a sample rate of 5 minutes, the model should take 20–30 minutes to build.
While the model is building, we can set up the rest of the architecture.
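The console flow above is all this walkthrough requires, but if you later want to script the same setup, a rough sketch using boto3's lookoutequipment client follows; the dataset, model, bucket, and role names are placeholders, and ingestion must finish before training starts:

import uuid

import boto3

l4e = boto3.client('lookoutequipment')

# Create the dataset, then ingest the historical data uploaded to S3.
l4e.create_dataset(DatasetName='my-dataset', ClientToken=str(uuid.uuid4()))
l4e.start_data_ingestion_job(
    DatasetName='my-dataset',
    RoleArn='arn:aws:iam::123456789012:role/my-l4e-role',  # placeholder; needs S3 read access
    IngestionInputConfiguration={
        'S3InputConfiguration': {'Bucket': 'l4e-training-data', 'Prefix': 'training-data/'}
    },
    ClientToken=str(uuid.uuid4()),
)

# After the ingestion job completes, train a model on the dataset.
l4e.create_model(
    ModelName='my-l4e-model',
    DatasetName='my-dataset',
    ClientToken=str(uuid.uuid4()),
)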
Deploy the IoT Device Simulator solution in your account, then on the AWS CloudFormation console, choose the stack named IoTDeviceSimulator to see the stack details. On the Outputs tab, find the ConsoleURL key and the corresponding URL value, and use that URL to open the simulator console.

In the simulator console, create a device type (for this post, My_testing_device) and set the topic to factory/line/station/simulated_testing.
Add an attribute for each signal, such as signal5, as shown in the following screenshot, using the Low and Hi ranges in the following table:
Signal      Low   Hi
signal5     95    150
signal6     347   460
signal7     27    217
signal8     139   252
signal48    458   522
signal49    495   613
signal78    675   812
signal109   632   693
signal120   742   799
signal121   675   680
Now that signals are being generated, we can set up IoT Core to read the MQTT topics and direct the payloads to the Timestream database.
SELECT signal5, signal6, signal7, signal8, signal48, signal49, signal78, signal109, signal120, signal121 FROM 'factory/line/station/simulated_testing'

A new tab opens with the Timestream console.
For Database name, enter sampleDB and choose Create database.
You’re redirected to the Timestream console, where you can view the database you created.
Choose sampleDB for Database name, enter signalTable for Table name, and choose Create table. Back on the IoT Core rule action, enter Simulated_signal for Dimensions name and 1 for Dimensions value, then choose Create new role.
Name the role TimestreamRole and choose Next. You have now added a rule action in IoT Core that directs the data published to the MQTT topic to a Timestream database.
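If you want to exercise the rule without the simulator console open, you can publish a single test payload to the topic yourself. The following is a minimal sketch using boto3's iot-data client; the signal values are illustrative picks from the ranges configured earlier:

import json

import boto3

iot_data = boto3.client('iot-data')

# One simulated reading for the MQTT topic the IoT Core rule listens on.
payload = {
    'signal5': 120, 'signal6': 400, 'signal7': 100, 'signal8': 200,
    'signal48': 500, 'signal49': 550, 'signal78': 700, 'signal109': 650,
    'signal120': 770, 'signal121': 678,
}

iot_data.publish(
    topic='factory/line/station/simulated_testing',
    qos=1,
    payload=json.dumps(payload),
)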
To verify that data is arriving, you can query Timestream for analysis from the query editor on the Timestream console, or programmatically, as in the following sketch.
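Here is a minimal verification query using the awswrangler library (the same library the Lambda function in the next step relies on); the database and table names match the ones created above:

import awswrangler as wr

# Pull the most recent 10 minutes of simulated readings from the table
# the IoT Core rule writes to.
df = wr.timestream.query(
    'SELECT * FROM "sampleDB"."signalTable" WHERE time > ago(10m) ORDER BY time DESC'
)
print(df.head())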

Now that data is being stored in Timestream, you can use Lambda and EventBridge to pull data every 5 minutes from the table, format it, and send it to Lookout for Equipment for inference and prediction results.

Create a Lambda function that queries Timestream and writes the results to the S3 folder /input (create a bucket folder for these data stream files if not already present). This code uses the awswrangler library, available in Lambda through the managed AWS SDK for pandas layer, to easily format the data in the required CSV form needed for Lookout for Equipment. The Lambda function also dynamically names the data files as required.
import json
from datetime import datetime

import awswrangler as wr
import pytz

def lambda_handler(event, context):
    # Timestamp the output file so every 5-minute batch gets a unique name.
    my_date = datetime.now(pytz.utc).strftime('%Y-%m-%d-%H-%M-%S')

    # Pivot the last 5 minutes of Timestream records into one column per signal,
    # scaling raw values and renaming columns to match the training schema.
    df = wr.timestream.query("""
        SELECT time AS Timestamp,
            max(CASE WHEN measure_name = 'signal5' THEN measure_value::double/1000 END) AS "signal-005",
            max(CASE WHEN measure_name = 'signal6' THEN measure_value::double/1000 END) AS "signal-006",
            max(CASE WHEN measure_name = 'signal7' THEN measure_value::double/1000 END) AS "signal-007",
            max(CASE WHEN measure_name = 'signal8' THEN measure_value::double/1000 END) AS "signal-008",
            max(CASE WHEN measure_name = 'signal48' THEN measure_value::double/1000 END) AS "signal-048",
            max(CASE WHEN measure_name = 'signal49' THEN measure_value::double/1000 END) AS "signal-049",
            max(CASE WHEN measure_name = 'signal78' THEN measure_value::double/1000 END) AS "signal-078",
            max(CASE WHEN measure_name = 'signal109' THEN measure_value::double/1000 END) AS "signal-109",
            max(CASE WHEN measure_name = 'signal120' THEN measure_value::double/1000 END) AS "signal-120",
            max(CASE WHEN measure_name = 'signal121' THEN measure_value::double/1000 END) AS "signal-121"
        FROM "<YOUR DB NAME>"."<YOUR TABLE NAME>"
        WHERE time > ago(5m)
        GROUP BY time
        ORDER BY time DESC
    """)

    # Write the batch as CSV to the folder the inference scheduler reads from.
    s3path = "s3://<EDIT-PATH-HERE>/input/<YOUR FILE NAME>_%s.csv" % my_date
    wr.s3.to_csv(df, s3path, index=False)

    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }
Choose Deploy.
On the Configuration tab, choose General configuration.
For Timeout, choose 5 minutes.
In the Function overview section, choose Add trigger with EventBridge as the source.
Select Create a new rule.
Name the rule eventbridge-cron-job-lambda-read-timestream and add rate(5 minutes) for Schedule expression.
Choose Add.

Add the following policy to your Lambda execution role:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::<YOUR BUCKET HERE>/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "timestream:DescribeEndpoints",
                "timestream:ListTables",
                "timestream:Select"
            ],
            "Resource": "*"
        }
    ]
}
To set up anomaly prediction and notification, complete the following steps:
On the Lookout for Equipment console, create an inference scheduler for your model. For the input data location, choose the /input location where files are written using the Lambda function and EventBridge trigger. For the output data location, choose /output as the folder and leave other default values. After 5 minutes, check the S3 /output path to verify prediction files are created. For more information about the results, refer to Reviewing inference results.
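If you prefer to script this step, the following is a minimal sketch using boto3's lookoutequipment client; the model, scheduler, bucket, and role names are placeholders:

import uuid

import boto3

l4e = boto3.client('lookoutequipment')

# Run inference every 5 minutes against the CSV files Lambda writes to /input,
# with results written back to /output.
l4e.create_inference_scheduler(
    ModelName='my-l4e-model',                   # placeholder
    InferenceSchedulerName='my-l4e-scheduler',  # placeholder
    DataUploadFrequency='PT5M',
    DataInputConfiguration={
        'S3InputConfiguration': {'Bucket': 'my-bucket', 'Prefix': 'input/'}
    },
    DataOutputConfiguration={
        'S3OutputConfiguration': {'Bucket': 'my-bucket', 'Prefix': 'output/'}
    },
    RoleArn='arn:aws:iam::123456789012:role/my-l4e-role',  # placeholder
    ClientToken=str(uuid.uuid4()),
)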
Finally, you create a second Lambda function that triggers a notification using Amazon SNS when an anomaly is predicted.
On the Amazon SNS console, choose Create topic.
In the Details section, for Type, select Standard.
For Name, enter emailnoti.
Choose Create topic.

On the Subscriptions tab, create a subscription with Email type as Protocol and an endpoint email address you can access.
Choose Create subscription and confirm the subscription when the email arrives.
On the Topic tab, copy the ARN.

Create another Lambda function with the following code and enter the topic ARN you copied for MY_SNS_TOPIC_ARN:
import json
import logging
import os

import boto3

MY_SNS_TOPIC_ARN = 'MY_SNS_ARN'  # replace with the topic ARN you copied

s3_client = boto3.client('s3')
sns_client = boto3.client('sns')

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

lambda_tmp_dir = '/tmp'

def lambda_handler(event, context):
    # Each record corresponds to a prediction results file written to /output.
    for r in event['Records']:
        s3 = r['s3']
        bucket = s3['bucket']['name']
        key = s3['object']['key']
        source = download_json(bucket, key)
        with open(source, 'r') as content_file:
            content = json.load(content_file)
        # A prediction value of 1 means Lookout for Equipment detected an anomaly.
        if content['prediction'] == 1:
            message = 'Time: ' + str(content['timestamp']) + '\n' + 'Equipment failure is predicted.' + '\n' + 'Diagnostics: '
            for diag in content['diagnostics']:
                message = message + str(diag) + '\n'
            # Send the alert through Amazon SNS.
            sns_client.publish(
                TopicArn=MY_SNS_TOPIC_ARN,
                Subject='Equipment failure prediction',
                Message=message,
            )

def download_json(bucket, key):
    # Download the results file to Lambda's writable /tmp directory.
    local_source_json = lambda_tmp_dir + '/' + key.split('/')[-1]
    directory = os.path.dirname(local_source_json)
    if not os.path.exists(directory):
        os.makedirs(directory)
    s3_client.download_file(bucket, key.replace('%3A', ':'), local_source_json)
    return local_source_json
Choose Deploy to deploy the function.
When Lookout for Equipment detects an anomaly, the prediction value is 1 in the results. The Lambda function parses the JSONL results file and sends an email notification to the configured address.
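For reference, a line in the results JSONL file looks roughly like the following; the exact component and field names depend on your dataset schema, and the values here are illustrative:

{"timestamp": "2024-05-03T12:00:00.000000", "prediction": 1, "diagnostics": [{"name": "sensors\\signal-005", "value": 0.12}, {"name": "sensors\\signal-006", "value": 0.08}]}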
Attach the AmazonS3FullAccess and AmazonSNSFullAccess policies to the function's execution role, then add an S3 trigger to the function for the /output folder of the bucket so it runs whenever a new prediction file arrives.
After a few minutes, you will start to see emails arrive every 5 minutes.
After Amazon S3 stores the prediction results, we can use the AWS Glue Data Catalog with Athena and QuickSight to create reporting dashboards.
On the AWS Glue console, create a crawler (for this post, inference_crawler) that points to the S3 /output folder containing the results.jsonl files, and configure it to create a database (for this post, anycompanyinferenceresult).
Run the crawler, then confirm in Athena that a select * query against the resulting table returns the prediction data from the /output S3 folder.
To visualize the prediction results, navigate to the QuickSight console.
Choose New analysis and New dataset.
For Dataset source, choose Athena.
For Data source name, enter MyDataset.
Choose Create data source.
Choose the table you created, then choose Use custom SQL.

Enter the following query:
with dataset AS
(SELECT timestamp,prediction, names
FROM "anycompanyinferenceresult"."output"
CROSS JOIN UNNEST(diagnostics) AS t(names))
SELECT SPLIT_PART(timestamp,'.',1) AS timestamp, prediction,
SPLIT_PART(names.name,'\',1) AS subsystem,
SPLIT_PART(names.name,'\',2) AS sensor,
names.value AS ScoreValue
FROM dataset
Confirm the query and choose Visualize.
Choose Pivot table.
Specify timestamp and sensor for Rows.
Specify prediction and ScoreValue for Values.

Choose Add Visual to add a visual object.
Choose Vertical bar chart.
Specify Timestamp for X axis, ScoreValue for Value, and Sensor for Group/Color.
Change ScoreValue to Aggregate:Average.

Failure to delete resources can result in additional charges. To clean up, delete the resources you created for this post: the Lookout for Equipment inference scheduler, model, and dataset; the IoT Device Simulator CloudFormation stack; the IoT Core rule; the Timestream database and table; the Lambda functions and EventBridge rule; the SNS topic and subscription; the AWS Glue crawler and database; the QuickSight dataset and analysis; and the S3 buckets.
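For the Lookout for Equipment pieces, a boto3 cleanup sketch follows, using the placeholder names from earlier; the scheduler must be stopped, and report as stopped, before it can be deleted:

import boto3

l4e = boto3.client('lookoutequipment')

# Stop the scheduler first; wait for it to reach STOPPED before deleting.
l4e.stop_inference_scheduler(InferenceSchedulerName='my-l4e-scheduler')
l4e.delete_inference_scheduler(InferenceSchedulerName='my-l4e-scheduler')

# Then remove the model and dataset.
l4e.delete_model(ModelName='my-l4e-model')
l4e.delete_dataset(DatasetName='my-dataset')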
In this post, you learned how to implement machine learning for predictive maintenance using real-time streaming data with a low-code approach. Managed AWS services like Timestream, Lookout for Equipment, and Lambda fit together so operational teams can see the value without taking on additional workloads or overhead. Because the architecture uses serverless technology, it can scale up and down to meet your needs.
For more data-based learning resources, visit the AWS Blog home page.
Matt Reed is a Senior Solutions Architect in Automotive and Manufacturing at AWS. He is passionate about helping customers solve problems with cool technology to make everyone’s life better. Matt loves to mountain bike, ski, and hang out with friends, family, and dogs and cats.





