Skip to content

Code Examples

Ready-to-use examples for the most common FileSafety workflows. Each example is self-contained and can be copy-pasted into your project.

Terminal window
# Submit ./document.pdf to /v1/scan as a multipart form upload (-F), authenticating
# with the key held in $FILESAFETY_API_KEY and passing a webhook_url form field.
curl -X POST https://api.filesafety.dev/v1/scan \
-H "x-api-key: $FILESAFETY_API_KEY" \
-F "file=@./document.pdf" \
-F "webhook_url=https://your-app.com/webhooks/filesafety"
Terminal window
# Fetch a single scan by ID; the scan ID is the last path segment of the URL.
curl https://api.filesafety.dev/v1/scan/scn_01HX7Z9K3M2N4P5Q6R7S8T9U0V \
-H "x-api-key: $FILESAFETY_API_KEY"
Terminal window
# Request the /v1/usage endpoint, authenticating with the API key.
curl https://api.filesafety.dev/v1/usage \
-H "x-api-key: $FILESAFETY_API_KEY"
Terminal window
# Two-step upload: the first request sends only JSON (no file), the second sends
# the raw file bytes to the URL obtained from the first.
# Step 1: Get presigned URL
curl -X POST https://api.filesafety.dev/v1/scan \
-H "x-api-key: $FILESAFETY_API_KEY" \
-H "Content-Type: application/json" \
-d '{"webhook_url": "https://your-app.com/webhooks/filesafety"}'
# Step 2: Upload file to the presigned URL
# Replace PRESIGNED_URL_FROM_STEP_1 with the upload URL returned by step 1
# (presumably the upload_url field of the response — confirm against the API reference).
curl -X PUT "PRESIGNED_URL_FROM_STEP_1" \
-H "Content-Type: application/octet-stream" \
--data-binary @./document.pdf

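All examples use the built-in fetch API (Node.js 18+) and top-level await, so run them as ES modules (a .mjs file, or "type": "module" in package.json). No dependencies are required, except for the webhook handler example, which uses Express (npm install express).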

// Shared configuration used by every JavaScript example on this page.
// The API key comes from the FILESAFETY_API_KEY environment variable
// (it is undefined when that variable is not set).
const { FILESAFETY_API_KEY: API_KEY } = process.env;
// Root of the FileSafety HTTP API; endpoint paths are appended to it.
const BASE_URL = "https://api.filesafety.dev";
/**
 * Submit a local file to the scan endpoint as a multipart form upload.
 *
 * @param {string} filePath - Path of the file to read and upload.
 * @param {string} [webhookUrl] - Sent as the `webhook_url` form field; the
 *   field is omitted when this is null or undefined.
 * @returns {Promise<object>} The parsed JSON body of the successful response.
 * @throws {Error} If the file cannot be read, or if the API answers with a
 *   non-2xx status (the message carries the status code and error detail).
 */
async function scanFile(filePath, webhookUrl) {
  const { readFile } = await import("node:fs/promises");
  const { basename } = await import("node:path");

  // Read asynchronously so a large file does not block the event loop.
  const fileBuffer = await readFile(filePath);

  const formData = new FormData();
  formData.append("file", new Blob([fileBuffer]), basename(filePath));
  // FormData stringifies its values, so appending undefined would send the
  // literal text "undefined" as the webhook URL.
  if (webhookUrl != null) {
    formData.append("webhook_url", webhookUrl);
  }

  const res = await fetch(`${BASE_URL}/v1/scan`, {
    method: "POST",
    headers: { "x-api-key": API_KEY },
    body: formData,
  });

  if (!res.ok) {
    // An error body is not guaranteed to be JSON (e.g. a gateway error page).
    // Read it as text first so a parse failure cannot mask the HTTP status.
    const rawBody = await res.text();
    let detail = rawBody;
    try {
      detail = JSON.parse(rawBody)?.error ?? rawBody;
    } catch {
      // Not JSON: keep the raw text as the detail.
    }
    throw new Error(
      `Scan submission failed (${res.status}): ${detail || res.statusText}`,
    );
  }
  return res.json();
}
/**
 * Poll a scan until its status is "complete" or "failed".
 *
 * @param {string} scanId - ID of the scan to poll.
 * @param {number} [maxAttempts=20] - Maximum number of requests to make.
 * @param {number} [intervalMs=3000] - Delay between consecutive attempts, in
 *   milliseconds.
 * @returns {Promise<object>} The parsed response body of the first poll whose
 *   status is "complete" or "failed".
 * @throws {Error} "Poll failed (<status>)" on a non-2xx response, or
 *   "Scan timed out" when no attempt returned a terminal status.
 */
async function pollResult(scanId, maxAttempts = 20, intervalMs = 3000) {
  // Encode the ID so it can never alter the request path.
  const url = `${BASE_URL}/v1/scan/${encodeURIComponent(scanId)}`;

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const res = await fetch(url, { headers: { "x-api-key": API_KEY } });
    if (!res.ok) {
      throw new Error(`Poll failed (${res.status})`);
    }

    const data = await res.json();
    if (data.status === "complete" || data.status === "failed") {
      return data;
    }

    // Waiting after the final attempt would only delay the timeout error.
    if (attempt < maxAttempts) {
      await new Promise((resolve) => setTimeout(resolve, intervalMs));
    }
  }
  throw new Error("Scan timed out");
}
// Usage: submit a file, wait for the scan to finish, then report what was found.
const scan = await scanFile(
  "./uploads/report.pdf",
  "https://your-app.com/webhooks/filesafety",
);
console.log(`Scan submitted: ${scan.scan_id}`);

const result = await pollResult(scan.scan_id);
const { verdict, virus, nsfw } = result;
console.log(`Verdict: ${verdict}`);

// Each section of the result carries its own `clean` flag; details are printed
// only for the sections that were flagged.
if (!virus.clean) {
  console.log(`Threat detected: ${virus.signature}`);
}
if (!nsfw.clean) {
  console.log(`NSFW categories: ${nsfw.categories.join(", ")}`);
}
/**
 * Create a scan without sending the file, receiving a URL to upload it to.
 *
 * Sends a JSON body containing only `webhook_url` to the scan endpoint and
 * reads `scan_id` and `upload_url` from the response.
 *
 * @param {string} webhookUrl - Value sent as `webhook_url`.
 * @returns {Promise<{scanId: string, uploadUrl: string}>} The scan ID and the
 *   upload URL taken from the response body.
 * @throws {Error} "Failed to get presigned URL: <detail>" on a non-2xx response.
 */
async function scanWithPresignedUrl(webhookUrl) {
  const res = await fetch(`${BASE_URL}/v1/scan`, {
    method: "POST",
    headers: {
      "x-api-key": API_KEY,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({ webhook_url: webhookUrl }),
  });

  if (!res.ok) {
    // An error body is not guaranteed to be JSON (e.g. a gateway error page).
    // Read it as text first so a parse failure cannot replace the real error.
    const rawBody = await res.text();
    let detail = rawBody;
    try {
      detail = JSON.parse(rawBody)?.error ?? rawBody;
    } catch {
      // Not JSON: keep the raw text as the detail.
    }
    throw new Error(
      `Failed to get presigned URL: ${detail || `HTTP ${res.status}`}`,
    );
  }

  const { scan_id, upload_url } = await res.json();
  return { scanId: scan_id, uploadUrl: upload_url };
}
/**
 * PUT raw file bytes to a previously obtained upload URL.
 *
 * @param {string} uploadUrl - Destination URL for the PUT request.
 * @param {*} fileBuffer - Request body, sent as application/octet-stream.
 * @returns {Promise<void>} Resolves with no value once the upload succeeds.
 * @throws {Error} "Upload failed: <status>" when the response is not 2xx.
 */
async function uploadToPresignedUrl(uploadUrl, fileBuffer) {
  const putRequest = {
    method: "PUT",
    headers: { "Content-Type": "application/octet-stream" },
    body: fileBuffer,
  };

  const { ok, status } = await fetch(uploadUrl, putRequest);
  if (ok) {
    return;
  }
  throw new Error(`Upload failed: ${status}`);
}
// Usage: create the scan, PUT the file bytes to the returned URL, then wait
// for the scan to finish and print its verdict.
const { scanId, uploadUrl } = await scanWithPresignedUrl(
  "https://your-app.com/webhooks/filesafety",
);

const fs = await import("node:fs");
await uploadToPresignedUrl(uploadUrl, fs.readFileSync("./uploads/image.png"));

const { verdict } = await pollResult(scanId);
console.log(`Verdict: ${verdict}`);
import express from "express";
const app = express();
app.use(express.json());

// Scan IDs that have already been handled, used to ignore duplicate
// deliveries. It lives in memory only: it is lost on restart and never shrinks.
const processedScans = new Set();

/**
 * Webhook endpoint: acknowledges the delivery, then handles each scan ID once.
 */
app.post("/webhooks/filesafety", (req, res) => {
  // The response is sent before any processing, so nothing below can change it.
  res.status(200).send("ok");

  const payload = req.body;
  const scanId = payload?.scan_id;
  // A missing body or scan_id used to throw here, after the response was sent.
  if (scanId == null) {
    console.warn("Ignoring FileSafety webhook without a scan_id");
    return;
  }

  if (processedScans.has(scanId)) {
    return;
  }
  processedScans.add(scanId);

  // Headers are already sent, so an exception cannot become an error response;
  // log it instead of letting it escape the handler.
  try {
    handleScanResult(payload);
  } catch (err) {
    console.error(`Failed to handle webhook for scan ${scanId}:`, err);
  }
});
/**
 * Log a one-line summary of a scan result according to its verdict.
 *
 * Recognised verdicts are "clean", "infected", "nsfw", "mixed" and "failed";
 * any other value is ignored without logging.
 *
 * @param {object} payload - Scan result; `verdict` and `scan_id` are read, plus
 *   `virus.signature` for "infected" and `nsfw.categories` for "nsfw".
 * @returns {void}
 */
function handleScanResult(payload) {
  // A Map looks keys up by identity, so only the exact verdict strings match.
  // Messages are built lazily so nested fields are read only for the verdict
  // that actually uses them.
  const describeByVerdict = new Map([
    ["clean", () => `File ${payload.scan_id} is clean`],
    ["infected", () => `File ${payload.scan_id} infected: ${payload.virus.signature}`],
    ["nsfw", () => `File ${payload.scan_id} NSFW: ${payload.nsfw.categories}`],
    ["mixed", () => `File ${payload.scan_id} has virus and NSFW issues`],
    ["failed", () => `File ${payload.scan_id} scan failed — retry`],
  ]);

  const describe = describeByVerdict.get(payload.verdict);
  if (describe === undefined) {
    return;
  }
  console.log(describe());
}
// Start the HTTP server on port 3000 and log a message once it is listening.
app.listen(3000, () => console.log("Webhook server running on port 3000"));
/**
 * Fetch the usage summary from /v1/usage and print it.
 *
 * Logs the plan, used/quota counts, the remaining scans (quota minus used)
 * and the period end as reported by the response.
 *
 * @returns {Promise<object>} The parsed usage response.
 * @throws {Error} "Usage request failed (<status>)" on a non-2xx response.
 */
async function checkUsage() {
  const res = await fetch(`${BASE_URL}/v1/usage`, {
    headers: { "x-api-key": API_KEY },
  });
  // Without this check an error body would be printed as "undefined"/NaN.
  if (!res.ok) {
    throw new Error(`Usage request failed (${res.status})`);
  }

  const usage = await res.json();
  const remaining = usage.scans_quota - usage.scans_used;
  console.log(`Plan: ${usage.plan}`);
  console.log(`Used: ${usage.scans_used} / ${usage.scans_quota}`);
  console.log(`Remaining: ${remaining}`);
  console.log(`Period ends: ${usage.period_ends}`);
  return usage;
}

All examples use the requests library. Install with pip install requests. The webhook handler example additionally uses Flask (pip install flask).

import os
import time
import requests
# Shared configuration used by every Python example on this page.
# Indexing os.environ raises KeyError when FILESAFETY_API_KEY is not set.
API_KEY = os.environ["FILESAFETY_API_KEY"]
# Root of the FileSafety HTTP API; endpoint paths are appended to it.
BASE_URL = "https://api.filesafety.dev"
def scan_file(file_path, webhook_url, timeout=30):
    """Submit a local file to the scan endpoint as a multipart form upload.

    Args:
        file_path: Path of the file to open (in binary mode) and upload.
        webhook_url: Value sent as the ``webhook_url`` form field.
        timeout: Timeout in seconds passed to ``requests``. Without one the
            call can block indefinitely if the server stops responding.

    Returns:
        The parsed JSON body of the response.

    Raises:
        requests.HTTPError: If the response status is 4xx/5xx.
        requests.Timeout: If the request exceeds ``timeout``.
    """
    with open(file_path, "rb") as f:
        response = requests.post(
            f"{BASE_URL}/v1/scan",
            headers={"x-api-key": API_KEY},
            files={"file": f},
            data={"webhook_url": webhook_url},
            timeout=timeout,
        )
    response.raise_for_status()
    return response.json()
def poll_result(scan_id, max_attempts=20, interval=3, timeout=30):
    """Poll a scan until its status is "complete" or "failed".

    Args:
        scan_id: ID of the scan to poll.
        max_attempts: Maximum number of requests to make.
        interval: Seconds to sleep between consecutive attempts.
        timeout: Per-request timeout in seconds passed to ``requests``, so a
            stalled connection cannot block the loop indefinitely.

    Returns:
        The parsed response body of the first poll whose ``status`` is
        "complete" or "failed".

    Raises:
        requests.HTTPError: If a poll returns a 4xx/5xx status.
        TimeoutError: If no attempt returned a terminal status.
    """
    for attempt in range(max_attempts):
        response = requests.get(
            f"{BASE_URL}/v1/scan/{scan_id}",
            headers={"x-api-key": API_KEY},
            timeout=timeout,
        )
        response.raise_for_status()
        data = response.json()
        if data["status"] in ("complete", "failed"):
            return data
        # Sleeping after the final attempt would only delay the TimeoutError.
        if attempt < max_attempts - 1:
            time.sleep(interval)
    raise TimeoutError("Scan did not complete in time")
# Usage: submit a file, block until the scan finishes, then print the findings.
scan = scan_file("./uploads/report.pdf", "https://your-app.com/webhooks/filesafety")
submitted_id = scan["scan_id"]
print(f"Scan submitted: {submitted_id}")

result = poll_result(submitted_id)
print(f"Verdict: {result['verdict']}")

# Each section of the result carries its own "clean" flag; details are printed
# only for the sections that were flagged.
virus_report = result["virus"]
if not virus_report["clean"]:
    print(f"Threat: {virus_report['signature']}")

nsfw_report = result["nsfw"]
if not nsfw_report["clean"]:
    print(f"NSFW categories: {nsfw_report['categories']}")
def get_presigned_url(webhook_url, timeout=30):
    """Create a scan without sending the file and get a URL to upload it to.

    Sends a JSON body containing only ``webhook_url`` to the scan endpoint.

    Args:
        webhook_url: Value sent as ``webhook_url``.
        timeout: Timeout in seconds passed to ``requests``. Without one the
            call can block indefinitely if the server stops responding.

    Returns:
        A ``(scan_id, upload_url)`` tuple read from the response body.

    Raises:
        requests.HTTPError: If the response status is 4xx/5xx.
        requests.Timeout: If the request exceeds ``timeout``.
    """
    response = requests.post(
        f"{BASE_URL}/v1/scan",
        headers={
            "x-api-key": API_KEY,
            "Content-Type": "application/json",
        },
        json={"webhook_url": webhook_url},
        timeout=timeout,
    )
    response.raise_for_status()
    data = response.json()
    return data["scan_id"], data["upload_url"]
def upload_to_presigned_url(upload_url, file_path, timeout=60):
    """PUT a local file's bytes to a previously obtained upload URL.

    The open file object is passed as the request body, so ``requests`` reads
    it from disk as it sends rather than loading it into memory first.

    Args:
        upload_url: Destination URL for the PUT request.
        file_path: Path of the file to open (in binary mode) and upload.
        timeout: Timeout in seconds passed to ``requests``. Without one the
            call can block indefinitely if the connection stalls.

    Raises:
        requests.HTTPError: If the response status is 4xx/5xx.
        requests.Timeout: If the request exceeds ``timeout``.
    """
    with open(file_path, "rb") as f:
        response = requests.put(
            upload_url,
            headers={"Content-Type": "application/octet-stream"},
            data=f,
            timeout=timeout,
        )
    response.raise_for_status()
# Usage: create the scan, upload the file to the returned URL, then wait for
# the scan to finish and print its verdict.
presigned = get_presigned_url("https://your-app.com/webhooks/filesafety")
scan_id, upload_url = presigned

upload_to_presigned_url(upload_url, "./uploads/image.png")

result = poll_result(scan_id)
print(f"Verdict: {result['verdict']}")
from flask import Flask, request, jsonify
app = Flask(__name__)

# Scan IDs that have already been handled, used to ignore duplicate deliveries.
# It lives in memory only: it is lost on restart and never shrinks.
processed_scans = set()


@app.route("/webhooks/filesafety", methods=["POST"])
def handle_webhook():
    """Handle a scan-result webhook, processing each scan ID at most once.

    Returns:
        A ``(json, status)`` pair: 400 with ``invalid_payload`` when the body
        is not a JSON object containing ``scan_id``; 200 with
        ``already_processed`` for a scan ID seen before; 200 with ``ok``
        otherwise.
    """
    payload = request.get_json()
    # A JSON body that is not an object, or lacks scan_id, used to surface as
    # an unhandled exception (HTTP 500).
    if not isinstance(payload, dict) or "scan_id" not in payload:
        return jsonify({"status": "invalid_payload"}), 400

    scan_id = payload["scan_id"]
    if scan_id in processed_scans:
        return jsonify({"status": "already_processed"}), 200
    processed_scans.add(scan_id)

    try:
        verdict = payload["verdict"]
        if verdict == "clean":
            print(f"File {scan_id} is clean")
        elif verdict == "infected":
            print(f"File {scan_id} infected: {payload['virus']['signature']}")
        elif verdict == "nsfw":
            print(f"File {scan_id} NSFW: {payload['nsfw']['categories']}")
        elif verdict == "mixed":
            print(f"File {scan_id} has virus and NSFW issues")
        elif verdict == "failed":
            print(f"File {scan_id} scan failed — retry")
    except Exception:
        # Handling failed, so forget the ID; otherwise a redelivery of this
        # scan would be dropped as "already_processed".
        processed_scans.discard(scan_id)
        raise

    return jsonify({"status": "ok"}), 200


if __name__ == "__main__":
    app.run(port=3000)
def check_usage(timeout=30):
    """Fetch the usage summary from /v1/usage and print it.

    Prints the plan, used/quota counts, the remaining scans (quota minus used)
    and the period end as reported by the response.

    Args:
        timeout: Timeout in seconds passed to ``requests``. Without one the
            call can block indefinitely if the server stops responding.

    Returns:
        The parsed usage response.

    Raises:
        requests.HTTPError: If the response status is 4xx/5xx.
        requests.Timeout: If the request exceeds ``timeout``.
    """
    response = requests.get(
        f"{BASE_URL}/v1/usage",
        headers={"x-api-key": API_KEY},
        timeout=timeout,
    )
    response.raise_for_status()
    usage = response.json()
    remaining = usage["scans_quota"] - usage["scans_used"]
    print(f"Plan: {usage['plan']}")
    print(f"Used: {usage['scans_used']} / {usage['scans_quota']}")
    print(f"Remaining: {remaining}")
    print(f"Period ends: {usage['period_ends']}")
    return usage