読者です 読者をやめる 読者になる 読者になる

sorta kinda...

主にAWS関連ですが、これに限らずいろいろ勉強したことや思ったことを書いていきます。

Lambda から elasticsearch service に何かする [cloudpack OSAKA blog]

ナスです。

elasticsearch service (ES) 2.3 の古くなったインデックスを削除することにしたんですが、完成までわりと苦労したので書きます。

 

まずは ES への接続

通常、ES へは curl で操作するんですが、この curlAWS の認証情報をつけることができません。(いや、できるかもしれんけどわからない

今回は、Python で ES に接続します。elasticsearch モジュールがあったので、これも使います。
Python Elasticsearch Client — Elasticsearch 5.2.0 documentation

AWS 認証情報を作るところは、この aws4auth を使います。 pypi.python.org

ES 側は、アクセスポリシーで特定の IAM ロールからのみアクセス許可するようにしています。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::123456789012:role/rolename"
      },
      "Action": "es:*",
      "Resource": "arn:aws:es:ap-northeast-1:123456789012:domain/domainname/*"
    }
  ]
}

あとは、Lambda から ES へ接続するだけです。今回は下記のサンプルからほぼパクリました。本当にありがとうございます! docs.aws.amazon.com blog.youyo.info

 

接続できたらやりたいことを実装

テスト的に 1 週間経ったインデックスは削除するように作りました。作ったのがこれ↓
インデックス名に年月日が含まれていたので、この文字列から何日経過したかを判定しました。普段コードはあまり書かないので、変数名がもう一つかもしれません…

# -*- coding: utf-8 -*-
from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import requests
import json, os
import boto3
import datetime

region = 'ap-northeast-1'
es_endpoint = 'search-domainname-hogehoge.ap-northeast-1.es.amazonaws.com'
role_name = 'rolename'

def lambda_handler(event, context):
    es = connect_to_es()

    for key in (es.indices.get_alias()).keys():
        datename=key.split("-")
        pastdate=datetime.datetime.strptime(datename[len(datename)-1], '%Y.%m.%d')
        today=datetime.datetime.now()
        delta=(today-pastdate).days
        print(str(delta) + ' days has passed since index ' + key + ' was created.')
        if delta > 7 and es.indices.exists(index=key):
            if es.indices.delete(index=key):
                print('[INFO] Index ' + key + ' was deleted successfully.')
            else:
                print("[ERROR] Index " + key + " couldn't be deleted.")

def connect_to_es():
    credentials = get_credential()
    awsauth = AWS4Auth(
        credentials['access_key'],
        credentials['secret_key'],
        region,
        'es',
        session_token=credentials['token']
    )
    es = Elasticsearch(
        hosts=[{'host': es_endpoint, 'port': 443}],
        http_auth=awsauth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection
    )
    return es

def get_credential():
    sts_client = boto3.client('sts')
    assumedRoleObject = sts_client.assume_role(
        RoleArn="arn:aws:iam::123456789012:role/rolename",
        RoleSessionName="Access_to_ES_from_lambda"
    )
    credentials = assumedRoleObject['Credentials']
    return { 'access_key': credentials['AccessKeyId'],
             'secret_key': credentials['SecretAccessKey'],
             'token': credentials['SessionToken'] }

今まで curl で取ってきた JSON からデータ取り出していろいろして、ってやってたのが、elasticsearch モジュールに変えただけでむちゃくちゃシンプルなコードになりました!

本当は、Curator ってのを使って楽をしたかったのですが、Amazon ES のサポートが微妙だったので、今回はやめました。そのうち普通に使えるようになると信じてます。 github.com

 

ここまでたどり着くのに 1 週間弱もかかってしまったけど、これでもう困らないだろう。