Python Integration
This guide explains how to integrate FileZen with Python applications using the filezen-python package. This package provides a ZenStorage class that simplifies file upload and management operations with comprehensive error handling and automatic multipart upload support.
Code Example: See the complete Python FastAPI server example for a working implementation with an interactive test interface.
Installation
Install the FileZen Python SDK:
pip install filezen-python
Quick Start
1. Environment Configuration
Set your FileZen API key as an environment variable. This can be done in your local .env file during development or as a system environment variable in production.
export FILEZEN_API_KEY=your_api_key_here

The FileZen SDK automatically detects the FILEZEN_API_KEY environment variable for server-side operations, so no additional configuration is needed in your server setup.
2. Initialize ZenStorage
Create a ZenStorage instance with your configuration. The SDK will automatically detect the FILEZEN_API_KEY from your environment variables.
from filezen import ZenStorage

# Initialize storage with automatic environment variable detection
# (the api_key argument below is a placeholder and may be omitted when
# FILEZEN_API_KEY is set in the environment)
storage = ZenStorage(
    api_key="your_api_key_here",  # Optional: can use FILEZEN_API_KEY env var
    keep_uploads=False,  # Recommended for server-side usage
)

3. Upload Files
You can now use the ZenStorage instance to upload files with various methods.
import asyncio
from filezen import ZenStorage
async def upload_single_file():
    """Upload ``my_file.jpg`` from the working directory and print its URL.

    On failure the error is printed and the function returns ``None``.
    The storage client is closed in a ``finally`` clause so it is released
    on the success path, on the early-return failure path, and when the
    upload itself raises.
    """
    storage = ZenStorage()
    try:
        # Read the file first so the handle is closed before the network call
        with open("my_file.jpg", "rb") as f:
            data = f.read()

        # Upload a single file
        upload = await storage.upload(data, {
            "name": "my_file.jpg",
            "mime_type": "image/jpeg",
        })

        if upload.error:
            print(f"Upload failed: {upload.error}")
            return

        print(f"File uploaded: {upload.file.url}")
    finally:
        await storage.close()
# Run the async function
asyncio.run(upload_single_file())

FileZen Functions
Single File Upload
Upload individual files with error handling.
import asyncio
from filezen import ZenStorage
async def upload_single_file(file_data, filename, mime_type):
storage = ZenStorage()
upload = await storage.upload(file_data, {
"name": filename,
"mime_type": mime_type,
})
if upload.error:
raise Exception(f"Upload failed: {upload.error}")
await storage.close()
return {
"url": upload.file.url,
"name": upload.file.name,
"size": upload.file.size,
"mime_type": upload.file.mime_type,
}Bulk Upload
Upload multiple files concurrently for better performance.
import asyncio
from filezen import ZenStorage
async def upload_multiple_files(files_data):
storage = ZenStorage()
upload_items = []
for file in files_data:
upload_items.append({
"source": file["buffer"],
"options": {
"name": file["originalname"],
"mime_type": file["mimetype"],
},
})
uploads = await storage.bulk_upload(*upload_items)
# Check for any upload errors
failed_uploads = [upload for upload in uploads if upload.error]
if failed_uploads:
failures = [{"name": upload.name or "unknown", "error": str(upload.error)} for upload in failed_uploads]
raise Exception(f"{len(failed_uploads)} uploads failed")
await storage.close()
return [{
"url": upload.file.url,
"name": upload.file.name,
"size": upload.file.size,
"mime_type": upload.file.mime_type,
} for upload in uploads]Upload from URL
Upload files directly from external URLs.
import asyncio
from filezen import ZenStorage
async def upload_from_url(source_url, file_name):
storage = ZenStorage()
upload = await storage.upload(source_url, {
"name": file_name or "downloaded-file",
})
if upload.error:
raise Exception(f"URL upload failed: {upload.error}")
await storage.close()
return {
"url": upload.file.url,
"name": upload.file.name,
"size": upload.file.size,
"mime_type": upload.file.mime_type,
}Upload from Base64
Upload files from base64 encoded data.
import asyncio
from filezen import ZenStorage
async def upload_from_base64(base64_data, file_name, mime_type):
storage = ZenStorage()
upload = await storage.upload(base64_data, {
"name": file_name or "base64-file",
"mime_type": mime_type,
})
if upload.error:
raise Exception(f"Base64 upload failed: {upload.error}")
await storage.close()
return {
"url": upload.file.url,
"name": upload.file.name,
"size": upload.file.size,
"mime_type": upload.file.mime_type,
}Manual Multipart Upload Control
For fine-grained control over multipart uploads, especially for large files or when you need to track progress:
import asyncio
from filezen import ZenStorage
async def manual_multipart_upload(file_data, file_name, mime_type):
storage = ZenStorage()
# Start multipart upload session
session = await storage.multipart.start({
"file_name": file_name,
"mime_type": mime_type,
"total_size": len(file_data),
"upload_mode": "STREAMING",
"metadata": {"type": "manual_upload"}
})
session_id = session["id"]
chunk_size = 5 * 1024 * 1024 # 5MB chunks
uploaded_size = 0
# Upload file in chunks
while uploaded_size < len(file_data):
# Create a chunk from the current position
end_pos = min(len(file_data), uploaded_size + chunk_size)
chunk = file_data[uploaded_size:end_pos]
# Upload this specific chunk
await storage.multipart.upload_part({
"session_id": session_id,
"chunk": chunk
})
# Update progress
uploaded_size = end_pos
print(f"Uploaded {uploaded_size}/{len(file_data)} bytes")
# Finalize the multipart upload
result = await storage.multipart.finish({
"session_id": session_id
})
await storage.close()
return resultGenerate Signed URL
Generate secure signed URLs for direct uploads from client applications.
import asyncio
from filezen import ZenStorage
async def generate_upload_url(file_key, expiration_time=3600):
storage = ZenStorage()
signed_url = storage.generate_signed_url({
"path": "/files/upload",
"file_key": file_key,
"expires_in": expiration_time, # 1 hour by default
})
await storage.close()
return {
"signed_url": signed_url,
"expires_in": expiration_time,
}Delete File
Delete files by their URL.
import asyncio
from filezen import ZenStorage
async def delete_file(file_url):
storage = ZenStorage()
try:
success = await storage.delete_by_url(file_url)
await storage.close()
if success:
return {"success": True, "message": "File deleted successfully"}
else:
raise Exception("Failed to delete file")
except Exception as error:
raise Exception(f"Failed to delete file: {error}")Complete FastAPI Server Example
Here’s a complete FastAPI server implementation with all FileZen endpoints.
Framework Dependencies: This example uses fastapi, python-multipart, and uvicorn packages for the server setup. Install them with: pip install fastapi python-multipart uvicorn. These are just for the example - you can use any file upload handling method with FileZen.
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import JSONResponse
import asyncio
from filezen import ZenStorage
# Application object that the route decorators below register on
app = FastAPI()

