Skip to Content

Python Integration

This guide explains how to integrate FileZen with Python applications using the filezen-python package. This package provides a ZenStorage class that simplifies file upload and management operations with comprehensive error handling and automatic multipart upload support.

Code Example: See the complete Python FastAPI server example for a working implementation with an interactive test interface.

Installation

Install the FileZen Python SDK:

Quick Start

1. Environment Configuration

Set your FileZen API key as an environment variable. This can be done in your local .env file during development or as a system environment variable in production.

Terminal
export FILEZEN_API_KEY=your_api_key_here

The FileZen SDK automatically detects the FILEZEN_API_KEY environment variable for server-side operations, so no additional configuration is needed in your server setup.

2. Initialize ZenStorage

Create a ZenStorage instance with your configuration. Passing api_key is optional: when it is omitted, the SDK automatically detects FILEZEN_API_KEY from your environment variables.

storage.py
from filezen import ZenStorage

# Shared storage client for the application.
# - keep_uploads=False is the recommended setting for server-side usage.
# - api_key is optional: the FILEZEN_API_KEY env var can be used instead.
storage = ZenStorage(
    keep_uploads=False,
    api_key="your_api_key_here",
)

3. Upload Files

You can now use the ZenStorage instance to upload files with various methods.

upload_example.py
import asyncio

from filezen import ZenStorage


async def upload_single_file():
    """Upload ``my_file.jpg`` and print either the file URL or the error.

    The storage client is closed in a ``finally`` block so that it is also
    closed when the upload reports an error or an exception is raised
    (the early ``return`` previously skipped ``storage.close()``).
    """
    storage = ZenStorage()
    try:
        # Read the whole file into memory, then hand the bytes to the SDK.
        with open("my_file.jpg", "rb") as f:
            data = f.read()

        upload = await storage.upload(data, {
            "name": "my_file.jpg",
            "mime_type": "image/jpeg",
        })

        if upload.error:
            print(f"Upload failed: {upload.error}")
            return

        print(f"File uploaded: {upload.file.url}")
    finally:
        await storage.close()


# Run the async function
asyncio.run(upload_single_file())

FileZen Functions

Single File Upload

Upload individual files with error handling.

single_upload.py
import asyncio

from filezen import ZenStorage


async def upload_single_file(file_data, filename, mime_type):
    """Upload one file and return a summary of the stored file.

    Args:
        file_data: Source passed straight to ``storage.upload``.
        filename: Value sent as the upload's ``name`` option.
        mime_type: Value sent as the upload's ``mime_type`` option.

    Returns:
        A dict with the ``url``, ``name``, ``size`` and ``mime_type`` of the
        uploaded file.

    Raises:
        Exception: If the upload result carries an error.
    """
    storage = ZenStorage()
    try:
        upload = await storage.upload(file_data, {
            "name": filename,
            "mime_type": mime_type,
        })

        if upload.error:
            raise Exception(f"Upload failed: {upload.error}")

        return {
            "url": upload.file.url,
            "name": upload.file.name,
            "size": upload.file.size,
            "mime_type": upload.file.mime_type,
        }
    finally:
        # Close the client on every path; the failure path used to skip this.
        await storage.close()

Bulk Upload

Upload multiple files concurrently for better performance.

bulk_upload.py
import asyncio

from filezen import ZenStorage


