Real-time Communication Layer
Overview
The MyTradeX platform employs a sophisticated real-time communication architecture to deliver low-latency trading data, order updates, and system notifications across multiple client platforms. This layer is critical for providing traders with real-time market data and immediate feedback on their trading activities.
Architecture Philosophy
Core Principles
- Low Latency: Sub-100ms message delivery for critical trading updates
- High Reliability: Automatic reconnection and message recovery
- Scalability: Support for thousands of concurrent connections
- Multi-platform: Consistent real-time experience across web, mobile, and desktop
- Event-driven: Loose coupling through event-based communication
- Fault Tolerance: Graceful handling of network failures
Technology Stack
| Component | Technology | Purpose |
|---|---|---|
| WebSocket Framework | Socket.IO (Client), Spring WebSocket (Server) | Real-time bidirectional communication |
| Message Broker | STOMP over WebSocket | Structured message routing |
| State Management | TanStack React Query | Client-side data synchronization |
| Connection Management | Custom connection pooling | Connection lifecycle management |
| Message Serialization | JSON | Cross-platform message format |
WebSocket Architecture
Client-Side Implementation
WebSocket Client Configuration
Location: src/lib/socket.ts, mytradex-monorepo/packages/api/src/socketClient.ts
import { io, Socket } from 'socket.io-client';
let socket: Socket | null = null;
export const getSocket = (): Socket => {
if (!socket) {
socket = io(process.env.NEXT_PUBLIC_WS_URL || 'wss://api.mytradex.id', {
transports: ['websocket'],
withCredentials: true,
autoConnect: false,
reconnection: true,
reconnectionDelay: 1000,
reconnectionDelayMax: 5000,
reconnectionAttempts: Infinity,
timeout: 20000,
forceNew: true
});
}
return socket;
};
React Query Integration
Location: src/app/providers.tsx
export function Providers({ children }: { children: React.ReactNode }) {
const { user } = useAuth();
useEffect(() => {
const socket = getSocket();
// Connection event handlers
socket.on('connect', () => {
console.log('Connected to WebSocket');
if (user) {
socket.emit('subscribe', `order:${user.id}`);
socket.emit('subscribe', `market:ALL`);
}
});
socket.on('disconnect', () => {
console.log('Disconnected from WebSocket');
});
socket.on('connect_error', (error) => {
console.error('WebSocket connection error:', error);
});
// Message event handlers
socket.on('market:update', (data) => {
queryClient.setQueryData(['market', data.symbol], data);
});
socket.on('order:update', (data) => {
queryClient.invalidateQueries({ queryKey: ['orders'] });
});
socket.on('position:update', (data) => {
queryClient.setQueryData(['positions', data.userId], data);
});
return () => {
socket.off('connect');
socket.off('disconnect');
socket.off('connect_error');
socket.off('market:update');
socket.off('order:update');
socket.off('position:update');
};
}, [user, queryClient]);
return (
<QueryClientProvider client={queryClient}>
{children}
</QueryClientProvider>
);
}
Market Data Subscription
Location: src/components/trading/MarketWatch.tsx
const handleSubscribe = (symbol: string) => {
const socket = getSocket();
if (subscribedSymbols.has(symbol)) {
socket.emit('unsubscribe', `market:${symbol}`);
setSubscribedSymbols(prev => {
const newSet = new Set(prev);
newSet.delete(symbol);
return newSet;
});
} else {
socket.emit('subscribe', `market:${symbol}`);
setSubscribedSymbols(prev => new Set(prev).add(symbol));
}
};
Server-Side Implementation
Spring WebSocket Configuration
Location: mytradex-backend/service/src/main/java/com/mytradex/service/OrderService.java
@Service
@RequiredArgsConstructor
public class OrderService {
private final SimpMessagingTemplate messagingTemplate;
@Transactional
public Order createOrder(User user, String symbol, Order.OrderSide side,
Order.OrderType type, BigDecimal quantity,
BigDecimal price, BigDecimal stopPrice,
Order.TimeInForce timeInForce) {
// Create order logic...
Order savedOrder = orderRepository.save(order);
// Emit WebSocket events
messagingTemplate.convertAndSend("/topic/orders/" + user.getId(), savedOrder);
messagingTemplate.convertAndSend("/topic/market/" + symbol, savedOrder);
messagingTemplate.convertAndSend("/topic/orders/active", savedOrder);
return savedOrder;
}
@Transactional
public Order updateStatus(String protocolOrderId, Order.OrderStatus newStatus,
BigDecimal filledQuantity, BigDecimal fillPrice) {
Order order = orderRepository.findByProtocolOrderId(protocolOrderId)
.orElseThrow(() -> new IllegalArgumentException("Order not found"));
order.setStatus(newStatus);
if (filledQuantity != null && fillPrice != null) {
order.fill(filledQuantity, fillPrice);
}
Order updatedOrder = orderRepository.save(order);
// Broadcast status update
messagingTemplate.convertAndSend("/topic/orders/" + order.getUser().getId(), updatedOrder);
messagingTemplate.convertAndSend("/topic/market/" + order.getSymbol(), updatedOrder);
return updatedOrder;
}
}
Market Data Broadcasting
Location: mytradex-backend/service/src/main/java/com/mytradex/service/MarketDataService.java
@Service
public class MarketDataService {
private final SimpMessagingTemplate messagingTemplate;
public void broadcastMarketData(MarketData marketData) {
// Broadcast to all subscribers of this symbol
messagingTemplate.convertAndSend("/topic/market/" + marketData.getSymbol(), marketData);
// Broadcast to general market data topic
messagingTemplate.convertAndSend("/topic/market", marketData);
// Broadcast to price alert subscribers
messagingTemplate.convertAndSend("/topic/alerts/" + marketData.getSymbol(), marketData);
}
@Transactional
public MarketData ingestITCHData(String symbol, BigDecimal lastPrice,
BigDecimal bidPrice, BigDecimal askPrice,
BigDecimal bidQuantity, BigDecimal askQuantity,
Long volume) {
MarketData marketData = // ... processing logic
MarketData savedData = marketDataRepository.save(marketData);
// Real-time broadcast
broadcastMarketData(savedData);
// Trigger P&L recalculation
positionService.recalculatePnLForSymbol(symbol, lastPrice);
return savedData;
}
}
Message Types & Topics
1. Order Management Topics
Order Status Updates
// Topic: /topic/orders/{userId}
interface OrderUpdateMessage {
type: 'order_update';
data: {
orderId: string;
status: 'NEW' | 'PARTIAL' | 'FILLED' | 'CANCELLED' | 'REJECTED';
filledQuantity?: number;
averageFillPrice?: number;
remainingQuantity?: number;
timestamp: string;
};
}
// Topic: /topic/orders/active
interface ActiveOrderMessage {
type: 'active_order';
data: Order;
}
Order Execution Reports
// Topic: /topic/executions/{userId}
interface ExecutionReportMessage {
type: 'execution_report';
data: {
orderId: string;
tradeId: string;
quantity: number;
price: number;
side: 'BUY' | 'SELL';
timestamp: string;
commission?: number;
};
}
2. Market Data Topics
Real-time Price Updates
// Topic: /topic/market/{symbol}
interface MarketDataMessage {
type: 'market_data';
data: {
symbol: string;
lastPrice: number;
bidPrice?: number;
askPrice?: number;
bidQuantity?: number;
askQuantity?: number;
volume: number;
change24h?: number;
changePercent24h?: number;
timestamp: string;
};
}
// Topic: /topic/market
interface MarketSummaryMessage {
type: 'market_summary';
data: {
symbols: string[];
lastUpdate: string;
marketStatus: 'OPEN' | 'CLOSED' | 'PRE_MARKET' | 'AFTER_HOURS';
};
}
Order Book Updates
// Topic: /topic/orderbook/{symbol}
interface OrderBookMessage {
type: 'order_book_update';
data: {
symbol: string;
bids: [number, number][]; // [price, quantity]
asks: [number, number][];
timestamp: string;
};
}
3. Position & Portfolio Topics
Position Updates
// Topic: /topic/positions/{userId}
interface PositionUpdateMessage {
type: 'position_update';
data: {
positionId: string;
symbol: string;
quantity: number;
averageCost: number;
currentPrice: number;
unrealizedPnL: number;
realizedPnL: number;
timestamp: string;
};
}
Portfolio Summary
// Topic: /topic/portfolio/{userId}
interface PortfolioSummaryMessage {
type: 'portfolio_summary';
data: {
totalValue: number;
totalPnL: number;
dayPnL: number;
positions: Position[];
timestamp: string;
};
}
4. System Topics
Connection Management
// Topic: /topic/system/connections
interface ConnectionStatusMessage {
type: 'connection_status';
data: {
connected: boolean;
clientId: string;
subscriptions: string[];
lastActivity: string;
};
}
// Topic: /topic/system/maintenance
interface MaintenanceMessage {
type: 'maintenance';
data: {
message: string;
startTime: string;
expectedDuration: number;
affectedServices: string[];
};
}
Event-Driven Architecture
1. Order Event Flow
2. Market Data Event Flow
3. Connection Lifecycle
Connection Management
1. Connection Pooling
WebSocket Connection Pool:
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new TradingWebSocketHandler(), "/ws/trading")
.setAllowedOrigins("*")
.withSockJS();
}
}
@Component
public class TradingWebSocketHandler extends TextWebSocketHandler {
private final ConnectionPool connectionPool = new ConnectionPool();
@Override
public void afterConnectionEstablished(WebSocketSession session) {
connectionPool.addConnection(session);
// Send initial market data
sendMarketDataSnapshot(session);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
connectionPool.removeConnection(session);
// Cleanup user subscriptions
cleanupUserSubscriptions(session);
}
}
2. Session Management
User Session Tracking:
@Service
public class SessionManager {
private final Map<String, Set<WebSocketSession>> userSessions = new ConcurrentHashMap<>();
private final Map<WebSocketSession, Set<String>> sessionSubscriptions = new ConcurrentHashMap<>();
public void registerSession(String userId, WebSocketSession session) {
userSessions.computeIfAbsent(userId, k -> ConcurrentHashMap.newKeySet()).add(session);
sessionSubscriptions.put(session, ConcurrentHashMap.newKeySet());
}
public void subscribeToTopic(WebSocketSession session, String topic) {
sessionSubscriptions.get(session).add(topic);
}
public void broadcastToUser(String userId, Object message) {
Set<WebSocketSession> sessions = userSessions.get(userId);
if (sessions != null) {
sessions.forEach(session -> {
if (session.isOpen()) {
try {
session.sendMessage(new TextMessage(objectMapper.writeValueAsString(message)));
} catch (Exception e) {
log.error("Failed to send message to session", e);
}
}
});
}
}
}
3. Subscription Management
Topic Subscription Handler:
class SubscriptionManager {
private subscriptions = new Map<string, Set<string>>();
private socket: Socket;
subscribe(topic: string, callback: (data: any) => void): void {
this.socket.on(topic, callback);
if (!this.subscriptions.has(topic)) {
this.subscriptions.set(topic, new Set());
this.socket.emit('subscribe', topic);
}
}
unsubscribe(topic: string, callback?: (data: any) => void): void {
if (callback) {
this.socket.off(topic, callback);
}
const listeners = this.subscriptions.get(topic);
if (listeners && listeners.size === 0) {
this.subscriptions.delete(topic);
this.socket.emit('unsubscribe', topic);
}
}
}
Message Delivery Guarantees
1. At-Least-Once Delivery
Message Acknowledgment:
interface MessageAcknowledgment {
messageId: string;
timestamp: string;
status: 'delivered' | 'failed';
}
socket.on('message_ack', (ack: MessageAcknowledgment) => {
if (ack.status === 'failed') {
// Retry logic
retryMessage(ack.messageId);
}
});
Server-side Acknowledgment:
@Component
public class GuaranteedMessageSender {
private final Map<String, MessageStatus> pendingMessages = new ConcurrentHashMap<>();
public void sendGuaranteedMessage(WebSocketSession session, String messageId, Object payload) {
pendingMessages.put(messageId, MessageStatus.PENDING);
try {
TextMessage message = new TextMessage(objectMapper.writeValueAsString(payload));
session.sendMessage(message);
// Schedule acknowledgment check
scheduler.schedule(() -> checkAcknowledgment(messageId), 30, TimeUnit.SECONDS);
} catch (Exception e) {
pendingMessages.put(messageId, MessageStatus.FAILED);
log.error("Failed to send message: {}", messageId, e);
}
}
}
2. Message Ordering
Sequential Message Delivery:
class OrderedMessageHandler {
private messageQueue: Array<{id: string, data: any, sequence: number}> = [];
private expectedSequence = 0;
handleMessage(message: any & {sequence: number, id: string}): void {
if (message.sequence === this.expectedSequence) {
this.processMessage(message);
this.expectedSequence++;
this.processQueuedMessages();
} else if (message.sequence > this.expectedSequence) {
this.messageQueue.push(message);
this.messageQueue.sort((a, b) => a.sequence - b.sequence);
}
}
private processQueuedMessages(): void {
while (this.messageQueue.length > 0 &&
this.messageQueue[0].sequence === this.expectedSequence) {
const msg = this.messageQueue.shift()!;
this.processMessage(msg);
this.expectedSequence++;
}
}
}
Performance Optimization
1. Message Batching
Client-side Batching:
class BatchedMessageSender {
private batch: any[] = [];
private batchTimeout: NodeJS.Timeout | null = null;
private readonly batchSize = 100;
private readonly batchInterval = 100; // ms
sendMessage(message: any): void {
this.batch.push(message);
if (this.batch.length >= this.batchSize) {
this.flushBatch();
} else if (!this.batchTimeout) {
this.batchTimeout = setTimeout(() => this.flushBatch(), this.batchInterval);
}
}
private flushBatch(): void {
if (this.batch.length > 0) {
this.socket.emit('batch_messages', this.batch);
this.batch = [];
}
if (this.batchTimeout) {
clearTimeout(this.batchTimeout);
this.batchTimeout = null;
}
}
}
Server-side Batching:
@Service
public class BatchedMessageSender {
private final Map<String, List<Object>> messageBatches = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
@PostConstruct
public void init() {
scheduler.scheduleAtFixedRate(this::flushBatches, 100, 100, TimeUnit.MILLISECONDS);
}
public void sendMessage(String topic, Object message) {
messageBatches.computeIfAbsent(topic, k -> new ArrayList<>()).add(message);
}
private void flushBatches() {
messageBatches.forEach((topic, messages) -> {
if (!messages.isEmpty()) {
messagingTemplate.convertAndSend(topic, messages);
messages.clear();
}
});
}
}
2. Message Compression
Compression Configuration:
const socket = io(serverUrl, {
transports: ['websocket'],
compression: true,
perMessageDeflate: {
threshold: 1024, // compress messages larger than 1KB
zlibDeflateOptions: {
level: 6 // compression level
}
}
});
Server-side Compression:
@Configuration
public class WebSocketCompressionConfig {
@Bean
public WebSocketHandlerDecorator compressionDecorator() {
return new WebSocketHandlerDecorator(new TextWebSocketHandler()) {
@Override
public void afterConnectionEstablished(WebSocketSession session) {
super.afterConnectionEstablished(session);
// Enable compression if client supports it
String acceptEncoding = session.getAttributes().get("Sec-WebSocket-Extensions");
if (acceptEncoding != null && acceptEncoding.contains("permessage-deflate")) {
session.getAttributes().put("compression.enabled", true);
}
}
};
}
}
3. Memory Management
Connection Cleanup:
@Component
public class ConnectionCleanupService {
@Scheduled(fixedRate = 30000) // Every 30 seconds
public void cleanupStaleConnections() {
Set<WebSocketSession> staleSessions = connectionPool.getStaleSessions(300000); // 5 minutes
staleSessions.forEach(session -> {
try {
log.info("Closing stale session: {}", session.getId());
session.close(CloseStatus.SERVICE_RESTARTED);
} catch (Exception e) {
log.error("Error closing stale session", e);
}
});
}
}
Security Considerations
1. Connection Authentication
Token-based Authentication:
// WebSocket authentication
socket.on('connect', () => {
const token = getAuthToken();
if (token) {
socket.emit('authenticate', { token });
}
});
// Server-side validation
@Component
public class WebSocketAuthenticationInterceptor implements ChannelInterceptor {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
String token = accessor.getFirstNativeHeader("Authorization");
if (!jwtService.isTokenValid(token)) {
throw new AuthenticationException("Invalid token");
}
// Set user context
User user = jwtService.getUserFromToken(token);
accessor.setUser(new UserPrincipal(user));
}
return message;
}
}
2. Topic Authorization
Subscription Authorization:
// Client-side authorization check
function authorizeSubscription(topic: string): boolean {
const user = getCurrentUser();
if (topic.startsWith('/topic/admin/') && user?.role !== 'ADMIN') {
return false;
}
if (topic.includes(user.id) || user.role === 'ADMIN') {
return true;
}
return false;
}
// Server-side authorization
@EventListener
public void handleSubscribeEvent(SessionSubscribeEvent event) {
StompHeaderAccessor headers = StompHeaderAccessor.wrap(event.getMessage());
String destination = headers.getDestination();
StompHeaderAccessor sessionHeaders = StompHeaderAccessor.wrap(event.getSession());
User user = (User) sessionHeaders.getSessionAttributes().get("user");
if (!authorizationService.isAuthorized(user, destination)) {
throw new AccessDeniedException("Not authorized for topic: " + destination);
}
}
3. Rate Limiting
Message Rate Limiting:
@Component
public class WebSocketRateLimiter {
private final Map<String, RateLimiter> userRateLimiters = new ConcurrentHashMap<>();
public boolean allowMessage(String userId, int messageSize) {
RateLimiter limiter = userRateLimiters.computeIfAbsent(userId,
k -> RateLimiter.create(100)); // 100 messages per second
return limiter.tryAcquire() && messageSize <= MAX_MESSAGE_SIZE;
}
}
Monitoring & Observability
1. Connection Metrics
Connection Monitoring:
@Component
public class WebSocketMetrics {
private final Counter activeConnections;
private final Timer connectionDuration;
private final Counter messagesSent;
private final Counter messagesReceived;
public WebSocketMetrics(MeterRegistry registry) {
this.activeConnections = Counter.builder("websocket.connections.active")
.description("Active WebSocket connections")
.register(registry);
this.connectionDuration = Timer.builder("websocket.connection.duration")
.description("WebSocket connection duration")
.register(registry);
this.messagesSent = Counter.builder("websocket.messages.sent")
.description("Messages sent via WebSocket")
.register(registry);
this.messagesReceived = Counter.builder("websocket.messages.received")
.description("Messages received via WebSocket")
.register(registry);
}
}
2. Message Analytics
Message Tracking:
class MessageAnalytics {
private messageCount = 0;
private messageLatency: number[] = [];
private errorCount = 0;
trackMessageSend(messageType: string, startTime: number): void {
this.messageCount++;
this.messagesSent.increment();
}
trackMessageReceive(messageType: string, startTime: number): void {
const latency = Date.now() - startTime;
this.messageLatency.push(latency);
if (latency > 1000) {
console.warn(`High latency message: ${messageType} took ${latency}ms`);
}
}
getMetrics() {
return {
totalMessages: this.messageCount,
averageLatency: this.messageLatency.reduce((a, b) => a + b, 0) / this.messageLatency.length,
errorRate: this.errorCount / this.messageCount,
latencyPercentiles: this.calculatePercentiles()
};
}
}
3. Health Checks
WebSocket Health Check:
@Component
public class WebSocketHealthIndicator implements HealthIndicator {
@Override
public Health health() {
int activeConnections = connectionManager.getActiveConnections();
int maxConnections = connectionManager.getMaxConnections();
Map<String, Object> details = Map.of(
"activeConnections", activeConnections,
"maxConnections", maxConnections,
"connectionUtilization", (double) activeConnections / maxConnections
);
if (activeConnections > maxConnections * 0.9) {
return Health.down()
.withDetails(details)
.withStatus("CRITICAL")
.withException(new RuntimeException("High connection load"))
.build();
}
return Health.up()
.withDetails(details)
.build();
}
}
Next: Security Architecture
Continue to Security Architecture for detailed authentication, authorization, and security implementation documentation.