API Gateway Websockets

About this Architecture

This solution demonstrates how to use the WebSocket feature of Amazon API Gateway to route messages to an Amazon SQS FIFO queue. The queue acts as a buffer to absorb spikes in traffic: messages wait in the queue until a worker is available to process them.

In this solution, a FIFO (First-In-First-Out) queue guarantees that events are processed in the order they arrive. The queue is then consumed by an AWS Lambda function, which posts a result back to the websocket client.
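To make the ordering guarantee concrete, here is a toy Python model (not part of the deployment) of how a FIFO queue preserves arrival order for messages that share a MessageGroupId:

```python
from collections import defaultdict, deque

class FifoQueueModel:
    """Toy model of SQS FIFO ordering: per-group arrival order is preserved."""

    def __init__(self):
        self.groups = defaultdict(deque)  # messages per MessageGroupId
        self.order = deque()              # overall arrival order of group ids

    def send(self, group_id, body):
        self.groups[group_id].append(body)
        self.order.append(group_id)

    def receive(self):
        # Deliver the oldest message; within a group, strictly first-in-first-out.
        if not self.order:
            return None
        group_id = self.order.popleft()
        return group_id, self.groups[group_id].popleft()

q = FifoQueueModel()
q.send("test", "first")
q.send("test", "second")
assert q.receive() == ("test", "first")   # arrival order preserved
assert q.receive() == ("test", "second")
```

Real SQS FIFO queues add features this sketch ignores (deduplication, visibility timeouts), but the ordering behavior it models is the reason a FIFO queue is used here.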

How to Build This Solution

Here are the steps you can follow to build this solution on your own.

Get Your AWS Credentials

If you're using the Skillmix Labs feature, open the lab settings (the beaker icon) on the right side of the code editor. Then, click the Start Lab button to start the lab environment.

Wait for the credentials to load. Then run this in the terminal:

$ aws configure --profile smx-lab
AWS Access Key ID [None]: <YOUR_ACCESS_KEY_ID>
AWS Secret Access Key [None]: <YOUR_SECRET_ACCESS_KEY>
Default region name [None]: us-west-2
Default output format [None]: json

Be sure to name your credentials profile 'smx-lab'.

Note: If you're using your own AWS account, you'll need to create and configure an AWS CLI named profile called smx-lab.

Create the Terraform File

We'll be doing all of our work in one Terraform file. Create a new directory on your computer somewhere, and then create a file named main.tf in it.

Create the Terraform & Provider Block

To start the coding process, open the main.tf file and add the terraform and provider blocks.

Now we will create the Terraform configuration block that declares the providers used in this project. This code pins the AWS, Random, and Archive providers to specific versions and sets the minimum required Terraform version. It then configures the AWS provider with the smx-lab profile and the region specified in the variable 'aws_region'.

Append this code to the main.tf file:

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 4.53.0"
    }
    random = {
      source  = "hashicorp/random"
      version = "~> 3.1.0"
    }
    archive = {
      source  = "hashicorp/archive"
      version = "~> 2.2.0"
    }
  }

  required_version = ">= 0.14.9"
}

provider "aws" {
  profile = "smx-lab"
  region = var.aws_region
}

Create the Variables

We are going to use several variables throughout the project. Append this code to the main.tf file:

variable "aws_region" {
  description = "AWS region for all resources."
  type        = string
  default     = "us-west-2"
}

variable "s3_bucket_prefix" {
  description = "Prefix for the S3 bucket name."
  type        = string
  default     = "apigw-websocket"
}

variable "lambda_name" {
  description = "Name of the Lambda function."
  type        = string
  default     = "apigw-websocket-response"
}

variable "sqs_name" {
  description = "Name of the SQS queue."
  type        = string
  default     = "tf-APIGWWebsocketQueue.fifo"
}

variable "apigwy_name" {
  description = "Name of the API Gateway API."
  type        = string
  default     = "APIGWWebsocketSQSLambda"
}

variable "lambda_log_retention" {
  description = "Lambda log retention in days."
  type        = number
  default     = 7
}

Create the Data & Locals Blocks

Next, we will look up the AWS account ID of the current caller. To do this, we use the "aws_caller_identity" data source and store the account ID in a local value called "account_id". This local can then be referenced elsewhere in the configuration, for example when building ARNs.

Append this code to the main.tf file:

data "aws_caller_identity" "current" {}

locals {
  account_id = data.aws_caller_identity.current.account_id
}

Create the SQS Queue

Next, we will create an SQS queue using Terraform. This code will create a FIFO (First-In-First-Out) queue with the name specified in the variable 'sqs_name'. Additionally, the code will enable server-side encryption (SSE) for the queue, ensuring that the data stored in the queue is secure.

Append this code to the main.tf file:

resource "aws_sqs_queue" "fifo_queue" {
  fifo_queue                        = true
  name                              = var.sqs_name
  sqs_managed_sse_enabled           = true
}

Create the Random String Resource

Next, we will create a random string resource. This code generates a random four-character string with no special characters, which we append to resource names to keep them unique across deployments.

Append this code to the main.tf file:

resource "random_string" "random" {
  length           = 4
  special          = false
}

Create the API Gateway Resource

Next, we will create an API Gateway v2 API that will allow us to send websocket data to an SQS queue, which will then be processed by a Lambda function. This code will create an API Gateway v2 API with a unique name, a description, a protocol type of WEBSOCKET, and a route selection expression that will determine which route the data should take.

Append this code to the main.tf file:

resource "aws_apigatewayv2_api" "my_websocket_api" {
  name                         = "${var.apigwy_name}-${random_string.random.id}"
  description                  = "Send websocket data to SQS which is then processed by a Lambda"
  protocol_type                = "WEBSOCKET"
  route_selection_expression   = "$request.body.action"
}

Create the API Gateway Integration Resource

Next, we will create an API Gateway v2 integration resource. This connects the API to the SQS queue so that incoming websocket messages can be forwarded to it. The configuration sets the connection type, the IAM credentials used to call SQS, the integration method and type, the integration URI of the queue, the passthrough behavior, the Content-Type request parameter, a request template that maps each websocket message into an SQS SendMessage call, the template selection expression, and a timeout. It also declares an explicit dependency on the IAM role used for authentication.

Append this code to the main.tf file:

resource "aws_apigatewayv2_integration" "demo_integration" {
  api_id               = aws_apigatewayv2_api.my_websocket_api.id
  connection_type      = "INTERNET"
  credentials_arn      = aws_iam_role.apigwy_websocket_sqs_role.arn
  integration_method   = "POST"
  integration_type     = "AWS"
  integration_uri      = "arn:aws:apigateway:${var.aws_region}:sqs:path/${local.account_id}/${var.sqs_name}"
  passthrough_behavior = "NEVER"

  request_parameters = {
    "integration.request.header.Content-Type" = "'application/x-www-form-urlencoded'"
  }

  request_templates = {
    "$default" = "Action=SendMessage&MessageGroupId=$input.path('$.MessageGroupId')&MessageDeduplicationId=$context.requestId&MessageAttribute.1.Name=connectionId&MessageAttribute.1.Value.StringValue=$context.connectionId&MessageAttribute.1.Value.DataType=String&MessageAttribute.2.Name=requestId&MessageAttribute.2.Value.StringValue=$context.requestId&MessageAttribute.2.Value.DataType=String&MessageBody=$input.json('$')"
  }

  template_selection_expression = "\\$default"
  timeout_milliseconds          = 29000

  depends_on = [
    aws_iam_role.apigwy_websocket_sqs_role,
  ]
}
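The request template is the trickiest part of this resource: API Gateway's VTL engine rewrites each incoming websocket frame into the form-encoded body of an SQS SendMessage call. As a rough illustration, here is a Python sketch of what that mapping produces for a sample message (the connection and request IDs are hypothetical values that API Gateway would substitute at runtime):

```python
import json
from urllib.parse import urlencode

# Hypothetical runtime values supplied by API Gateway
message = {"MessageGroupId": "test", "data": "hello world"}
connection_id = "abc123"   # $context.connectionId
request_id = "req-1"       # $context.requestId

# Roughly the SendMessage body the request template assembles
body = urlencode({
    "Action": "SendMessage",
    "MessageGroupId": message["MessageGroupId"],            # $input.path('$.MessageGroupId')
    "MessageDeduplicationId": request_id,                   # $context.requestId
    "MessageAttribute.1.Name": "connectionId",
    "MessageAttribute.1.Value.StringValue": connection_id,
    "MessageAttribute.1.Value.DataType": "String",
    "MessageAttribute.2.Name": "requestId",
    "MessageAttribute.2.Value.StringValue": request_id,
    "MessageAttribute.2.Value.DataType": "String",
    "MessageBody": json.dumps(message),                     # $input.json('$')
})
print(body)
```

The connectionId attribute is the key detail: it rides along with the message so the Lambda function later knows which websocket connection to answer.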

Create the API Gateway Stage Resource

Next, we will create an AWS API Gateway v2 stage called "production" using the Terraform resource "aws_apigatewayv2_stage". This stage will be associated with the API we created earlier, and will be set to automatically deploy any changes that are made.

Append this code to the main.tf file:

resource "aws_apigatewayv2_stage" "production" {
  api_id          = aws_apigatewayv2_api.my_websocket_api.id
  name            = "production"
  auto_deploy     = true
}

Create the API Gateway Route Resource

Next, we will create a route for our API using the aws_apigatewayv2_route resource. This route is associated with the API we created earlier and uses the "$default" route key, which catches any message that doesn't match a more specific route. Its target is the integration we defined in the previous step.

Append this code to the main.tf file:

resource "aws_apigatewayv2_route" "default" {
  api_id    = aws_apigatewayv2_api.my_websocket_api.id
  route_key = "$default"
  target    = "integrations/${aws_apigatewayv2_integration.demo_integration.id}"
}

Create the Lambda Resource

Next, we will create the AWS Lambda function. Its name combines the variable "lambda_name" with our random string. The code points at the deployment package in S3 (bucket, key, and source code hash), sets the runtime to Python 3.8 and the handler to "app.lambda_handler", attaches the Lambda execution role, and sets a 15-second timeout. It also passes the API Gateway endpoint URL to the function as an environment variable, sets the tracing config to "PassThrough", and depends on the Lambda log group and the production stage of the API Gateway.

Append this code to the main.tf file:

resource "aws_lambda_function" "lambda_sqs_websocket_response" {
  function_name    = "${var.lambda_name}-${random_string.random.id}"
  description      = "serverlessland pattern"
  s3_bucket        = aws_s3_bucket.lambda_bucket.id
  s3_key           = aws_s3_object.lambda.key
  source_code_hash = data.archive_file.lambda_source.output_base64sha256
  runtime          = "python3.8"
  handler          = "app.lambda_handler"
  role             = aws_iam_role.lambda_execution.arn
  timeout          = 15

  environment {
    variables = {
      "ApiGatewayEndpoint" = "https://${aws_apigatewayv2_api.my_websocket_api.id}.execute-api.${var.aws_region}.amazonaws.com/${aws_apigatewayv2_stage.production.name}"
    }
  }

  timeouts {}

  tracing_config {
    mode = "PassThrough"
  }

  depends_on = [aws_cloudwatch_log_group.lambda_logs, aws_apigatewayv2_stage.production]
}

Create the Lambda Function File

Next, we will create a Lambda function to handle incoming records from the API Gateway endpoint. This code will import the necessary libraries, set up logging, and then use the boto3 library to create an API client. It will then loop through the Lambda event records and post a response to each connection ID. Finally, it will return a status code and a message depending on whether the records were successfully processed.

In your working directory, create a folder named src. In that folder, create a file named app.py. Add the following code to app.py:

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

import boto3
import os
import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

endpoint = os.environ.get('ApiGatewayEndpoint')
logger.info(f"Loaded endpoint URI from environment variable: {endpoint}")
api_client = boto3.client('apigatewaymanagementapi', endpoint_url=endpoint)

def lambda_handler(event, context):
    
    if event['Records']:

        json_dump = json.dumps(event['Records'])
        logger.info(f"json: {json_dump}")
        message = "Received record data!"
        
        for record in event['Records']:
            connection_id = record["messageAttributes"]["connectionId"]["stringValue"]
            response = {
                "connectionId":connection_id,
                "requestId":record["messageAttributes"]["requestId"]["stringValue"],
                "message":record["body"]
            }
            logger.info(f"response: {response}")
            data = json.dumps(response)
            api_client.post_to_connection(ConnectionId=connection_id, Data=data.encode('utf-8'))
        return {
            "statusCode": 200,
            "headers": {
                "Content-Type": "application/json"
            },
            "body": json.dumps({"message": message})
        }
    else:
        message = "Something went wrong"
        return {
            "statusCode": 500,
            "headers": {
                "Content-Type": "application/json"
            },
            "body": json.dumps({"message": message})
        }
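Before deploying, you can sanity-check the record-parsing logic locally. The sketch below (a hypothetical helper, not part of app.py) builds the same response payload the handler posts back, using a hand-written sample shaped like an SQS record; it skips the post_to_connection call, which needs a live API:

```python
import json

def build_response(record):
    """Mirror of the payload app.py posts back to the websocket connection."""
    return {
        "connectionId": record["messageAttributes"]["connectionId"]["stringValue"],
        "requestId": record["messageAttributes"]["requestId"]["stringValue"],
        "message": record["body"],
    }

# Hand-written sample shaped like one entry of event['Records']
sample_record = {
    "messageAttributes": {
        "connectionId": {"stringValue": "abc123"},
        "requestId": {"stringValue": "req-1"},
    },
    "body": json.dumps({"MessageGroupId": "test", "data": "hello world"}),
}

response = build_response(sample_record)
assert response["connectionId"] == "abc123"
print(json.dumps(response))
```

This mirrors the loop in lambda_handler: each record carries the originating connectionId as a message attribute, which is what lets the function answer the right client.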

Create the CloudWatch Log Group for Lambda Resource

Next, we will create a CloudWatch Log Group for our Lambda function. The log group will have a retention period of the number of days specified in the variable "var.lambda_log_retention".

Append this code to the main.tf file:

resource "aws_cloudwatch_log_group" "lambda_logs" {
  name = "/aws/lambda/${var.lambda_name}-${random_string.random.id}"

  retention_in_days = var.lambda_log_retention
}

Create the Lambda Event Source Mapping Resource

Next, we will create an event source mapping between the Amazon SQS queue and the AWS Lambda function, so the function is triggered whenever a message arrives in the queue. The Terraform code below creates this mapping by referencing the ARN of the SQS queue and the ARN of the Lambda function.

Append this code to the main.tf file:

resource "aws_lambda_event_source_mapping" "apigwy_sqs" {
  event_source_arn = aws_sqs_queue.fifo_queue.arn
  function_name    = aws_lambda_function.lambda_sqs_websocket_response.arn
}

Create the S3 Bucket for Lambda Deployment Resource

Next, we will create an S3 bucket and set its access control list (ACL) to private. The first resource block creates an S3 bucket whose name starts with the prefix specified in the variable 's3_bucket_prefix'. The second block sets the bucket's ACL to 'private', ensuring that only authorized users can access it.

Append this code to the main.tf file:

resource "aws_s3_bucket" "lambda_bucket" {
  bucket_prefix = var.s3_bucket_prefix
  force_destroy = true
}

resource "aws_s3_bucket_acl" "private_bucket" {
  bucket = aws_s3_bucket.lambda_bucket.id
  acl    = "private"
}

Create the Lambda Zip File Resource

Next, we will create an archive file resource in Terraform. This resource will take the source directory specified in the source_dir argument and compress it into a zip file located at the output_path. This zip file can then be used to deploy a Lambda function to our S3 bucket.

Append this code to the main.tf file:

data "archive_file" "lambda_source" {
  type = "zip"
  source_dir  = "${path.module}/src"
  output_path = "${path.module}/src.zip"
}

Create the Lambda S3 Object Resource

Next, we will upload the Lambda zip file as an S3 object. This code creates an object in our project bucket under the key "source.zip". The source of the object is the output path of the archive file we just created, and the etag is the MD5 hash of that file, so the object is re-uploaded whenever the code changes.

Append this code to the main.tf file:

resource "aws_s3_object" "lambda" {
  bucket = aws_s3_bucket.lambda_bucket.id
  key    = "source.zip"
  source = data.archive_file.lambda_source.output_path
  etag = filemd5(data.archive_file.lambda_source.output_path)
}

Create the IAM Role for SQS

Next, we will create an IAM role for our API Gateway websocket to access an SQS queue. This code creates an IAM role with the name "ApiGatewayWebsocketSQSRole" and sets the assume role policy to allow the API Gateway service to assume the role.

Append this code to the main.tf file:

resource "aws_iam_role" "apigwy_websocket_sqs_role" {
  # uncomment the 'permissions_boundary' argument if running this lab on skillmix.io 
  # permissions_boundary = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:policy/LabUserNewResourceBoundaryPolicy"
  name = "ApiGatewayWebsocketSQSRole"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Sid    = ""
      Principal = {
        Service = "apigateway.amazonaws.com"
      }
      }
    ]
  })
}

Create the IAM Role Policy & Attachment for SQS

Next, we will create an IAM policy and attach it to the IAM role we just created. This policy allows the role to send messages to the SQS queue. The attachment references the role through aws_iam_role.apigwy_websocket_sqs_role.name.

Append this code to the main.tf file:

resource "aws_iam_policy" "apigwy_sqs_send_message" {
  name = "APIGatewaySQSSendMessagePolicy"

  policy = <<POLICY
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": "sqs:SendMessage",
            "Resource": "arn:aws:sqs:${var.aws_region}:${local.account_id}:${var.sqs_name}",
            "Effect": "Allow"
        }
    ]
}
POLICY
}

resource "aws_iam_role_policy_attachment" "apigwy_policy" {
  role       = aws_iam_role.apigwy_websocket_sqs_role.name
  policy_arn = aws_iam_policy.apigwy_sqs_send_message.arn
}

Create the IAM Role for Lambda Resource

Next, we will create an IAM role for our Lambda function using the aws_iam_role resource. This role will allow the Lambda function to assume the role and access other AWS services. The name of the role will be "WebsocketApi-${var.lambda_name}", and the assume_role_policy will be set to a JSON encoded policy that allows the Lambda service to assume the role.

Append this code to the main.tf file:

resource "aws_iam_role" "lambda_execution" {
  # uncomment the 'permissions_boundary' argument if running this lab on skillmix.io 
  # permissions_boundary = "arn:aws:iam::${data.aws_caller_identity.current.account_id}:policy/LabUserNewResourceBoundaryPolicy"
  name = "WebsocketApi-${var.lambda_name}"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action = "sts:AssumeRole"
      Effect = "Allow"
      Sid    = ""
      Principal = {
        Service = "lambda.amazonaws.com"
      }
      }
    ]
  })
}

Create the IAM Role Policy & Attachment for Lambda Resource

Next, we will create an IAM policy and attach it to an IAM role. This policy will allow the role to manage connections, receive messages from an SQS queue, change message visibility, get queue URLs, delete messages, and get queue attributes. It will also allow the role to create log groups, create log streams, and put log events.

Append this code to the main.tf file:

resource "aws_iam_policy" "lambda_exec_role" {
  name = "SQSWebsocketResponseServiceRole"

  policy = <<POLICY
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": "execute-api:ManageConnections",
            "Resource": "${aws_apigatewayv2_stage.production.execution_arn}/POST/*",
            "Effect": "Allow"
        },
        {
            "Action": [
                "sqs:ReceiveMessage",
                "sqs:ChangeMessageVisibility",
                "sqs:GetQueueUrl",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes"
            ],
            "Resource": "arn:aws:sqs:${var.aws_region}:${local.account_id}:${var.sqs_name}",
            "Effect": "Allow"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        }
    ]
}
POLICY
}

resource "aws_iam_role_policy_attachment" "lambda_policy" {
  role       = aws_iam_role.lambda_execution.name
  policy_arn = aws_iam_policy.lambda_exec_role.arn
}

Create the Terraform Outputs

Lastly, let's output several values from the project. Append this code to the main.tf file:

output "account_id" {
  value = local.account_id
}

output "websocket_URI" {
  description = "API Gateway websocket endpoint URL for Prod stage"

  value = aws_apigatewayv2_stage.production.invoke_url
}

output "Queue" {
  description = "SQS FIFO queue which receives the websocket message events"
  value = aws_sqs_queue.fifo_queue.id
}

output "lambda_log_group" {
  description = "Name of the CloudWatch logs group for the lambda function"
  value = aws_cloudwatch_log_group.lambda_logs.id
}

Deploying the Project

Now that we have all of our code written, we can deploy the project. Open a terminal, navigate to the project, and run these commands.

# initialize the project 
$ terraform init 

# plan the project 
$ terraform plan 

# apply the project 
$ terraform apply

Note the websocket_URI output value, as we'll use it next.

Testing the Project

To test this project we need to send websocket requests to the API Gateway. In this tutorial we will use the wscat utility from npm, which requires Node.js and npm to be installed.

First, install wscat if you don't have it already installed:

# at your terminal 
$ npm install -g wscat

Then, connect to the API Gateway websocket with the following command. Be sure to substitute your websocket_URI value.

# make the connections 
$ wscat -c <YOUR websocket_uri here>

Next, let's send a message over the websocket. At the wscat prompt, type the message and press Enter:

$ wscat -c <YOUR WEBSOCKET URL>
connected (press CTRL+C to quit)
> {"MessageGroupId":"test", "data":"hello world"}

If everything is working you should get a response back like this:

$ wscat -c <YOUR WEBSOCKET URL>
connected (press CTRL+C to quit)
> {"MessageGroupId":"test", "data":"hello world"}
< {"connectionId":"<CONNECTIONID>","requestId":"<REQUESTID>","message":"{\"MessageGroupId\":\"test\",\"data\":\"hello world\"}"}

That's it! You can send some more messages to play around with it.

Source

This project was sourced from the AWS Repo: https://github.com/aws-samples/serverless-patterns