Key Rotation Notifications

Post Updated: 2019-10-08

Description: Notification via SNS Topic


We recently completed an audit of a customer's AWS environment. I tend to tackle the customer's IAM policies either first or last, depending on the complexity of the environment. At a high level, IAM is a fairly simple service to audit, so getting it out of the way first clears the ‘to-do’ list for the more complex stuff… For this recent audit, I worked through the customer's IAM policies as the first item on the ever-growing to-do list.

According to AWS best practice, IAM users' access keys (the ‘security keys’ in this post) should be rotated at least every 90 days.

Of course, the IAM console shows each user's key info in a table and uses that little orange warning sign to flag a key that needs to be rotated. Cool, so this is a very straightforward way of seeing everything in a single pane of glass.

However, you need to log in, navigate to the IAM console, and possibly manage a set of calendar reminders to prevent these things from falling through the cracks - at least this is the only alternative I could think of.

Ideally, we should be able to get a reminder about all the keys that need to be rotated before the 90-day ‘deadline’.

It feels like this is a tickbox in the IAM console that I'm missing somewhere. A simple, “notify me via this SNS topic when IAM security keys require rotation”. Alas, I couldn't find this setting. Even after implementing some top-notch googling techniques, and still not getting anywhere with an out-of-the-box solution, I decided to build this for myself. I might still be missing something here, but my solution works for my use-case, and I learned a lot in the process.

At first, I wrote this in Python and ran it from my laptop. This, of course, needed security keys of its own to authenticate to AWS. I quickly figured that I'd have to manage the script's key as well, and I was able to programmatically rotate the script's own key if and when necessary. That was all straightforward, but soon after, it dawned on me that this legacy way of thinking could be drastically improved on. I opted for the now-obvious choice instead: Lambda.

Lambda has its own set of difficulties. But once you've done this and you understand the limitations that go along with Lambda, it's quite easy to navigate.

For Python specifically, Lambda provides the standard Python libraries (and boto3), and if your code requires a non-standard library, which mine unfortunately does, you'll need to install it locally and then upload all the code together in a package to AWS. More info on the whole process here, but I'll explain what I did for my requirement.

An important item is to keep an eye on the package size, and the number of dependencies. This has an impact on the time your function will take to cold-start. The more lightweight your code the better.

One or two things before we get into the meat of it all. For demonstration purposes, the folder where I've written my Python function is called ScriptFolder. The script file itself is called lambda_function.py. I've included a configuration file, keyrotationConfig.txt, where I define the notification threshold (max age of the key) and the Telegram bot token and chat ID. I'll cover that shortly.

Let's get started

Instead of installing all the Python libraries and their dependencies needed to build a Telegram bot, I opted to send a notification by means of the Telegram API. This can be done with the ‘requests’ library. It's similar to a simple curl command from a Linux terminal. The requests library is unfortunately not standard, and as mentioned, we'll have to install it locally and upload it to AWS Lambda.

(Side note: The Telegram portion is the mechanism I opted for, and I had everything set up already, it made sense for my requirement. You can use a method of comms that works for you)
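
As a rough sketch of what such a Telegram call looks like, the snippet below builds a sendMessage URL with the chat ID and text passed as URL-encoded query parameters. The helper name build_telegram_url and the token shown are illustrative placeholders, not part of my script; encoding the text starts to matter once the message contains spaces or newlines.

```python
from urllib.parse import urlencode

TELEGRAM_API = "https://api.telegram.org"


def build_telegram_url(bot_token, chat_id, text):
    """Return a Telegram Bot API sendMessage URL with encoded parameters.

    Hypothetical helper, for illustration only.
    """
    query = urlencode({"chat_id": chat_id, "text": text})
    return f"{TELEGRAM_API}/bot{bot_token}/sendMessage?{query}"


# Placeholder credentials, not real ones.
url = build_telegram_url(
    "123456789:EXAMPLE",
    "123123123",
    "Key A is 91 days old\nKey B is 95 days old",
)
print(url)
```

The resulting URL could then be handed to requests.post(url), which is roughly what a curl call against the same endpoint would do.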

Within my folder called ‘ScriptFolder’, I installed the ‘requests’ library into a folder called ‘package’. I then used the zip command to compress the contents of the package folder into a zip file called function.zip.

[user@laptop ScriptFolder]$ pip3 install --target ./package requests
Collecting requests
Using cached

[... output suppressed ...]

Successfully installed certifi-2019.6.16 chardet-3.0.4 idna-2.8 requests-2.22.0 urllib3-1.25.3

[user@laptop ScriptFolder]$ cd package/
[user@laptop package]$ zip -r9 ../function.zip .
adding: urllib3-1.25.3.dist-info/ (stored 0%)
adding: urllib3-1.25.3.dist-info/METADATA (deflated 65%)

[... output suppressed ...]

adding: requests/certs.py (deflated 35%)

Once the requests library is packaged in the function.zip, I add my python function as well as my config file to the package.

[user@laptop ScriptFolder]$ ll
total 956
-rw-rw-r--.  1 user user 965730 Jul  5 15:19 function.zip
-rw-rw-r--.  1 user user     98 Jul  5 15:11 keyrotationConfig.txt
-rwxrwxr-x.  1 user user   3735 Jul  5 15:40 lambda_function.py
drwxr-xr-x. 13 user user   4096 Jul  5 15:17 package

[user@laptop ScriptFolder]$ zip -g function.zip lambda_function.py
adding: lambda_function.py (deflated 59%)

[user@laptop ScriptFolder]$ zip -g function.zip keyrotationConfig.txt
adding: keyrotationConfig.txt (stored 0%)
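
The same packaging steps can also be scripted, which may help if the function gets rebuilt often. Below is a minimal sketch using Python's zipfile module. build_package is a hypothetical helper, and it assumes the dependencies were already installed into the package folder with pip as shown earlier.

```python
import os
import zipfile


def build_package(package_dir, extra_files, zip_path):
    """Zip the contents of package_dir plus extra_files into zip_path.

    Files under package_dir are stored relative to it, so they land at
    the root of the archive, the same layout that running
    `zip -r9 ../function.zip .` inside the package folder produces.
    Returns the size of the archive in bytes.
    """
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for root, _dirs, files in os.walk(package_dir):
            for name in files:
                full_path = os.path.join(root, name)
                archive.write(full_path, os.path.relpath(full_path, package_dir))
        for path in extra_files:
            # The handler and config file sit at the top level of the archive.
            archive.write(path, os.path.basename(path))
    return os.path.getsize(zip_path)
```

Run from ScriptFolder, a call such as build_package("package", ["lambda_function.py", "keyrotationConfig.txt"], "function.zip") would produce the same kind of archive, and the returned size is a quick way to keep an eye on how big the package is getting.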

Now we have all the necessary files, bundled in a nice little package, and we can upload it to our AWS account. Note that by this time I've already created my function in Lambda (it's called IAM_Key_Rotate_Notification), and I've taken care of the IAM role that the function will assume.

The Lambda function is a blank python3.6, ‘edit code inline’ function. And the role is basically just IAM_ReadOnly access.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "iam:GenerateCredentialReport",
                "iam:GenerateServiceLastAccessedDetails",
                "iam:Get*",
                "iam:List*",
                "iam:SimulateCustomPolicy",
                "iam:SimulatePrincipalPolicy"
            ],
            "Resource": "*"
        }
    ]
}
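
The function in this post only calls list_users and list_access_keys, so a narrower policy may well be enough. As a sketch, assuming no other IAM calls are added later, such a policy document could be generated like this (the variable names are illustrative):

```python
import json

# Assumption: the function only needs to list users and their access keys.
minimal_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["iam:ListUsers", "iam:ListAccessKeys"],
            "Resource": "*",
        }
    ],
}

policy_json = json.dumps(minimal_policy, indent=4)
print(policy_json)
```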

Uploading the zip file

We upload the function.zip in the following manner.

[user@laptop ScriptFolder]$ aws --profile freetier --region eu-west-1 lambda update-function-code --function-name IAM_Key_Rotate_Notification --zip-file fileb://function.zip
{
"FunctionName": "IAM_Key_Rotate_Notification",
"FunctionArn": "arn:aws:lambda:eu-west-1::function:IAM_Key_Rotate_Notification",
"Runtime": "python3.6",
"Role": "arn:aws:iam:::role/IAM_KeyRotation",
"Handler": "lambda_function.lambda_handler",
"CodeSize": 965730,
"Description": "",
"Timeout": 3,
"MemorySize": 128,
"LastModified": "2019-07-05T13:19:42.001+0000",
"CodeSha256": "",
"Version": "$LATEST",
"TracingConfig": {
"Mode": "PassThrough"
},
"RevisionId": "12345678-1234-1234-1234-123456789011"
}

As for the actual Python function code: we have a configuration file (keyrotationConfig.txt) where we define the maximum age of an IAM user's key and the Telegram parameters. Telegram is the notification mechanism used to inform the Admin user of the keys that require rotation. If you'd like more info on Telegram bots and how to interact with them, see the post Telegram Bots - To Lend a Helping Hand.

We split the configuration from the code for the simple reason that it lets us change the behaviour of the program without changing the code of the function. The config is rather basic, at least for now. The SECURITY section contains the config for the Telegram bot and the KEY section the maximum age of a key. With AGE set to 85, the Admin will be notified to rotate any IAM user's key that is older than 85 days. The config file will look something like this:

[SECURITY]
BOT_TOKEN=123456789:AAA1AaAaAaaaaaaaaa1AAaAa1A1AA1aaaaA
CHAT_ID=123123123

[KEY]
AGE=85

The Logic

And the logic of the function, here we go:

We get a list of users and save them to a list. We then loop through each user and check the status of each of the user's keys (an IAM user can have at most two). If a key is active, we check whether its age exceeds the allowed threshold, and if it does, we add it to a second list. Once we've gone through all the users, we send a notification about the keys that need to be rotated to the configured channel/chat/bot.

import json, configparser, calendar, boto3, datetime, requests

NOW = datetime.datetime.now().strftime("%Y-%m-%d")
CURRDATE = datetime.datetime.strptime(NOW, '%Y-%m-%d')

REGION = 'eu-west-1'
CONFIGPATH = 'keyrotationConfig.txt'
config = configparser.ConfigParser()
config.read(CONFIGPATH)

USER_MAX_KEY_AGE = config['KEY']['AGE']
TELEGRAM_BOT_TOKEN = config['SECURITY']['BOT_TOKEN']
CHAT_ID = config['SECURITY']['CHAT_ID']
TGRAMURL = 'https://api.telegram.org/bot' + TELEGRAM_BOT_TOKEN + '/sendMessage?chat_id=' + CHAT_ID + '&text='

def lambda_handler(event, context):
    IAMCLIENT = boto3.client('iam')
    USERS = []
    KEYLIST = []
    RESPONSE = IAMCLIENT.list_users()
    NUMUSERS = len(RESPONSE['Users'])

    for INDEX in range(NUMUSERS):
      USERS.append(RESPONSE['Users'][INDEX]['UserName'])

    for USER in USERS:
        RESPONSE = IAMCLIENT.list_access_keys(UserName=USER)
        NUMKEYS = len(RESPONSE['AccessKeyMetadata'])

        for INDEX in range(NUMKEYS):
            USER_KEY_STATUS = RESPONSE['AccessKeyMetadata'][INDEX]['Status']
            USER_KEY_ID = RESPONSE['AccessKeyMetadata'][INDEX]['AccessKeyId']

            if USER_KEY_STATUS == 'Active':
                USER_KEY_DATE_CREATED = datetime.datetime.strptime(RESPONSE['AccessKeyMetadata'][INDEX]['CreateDate'].strftime("%Y-%m-%d"), '%Y-%m-%d')
                USER_KEY_AGE = CURRDATE - USER_KEY_DATE_CREATED

                if USER_KEY_AGE.days > int(USER_MAX_KEY_AGE):
                    USER_NAME = RESPONSE['AccessKeyMetadata'][INDEX]['UserName']
                    KEYLIST.append("Key: " + str(USER_KEY_ID) + " for IAM user: " + USER_NAME + " is required to be rotated. Key Age = " + str(USER_KEY_AGE.days) + " days.")

    if len(KEYLIST) > 0:        
        MSG = '\n'.join(map(str, KEYLIST))
        RESPONSE = requests.post(TGRAMURL + MSG)
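
The age check itself can be pulled out into a small pure function, which makes it possible to test the logic without touching AWS. The sketch below is an illustration under that assumption: find_stale_keys is a hypothetical name, and the sample input is made-up data that mimics the AccessKeyMetadata entries returned by list_access_keys.

```python
import datetime


def find_stale_keys(key_metadata, max_age_days, today):
    """Return (user, key_id, age_in_days) for active keys older than max_age_days."""
    stale = []
    for key in key_metadata:
        if key["Status"] != "Active":
            continue
        age_days = (today - key["CreateDate"].date()).days
        if age_days > max_age_days:
            stale.append((key["UserName"], key["AccessKeyId"], age_days))
    return stale


# Made-up sample data in the shape of AccessKeyMetadata entries.
utc = datetime.timezone.utc
sample = [
    {"UserName": "alice", "AccessKeyId": "AKIAEXAMPLE1", "Status": "Active",
     "CreateDate": datetime.datetime(2019, 4, 1, tzinfo=utc)},
    {"UserName": "bob", "AccessKeyId": "AKIAEXAMPLE2", "Status": "Inactive",
     "CreateDate": datetime.datetime(2019, 1, 1, tzinfo=utc)},
    {"UserName": "carol", "AccessKeyId": "AKIAEXAMPLE3", "Status": "Active",
     "CreateDate": datetime.datetime(2019, 6, 20, tzinfo=utc)},
]

stale = find_stale_keys(sample, 85, datetime.date(2019, 7, 5))
print(stale)  # only alice's key is active and older than 85 days
```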

Just a note here: for my use-case, the number of IAM records was fairly limited, so I didn't see the need to build in pagination, but I will be working on this in a version 2. See a guide on how to achieve that here.
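
For what it's worth, boto3 clients expose paginators that can take care of this. The following is a minimal sketch of how the two listing calls might be paginated; iter_access_keys is a hypothetical helper rather than something from my function, and it expects to be handed a boto3 IAM client.

```python
def iter_access_keys(iam_client):
    """Yield every access key metadata entry for every IAM user.

    iam_client is expected to be a boto3 IAM client, for example the
    result of boto3.client('iam'). Paginators follow the continuation
    markers for us, so accounts with many users are covered as well.
    """
    user_pages = iam_client.get_paginator("list_users").paginate()
    for user_page in user_pages:
        for user in user_page["Users"]:
            key_pages = iam_client.get_paginator("list_access_keys").paginate(
                UserName=user["UserName"]
            )
            for key_page in key_pages:
                for key in key_page["AccessKeyMetadata"]:
                    yield key
```

Inside the handler, this could replace the two nested loops with a single `for KEY in iter_access_keys(IAMCLIENT):`.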

As a final result, if any of the keys are older than the threshold configured, we send the Administrator a notification that will look similar to this:

(For demo purposes I configured the Max Key Age to 10 days, but in production, I configured this to 85 days)

From here we can use CloudWatch rules to schedule the Lambda function to trigger periodically. As an example, we can schedule the Lambda function to run every weekday at 15:00 (CloudWatch evaluates cron schedules in UTC).
CloudWatch -> Rules -> Create Rule -> Event Source - Schedule - Cron Expression

0 15 ? * MON-FRI *
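
If you'd rather create the rule from code than click through the console, something along these lines might work. It is only a sketch: schedule_weekdays is a hypothetical helper that expects a boto3 CloudWatch Events client, and it wraps the six cron fields in cron(...), which is the form the API expects for a schedule expression.

```python
def schedule_weekdays(events_client, rule_name, cron_fields="0 15 ? * MON-FRI *"):
    """Create or update a scheduled rule and return its ARN.

    events_client is expected to be a boto3 client for CloudWatch
    Events, for example the result of boto3.client('events').
    """
    response = events_client.put_rule(
        Name=rule_name,
        ScheduleExpression=f"cron({cron_fields})",
        State="ENABLED",
    )
    return response["RuleArn"]
```

The rule on its own does nothing yet. The Lambda function still has to be attached as a target of the rule, and the function needs a permission that allows events.amazonaws.com to invoke it, both of which the console takes care of when you add the target there.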


And that's it… With this implemented, we don't need to keep checking for old IAM Keys, and we'll be notified of IAM Security Keys that require rotation.

Update:

A number of people have asked that I do a post on how to deliver the results via an SNS topic. So here we go…

The first improvement over the previous version is that we can achieve all of this with the libraries already available in Lambda, meaning that we don't have to do all of that local installing, bundling and uploading of the ‘requests’ package.

So here's the original code (the one with Telegram notifications):

import json, configparser, calendar, boto3, datetime, requests

NOW = datetime.datetime.now().strftime("%Y-%m-%d")
CURRDATE = datetime.datetime.strptime(NOW, '%Y-%m-%d')

REGION = 'eu-west-1'
CONFIGPATH = 'keyrotationConfig.txt'
config = configparser.ConfigParser()
config.read(CONFIGPATH)

USER_MAX_KEY_AGE = config['KEY']['AGE']
TELEGRAM_BOT_TOKEN = config['SECURITY']['BOT_TOKEN']
CHAT_ID = config['SECURITY']['CHAT_ID']
TGRAMURL = 'https://api.telegram.org/bot' + TELEGRAM_BOT_TOKEN + '/sendMessage?chat_id=' + CHAT_ID + '&text='

def lambda_handler(event, context):
    IAMCLIENT = boto3.client('iam')
    USERS = []
    KEYLIST = []
    RESPONSE = IAMCLIENT.list_users()
    NUMUSERS = len(RESPONSE['Users'])

    for INDEX in range(NUMUSERS):
      USERS.append(RESPONSE['Users'][INDEX]['UserName'])

    for USER in USERS:
        RESPONSE = IAMCLIENT.list_access_keys(UserName=USER)
        NUMKEYS = len(RESPONSE['AccessKeyMetadata'])

        for INDEX in range(NUMKEYS):
            USER_KEY_STATUS = RESPONSE['AccessKeyMetadata'][INDEX]['Status']
            USER_KEY_ID = RESPONSE['AccessKeyMetadata'][INDEX]['AccessKeyId']

            if USER_KEY_STATUS == 'Active':
                USER_KEY_DATE_CREATED = datetime.datetime.strptime(RESPONSE['AccessKeyMetadata'][INDEX]['CreateDate'].strftime("%Y-%m-%d"), '%Y-%m-%d')
                USER_KEY_AGE = CURRDATE - USER_KEY_DATE_CREATED

                if USER_KEY_AGE.days > int(USER_MAX_KEY_AGE):
                    USER_NAME = RESPONSE['AccessKeyMetadata'][INDEX]['UserName']
                    KEYLIST.append("Key: " + str(USER_KEY_ID) + " for IAM user: " + USER_NAME + " is required to be rotated. Key Age = " + str(USER_KEY_AGE.days) + " days.")

    if len(KEYLIST) > 0:        
        MSG = '\n'.join(map(str, KEYLIST))
        RESPONSE = requests.post(TGRAMURL + MSG)

To get to our end result, we will keep the majority of this code and just change the way we deliver the results to the user, which is basically this last part:

if len(KEYLIST) > 0:
    MSG = '\n'.join(map(str, KEYLIST))
    RESPONSE = requests.post(TGRAMURL + MSG)

Instead of using the requests library we'll be posting this info to our SNS topic of choice.

To set up our SNS topic, navigate to the SNS service in the AWS Console, click the ‘topics’ item in the left menu bar, then click Create Topic.

Give your topic a name and display name. The Access Policy is going to be critical for our use case. We need to ensure that our Lambda function is allowed to publish to our SNS topic. Configure the topic based on your requirements, the section ‘Define who can publish messages to the topic’ is most critical for us.
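
Keep in mind that the role I described earlier only has read-only IAM access. One way to let the function publish, assuming the topic and the function live in the same account, is to add an sns:Publish statement for the topic to the function's role. The sketch below just generates such a policy document; the ARN is a placeholder and the variable names are illustrative.

```python
import json

# Placeholder ARN; replace it with the ARN of your own topic.
TOPIC_ARN = "arn:aws:sns:eu-west-1:123456789012:SNS_TOPIC_NAME"

publish_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "sns:Publish",
            "Resource": TOPIC_ARN,
        }
    ],
}

publish_policy_json = json.dumps(publish_policy, indent=4)
print(publish_policy_json)
```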

Once the topic is created, we need to create a subscription and recipients. The protocol is, of course, the delivery method of your message.

With the topic and subscription created, copy the ARN of the topic; we'll be using that in our Lambda function.

Ok, back to our code:

We can remove the ‘requests’ library from the import statement, along with all the variable definitions for the Telegram bot token and chat ID.

We can define the SNS client:

SNS = boto3.client('sns')

And we can change the way the message is delivered.

if len(KEYLIST) > 0:
    MSG = '\n'.join(map(str, KEYLIST))
    RESPONSE = SNS.publish(
        TopicArn='arn:aws:sns:eu-west-1:123456789012:SNS_TOPIC_NAME',
        Message=MSG,
    )

And ensure you replace the TopicArn value with the one you created earlier.
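
As a variation, the topic ARN doesn't have to live in the code at all. The sketch below assumes it is supplied through a Lambda environment variable that I've called TOPIC_ARN (my own naming, not something AWS sets for you), and it adds a Subject, which is what email subscribers see as the subject line. notify is a hypothetical helper that expects a boto3 SNS client.

```python
import os


def notify(sns_client, lines, topic_arn=None):
    """Publish the collected lines to SNS.

    Returns the publish response, or None when there is nothing to send.
    sns_client is expected to be the result of boto3.client('sns').
    """
    if not lines:
        return None
    # Assumption: the ARN is configured as a TOPIC_ARN environment variable.
    topic_arn = topic_arn or os.environ["TOPIC_ARN"]
    return sns_client.publish(
        TopicArn=topic_arn,
        Subject="IAM access keys due for rotation",
        Message="\n".join(lines),
    )
```

In the handler, the final block would then shrink to `RESPONSE = notify(SNS, KEYLIST)`.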

Here's the full code now with our new method of delivery:

import json, configparser, calendar, boto3, datetime

NOW = datetime.datetime.now().strftime("%Y-%m-%d")
CURRDATE = datetime.datetime.strptime(NOW, '%Y-%m-%d')

REGION = 'eu-west-1'
CONFIGPATH = 'keyrotationConfig.txt'
config = configparser.ConfigParser()
config.read(CONFIGPATH)

USER_MAX_KEY_AGE = config['KEY']['AGE']
SNS = boto3.client('sns')

def lambda_handler(event, context):
    IAMCLIENT = boto3.client('iam')
    USERS = []
    KEYLIST = []
    RESPONSE = IAMCLIENT.list_users()
    NUMUSERS = len(RESPONSE['Users'])

    for INDEX in range(NUMUSERS):
      USERS.append(RESPONSE['Users'][INDEX]['UserName'])

    for USER in USERS:
        RESPONSE = IAMCLIENT.list_access_keys(UserName=USER)
        NUMKEYS = len(RESPONSE['AccessKeyMetadata'])

        for INDEX in range(NUMKEYS):
            USER_KEY_STATUS = RESPONSE['AccessKeyMetadata'][INDEX]['Status']
            USER_KEY_ID = RESPONSE['AccessKeyMetadata'][INDEX]['AccessKeyId']

            if USER_KEY_STATUS == 'Active':
                USER_KEY_DATE_CREATED = datetime.datetime.strptime(RESPONSE['AccessKeyMetadata'][INDEX]['CreateDate'].strftime("%Y-%m-%d"), '%Y-%m-%d')
                USER_KEY_AGE = CURRDATE - USER_KEY_DATE_CREATED

                if USER_KEY_AGE.days > int(USER_MAX_KEY_AGE):
                    USER_NAME = RESPONSE['AccessKeyMetadata'][INDEX]['UserName']
                    KEYLIST.append("Key: " + str(USER_KEY_ID) + " for IAM user: " + USER_NAME + " is required to be rotated. Key Age = " + str(USER_KEY_AGE.days) + " days.")

    if len(KEYLIST) > 0:
        MSG = '\n'.join(map(str, KEYLIST))
        RESPONSE = SNS.publish(
            TopicArn='arn:aws:sns:eu-west-1:123456789012:SNS_TOPIC_NAME',
            Message=MSG,
        )

    print(RESPONSE)