【Python】なまけものエンジニアがPython(Boto3)でDynamoDBにアクセスするコードはこんな感じ

長らく技術的な記事を書かいておらず「お前ホントにエンジニアかよ?」という声が上がりそうなので久々に技術的な記事を書きます。

今回はPythonでDynamoDBにアクセスするコードを「仕事したくない系なまけものエンジニア」が書いた場合にどんな処理に書くのかをお見せしたいと思います。

テーブル定義

今回のコードを確認するにあたって作成したDynamoDBのテーブルはこちら。

テーブル名はuser、キーはパーティションキーだけでソートキーは無し、インデックスはつけず、読み込み/書き込みキャパシティは(お金がかからないよう)最低の1で作成しました。とてもエコノミーなテーブル構造です。

テーブル定義

データについては以下の5データを投入してあります。

投入データ


コード概要

私がデータベースにアクセスするコードをオブジェクト指向言語で書く場合、DBにアクセスするためのベースクラス実際にテーブルにアクセスするクラスという構成で実装します。

今回のコードで言うと「dynamodb_base.py」ファイルのDynamoDbBaseクラスがベースクラス、「user.py」ファイルのUserクラスが実際にテーブルにアクセスするクラスになります。

では実際のソースコードです。まずはDynamoDbBaseクラス。
※dynamodb_base.pyファイルとして作成

import boto3

# テーブルにアクセスするためのベースクラス
class DynamoDbBase:
    table = None
    # コンストラクタ
    def __init__(self, table_name):
        dynamodb = boto3.resource('dynamodb')
        self.table = dynamodb.Table(table_name)

    # DynamoDBからデータや件数を取得する基本メソッド
    def __get_data(self, func, kwargs: dict) -> tuple:
        if kwargs is None:
            kwargs = {}

        ret_items = []
        ret_count = 0
        # 全てのデータが取得できるまでループ
        LastEvaluatedKey = None
        while True:
            if LastEvaluatedKey is not None:
                kwargs['ExclusiveStartKey'] = LastEvaluatedKey

            # 問い合わせ実行
            response = func(**kwargs)
            ret_items.extend(response['Items'])
            ret_count += response['Count']

            # 取り残しデータが存在しない場合は終了
            if 'LastEvaluatedKey' not in response:
                break

            # 取り残しデータが存在する場合は'LastEvaluatedKey'の中身を変数にセット
            LastEvaluatedKey = response.get('LastEvaluatedKey')

        # 取得したデータと件数の両方を返す
        return ret_items, ret_count

    # 件数取得で余計なデータを取得しないようProjectionExpressionにダミーをセット
    def __set_project_exp_forcount(self, kwargs: dict) -> dict:
        if kwargs is None:
            kwargs = {}

        if 'ProjectionExpression' not in kwargs:
            kwargs['ProjectionExpression'] = 'dammy'

        return kwargs

    # データ取得(QUERY)
    def query_items(self, kwargs: dict) -> list:
        items, _ = self.__get_data(self.table.query, kwargs)
        return items

    # データ取得(SCAN)
    def scan_items(self, kwargs: dict) -> list:
        items, _ = self.__get_data(self.table.scan, kwargs)
        return items

    # 件数取得(QUERY)
    def query_count(self, kwargs: dict) -> int:
        kwargs = self.__set_project_exp_forcount(kwargs)
        _, count = self.__get_data(self.table.query, kwargs)
        return count

    # 件数取得(SCAN)
    def scan_count(self, kwargs: dict) -> int:
        kwargs = self.__set_project_exp_forcount(kwargs)
        _, count = self.__get_data(self.table.scan, kwargs)
        return count


次にUserクラス。
※user.pyファイルとして作成

from boto3.dynamodb.conditions import Key
from dynamodb_base import DynamoDbBase

# Userテーブルにアクセスするクラス
class User(DynamoDbBase):
    def __init__(self):
        table_name = 'user'
        super().__init__(table_name)

    # パーティションキーを指定してデータを取得(QUERY)
    def get(self, name) -> list:
        kwargs = {}
        # kwargs['IndexName'] = 'type-index'
        kwargs['ProjectionExpression'] = '#name, age'
        kwargs['ExpressionAttributeNames'] = {"#name": "name"}
        kwargs['KeyConditionExpression'] = Key('name').eq(name)
        return self.query_items(kwargs)

    # パーティションキーを指定してデータ件数を取得(QUERY)
    # ※ソートキーがなく取得データが必ず1件になるのであまり意味なし
    def get_count(self, name) -> int:
        kwargs = {}
        kwargs['KeyConditionExpression'] = Key('name').eq(name)
        return self.query_count(kwargs)

    # 指定した条件によりデータを取得(SCAN)
    def get_cat_list(self) -> list:
        kwargs = {}
        kwargs['ProjectionExpression'] = '#name, age'
        kwargs['ExpressionAttributeNames'] = {"#name": "name"}
        kwargs['FilterExpression'] = Key('type').eq('cat')
        return self.scan_items(kwargs)

    # 指定した条件によりデータ件数を取得(SCAN)
    def get_cat_count(self) -> int:
        kwargs = {}
        kwargs['FilterExpression'] = Key('type').eq('cat')
        return self.scan_count(kwargs)

