This guide shows you how to use ReusableWorkerPool effectively in production environments for high-throughput CSV processing.
API Reference: For detailed type definitions, see the ReusableWorkerPool API reference
Understanding: For design rationale, see Worker Pool Architecture
Learning: For beginner tutorials, see Working with Workers
Worker pools are essential for production environments where:
- Requests arrive concurrently and each one needs CSV parsing
- Files are large enough that parsing would block the main thread
- Worker startup cost must be amortized across many requests

Skip worker pools when:
- You are writing one-off scripts or CLI tools
- Files are small, so worker overhead outweighs the parsing cost
- Traffic is low and occasional main-thread parsing is acceptable

A minimal sketch contrasting the two situations follows.
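This sketch assumes, as the rest of this guide does, that parseString runs on the main thread unless a worker-backed engine is supplied:

import { parseString, EnginePresets, ReusableWorkerPool } from 'web-csv-toolbox';

const csv = 'id,name\n1,alice\n2,bob';

// One-off script: main-thread parsing, no pool to manage
for await (const record of parseString(csv)) {
  console.log(record);
}

// Server workload: create the pool once, reuse it for every request
const pool = new ReusableWorkerPool({ maxWorkers: 4 });
for await (const record of parseString(csv, {
  engine: EnginePresets.balanced({ workerPool: pool })
})) {
  console.log(record);
}
pool.terminate();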
Server Application (Node.js/Deno/Bun):
import { ReusableWorkerPool } from 'web-csv-toolbox';
// Create pool once at application startup
export const csvWorkerPool = new ReusableWorkerPool({
maxWorkers: 4 // Adjust based on your CPU cores
});
// Cleanup on shutdown
process.on('SIGTERM', () => {
csvWorkerPool.terminate();
process.exit(0);
});
Hono Example:
import { Hono } from 'hono';
import { parseRequest, EnginePresets } from 'web-csv-toolbox';
import { csvWorkerPool } from './worker-pool';
const app = new Hono();
app.post('/upload-csv', async (c) => {
// Early rejection if pool is saturated
if (csvWorkerPool.isFull()) {
return c.json({ error: 'Service busy, try again later' }, 503);
}
try {
const records = [];
for await (const record of parseRequest(c.req.raw, {
engine: EnginePresets.balanced({ workerPool: csvWorkerPool })
})) {
records.push(record);
}
return c.json({ success: true, count: records.length });
} catch (error) {
// error is unknown in strict TypeScript; narrow before reading .message
return c.json({ error: error instanceof Error ? error.message : 'Parse failed' }, 400);
}
});
export default app;
Note: For Node.js-specific stream handling (Express, Fastify, etc.), see Node.js Stream Conversion.
Complete Example: See the Hono Secure API Example for a production-ready implementation with comprehensive security measures and tests.
CPU-Bound Workloads (CSV Parsing):
import os from 'node:os';
// Conservative: leave some CPU headroom for other tasks
const maxWorkers = Math.max(1, Math.floor(os.cpus().length * 0.75));
// Aggressive alternative: use every available core
// const maxWorkers = os.cpus().length;
const pool = new ReusableWorkerPool({ maxWorkers });
Memory-Constrained Environments:
// Limit based on available memory
// Estimate: ~50MB per worker for moderate CSV files
const availableMemoryMB = 1024; // 1GB
const estimatedMemoryPerWorker = 50;
const maxWorkers = Math.floor(availableMemoryMB / estimatedMemoryPerWorker);
const pool = new ReusableWorkerPool({
maxWorkers: Math.min(maxWorkers, 4) // Cap at 4
});
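To derive the budget at startup instead of hardcoding 1GB, Node's os.freemem() can supply the figure (a sketch reusing the same rough 50MB-per-worker estimate):

import os from 'node:os';
import { ReusableWorkerPool } from 'web-csv-toolbox';

const availableMemoryMB = Math.floor(os.freemem() / (1024 * 1024));
const estimatedMemoryPerWorker = 50;
const pool = new ReusableWorkerPool({
  // At least 1 worker, still capped at 4 as above
  maxWorkers: Math.max(1, Math.min(Math.floor(availableMemoryMB / estimatedMemoryPerWorker), 4))
});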
Container Environments (Docker/Kubernetes):
// Read from environment variable
const maxWorkers = process.env.CSV_WORKER_POOL_SIZE
? parseInt(process.env.CSV_WORKER_POOL_SIZE, 10)
: 2; // Default to 2 in containers
const pool = new ReusableWorkerPool({ maxWorkers });
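parseInt returns NaN on malformed input, so a small guard keeps a typo in the environment from silently producing a broken pool. poolSizeFromEnv here is a hypothetical helper:

import { ReusableWorkerPool } from 'web-csv-toolbox';

function poolSizeFromEnv(name: string, fallback: number, max = 8): number {
  const parsed = Number.parseInt(process.env[name] ?? '', 10);
  // Reject NaN, zero, and negative values; fall back to the default
  if (!Number.isInteger(parsed) || parsed < 1) return fallback;
  return Math.min(parsed, max); // Clamp to a sane upper bound
}

const pool = new ReusableWorkerPool({
  maxWorkers: poolSizeFromEnv('CSV_WORKER_POOL_SIZE', 2)
});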
Benchmarking Pool Sizes:
import { performance } from 'node:perf_hooks';
import { parseString, EnginePresets, ReusableWorkerPool } from 'web-csv-toolbox';
async function benchmarkPoolSize(csv: string, poolSize: number) {
using pool = new ReusableWorkerPool({ maxWorkers: poolSize });
const start = performance.now();
// Parse 10 files concurrently
await Promise.all(
Array(10).fill(csv).map(async (c) => {
const records = [];
for await (const record of parseString(c, {
engine: EnginePresets.balanced({ workerPool: pool })
})) {
records.push(record);
}
return records;
})
);
const duration = performance.now() - start;
console.log(`Pool size ${poolSize}: ${duration.toFixed(2)}ms`);
return duration;
}
// Test different pool sizes
const csv = generateLargeCSV(); // Your test data
for (const size of [1, 2, 4, 8]) {
await benchmarkPoolSize(csv, size);
}
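generateLargeCSV is a placeholder for your own data; a hypothetical throwaway generator is enough for benchmarking, since only the size and shape of the input matter:

function generateLargeCSV(rows = 100_000): string {
  const header = 'id,name,value';
  // Synthetic rows: sequential ids with random numeric values
  const lines = Array.from(
    { length: rows },
    (_, i) => `${i},item-${i},${(Math.random() * 1000).toFixed(2)}`
  );
  return [header, ...lines].join('\n');
}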
Early Rejection: Check pool capacity before accepting a request. This prevents queue buildup and gives clients immediate feedback:
import { Hono } from 'hono';
import { csvWorkerPool } from './worker-pool';
const app = new Hono();
app.post('/upload-csv', async (c) => {
// Check pool capacity BEFORE accepting request
if (csvWorkerPool.isFull()) {
console.warn('Worker pool saturated, rejecting request');
return c.json({
error: 'Service temporarily unavailable',
retryAfter: 5 // Suggest retry delay in seconds
}, 503);
}
// Process request...
});
// Client-side example
async function uploadCSVWithRetry(
file: File,
maxRetries = 3
): Promise<Response> {
for (let attempt = 0; attempt < maxRetries; attempt++) {
const response = await fetch('/upload-csv', {
method: 'POST',
body: file
});
if (response.status === 503) {
// Exponential backoff
const delay = Math.pow(2, attempt) * 1000;
console.log(`Service busy, retrying in ${delay}ms...`);
await new Promise(resolve => setTimeout(resolve, delay));
continue;
}
return response;
}
throw new Error('Service unavailable after retries');
}
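The 503 response above includes a retryAfter hint, which the client can honor instead of always computing its own backoff. A drop-in replacement for the 503 branch of uploadCSVWithRetry (assuming the JSON body shape shown earlier):

if (response.status === 503) {
  // Prefer the server's retryAfter hint (seconds); fall back to exponential backoff
  const body = await response.json().catch(() => null);
  const delay = typeof body?.retryAfter === 'number'
    ? body.retryAfter * 1000
    : Math.pow(2, attempt) * 1000;
  console.log(`Service busy, retrying in ${delay}ms...`);
  await new Promise(resolve => setTimeout(resolve, delay));
  continue;
}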
Request Queueing: When pool saturation is temporary, queue requests instead of rejecting them:
import { Hono } from 'hono';
import { parseRequest, EnginePresets } from 'web-csv-toolbox';
import { csvWorkerPool } from './worker-pool';
const app = new Hono();
// Simple in-memory queue (use Redis/RabbitMQ for production)
const requestQueue: Array<() => Promise<void>> = [];
let processing = false;
async function processQueue() {
if (processing || requestQueue.length === 0) return;
processing = true;
while (requestQueue.length > 0 && !csvWorkerPool.isFull()) {
const handler = requestQueue.shift();
if (handler) {
handler().catch(console.error);
}
}
processing = false;
// Check again after a delay
if (requestQueue.length > 0) {
setTimeout(processQueue, 100);
}
}
app.post('/upload-csv', async (c) => {
if (csvWorkerPool.isFull()) {
// Queue the request instead of rejecting
if (requestQueue.length >= 10) {
return c.json({ error: 'Queue full' }, 503);
}
const promise = new Promise<void>((resolve, reject) => {
requestQueue.push(async () => {
try {
// Process CSV...
resolve();
} catch (error) {
reject(error);
}
});
});
processQueue(); // Trigger queue processing
await promise;
return c.json({ success: true, queued: true });
}
// Process immediately...
});
Error Handling:
import { parseString, EnginePresets } from 'web-csv-toolbox';
import { csvWorkerPool } from './worker-pool';
async function parseWithErrorHandling(csv: string) {
try {
const records = [];
for await (const record of parseString(csv, {
engine: EnginePresets.balanced({ workerPool: csvWorkerPool })
})) {
records.push(record);
}
return { success: true, records };
} catch (error: any) {
// Worker errors
if (error.message.includes('Worker')) {
console.error('Worker failed:', error);
// Optional: Recreate pool on worker failure
// csvWorkerPool.terminate();
// csvWorkerPool = new ReusableWorkerPool({ maxWorkers: 4 });
return { success: false, error: 'Worker failure' };
}
// CSV format errors
if (error.name === 'ParseError') {
console.error('Invalid CSV:', error);
return { success: false, error: 'Invalid CSV format' };
}
// Other errors
console.error('Unexpected error:', error);
return { success: false, error: 'Processing failed' };
}
}
Timeout Handling:
async function parseWithTimeout(csv: string, timeoutMs = 30000) {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeoutMs);
try {
const records = [];
for await (const record of parseString(csv, {
engine: EnginePresets.balanced({ workerPool: csvWorkerPool }),
signal: controller.signal
})) {
records.push(record);
}
clearTimeout(timeoutId);
return { success: true, records };
} catch (error: any) {
clearTimeout(timeoutId);
if (error.name === 'AbortError') {
console.error('Parsing timeout');
return { success: false, error: 'Timeout' };
}
throw error;
}
}
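On runtimes with AbortSignal.timeout (Node.js 17.3+, Deno, modern browsers), the manual controller and clearTimeout bookkeeping can be dropped; note the abort reason is then a TimeoutError rather than an AbortError, so the error check above would need adjusting. The loop inside parseWithTimeout becomes:

const records = [];
for await (const record of parseString(csv, {
  engine: EnginePresets.balanced({ workerPool: csvWorkerPool }),
  signal: AbortSignal.timeout(30_000) // Aborts automatically after 30 seconds
})) {
  records.push(record);
}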
Monitoring Pool Health:
import { csvWorkerPool } from './worker-pool';
// Expose metrics endpoint
app.get('/metrics/worker-pool', (c) => {
return c.json({
activeWorkers: csvWorkerPool.size,
isFull: csvWorkerPool.isFull(),
timestamp: new Date().toISOString()
});
});
// Periodic logging
setInterval(() => {
console.log('Worker pool status:', {
active: csvWorkerPool.size,
full: csvWorkerPool.isFull()
});
}, 60000); // Every minute
Request-Level Metrics:
let activeRequests = 0;
let totalRequests = 0;
let rejectedRequests = 0;
app.post('/upload-csv', async (c) => {
totalRequests++;
if (csvWorkerPool.isFull()) {
rejectedRequests++;
console.warn(`Request rejected (${rejectedRequests}/${totalRequests})`);
return c.json({ error: 'Service busy' }, 503);
}
activeRequests++;
try {
// Process CSV...
return c.json({ success: true });
} finally {
activeRequests--;
}
});
// Expose metrics
app.get('/metrics', (c) => {
return c.json({
total: totalRequests,
active: activeRequests,
rejected: rejectedRequests,
rejectionRate: totalRequests > 0
? (rejectedRequests / totalRequests * 100).toFixed(2) + '%'
: '0.00%' // Avoid NaN before the first request
});
});
Prometheus Example:
import { register, Counter, Gauge } from 'prom-client';
const csvRequestsTotal = new Counter({
name: 'csv_requests_total',
help: 'Total CSV parsing requests',
labelNames: ['status']
});
const csvWorkerPoolSize = new Gauge({
name: 'csv_worker_pool_size',
help: 'Current worker pool size'
});
// Update metrics
app.post('/upload-csv', async (c) => {
if (csvWorkerPool.isFull()) {
csvRequestsTotal.inc({ status: 'rejected' });
return c.json({ error: 'Service busy' }, 503);
}
csvRequestsTotal.inc({ status: 'accepted' });
csvWorkerPoolSize.set(csvWorkerPool.size);
// Process...
});
// Metrics endpoint
app.get('/metrics', async (c) => {
c.header('Content-Type', register.contentType);
return c.body(await register.metrics());
});
Use different pools for different workloads:
// Small files: dedicated pool
const smallFilePool = new ReusableWorkerPool({ maxWorkers: 2 });
// Large files: dedicated pool with more workers
const largeFilePool = new ReusableWorkerPool({ maxWorkers: 6 });
app.post('/upload-csv', async (c) => {
const contentLength = parseInt(c.req.header('Content-Length') || '0', 10);
// Choose pool based on file size
const pool = contentLength > 10 * 1024 * 1024 // 10MB
? largeFilePool
: smallFilePool;
if (pool.isFull()) {
return c.json({ error: 'Service busy' }, 503);
}
// Process with appropriate pool...
});
Priority Queueing:
interface QueuedRequest {
priority: 'high' | 'normal' | 'low';
handler: () => Promise<void>;
}
const priorityQueue: QueuedRequest[] = [];
async function processQueueByPriority() {
// Sort by priority
priorityQueue.sort((a, b) => {
const priorities = { high: 0, normal: 1, low: 2 };
return priorities[a.priority] - priorities[b.priority];
});
while (priorityQueue.length > 0 && !csvWorkerPool.isFull()) {
const request = priorityQueue.shift();
if (request) {
request.handler().catch(console.error);
}
}
}
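A usage sketch for the queue above; isHighPriority is a hypothetical classifier (API key tier, route, and so on):

app.post('/upload-csv', async (c) => {
  priorityQueue.push({
    priority: isHighPriority(c) ? 'high' : 'normal',
    handler: async () => {
      // Parse the CSV with csvWorkerPool here
    }
  });
  processQueueByPriority();
  return c.json({ success: true, queued: true });
});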
Adaptive Pool Sizing:
class AdaptiveWorkerPool {
private pool: ReusableWorkerPool;
private minWorkers = 1;
private maxWorkers = 8;
private currentSize = 2;
constructor() {
this.pool = new ReusableWorkerPool({ maxWorkers: this.currentSize });
}
adjustPoolSize(load: number) {
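// Caution: the swap below terminates the old pool immediately; a production
// version should let in-flight tasks drain before discarding it.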
// Scale up if load is high
if (load > 0.8 && this.currentSize < this.maxWorkers) {
this.currentSize = Math.min(this.currentSize + 1, this.maxWorkers);
this.pool.terminate();
this.pool = new ReusableWorkerPool({ maxWorkers: this.currentSize });
console.log(`Scaled up to ${this.currentSize} workers`);
}
// Scale down if load is low
if (load < 0.2 && this.currentSize > this.minWorkers) {
this.currentSize = Math.max(this.currentSize - 1, this.minWorkers);
this.pool.terminate();
this.pool = new ReusableWorkerPool({ maxWorkers: this.currentSize });
console.log(`Scaled down to ${this.currentSize} workers`);
}
}
}
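A possible driver loop (a sketch: the load value is a 0-1 utilization estimate, and the activeRequests counter from the metrics section is one source for it):

const adaptivePool = new AdaptiveWorkerPool();
setInterval(() => {
  // Treat 8 concurrent requests as full utilization
  const load = Math.min(activeRequests / 8, 1);
  adaptivePool.adjustPoolSize(load);
}, 10_000); // Re-evaluate every 10 seconds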
Do:
- Use the using syntax for automatic cleanup in scoped contexts
- Check isFull() before accepting requests to prevent queue buildup
- Size maxWorkers based on your CPU cores and memory
- Use an AbortSignal for long-running operations

Don't:
- Skip isFull() checks - this leads to queue buildup
- Set maxWorkers too high - this causes memory/CPU exhaustion

Troubleshooting:
Process Won't Exit:
Symptoms: Node.js process hangs after completion
Solution:
// Ensure pool is terminated
process.on('SIGTERM', () => {
csvWorkerPool.terminate();
process.exit(0);
});
process.on('SIGINT', () => {
csvWorkerPool.terminate();
process.exit(0);
});
Memory Leaks:
Symptoms: Memory grows continuously
Solutions:
- Reduce maxWorkers
- Ensure every pool is terminated when no longer needed (or scoped with using)

Frequent Rejections:
Symptoms: Many requests rejected
Solutions:
- Increase maxWorkers if CPU allows
- Queue requests instead of rejecting them (see Request Queueing above)

Worker Failures:
Symptoms: Intermittent worker failures
Solutions:
- Add error handling and client-side retries (see Error Handling above)
- Recreate the pool after repeated failures, as sketched below
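A minimal recovery sketch, following the recreate-on-failure idea from the error-handling section (the mutable export is an assumption about your module layout):

import { ReusableWorkerPool } from 'web-csv-toolbox';

export let csvWorkerPool = new ReusableWorkerPool({ maxWorkers: 4 });

export function recreateCsvWorkerPool() {
  // Drop the failing pool and start fresh; callers pick up the new reference
  csvWorkerPool.terminate();
  csvWorkerPool = new ReusableWorkerPool({ maxWorkers: 4 });
}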