Code Examples
Ready-to-use examples for the most common FileSafety workflows. Each example is self-contained and can be copy-pasted into your project.
Upload and scan a file
Section titled “Upload and scan a file”curl -X POST https://api.filesafety.dev/v1/scan \ -H "x-api-key: $FILESAFETY_API_KEY" \ -F "file=@./document.pdf" \ -F "webhook_url=https://your-app.com/webhooks/filesafety"Poll for the result
Section titled “Poll for the result”curl https://api.filesafety.dev/v1/scan/scn_01HX7Z9K3M2N4P5Q6R7S8T9U0V \ -H "x-api-key: $FILESAFETY_API_KEY"Check usage
Section titled “Check usage”curl https://api.filesafety.dev/v1/usage \ -H "x-api-key: $FILESAFETY_API_KEY"Presigned URL flow
Section titled “Presigned URL flow”# Step 1: Get presigned URLcurl -X POST https://api.filesafety.dev/v1/scan \ -H "x-api-key: $FILESAFETY_API_KEY" \ -H "Content-Type: application/json" \ -d '{"webhook_url": "https://your-app.com/webhooks/filesafety"}'
# Step 2: Upload file to the presigned URLcurl -X PUT "PRESIGNED_URL_FROM_STEP_1" \ -H "Content-Type: application/octet-stream" \ --data-binary @./document.pdfNode.js
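Section titled “Node.js”All examples use the built-in fetch API (Node.js 18+), so no dependencies are required. The one exception is the webhook handler, which uses Express (install with npm install express).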
Upload and poll
Section titled “Upload and poll”const API_KEY = process.env.FILESAFETY_API_KEY;const BASE_URL = "https://api.filesafety.dev";
/**
 * Uploads a local file to the scan endpoint as multipart form data.
 *
 * @param {string} filePath - Path of the file to read and upload.
 * @param {string} webhookUrl - Sent as the `webhook_url` form field.
 * @returns {Promise<object>} Parsed JSON body of the successful response.
 * @throws {Error} When the response status is not OK. The message carries the
 *   status code plus the `error` field of the response body when one is present.
 */
async function scanFile(filePath, webhookUrl) {
  const { readFile } = await import("node:fs/promises");
  const { basename } = await import("node:path");

  // Read asynchronously so a large file does not block the event loop.
  const fileBuffer = await readFile(filePath);

  const formData = new FormData();
  formData.append("file", new Blob([fileBuffer]), basename(filePath));
  formData.append("webhook_url", webhookUrl);

  const res = await fetch(`${BASE_URL}/v1/scan`, {
    method: "POST",
    headers: { "x-api-key": API_KEY },
    body: formData,
  });

  if (!res.ok) {
    // An error body is not guaranteed to be JSON (for example a proxy error
    // page). Fall back to the status text instead of letting a parse error
    // hide the real HTTP failure.
    let detail = res.statusText || "no error detail";
    try {
      const body = await res.json();
      detail = body?.error ?? detail;
    } catch {
      // Body was not JSON; keep the fallback detail.
    }
    throw new Error(`Scan submission failed (${res.status}): ${detail}`);
  }

  return res.json();
}
/**
 * Polls the scan endpoint until the scan reports a terminal status.
 *
 * @param {string} scanId - Identifier interpolated into `/v1/scan/{scanId}`.
 * @param {number} [maxAttempts=20] - Maximum number of requests to make.
 * @param {number} [intervalMs=3000] - Delay between consecutive requests, in milliseconds.
 * @returns {Promise<object>} The response body once `status` is "complete" or "failed".
 * @throws {Error} "Poll failed (<status>)" on a non-OK response, or
 *   "Scan timed out" when no terminal status is seen within `maxAttempts`.
 */
async function pollResult(scanId, maxAttempts = 20, intervalMs = 3000) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const res = await fetch(`${BASE_URL}/v1/scan/${scanId}`, {
      headers: { "x-api-key": API_KEY },
    });

    if (!res.ok) {
      throw new Error(`Poll failed (${res.status})`);
    }

    const data = await res.json();
    if (data.status === "complete" || data.status === "failed") {
      return data;
    }

    // No request follows the last attempt, so waiting again would only delay
    // the timeout error.
    if (attempt < maxAttempts) {
      await new Promise((resolve) => setTimeout(resolve, intervalMs));
    }
  }

  throw new Error("Scan timed out");
}
// Usageconst scan = await scanFile("./uploads/report.pdf", "https://your-app.com/webhooks/filesafety");console.log(`Scan submitted: ${scan.scan_id}`);
const result = await pollResult(scan.scan_id);console.log(`Verdict: ${result.verdict}`);
if (!result.virus.clean) { console.log(`Threat detected: ${result.virus.signature}`);}
if (!result.nsfw.clean) { console.log(`NSFW categories: ${result.nsfw.categories.join(", ")}`);}Presigned URL flow
// Section titled “Presigned URL flow”
/**
 * Requests a scan without attaching a file, and returns the scan ID together
 * with the URL the file should be uploaded to.
 *
 * @param {string} webhookUrl - Sent as `webhook_url` in the JSON request body.
 * @returns {Promise<{scanId: string, uploadUrl: string}>} The `scan_id` and
 *   `upload_url` fields of the response, renamed to camelCase.
 * @throws {Error} "Failed to get presigned URL: <detail>" on a non-OK response.
 */
async function scanWithPresignedUrl(webhookUrl) {
  const res = await fetch(`${BASE_URL}/v1/scan`, {
    method: "POST",
    headers: {
      "x-api-key": API_KEY,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({ webhook_url: webhookUrl }),
  });

  if (!res.ok) {
    // The error body may not be JSON; report the HTTP status in that case
    // rather than surfacing an unrelated parse error.
    let detail = `HTTP ${res.status}`;
    try {
      const body = await res.json();
      detail = body?.error ?? detail;
    } catch {
      // Body was not JSON; keep the HTTP status as the detail.
    }
    throw new Error(`Failed to get presigned URL: ${detail}`);
  }

  const { scan_id, upload_url } = await res.json();
  return { scanId: scan_id, uploadUrl: upload_url };
}
/**
 * Sends the file contents to a presigned URL with an HTTP PUT.
 *
 * @param {string} uploadUrl - Destination URL for the PUT request.
 * @param {*} fileBuffer - Passed through unchanged as the request body.
 * @returns {Promise<void>} Resolves when the response status is OK.
 * @throws {Error} "Upload failed: <status>" when the response is not OK.
 */
async function uploadToPresignedUrl(uploadUrl, fileBuffer) {
  const requestInit = {
    method: "PUT",
    headers: { "Content-Type": "application/octet-stream" },
    body: fileBuffer,
  };

  const response = await fetch(uploadUrl, requestInit);
  if (response.ok) {
    return;
  }

  throw new Error(`Upload failed: ${response.status}`);
}
// Usageconst { scanId, uploadUrl } = await scanWithPresignedUrl("https://your-app.com/webhooks/filesafety");const fs = await import("node:fs");const fileBuffer = fs.readFileSync("./uploads/image.png");await uploadToPresignedUrl(uploadUrl, fileBuffer);
const result = await pollResult(scanId);console.log(`Verdict: ${result.verdict}`);Webhook handler (Express)
Section titled “Webhook handler (Express)”import express from "express";
const app = express();app.use(express.json());
const processedScans = new Set();
// Webhook endpoint: responds 200 before doing any work, then handles each
// scan_id at most once per process.
app.post("/webhooks/filesafety", (req, res) => {
  // The response is sent first, so nothing below can change the status code.
  res.status(200).send("ok");

  const payload = req.body;
  const scanId = payload?.scan_id;

  // A missing or non-JSON body would otherwise throw on property access (or
  // record `undefined` as a processed ID) after the response has been sent.
  if (typeof scanId !== "string" || scanId === "") {
    console.warn("Ignoring webhook delivery without a scan_id");
    return;
  }

  // Duplicate deliveries of the same scan are dropped.
  // NOTE(review): this Set is in-memory and grows without bound; presumably a
  // shared store with expiry is wanted outside of a demo — confirm.
  if (processedScans.has(scanId)) {
    return;
  }
  processedScans.add(scanId);

  handleScanResult(payload);
});
/**
 * Logs a one-line message describing the verdict of a scan payload.
 *
 * @param {object} payload - Webhook payload. Reads `verdict` and `scan_id`,
 *   plus `virus.signature` for "infected" and `nsfw.categories` for "nsfw".
 * @returns {void}
 */
function handleScanResult(payload) {
  switch (payload.verdict) {
    case "clean":
      console.log(`File ${payload.scan_id} is clean`);
      break;
    case "infected":
      console.log(`File ${payload.scan_id} infected: ${payload.virus.signature}`);
      break;
    case "nsfw":
      console.log(`File ${payload.scan_id} NSFW: ${payload.nsfw.categories}`);
      break;
    case "mixed":
      console.log(`File ${payload.scan_id} has virus and NSFW issues`);
      break;
    case "failed":
      console.log(`File ${payload.scan_id} scan failed — retry`);
      break;
    default:
      // Surface verdicts this handler does not know about instead of dropping
      // them silently.
      console.warn(`File ${payload.scan_id} has unrecognized verdict: ${payload.verdict}`);
  }
}
app.listen(3000, () => console.log("Webhook server running on port 3000"));Check usage
// Section titled “Check usage”
/**
 * Fetches the usage record for the current API key and prints a summary.
 *
 * @returns {Promise<object>} The parsed usage body. The summary reads `plan`,
 *   `scans_used`, `scans_quota` and `period_ends` from it.
 * @throws {Error} "Usage check failed (<status>)" on a non-OK response.
 */
async function checkUsage() {
  const res = await fetch(`${BASE_URL}/v1/usage`, {
    headers: { "x-api-key": API_KEY },
  });

  // Without this check an error body would be treated as usage data and the
  // subtraction below would print NaN.
  if (!res.ok) {
    throw new Error(`Usage check failed (${res.status})`);
  }

  const usage = await res.json();
  const remaining = usage.scans_quota - usage.scans_used;

  console.log(`Plan: ${usage.plan}`);
  console.log(`Used: ${usage.scans_used} / ${usage.scans_quota}`);
  console.log(`Remaining: ${remaining}`);
  console.log(`Period ends: ${usage.period_ends}`);

  return usage;
}
// Python
Section titled “Python”All examples use the requests library. Install with pip install requests. The webhook handler example additionally requires Flask (pip install flask).
Upload and poll
Section titled “Upload and poll”import osimport timeimport requests
API_KEY = os.environ["FILESAFETY_API_KEY"]BASE_URL = "https://api.filesafety.dev"
def scan_file(file_path, webhook_url, timeout=30):
    """Upload a local file to the scan endpoint as multipart form data.

    Args:
        file_path: Path of the file to open in binary mode and upload.
        webhook_url: Sent as the ``webhook_url`` form field.
        timeout: Seconds passed to ``requests.post`` as its ``timeout``.
            Without it a stalled connection would block indefinitely.

    Returns:
        The parsed JSON body of the response.

    Raises:
        requests.HTTPError: If the response status indicates an error.
    """
    with open(file_path, "rb") as f:
        response = requests.post(
            f"{BASE_URL}/v1/scan",
            headers={"x-api-key": API_KEY},
            files={"file": f},
            data={"webhook_url": webhook_url},
            timeout=timeout,
        )

    response.raise_for_status()
    return response.json()
def poll_result(scan_id, max_attempts=20, interval=3, timeout=30):
    """Poll the scan endpoint until the scan reports a terminal status.

    Args:
        scan_id: Identifier interpolated into ``/v1/scan/{scan_id}``.
        max_attempts: Maximum number of requests to make.
        interval: Seconds to sleep between consecutive requests.
        timeout: Seconds passed to ``requests.get`` as its ``timeout``.
            Without it a stalled connection would block indefinitely.

    Returns:
        The response body once its ``status`` is "complete" or "failed".

    Raises:
        requests.HTTPError: If a response status indicates an error.
        TimeoutError: If no terminal status is seen within ``max_attempts``.
    """
    for attempt in range(max_attempts):
        response = requests.get(
            f"{BASE_URL}/v1/scan/{scan_id}",
            headers={"x-api-key": API_KEY},
            timeout=timeout,
        )
        response.raise_for_status()
        data = response.json()

        if data["status"] in ("complete", "failed"):
            return data

        # No request follows the last attempt, so sleeping again would only
        # delay the TimeoutError.
        if attempt < max_attempts - 1:
            time.sleep(interval)

    raise TimeoutError("Scan did not complete in time")
# Usagescan = scan_file("./uploads/report.pdf", "https://your-app.com/webhooks/filesafety")print(f"Scan submitted: {scan['scan_id']}")
result = poll_result(scan["scan_id"])print(f"Verdict: {result['verdict']}")
if not result["virus"]["clean"]: print(f"Threat: {result['virus']['signature']}")
if not result["nsfw"]["clean"]: print(f"NSFW categories: {result['nsfw']['categories']}")Presigned URL flow
# Section titled “Presigned URL flow”
def get_presigned_url(webhook_url, timeout=30):
    """Request a scan without attaching a file and return where to upload it.

    Args:
        webhook_url: Sent as ``webhook_url`` in the JSON request body.
        timeout: Seconds passed to ``requests.post`` as its ``timeout``.
            Without it a stalled connection would block indefinitely.

    Returns:
        A ``(scan_id, upload_url)`` tuple taken from the response body.

    Raises:
        requests.HTTPError: If the response status indicates an error.
    """
    response = requests.post(
        f"{BASE_URL}/v1/scan",
        headers={
            "x-api-key": API_KEY,
            "Content-Type": "application/json",
        },
        json={"webhook_url": webhook_url},
        timeout=timeout,
    )
    response.raise_for_status()
    data = response.json()
    return data["scan_id"], data["upload_url"]
def upload_to_presigned_url(upload_url, file_path, timeout=30):
    """Send a local file to a presigned URL with an HTTP PUT.

    Args:
        upload_url: Destination URL for the PUT request.
        file_path: Path of the file to open in binary mode; the open file
            object is passed as the request body.
        timeout: Seconds passed to ``requests.put`` as its ``timeout``.
            Without it a stalled connection would block indefinitely.

    Raises:
        requests.HTTPError: If the response status indicates an error.
    """
    with open(file_path, "rb") as f:
        response = requests.put(
            upload_url,
            headers={"Content-Type": "application/octet-stream"},
            data=f,
            timeout=timeout,
        )
    response.raise_for_status()
# Usagescan_id, upload_url = get_presigned_url("https://your-app.com/webhooks/filesafety")upload_to_presigned_url(upload_url, "./uploads/image.png")
result = poll_result(scan_id)print(f"Verdict: {result['verdict']}")Webhook handler (Flask)
Section titled “Webhook handler (Flask)”from flask import Flask, request, jsonify
app = Flask(__name__)processed_scans = set()
@app.route("/webhooks/filesafety", methods=["POST"])
def handle_webhook():
    """Handle a scan-result webhook delivery, at most once per scan_id.

    Returns:
        A ``(json_response, status_code)`` tuple: 400 with status
        "invalid_payload" when the body is not a JSON object carrying
        ``scan_id`` and ``verdict``; 200 with status "already_processed" for a
        scan_id seen before; otherwise 200 with status "ok".
    """
    # silent=True yields None instead of raising on a missing or malformed
    # JSON body, so every bad delivery takes the same 400 path below.
    payload = request.get_json(silent=True)
    if (
        not isinstance(payload, dict)
        or "scan_id" not in payload
        or "verdict" not in payload
    ):
        return jsonify({"status": "invalid_payload"}), 400

    scan_id = payload["scan_id"]
    verdict = payload["verdict"]

    if scan_id in processed_scans:
        return jsonify({"status": "already_processed"}), 200

    if verdict == "clean":
        print(f"File {scan_id} is clean")
    elif verdict == "infected":
        print(f"File {scan_id} infected: {payload['virus']['signature']}")
    elif verdict == "nsfw":
        print(f"File {scan_id} NSFW: {payload['nsfw']['categories']}")
    elif verdict == "mixed":
        print(f"File {scan_id} has virus and NSFW issues")
    elif verdict == "failed":
        print(f"File {scan_id} scan failed — retry")
    else:
        # Surface verdicts this handler does not know about instead of
        # dropping them silently.
        print(f"File {scan_id} has unrecognized verdict: {verdict}")

    # Record the scan only after handling succeeded, so a delivery whose
    # handling raised is not mistaken for a duplicate when it arrives again.
    processed_scans.add(scan_id)

    return jsonify({"status": "ok"}), 200
if __name__ == "__main__": app.run(port=3000)Check usage
# Section titled “Check usage”
def check_usage(timeout=30):
    """Fetch the usage record for the current API key and print a summary.

    Args:
        timeout: Seconds passed to ``requests.get`` as its ``timeout``.
            Without it a stalled connection would block indefinitely.

    Returns:
        The parsed usage body. The summary reads ``plan``, ``scans_used``,
        ``scans_quota`` and ``period_ends`` from it.

    Raises:
        requests.HTTPError: If the response status indicates an error.
    """
    response = requests.get(
        f"{BASE_URL}/v1/usage",
        headers={"x-api-key": API_KEY},
        timeout=timeout,
    )
    response.raise_for_status()
    usage = response.json()

    remaining = usage["scans_quota"] - usage["scans_used"]

    print(f"Plan: {usage['plan']}")
    print(f"Used: {usage['scans_used']} / {usage['scans_quota']}")
    print(f"Remaining: {remaining}")
    print(f"Period ends: {usage['period_ends']}")

    return usage