Skip to main content

Streaming

Handle streaming responses for real-time data, Server-Sent Events (SSE), and large file downloads.

Stream Endpoints

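Declare the endpoint by calling declareStream on a builder instance created with builder() from @navios/builder, then bind a handler to it with the @Stream() decorator: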

import { builder } from '@navios/builder'

// Builder instance on which the stream endpoints are declared.
const API = builder()

/**
 * Stream endpoint declaration for `/events/stream`.
 *
 * The HTTP method is stated explicitly so the verb is visible at the
 * declaration site; GET is what a browser `EventSource` issues.
 */
const streamEvents = API.declareStream({
  method: 'GET',
  url: '/events/stream',
})

Adapter Differences

Fastify Adapter

Use the reply object to write to the response stream:

/**
 * Fastify handler for the `streamEvents` endpoint.
 *
 * Writes a single SSE-formatted message directly to the underlying raw
 * response and then closes it.
 *
 * @param params - Request parameters derived from the `streamEvents` declaration (unused here).
 * @param reply - Reply object whose `raw` response is written to.
 */
@Stream(streamEvents)
async streamEvents(params: StreamParams<typeof streamEvents>, reply: Reply) {
  const response = reply.raw

  // Status line and headers first, then the one event, then end of stream.
  response.writeHead(200, { 'Content-Type': 'text/event-stream' })
  response.write('data: hello\n\n')
  response.end()
}

Bun Adapter

Return a Response object, or a BodyInit value (anything accepted as the first argument of the Response constructor):

/**
 * Bun handler for the `streamEvents` endpoint.
 *
 * Builds a `ReadableStream` that emits one SSE-formatted message and closes,
 * and returns it wrapped in a `Response` with the event-stream content type.
 *
 * @param params - Request parameters derived from the `streamEvents` declaration (unused here).
 * @returns A `Response` whose body is the stream.
 */
@Stream(streamEvents)
async streamEvents(params: StreamParams<typeof streamEvents>) {
  const body = new ReadableStream({
    async start(controller) {
      // Single message, then signal end of stream.
      controller.enqueue('data: hello\n\n')
      controller.close()
    },
  })
  const init = {
    headers: { 'Content-Type': 'text/event-stream' },
  }

  return new Response(body, init)
}

Server-Sent Events (SSE) - Fastify

Implement real-time event streaming:

import { Controller, Reply, Stream, StreamParams } from '@navios/core'

/**
 * Controller exposing a Server-Sent Events stream (Fastify adapter).
 */
@Controller()
class EventController {
  /**
   * Opens an SSE response, sends an initial `connected` event, and then
   * forwards every event delivered by the event service subscription until
   * the underlying response emits `close`.
   *
   * The method returns without ending the response; the connection stays
   * open for subsequent events.
   *
   * @param params - Request parameters derived from the `streamEvents` declaration (unused here).
   * @param reply - Reply object whose `raw` response is written to.
   */
  @Stream(streamEvents)
  async streamEvents(params: StreamParams<typeof streamEvents>, reply: Reply) {
    const response = reply.raw

    // SSE headers
    response.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
    })

    // Serialises a payload as one SSE `data:` message.
    const push = (payload: object) => {
      response.write(`data: ${JSON.stringify(payload)}\n\n`)
    }

    // Greeting event so the client knows the stream is live.
    push({ type: 'connected', timestamp: Date.now() })

    // NOTE(review): `eventService` is not declared in this snippet —
    // presumably injected the way `storage` is in the file controllers; confirm.
    const stopListening = this.eventService.subscribe((event) => {
      push(event)
    })

    // Drop the subscription once the client goes away.
    response.on('close', () => {
      stopListening()
    })
  }
}

File Downloads

Fastify

Stream large files efficiently:

/**
 * Stream endpoint declaration for downloading a file by id.
 *
 * `$fileId` is a URL parameter; handlers read it from `params.urlParams`.
 * The HTTP method is stated explicitly so the verb is visible at the
 * declaration site.
 */
const downloadFile = API.declareStream({
  method: 'GET',
  url: '/files/$fileId/download',
})

/**
 * Controller that streams stored files to the client (Fastify adapter).
 */
@Controller()
class FileController {
  private storage = inject(StorageService)

  /**
   * Streams the file identified by the `fileId` URL parameter as an attachment.
   *
   * @param params - Request parameters; `urlParams.fileId` selects the file.
   * @param reply - Reply object whose `raw` response receives the file bytes.
   * @throws NotFoundException when the storage service returns no file for the id.
   */
  @Stream(downloadFile)
  async downloadFile(params: StreamParams<typeof downloadFile>, reply: Reply) {
    const { fileId } = params.urlParams
    const file = await this.storage.getFile(fileId)

    if (!file) {
      throw new NotFoundException('File not found')
    }

    // Open the source before any header is written: if this rejects, nothing
    // has been sent yet and the failure can still surface as an error response.
    // NOTE(review): assumes a Node.js Readable, since it is piped below — confirm.
    const stream = await this.storage.createReadStream(fileId)

    reply.raw.writeHead(200, {
      'Content-Type': file.mimeType,
      'Content-Disposition': `attachment; filename="${file.name}"`,
      'Content-Length': file.size,
    })

    // `pipe()` does not forward source errors to the destination; without this
    // the response would stay open forever after a read failure.
    stream.on('error', (error: Error) => {
      reply.raw.destroy(error)
    })

    // Release the source if the client disconnects mid-download. `close` also
    // fires after a normal finish, where destroying the ended source is harmless.
    reply.raw.on('close', () => {
      stream.destroy()
    })

    // Pipe file stream to response
    stream.pipe(reply.raw)
  }
}

Bun

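Return a Response with the file buffer. Note that, unlike the Fastify example above, this reads the whole file into memory before responding: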

/**
 * Controller that returns stored files to the client (Bun adapter).
 */
@Controller()
class FileController {
  private storage = inject(StorageService)

  /**
   * Looks up the file identified by the `fileId` URL parameter, loads its
   * contents as a buffer, and returns them as an attachment response.
   *
   * @param params - Request parameters; `urlParams.fileId` selects the file.
   * @returns A `Response` carrying the buffer plus type, disposition and length headers.
   * @throws NotFoundException when the storage service returns no file for the id.
   */
  @Stream(downloadFile)
  async downloadFile(params: StreamParams<typeof downloadFile>) {
    const { fileId } = params.urlParams

    const file = await this.storage.getFile(fileId)
    if (!file) {
      throw new NotFoundException('File not found')
    }

    // Contents come from S3 or whichever storage backs the service.
    const buffer = await this.storage.getBuffer(fileId)

    const headers = {
      'Content-Type': file.mimeType,
      'Content-Disposition': `attachment; filename="${file.name}"`,
      // Header values are strings here, so the size is converted explicitly.
      'Content-Length': String(file.size),
    }

    return new Response(buffer, { headers })
  }
}

Real-Time Progress (Fastify)

Stream progress updates:

// Request body accepted by the task-processing stream: the id of the task to run.
const processTaskRequestSchema = z.object({ taskId: z.string() })

/**
 * Stream endpoint declaration for `POST /tasks/process`.
 * Handlers read the validated body from `params.data`.
 */
const processTask = API.declareStream({
  method: 'POST',
  url: '/tasks/process',
  requestSchema: processTaskRequestSchema,
})

/**
 * Controller that reports task progress over an SSE stream (Fastify adapter).
 */
@Controller()
class TaskController {
  /**
   * Runs the four processing steps for the requested task and emits a
   * progress event after each one.
   *
   * Once the headers are written the response is always ended, even if a
   * step fails; in that case an `{ error }` event is sent first so the client
   * is told why the stream stopped.
   *
   * @param params - Request parameters; `data.taskId` identifies the task.
   * @param reply - Reply object whose `raw` response is written to.
   */
  @Stream(processTask)
  async processTask(params: StreamParams<typeof processTask>, reply: Reply) {
    const { taskId } = params.data

    reply.raw.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
    })

    const sendProgress = (progress: number, status: string) => {
      reply.raw.write(`data: ${JSON.stringify({ progress, status })}\n\n`)
    }

    try {
      sendProgress(0, 'Starting...')

      // Simulate processing steps
      // NOTE(review): step1..step4 are not defined in this snippet — confirm they exist on the class.
      await this.step1(taskId)
      sendProgress(25, 'Step 1 complete')

      await this.step2(taskId)
      sendProgress(50, 'Step 2 complete')

      await this.step3(taskId)
      sendProgress(75, 'Step 3 complete')

      await this.step4(taskId)
      sendProgress(100, 'Complete!')
    } catch (error: unknown) {
      // Headers are already sent, so the failure can only be reported in-band.
      const message = error instanceof Error ? error.message : String(error)
      reply.raw.write(`data: ${JSON.stringify({ error: message })}\n\n`)
    } finally {
      // Without this a rejected step would leave the connection open forever.
      reply.raw.end()
    }
  }
}

Chunked JSON Streaming (Fastify)

Stream large JSON arrays:

/**
 * Controller that exports users as one streamed JSON array (Fastify adapter).
 */
@Controller()
class ExportController {
  /**
   * Writes every user yielded by the user service as an element of a JSON
   * array, one chunk at a time.
   *
   * Writes respect backpressure: when the response buffer is full the loop
   * pauses until it drains, so a slow client does not cause the whole export
   * to pile up in memory. If the client disconnects, iteration stops. If the
   * source fails mid-export, the response is destroyed rather than ended, so
   * the client does not receive a seemingly complete body.
   *
   * @param params - Request parameters derived from the `exportUsers` declaration (unused here).
   * @param reply - Reply object whose `raw` response is written to.
   * @throws Rethrows any error raised while iterating the user stream.
   */
  @Stream(exportUsers)
  async exportUsers(params: StreamParams<typeof exportUsers>, reply: Reply) {
    const response = reply.raw

    response.writeHead(200, {
      'Content-Type': 'application/json',
    })

    // Resolves on `drain`, or on `close` so a disconnect cannot strand the wait.
    const waitForDrain = (): Promise<void> =>
      new Promise((resolve) => {
        const settle = () => {
          response.off('drain', settle)
          response.off('close', settle)
          resolve()
        }
        response.once('drain', settle)
        response.once('close', settle)
      })

    // Writes one chunk and pauses when the response signals a full buffer.
    const write = async (chunk: string): Promise<void> => {
      if (!response.write(chunk) && !response.destroyed) {
        await waitForDrain()
      }
    }

    try {
      await write('[\n')

      let first = true
      // NOTE(review): `userService` is not declared in this snippet — confirm it is injected on the class.
      for await (const user of this.userService.streamAll()) {
        if (response.destroyed) {
          // Client is gone; returning also lets the async iterator clean up.
          return
        }
        const json = JSON.stringify(user)
        await write(first ? json : `,\n${json}`)
        first = false
      }

      await write('\n]')
      response.end()
    } catch (error: unknown) {
      // Headers are already out; abort the transfer so truncation is detectable.
      response.destroy(error instanceof Error ? error : new Error(String(error)))
      throw error
    }
  }
}

Client-Side Consumption

Connect to SSE endpoints from the client:

// Browser: open a Server-Sent Events connection to the stream endpoint.
const eventSource = new EventSource('/events/stream')

// Parses each incoming message's payload as JSON and logs it.
const handleMessage = (event: MessageEvent) => {
  const data = JSON.parse(event.data)
  console.log('Received:', data)
}

// Logs the failure and closes the connection.
const handleError = (error: Event) => {
  console.error('SSE Error:', error)
  eventSource.close()
}

eventSource.onmessage = handleMessage
eventSource.onerror = handleError

// Clean up
// eventSource.close()

Error Handling in Streams (Fastify)

Handle errors gracefully:

/**
 * Streams every item produced by the data source as an SSE message
 * (Fastify adapter).
 *
 * If the source throws, a final `{ error }` message is sent in-band, because
 * the status line and headers have already been written. The response is
 * ended in every case.
 *
 * @param params - Request parameters derived from the `streamData` declaration (unused here).
 * @param reply - Reply object whose `raw` response is written to.
 */
@Stream(streamData)
async streamData(params: StreamParams<typeof streamData>, reply: Reply) {
  reply.raw.writeHead(200, {
    'Content-Type': 'text/event-stream',
  })

  try {
    // NOTE(review): `dataSource` is not declared in this snippet — confirm it is injected on the class.
    for await (const item of this.dataSource.stream()) {
      reply.raw.write(`data: ${JSON.stringify(item)}\n\n`)
    }
  } catch (error: unknown) {
    // A thrown value is not guaranteed to be an Error, so narrow before reading `.message`.
    const message = error instanceof Error ? error.message : String(error)

    // Send error event before closing
    reply.raw.write(`data: ${JSON.stringify({ error: message })}\n\n`)
  } finally {
    reply.raw.end()
  }
}