Happy New Year everyone! I'm starting off the new year with a project. Throughout the second half of 2020 I picked up Go and started using it more in my work. It has even edged out my go-to programming language (Python) in use cases that need concurrency.
What is Go and why use it? Go is a programming language developed at Google with a C-style syntax. Its creators designed it to be efficient, responsive, highly performant, and easy to pick up for anyone reading or writing the code.
Today I'll be writing Go code for AWS Lambda that consumes messages from SQS. Lambda will process these messages and send them to S3 and DynamoDB concurrently.
Creating the Base Code
To start things off, write package main at the top of the file. This tells the Go compiler to build the code as an executable rather than as a shared library.
Next, import the packages this project needs. For now, import the following AWS packages; you'll add more imports later on, but these are all you need to get started.
- github.com/aws/aws-lambda-go/events
- github.com/aws/aws-lambda-go/lambda
- github.com/aws/aws-sdk-go/aws
- github.com/aws/aws-sdk-go/aws/session
- github.com/aws/aws-sdk-go/service/dynamodb
- github.com/aws/aws-sdk-go/service/s3
Create a main function to serve as the entry point for the executable, and inside it write lambda.Start(Handler). The lambda.Start() function communicates with Lambda and passes each request to the handler; in this case, the handler is a function that will be written in the next section.
package main

import (
	"context"
	"fmt"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	// Hand control to the Lambda runtime; Handler is written in the next section.
	lambda.Start(Handler)
}
Writing the Handler Function
The handler function receives events from SQS and processes them by extracting the message body and ID and sending them on their way to S3 and DynamoDB. To receive SQS events you need the github.com/aws/aws-lambda-go/events package; SQS events have the type events.SQSEvent.
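To see what the handler actually receives, here is a minimal sketch that decodes a trimmed SQS event payload with encoding/json. It assumes hypothetical local types, sqsEvent and sqsMessage, that mirror only the Records, messageId, and body fields of events.SQSEvent and events.SQSMessage; the real types carry many more fields, and lambda.Start does this decoding for you.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sqsMessage and sqsEvent are hypothetical stand-ins that mirror a subset of
// events.SQSMessage and events.SQSEvent; only the fields used here are included.
type sqsMessage struct {
	MessageId string `json:"messageId"`
	Body      string `json:"body"`
}

type sqsEvent struct {
	Records []sqsMessage `json:"Records"`
}

// decodeEvent is a hypothetical helper that unmarshals a raw payload into the
// stand-in event type, similar to what lambda.Start does before calling Handler.
func decodeEvent(payload []byte) (sqsEvent, error) {
	var ev sqsEvent
	err := json.Unmarshal(payload, &ev)
	return ev, err
}

func main() {
	// Trimmed example payload; a real SQS event has more fields per record.
	payload := []byte(`{"Records":[{"messageId":"id-1","body":"hello"}]}`)
	ev, err := decodeEvent(payload)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(ev.Records[0].MessageId, ev.Records[0].Body) // prints "id-1 hello"
}
```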
func Handler(ctx context.Context, sqsEvent events.SQSEvent) error {
	// The body is filled in over the next steps.
	return nil
}
Assign the first record of the event to a variable called message, then print the message to stdout.
func Handler(ctx context.Context, sqsEvent events.SQSEvent) error {
	// Take the first SQS record in the event and print it to stdout.
	message := sqsEvent.Records[0]
	fmt.Println(message)
	return nil
}
Next, to make the code concurrent, create two channels: one for uploading to S3 and the other for putting data into a DynamoDB table. To create a channel, use the make function and pass in chan followed by the type of value the channel will carry, in this case *s3.PutObjectOutput and *dynamodb.PutItemOutput.
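As a minimal sketch of how a typed channel is created and used, the example below passes a channel to a worker goroutine. It assumes a hypothetical putResult type standing in for *s3.PutObjectOutput or *dynamodb.PutItemOutput and a hypothetical upload function. It also declares the parameter as chan<- (send-only), an optional refinement over the plain chan used in this tutorial that lets the compiler reject accidental receives inside the worker.

```go
package main

import "fmt"

// putResult is a hypothetical stand-in for *s3.PutObjectOutput or *dynamodb.PutItemOutput.
type putResult struct {
	Target string
}

// upload is a hypothetical worker. Its parameter is send-only (chan<-), so the
// compiler rejects any attempt to receive from out inside this function.
func upload(target string, out chan<- *putResult) {
	out <- &putResult{Target: target}
	// The sender closes the channel once it has nothing more to send.
	close(out)
}

func main() {
	// Unbuffered channel carrying *putResult values.
	c := make(chan *putResult)

	// A bidirectional channel converts to chan<- automatically at the call site.
	go upload("s3", c)

	for r := range c {
		fmt.Println(r.Target)
	}
}
```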
For S3, you will only upload the body of the message (message.Body). In DynamoDB, you will store the message ID (message.MessageId) along with the body (message.Body).
The second part of making this code concurrent is using goroutines. The handler runs in a single goroutine, so to upload to S3 and DynamoDB at the same time we need two additional goroutines.
To create the goroutines, put the keyword go in front of the calls to the functions that upload to S3 (uploadS3Bucket) and DynamoDB (sendToDynamo). These functions are written in the next two sections. The uploadS3Bucket function takes the message body and a channel as arguments, and the sendToDynamo function takes the message ID, body, and a channel.
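func Handler(ctx context.Context, sqsEvent events.SQSEvent) error {
	message := sqsEvent.Records[0]
	fmt.Println(message.Body)

	c1 := make(chan *s3.PutObjectOutput)
	c2 := make(chan *dynamodb.PutItemOutput)

	// Start both goroutines before reading from either channel. Receiving from c1
	// blocks until the S3 upload finishes, so launching sendToDynamo after that
	// loop would run the two writes one after the other instead of concurrently.
	go uploadS3Bucket(message.Body, c1)
	go sendToDynamo(message.MessageId, message.Body, c2)

	// Each goroutine sends one result and then closes its channel, ending the loop.
	for c1Msg := range c1 {
		fmt.Println(c1Msg)
	}
	for c2Msg := range c2 {
		fmt.Println(c2Msg)
	}
	return nil
}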
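The upload functions in this tutorial call os.Exit(1) when a request fails, which terminates the whole Lambda runtime process instead of letting Handler report the failure. One alternative, sketched here under assumptions, is to send a small struct that carries either the output or the error and have the handler return the combined error. The result type, doUpload, and handle are hypothetical names, strings stand in for the SDK output types, and errors.Join requires Go 1.20 or later.

```go
package main

import (
	"errors"
	"fmt"
)

// result is a hypothetical wrapper pairing an output with an error, so a
// goroutine can report failure to its caller instead of calling os.Exit.
type result struct {
	Output string
	Err    error
}

// doUpload is a hypothetical stand-in for uploadS3Bucket or sendToDynamo.
func doUpload(name string, fail bool, out chan<- result) {
	// The deferred close runs after the send below completes.
	defer close(out)
	if fail {
		out <- result{Err: fmt.Errorf("%s: upload failed", name)}
		return
	}
	out <- result{Output: name + " ok"}
}

// handle is a hypothetical handler body: it starts both uploads, waits for
// both, and returns their errors joined together (nil if both succeeded).
func handle(failS3, failDynamo bool) error {
	c1 := make(chan result, 1)
	c2 := make(chan result, 1)
	go doUpload("s3", failS3, c1)
	go doUpload("dynamodb", failDynamo, c2)

	r1, r2 := <-c1, <-c2
	return errors.Join(r1.Err, r2.Err)
}

func main() {
	fmt.Println(handle(false, false)) // prints "<nil>"
	fmt.Println(handle(true, false))  // prints "s3: upload failed"
}
```

Returning a non-nil error from the real Handler marks the invocation as failed, which is the signal Lambda uses to leave the message on the queue for a retry.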
Uploading to S3
The uploadS3Bucket function takes a string (the body of the message) and a channel of type *s3.PutObjectOutput. When you create the Lambda function, pass in the bucket name as an environment variable with the key BUCKET, and retrieve the value with os.Getenv("BUCKET").
To create the S3 object filename, I format the output of the time.Now() function as a string. Objects are uploaded with this filename format: exampleobject-YYYY-MM-DD-HH-MM-SS.
Next, create a variable called input that holds a pointer to an s3.PutObjectInput, and set its Body, Bucket, Key, and Tagging fields for the object that will be uploaded to S3.
To upload the object, call the PutObject method with input as the argument.
Send the result of PutObject through the channel so that Handler receives it. Finally, call close(c) to close the channel, which ends the range loop in Handler. The goroutine itself exits when uploadS3Bucket returns.
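close(c) closes the channel, not the goroutine; the goroutine ends when its function returns. Receivers observe the close: a range loop exits, and a two-value receive reports ok == false with the zero value. A minimal sketch, assuming a hypothetical sendOnce function in place of uploadS3Bucket:

```go
package main

import "fmt"

// sendOnce is a hypothetical worker that sends a single value and closes the
// channel; the goroutine running it exits when the function returns.
func sendOnce(c chan<- int) {
	c <- 42
	close(c)
}

func main() {
	c := make(chan int)
	go sendOnce(c)

	v, ok := <-c
	fmt.Println(v, ok) // prints "42 true"

	// Receiving from a closed, drained channel returns the zero value and ok == false.
	v, ok = <-c
	fmt.Println(v, ok) // prints "0 false"
}
```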
func uploadS3Bucket(s string, c chan *s3.PutObjectOutput) {
	BUCKETNAME := os.Getenv("BUCKET")

	// Date for S3 filename; the reference layout zero-pads every field
	myFormat := time.Now().Format("2006-01-02-15-04-05")

	svc := s3.New(session.New())
	input := &s3.PutObjectInput{
		Body:   aws.ReadSeekCloser(strings.NewReader(s)),
		Bucket: aws.String(BUCKETNAME),
		// Filename will be in the following format: exampleobject-YYYY-MM-DD-HH-MM-SS
		Key:     aws.String("exampleobject-" + myFormat),
		Tagging: aws.String("env=dev&owner=pafable"),
	}

	result, err := svc.PutObject(input)
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	// Send the result back to Handler, then close the channel so its range loop ends.
	c <- result
	close(c)
}
Uploading to DynamoDB
The sendToDynamo function is similar to uploadS3Bucket, but it doesn't need the time package to build a name. It takes two strings, the SQS message ID and body, along with the channel.
The DynamoDB table name is added as an environment variable in Lambda with the key TABLE, and its value is retrieved with os.Getenv("TABLE").
The input is a pointer to a dynamodb.PutItemInput struct. It specifies TableName and Item, where Item holds two string attributes: SQS_ID (the message ID) and MSG (the message body).
As with uploadS3Bucket, to send the data to DynamoDB I call the PutItem method and pass in input as the argument.
Lastly, the result is sent through the channel and the channel is closed.
func sendToDynamo(id string, body string, c chan *dynamodb.PutItemOutput) {
	TABLENAME := os.Getenv("TABLE")

	svc := dynamodb.New(session.New())
	// Store the SQS message ID and body as two string attributes.
	input := &dynamodb.PutItemInput{
		TableName: aws.String(TABLENAME),
		Item: map[string]*dynamodb.AttributeValue{
			"SQS_ID": {
				S: aws.String(id),
			},
			"MSG": {
				S: aws.String(body),
			},
		},
	}

	result, err := svc.PutItem(input)
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	// Send the result back to Handler, then close the channel so its range loop ends.
	c <- result
	close(c)
}
Compiling and Packaging for Lambda
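Compile and build the binary for Linux. Setting GOARCH=amd64 as well produces an x86_64 binary even when you build on a machine with a different architecture.
GOOS=linux GOARCH=amd64 go build sqs-s3-dynamo.go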
Zip up the binary.
zip sqs-s3-dynamo.zip sqs-s3-dynamo
Lastly, upload the zip to Lambda using either the console or the AWS CLI.
Conclusion
That wraps up this tutorial! As you can see, Go is fairly straightforward but strict about types: you have to pass values of the correct type into functions, otherwise the code won't compile.
For more information on how to use the Go SDK and Lambda, check out the resources below. Pay close attention to the different events Lambda can process from numerous AWS services. To get an idea of what each service sends to Lambda as an event, check out the events directory of the aws-lambda-go GitHub repository.
https://docs.aws.amazon.com/sdk-for-go/api/aws/
https://github.com/aws/aws-lambda-go/tree/master/events
https://docs.aws.amazon.com/lambda/latest/dg/golang-handler.html
My full code:
package main

/*
This Lambda function takes a message from SQS, extracts the body, and uploads it to S3.
It also sends the message body and ID to DynamoDB.
Uncommenting the Printf in Handler displays the message attribute "env" in stdout.
The S3 bucket and DynamoDB table names are configured as the BUCKET and TABLE environment variables in Lambda.
*/

import (
	"context"
	"fmt"
	"os"
	"strings"
	"time"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	lambda.Start(Handler)
}

func Handler(ctx context.Context, sqsEvent events.SQSEvent) error {
	message := sqsEvent.Records[0]
	fmt.Println(message.Body)
	// fmt.Printf("Msg Atr: %s\nType: %T", *message.MessageAttributes["env"].StringValue, message.MessageAttributes["env"])

	c1 := make(chan *s3.PutObjectOutput)
	c2 := make(chan *dynamodb.PutItemOutput)

	// Start both goroutines before reading from either channel so the
	// S3 upload and the DynamoDB write run concurrently.
	go uploadS3Bucket(message.Body, c1)
	go sendToDynamo(message.MessageId, message.Body, c2)

	for c1Msg := range c1 {
		fmt.Println(c1Msg)
	}
	for c2Msg := range c2 {
		fmt.Println(c2Msg)
	}
	return nil
}

func uploadS3Bucket(s string, c chan *s3.PutObjectOutput) {
	BUCKETNAME := os.Getenv("BUCKET")

	// Date for S3 filename; the reference layout zero-pads every field
	myFormat := time.Now().Format("2006-01-02-15-04-05")

	svc := s3.New(session.New())
	input := &s3.PutObjectInput{
		Body:   aws.ReadSeekCloser(strings.NewReader(s)),
		Bucket: aws.String(BUCKETNAME),
		// Filename will be in the following format: exampleobject-YYYY-MM-DD-HH-MM-SS
		Key:     aws.String("exampleobject-" + myFormat),
		Tagging: aws.String("env=dev&owner=pafable"),
	}

	result, err := svc.PutObject(input)
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	c <- result
	close(c)
}

func sendToDynamo(id string, body string, c chan *dynamodb.PutItemOutput) {
	TABLENAME := os.Getenv("TABLE")

	svc := dynamodb.New(session.New())
	input := &dynamodb.PutItemInput{
		TableName: aws.String(TABLENAME),
		Item: map[string]*dynamodb.AttributeValue{
			"SQS_ID": {
				S: aws.String(id),
			},
			"MSG": {
				S: aws.String(body),
			},
		},
	}

	result, err := svc.PutItem(input)
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}

	c <- result
	close(c)
}