You need to process user-uploaded CSV files in a web application without exposing your system to Denial of Service (DoS) attacks or other security threats.
This guide shows you how to implement secure CSV processing using multiple defense layers: a bounded worker pool, early rejection when the pool is saturated, streaming schema validation, byte/record/error-count limits, and parse timeouts. Start by installing the dependencies:
npm install web-csv-toolbox zod hono
First, limit how many parser workers can run concurrently. Why: Attackers could upload multiple large CSV files simultaneously to overwhelm your server.
import { Hono } from 'hono';
import { stream } from 'hono/streaming';
import { z } from 'zod';
import { ReusableWorkerPool, EnginePresets, parseStringStream } from 'web-csv-toolbox';
const app = new Hono();
// ✅ CRITICAL: Limit concurrent workers
const pool = new ReusableWorkerPool({ maxWorkers: 4 });
// Clean up on shutdown
// Clean up the pool on shutdown (e.g., via a Node.js signal handler)
process.on('SIGTERM', () => {
pool.terminate();
});
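If you prefer to size the pool from the host rather than hard-coding 4, here is a minimal sketch (assuming a Node.js runtime; it reuses the ReusableWorkerPool import above):
import os from 'node:os';
// Cap at 4 workers, but never exceed the number of available CPU cores
const pool = new ReusableWorkerPool({ maxWorkers: Math.min(4, os.cpus().length) });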
Guidelines: set maxWorkers to 2-4 on typical servers, or derive it from the host CPU count, e.g. Math.min(4, os.cpus().length), as sketched above. Why: Rejecting requests early when the pool is saturated prevents cascading failures.
const recordSchema = z.object({
name: z.string().min(1).max(100),
email: z.string().email(),
age: z.coerce.number().int().min(0).max(150),
});
app.post('/validate-csv', async (c) => {
// 1. Early rejection if pool is saturated
if (pool.isFull()) {
console.warn('WorkerPool saturated - rejecting request');
return c.json(
{ error: 'Service temporarily unavailable. Please try again later.' },
503
);
}
// 2. Verify Content-Type
const contentType = c.req.header('Content-Type');
if (!contentType?.startsWith('text/csv')) {
return c.json({ error: 'Content-Type must be text/csv' }, 400);
}
// 3. Get request body as stream
const csvStream = c.req.raw.body?.pipeThrough(new TextDecoderStream());
if (!csvStream) {
return c.json({ error: 'Request body is required' }, 400);
}
// 4. Process with streaming validation (see next step)
// ...
});
Benefits: the isFull() check fails fast under load instead of queueing work, the Content-Type check rejects non-CSV uploads before any parsing starts, and reading the body as a stream avoids buffering the whole file in memory.
Why: Streaming validation prevents memory exhaustion and provides real-time feedback.
app.post('/validate-csv', async (c) => {
// ... (early rejection and validation from Step 2)
return stream(c, async (stream) => {
c.header('Content-Type', 'text/event-stream');
c.header('Cache-Control', 'no-cache');
c.header('Connection', 'keep-alive');
let validCount = 0;
let errorCount = 0;
for await (const record of parseStringStream(csvStream, {
engine: EnginePresets.balanced({ workerPool: pool })
})) {
try {
// Validate each record
recordSchema.parse(record);
validCount++;
} catch (error) {
errorCount++;
if (error instanceof z.ZodError) {
const errorMessage = {
line: validCount + errorCount,
errors: error.errors.map(e => ({
path: e.path.join('.'),
message: e.message
}))
};
// Send error as SSE event
await stream.write(`event: error\n`);
await stream.write(`data: ${JSON.stringify(errorMessage)}\n\n`);
console.warn('CSV validation error:', errorMessage);
}
}
}
// Send summary as final SSE event
await stream.write(`event: summary\n`);
await stream.write(`data: ${JSON.stringify({ valid: validCount, errors: errorCount })}\n\n`);
});
});
Why SSE? Server-Sent Events let the server push each validation error the moment it is found and close with a summary event, so clients get real-time feedback on large files while the server never buffers a full JSON response.
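To consume these events, a client can POST the file with fetch and parse the event stream by hand, since EventSource only supports GET. A minimal browser-side sketch (the uploadAndValidate helper is hypothetical and assumes the /validate-csv endpoint above):
async function uploadAndValidate(file: File): Promise<void> {
const response = await fetch('/validate-csv', {
method: 'POST',
headers: { 'Content-Type': 'text/csv' },
body: file,
});
const reader = response.body!.pipeThrough(new TextDecoderStream()).getReader();
let buffer = '';
for (;;) {
const { value, done } = await reader.read();
if (done) break;
buffer += value;
// SSE messages are separated by a blank line
const messages = buffer.split('\n\n');
buffer = messages.pop() ?? '';
for (const message of messages) {
const event = /^event: (.+)$/m.exec(message)?.[1];
const data = /^data: (.+)$/m.exec(message)?.[1];
if (event && data) console.log(event, JSON.parse(data)); // e.g. 'error', 'summary', or 'fatal'
}
}
}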
For production applications, implement defense in depth:
// Security configuration
const SECURITY_CONFIG = {
maxRequestBodySize: 50 * 1024 * 1024, // 50MB
maxWorkers: 4,
parseTimeout: 30000, // 30 seconds
maxBufferSize: 5 * 1024 * 1024, // 5M characters
maxFieldCount: 10000, // 10k fields/record
maxRecordCount: 100000, // 100k records max
maxErrorCount: 1000, // Stop after 1000 validation errors
};
/**
* Creates a TransformStream that counts actual bytes received and enforces size limits.
* This prevents attackers from bypassing Content-Length header checks with chunked encoding
* or by sending more data than declared.
*/
function createByteLimitStream(maxBytes: number) {
let bytesRead = 0;
const stream = new TransformStream({
transform(chunk, controller) {
bytesRead += chunk.byteLength;
if (bytesRead > maxBytes) {
controller.error(
new RangeError(`Request body size ${bytesRead} exceeds limit of ${maxBytes} bytes`)
);
return;
}
controller.enqueue(chunk);
},
});
return {
stream,
getBytesRead: () => bytesRead,
};
}
const pool = new ReusableWorkerPool({ maxWorkers: SECURITY_CONFIG.maxWorkers });
app.post('/validate-csv', async (c) => {
// 1. Early rejection
if (pool.isFull()) {
return c.json({ error: 'Service busy' }, 503);
}
// 2. Verify Content-Type
const contentType = c.req.header('Content-Type');
if (!contentType?.startsWith('text/csv')) {
return c.json({ error: 'Content-Type must be text/csv' }, 415);
}
// 3. Check Content-Length header (early rejection only - actual bytes will be counted)
const contentLength = c.req.header('Content-Length');
if (contentLength && Number.parseInt(contentLength) > SECURITY_CONFIG.maxRequestBodySize) {
return c.json({ error: 'Request body too large' }, 413);
}
// 4. Get request body and add byte counting
const rawBody = c.req.raw.body;
if (!rawBody) {
return c.json({ error: 'Request body is required' }, 400);
}
// 5. Create byte-counting stream to track actual received bytes
// This prevents attackers from bypassing Content-Length checks with chunked encoding
const byteLimitStream = createByteLimitStream(SECURITY_CONFIG.maxRequestBodySize);
const csvStream = rawBody.pipeThrough(byteLimitStream.stream);
// 6. Timeout protection
const signal = AbortSignal.timeout(SECURITY_CONFIG.parseTimeout);
try {
return stream(c, async (stream) => {
c.header('Content-Type', 'text/event-stream');
c.header('Cache-Control', 'no-cache');
c.header('Connection', 'keep-alive');
let validCount = 0;
let errorCount = 0;
let recordCount = 0;
try {
// Note: parseBinaryStream must be imported from 'web-csv-toolbox' alongside the other imports
for await (const record of parseBinaryStream(csvStream, {
signal,
engine: EnginePresets.balanced({ workerPool: pool }),
maxBufferSize: SECURITY_CONFIG.maxBufferSize,
maxFieldCount: SECURITY_CONFIG.maxFieldCount,
})) {
recordCount++;
// Enforce record count limit
if (recordCount > SECURITY_CONFIG.maxRecordCount) {
await stream.write(`event: fatal\n`);
await stream.write(`data: ${JSON.stringify({
error: 'Record count limit exceeded',
limit: SECURITY_CONFIG.maxRecordCount,
bytesRead: byteLimitStream.getBytesRead(),
})}\n\n`);
throw new RangeError(`Record count ${recordCount} exceeds limit`);
}
// Enforce error count limit
if (errorCount >= SECURITY_CONFIG.maxErrorCount) {
await stream.write(`event: fatal\n`);
await stream.write(`data: ${JSON.stringify({
error: 'Error count limit exceeded - stopping validation',
limit: SECURITY_CONFIG.maxErrorCount,
valid: validCount,
errors: errorCount,
})}\n\n`);
throw new Error('VALIDATION_ERROR_LIMIT');
}
try {
recordSchema.parse(record);
validCount++;
} catch (error) {
errorCount++;
if (error instanceof z.ZodError) {
const errorMessage = {
line: validCount + errorCount,
errors: error.errors.map(e => ({
path: e.path.join('.'),
message: e.message
}))
};
await stream.write(`event: error\n`);
await stream.write(`data: ${JSON.stringify(errorMessage)}\n\n`);
// Log first 10 errors for monitoring
if (errorCount <= 10) {
console.warn('CSV validation error:', errorMessage);
}
}
}
}
await stream.write(`event: summary\n`);
await stream.write(`data: ${JSON.stringify({
valid: validCount,
errors: errorCount,
total: recordCount,
bytesRead: byteLimitStream.getBytesRead(),
})}\n\n`);
} catch (error) {
// Fatal errors that should close the connection with proper HTTP status
if (error instanceof Error) {
// AbortSignal.timeout() may surface as 'TimeoutError' rather than 'AbortError'
if (error.name === 'AbortError' || error.name === 'TimeoutError') {
await stream.write(`event: fatal\n`);
await stream.write(`data: ${JSON.stringify({ error: 'Parsing timeout' })}\n\n`);
throw error;
}
if (error.message === 'VALIDATION_ERROR_LIMIT') {
throw new Error('Too many validation errors');
}
}
if (error instanceof RangeError) {
throw error;
}
// Non-fatal errors: send error event but continue
await stream.write(`event: error\n`);
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
await stream.write(`data: ${JSON.stringify({ error: errorMessage })}\n\n`);
}
});
} catch (error) {
// Return proper HTTP status codes for fatal errors
if (error instanceof Error && (error.name === 'AbortError' || error.name === 'TimeoutError')) {
return c.json({ error: 'Parsing timeout' }, 408);
}
if (error instanceof RangeError) {
return c.json({
error: 'CSV exceeds limits',
details: error.message,
}, 413);
}
if (error instanceof Error && error.message === 'Too many validation errors') {
return c.json({
error: 'Too many validation errors',
limit: SECURITY_CONFIG.maxErrorCount,
}, 422);
}
return c.json({ error: 'Invalid CSV format' }, 400);
}
});
export default app;
Threat: Attackers can bypass Content-Length header checks by using chunked transfer encoding (which omits Content-Length entirely) or by sending more bytes than the header declares.
Defense: Use a TransformStream to count actual received bytes:
function createByteLimitStream(maxBytes: number) {
let bytesRead = 0;
const stream = new TransformStream({
transform(chunk, controller) {
bytesRead += chunk.byteLength;
if (bytesRead > maxBytes) {
controller.error(
new RangeError(`Request body size ${bytesRead} exceeds limit of ${maxBytes} bytes`)
);
return;
}
controller.enqueue(chunk);
},
});
return {
stream,
getBytesRead: () => bytesRead,
};
}
// Usage
const byteLimitStream = createByteLimitStream(SECURITY_CONFIG.maxRequestBodySize);
const csvStream = rawBody.pipeThrough(byteLimitStream.stream);
Why it works: the limit is enforced on the bytes that actually arrive rather than on a client-supplied header, and the stream errors out the moment the limit is crossed, aborting the parse immediately.
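A quick way to see the guard in action (a sketch only; it assumes Node.js 18+, where Blob and async-iterable ReadableStream are available):
// Piping more bytes than the limit rejects the read with a RangeError
const { stream: limiter } = createByteLimitStream(10);
const oversized = new Blob(['a'.repeat(32)]).stream(); // 32 bytes > 10-byte limit
try {
for await (const chunk of oversized.pipeThrough(limiter)) {
// chunks pass through until the running total exceeds the limit
}
} catch (error) {
console.error(error); // RangeError: Request body size ... exceeds limit of 10 bytes
}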
Threat: Attackers send millions of valid records to exhaust CPU/memory over time, even if individual records are small.
Defense: Enforce maximum record count:
let recordCount = 0;
for await (const record of parseBinaryStream(csvStream, { ... })) {
recordCount++;
if (recordCount > SECURITY_CONFIG.maxRecordCount) {
await stream.write(`event: fatal\n`);
await stream.write(`data: ${JSON.stringify({
error: 'Record count limit exceeded',
limit: SECURITY_CONFIG.maxRecordCount,
})}\n\n`);
throw new RangeError('Record count exceeded');
}
// Process record...
}
Why it works: the check runs on every record while streaming, so processing stops as soon as the limit is crossed instead of after the entire file has been parsed.
Threat: Attackers send CSV with millions of invalid records, causing excessive validation work, a flood of SSE error events, and noisy logs.
Defense: Stop processing after maxErrorCount validation errors:
let errorCount = 0;
for await (const record of parseBinaryStream(csvStream, { ... })) {
if (errorCount >= SECURITY_CONFIG.maxErrorCount) {
await stream.write(`event: fatal\n`);
await stream.write(`data: ${JSON.stringify({
error: 'Error count limit exceeded - stopping validation',
limit: SECURITY_CONFIG.maxErrorCount,
})}\n\n`);
throw new Error('VALIDATION_ERROR_LIMIT');
}
try {
recordSchema.parse(record);
validCount++;
} catch (error) {
errorCount++;
// Send error event...
}
}
Why it works: every invalid record costs schema validation, an SSE write, and potentially a log entry; capping the error count bounds all three.
Threat: Without proper status codes, clients can't determine if their request was accepted or if processing is complete.
Defense: Return 202 Accepted when SSE streaming starts, and communicate fatal errors via event: fatal in the stream:
let sseStarted = false;
try {
return stream(c, async (stream) => {
sseStarted = true;
c.status(202); // Accepted - processing in progress
c.header('Content-Type', 'text/event-stream');
try {
// ... parsing and validation
} catch (error) {
// Send fatal event to client
if (error instanceof Error && (error.name === 'AbortError' || error.name === 'TimeoutError')) {
await stream.write(`event: fatal\n`);
await stream.write(`data: ${JSON.stringify({ error: 'Parsing timeout' })}\n\n`);
}
if (error instanceof RangeError) {
await stream.write(`event: fatal\n`);
await stream.write(`data: ${JSON.stringify({
error: 'CSV exceeds limits',
bytesRead: byteLimitStream.getBytesRead()
})}\n\n`);
}
// ... other fatal errors
}
});
} catch (error) {
// Only errors before SSE starts return non-202 status
if (!sseStarted) {
return c.json({ error: 'Validation failed' }, 400);
}
// ...
}
Why it works: once the SSE response has started, the HTTP status line is already on the wire, so fatal conditions must be reported in-band as fatal events; 202 tells the client the upload was accepted and that the final outcome arrives in the event stream.
Implement monitoring to detect potential attacks:
// Monitor pool usage
setInterval(() => {
if (pool.isFull()) {
console.warn('WorkerPool at capacity - possible attack or high load');
// Alert your monitoring system (e.g., Datadog, Sentry)
}
}, 5000);
Before deploying to production, verify at minimum that the worker pool has an explicit maxWorkers limit, that saturated pools are rejected early via isFull(), and that the body-size, record-count, and error-count limits from SECURITY_CONFIG are enforced. For advanced configuration options, refer to the EngineConfig type documentation in your IDE or the API Reference.
503 responses. Cause: the WorkerPool is saturated. Solution: Increase maxWorkers or add rate limiting (see the sketch after this list).
High memory usage. Cause: Large CSV files or not using streaming. Solution: Verify you're using parseStringStream() with stream input, not parseString() with string input.
Parsing timeouts. Cause: CSV processing takes too long. Solution: Increase the timeout or optimize parsing (use WASM, reduce validation).
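As an illustration of the rate-limiting suggestion above, here is a minimal in-memory per-client limiter for the upload route (a sketch only: the 10-requests-per-minute budget, the x-forwarded-for lookup, and the Map store are assumptions; multi-instance deployments need a shared store such as Redis):
const uploadCounts = new Map<string, { count: number; resetAt: number }>();
app.use('/validate-csv', async (c, next) => {
const client = c.req.header('x-forwarded-for') ?? 'unknown';
const now = Date.now();
const entry = uploadCounts.get(client);
if (!entry || entry.resetAt < now) {
uploadCounts.set(client, { count: 1, resetAt: now + 60_000 }); // new 1-minute window
} else if (++entry.count > 10) { // at most 10 uploads per client per window
return c.json({ error: 'Rate limit exceeded' }, 429);
}
await next();
});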
For a complete, production-ready implementation of all security measures described in this guide, see the Hono Secure API Example on GitHub.
The example includes a runnable server built on @hono/node-server with all of the security measures described above.