End-to-End AWS Real-time Fraud Detection
Fraud detection system using machine learning models deployed on AWS for real-time transaction monitoring.
Detecting potential fraud in financial systems is a major challenge for organizations worldwide. Building robust solutions that enable real-time actions is essential for companies aiming to provide greater security to their customers during financial transactions.
This project demonstrates a complete machine learning pipeline for credit card fraud detection using the Kaggle Credit Card Fraud Detection dataset, which contains 284,807 European cardholder transactions from 2013 (including 492 fraudulent cases) with 28 PCA-transformed features plus original Amount and Time variables.
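To put that imbalance in perspective, the fraud rate works out to well under one percent of all transactions:

```python
# Class balance of the Kaggle credit-card fraud dataset
frauds, total = 492, 284_807
fraud_rate = 100 * frauds / total  # percentage of fraudulent transactions
print(f"Fraud rate: {fraud_rate:.4f}%")  # roughly 0.17%
```

This severe imbalance is why the project pairs a supervised classifier with an unsupervised anomaly detector rather than relying on accuracy alone.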
The project showcases a production-ready streaming architecture that integrates Amazon SageMaker for training both supervised and unsupervised ML models and deploying them as managed endpoints.
Architecture overview:

Pre-requisites
- An AWS account with appropriate permissions to create and manage resources such as S3, SageMaker, Kinesis, Lambda, and RDS.
- AWS CLI installed and configured on your local machine.
- Terraform installed for infrastructure provisioning.
- Python 3.x installed for running the training notebooks and API application.
- AWS Chalice installed for creating and deploying the API layer.
- Basic understanding of machine learning concepts, AWS services, and infrastructure as code (IaC) practices.
Key Components
- Data Ingestion: Simulated real-time transaction data is ingested into Amazon Kinesis Data Streams.
- Data Processing: AWS Glue or Spark processes the streaming data, performing feature engineering and transformations.
- Model Training: Amazon SageMaker is used to train both supervised (e.g., XGBoost) and unsupervised (e.g., Random Cut Forest for anomaly detection) models on the historical dataset.
- Model Deployment: Trained models are deployed as SageMaker endpoints for real-time inference.
- API Layer: A REST API is created using AWS Chalice, which serves as the interface for making predictions and receiving fraud alerts.
- Monitoring: CloudWatch is used to monitor the performance of the deployed models and the overall system. The project also includes scripts for simulating transaction data and testing the end-to-end pipeline.
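For the ingestion step, a producer serializes each transaction before putting it on the stream. Below is a minimal sketch of that serialization; the field names (`transaction_id`, `Amount`) are illustrative, not the project's actual schema:

```python
import json
import uuid

def build_kinesis_record(transaction: dict) -> dict:
    """Serialize a transaction into the Data/PartitionKey pair
    accepted by Kinesis put_record / put_records."""
    partition_key = str(transaction.get("transaction_id", uuid.uuid4()))
    return {
        "Data": json.dumps(transaction).encode("utf-8"),
        "PartitionKey": partition_key,
    }

record = build_kinesis_record({"transaction_id": "tx-001", "Amount": 42.5})
# In the real pipeline this record would be sent with:
# boto3.client("kinesis").put_record(StreamName="fraud-predictions-stream", **record)
```

The partition key controls which shard a record lands on; keying by transaction ID spreads load evenly across shards.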
Configure AWS credentials
Before running the project, ensure that you have your AWS credentials configured properly. You can set up your credentials using the AWS CLI or by configuring environment variables. For example, you can run the following command to configure your AWS credentials:
$ aws configure
This command will prompt you to enter your AWS Access Key ID, AWS Secret Access Key, default region, and output format. Make sure to use the same region that you specified in the Terraform configuration for the AWS resources. More details on configuring AWS credentials can be found in the AWS documentation.
Infrastructure Provisioning
Most of the infrastructure, including the Kinesis stream, SageMaker resources, Glue job, and RDS instance, is provisioned with Terraform, an infrastructure as code (IaC) tool that lets you define and manage cloud resources in a declarative configuration language. This approach keeps the infrastructure consistent, repeatable, and version-controlled.
Services Overview
Let's start with the Terraform configuration for provisioning the necessary AWS resources and walk through some of the key resources defined in the configuration files.
S3 Buckets
We define two S3 buckets in our Terraform configuration: one for storing the raw data and another for storing the streaming data and intermediate results. Both buckets are configured with lifecycle policies to manage data retention and automatically delete old data after a set period. Below is an example of how to define the S3 buckets in Terraform, which you can find in the s3.tf file in the project here.
# s3.tf
resource "aws_s3_bucket" "fraud_data_bucket" {
  bucket        = "fraud-detection-data-bucket-${var.aws_region}"
  force_destroy = true
  tags = {
    Environment = "dev"
    Project     = "fraud-detection"
  }
}

resource "aws_s3_bucket_lifecycle_configuration" "data_bucket_lifecycle" {
  bucket = aws_s3_bucket.fraud_data_bucket.id
  rule {
    id     = "expire-old-data"
    status = "Enabled"
    filter {
      prefix = ""
    }
    expiration {
      days = 90
    }
  }
}

resource "aws_s3_bucket" "fraud_streaming_bucket" {
  bucket        = "fraud-detection-stream-bucket-${var.aws_region}"
  force_destroy = true
  tags = {
    Environment = "dev"
    Project     = "fraud-detection"
  }
}
Kinesis Streams
For real-time data ingestion, we use Amazon Kinesis Data Streams. The Kinesis stream is configured with appropriate shard count and retention period to handle the expected data volume and ensure that the data is retained for a sufficient amount of time for processing. Below is an excerpt of how to define the Kinesis stream in Terraform which you can find in the kinesis.tf file in the project here.
# Kinesis Stream
resource "aws_kinesis_stream" "fraud_predictions_stream" {
  name             = var.kinesis_stream_name
  shard_count      = var.kinesis_shard_count
  retention_period = 24 # in hours
  tags = {
    Environment = "dev"
    Project     = "fraud-detection"
  }
}
RDS PostgreSQL
For storing the transaction data and model predictions, we use an RDS PostgreSQL instance. The RDS instance is configured with appropriate security groups, parameter groups, and backup settings to ensure data durability and security. Below is an excerpt of how to define the RDS PostgreSQL instance in Terraform, which you can find in the rds.tf file in the project here.
# PostgreSQL RDS Instance
resource "aws_db_instance" "fraudit_postgres" {
  identifier            = "fraudit-postgres-db"
  engine                = "postgres"
  engine_version        = "17.5" # Version supported by AWS RDS
  instance_class        = "db.t3.micro"
  allocated_storage     = 20
  max_allocated_storage = 100
  storage_type          = "gp2"
  db_name               = var.postgres_db
  username              = var.postgres_user
  password              = var.postgres_password
  port                  = var.postgres_port

  # Network configuration
  db_subnet_group_name   = aws_db_subnet_group.fraudit_subnet_group.name
  vpc_security_group_ids = [aws_security_group.rds_sg.id]
  publicly_accessible    = true # Required for access from Glue without VPC

  # Backup configuration
  backup_retention_period = 7
  backup_window           = "03:00-04:00"
  maintenance_window      = "sun:04:00-sun:05:00"

  # Development configuration
  skip_final_snapshot = true
  deletion_protection = false
  apply_immediately   = true

  # Monitoring disabled for simplicity
  performance_insights_enabled = false
  monitoring_interval          = 0

  tags = {
    Name        = "fraudit-postgres"
    Environment = "dev"
    Project     = "fraud-detection"
  }
}
The database credentials such as postgres_user, postgres_password, and postgres_db are defined as variables in the vars/dev.tfvars file and should be provided during the Terraform deployment.
Tip
Make sure to use strong credentials for the database, and consider using AWS Secrets Manager for managing sensitive information in a production environment (not covered in this project for simplicity).
Glue
The Terraform configuration for the Glue job is defined in the glue.tf file. Below is an excerpt of the Glue job definition:
# glue.tf
resource "aws_glue_job" "fraudit_streaming_job" {
  name     = "fraudit-streaming-job"
  role_arn = aws_iam_role.glue-role.arn

  # Job entrypoint
  command {
    script_location = "s3://${aws_s3_bucket.fraud_streaming_bucket.bucket}/spark-jobs/glue_job.py"
    python_version  = "3"
  }

  glue_version = "4.0"
  max_capacity = 2

  default_arguments = {
    "--job-language"                     = "python"
    "--additional-python-modules"        = "s3://${aws_s3_bucket.fraud_streaming_bucket.bucket}/wheel/fraudit-0.0.1-py3-none-any.whl"
    "--python-modules-installer-option"  = "--upgrade"
    "--extra-jars"                       = "s3://${aws_s3_bucket.fraud_streaming_bucket.bucket}/jars/spark-streaming-sql-kinesis-connector_2.12-1.0.0.jar"
    "--postgres_host"                    = aws_db_instance.fraudit_postgres.address
    "--postgres_port"                    = aws_db_instance.fraudit_postgres.port
    "--postgres_user"                    = var.postgres_user
    "--postgres_password"                = var.postgres_password
    "--postgres_db"                      = var.postgres_db
    "--kinesis_stream"                   = aws_kinesis_stream.fraud_predictions_stream.name
    "--kinesis_endpoint"                 = "https://kinesis.${var.aws_region}.amazonaws.com"
    "--aws_region"                       = var.aws_region
    "--s3_checkpoint_bucket"             = "s3://${aws_s3_bucket.fraud_streaming_bucket.bucket}/checkpoints/"
    # Monitoring
    "--enable-continuous-cloudwatch-log" = "true"
    "--enable-job-insights"              = "true"
    "--enable-metrics"                   = "true"
    "--enable-spark-ui"                  = "true"
    "--enable-auto-scaling"              = "true"
  }

  tags = {
    Project     = "fraud-detection"
    Environment = "dev"
  }
}
Let's break down some of the key configurations for the Glue job:
- script_location: The location of the Glue job script in S3. The script contains the logic for reading from the Kinesis stream, processing the data, and writing the results to RDS.
- additional-python-modules: Additional Python modules required by the Glue job. Here we reference a wheel package, also stored in S3, that contains the job's dependencies.
- extra-jars: Additional JAR files required by the Glue job. Here we reference the Spark Kinesis connector JAR, which is necessary for reading from the Kinesis stream.
- postgres_host, postgres_port, postgres_user, postgres_password, postgres_db: The PostgreSQL connection details passed to the Glue job so it can write the processed data into the RDS instance.
- kinesis_stream and kinesis_endpoint: The name of the Kinesis stream and the endpoint URL for connecting to the Kinesis service.
- s3_checkpoint_bucket: The S3 location for storing checkpoints of the Spark structured streaming job, which is essential for fault tolerance and for resuming from the last checkpoint after failures.
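Inside the job script, these `--key value` arguments are resolved with awsglue's getResolvedOptions. Outside a Glue runtime you can mimic its behaviour with a small stand-in like this (a sketch for local testing, not the awsglue implementation):

```python
def resolve_options(argv, expected):
    """Collect --key value pairs from job arguments, mimicking
    the behaviour of awsglue.utils.getResolvedOptions."""
    tokens = iter(argv[1:])
    found = {}
    for token in tokens:
        if token.startswith("--") and token[2:] in expected:
            found[token[2:]] = next(tokens)
    missing = [k for k in expected if k not in found]
    if missing:
        raise ValueError(f"missing required job arguments: {missing}")
    return found

args = resolve_options(
    ["glue_job.py", "--kinesis_stream", "fraud-predictions-stream",
     "--aws_region", "eu-west-1"],
    ["kinesis_stream", "aws_region"],
)
```

This makes it easy to unit-test the job's configuration handling without a Glue environment.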
SageMaker AI
SageMaker is an AWS service that provides a fully managed platform for building, training, and deploying machine learning models. In this project, we use SageMaker to train both supervised and unsupervised models for fraud detection. The Terraform configuration for the SageMaker resources is defined in the sagemaker.tf file. Below is an excerpt showing how to define a SageMaker notebook instance, which you can find in the sagemaker.tf file in the project here.
resource "aws_sagemaker_notebook_instance" "fraudit_notebook" {
  name          = var.sagemaker_notebook_name
  instance_type = var.sagemaker_notebook_instance_type
  role_arn      = aws_iam_role.sagemaker_execution_role.arn
  volume_size   = var.sagemaker_root_volume_size

  # Optional VPC configuration - only if subnet is provided
  subnet_id = var.sagemaker_subnet_id != "" ? var.sagemaker_subnet_id : null

  # Security groups - only if subnet is provided (VPC attachment)
  security_groups = length(var.sagemaker_security_group_ids) > 0 ? var.sagemaker_security_group_ids : null

  # Optional lifecycle configuration - only if content is provided
  lifecycle_config_name = var.sagemaker_lifecycle_startup_content != "" ? aws_sagemaker_notebook_instance_lifecycle_configuration.fraudit_nb_lifecycle[0].name : null

  tags = {
    Project     = "fraud-detection"
    Environment = "dev"
  }
}
Deploying the infrastructure
Alright, now that we have an overview of the key AWS resources provisioned with Terraform, let's deploy the infrastructure. First, make sure to update the devops/infra/main/vars/dev.tfvars file with the appropriate values for your AWS environment, such as the database credentials and any optional overrides for the SageMaker notebook instance.
# dev.tfvars
aws_region = "eu-west-1"
kinesis_stream_name = "fraud-predictions-stream"
kinesis_shard_count = 4
postgres_user = "postgres_user"
postgres_password = "postgres_password" # CHANGE ME before apply
postgres_db = "fraudit_postgres_db"
postgres_port = 5432
sagemaker_notebook_name = "fraudit-notebook"
sagemaker_notebook_instance_type = "ml.t3.medium"
sagemaker_root_volume_size = 5
sagemaker_subnet_id = "" # keep empty for no VPC attachment in dev
sagemaker_security_group_ids = [] # list of SG IDs if attaching to a VPC
sagemaker_lifecycle_startup_content = "enabled"
Warning
Please do not commit sensitive information such as database credentials to version control. Make sure to use strong credentials and consider using environment variables or AWS Secrets Manager for managing sensitive information in a production environment.
Once you have updated the variables, you can run the following commands in the terminal to deploy the infrastructure:
$ cd devops/infra/main
$ terraform init
$ terraform plan -var-file=vars/dev.tfvars
$ terraform apply -var-file=vars/dev.tfvars
Note the outputs from the Terraform deployment, which include important information about the resources that were created. These outputs are essential for configuring the subsequent steps in the project. The outputs include:
- kinesis_stream_name: the name of the Kinesis stream created for ingesting transaction data.
- lambda_exec_role_arn: the ARN of the IAM role assigned to the AWS Lambda function for invoking SageMaker endpoints.
- rds_postgres_endpoint: the endpoint URL of the RDS PostgreSQL instance used for storing transaction data and model predictions.
- rds_postgres_port: the port number for connecting to the RDS PostgreSQL instance.
You can use the output values to configure your .env file, which will be used by the training notebooks and the API application to connect to the appropriate AWS resources. For example, you can create a .env file from the .env.example file located in the root of the project and update it with the output values from Terraform.
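Conceptually, populating the .env file from Terraform amounts to flattening `terraform output -json` into KEY=value lines, roughly like this sketch (the project's actual Makefile target may do it differently):

```python
import json

def terraform_outputs_to_env(tf_output_json: str) -> str:
    """Flatten `terraform output -json` into KEY=value .env lines."""
    outputs = json.loads(tf_output_json)
    return "\n".join(
        f"{name.upper()}={spec['value']}" for name, spec in sorted(outputs.items())
    )

# Simulated `terraform output -json` result using outputs from the list above
sample = json.dumps({
    "kinesis_stream_name": {"value": "fraud-predictions-stream"},
    "rds_postgres_port": {"value": 5432},
})
env_text = terraform_outputs_to_env(sample)
# KINESIS_STREAM_NAME=fraud-predictions-stream
# RDS_POSTGRES_PORT=5432
```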
You have two options:
- You can copy the output values manually from the terminal and paste them into the .env file.
- Alternatively, you can run the Makefile command in the terminal to automatically populate the .env file with the output values from Terraform:
$ make env.sync
Implementation and Deployment
Now that the infrastructure is deployed and the necessary configurations are in place, we can proceed to implement the machine learning models, the API layer, and the Glue job for data processing. The project includes detailed notebooks and scripts for each of these components, which are organized in the respective directories in the project repository. Below is an overview of each component and how to implement and deploy them.
Fraud Detection ML Models
The machine learning models for fraud detection are implemented in Jupyter notebooks that are designed to be run in the SageMaker notebook instance that we provisioned using Terraform.
The training notebooks are located in the sagemaker directory of the project, and they utilize the SageMaker Python SDK to manage the training and deployment processes. For example, the notebook for training and deploying the XGBoost model can be found here. This notebook loads the training data from the S3 bucket, trains the XGBoost model using SageMaker, and then deploys the trained model as a SageMaker endpoint for real-time inference. Similarly, the notebook for training and deploying the Random Cut Forest model can be found here. This notebook follows a similar process, but it trains an unsupervised model for anomaly detection. Both notebooks include detailed comments and instructions for running the training and deployment processes. Make sure to update the S3 bucket names and other parameters in the notebook based on the outputs from the Terraform deployment.
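When invoking the deployed XGBoost endpoint, the feature vector is typically sent as a text/csv body (no header, no label column). A minimal serializer is sketched below; the endpoint name in the comment is an assumption, not the project's actual name:

```python
def to_csv_payload(features) -> str:
    """Serialize one feature vector as the text/csv body expected by
    the SageMaker built-in XGBoost endpoint (no header, no label)."""
    return ",".join(f"{x:.6f}" for x in features)

payload = to_csv_payload([0.5, -1.25, 3.0])
# The endpoint would then be invoked along these lines:
# boto3.client("sagemaker-runtime").invoke_endpoint(
#     EndpointName="fraud-xgboost-endpoint",  # hypothetical name
#     ContentType="text/csv", Body=payload)
```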
API Layer
Once the models are deployed as SageMaker endpoints, the next step is to create an API layer that allows you to handle incoming requests and make predictions using the deployed models. For this purpose, we use AWS Chalice, which is a microframework for writing serverless applications in Python. Chalice makes it easy to create and deploy REST APIs that can interact with AWS services.
To create a new Chalice project, you can run the following commands in the terminal:
$ mkdir chalice
$ chalice new-project ml-inference-api
This command creates a new Chalice project with the necessary directory structure and files. The app.py file contains the code for the API endpoints that will be used to make predictions using the deployed SageMaker models. The API includes endpoints for both the supervised and unsupervised models, allowing you to send transaction data and receive predictions in real-time. The requirements.txt file lists the dependencies required for the Chalice application, including the SageMaker Python SDK and any other libraries needed for making predictions and handling requests. The directory structure for the Chalice application is as follows:
├── app.py
├── requirements.txt
├── .chalice/
│ ├── config.json
The next step is to configure the config.json file located in the .chalice directory to specify the AWS region and other settings for the Chalice application. You can update the config.json file with the following content:
{
  "version": "2.0",
  "app_name": "ml-inference-api",
  "stages": {
    "dev": {
      "api_gateway_stage": "api",
      "manage_iam_role": false,
      "iam_role_arn": "arn:aws:iam::xxxxxxxxxxxx:role/ml-inference-api-lambda-exec-role",
      "environment_variables": {
        "solution_prefix": "fraud-detection",
        "stream_name": "fraud-predictions-stream",
        "aws_region": "eu-west-1"
      }
    }
  }
}
Note that iam_role_arn should be updated with the ARN of the IAM role that was created for the Lambda function in the Terraform deployment. This role allows the Lambda function to invoke the SageMaker endpoints and access other AWS resources as needed.
After configuring the config.json file, we can proceed to implement the API endpoints in the app.py file, which will look like this:
@app.route('/predict', methods=['POST'])
def predict():
    event = app.current_request.json_body
    metadata = event.get('metadata')
    data_payload = event.get('data')
    if not metadata or not data_payload:
        raise BadRequestError("Invalid request: missing 'metadata' or 'data' field.")
    anomaly = get_anomaly_prediction(data_payload)
    fraud = get_fraud_prediction(data_payload)
    output = {
        "anomaly_detector": anomaly,
        "fraud_classifier": fraud
    }
    store_data_prediction(output, metadata)  # store the prediction results in a Kinesis stream for further processing
    return output
You can find the complete implementation of the API endpoints in the app.py file in the project here. This API endpoint receives transaction data in JSON format, makes predictions using the deployed SageMaker models, and returns the predictions in the response. Additionally, the prediction results are stored in a Kinesis stream for further processing.
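From the client side, a request body needs a 'metadata' block and a 'data' payload to pass the handler's validation. A hypothetical helper for assembling it (the exact contents of each field depend on the project's schema):

```python
def build_predict_request(features, transaction_id: str) -> dict:
    """Assemble the JSON body expected by /predict: a metadata block
    plus the feature payload. Field contents here are assumptions."""
    return {
        "metadata": {"transaction_id": transaction_id},
        "data": ",".join(str(x) for x in features),
    }

body = build_predict_request([0.1, -2.3, 1.7], "tx-001")
# POSTing this body to the deployed API would return a JSON object
# with "anomaly_detector" and "fraud_classifier" keys.
```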
Once the API endpoints are implemented, you can deploy the Chalice application to AWS using the following command:
$ chalice deploy
This command deploys the Chalice application to AWS, creating the necessary Lambda functions and API Gateway resources. After deployment, you will receive the URL for the API endpoint, which you can use to send requests and receive predictions in real-time.
Glue Job for Data Processing
In order to process the streaming data from the Kinesis stream and perform feature engineering and transformations, we define a Glue job that reads the data from the Kinesis stream, applies the necessary transformations, and then writes the processed data into RDS for further analysis.
The Glue job is implemented in the glue_job.py file located here using PySpark, which is the Python API for Apache Spark. It enables you to perform real-time, large-scale data processing in a distributed environment using Python. Here is what the main function of the Glue job looks like:
def main():
    """Main execution function"""
    logger.info("Starting Fraud Detection Streaming Job")

    # Configure checkpoint location
    checkpoint_location = f"s3a://{args['s3_checkpoint_bucket']}/checkpoints/{args['JOB_NAME']}/"
    logger.info(f"Using checkpoint location: {checkpoint_location}")

    # Read from Kinesis stream
    kinesis_df = (
        spark.readStream
        .format("aws-kinesis")
        .option("kinesis.streamName", args["kinesis_stream"])
        .option("kinesis.region", args["aws_region"])
        .option("kinesis.startingposition", "TRIM_HORIZON")  # Start from the oldest available data
        .option("kinesis.endpointUrl", f"https://kinesis.{args['aws_region']}.amazonaws.com")
        .load()
    )
    logger.info("Successfully configured Kinesis stream reader")

    # Start streaming query
    query = (
        kinesis_df
        .writeStream
        .foreachBatch(process_kinesis_batch)
        .outputMode("append")
        .option("checkpointLocation", checkpoint_location)
        .trigger(processingTime='30 seconds')
        .start()
    )
    logger.info("Streaming query started successfully")

    # Wait for termination
    query.awaitTermination()
Once the Glue job is defined, we can deploy it as a package into AWS Glue. To do this, we need to create a wheel package that contains the necessary dependencies for the Glue job and upload it to S3. We also need to upload the Kinesis connector JAR file to S3, which is required for reading from the Kinesis stream in the Glue job.
To do this we will use our custom CLI located in the devops/cli directory, which provides a convenient interface for building the wheel package if it hasn't been built yet, downloading the Kinesis connector JAR file if it doesn't exist, and uploading both to S3. You can run the following command in the terminal to perform these steps:
python -m devops.deploy.cli deploy
Once the wheel package, the JAR file, and the Glue job script are uploaded to S3, we can start the Glue job from the AWS Management Console. The Glue job will read the streaming data from the Kinesis stream, process it according to the logic defined in the glue_job.py file, and then write the processed data into RDS for further analysis.
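The artifact destinations used by the deploy step mirror the S3 prefixes referenced in the Glue job's Terraform configuration (wheel/, jars/, spark-jobs/). A sketch of that mapping — the helper name and local paths are hypothetical, not the CLI's actual implementation:

```python
from pathlib import Path

def build_upload_plan(bucket: str, wheel: str, jar: str, script: str) -> dict:
    """Map local artifacts to the S3 destinations the Glue job expects."""
    return {
        wheel: f"s3://{bucket}/wheel/{Path(wheel).name}",
        jar: f"s3://{bucket}/jars/{Path(jar).name}",
        script: f"s3://{bucket}/spark-jobs/{Path(script).name}",
    }

plan = build_upload_plan(
    "fraud-detection-stream-bucket-eu-west-1",
    "dist/fraudit-0.0.1-py3-none-any.whl",
    "jars/spark-streaming-sql-kinesis-connector_2.12-1.0.0.jar",
    "jobs/glue_job.py",
)
```

Each local file would then be uploaded to its mapped key, e.g. with boto3's `upload_file`.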
Generate Transaction Data and Test the Pipeline
To test the end-to-end pipeline, we can use the generate_data.py script located in the scripts/ directory to simulate real-time transaction data and send it to the Chalice API endpoint. This script generates transaction data using part of the original Kaggle dataset, enriches it with additional metadata, and then sends it to the API endpoint for processing.
Here is what it looks like:
# Load environment variables from .env file
load_dotenv()

ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
DATASET_LOCAL_DIR = os.path.join(Path(ROOT_DIR).parent.absolute(), 'dataset')
PARALLEL_INVOCATION = False

if __name__ == "__main__":
    # Load credit card fraud dataset
    data = pd.read_csv(f"{DATASET_LOCAL_DIR}/creditcard.csv", delimiter=',')

    # Get fraud and non-fraud counts
    nonfrauds, frauds = data.groupby('Class').size()
    print('Number of frauds: ', frauds)
    print('Number of non-frauds: ', nonfrauds)
    print('Percentage of fraudulent data:', 100. * frauds / (frauds + nonfrauds))

    # Split features and labels
    feature_columns = data.columns[:-1]
    label_column = data.columns[-1]
    features = data[feature_columns].values.astype('float32')
    labels = (data[label_column].values).astype('float32')

    # Split data into train and test sets
    X_train, X_test, y_train, y_test = train_test_split(
        features, labels, test_size=0.2, random_state=42, stratify=labels
    )

    if PARALLEL_INVOCATION:
        # Run multiple simulations in parallel using threads
        threads = []
        for i in range(10):
            thread = Thread(target=generate_data, args=(np.copy(X_test), 100_000))
            threads.append(thread)
            thread.start()
        for thread in threads:
            thread.join()
        print("All simulations completed.")
    else:
        # Run one simulation sequentially
        generate_data(np.copy(X_test), max_requests=100_000)  # Adjust max_requests as needed
Note
Make sure to install all the necessary dependencies for running the generate_data.py script, which are listed in the requirements.txt file in the directory.
You can install the dependencies using pip:
$ pip install -r requirements.txt
You can then run the following command in the terminal to execute the data generation script:
$ python scripts/generate_data.py
Want to Contribute?
This project is open source. Contributions, issues, and feature requests are welcome!