Happy New Year everyone! I'm starting off the new year with a project. Throughout the second half of 2020 I picked up Go and started using it more in my work process. It even edged my go to programming language (Python) in use cases where concurrency was needed.

What is Go and why use it? Go is a programming language developed at Google that follows C style in syntax. The creators of Go created it to be efficient, responsive, high performant, and easy to pickup and run with by anyone reading or writing the code.

Today I'll be writing Go code to be used on AWS Lambda which will consume messages in SQS. These messages will be processed by Lambda and sent to S3 and DynamoDB concurrently.


Creating the Base Code

To start things off write package main at the top, this will tell the Go compiler to compile the code as an executable and not as a shared library.

Next import the following packages needed for this project. For now I'll import the following AWS packages, you'll import more later on but these are all you need to get started.

  • github.com/aws/aws-lambda-go/events
  • github.com/aws/aws-lambda-go/lambda
  • github.com/aws/aws-sdk-go/aws
  • github.com/aws/aws-sdk-go/aws/session
  • github.com/aws/aws-sdk-go/service/dynamodb
  • github.com/aws/aws-sdk-go/service/s3

Create a main function that will serve as the entry point for the executable. Inside of the main function write lambda.Start(Handler). The lambda.Start() function will communicate with a Lambda endpoint and pass requests to the handler in this case the handler is a function that will be written in the next section.

package main

import (
	"fmt"
    
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	lambda.Start(Handler)
}

Writing the Handler Function

The handler function will receive the events from SQS and process it by extracting the message body and ID, and send it on it's way to S3 and DynamoDB. To receive SQS events you will need the github.com/aws/aws-lambda-go/events package. SQS events will have the type events.SQSEvent.

func Handler(ctx context.Context, sqsEvent events.SQSEvent) error {

}

Initialize and assign the variable message to the output of the message. Then print out the message to stdout.

func Handler(ctx context.Context, sqsEvent events.SQSEvent) error {
	message := sqsEvent.Records[0]
    	fmt.Println(message)
}

Next to make our code concurrent, create two channels. One channel for uploading to S3 and the other for put data into a DynamoDB table. To create a channel, use the make function and pass in chan and the type that will be returned from the channel. In this case *s3.PutObjectOutput and *dynamodb.PutItemOutput.

For S3, you will only upload the body of the message (message.Body). In DynamoDb, you will put in a table the message ID (message.MessageId) along with the body (message.Body).

The second part of making this code concurrent is utilizing Go routines. By default Go creates a single Go routine when executing code, however if we want Go to upload to S3 and DynamoDB we will need an additional two routines.

To create the routines, simply put the keyword go in front of the functions that will upload to S3 (uploadS3Bucket) and DynamoDB (sendToDynamo). Please note that these functions will be created in the next section. The uploadS3Bucket function will take the message body and the channel as arguments and the sendToDynamo function will take the message ID, body, and channel as arguments.

func Handler(ctx context.Context, sqsEvent events.SQSEvent) error {
	message := sqsEvent.Records[0]
	fmt.Println(message.Body)

	c1 := make(chan *s3.PutObjectOutput)
	go uploadS3Bucket(message.Body, c1)

	for c1Msg := range c1 {
		fmt.Println(c1Msg)
	}

	c2 := make(chan *dynamodb.PutItemOutput)
	go sendToDynamo(message.MessageId, message.Body, c2)

	for c2Msg := range c2 {
		fmt.Println(c2Msg)
	}

	return nil
}

Uploading to S3

The uploadS3Bucket will take a string which will be the body of the message and a channel with the type *s3.PutObjectOutput.  When you create the Lambda function pass in the bucket name as an environment variable with the key BUCKET and retrieve the value with os.Getenv("BUCKET").

To create the S3 object filename I will convert the output of the time.Now() function to a string. I will upload objects with this filename format: exampleobject-YYYY-MM-DD-HH-MM-SS.

Next create and assign a variable called input that is a pointer to s3.PutObjectInput. Add the values for the Body, Bucket, Key, and Tagging for the object that will be uploaded to S3.

To actually upload the object, use the PutObject function with input as the argument to be passed in.

Return the result of the PutObject function through the channel to update the main Go routine. Finally close the Go routine close(c).


func uploadS3Bucket(s string, c chan *s3.PutObjectOutput) {
	BUCKETNAME := os.Getenv("BUCKET")

	// Date for S3 filename
	now := time.Now()
	year := strconv.Itoa(int(now.Year()))
	month := strconv.Itoa(int(now.Month()))
	day := strconv.Itoa(int(now.Day()))
	hour := strconv.Itoa(int(now.Hour()))
	min := strconv.Itoa(int(now.Minute()))
	sec := strconv.Itoa(int(now.Second()))
	myFormat := year + "-" + month + "-" + day + "-" + hour + "-" + min + "-" + sec

	svc := s3.New(session.New())

	input := &s3.PutObjectInput{
		Body:   aws.ReadSeekCloser(strings.NewReader(s)),
		Bucket: aws.String(BUCKETNAME),

		// Filename will be in the following format: exampleobject-YYYY-MM-DD-HH-MM-SS
		Key:     aws.String("exampleobject-" + myFormat),
		Tagging: aws.String("env=dev&owner=pafable"),
	}

	result, err := svc.PutObject(input)

	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	c <- result
	close(c)
}