async def upload_multiple_files(files_data):
    """Upload several files in one ``bulk_upload`` call.

    Args:
        files_data: Iterable of dicts, each providing ``buffer``,
            ``originalname`` and ``mimetype`` keys.

    Returns:
        A list with one dict per upload containing ``url``, ``name``,
        ``size`` and ``mime_type``.

    Raises:
        Exception: If at least one upload reports an error. The message
            starts with the number of failed uploads and now also lists the
            per-file failure details that were previously computed but
            discarded.
    """
    storage = ZenStorage()
    try:
        upload_items = [
            {
                "source": file["buffer"],
                "options": {
                    "name": file["originalname"],
                    "mime_type": file["mimetype"],
                },
            }
            for file in files_data
        ]

        uploads = await storage.bulk_upload(*upload_items)

        # Check for any upload errors
        failed_uploads = [upload for upload in uploads if upload.error]
        if failed_uploads:
            failures = [
                {
                    # getattr: a failed result may not expose ``name``
                    # (NOTE(review): confirm against the SDK's upload type).
                    "name": getattr(upload, "name", None) or "unknown",
                    "error": str(upload.error),
                }
                for upload in failed_uploads
            ]
            raise Exception(
                f"{len(failed_uploads)} uploads failed: {failures}"
            )

        return [
            {
                "url": upload.file.url,
                "name": upload.file.name,
                "size": upload.file.size,
                "mime_type": upload.file.mime_type,
            }
            for upload in uploads
        ]
    finally:
        # Close the client on success and on failure alike.
        await storage.close()

Upload from URL

Upload files directly from external URLs.

url_upload.py
import asyncio

from filezen import ZenStorage


async def upload_from_url(source_url, file_name):
    """Upload a file by passing an external URL as the upload source.

    Args:
        source_url: URL handed to ``storage.upload`` as the source.
        file_name: Name for the stored file; ``"downloaded-file"`` is used
            when this is empty or ``None``.

    Returns:
        A dict with the ``url``, ``name``, ``size`` and ``mime_type`` of the
        uploaded file.

    Raises:
        Exception: If the upload result carries an error.
    """
    storage = ZenStorage()
    try:
        upload = await storage.upload(source_url, {
            "name": file_name or "downloaded-file",
        })

        if upload.error:
            raise Exception(f"URL upload failed: {upload.error}")

        return {
            "url": upload.file.url,
            "name": upload.file.name,
            "size": upload.file.size,
            "mime_type": upload.file.mime_type,
        }
    finally:
        # Close the client on every path; the failure path used to skip this.
        await storage.close()

Upload from Base64

Upload files from base64 encoded data.

base64_upload.py
import asyncio

from filezen import ZenStorage


async def upload_from_base64(base64_data, file_name, mime_type):
    """Upload a file by passing base64 data as the upload source.

    Args:
        base64_data: Base64 payload handed to ``storage.upload``.
        file_name: Name for the stored file; ``"base64-file"`` is used when
            this is empty or ``None``.
        mime_type: Value sent as the upload's ``mime_type`` option.

    Returns:
        A dict with the ``url``, ``name``, ``size`` and ``mime_type`` of the
        uploaded file.

    Raises:
        Exception: If the upload result carries an error.
    """
    storage = ZenStorage()
    try:
        upload = await storage.upload(base64_data, {
            "name": file_name or "base64-file",
            "mime_type": mime_type,
        })

        if upload.error:
            raise Exception(f"Base64 upload failed: {upload.error}")

        return {
            "url": upload.file.url,
            "name": upload.file.name,
            "size": upload.file.size,
            "mime_type": upload.file.mime_type,
        }
    finally:
        # Close the client on every path; the failure path used to skip this.
        await storage.close()

Manual Multipart Upload Control

For fine-grained control over multipart uploads, especially for large files or when you need to track progress:

multipart_upload.py
import asyncio

from filezen import ZenStorage


async def manual_multipart_upload(file_data, file_name, mime_type):
    """Upload ``file_data`` through an explicit multipart session.

    The data is sliced into 5 MB chunks, each chunk is sent with
    ``multipart.upload_part``, and progress is printed after every chunk.

    Args:
        file_data: Sliceable, sized payload (it is used with ``len()`` and
            slice indexing).
        file_name: Sent as ``file_name`` when starting the session.
        mime_type: Sent as ``mime_type`` when starting the session.

    Returns:
        Whatever ``storage.multipart.finish`` returns.
    """
    storage = ZenStorage()
    try:
        total_size = len(file_data)

        # Start multipart upload session
        session = await storage.multipart.start({
            "file_name": file_name,
            "mime_type": mime_type,
            "total_size": total_size,
            "upload_mode": "STREAMING",
            "metadata": {"type": "manual_upload"}
        })

        session_id = session["id"]
        chunk_size = 5 * 1024 * 1024  # 5MB chunks

        # Upload file in chunks
        for start in range(0, total_size, chunk_size):
            end_pos = min(total_size, start + chunk_size)

            # Upload this specific chunk
            await storage.multipart.upload_part({
                "session_id": session_id,
                "chunk": file_data[start:end_pos]
            })

            # Report progress
            print(f"Uploaded {end_pos}/{total_size} bytes")

        # Finalize the multipart upload
        return await storage.multipart.finish({
            "session_id": session_id
        })
    finally:
        # Close the client even when a start/part/finish call raises.
        await storage.close()