if __name__ == '__main__':
    user = User()
    name = 'chaneko'
    print(f'get = {user.get(name)}')
    print(f'get_count = {user.get_count(name)}')
    print(f'get_cat_list = {user.get_cat_list()}')
    print(f'get_cat_count = {user.get_cat_count()}')


それでは各コードの詳細な説明をしていきます。

DynamoDbBaseクラスの説明

DynamoDbBaseクラスはDynamoDBにアクセスするための基本的な処理を提供するクラスで、このクラスは単体では使用せず別クラスで継承して利用することを想定しています。


コンストラクタ

    table = None
    # コンストラクタ
    def __init__(self, table_name):
        dynamodb = boto3.resource('dynamodb')
        self.table = dynamodb.Table(table_name)

DynamoDbBaseクラスのコンストラクタではテーブル名からDynamoDBにアクセスするためのテーブルオブジェクトを生成し、インスタンス変数として保持しています。継承先のクラスでテーブル名を渡してテーブルオブジェクトを生成します。


__get_dataメソッド

__get_dataメソッドがDynamoDBに接続するための中核となる処理です。このメソッドには多数のポイントがあるので少しずつ見ていきます。

引数

    def __get_data(self, func, kwargs: dict) -> tuple:

まずこのメソッドの引数としてfuncを渡せるようにしています。funcはdynamodbに問い合わせを行うためのメソッド(queryかscan)を渡します。こうすることで 問い合わせがquery でもscan でも1つのメソッドで処理できるようになっています。

引数のkwargsですが、問い合わせの際に指定するキーワード引数を辞書型として受け取ります。これにより問い合わせのオプションを動的に指定できるようにしています。

ループとLastEvaluatedKey

次にループ部分。

        # 全てのデータが取得できるまでループ
        LastEvaluatedKey = None
        while True:
            if LastEvaluatedKey is not None:
                kwargs['ExclusiveStartKey'] = LastEvaluatedKey

            # 問い合わせ実行
            response = func(**kwargs)
            ret_items.extend(response['Items'])
            ret_count += response['Count']

            # 取り残しデータが存在しない場合は終了
            if 'LastEvaluatedKey' not in response:
                break

            # 取り残しデータが存在する場合は'LastEvaluatedKey'の中身を変数にセット
            LastEvaluatedKey = response.get('LastEvaluatedKey')

DynamoDBのqueryやscanには結果セットのサイズ上限が1MBという制約があり、取得しようとしたデータが1MB以上ある場合に1回の問い合わせでは全てのデータを取得することができません。


そのため1MB以上のデータがある場合でも全てのデータが取得できるようループで問い合わせを何度も行うようになっています。

全てのデータが取得できたかどうかは、問い合わせ結果の結果セットのキーに’LastEvaluatedKey’があるかどうかで判断できます。結果セットに ‘LastEvaluatedKey’ が存在しない場合はループを終了(13行目)、存在する場合は ‘LastEvaluatedKey’ の値を変数LastEvaluatedKey にセット(17行目)し、ループの最初に戻って再度問い合わせを行います。

2回目以降の問い合わせでは変数LastEvaluatedKeyにデータが入っている(4行目)ので、変数の値をkwargsにセットして問い合わせの際のオプションとして指定します。

補足
結果セットで返ってくるLastEvaluatedKeyの値には最後に取得できたデータのキーが入ってきます。今回の例だと{‘name’:’nene’}のような辞書型のデータになります。
これを問い合わせパラメータの’ExclusiveStartKey’にセットすることで、name(キー)が’nene’というデータの次データから再問い合わせを開始することができます。

このコードでは最初に変数LastEvaluatedKeyにNoneを入れ、ループの最初にNoneチェックをすることで 1回目の処理では’ExclusiveStartKey’がセットされないようにし、1回目の処理と2回目以降の処理を1つのコードで処理しています。

