Skip to the content.

Python3 + boto3 の使用方法

諸事情でコピペしていないため、typo があるかも。 見つけたらその時になおす。 ていうか自分用のメモなので自分がわかれば良いのノリで記述する。

セッション作成

あまり使わないかも

import os
import boto3

session = boto3.Session(
    aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID", "<DEFAULT>"),
    aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY", "<DEFAULT>"),
    region_name=os.environ.get("AWS_REGION_NAME", "ap-northeast-1")
)

SQS

Client 作成

from typing import Optional
import os
import boto3

def create_client(endpoint_url: Optional[str] = None):
    client = boto3.client("sqs", endpoint_url=endpoint_url)
    return client

先頭メッセージ取得

visibility_timeout: 可視性タイムアウト(秒) この値を 0 にすると本当に値を取得するだけになる。 取得したデータを削除したい場合などは 0 を超えた値にする必要がある。

from typing import Any, Dict, Optional

def peek(
    queue_url: str,
    visibility_timeout: int = 0,
) -> Optional[Dict[str, Any]:
    resp = client.receive_message(
        QueueUrl=queue_url,
        maxNumberOfMessages=1,
        VisibilityTimeout=visibility_timeout,
        WaitTimeSeconds=1,
    )
    messages = resp.get("Messages")
    if messages is not None and len(messages) > 0:
        return messages[0]

    return None

メッセージ列挙

def enumerate(
    queue_url: str,
    visibility_timeout: int = 60,
    max_number_of_messages: int = 10,
):
    ret = []
    next_token = None
    while True:
        resp = client.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=max_number_of_messages,
            VisibilityTimeout=visibility_timeout,
            WaitTimeSeconds=1,
            ReceiveRequestAttemptId=next_token,
        )

        messages = resp.get("Messages")
        if message is None:
            break
        
        for message in messages:
            ret.append(message)

        next_token = messages[-1].get("ReceiptHandle")

    return ret

メッセージ追加

from typing import Any, Dict
from uuid import uuid4
import json
import boto3

def enqueue(queue_url: str, data: Dict[str, Any]):
    msg: str = json.dumps(data, ensure_awscii=False)
    if TARGET_IS_FIFO:
        return client.send_message(
            QueueUrl=queue_url,
            MessageBody=msg,
            MessageDeduplicationId=str(uuid4()),
            MessageGroupId=str(uuid4()),
        )
    else:
        return client.send_message(
            QueueUrl=queue_url,
            MessageBody=msg,
        )

メッセージ削除

自分で SQS からデータを取得して消したい場合など。 SourceMapping で呼び出されている場合は不要。

# データを可視性タイムアウト 60 秒で取得する
# 取得後 60 秒は削除可能
msg = peek(queue_url, visibility_timeout=60)
client.delete_message(
    QueueUrl=queue_url,
    ReceiptHandle=msg["ReceiptHandle"],
)

可視性タイムアウト設定

これも自分で SQS からデータを取得して削除せずに、可視性タイムアウトだけリセットしたい場合などに使用する。 Source Mapping で呼び出されている場合は不要。

# データを可視性タイムアウト 60 秒で取得する
# 取得後 60 秒は削除可能
msg = peek(queue_url, visibility_timeout=60)
client.change_message_visibility(
    QueueUrl=queue_url,
    ReceiptHandle=msg["ReceiptHandle"],
    VisibilityTimeout=0,
)

S3

Client 作成

from typing import Optional
import os
import boto3

def create_client(endpoint_url: Optional[str] = None):
    client = boto3.client("s3", endpoint_url=endpoint_url)
    return client

ファイルをアップロード

def upload(bucket_name: str, key: str, buffer: bytes):
    client.put_object(
        Bucket=bucket_name,
        Key=key,
        Body=buffer
    )

ファイルをダウンロード

from typing import Optional

def download(bucket_name: str, key: str) -> Optional[bytes]:
    resp = client.get_object(Bucket=bucket_name, Key=key)
    body=resp.get("Body")
    if body is None:
        return None
    else:
        return body.read()

ファイルを削除

def delete(bucket_name: str, key: str):
    client.delete_object(Bucket=bucket_name, Key=key)

ファイル有無確認

一覧取得などもこれを流用すればいい。

def exists(bucket_name: str, key: str) -> bool:
    paginator = client.get_paginator("list_objects_v2")
    iter = paginator.paginate(Bucket=bucket_name, Prefix=key)

    for page in iter:
        contents = page.get("Contents")
        for obj in contents:
            if key == obj["Key"]:
                return True

    return False

Bucket 間コピー

def copy(src_bucket_name: str, src_key: str, dest_bucket_name: str, dest_key: str):
    source = {
        "Bucket": src_bucket_name,
        "Key": src_key,
    }

    client.copy(source, dest_bucket_name, dest_key)

Lambda

Client 作成

from typing import Optional
import os
import boto3

def create_client(endpoint_url: Optional[str] = None):
    client = boto3.client("lambda", endpoint_url=endpoint_url)
    return client

Lambda 呼び出し

from typing import Any, Dict
import json

def invoke(func_name: str, payload: Dict[str, Any]):
    body = json.dumps(payload, ensure_ascii=False)
    resp = client.invoke(
        FunctionName=func_name,
        Payload=body,
        InvokcationType="RequestResponse",
    )

    return resp.read().decode()