Generate Signed URL

Generate secure signed URLs for direct uploads from client applications.

signed_url.py
import asyncio

from filezen import ZenStorage


async def generate_upload_url(file_key, expiration_time=3600):
    """Create a signed URL for the ``/files/upload`` path.

    Args:
        file_key: Sent as ``file_key`` in the signing request.
        expiration_time: Sent as ``expires_in``; defaults to 3600
            (1 hour).

    Returns:
        A dict with ``signed_url`` and the ``expires_in`` value that was
        requested.
    """
    storage = ZenStorage()
    try:
        # Called without ``await``, exactly as in the original example
        # (NOTE(review): presumably a synchronous SDK call -- confirm).
        signed_url = storage.generate_signed_url({
            "path": "/files/upload",
            "file_key": file_key,
            "expires_in": expiration_time,
        })
    finally:
        # Close the client even when signing raises.
        await storage.close()

    return {
        "signed_url": signed_url,
        "expires_in": expiration_time,
    }

Delete File

Delete files by their URL.

delete_file.py
import asyncio

from filezen import ZenStorage


async def delete_file(file_url):
    """Delete a file identified by its URL.

    Args:
        file_url: URL passed to ``storage.delete_by_url``.

    Returns:
        ``{"success": True, "message": "File deleted successfully"}`` when
        the SDK reports success.

    Raises:
        Exception: If ``delete_by_url`` raises (the original error is
            chained as the cause) or returns a falsy result.
    """
    storage = ZenStorage()
    try:
        success = await storage.delete_by_url(file_url)
    except Exception as error:
        raise Exception(f"Failed to delete file: {error}") from error
    finally:
        # Close the client on every path, including when the delete raises.
        await storage.close()

    # Checked outside the ``try`` so this error is not caught and re-wrapped
    # into "Failed to delete file: Failed to delete file".
    if not success:
        raise Exception("Failed to delete file")

    return {"success": True, "message": "File deleted successfully"}

Complete FastAPI Server Example

Here’s a complete FastAPI server implementation with all FileZen endpoints.

Framework Dependencies: This example uses the fastapi, python-multipart, and uvicorn packages for the server setup. Install them with: pip install fastapi python-multipart uvicorn. These are needed only for the example — you can use any file upload handling method with FileZen.

complete-server.py
import asyncio
from contextlib import asynccontextmanager
from typing import Optional

from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import JSONResponse

from filezen import ZenStorage

# Initialize FileZen storage (one client shared by every request handler)
storage = ZenStorage(
    keep_uploads=False,  # Recommended for server-side usage
)


@asynccontextmanager
async def _lifespan(app):
    """Close the shared storage client when the server shuts down."""
    yield
    await storage.close()


app = FastAPI(lifespan=_lifespan)


def _file_payload(upload):
    """Return the url/name/size summary of a successful upload."""
    return {
        "url": upload.file.url,
        "name": upload.file.name,
        "size": upload.file.size,
    }


# Every handler below re-raises HTTPException unchanged before the generic
# ``except Exception``. HTTPException is itself an Exception, so without that
# clause the deliberate 400/404 responses were all converted into 500s.


# Health check
@app.get("/")
async def health_check():
    """Report that the server is running."""
    return {"message": "FileZen FastAPI Server Running"}


