Scan DockerHub via Deepbits Platform
When you have a deepbits account and using app.deepbits.com to scan your Docker Hub repository, you can follow below instructions to scan Docker Hub Repository programmatically.
import requests
import hashlib
import os
import time
import json
import urllib.parse
# The json file you want store the scan result
RESULT_JSON = './docker-sbom.json'
# Update to the repo you want to scan
DOCKER_HUB_URL = "https://hub.docker.com/_/busybox"
# Your API key generated from app.deepbits.com
API_KEY = 'YOUR API KEY'
API_BASE = 'https://api.deepbits.com/api/v1'
# Your project id created from app.deepbits.com
project_id = ''
default_headers = {"x-api-key": API_KEY }
def add_dockerhub_url(url):
print(f"Adding dockerhub url {url}...")
add_dockerhub_res = requests.post(f"{API_BASE}/dockerhubga", headers=default_headers, json={
"url": url,
})
return add_dockerhub_res.json()['data']
def add_docker_hub_to_project(add_dockerhub_res, project_id):
dockerhub_repos_res = requests.get(f"{API_BASE}/dockerhub/repos", headers=default_headers)
dockerhub_repos = dockerhub_repos_res.json()['data']
target_repo = None
for repo in dockerhub_repos:
if repo['dockerHubNamespace']['_id'] == add_dockerhub_res['namespace']['_id'] and repo['name'] == add_dockerhub_res['repoName']:
target_repo = repo
break
project_res = requests.get(f"{API_BASE}/project/{project_id}", headers=default_headers)
current_project = project_res.json()['data']
updated_project_assets = current_project['assets']
for asset in updated_project_assets:
if asset['assetType'] == 'DockerHub':
asset['assetIds'].append(target_repo['_id'])
break
if not updated_project_assets:
updated_project_assets.append({
"assetType": "DockerHub",
"assetIds": [target_repo['_id']]
})
put_project_res = requests.put(
f"{API_BASE}/project/{project_id}",
headers=default_headers,
json={
"name": current_project['name'],
"assets": updated_project_assets
}
)
created_assets = put_project_res.json()['data']['createdAssets']
for asset in created_assets:
if 'dockerHubRepoId' in asset and asset['dockerHubRepoId'] == target_repo['_id']:
projectAssetId = asset['_id']
break
return {
"dockerHubRepoId": target_repo['_id'],
"projectAssetId": projectAssetId
}
def list_docker_hub_repo_tags(docker_hub_repo_id):
res = requests.get(f"{API_BASE}/dockerhub/{docker_hub_repo_id}/tags", headers=default_headers)
return res.json()['data']['docs']
def watch_stream(project_id, project_asset_id, tag_name):
print('Watching tag...', tag_name)
response = requests.put(
f"{API_BASE}/project/{project_id}/{project_asset_id}/stream_watch",
headers=default_headers,
json={
"action": "watch",
"identifier": tag_name,
}
)
return response.json()['data']['_id']
def check_status(project_id, project_asset_id, stream_id):
max_retries = 50
retry_interval = 10 # in seconds
for i in range(max_retries):
response = requests.get(
f"{API_BASE}/project/{project_id}/{project_asset_id}/{stream_id}/scan_result",
headers=default_headers,
)
parsed_result = response.json()
if not parsed_result: # Check if parsed_result is not truthy
print(parsed_result)
print(f"Current Status: scan is still running...")
print(f"Retrying in {retry_interval} seconds...")
time.sleep(retry_interval)
continue
scan_end_at = parsed_result.get('data', {}).get('scanResult', {}).get('scanEndAt')
if scan_end_at:
return parsed_result
else:
print(f"Current Status: scan is still running...")
print(f"Retrying in {retry_interval} seconds...")
time.sleep(retry_interval)
else:
raise ValueError("Max retries exceeded. API call failed.")
# add docker hub to deepbits
add_dockerhub_res = add_dockerhub_url(DOCKER_HUB_URL)
docker_repo_assets_res = add_docker_hub_to_project(add_dockerhub_res, project_id)
docker_tags = list_docker_hub_repo_tags(docker_repo_assets_res['dockerHubRepoId'])
tag_to_watch = docker_tags[0]['name']
print(f"Going to watch the tag {tag_to_watch}")
watched_stream = watch_stream(project_id, docker_repo_assets_res['projectAssetId'], tag_to_watch)
scan_result_detail = check_status(project_id, docker_repo_assets_res['projectAssetId'], watched_stream)
filtered_scan_result_detail = {k: v for k, v in scan_result_detail.items() if k != 'scanResult'}
print(f"\nScanResultDetail: {filtered_scan_result_detail}")
with open(RESULT_JSON, 'w') as file:
json.dump(scan_result_detail, file)
print(f"Scan result saved to {RESULT_JSON}")
print("Demo run successfully without error")
print(f"You can also check the result on https://app.deepbits.com/project/{project_id}")
Updated about 1 year ago