Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ inputs:
signing-key-file:
description: 'The .pem file used to sign the statement'
required: true
datatrails-url:
description: 'The fully url of the DataTrails SCITT Service'
required: false
default: 'https://app.datatrails.ai'

runs:
using: 'docker'
Expand All @@ -38,3 +42,4 @@ runs:
- ${{ inputs.transparent-statement-file }}
- ${{ inputs.issuer }}
- ${{ inputs.signing-key-file }}
- ${{ inputs.datatrails-url }}
1 change: 1 addition & 0 deletions scitt-scripts/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__pycache__
40 changes: 40 additions & 0 deletions scitt-scripts/api_requests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import os
import logging
import requests

REQUEST_TIMEOUT = 30


def get_app_auth_header(datatrails_url: str = "https://app.datatrails.ai") -> str:
"""
Get DataTrails bearer token from OIDC credentials in env
"""
# Pick up credentials from env
client_id = os.environ.get("DATATRAILS_CLIENT_ID")
client_secret = os.environ.get("DATATRAILS_CLIENT_SECRET")

if client_id is None or client_secret is None:
raise ValueError(
"Please configure your DataTrails credentials in the shell environment"
)

# Get token from the auth endpoint
url = f"{datatrails_url}/archivist/iam/v1/appidp/token"
response = requests.post(
url,
data={
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret,
},
timeout=REQUEST_TIMEOUT,
)

if response.status_code != 200:
raise ValueError(
"FAILED to acquire bearer token %s, %s", response.text, response.reason
)

