Deepbits Platform: Link a file to a project to scan

This API script demonstrates linking a file to a project on the Deepbits platform and checking the scan status of the linked file.

The link_file_to_project function links a specified file (file_id) to a project (project_id) on the Deepbits platform. It retrieves the project name, updates the project with the new file ID, and establishes a stream watch for the file. If successful, it returns the projectAssetId and sbomStreamId for further operations.

The check_status function monitors the scan status of a linked file within a project. It makes repeated requests to the API endpoint (GET /project/{project_id}/{project_asset_id}/{stream_id}/scan_result) to retrieve the scan result.

Below is a demo script demonstrating how to link a file (FILE_ID) to a project (PROJECT_ID), check the scan status, and save the scan result to a local JSON file (RESULT_JSON). Adjust the placeholders , , and with your specific values for integration.

import requests
import time
import json

API_KEY = '<Your API key generated from app.deepbits.com>'
API_BASE = 'https://api.deepbits.com/api/v1'

default_headers = {"x-api-key": API_KEY }

FILE_ID = '<Your File ID>'
PROJECT_ID = '<Your Project ID>'

def main():
    response = link_file_to_project(FILE_ID, PROJECT_ID)

    if response:
        # check scan status
        detail_res = check_status(PROJECT_ID, response.get('projectAssetId', None), response.get('sbomStreamId', None))

        # Save detail_res to a local JSON file
        RESULT_JSON = '/home/wei/code/deepbits/API/requirements.txt.json'
        with open(RESULT_JSON, 'w') as file:
            json.dump(detail_res, file)

        print(f"Scan result saved to {RESULT_JSON}")
        print(f"You can also check the result on project {PROJECT_ID}")
    else:
        print("Failed to link file to project")

def link_file_to_project(file_id, project_id):
    try:
        project_name = get_project_name(project_id)
        if not project_name:
            print(f"Failed to get project name for project ID {project_id}")
            return None
        
        file_id_list = get_file_id_list_by_project_id(project_id)
        if file_id_list is None:
            print(f"Failed to get file ID list for project ID {project_id}")
            return None
        
        file_id_list.append(file_id)
        
        put_response = requests.put(
            f"{API_BASE}/project/{project_id}",
            headers=default_headers,
            json={
                "name": project_name,
                "assets": [
                    {
                        "assetType": "SBOMBuilder",
                        "assetIds": file_id_list
                    }
                ]
            }
        )
        put_response.raise_for_status()
        
        created_assets = put_response.json().get('data', {}).get('createdAssets', [])
        if not created_assets:
            print("Failed to get created assets")
            return None
        
        for asset in created_assets:
            if asset.get('sbomBuilderId') == file_id:
                project_asset_id = asset.get('_id')
                if not project_asset_id:
                    print("Failed to get project asset ID")
                    return None
            
                stream_watch_response = requests.put(
                    f"{API_BASE}/project/{project_id}/{project_asset_id}/stream_watch",
                    headers=default_headers,
                    json={
                        "action": "watch",
                        "identifier": file_id,
                    }
                )
                stream_watch_response.raise_for_status()
                
                sbom_stream_id = stream_watch_response.json().get('data', {}).get('_id')
                
                print(f"Successfully linked file {file_id} to project {project_id}")
                return {
                    "projectAssetId": project_asset_id,
                    "sbomStreamId": sbom_stream_id
                }
    except (requests.RequestException, ValueError, KeyError) as err:
        print(f"An error occurred: {err}")
        return None

def get_project_name(project_id):
    try:
        resp = requests.get(f"{API_BASE}/project/{project_id}", headers=default_headers)
        resp.raise_for_status()
        return resp.json().get('data', {}).get('name')
    except (requests.RequestException, ValueError, KeyError) as err:
        print(f"An error occurred: {err}")
        return None

def get_file_id_list_by_project_id(project_id):
    try:
        resp = requests.get(f"{API_BASE}/project/{project_id}/assets", headers=default_headers)
        resp.raise_for_status()
        data = resp.json().get('data', {})
        docs = data.get('docs', [])
        if docs:
            return docs[0].get('assetIds', [])
        else:
            return []
    except (requests.RequestException, ValueError, KeyError) as err:
        print(f"An error occurred: {err}")
        return []

def check_status(project_id, project_asset_id, stream_id):
    max_retries = 50
    retry_interval = 3  # in seconds

    for i in range(max_retries):
        response = requests.get(
            f"{API_BASE}/project/{project_id}/{project_asset_id}/{stream_id}/scan_result",
            headers=default_headers,
        )

        parsedResult = response.json()
        scanEndAt = None
        scanResult = parsedResult.get('data', {}).get('scanResult', {})
        if scanResult:
            scanEndAt = scanResult.get('scanEndAt', None)

        if scanEndAt:
            return parsedResult
        else:
            print(f"Current Status: scan is still running...")
            print(f"Retrying in {retry_interval} seconds...")
            time.sleep(retry_interval)
    else:
        raise ValueError("Max retries exceeded. API call failed.")

if __name__ == "__main__":
    main()