Skip to content

Base client

async_boto.core.base_client

logger module-attribute

logger = getLogger(__name__)

BaseClient

BaseClient(aws_session, service_name='execute-api')
Source code in async_boto/core/base_client.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def __init__(
    self,
    aws_session: boto3.Session | AsyncAWSSession,
    service_name: str = "execute-api",
) -> None:
    self._aws_session = aws_session
    self._service_name = service_name
    self._paginators = {}

    # Collect paginators from decorated methods
    for attr_name in dir(
        self.__class__
    ):  # Iterate over the class, not the instance
        method = getattr(self.__class__, attr_name, None)
        if callable(method) and hasattr(method, "_paginator_metadata"):
            metadata = method._paginator_metadata
            self._paginators[metadata["method"]] = metadata

paginate async

paginate(method_name, request)
Source code in async_boto/core/base_client.py
255
256
257
258
259
260
261
262
263
async def paginate(self, method_name, request: BaseModel):
    if method_name not in self._paginators:
        raise ValueError(
            f"Method {method_name} is not paginatable. "
            f"Available methods: {list(self._paginators.keys())}"
        )
    paginator = paginate(self, request=request, **self._paginators[method_name])
    async for page in paginator:
        yield page

register_paginator

register_paginator(pagination_query_key, pagination_response_key)
Source code in async_boto/core/base_client.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def register_paginator(pagination_query_key, pagination_response_key):
    def decorator(func):
        setattr(
            func,
            "_paginator_metadata",
            {
                "method": func.__name__,
                "pagination_query_key": pagination_query_key,
                "pagination_response_key": pagination_response_key,
            },
        )
        logger.debug(
            f"Registering paginator: {func.__name__} with query key: "
            f"{pagination_query_key} and response key: {pagination_response_key}"
        )

        @functools.wraps(func)  # Preserve the original function's attributes
        async def wrapper(self, *args, **kwargs):
            return await func(self, *args, **kwargs)

        return wrapper

    return decorator