# Format as a request header
res = response.json()
return f'{res["token_type"]} {res["access_token"]}'
3 changes: 2 additions & 1 deletion scitt-scripts/check_operation_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def poll_operation_status(

except requests.HTTPError as e:
logger.debug("failed getting operation status, error: %s", e)

time_sleep(POLL_INTERVAL)

raise TimeoutError("signed statement not registered within polling duration")
Expand Down Expand Up @@ -137,5 +137,6 @@ def main():
print(e, file=sys.stderr)
sys.exit(1)


if __name__ == "__main__":
main()
5 changes: 3 additions & 2 deletions scitt-scripts/create_hashed_signed_statement.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
# CBOR Object Signing and Encryption (COSE) "typ" (type) Header Parameter
# https://datatracker.ietf.org/doc/rfc9596/
HEADER_LABEL_TYPE = 16
COSE_TYPE="application/hashed+cose"
COSE_TYPE = "application/hashed+cose"


def open_signing_key(key_file: str) -> SigningKey:
"""
Expand Down Expand Up @@ -208,7 +209,7 @@ def main():
payload=payload_contents,
payload_location=args.payload_location,
signing_key=signing_key,
subject=args.subject
subject=args.subject,
)

with open(args.output_file, "wb") as output_file:
Expand Down
6 changes: 6 additions & 0 deletions scitt-scripts/create_signed_statement.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ def create_signed_statement(
HEADER_LABEL_CWT: {
HEADER_LABEL_CWT_ISSUER: issuer,
HEADER_LABEL_CWT_SUBJECT: subject,
# HEADER_LABEL_CWT_ISSUER: issuer.encode("ascii"),
# HEADER_LABEL_CWT_SUBJECT: subject.encode("ascii"),
HEADER_LABEL_CWT_CNF: {
HEADER_LABEL_CNF_COSE_KEY: {
KpKty: KtyEC2,
Expand Down Expand Up @@ -162,13 +164,17 @@ def main():
"--subject",
type=str,
help="subject to correlate statements made about an artifact.",
# a default of None breaks registration because registration does not allow nil issuer
default="scitt-subject",
)

# issuer
parser.add_argument(
"--issuer",
type=str,
help="issuer who owns the signing key.",
# a default of None breaks registration because registration does not allow nil subject
default="scitt-issuer",
)

# output file
Expand Down
18 changes: 11 additions & 7 deletions scitt-scripts/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ SUBJECT=${4}
TRANSPARENT_STATEMENT_FILE=${5}
ISSUER=${6}
SIGNING_KEY_FILE=${7}
DATATRAILS_URL=${8}

SIGNED_STATEMENT_FILE="signed-statement.cbor"
TOKEN_FILE="./bearer-token.txt"
Expand All @@ -23,19 +24,20 @@ TOKEN_FILE="./bearer-token.txt"
# echo "SIGNING_KEY_FILE: " ${SIGNING_KEY_FILE}
# echo "SIGNED_STATEMENT_FILE: " ${SIGNED_STATEMENT_FILE}
# echo "TOKEN_FILE: " ${TOKEN_FILE}
# echo "DATATRAILS_URL: " ${DATATRAILS_URL}

if [ ! -f $PAYLOAD_FILE ]; then
echo "ERROR: Payload File: [$PAYLOAD_FILE] Not found!"
exit 126
fi

# "Create an access token"
/scripts/create-token.sh $TOKEN_FILE
# /scripts/create-token.sh $TOKEN_FILE

if [ ! -f $TOKEN_FILE ]; then
echo "ERROR: Token File: [$TOKEN_FILE] Not found!"
exit 126
fi
#if [ ! -f $TOKEN_FILE ]; then
# echo "ERROR: Token File: [$TOKEN_FILE] Not found!"
# exit 126
#fi

echo "Create a Signed Statement, hashing the payload"
python /scripts/create_hashed_signed_statement.py \
Expand All @@ -52,13 +54,15 @@ if [ ! -f $SIGNED_STATEMENT_FILE ]; then
exit 126
fi

echo "Register the SCITT Signed Statement to https://app.datatrails.ai/archivist/v1/publicscitt/entries"
# --datatrails-url $DATATRAILS_URL \
echo "Register the SCITT Signed Statement to $DATATRAILS_URL/archivist/v1/publicscitt/entries"
python /scripts/register_signed_statement.py \
--signed-statement-file $SIGNED_STATEMENT_FILE \
--output-file $TRANSPARENT_STATEMENT_FILE \
--datatrails-url https://app.dev-robin-0.dev.datatrails.ai \
--log-level INFO

python /scripts/dump_cbor.py \
--input $TRANSPARENT_STATEMENT_FILE

# curl https://app.datatrails.ai/archivist/v2/publicassets/-/events?event_attributes.subject=$SUBJECT | jq
# curl https://$DATATRAILS_URL/archivist/v2/publicassets/-/events?event_attributes.subject=$SUBJECT | jq
26 changes: 26 additions & 0 deletions scitt-scripts/generate_signing_key.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""
Generates an EXAMPLE issuer signing key using python ecdsa
"""

from ecdsa import SigningKey, NIST256p

FILE_NAME = "scitt-signing-key.pem"


def generate_key(topem=True):
key = SigningKey.generate(curve=NIST256p)
if not topem:
return key
return key.to_pem()


def main():
pem_key = generate_key(topem=True)
# Save the private key to a file
with open(FILE_NAME, "wb") as pem_file:
pem_file.write(pem_key)
print(f"PEM formatted private key generated and saved as '{FILE_NAME}'")


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
@@ -1,33 +1,13 @@
""" Module for verifying the counter signed receipt signature """

import re
from base64 import b64decode
import argparse

import requests

from jwcrypto import jwk

from pycose.messages import Sign1Message
from pycose.keys.curves import P384
from pycose.keys.keyparam import KpKty, EC2KpX, EC2KpY, KpKeyOps, EC2KpCurve
from pycose.keys.keytype import KtyEC2
from pycose.keys.keyops import VerifyOp
from pycose.keys import CoseKey
from pycose.headers import KID

HEADER_LABEL_DID = 391


def open_receipt(receipt_file: str) -> str:
"""
opens the receipt from the receipt file.
NOTE: the receipt is expected to be in base64 encoding.
"""
with open(receipt_file, encoding="UTF-8") as file:
receipt = file.read()
return receipt


def get_didweb_pubkey(didurl: str, kid: bytes) -> dict:
"""
Expand Down Expand Up @@ -90,55 +70,3 @@ def get_didweb_pubkey(didurl: str, kid: bytes) -> dict:
return cose_key

raise ValueError(f"no key with kid: {kid} in verification methods of did document")


def verify_receipt(receipt: str) -> bool:
"""
verifies the counter signed receipt signature
"""

# base64 decode the receipt into a cose sign1 message
b64decoded_message = b64decode(receipt)

# decode the cbor encoded cose sign1 message
message = Sign1Message.decode(b64decoded_message)

# get the verification key from didweb
kid: bytes = message.phdr[KID]
didurl = message.phdr[HEADER_LABEL_DID]

cose_key_dict = get_didweb_pubkey(didurl, kid)
cose_key = CoseKey.from_dict(cose_key_dict)

message.key = cose_key

# verify the counter signed receipt signature
verified = message.verify_signature()

return verified


def main():
"""Verifies a counter signed receipt signature"""

parser = argparse.ArgumentParser(description="Create a signed statement.")

# signing key file
parser.add_argument(
"--receipt-file",
type=str,
help="filepath to the stored receipt, in base64 format.",
default="scitt-receipt.txt",
)

args = parser.parse_args()

receipt = open_receipt(args.receipt_file)

verified = verify_receipt(receipt)

print(verified)


if __name__ == "__main__":
main()
99 changes: 99 additions & 0 deletions scitt-scripts/mmriver_algorithms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""
Selective copy of

https://github.com/robinbryce/draft-bryce-cose-merkle-mountain-range-proofs/blob/main/algorithms.py

Which is a reference implementation of

https://robinbryce.github.io/draft-bryce-cose-merkle-mountain-range-proofs/draft-bryce-cose-merkle-mountain-range-proofs.html


"""
from typing import List
import hashlib


def included_root(i: int, nodehash: bytes, proof: List[bytes]) -> bytes:
"""Apply the proof to nodehash to produce the implied root

For a valid cose receipt of inclusion, using the returned root as the
detached payload will result in a receipt message whose signature can be
verified.

Args:
i (int): the mmr index where `nodehash` is located.
nodehash (bytes): the value whose inclusion is being proven.
proof (List[bytes]): the siblings required to produce `root` from `nodehash`.

Returns:
the root hash produced for `nodehash` using `path`
"""

# set `root` to the value whose inclusion is to be proven
root = nodehash

# set g to the zero based height of i.
g = index_height(i)

# for each sibling in the proof
for sibling in proof:
# if the height of the entry immediately after i is greater than g, then
# i is a right child.
if index_height(i + 1) > g:
# advance i to the parent. As i is a right child, the parent is at `i+1`
i = i + 1
# Set `root` to `H(i+1 || sibling || root)`
root = hash_pospair64(i + 1, sibling, root)
else:
# Advance i to the parent. As i is a left child, the parent is at `i + (2^(g+1))`
i = i + (2 << g)
# Set `root` to `H(i+1 || root || sibling)`
root = hash_pospair64(i + 1, root, sibling)

# Set g to the height index above the current
g = g + 1

# Return the hash produced. If the path length was zero, the original nodehash is returned
return root


def index_height(i: int) -> int:
"""Returns the 0 based height of the mmr entry indexed by i"""
# convert the index to a position to take advantage of the bit patterns afforded
pos = i + 1
while not all_ones(pos):
pos = pos - (most_sig_bit(pos) - 1)

return pos.bit_length() - 1


def hash_pospair64(pos: int, a: bytes, b: bytes) -> bytes:
"""
Compute the hash of pos || a || b

Args:
pos (int): the 1-based position of an mmr node. If a, b are left and
right children, pos should be the parent position.
a (bytes): the first value to include in the hash
b (bytes): the second value to include in the hash

Returns:
The value for the node identified by pos
"""
h = hashlib.sha256()
h.update(pos.to_bytes(8, byteorder="big", signed=False))
h.update(a)
h.update(b)
return h.digest()


def most_sig_bit(pos) -> int:
"""Returns the mask for the the most significant bit in pos"""
return 1 << (pos.bit_length() - 1)


def all_ones(pos) -> bool:
"""Returns true if all bits, starting with the most significant, are 1"""
imsb = pos.bit_length() - 1
mask = (1 << (imsb + 1)) - 1
return pos == mask
Loading