Skip to content

AWS SigV4 headers

async_boto.core.aws_sig_v4_headers

sign

sign(key, msg)
Source code in async_boto/core/aws_sig_v4_headers.py
10
11
def sign(key: bytes, msg: str):
    """Return the raw HMAC-SHA256 digest of *msg* keyed with *key*.

    Args:
        key: Secret key bytes used for the HMAC.
        msg: Text to authenticate; it is encoded as UTF-8 before hashing.

    Returns:
        The 32-byte binary digest (not hex-encoded).
    """
    message_bytes = msg.encode("utf-8")
    mac = hmac.new(key, message_bytes, digestmod=hashlib.sha256)
    return mac.digest()

get_signature_key

get_signature_key(key, date_stamp, region_name, service_name)
Source code in async_boto/core/aws_sig_v4_headers.py
14
15
16
17
18
19
def get_signature_key(key: str, date_stamp: str, region_name: str, service_name: str):
    """Derive the Signature Version 4 signing key from a secret key.

    The secret is prefixed with ``"AWS4"`` and then run through a chain of
    HMAC-SHA256 operations, each keyed with the previous digest, over the
    date stamp, the region, the service and the literal ``"aws4_request"``.

    Args:
        key: The secret access key.
        date_stamp: Date component of the credential scope.
        region_name: Region component of the credential scope.
        service_name: Service component of the credential scope.

    Returns:
        The derived signing key as raw bytes.
    """
    derived = ("AWS4" + key).encode("utf-8")
    # Each step uses the previous digest as the HMAC key for the next part.
    for scope_part in (date_stamp, region_name, service_name, "aws4_request"):
        derived = sign(derived, scope_part)
    return derived

aws_sig_v4_headers

aws_sig_v4_headers(session, service, url, method, headers=None, query=None, payload=None)
Source code in async_boto/core/aws_sig_v4_headers.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def aws_sig_v4_headers(
    session: boto3.Session,
    service: str,
    url: str,
    method: str,
    headers: dict[str, Any] = None,
    query: list[tuple[str, str]] = None,
    payload: str = None,
):
    """Build the headers for an AWS Signature Version 4 signed request.

    The signature covers the ``content-type``, ``host`` and ``x-amz-date``
    headers, the canonical URI and query string, and the SHA-256 hash of the
    payload. The session token (when present) is attached to the returned
    headers but is not part of the signed header list.

    Args:
        session: Session supplying the credentials (``get_credentials()``)
            and the region (``region_name``) used in the credential scope.
        service: Service name used in the credential scope.
        url: Full request URL; its path and host are signed.
        method: HTTP method, placed verbatim in the canonical request.
        headers: Extra request headers. They are copied, never mutated. A
            ``Content-Type`` entry (matched case-insensitively) is signed;
            when absent, ``application/json`` is signed and added to the
            result so that the request carries the value that was signed.
        query: Query parameters as ``(name, value)`` pairs. They are
            URI-encoded and sorted to form the canonical query string.
        payload: Request body; it is UTF-8 encoded before hashing. ``None``
            is hashed as an empty body.

    Returns:
        A new dict with the caller's headers plus ``X-Amz-Date``,
        ``Authorization`` and, only when the credentials carry a session
        token, ``x-amz-security-token``.
    """
    # Work on a copy so the caller's mapping is never modified.
    request_headers = dict(headers) if headers else {}
    parsed_url = urllib.parse.urlparse(url)

    # Timezone-aware replacement for the deprecated datetime.utcnow().
    now = datetime.datetime.now(datetime.timezone.utc)
    amz_date = now.strftime("%Y%m%dT%H%M%SZ")
    date_stamp = now.strftime("%Y%m%d")
    credentials = session.get_credentials()

    canonical_uri = parsed_url.path or "/"

    # SigV4 requires the canonical query string to be URI-encoded (only the
    # unreserved characters stay literal) and sorted by encoded name/value.
    canonical_querystring = ""
    if query:
        encoded_pairs = sorted(
            (
                urllib.parse.quote(str(name), safe="-_.~"),
                urllib.parse.quote(str(value), safe="-_.~"),
            )
            for name, value in query
        )
        canonical_querystring = "&".join(
            f"{name}={value}" for name, value in encoded_pairs
        )

    # The signed host must match the Host header that is actually sent, which
    # includes the port whenever it is not the scheme's default.
    host = parsed_url.hostname
    port = parsed_url.port
    default_port = {"http": 80, "https": 443}.get(parsed_url.scheme)
    if port is not None and port != default_port:
        host = f"{host}:{port}"

    # Header names are case-insensitive; sign whatever content type the
    # caller supplied, and make the default explicit in the outgoing headers
    # so the signed value and the transmitted value cannot diverge.
    content_type = next(
        (
            value
            for name, value in request_headers.items()
            if name.lower() == "content-type"
        ),
        None,
    )
    if content_type is None:
        content_type = "application/json"
        request_headers["Content-Type"] = content_type

    canonical_headers = (
        f"content-type:{content_type}\n"
        f"host:{host}\n"
        f"x-amz-date:{amz_date}\n"
    )
    signed_headers = "content-type;host;x-amz-date"

    body = b"" if payload is None else payload.encode("utf-8")
    payload_hash = hashlib.sha256(body).hexdigest()

    # canonical_headers already ends with "\n", so joining yields the blank
    # line that separates the header block from the signed-header list.
    canonical_request = "\n".join(
        [
            method,
            canonical_uri,
            canonical_querystring,
            canonical_headers,
            signed_headers,
            payload_hash,
        ]
    )

    algorithm = "AWS4-HMAC-SHA256"
    credential_scope = f"{date_stamp}/{session.region_name}/{service}/aws4_request"
    string_to_sign = "\n".join(
        [
            algorithm,
            amz_date,
            credential_scope,
            hashlib.sha256(canonical_request.encode("utf-8")).hexdigest(),
        ]
    )

    signing_key = get_signature_key(
        credentials.secret_key, date_stamp, session.region_name, service
    )
    signature = hmac.new(
        signing_key, string_to_sign.encode("utf-8"), hashlib.sha256
    ).hexdigest()
    authorization_header = (
        f"{algorithm} Credential={credentials.access_key}/"
        f"{credential_scope}, SignedHeaders={signed_headers}, "
        f"Signature={signature}"
    )

    result = {
        **request_headers,
        "X-Amz-Date": amz_date,
        "Authorization": authorization_header,
    }
    # Long-term credentials have no session token; sending a None header
    # value would produce an invalid request.
    if credentials.token:
        result["x-amz-security-token"] = credentials.token
    return result