Deepbits Platform: Link a file to a project to scan
This API script demonstrates linking a file to a project on the Deepbits platform and checking the scan status of the linked file.
The link_file_to_project function links a specified file (file_id) to a project (project_id) on the Deepbits platform. It retrieves the project name, updates the project with the new file ID, and establishes a stream watch for the file. If successful, it returns the projectAssetId and sbomStreamId for further operations.
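The check_status function monitors the scan status of a linked file within a project. It polls the API endpoint (GET /project/{project_id}/{project_asset_id}/{stream_id}/scan_result) up to 50 times, 3 seconds apart, and returns the full response as soon as the scan result reports an end time (scanEndAt). If the scan has not finished after the last attempt, it raises a ValueError.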
Below is a demo script demonstrating how to link a file (FILE_ID) to a project (PROJECT_ID), check the scan status, and save the scan result to a local JSON file (RESULT_JSON). Replace the placeholders `<Your API key generated from app.deepbits.com>`, `<Your File ID>`, and `<Your Project ID>` with your own values, and point RESULT_JSON at a writable path on your machine.
import requests
import time
import json
# Credential and base URL used to build every Deepbits API request in this script.
API_KEY = '<Your API key generated from app.deepbits.com>'
API_BASE = 'https://api.deepbits.com/api/v1'

# Headers passed to each HTTP call; the API key is sent as "x-api-key".
default_headers = {
    "x-api-key": API_KEY,
}

# The file to link and the project it is linked to.
FILE_ID = '<Your File ID>'
PROJECT_ID = '<Your Project ID>'
def main(result_json='/home/wei/code/deepbits/API/requirements.txt.json'):
    """Link FILE_ID to PROJECT_ID, wait for the scan, and save the result as JSON.

    Args:
        result_json: Path of the JSON file the scan result is written to.
            The default is the sample path shipped with this script; pass a
            path that exists on your machine.

    Failures are reported on stdout rather than raised, matching the other
    functions in this script.
    """
    response = link_file_to_project(FILE_ID, PROJECT_ID)
    if not response:
        print("Failed to link file to project")
        return

    # check scan status
    try:
        detail_res = check_status(
            PROJECT_ID,
            response.get('projectAssetId', None),
            response.get('sbomStreamId', None),
        )
    except (requests.RequestException, ValueError) as err:
        # check_status raises ValueError once its retries are used up, and
        # HTTP-level errors from requests can also surface here.
        print(f"Failed to retrieve scan result: {err}")
        return

    # Save detail_res to a local JSON file
    try:
        with open(result_json, 'w', encoding='utf-8') as file:
            json.dump(detail_res, file)
    except OSError as err:
        print(f"Failed to save scan result to {result_json}: {err}")
        return

    print(f"Scan result saved to {result_json}")
    print(f"You can also check the result on project {PROJECT_ID}")
def link_file_to_project(file_id, project_id, timeout=30):
    """Link a file to a project and start a stream watch for it.

    Steps: fetch the project name, fetch the project's current asset IDs,
    PUT the project with the asset list extended by ``file_id``, find the
    newly created asset whose ``sbomBuilderId`` equals ``file_id``, then PUT
    a ``stream_watch`` request for that asset.

    Args:
        file_id: ID of the file to link.
        project_id: ID of the project the file is linked to.
        timeout: Seconds to wait on each PUT request issued here before
            giving up, so a stalled connection cannot block forever.

    Returns:
        A dict with keys ``projectAssetId`` and ``sbomStreamId`` on success,
        or ``None`` if any step fails (a message is printed in that case).
    """
    try:
        project_name = get_project_name(project_id)
        if not project_name:
            print(f"Failed to get project name for project ID {project_id}")
            return None

        file_id_list = get_file_id_list_by_project_id(project_id)
        if file_id_list is None:
            print(f"Failed to get file ID list for project ID {project_id}")
            return None

        # Build a new list instead of mutating the one returned by the helper.
        asset_ids = [*file_id_list, file_id]

        put_response = requests.put(
            f"{API_BASE}/project/{project_id}",
            headers=default_headers,
            json={
                "name": project_name,
                "assets": [
                    {
                        "assetType": "SBOMBuilder",
                        "assetIds": asset_ids,
                    }
                ],
            },
            timeout=timeout,
        )
        put_response.raise_for_status()

        # "or" fallbacks also cover keys that are present but null.
        put_data = put_response.json().get('data') or {}
        created_assets = put_data.get('createdAssets') or []
        if not created_assets:
            print("Failed to get created assets")
            return None

        # First created asset that belongs to this file; None when no asset
        # matches or the matching asset carries no _id.
        project_asset_id = next(
            (
                asset.get('_id')
                for asset in created_assets
                if asset.get('sbomBuilderId') == file_id
            ),
            None,
        )
        if not project_asset_id:
            print("Failed to get project asset ID")
            return None

        stream_watch_response = requests.put(
            f"{API_BASE}/project/{project_id}/{project_asset_id}/stream_watch",
            headers=default_headers,
            json={
                "action": "watch",
                "identifier": file_id,
            },
            timeout=timeout,
        )
        stream_watch_response.raise_for_status()
        watch_data = stream_watch_response.json().get('data') or {}
        sbom_stream_id = watch_data.get('_id')

        print(f"Successfully linked file {file_id} to project {project_id}")
        return {
            "projectAssetId": project_asset_id,
            "sbomStreamId": sbom_stream_id,
        }
    except (requests.RequestException, ValueError, KeyError) as err:
        print(f"An error occurred: {err}")
        return None
def get_project_name(project_id, timeout=30):
    """Fetch the name of a project from the API.

    Args:
        project_id: ID of the project to look up.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        The value of ``data.name`` in the JSON response, or ``None`` if the
        request fails, the body is not valid JSON, or the name is absent.
        Errors are printed, not raised.
    """
    try:
        resp = requests.get(
            f"{API_BASE}/project/{project_id}",
            headers=default_headers,
            timeout=timeout,
        )
        resp.raise_for_status()
        # "or {}" also covers a response whose "data" key is present but null.
        data = resp.json().get('data') or {}
        return data.get('name')
    except (requests.RequestException, ValueError, KeyError) as err:
        print(f"An error occurred: {err}")
        return None
def get_file_id_list_by_project_id(project_id, timeout=30):
    """Fetch the asset IDs currently attached to a project.

    Reads ``data.docs`` from GET /project/{project_id}/assets and returns the
    ``assetIds`` of the first entry.

    Args:
        project_id: ID of the project whose assets are listed.
        timeout: Seconds to wait for the HTTP response before giving up.

    Returns:
        A list of asset IDs (empty when the project has no asset docs), or
        ``None`` when the request or JSON decoding fails. ``None`` is kept
        distinct from ``[]`` so that a caller which re-submits the list does
        not mistake a failed lookup for an empty project.
    """
    try:
        resp = requests.get(
            f"{API_BASE}/project/{project_id}/assets",
            headers=default_headers,
            timeout=timeout,
        )
        resp.raise_for_status()
        # "or" fallbacks also cover keys that are present but null.
        data = resp.json().get('data') or {}
        docs = data.get('docs') or []
        if not docs:
            return []
        return docs[0].get('assetIds') or []
    except (requests.RequestException, ValueError, KeyError) as err:
        print(f"An error occurred: {err}")
        return None
def check_status(project_id, project_asset_id, stream_id,
                 max_retries=50, retry_interval=3, timeout=30):
    """Poll the scan_result endpoint until the scan reports an end time.

    Args:
        project_id: ID of the project that owns the asset.
        project_asset_id: ID of the project asset being scanned.
        stream_id: ID of the stream being watched for that asset.
        max_retries: Maximum number of polling attempts.
        retry_interval: Seconds to sleep between attempts.
        timeout: Seconds to wait for each HTTP response before giving up, so
            a single stalled request cannot bypass ``max_retries``.

    Returns:
        The parsed JSON response of the first attempt whose
        ``data.scanResult.scanEndAt`` is set.

    Raises:
        ValueError: If no attempt reports ``scanEndAt``. A response body that
            is not valid JSON may also surface as a ValueError.
        requests.RequestException: If an HTTP request itself fails or times
            out; these are not caught here.
    """
    url = f"{API_BASE}/project/{project_id}/{project_asset_id}/{stream_id}/scan_result"
    for attempt in range(1, max_retries + 1):
        response = requests.get(url, headers=default_headers, timeout=timeout)
        parsed_result = response.json()

        # "or {}" also covers keys that are present but null.
        data = parsed_result.get('data') or {}
        scan_result = data.get('scanResult') or {}
        if scan_result.get('scanEndAt'):
            return parsed_result

        print("Current Status: scan is still running...")
        # Skip the wait after the final attempt; nothing follows it.
        if attempt < max_retries:
            print(f"Retrying in {retry_interval} seconds...")
            time.sleep(retry_interval)

    raise ValueError("Max retries exceeded. API call failed.")
# Run the demo only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()
Updated 4 months ago