こういう処理で1回は普通に問い合わせを行い2回目以降の問い合わせをループにするようなコードを見かけますが、上記のようにすることで全ての問い合わせを同一のコードで処理できます。同じようなコードの重複記述はバグの温床になりますからね。


問い合わせの実行(8行目)のところでは辞書型であるkwargsに**をつけてキーワード引数に変換して渡しています。これにより問い合わせオプションが動的に指定できるようになっています。 また結果セットにはデータ(配列)とデータ件数(数値)の情報が返ってくるので、それを戻り値用の変数にそれぞれセットしています。

リターン

        return ret_items, ret_count

DynamoDBの問い合わせ結果の結果セットにはデータ(配列)とデータ件数(数値)の情報が返ってくるので、戻り値としてその2つを返しています。


__set_project_exp_forcountメソッド

    def __set_project_exp_forcount(self, kwargs: dict) -> dict:
        if kwargs is None:
            kwargs = {}

        if 'ProjectionExpression' not in kwargs:
            kwargs['ProjectionExpression'] = 'dammy'

        return kwargs

先ほど説明した__get_dataを使用することでデータとデータ件数を取得できますが、 データ件数のみを取得したい場合でもデータも取れてきてしまい、データ転送量の面で無駄に料金が増えてしまいます。

そのためデータ件数のみを取得する場合には、このメソッドで問い合わせパラメータに小細工をすることでデータ転送量を減らすようにしています。

パラメータの’ProjectionExpression’ではテーブルのどの列のデータを取得するか?を指定するのですが、ここに存在しない適当な値を指定することで不要なデータを取得しないようにしています。今回のコードではdammyという列を指定しています。
※dammyという列名は使用されることが考えられるので、もっとランダムな文字列にした方が良いかもしれません。

このようにすることで結果セットの’Items’の値に、空の辞書型の配列([{}, {}, {}, {}])のような最小限のデータが返ってくるようにしています。


query_itemsメソッド、scan_itemsメソッド

    def query_items(self, kwargs: dict) -> list:
        items, _ = self.__get_data(self.table.query, kwargs)
        return items

    def scan_items(self, kwargs: dict) -> list:
        items, _ = self.__get_data(self.table.scan, kwargs)
        return items

この2つのメソッドは__get_dataメソッドを使ってデータを取得するメソッドです。dynamoDBに問い合わせをするメソッド(query or scan)が違うだけですね。

__get_dataの1つ目の引数に使用する問い合わせメソッドを渡し、2つ目の引数には自身の引数として渡されたkwargsをそのまま渡しています。

この2つのメソッドはデータ取得のみを目的としたものなので、返ってきたデータ件数は捨てています。受け取る際の変数をアンダーバー( ‘_’ )にすることで「このデータは使わないよー」という事を明示的に示しています。


query_countメソッド、scan_countメソッド

    def query_count(self, kwargs: dict) -> int:
        kwargs = self.__set_project_exp_forcount(kwargs)
        _, count = self.__get_data(self.table.query, kwargs)
        return count

    def scan_count(self, kwargs: dict) -> int:
        kwargs = self.__set_project_exp_forcount(kwargs)
        _, count = self.__get_data(self.table.scan, kwargs)
        return count

先ほどの2つのメソッドのデータ件数を取るバージョンなのであまり説明することはありません。

異なるところと言えば__set_project_exp_forcountメソッドを使用している部分(2行目、7 行目)。この2つのメソッドではデータ件数のみが必要なので、問い合わせで無駄なデータが取得されないように先ほど説明した__set_project_exp_forcountメソッドでパラメータを変換しています。


Userクラスの説明

UserクラスはDynamoDbBaseクラスで記述された処理を利用して、実際にuserテーブルにアクセスするためのクラスです。そのためUserクラスの定義でDynamoDbBaseクラスを継承しています。

class User(DynamoDbBase):


コンストラクタ

    def __init__(self):
        table_name = 'user'
        super().__init__(table_name)

Userクラスのコンストラクタでは自身がアクセスするテーブルのテーブル名(この場合はuser)を、親クラスであるDynamoDbBaseクラスのコンストラクタに渡しています。こうすることでuserテーブルにアクセスするためのテーブルオブジェクトをDynamoDbBaseクラス側で 生成してくれます。

もし別のテーブル、例えばcompanyという名前のテーブルにアクセスしたい場合はCompanyクラスを作成し、Userクラスと同様にコンストラクタで’company’というテーブル名を渡します。


getメソッド

    def get(self, name) -> list:
        kwargs = {}
        # kwargs['IndexName'] = 'type-index'
        kwargs['ProjectionExpression'] = '#name, age'
        kwargs['ExpressionAttributeNames'] = {"#name": "name"}
        kwargs['KeyConditionExpression'] = Key('name').eq(name)
        return self.query_items(kwargs)