# Initialize FileZen storage
# One module-level instance is shared by every endpoint in this example.
# No api_key is passed here.
# NOTE(review): nothing in this example calls storage.close() — confirm
# whether a shutdown hook is needed for a long-running server.
storage = ZenStorage(
    keep_uploads=False,  # Recommended for server-side usage
)
# Health check
@app.get("/")
async def health_check():
    """Return a fixed message showing that the server is running."""
    status = {"message": "FileZen FastAPI Server Running"}
    return status
# Single file upload
@app.post("/api/files/upload")
async def upload_file(file: UploadFile = File(...)):
    """Upload one multipart form file to FileZen.

    Returns:
        JSON with ``success`` and the stored file's ``url``, ``name`` and
        ``size``.

    Raises:
        HTTPException: 400 if the upload result carries an error, 500 for
            any other failure.
    """
    try:
        # Read file content
        content = await file.read()

        # Upload to FileZen
        upload = await storage.upload(content, {
            "name": file.filename,
            "mime_type": file.content_type,
        })

        if upload.error:
            raise HTTPException(status_code=400, detail=str(upload.error))

        return JSONResponse({
            "success": True,
            "file": {
                "url": upload.file.url,
                "name": upload.file.name,
                "size": upload.file.size,
            }
        })
    except HTTPException:
        # HTTPException is an Exception subclass; without this clause the
        # handler below would rewrite the 400 above as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# Bulk file upload
