Scan DockerHub via Deepbits Platform

If you have a Deepbits account and use app.deepbits.com to scan your Docker Hub repositories, you can follow the instructions below to scan a Docker Hub repository programmatically.

import requests
import hashlib
import os
import time
import json
import urllib.parse

# The JSON file in which the scan result will be stored
RESULT_JSON = './docker-sbom.json'

# Update to the repo you want to scan
DOCKER_HUB_URL = "https://hub.docker.com/_/busybox"

# Your API key generated from app.deepbits.com
API_KEY = 'YOUR API KEY'
API_BASE = 'https://api.deepbits.com/api/v1'

# Your project id created from app.deepbits.com
project_id = ''

default_headers = {"x-api-key": API_KEY }

def add_dockerhub_url(url):
    """Submit a Docker Hub repository URL to the Deepbits API.

    Args:
        url: The Docker Hub repository URL to add.

    Returns:
        The ``data`` member of the JSON response body.
    """
    print(f"Adding dockerhub url {url}...")

    # NOTE(review): the path is "dockerhubga" while the other calls in this
    # file use "/dockerhub/..." -- confirm against the API reference.
    endpoint = f"{API_BASE}/dockerhubga"
    reply = requests.post(endpoint, headers=default_headers, json={"url": url})
    payload = reply.json()
    return payload['data']


def add_docker_hub_to_project(add_dockerhub_res, project_id):
    """Attach a registered Docker Hub repository to a project.

    Finds the repository described by ``add_dockerhub_res`` in the list
    returned by ``/dockerhub/repos``, adds its id to the project's
    ``DockerHub`` asset entry (creating that entry when the project has
    none), saves the project with a PUT, and picks the matching entry out of
    the ``createdAssets`` list in the response.

    Args:
        add_dockerhub_res: Result of ``add_dockerhub_url``. Its
            ``namespace['_id']`` and ``repoName`` fields identify the repo.
        project_id: Id of the project to update.

    Returns:
        A dict with the keys ``dockerHubRepoId`` and ``projectAssetId``.

    Raises:
        LookupError: If the repository is not in the repo list, or if the
            project update response contains no created asset for it.
    """
    dockerhub_repos_res = requests.get(f"{API_BASE}/dockerhub/repos", headers=default_headers)
    dockerhub_repos = dockerhub_repos_res.json()['data']

    wanted_namespace_id = add_dockerhub_res['namespace']['_id']
    wanted_repo_name = add_dockerhub_res['repoName']

    for repo in dockerhub_repos:
        if repo['dockerHubNamespace']['_id'] == wanted_namespace_id and repo['name'] == wanted_repo_name:
            target_repo = repo
            break
    else:
        # Previously this fell through with target_repo = None and failed
        # later with an opaque TypeError on target_repo['_id'].
        raise LookupError(
            f"Docker Hub repo {wanted_repo_name!r} was not found in the repo list"
        )

    repo_id = target_repo['_id']

    project_res = requests.get(f"{API_BASE}/project/{project_id}", headers=default_headers)
    current_project = project_res.json()['data']

    updated_project_assets = current_project['assets']

    for asset in updated_project_assets:
        if asset['assetType'] == 'DockerHub':
            asset['assetIds'].append(repo_id)
            break
    else:
        # No DockerHub entry exists. This must also cover a non-empty list
        # that only holds other asset types, not just an empty list.
        updated_project_assets.append({
            "assetType": "DockerHub",
            "assetIds": [repo_id],
        })

    put_project_res = requests.put(
        f"{API_BASE}/project/{project_id}",
        headers=default_headers,
        json={
            "name": current_project['name'],
            "assets": updated_project_assets,
        },
    )

    created_assets = put_project_res.json()['data']['createdAssets']
    for asset in created_assets:
        if 'dockerHubRepoId' in asset and asset['dockerHubRepoId'] == repo_id:
            project_asset_id = asset['_id']
            break
    else:
        # Previously this surfaced as UnboundLocalError on the return line.
        raise LookupError(
            f"Project update did not report a created asset for Docker Hub repo {repo_id!r}"
        )

    return {
        "dockerHubRepoId": repo_id,
        "projectAssetId": project_asset_id,
    }
def list_docker_hub_repo_tags(docker_hub_repo_id):
    """Fetch the tag list of a Docker Hub repository from the Deepbits API.

    Args:
        docker_hub_repo_id: Id of the Docker Hub repository.

    Returns:
        The ``docs`` entry found under ``data`` in the JSON response body.
    """
    tags_url = f"{API_BASE}/dockerhub/{docker_hub_repo_id}/tags"
    payload = requests.get(tags_url, headers=default_headers).json()
    return payload['data']['docs']