Uploading to DynamoDb

This will be similar to the uploading to S3 function, however it won't need a name to be manipulated using the time package. It will take two strings which will be the SQS message ID and body as arguments along with the channel.

The DynamoDB table name will be added as an environment variable in Lambda and the value will be retrieved using the os.Getenv() function.

The input will be a pointer to the struct dynamodb.PutItemInput. TableName, Item, SQS_ID, and MSG will be specified.

Similar to the uploadS3Bucket to send the data to DyanmoDb, I will use the PutItem function and pass in input as an argument.

Lastly the result will be sent through the channel and the Go routine will be closed.

func sendToDynamo(id string, body string, c chan *dynamodb.PutItemOutput) {
	TABLENAME := os.Getenv("TABLE")
	svc := dynamodb.New(session.New())

	input := &dynamodb.PutItemInput{
		TableName: aws.String(TABLENAME),
		Item: map[string]*dynamodb.AttributeValue{
			"SQS_ID": {
				S: aws.String(id),
			},
			"MSG": {
				S: aws.String(body),
			},
		},
	}

	result, err := svc.PutItem(input)

	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	c <- result
	close(c)
}

Compiling and Packaging for Lambda

Compile and build the binary.

GOOS=linux go build sqs-s3-dynamo.go 

Zip up the binary.

zip sqs-s3-dynamo.zip sqs-s3-dynamo

Lastly upload the zip to Lambda using either the console or CLI.


Conclusion

That wraps up this tutorial! As you can see Go is rather straight forward, but is strict with types. You have to specify the correct type to be passed into the functions otherwise Go will throw an error.

For more information on how to use the Go SDK and Lambda check out the resources below. Pay close attention to the different events Lambda can process from numerous AWS services. To get an idea of what each service sends to Lambda as an event, checkout their Github page.

https://docs.aws.amazon.com/sdk-for-go/api/aws/

https://github.com/aws/aws-lambda-go/tree/master/events

https://docs.aws.amazon.com/lambda/latest/dg/golang-handler.html

My full code:

package main

/*
This lambda function will take a message from SQS, extract the body and upload it to s3.
It will also send the message body and ID to DynamoDB.
The message attribute corresponding to "env" will be displayed in stdout.
The dynamodb table name and SQS queue names are configured as environment variables in Lambda.
*/

import (
	"context"
	"fmt"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	lambda.Start(Handler)
}

func Handler(ctx context.Context, sqsEvent events.SQSEvent) error {
	message := sqsEvent.Records[0]
	fmt.Println(message.Body)
	// fmt.Printf("Msg Atr: %s\nType: %T", *message.MessageAttributes["env"].StringValue, message.MessageAttributes["env"])

	c1 := make(chan *s3.PutObjectOutput)
	go uploadS3Bucket(message.Body, c1)

	for c1Msg := range c1 {
		fmt.Println(c1Msg)
	}

	c2 := make(chan *dynamodb.PutItemOutput)
	go sendToDynamo(message.MessageId, message.Body, c2)

	for c2Msg := range c2 {
		fmt.Println(c2Msg)
	}

	return nil
}

func uploadS3Bucket(s string, c chan *s3.PutObjectOutput) {
	BUCKETNAME := os.Getenv("BUCKET")

	// Date for S3 filename
	now := time.Now()
	year := strconv.Itoa(int(now.Year()))
	month := strconv.Itoa(int(now.Month()))
	day := strconv.Itoa(int(now.Day()))
	hour := strconv.Itoa(int(now.Hour()))
	min := strconv.Itoa(int(now.Minute()))
	sec := strconv.Itoa(int(now.Second()))
	myFormat := year + "-" + month + "-" + day + "-" + hour + "-" + min + "-" + sec

	svc := s3.New(session.New())

	input := &s3.PutObjectInput{
		Body:   aws.ReadSeekCloser(strings.NewReader(s)),
		Bucket: aws.String(BUCKETNAME),

		// Filename will be in the following format: exampleobject-YYYY-MM-DD-HH-MM-SS
		Key:     aws.String("exampleobject-" + myFormat),
		Tagging: aws.String("env=dev&owner=pafable"),
	}

	result, err := svc.PutObject(input)

	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	c <- result
	close(c)
}

func sendToDynamo(id string, body string, c chan *dynamodb.PutItemOutput) {
	TABLENAME := os.Getenv("TABLE")
	svc := dynamodb.New(session.New())

	input := &dynamodb.PutItemInput{
		TableName: aws.String(TABLENAME),
		Item: map[string]*dynamodb.AttributeValue{
			"SQS_ID": {
				S: aws.String(id),
			},
			"MSG": {
				S: aws.String(body),
			},
		},
	}

	result, err := svc.PutItem(input)

	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	c <- result
	close(c)
}