Here is some information about this architecture.
This solution demonstrates how to use the websocket feature of Amazon API Gateway to route messages to an Amazon SQS FIFO queue. The queue acts as a buffer that absorbs spikes in traffic: messages wait in the queue until a worker is available to process them.
A FIFO (First-In-First-Out) queue guarantees that events are processed in the order they arrive, which keeps the data processing accurate. An AWS Lambda function then consumes the queue and posts a result back to the websocket connection.
Here are the steps you can follow to build this solution on your own.
If you're using the Skillmix Labs feature, open the lab settings (the beaker icon) on the right side of the code editor. Then, click the Start Lab button to start the lab environment.
Wait for the credentials to load. Then run this in the terminal:
$ aws configure --profile smx-lab
AWS Access Key ID [None]: <access key ID from the lab panel>
AWS Secret Access Key [None]: <secret access key from the lab panel>
Default region name [None]: us-west-2
Default output format [None]: json
Be sure to name your credentials profile 'smx-lab'.
Note: If you're using your own AWS account you'll need to ensure that you've created and configured a named AWS CLI profile named smx-lab.
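To confirm the profile works before moving on, you can optionally run a quick identity check (this calls AWS STS with the new profile):
# verify the profile
$ aws sts get-caller-identity --profile smx-lab
If the command prints your account ID and user ARN, the credentials are configured correctly.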
We'll be doing all of our work in one Terraform file. Create a new directory on your computer somewhere, and then create a file named main.tf in it.
To start the coding process, open the main.tf file and add the terraform and provider blocks.
This first block of configuration declares the providers and versions used in the project: the AWS, Random, and Archive providers, plus the minimum Terraform version. It also configures the AWS provider with the smx-lab profile and the region from the variable 'aws_region'.
Append this code to the main.tf file:
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 4.53.0"
    }
    random = {
      source  = "hashicorp/random"
      version = "~> 3.1.0"
    }
    archive = {
      source  = "hashicorp/archive"
      version = "~> 2.2.0"
    }
  }

  required_version = ">= 0.14.9"
}

provider "aws" {
  profile = "smx-lab"
  region  = var.aws_region
}
We are going to use several variables throughout the project. Append this code to the main.tf file:
variable "aws_region" {
description = "AWS region for all resources."
type = string
default = "us-west-2"
}
variable "s3_bucket_prefix" {
description = "the name of the prefix for s3 bucket"
type = string
default = "apigw-websocket"
}
variable "lambda_name" {
description = "name of the lambda function"
type = string
default = "apigw-websocket-response"
}
variable "sqs_name" {
description = "the name of the sqs queue"
type = string
default = "tf-APIGWWebsocketQueue.fifo"
}
variable "apigwy_name" {
description = "name of the apigateway"
type = string
default = "APIGWWebsocketSQSLambda"
}
variable "lambda_log_retention" {
description = "lambda log retention in days"
type = number
default = 7
}
Next, we will look up the account ID of the current caller. The "aws_caller_identity" data source exposes the account ID, which we store in a local value named "account_id" so the rest of the configuration can reference it.
Append this code to the main.tf file:
data "aws_caller_identity" "current" {}
locals {
account_id = data.aws_caller_identity.current.account_id
}
Next, we will create an SQS queue using Terraform. This code will create a FIFO (First-In-First-Out) queue with the name specified in the variable 'sqs_name'. Additionally, the code will enable server-side encryption (SSE) for the queue, ensuring that the data stored in the queue is secure.
Append this code to the main.tf file:
resource "aws_sqs_queue" "fifo_queue" {
fifo_queue = true
name = var.sqs_name
sqs_managed_sse_enabled = true
}
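As an aside, a FIFO queue requires every message to carry a MessageGroupId, and a MessageDeduplicationId when content-based deduplication is off (as it is here); the API Gateway mapping template we build later supplies both. Here is a minimal sketch of sending to the queue directly with boto3, assuming the queue has already been deployed and the smx-lab profile exists; it is only for experimentation and is not part of the Terraform project:
# sketch: send a message straight to the FIFO queue with boto3
import boto3

session = boto3.Session(profile_name="smx-lab", region_name="us-west-2")
sqs = session.client("sqs")

queue_url = sqs.get_queue_url(QueueName="tf-APIGWWebsocketQueue.fifo")["QueueUrl"]
sqs.send_message(
    QueueUrl=queue_url,
    MessageBody='{"data": "hello world"}',
    MessageGroupId="test",               # required for FIFO queues
    MessageDeduplicationId="demo-0001",  # required: content-based dedup is not enabled
)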
Next, we will create a random string resource. This code generates a 4-character random string with no special characters, which we append to resource names (the API, the Lambda function, and its log group) so repeated deployments don't collide on names.
Append this code to the main.tf file:
resource "random_string" "random" {
length = 4
special = false
}
Next, we will create the API Gateway v2 API that accepts websocket connections and sends message data to the SQS queue, where it is then processed by the Lambda function. This code creates an API with a unique name, a description, a protocol type of WEBSOCKET, and a route selection expression. The route selection expression tells API Gateway which route key to match against the 'action' field of each incoming frame; since we define only a $default route below, every message falls through to it.
Append this code to the main.tf file:
resource "aws_apigatewayv2_api" "my_websocket_api" {
name = "${var.apigwy_name}-${random_string.random.id}"
description = "Send websocket data to SQS which is then processed by a Lambda"
protocol_type = "WEBSOCKET"
route_selection_expression = "$request.body.action"
}
Next, we will create the API Gateway v2 integration that connects the API to the SQS queue so incoming websocket messages can be sent to the queue. The integration posts to SQS using an IAM role for credentials, and a request mapping template turns each websocket frame into an SQS SendMessage call, attaching the connection ID and request ID as message attributes. The code also sets the passthrough behavior, a template selection expression, and a timeout, and declares an explicit dependency on the IAM role used for authentication.
Append this code to the main.tf file:
resource "aws_apigatewayv2_integration" "demo_integration" {
api_id = aws_apigatewayv2_api.my_websocket_api.id
connection_type = "INTERNET"
credentials_arn = aws_iam_role.apigwy_websocket_sqs_role.arn
integration_method = "POST"
integration_type = "AWS"
integration_uri = "arn:aws:apigateway:${var.aws_region}:sqs:path/${local.account_id}/${var.sqs_name}"
passthrough_behavior = "NEVER"
request_parameters = {
"integration.request.header.Content-Type" = "'application/x-www-form-urlencoded'"
}
request_templates = {
"$default" = "Action=SendMessage&MessageGroupId=$input.path('$.MessageGroupId')&MessageDeduplicationId=$context.requestId&MessageAttribute.1.Name=connectionId&MessageAttribute.1.Value.StringValue=$context.connectionId&MessageAttribute.1.Value.DataType=String&MessageAttribute.2.Name=requestId&MessageAttribute.2.Value.StringValue=$context.requestId&MessageAttribute.2.Value.DataType=String&MessageBody=$input.json('$')"
}
template_selection_expression = "\\$default"
timeout_milliseconds = 29000
depends_on = [
aws_iam_role.apigwy_websocket_sqs_role,
]
}
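The mapping template is dense, so here is a rough illustration of what it expands to. For an incoming frame like {"MessageGroupId":"test","data":"hello world"}, API Gateway substitutes the $input and $context variables and posts a form-encoded SendMessage body to SQS. This Python sketch builds an approximately equivalent body using made-up connection and request IDs (the real values come from $context at runtime):
# sketch: approximate the body produced by the mapping template above
from urllib.parse import urlencode

frame = '{"MessageGroupId":"test","data":"hello world"}'  # $input.json('$')
body = urlencode({
    "Action": "SendMessage",
    "MessageGroupId": "test",                           # $input.path('$.MessageGroupId')
    "MessageDeduplicationId": "req-123",                # $context.requestId (hypothetical)
    "MessageAttribute.1.Name": "connectionId",
    "MessageAttribute.1.Value.StringValue": "abc123=",  # $context.connectionId (hypothetical)
    "MessageAttribute.1.Value.DataType": "String",
    "MessageAttribute.2.Name": "requestId",
    "MessageAttribute.2.Value.StringValue": "req-123",
    "MessageAttribute.2.Value.DataType": "String",
    "MessageBody": frame,
})
print(body)  # Action=SendMessage&MessageGroupId=test&...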
Next, we will create an AWS API Gateway v2 stage called "production" using the Terraform resource "aws_apigatewayv2_stage". This stage will be associated with the API we created earlier, and will be set to automatically deploy any changes that are made.
Append this code to the main.tf file:
resource "aws_apigatewayv2_stage" "production" {
api_id = aws_apigatewayv2_api.my_websocket_api.id
name = "production"
auto_deploy = true
}
Next, we will create a route for our API using the aws_apigatewayv2_route resource. This route is associated with the API we created earlier and uses the "$default" route key, so it catches every message that doesn't match a more specific route. Its target is the SQS integration we defined above.
Append this code to the main.tf file:
resource "aws_apigatewayv2_route" "default" {
api_id = aws_apigatewayv2_api.my_websocket_api.id
route_key = "$default"
target = "integrations/${aws_apigatewayv2_integration.demo_integration.id}"
}
Next, we will create the AWS Lambda function. Its name combines the variable "lambda_name" with our random string. The code points the function at the deployment package in our S3 bucket (defined below), sets the source code hash so code changes trigger redeployment, uses the Python 3.8 runtime with the handler "app.lambda_handler", attaches the Lambda execution role, sets a 15-second timeout, passes the API Gateway endpoint URL to the function as an environment variable (so it can post responses back to connected clients), and sets the tracing config to "PassThrough". Finally, it declares dependencies on the Lambda log group and the production stage of the API Gateway.
Append this code to the main.tf file:
resource "aws_lambda_function" "lambda_sqs_websocket_response" {
function_name = "${var.lambda_name}-${random_string.random.id}"
description = "serverlessland pattern"
s3_bucket = aws_s3_bucket.lambda_bucket.id
s3_key = aws_s3_object.lambda.key
source_code_hash = data.archive_file.lambda_source.output_base64sha256
runtime = "python3.8"
handler = "app.lambda_handler"
role = aws_iam_role.lambda_execution.arn
timeout = 15
environment {
variables = {
"ApiGatewayEndpoint" = "https://${aws_apigatewayv2_api.my_websocket_api.id}.execute-api.${var.aws_region}.amazonaws.com/${aws_apigatewayv2_stage.production.name}"
}
}
timeouts {}
tracing_config {
mode = "PassThrough"
}
depends_on = [aws_cloudwatch_log_group.lambda_logs, aws_apigatewayv2_stage.production]
}
Next, we will create a Lambda function to handle incoming records from the API Gateway endpoint. This code will import the necessary libraries, set up logging, and then use the boto3 library to create an API client. It will then loop through the Lambda event records and post a response to each connection ID. Finally, it will return a status code and a message depending on whether the records were successfully processed.
In your working directory, create a folder named src. In that folder, create a file named app.py, then add the following code to it:
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
import boto3
import os
import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

endpoint = os.environ.get('ApiGatewayEndpoint')
logging.info(f"Loaded endpoint uri from environment variable: {endpoint}")
api_client = boto3.client('apigatewaymanagementapi', endpoint_url=endpoint)

def lambda_handler(event, context):
    if event['Records']:
        json_dump = json.dumps(event['Records'])
        logging.info(f"json: {json_dump}")
        message = "Received record data!"
        for record in event['Records']:
            connection_id = record["messageAttributes"]["connectionId"]["stringValue"]
            response = {
                "connectionId": connection_id,
                "requestId": record["messageAttributes"]["requestId"]["stringValue"],
                "message": record["body"]
            }
            logging.info(f"response: {response}")
            data = json.dumps(response)
            api_client.post_to_connection(ConnectionId=connection_id, Data=data.encode('utf-8'))
        return {
            "statusCode": 200,
            "headers": {
                "Content-Type": "application/json"
            },
            "body": json.dumps({"message": message})
        }
    else:
        message = "Something went wrong"
        return {
            "statusCode": 500,
            "headers": {
                "Content-Type": "application/json"
            },
            "body": json.dumps({"message": message})
        }
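Before deploying, you can sanity-check the handler locally. This is a minimal smoke-test sketch, not part of the solution: it sets dummy environment values, swaps the real API Gateway client for a mock so post_to_connection is never actually called, and invokes the handler with a hand-built SQS-style event:
# sketch: local smoke test for app.py with the API client stubbed out
import os
os.environ["ApiGatewayEndpoint"] = "https://example.execute-api.us-west-2.amazonaws.com/production"  # dummy
os.environ.setdefault("AWS_DEFAULT_REGION", "us-west-2")  # lets boto3 build the client

from unittest.mock import MagicMock
import app

app.api_client = MagicMock()  # don't call the real websocket API

event = {
    "Records": [{
        "body": '{"MessageGroupId":"test","data":"hello world"}',
        "messageAttributes": {
            "connectionId": {"stringValue": "abc123="},
            "requestId": {"stringValue": "req-123"},
        },
    }]
}

result = app.lambda_handler(event, context=None)
print(result["statusCode"])  # expect 200
app.api_client.post_to_connection.assert_called_once()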
Next, we will create a CloudWatch log group for our Lambda function. The log group's retention period is the number of days specified in the variable "lambda_log_retention".
Append this code to the main.tf file:
resource "aws_cloudwatch_log_group" "lambda_logs" {
name = "/aws/lambda/${var.lambda_name}-${random_string.random.id}"
retention_in_days = var.lambda_log_retention
}
Next, we will create an event source mapping between the SQS queue and the Lambda function, so the function is triggered whenever a message arrives on the queue. The Terraform code below creates the mapping by referencing the ARNs of the SQS queue and the Lambda function.
Append this code to the main.tf file:
resource "aws_lambda_event_source_mapping" "apigwy_sqs" {
event_source_arn = aws_sqs_queue.fifo_queue.arn
function_name = aws_lambda_function.lambda_sqs_websocket_response.arn
}
Next, we will create the S3 bucket that holds the Lambda deployment package and set its access control list (ACL) to private. The first resource creates a bucket whose name begins with the prefix in the variable 's3_bucket_prefix'; force_destroy lets Terraform delete the bucket on teardown even if it still contains objects. The second resource sets the bucket ACL to 'private', ensuring that only authorized users can access the bucket.
Append this code to the main.tf file:
resource "aws_s3_bucket" "lambda_bucket" {
bucket_prefix = var.s3_bucket_prefix
force_destroy = true
}
resource "aws_s3_bucket_acl" "private_bucket" {
bucket = aws_s3_bucket.lambda_bucket.id
acl = "private"
}
Next, we will create an archive file data source. It compresses the directory specified in the source_dir argument into a zip file at output_path. This zip file is the Lambda deployment package we will upload to the S3 bucket.
Append this code to the main.tf file:
data "archive_file" "lambda_source" {
type = "zip"
source_dir = "${path.module}/src"
output_path = "${path.module}/src.zip"
}
Next, we will upload the Lambda zip file to the project's S3 bucket as an object with the key "source.zip". The object's source is the output path of the archive file, and its etag is set to the MD5 hash of that file so the object is re-uploaded whenever the packaged source changes.
Append this code to the main.tf file:
resource "aws_s3_object" "lambda" {
bucket = aws_s3_bucket.lambda_bucket.id
key = "source.zip"
source = data.archive_file.lambda_source.output_path
etag = filemd5(data.archive_file.lambda_source.output_path)
}
Next, we will create an IAM role for our API Gateway websocket to access an SQS queue. This code creates an IAM role with the name "ApiGatewayWebsocketSQSRole" and sets the assume role policy to allow the API Gateway service to assume the role.
Append this code to the main.tf file:
resource "aws_iam_role" "apigwy_websocket_sqs_role" {
# uncomment the 'permissions_boundary' argument if running this lab on skillmix.io
# permissions_boundary = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:policy/LabUserNewResourceBoundaryPolicy"
name = "ApiGatewayWebsocketSQSRole"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Action = "sts:AssumeRole"
Effect = "Allow"
Sid = ""
Principal = {
Service = "apigateway.amazonaws.com"
}
}
]
})
}
Next, we will create an IAM policy that allows sending messages to the SQS queue, and attach it to the role we just created by referencing aws_iam_role.apigwy_websocket_sqs_role.name.
Append this code to the main.tf file:
resource "aws_iam_policy" "apigwy_sqs_send_message" {
name = "APIGatewaySQSSendMessagePolicy"
policy = <<POLICY
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sqs:SendMessage",
"Resource": "arn:aws:sqs:${var.aws_region}:${local.account_id}:${var.sqs_name}",
"Effect": "Allow"
}
]
}
POLICY
}
resource "aws_iam_role_policy_attachment" "apigwy_policy" {
role = aws_iam_role.apigwy_websocket_sqs_role.name
policy_arn = aws_iam_policy.apigwy_sqs_send_message.arn
}
Next, we will create an IAM role for our Lambda function using the aws_iam_role resource. This is the role the function assumes at runtime to access other AWS services. The role is named "WebsocketApi-${var.lambda_name}", and its assume_role_policy is a JSON-encoded policy that allows the Lambda service to assume the role.
Append this code to the main.tf file:
resource "aws_iam_role" "lambda_execution" {
# uncomment the 'permissions_boundary' argument if running this lab on skillmix.io
# permissions_boundary = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:policy/LabUserNewResourceBoundaryPolicy"
name = "WebsocketApi-${var.lambda_name}"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [{
Action = "sts:AssumeRole"
Effect = "Allow"
Sid = ""
Principal = {
Service = "lambda.amazonaws.com"
}
}
]
})
}
Next, we will create an IAM policy and attach it to an IAM role. This policy will allow the role to manage connections, receive messages from an SQS queue, change message visibility, get queue URLs, delete messages, and get queue attributes. It will also allow the role to create log groups, create log streams, and put log events.
Append this code to the main.tf file:
resource "aws_iam_policy" "lambda_exec_role" {
name = "SQSWebsocketResponseServiceRole"
policy = <<POLICY
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "execute-api:ManageConnections",
"Resource": "${aws_apigatewayv2_stage.production.execution_arn}/POST/*",
"Effect": "Allow"
},
{
"Action": [
"sqs:ReceiveMessage",
"sqs:ChangeMessageVisibility",
"sqs:GetQueueUrl",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes"
],
"Resource": "arn:aws:sqs:${var.aws_region}:${local.account_id}:${var.sqs_name}",
"Effect": "Allow"
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "*"
}
]
}
POLICY
}
resource "aws_iam_role_policy_attachment" "lambda_policy" {
role = aws_iam_role.lambda_execution.name
policy_arn = aws_iam_policy.lambda_exec_role.arn
}
Lastly, let's output several values from the project. Append this code to the main.tf file:
output "account_id" {
value = local.account_id
}
output "websocket_URI" {
description = "API Gateway websocket endpoint URL for Prod stage"
value = aws_apigatewayv2_stage.production.invoke_url
}
output "Queue" {
description = "SQS FIFO queue which receives the websocket message events"
value = aws_sqs_queue.fifo_queue.id
}
output "lambda_log_group" {
description = "Name of the CloudWatch logs group for the lambda function"
value = aws_cloudwatch_log_group.lambda_logs.id
}
Now that we have all of our code written, we can deploy the project. Open a terminal, navigate to the project, and run these commands.
# initialize the project
$ terraform init
# plan the project
$ terraform plan
# apply the project
$ terraform apply
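Terraform prints the outputs at the end of the apply. If you need to read one back later, you can query it directly:
# print a single output value
$ terraform output websocket_URI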
Note the websocket_URI output value, as we'll use it next.
To test this project we need to send websocket requests to the API Gateway. In this tutorial we will use the wscat NPM utility, which requires you to have NPM installed.
First, install wscat if you don't already have it:
# at your terminal
$ npm install -g wscat
Then, connect to the API Gateway via websocket with the following command. Be sure to substitute your websocket_URI value.
# make the connections
$ wscat -c <YOUR websocket_uri here>
Next, let's send a message via websocket. Replace the placeholder in the command below with your own websocket URL, then send a JSON frame that includes a MessageGroupId:
$ wscat -c <YOUR WEBSOCKET URL>
connected (press CTRL+C to quit)
> {"MessageGroupId":"test", "data":"hello world"}
If everything is working, you should get a response back like this:
$ wscat -c <YOUR WEBSOCKET URL>
connected (press CTRL+C to quit)
> {"MessageGroupId":"test", "data":"hello world"}
< {"connectionId":"<CONNECTIONID>","requestId":"<REQUESTID>","message":"{\"MessageGroupId\":\"test\",\"data\":\"hello world\"}"}
That's it! You can send some more messages to play around with it.
Source
This project was sourced from the AWS serverless-patterns repo: https://github.com/aws-samples/serverless-patterns