このメソッドではDynamoDbBaseクラスのquery_itemsメソッドを使用してuserテーブルにアクセスします。

まずDynamoDBの問い合わせに使用するパラメータ用の変数(2行目)を用意します。

‘ProjectionExpression’にはデータを取得したい列を指定(4行目)します。今回はname列とage列を指定しています。name列に関しては’name’という文字列が予約語になっているため’#name’としています。 当然ながら’#name’ のままではエラーになってしまうため、’ExpressionAttributeNames’で’#name’は実際は’name’という名前であることを伝えています。(5行目)

‘ProjectionExpression’ では取得したいデータの検索条件を指定(6行目)します。

最後に上記設定したパラメータをDynamoDbBaseクラスのquery_itemsメソッドに渡してデータを取得(7行目)します。

余談ですが、もしインデックスを使う場合などでも問い合わせパラメータの’IndexName’に使用するインデックスを追加(3行目コメント)してやれば動作します。つまりパラメータで変更できる動作であればいくらでも対応できるという構造になっています。

※get_count、get_cat_list、get_cat_countの3メソッドについてはgetメソッドとそれほど変わらないので説明を省略します。


if name == ‘main‘:

if __name__ == '__main__':
    user = User()
    name = 'chaneko'
    print(user.get(name))
    print(user.get_count(name))
    print(user.get_cat_list())
    print(user.get_cat_count())

最後はUserクラスを使って実際にアクセスしているところ。「dynamodb_base.py」と「user.py」のファイルが保存されているディレクトリに行き、以下のコマンドをたたくことでDynamoDBのuserテーブルからデータが取得できます。

$ python user.py

上記のコマンドをたたいて実際に取得できたデータが以下。

get = [{‘name’: ‘chaneko’, ‘age’: Decimal(’15’)}]
get_count = 1
get_cat_list = [{‘name’: ‘chaneko’, ‘age’: Decimal(’15’)}, {‘name’: ‘nene’, ‘age’: Decimal(‘9’)}, {‘name’: ‘nana’, ‘age’: Decimal(’10’)}, {‘name’: ‘hana’, ‘age’: Decimal(’15’)}]
get_cat_count = 4


ちょっと気になるところ

これで説明は終わりですが最後に気になるところに触れておきます。


AWS SDKのライブラリ参照が分散している

「dynamodb_base.py」ファイル側でDynamoDBにアクセスするためにboto3をimportしていますが、「user.py」ファイルの方でもboto3関連のKeyメソッドをimportしています。

個人的には関連するライブラリ群の参照は1つの場所にとどめておきたいんですよね。例えば「dynamodb_base.py」ファイル内でboto3とKeyメソッドの両方をimportして、「user.py」ファイル の方ではboto3関連のライブラリは使わない(意識しない)ような感じです。

まぁでもあまり理想を追い求めると逆に使いにくくなったり、分かりにくくなったりしますからね。今回はこれで良しとしました。


大量データを取得した際に正しく動作するか分からない

いちおう今回のコードは1MB以上のデータでも全て取得できるようにしたつもりですが、1MB以上のデータが落ちてきた場合に最後まで正常に取得できるかどうかは確認していませんw

実際にに試そうと思うと大量のデータをテーブルに突っ込まないといけないですからね。お金がもったいないのでパス。

NENE(ネネ)
お前ホント最低だな

業務で実装した際は同じようなコードで動作したのでたぶん大丈夫です。もし上記のコードで大量データを取得した際に動かなかった場合は’ExclusiveStartKey’辺りの処理が間違っている可能性があるので修正してみてください。


おわりに

今回はトータル100行程度でそれほど大規模ではないですし大して難しいコードではありませんが、説明を見ても分かる通り考えるべきことは大量にあります。というか業務でコードを書く場合にはこれくらい考えていないとお話になりません。まぁ自分の周りにはこの程度のコードも書けない人ばかりというのが悩みなんですが...

全てのコードでこれだけのことを考えて実装を行うと時間はかかりますが、無駄を減らして保守性の高いコードを書くことで後々の時間の余裕につなげることができます。例えば仕様変更による修正コストを減らしたり、バグが少なくなったり。

というわけで今回は私のようななまけものエンジニアが時間を作るために、どれだけ考えてコードを書いているかというお話でした。

まぁつまり怠けるためにもそれなりに力が必要という事ですね。




とりあえずまともなコードを書きたかったら最低限ここら辺の書籍は読んでおくと良いです。お勧めはkindle版のあるCode Complete。