In my previous post we decided to catch DynamoDB capacity exceptions by using an AWS SQS queue as a buffer where we push data while we ask for higher table capacity (which takes an unpredictable amount of time).
To implement the proposed solution, we need to:
- Catch the capacity exception when writing data.
- Ask the DynamoDB table for more capacity.
- Push the data to an SQS queue for later processing.
- Pop data from the SQS queue and finally save it to DynamoDB.
- Downgrade the table capacity when it's no longer required (and save money).
As I'm coding C# on the API side but want to try AWS Lambda, the catching code (steps 1 to 3) is written in C#, and AWS Lambda Node.js code is used for the last two (steps 4 and 5).
Let's code.
Catching the DynamoDB write exception.
When you write to a DynamoDB table over its provisioned capacity, the system (at some unpredictable moment :) may discard your data by throwing a capacity exception. If you don't catch that exception and save your data somewhere, the data will be lost:
try {
    // write data to DynamoDB
    ...
} catch (ProvisionedThroughputExceededException e) {
    // save data for later recovery
    ...
}
As getting a single ProvisionedThroughputExceededException means that your table is probably going to be stressed for a while, we also ask for more throughput to prevent future exceptions (note that the new capacity will take some time to become effective):

try {
    // write data to DynamoDB
    ...
} catch (ProvisionedThroughputExceededException e) {
    // upgrade DynamoDB table capacity
    ...
    // save data for later recovery
    ...
}
Upgrading DynamoDB table capacity.
To upgrade DynamoDB table capacity we just need to call 'UpdateTable' with the required capacity values. Check your metrics and decide how much capacity to ask for (should I go from 10 to 100 write capacity units? To 1,000? We are only asking for temporary capacity, so you can allow some margin here):
public bool ChangeTableThroughput(string TableName, int ReadThroughput, int WriteThroughput, AmazonDynamoDBClient Client) {
    var request = new UpdateTableRequest
    {
        ProvisionedThroughput = new ProvisionedThroughput
        {
            ReadCapacityUnits = ReadThroughput,
            WriteCapacityUnits = WriteThroughput
        },
        TableName = TableName,
    };
    var response = Client.UpdateTable(request);
    return response.HttpStatusCode == System.Net.HttpStatusCode.OK;
}
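The catching routine shown later also relies on a LowThroughput check that decides whether an upgrade is still needed; the post doesn't show it, so here's a minimal sketch of what it could look like, assuming it simply compares the table's current write capacity (via DescribeTable) against its base value:

using Amazon.DynamoDBv2.Model;

// Hypothetical implementation: returns true while the table still runs at
// its base write capacity (10 here), i.e. nobody has upgraded it yet.
// Assumes the same shared AWSClient instance used in the catching routine.
public bool LowThroughput(string TableName)
{
    var response = AWSClient.DescribeTable(new DescribeTableRequest { TableName = TableName });
    return response.Table.ProvisionedThroughput.WriteCapacityUnits <= 10;
}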
Saving data to an SQS queue (for later recovery).
Writing data to an SQS queue is as simple as:

SendMessageResponse res = SQS.SendMessage(message);

The key point is the message format you use (how you include any required meta-data).
As we are saving DynamoDB write operations that will be processed by an AWS Lambda routine using the AWS JavaScript SDK, we can directly push messages in the format the SDK expects, so a single write message will look like this:
{
    "Key": { "mykey": { "S": "40000000" }, "TIMESTAMP": { "N": "1460455781493" } },
    "TableName": "mytable",
    "AttributeUpdates": { "myattribute": { "Action": "PUT", "Value": { "N": "0" } } }
}

(Note the quoted keys: the Lambda routine below parses the body with JSON.parse, which requires strict JSON.)
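On the C# side, pushing one of these messages could look like the following sketch; the queue URL is the placeholder from the Lambda code below, and in real code you would serialize your write request instead of hand-building the JSON:

using Amazon.SQS;
using Amazon.SQS.Model;

var sqs = new AmazonSQSClient();
var message = new SendMessageRequest
{
    QueueUrl = "https://sqs.us-east-1.amazonaws.com/9342..99/my-SQS-queue",
    // The body is the exact updateItem parameter object the Lambda will JSON.parse.
    // Hand-built here for illustration only.
    MessageBody = "{ \"Key\": { \"mykey\": { \"S\": \"40000000\" } }, " +
                  "\"TableName\": \"mytable\", " +
                  "\"AttributeUpdates\": { \"myattribute\": { \"Action\": \"PUT\", \"Value\": { \"N\": \"0\" } } } }"
};
SendMessageResponse res = sqs.SendMessage(message);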
Putting all that simple code together, we finally have our catching routine:
} catch (ProvisionedThroughputExceededException e) {
    // upgrade capacity, but only if nobody has done it already
    if (LowThroughput("mytable"))
        ChangeTableThroughput("mytable", 10, 100, AWSClient);
    // buffer the write in SQS for later recovery
    SQS.SendMessage("{ Key: { mykey: { ... ");
}
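For completeness, here's a sketch of how that catch wraps an actual write. The SafePutItem name, its parameters, and the shared AWSClient/SQSClient instances are my own naming, not from the original code:

using Amazon.DynamoDBv2;
using Amazon.DynamoDBv2.Model;
using Amazon.SQS.Model;

// Hypothetical wrapper around a single write: try DynamoDB first and, on a
// capacity exception, upgrade the table (if needed) and buffer the item in SQS.
public void SafePutItem(PutItemRequest request, string messageBody, string queueUrl)
{
    try
    {
        AWSClient.PutItem(request);
    }
    catch (ProvisionedThroughputExceededException)
    {
        if (LowThroughput(request.TableName))
            ChangeTableThroughput(request.TableName, 10, 100, AWSClient);
        SQSClient.SendMessage(new SendMessageRequest { QueueUrl = queueUrl, MessageBody = messageBody });
    }
}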
Recovering data from the queue.
Amazon launched AWS Lambda to execute code without worrying about the execution environment (a kind of server-less approach). Note that AWS Lambda is "event" oriented: your Lambda routines must be triggered by some event, like a direct API call, and can run for up to 5 minutes.
Unfortunately there is no 'new SQS message arrived' trigger (oops), but you can still start your routine on a schedule, cron style (for example, with a CloudWatch Events scheduled rule). I decided to run our pop-queue Lambda function every 5 minutes (an execution with no messages in the queue takes just 400 ms) and let it pop messages from the queue for 3 minutes at most:
'use strict';

var _AWS = require('aws-sdk');
var _SQSExceptionQueueURL = 'https://sqs.us-east-1.amazonaws.com/9342..99/my-SQS-queue';

exports.handler = (event, context, callback) => {
    processMessages(Date.now() + 180000); // process for 3 minutes at most.
};

function processMessages(maxtime) {
    if (Date.now() > maxtime) return;
    var sqs = new _AWS.SQS();
    sqs.receiveMessage({QueueUrl: _SQSExceptionQueueURL, MaxNumberOfMessages: 10}, function(err, data) {
        if (err) console.log(err.message);
        else {
            if (!data.Messages) return; // queue is empty, we are done.
            var db = new _AWS.DynamoDB();
            data.Messages.forEach(function(message) {
                try {
                    // get the DynamoDB write request from the SQS message ...
                    var params = JSON.parse(message.Body);
                    // ... and save it to DynamoDB
                    db.updateItem(params, function(err, data) {
                        if (err) console.log(err, err.stack); // an error occurred
                        else { // delete from the queue on saving confirmation.
                            sqs.deleteMessage({QueueUrl: _SQSExceptionQueueURL, ReceiptHandle: message.ReceiptHandle}, function(err, data) {
                                if (err) console.log(err, err.stack); // an error occurred
                            });
                        }
                    });
                } catch (Err) { // if the message has errors (bad JSON format?) we delete it from the queue
                    sqs.deleteMessage({QueueUrl: _SQSExceptionQueueURL, ReceiptHandle: message.ReceiptHandle}, function(err, data) {
                        if (err) console.log(err, err.stack); // an error occurred
                    });
                }
            }); // end forEach
            processMessages(maxtime); // keep polling until maxtime.
        }
    });
}
Downgrading DynamoDB capacity.
Ok, let's recap:
- We have some code to catch DynamoDB capacity exceptions and temporarily save data to an SQS queue while we ask for more capacity.
- We also implemented a routine that checks the queue at regular intervals to finally save the data to DynamoDB.
The main restriction is that we can only downgrade capacity 4 times a day per table. That means we can't upgrade and downgrade each time we need to, and we must decide when to downgrade the table capacity. In my scenario, I run the next code 4 times a day: it basically checks whether the queue is empty (so high capacity is presumably no longer needed - yes, that's only a prediction) and downgrades the table capacity to some initial best-fit value:
'use strict';

var _AWS = require('aws-sdk');
var _SQSExceptionQueueURL = 'https://sqs.us-east-1.amazonaws.com/9342..99/my-SQS-queue';
var _tableNames = ['table_1', 'table_2'];
var _desiredWriteCapacity = 20, _desiredReadCapacity = 10;

exports.handler = (event, context, callback) => {
    // check if SQS queue is empty.
    var sqs = new _AWS.SQS();
    sqs.getQueueAttributes({QueueUrl: _SQSExceptionQueueURL, AttributeNames: ['ApproximateNumberOfMessages']}, function(err, data) {
        if (err) console.log(err, err.stack); // an error occurred
        else {
            if (data.Attributes.ApproximateNumberOfMessages === '0') { // if no messages on SQS, downgrade DynamoDB capacity
                var db = new _AWS.DynamoDB();
                _tableNames.forEach(function(table) {
                    db.describeTable({TableName: table}, function(err, data) {
                        if (err) console.log(err, err.stack); // an error occurred
                        else {
                            if (parseInt(data.Table.ProvisionedThroughput.WriteCapacityUnits) > _desiredWriteCapacity) {
                                db.updateTable({TableName: data.Table.TableName, ProvisionedThroughput:
                                    {ReadCapacityUnits: _desiredReadCapacity, WriteCapacityUnits: _desiredWriteCapacity}},
                                    function(err, data) {
                                        if (err) console.log(err, err.stack); // an error occurred
                                        else console.log('Downgrading ' + data.TableDescription.TableName);
                                    });
                            }
                        }
                    });
                });
            } else {
                console.log('SQS is not empty.');
            }
        }
    });
};
And that's it. We now have some simple code to catch DynamoDB capacity exceptions, ask for more capacity, save the data, recover it, and downgrade the capacity afterwards. Of course you'll probably have to do some tuning for your scenario, but I hope you've got a starting point ;).