Middleware
Middleware
The middleware system in @ag-ui/client provides a powerful way to transform, filter, and augment event streams flowing through agents. Middleware can intercept and modify events, add logging, implement authentication, filter tool calls, and more.
AbstractAgent,
BaseEvent,
EventType,
FilterToolCallsMiddleware,
Message,
Middleware,
MiddlewareFunction,
RunAgentInput
} from "@ag-ui/client"
Examples below assume the relevant RxJS operators/utilities (map, tap, filter, finalize, catchError, switchMap, timer, of, etc.) are imported.
Types
MiddlewareFunction
A function that transforms the event stream.
type MiddlewareFunction = (
input: RunAgentInput,
next: AbstractAgent,
) => Observable<BaseEvent>;
Function middleware receives events exactly as emitted by the next middleware or agent.
Middleware
Abstract base class for creating middleware.
interface EventWithState {
event: BaseEvent;
messages: Message[];
state: unknown;
}
abstract class Middleware {
abstract run(
input: RunAgentInput,
next: AbstractAgent,
): Observable<BaseEvent>;
protected runNext(
input: RunAgentInput,
next: AbstractAgent,
): Observable<BaseEvent>;
protected runNextWithState(
input: RunAgentInput,
next: AbstractAgent,
): Observable<EventWithState>;
}
runNext()runsnext.run(...)and normalizes chunk events into completeTEXT_MESSAGE_*/TOOL_CALL_*sequences.runNextWithState()does the same and also provides accumulatedmessagesandstateafter each event is applied.
Function-Based Middleware
The simplest way to create middleware is with a function. Function middleware is ideal for stateless transformations.
Basic Example
const loggingMiddleware: MiddlewareFunction = (input, next) => {
console.log(`[${new Date().toISOString()}] Starting run ${input.runId}`);
return next.run(input).pipe(
tap((event) => console.log(`Event: ${event.type}`)),
finalize(() => console.log(`Run ${input.runId} completed`)),
);
};
agent.use(loggingMiddleware);
Transforming Events
const timestampMiddleware: MiddlewareFunction = (input, next) => {
return next.run(input).pipe(
map((event) => {
if (event.type === EventType.RUN_STARTED) {
return {
...event,
timestamp: Date.now(),
};
}
return event;
}),
);
};
Error Handling
const errorMiddleware: MiddlewareFunction = (input, next) => {
return next.run(input).pipe(
catchError((error) => {
console.error("Agent error:", error);
// Return error event
return of({
type: EventType.RUN_ERROR,
message: error.message,
} as BaseEvent);
}),
);
};
Class-Based Middleware
For stateful operations or complex logic, extend the Middleware class.
Basic Implementation
class CounterMiddleware extends Middleware {
private totalEvents = 0;
run(input: RunAgentInput, next: AbstractAgent): Observable<BaseEvent> {
let runEvents = 0;
return this.runNext(input, next).pipe(
tap(() => {
runEvents++;
this.totalEvents++;
}),
finalize(() => {
console.log(`Run events: ${runEvents}, Total: ${this.totalEvents}`);
}),
);
}
}
agent.use(new CounterMiddleware());
Configuration-Based Middleware
class AuthMiddleware extends Middleware {
constructor(
private apiKey: string,
private headerName: string = "Authorization",
) {
super();
}
run(input: RunAgentInput, next: AbstractAgent): Observable<BaseEvent> {
// Attach auth data in forwardedProps for downstream transport/agent logic
const authenticatedInput: RunAgentInput = {
...input,
forwardedProps: {
...input.forwardedProps,
auth: {
headerName: this.headerName,
value: `Bearer ${this.apiKey}`,
},
},
};
return this.runNext(authenticatedInput, next);
}
}
const apiKey = process.env.API_KEY ?? "";
agent.use(new AuthMiddleware(apiKey));
Accumulator Helpers (Class Middleware)
Class middleware can use helper methods from Middleware to work with normalized events and accumulated state.
runNext()
runNext() forwards execution and normalizes chunk events into full TEXT_MESSAGE_* and TOOL_CALL_* events.
runNextWithState()
runNextWithState() returns { event, messages, state }, where messages and state are the accumulated values after each event has been applied.
class MetricsWithStateMiddleware extends Middleware {
run(input: RunAgentInput, next: AbstractAgent): Observable<BaseEvent> {
return this.runNextWithState(input, next).pipe(
tap(({ event, messages, state }) => {
if (event.type === EventType.RUN_FINISHED) {
const stateKeyCount =
state && typeof state === "object" ? Object.keys(state).length : 0;
console.log(
"Assistant messages:",
messages.filter((m) => m.role === "assistant").length,
);
console.log("Final state keys:", stateKeyCount);
}
}),
map(({ event }) => event),
);
}
}
Built-in Middleware
FilterToolCallsMiddleware
Filters tool calls based on allowed or disallowed lists.
FilterToolCallsMiddleware filters emitted TOOL_CALL_* events (including args/results for blocked calls). It does not prevent tool execution in the upstream model/runtime.
Configuration
type FilterToolCallsConfig =
| { allowedToolCalls: string[]; disallowedToolCalls?: never }
| { disallowedToolCalls: string[]; allowedToolCalls?: never };
Allow Specific Tools
const allowFilter = new FilterToolCallsMiddleware({
allowedToolCalls: ["search", "calculate", "summarize"],
});
agent.use(allowFilter);
You can also use disallowedToolCalls instead of allowedToolCalls.
Middleware Patterns
Timing Middleware
const timingMiddleware: MiddlewareFunction = (input, next) => {
const startTime = performance.now();
return next.run(input).pipe(
finalize(() => {
const duration = performance.now() - startTime;
console.log(`Execution time: ${duration.toFixed(2)}ms`);
}),
);
};
Rate Limiting
class RateLimitMiddleware extends Middleware {
private lastCall = 0;
constructor(private minInterval: number) {
super();
}
run(input: RunAgentInput, next: AbstractAgent): Observable<BaseEvent> {
const now = Date.now();
const elapsed = now - this.lastCall;
if (elapsed < this.minInterval) {
// Delay the execution
return timer(this.minInterval - elapsed).pipe(
switchMap(() => {
this.lastCall = Date.now();
return this.runNext(input, next);
}),
);
}
this.lastCall = now;
return this.runNext(input, next);
}
}
// Limit to one request per second
agent.use(new RateLimitMiddleware(1000));
Other common patterns include retry logic and response caching.
Chaining Middleware
Multiple middleware can be combined to create sophisticated processing pipelines.
const logger = loggingMiddleware;
const auth = new AuthMiddleware(apiKey);
const filter = new FilterToolCallsMiddleware({ allowedToolCalls: ["search"] });
agent.use(logger, auth, filter);
// Execution flow:
// logger → auth → filter → agent → filter → auth → logger
Advanced Usage
Conditional Middleware
const debugMiddleware: MiddlewareFunction = (input, next) => {
const isDebug = input.forwardedProps?.debug === true;
if (!isDebug) {
return next.run(input);
}
return next.run(input).pipe(
tap((event) => {
console.debug("[DEBUG]", JSON.stringify(event, null, 2));
}),
);
};
Lifecycle Notes
Middleware added with agent.use(...) runs in runAgent() (and the legacy bridge path). connectAgent() currently calls connect() directly and does not run middleware.
Best Practices
- Single Responsibility: Each middleware should focus on one concern
- Error Handling: Always handle errors gracefully and consider recovery strategies
- Performance: Be mindful of processing overhead in high-throughput scenarios
- State Management: Use class-based middleware when state is required
- Testing: Write unit tests for each middleware independently
- Documentation: Document middleware behavior and side effects
TypeScript Support
The middleware system is fully typed for excellent IDE support:
AbstractAgent,
BaseEvent,
MiddlewareFunction,
RunAgentInput
} from "@ag-ui/client"
// Type-safe middleware function
const typedMiddleware: MiddlewareFunction = (
input: RunAgentInput,
next: AbstractAgent
): Observable<BaseEvent> => {
return next.run(input)
}