def watch_stream(project_id, project_asset_id, tag_name):
    """Send a "watch" action for one tag of a project asset.

    Args:
        project_id: Id of the project that owns the asset.
        project_asset_id: Id of the project asset.
        tag_name: Tag name, sent as the ``identifier`` of the request.

    Returns:
        The ``_id`` found under ``data`` in the JSON response body.
    """
    print('Watching tag...', tag_name)

    endpoint = f"{API_BASE}/project/{project_id}/{project_asset_id}/stream_watch"
    body = {"action": "watch", "identifier": tag_name}
    reply = requests.put(endpoint, headers=default_headers, json=body)

    watched = reply.json()['data']
    return watched['_id']

def check_status(project_id, project_asset_id, stream_id, max_retries=50, retry_interval=10):
    """Poll the scan result of a stream until the scan reports an end time.

    The scan counts as finished once the response carries a truthy
    ``data.scanResult.scanEndAt`` value.

    Args:
        project_id: Id of the project that owns the asset.
        project_asset_id: Id of the project asset.
        stream_id: Id of the watched stream, as returned by ``watch_stream``.
        max_retries: Maximum number of polls before giving up.
        retry_interval: Seconds to wait between two polls.

    Returns:
        The full parsed JSON response of the first poll that reports
        ``scanEndAt``.

    Raises:
        ValueError: If no poll reported ``scanEndAt`` within ``max_retries``
            attempts.
    """
    url = f"{API_BASE}/project/{project_id}/{project_asset_id}/{stream_id}/scan_result"

    for attempt in range(1, max_retries + 1):
        response = requests.get(url, headers=default_headers)
        parsed_result = response.json()

        # Walk the nesting one level at a time. A level that is missing,
        # JSON null or not an object counts as "not finished" instead of
        # raising AttributeError as a chained .get() would.
        data = parsed_result.get('data') if isinstance(parsed_result, dict) else None
        scan_result = data.get('scanResult') if isinstance(data, dict) else None
        scan_end_at = scan_result.get('scanEndAt') if isinstance(scan_result, dict) else None

        if scan_end_at:
            return parsed_result

        if not parsed_result:
            # Show the empty payload so an unexpected response is visible.
            print(parsed_result)
        print("Current Status: scan is still running...")

        # Sleeping after the final attempt would only delay the error.
        if attempt < max_retries:
            print(f"Retrying in {retry_interval} seconds...")
            time.sleep(retry_interval)

    raise ValueError("Max retries exceeded. API call failed.")

# Stop before any API call if the project id placeholder was left empty.
# Otherwise the repo would be registered first and the run would only fail
# later on a malformed "/project/" URL.
if not project_id:
    raise SystemExit("project_id is empty: set it to a project id created on app.deepbits.com")

# Add the Docker Hub repo to Deepbits and attach it to the project
add_dockerhub_res = add_dockerhub_url(DOCKER_HUB_URL)
docker_repo_assets_res = add_docker_hub_to_project(add_dockerhub_res, project_id)
docker_tags = list_docker_hub_repo_tags(docker_repo_assets_res['dockerHubRepoId'])
if not docker_tags:
    # docker_tags[0] would otherwise raise a bare IndexError.
    raise SystemExit(f"No tags returned for {DOCKER_HUB_URL}; nothing to scan")

# Scan the first tag in the returned list
tag_to_watch = docker_tags[0]['name']
print(f"Going to watch the tag {tag_to_watch}")

watched_stream = watch_stream(project_id, docker_repo_assets_res['projectAssetId'], tag_to_watch)

scan_result_detail = check_status(project_id, docker_repo_assets_res['projectAssetId'], watched_stream)

# Print a summary without the scan result itself. check_status returns the
# whole response body and reads 'scanResult' from its 'data' member, so the
# key has to be dropped there as well as at the top level. New dicts are
# built so the full result written to disk below stays untouched.
filtered_scan_result_detail = {k: v for k, v in scan_result_detail.items() if k != 'scanResult'}
scan_data = filtered_scan_result_detail.get('data')
if isinstance(scan_data, dict):
    filtered_scan_result_detail['data'] = {k: v for k, v in scan_data.items() if k != 'scanResult'}
print(f"\nScanResultDetail: {filtered_scan_result_detail}")

with open(RESULT_JSON, 'w', encoding='utf-8') as file:
    json.dump(scan_result_detail, file)

print(f"Scan result saved to {RESULT_JSON}")
print("Demo run successfully without error")

print(f"You can also check the result on https://app.deepbits.com/project/{project_id}")