Add support for Kinesis.createStream

This commit is contained in:
Ryan Martin 2016-03-08 23:04:00 -05:00
parent e2f3a6efbe
commit 2269e53b8a
4 changed files with 185 additions and 0 deletions

View file

@ -168,6 +168,32 @@ DynamoDBPutItemsFunctionArn
[dynamo.putItems.template](test/aws/dynamo.putItems.template)
### Create a Kinesis stream
Mirrors the [Kinesis.createStream API method](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#createStream-property).
Yes, CloudFormation does support creating a Kinesis stream, but it does not allow you to specify the steam name; this
helper does.
This will delete the stream when the corresponding stack is deleted.
#### Parameters
##### ShardCount
The number of shards for the stream. Required; no default.
##### StreamName
The name of the stream. Required.
#### Output
StreamName and Arn (of the stream) - matches the output of the existing CloudFormation task.
#### Reference Output Name
KinesisCreateStreamFunctionArn
#### Example/Test Template
[kinesis.createStream.template](test/aws/kinesis.createStream.template)
### Put S3 Objects
Mirrors the [S3.putObject API method](http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/S3.html#putObject-property).

61
aws/kinesis.js Normal file
View file

@ -0,0 +1,61 @@
var Promise = require('bluebird'),
AWS = require('aws-sdk'),
base = require('lib/base'),
kinesis = Promise.promisifyAll(new AWS.Kinesis());
// Exposes the SNS.subscribe API method
function CreateStream(event, context) {
base.Handler.call(this, event, context);
}
CreateStream.prototype = Object.create(base.Handler.prototype);
CreateStream.prototype.handleCreate = function() {
var p = this.event.ResourceProperties;
delete p.ServiceToken;
p.ShardCount = parseInt(p.ShardCount);
return kinesis.createStreamAsync(p)
.then(function() {
return waitWhileStatus(p.StreamName, "CREATING");
})
.then(function(arn) {
return {
StreamName: p.StreamName,
Arn: arn
}
});
}
CreateStream.prototype.handleDelete = function(referenceData) {
var p = this.event.ResourceProperties;
return kinesis.deleteStreamAsync({StreamName: p.StreamName})
.then(function() {
return waitWhileStatus(p.StreamName, "DELETING")
})
.catch(function(err) {
return err;
});
}
exports.createStream = function(event, context) {
handler = new CreateStream(event, context);
handler.handle();
}
// Watch until the given status is no longer the status of the stream.
function waitWhileStatus(streamName, status) {
return Promise.try(function() {
var validStatuses = ["CREATING", "DELETING", "ACTIVE", "UPDATING"]
if (validStatuses.indexOf(status) >= 0) {
return kinesis.describeStreamAsync({StreamName: streamName})
.then(function(data) {
console.log("Current status for [" + streamName +"]: " + data.StreamDescription.StreamStatus);
if (data.StreamDescription.StreamStatus == status) {
return Promise.delay(2000)
.then(function() {
return waitWhileStatus(streamName, status);
});
} else {
return data.StreamDescription.StreamARN;
}
});
} else {
throw "status [" + status + "] not one of [" + validStatuses.join(", ") + "]";
}
});
}

View file

@ -162,6 +162,62 @@
"DynamoDBPutItemsFunctionRole"
]
},
"KinesisCreateStreamFunctionRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Version" : "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": [ "lambda.amazonaws.com" ]
},
"Action": [ "sts:AssumeRole" ]
}
]
},
"ManagedPolicyArns": [
{ "Ref": "RoleBasePolicy" }
],
"Policies": [
{
"PolicyName": "KinesisCreator",
"PolicyDocument": {
"Version" : "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:CreateStream",
"kinesis:DeleteStream",
"kinesis:DescribeStream"
],
"Resource": "*"
}
]
}
}
]
}
},
"KinesisCreateStreamFunction": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"S3Bucket": "com.gilt.public.backoffice",
"S3Key": "lambda_functions/cloudformation-helpers.zip"
},
"Description": "Used to create a Kinesis stream",
"Handler": "aws/kinesis.createStream",
"Role": {"Fn::GetAtt" : [ "KinesisCreateStreamFunctionRole", "Arn" ] },
"Runtime": "nodejs",
"Timeout": 30
},
"DependsOn": [
"KinesisCreateStreamFunctionRole"
]
},
"S3PutObjectFunctionRole": {
"Type": "AWS::IAM::Role",
"Properties": {
@ -283,6 +339,10 @@
"Description": "The ARN of the DynamoDBPutItemsFunction, for use in other CloudFormation templates.",
"Value": { "Fn::GetAtt" : ["DynamoDBPutItemsFunction", "Arn"] }
},
"KinesisCreateStreamFunctionArn": {
"Description": "The ARN of the KinesisCreateStreamFunction, for use in other CloudFormation templates.",
"Value": { "Fn::GetAtt" : ["KinesisCreateStreamFunction", "Arn"] }
},
"SnsSubscribeFunctionArn": {
"Description": "The ARN of the SnsSubscribeFunction, for use in other CloudFormation templates.",
"Value": { "Fn::GetAtt" : ["SnsSubscribeFunction", "Arn"] }

View file

@ -0,0 +1,38 @@
{
"AWSTemplateFormatVersion": "2010-09-09",
"Parameters": {
"CFHelperStackName": {
"Type": "String",
"Description": "The name of the stack where you installed the CloudFormation helper functions. See https://github.com/gilt/cloudformation-helpers."
}
},
"Resources": {
"CFHelperStack": {
"Type": "AWS::CloudFormation::Stack",
"Properties": {
"TemplateURL": "https://s3.amazonaws.com/com.gilt.public.backoffice/cloudformation_templates/lookup_stack_outputs.template"
}
},
"CFHelper": {
"Type": "Custom::CFHelper",
"Properties": {
"ServiceToken": { "Fn::GetAtt" : ["CFHelperStack", "Outputs.LookupStackOutputsArn"] },
"StackName": { "Ref": "CFHelperStackName" }
},
"DependsOn": [
"CFHelperStack"
]
},
"CreateStream": {
"Type": "Custom::CreateStream",
"Properties": {
"ServiceToken": { "Fn::GetAtt" : ["CFHelper", "KinesisCreateStreamFunctionArn"] },
"ShardCount": 1,
"StreamName": { "Fn::Join": [ "-", [ { "Ref": "AWS::StackName" }, "stream" ] ] }
},
"DependsOn": [
"CFHelper"
]
}
}
}