Scan files via DeepSCA

For an introduction to the DeepSCA feature, see: https://docs.deepbits.com/docs/sbomsaasbom-builder

Below is a demo script showing how to use the DeepSCA feature programmatically.

import requests
import hashlib
import os
import time
import json

# Update these two: the local file to upload for scanning, and the path
# where the scan result JSON is written at the end of the script.
FILE_TO_UPLOAD = './xxx.apk' 
RESULT_JSON = './output.json'

# API key sent in the "x-api-key" header; must be filled in before running
# (the upload step raises "API key is invalid" when the API answers with 403).
API_KEY = ''

# Base URL that every API endpoint path in this script is appended to.
API_BASE = 'https://api.deepbits.com/api/v1'

# Headers attached to every request made against API_BASE.
default_headers = {"x-api-key": API_KEY, "x-public-tool": "true"}

def upload_file(file_path, api_key):
    """Upload a local file for scanning and register the upload with the API.

    The upload is a three-step exchange:

    1. POST ``/sbom_builder/upload_url`` to obtain an upload URL and a path.
    2. PUT the raw file bytes to that upload URL.
    3. PUT ``/sbom_builder/upload_success`` with the path, the SHA-256 of the
       file contents and the file name to register the uploaded file.

    Args:
        file_path: Path of the local file to upload.
        api_key: API key sent in the ``x-api-key`` header of every request
            made by this function.

    Returns:
        The parsed JSON body of the ``upload_success`` response.

    Raises:
        Exception: If the upload-URL response carries ``meta.code == 403``.
        requests.HTTPError: If the file PUT answers with a 4xx/5xx status.
    """
    with open(file_path, "rb") as f:
        file_contents = f.read()

    file_name = os.path.basename(file_path)

    # Honour the api_key argument for the API calls as well; previously only
    # the file PUT used it while the API calls silently used the global key.
    api_headers = {**default_headers, "x-api-key": api_key}

    print(f"Get upload URL for {file_name}...")
    upload_url_response = requests.post(
        f"{API_BASE}/sbom_builder/upload_url",
        json={"fileName": file_name},
        headers=api_headers,
    )
    # Parse the body once instead of re-decoding it for every field access.
    upload_info = upload_url_response.json()

    if upload_info['meta']['code'] == 403:
        print("API key is invalid")
        raise Exception("API key is invalid")

    put_url = upload_info['data']['uploadUrl']

    put_headers = {
        "Content-Type": "application/octet-stream",
        "x-api-key": api_key,
    }

    print(f"Upload {file_name} to S3...")
    put_response = requests.put(
        put_url,
        data=file_contents,
        headers=put_headers,
    )
    # Stop here if the bytes were not accepted; otherwise upload_success
    # would register a file that was never actually stored.
    put_response.raise_for_status()

    print("Trigger upload_success...")
    upload_success_response = requests.put(
        f"{API_BASE}/sbom_builder/upload_success",
        json={
            "path": upload_info['data']['path'],
            "hash": hashlib.sha256(file_contents).hexdigest(),
            "fileName": file_name,
            "desc": "sample apk file",
            # selected type could be meta | binary | repo | sbom
            "selectedType": "binary",
        },
        headers=api_headers,
    )
    upload_success = upload_success_response.json()

    print(f"Upload Success Response: {upload_success}\n")
    return upload_success


def get_shared_project_id():
    """Return the shared project id reported by the ``/user`` endpoint.

    Issues a GET to ``{API_BASE}/user`` with the default headers and reads
    ``data.publicTool.sharedProjectId`` from the JSON body.
    """
    user_url = f"{API_BASE}/user"
    user_payload = requests.get(user_url, headers=default_headers).json()
    public_tool = user_payload['data']['publicTool']
    return public_tool['sharedProjectId']