@app.post("/api/files/bulk-upload")
async def bulk_upload_files(files: list[UploadFile] = File(...)):
    """Upload several multipart form files with one ``bulk_upload`` call.

    Returns:
        JSON ``{"results": [...]}`` with one entry per upload: either
        ``{"error": ...}`` or the stored file's ``url``, ``name`` and
        ``size``.

    Raises:
        HTTPException: 500 if reading or uploading raises.
    """
    try:
        # Read each file in order and pair its bytes with its options
        upload_data = [
            {
                "source": await item.read(),
                "options": {
                    "name": item.filename,
                    "mime_type": item.content_type,
                }
            }
            for item in files
        ]

        outcomes = await storage.bulk_upload(*upload_data)

        # Per-file result: the error text, or the stored file's details
        results = [
            {"error": str(outcome.error)}
            if outcome.error
            else {
                "url": outcome.file.url,
                "name": outcome.file.name,
                "size": outcome.file.size,
            }
            for outcome in outcomes
        ]

        return JSONResponse({"results": results})
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# Upload from URL
@app.post("/api/files/upload-from-url")
async def upload_from_url(url: str, name: str = None):
    """Upload a file to FileZen by passing an external URL as the source.

    Args:
        url: Source URL; must be non-empty.
        name: Name for the stored file; ``"downloaded-file"`` is used when
            this is falsy.

    Returns:
        JSON with ``success`` and the stored file's ``url``, ``name`` and
        ``size``.

    Raises:
        HTTPException: 400 if ``url`` is empty or the upload result carries
            an error, 500 for any other failure.
    """
    try:
        if not url:
            raise HTTPException(status_code=400, detail="URL is required")

        upload = await storage.upload(url, {
            "name": name or "downloaded-file",
        })

        if upload.error:
            raise HTTPException(status_code=400, detail=str(upload.error))

        return JSONResponse({
            "success": True,
            "file": {
                "url": upload.file.url,
                "name": upload.file.name,
                "size": upload.file.size,
            }
        })
    except HTTPException:
        # HTTPException is an Exception subclass; without this clause the
        # handler below would rewrite the 400s above as 500s.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# Upload from Base64
@app.post("/api/files/upload-from-base64")
async def upload_from_base64(base64_data: str, name: str = None, mime_type: str = None):
    """Upload a file to FileZen from base64 encoded data.

    Args:
        base64_data: Base64 string used as the upload source; must be
            non-empty.
        name: Name for the stored file; ``"base64-file"`` is used when this
            is falsy.
        mime_type: MIME type sent with the upload.

    Returns:
        JSON with ``success`` and the stored file's ``url``, ``name`` and
        ``size``.

    Raises:
        HTTPException: 400 if ``base64_data`` is empty or the upload result
            carries an error, 500 for any other failure.
    """
    # NOTE(review): plain ``str`` parameters like these are read by FastAPI
    # from the query string; large base64 payloads may be better sent in the
    # request body. Left unchanged here to keep the endpoint contract.
    try:
        if not base64_data:
            raise HTTPException(status_code=400, detail="Base64 data is required")

        upload = await storage.upload(base64_data, {
            "name": name or "base64-file",
            "mime_type": mime_type,
        })

        if upload.error:
            raise HTTPException(status_code=400, detail=str(upload.error))

        return JSONResponse({
            "success": True,
            "file": {
                "url": upload.file.url,
                "name": upload.file.name,
                "size": upload.file.size,
            }
        })
    except HTTPException:
        # HTTPException is an Exception subclass; without this clause the
        # handler below would rewrite the 400s above as 500s.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# Generate signed URL
@app.post("/api/files/generate-signed-url")
async def generate_signed_url(file_key: str, expires_in: int = 3600):
    """Generate a signed URL for the ``/files/upload`` path.

    Args:
        file_key: Key passed to the signer; must be non-empty.
        expires_in: Value passed to the signer as ``expires_in``; defaults
            to 3600.

    Returns:
        JSON with ``success``, ``signed_url`` and ``expires_in``.

    Raises:
        HTTPException: 400 if ``file_key`` is empty, 500 for any other
            failure.
    """
    try:
        if not file_key:
            raise HTTPException(status_code=400, detail="file_key is required")

        signed_url = storage.generate_signed_url({
            "path": "/files/upload",
            "file_key": file_key,
            "expires_in": expires_in,
        })

        return JSONResponse({
            "success": True,
            "signed_url": signed_url,
            "expires_in": expires_in,
        })
    except HTTPException:
        # HTTPException is an Exception subclass; without this clause the
        # handler below would rewrite the 400 above as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# Delete file
@app.delete("/api/files/delete")
async def delete_file(url: str):
    """Delete a FileZen file identified by its URL.

    Args:
        url: URL of the file to delete; must be non-empty.

    Returns:
        JSON ``{"success": True, "message": "File deleted"}`` when
        ``delete_by_url`` returns a truthy value.

    Raises:
        HTTPException: 400 if ``url`` is empty, 404 if ``delete_by_url``
            returns a falsy value, 500 for any other failure.
    """
    try:
        if not url:
            raise HTTPException(status_code=400, detail="File URL is required")

        success = await storage.delete_by_url(url)

        if success:
            return JSONResponse({"success": True, "message": "File deleted"})
        else:
            raise HTTPException(status_code=404, detail="File not found")
    except HTTPException:
        # HTTPException is an Exception subclass; without this clause the
        # handler below would rewrite the 400/404 above as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# Start the server only when this file is executed directly
if __name__ == "__main__":
    # Imported here, so uvicorn is only loaded on this script path
    import uvicorn

    # Serve the app on port 8000 with host "0.0.0.0"
    uvicorn.run(app, host="0.0.0.0", port=8000)