# Single file upload
@app.post("/api/files/upload")
async def upload_file(file: UploadFile = File(...)):
    """Upload one multipart form file to FileZen.

    Responds with 400 when FileZen reports an upload error and with 500 on
    any unexpected exception.
    """
    try:
        # Read file content
        content = await file.read()

        # Upload to FileZen
        upload = await storage.upload(content, {
            "name": file.filename,
            "mime_type": file.content_type,
        })

        if upload.error:
            raise HTTPException(status_code=400, detail=str(upload.error))

        return JSONResponse({
            "success": True,
            "file": _file_payload(upload),
        })
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


# Bulk file upload
@app.post("/api/files/bulk-upload")
async def bulk_upload_files(files: list[UploadFile] = File(...)):
    """Upload several form files in one ``bulk_upload`` call.

    The response lists one entry per upload: either the file summary or an
    ``error`` string, so a single failed file does not fail the request.
    """
    try:
        upload_data = []
        for file in files:
            content = await file.read()
            upload_data.append({
                "source": content,
                "options": {
                    "name": file.filename,
                    "mime_type": file.content_type,
                },
            })

        uploads = await storage.bulk_upload(*upload_data)

        results = [
            {"error": str(upload.error)} if upload.error
            else _file_payload(upload)
            for upload in uploads
        ]

        return JSONResponse({"results": results})
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


# Upload from URL
@app.post("/api/files/upload-from-url")
async def upload_from_url(url: str, name: Optional[str] = None):
    """Upload a file by handing an external URL to FileZen.

    Responds with 400 when ``url`` is empty or FileZen reports an upload
    error, and with 500 on any unexpected exception.
    """
    try:
        if not url:
            raise HTTPException(status_code=400, detail="URL is required")

        upload = await storage.upload(url, {
            "name": name or "downloaded-file",
        })

        if upload.error:
            raise HTTPException(status_code=400, detail=str(upload.error))

        return JSONResponse({
            "success": True,
            "file": _file_payload(upload),
        })
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


# Upload from Base64
@app.post("/api/files/upload-from-base64")
async def upload_from_base64(
    base64_data: str,
    name: Optional[str] = None,
    mime_type: Optional[str] = None,
):
    """Upload a file from base64 encoded data.

    Responds with 400 when ``base64_data`` is empty or FileZen reports an
    upload error, and with 500 on any unexpected exception.
    """
    try:
        if not base64_data:
            raise HTTPException(
                status_code=400, detail="Base64 data is required"
            )

        upload = await storage.upload(base64_data, {
            "name": name or "base64-file",
            "mime_type": mime_type,
        })

        if upload.error:
            raise HTTPException(status_code=400, detail=str(upload.error))

        return JSONResponse({
            "success": True,
            "file": _file_payload(upload),
        })
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


# Generate signed URL
@app.post("/api/files/generate-signed-url")
async def generate_signed_url(file_key: str, expires_in: int = 3600):
    """Create a signed URL for the ``/files/upload`` path.

    Responds with 400 when ``file_key`` is empty and with 500 on any
    unexpected exception.
    """
    try:
        if not file_key:
            raise HTTPException(
                status_code=400, detail="file_key is required"
            )

        signed_url = storage.generate_signed_url({
            "path": "/files/upload",
            "file_key": file_key,
            "expires_in": expires_in,
        })

        return JSONResponse({
            "success": True,
            "signed_url": signed_url,
            "expires_in": expires_in,
        })
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


# Delete file
@app.delete("/api/files/delete")
async def delete_file(url: str):
    """Delete a file by its URL.

    Responds with 400 when ``url`` is empty, 404 when the SDK reports that
    nothing was deleted, and 500 on any unexpected exception.
    """
    try:
        if not url:
            raise HTTPException(
                status_code=400, detail="File URL is required"
            )

        success = await storage.delete_by_url(url)

        if not success:
            raise HTTPException(status_code=404, detail="File not found")

        return JSONResponse({"success": True, "message": "File deleted"})
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
Last updated on
© 2026 FileZen. All rights reserved.