def add_file_to_project(file_id, project_id):
    """Attach an uploaded file to a project and start watching its stream.

    First PUTs the file id as an ``SBOMBuilder`` asset on the project, then
    PUTs a ``watch`` action on the ``stream_watch`` endpoint of the first
    created asset. The stream-watch response body is printed.

    Args:
        file_id: Id of the uploaded file to add as an asset.
        project_id: Id of the project that receives the asset.

    Returns:
        A dict with ``projectAssetId`` (``_id`` of the first created asset)
        and ``sbomStreamId`` (``data._id`` of the stream-watch response).
    """
    asset_entry = {"assetType": "SBOMBuilder", "assetIds": [file_id]}
    project_body = {"name": "public tool project", "assets": [asset_entry]}

    project_response = requests.put(
        f"{API_BASE}/project/{project_id}",
        headers=default_headers,
        json=project_body,
    )
    # Only the first created asset is used for the follow-up call.
    first_created = project_response.json()['data']['createdAssets'][0]
    project_asset_id = first_created['_id']

    watch_response = requests.put(
        f"{API_BASE}/project/{project_id}/{project_asset_id}/stream_watch",
        headers=default_headers,
        json={"action": "watch", "identifier": file_id},
    )
    watch_payload = watch_response.json()
    print(watch_payload)

    return {
        "projectAssetId": project_asset_id,
        "sbomStreamId": watch_payload['data']['_id'],
    }

def check_status(project_id, project_asset_id, stream_id,
                 max_retries=50, retry_interval=10):
    """Poll the scan-result endpoint until the scan reports an end time.

    Args:
        project_id: Id of the project that owns the asset.
        project_asset_id: Id of the project asset being scanned.
        stream_id: Id of the stream whose scan result is polled.
        max_retries: Maximum number of polling attempts (default 50).
        retry_interval: Seconds to wait between attempts (default 10).

    Returns:
        The parsed JSON body of the first response whose
        ``data.scanResult.scanEndAt`` is set.

    Raises:
        ValueError: If no response carried ``scanEndAt`` within
            ``max_retries`` attempts.
    """
    scan_result_url = (
        f"{API_BASE}/project/{project_id}/{project_asset_id}/{stream_id}/scan_result"
    )

    for attempt in range(1, max_retries + 1):
        response = requests.get(scan_result_url, headers=default_headers)
        parsed_result = response.json()

        # `or {}` also covers keys that are present but null, which a plain
        # .get(key, {}) default would pass through as None and crash on.
        data = parsed_result.get('data') or {}
        scan_result = data.get('scanResult') or {}
        if scan_result.get('scanEndAt'):
            return parsed_result

        print("Current Status: scan is still running...")
        # Do not sleep after the final attempt; there is nothing to wait for.
        if attempt < max_retries:
            print(f"Retrying in {retry_interval} seconds...")
            time.sleep(retry_interval)

    raise ValueError("Max retries exceeded. API call failed.")

# upload file
# NOTE(review): these statements run at import time; consider an
# `if __name__ == "__main__":` guard if this file is ever imported.
# Resolve the shared project first, then upload the file and attach it.
project_id = get_shared_project_id()
upload_res = upload_file(FILE_TO_UPLOAD, API_KEY)
# The '_id' from the upload_success payload is used both as the asset id
# added to the project and in the result URL printed at the end.
detail_id = upload_res['data']['_id']
add_file_res = add_file_to_project(detail_id, project_id)

# check scan status
# Blocks while polling; check_status raises ValueError if the scan does not
# finish within its retry budget, in which case nothing below runs.
detail_res = check_status(project_id, add_file_res['projectAssetId'], add_file_res['sbomStreamId'])

# Save detail_res to a local JSON file
with open(RESULT_JSON, 'w') as file:
    json.dump(detail_res, file)

print(f"Scan result saved to {RESULT_JSON}")
print("Demo run successfully without error")
print(f"You can also check the result on https://tools.deepbits.com/sbom/details/{detail_id}")