Skip to main content

Real-time Communication Layer

Overview

The MyTradeX platform employs a sophisticated real-time communication architecture to deliver low-latency trading data, order updates, and system notifications across multiple client platforms. This layer is critical for providing traders with real-time market data and immediate feedback on their trading activities.

Architecture Philosophy

Core Principles

  • Low Latency: Sub-100ms message delivery for critical trading updates
  • High Reliability: Automatic reconnection and message recovery
  • Scalability: Support for thousands of concurrent connections
  • Multi-platform: Consistent real-time experience across web, mobile, and desktop
  • Event-driven: Loose coupling through event-based communication
  • Fault Tolerance: Graceful handling of network failures

Technology Stack

ComponentTechnologyPurpose
WebSocket FrameworkSocket.IO (Client), Spring WebSocket (Server)Real-time bidirectional communication
Message BrokerSTOMP over WebSocketStructured message routing
State ManagementTanStack React QueryClient-side data synchronization
Connection ManagementCustom connection poolingConnection lifecycle management
Message SerializationJSONCross-platform message format

WebSocket Architecture

Client-Side Implementation

WebSocket Client Configuration

Location: src/lib/socket.ts, mytradex-monorepo/packages/api/src/socketClient.ts

import { io, Socket } from 'socket.io-client';

let socket: Socket | null = null;

export const getSocket = (): Socket => {
if (!socket) {
socket = io(process.env.NEXT_PUBLIC_WS_URL || 'wss://api.mytradex.id', {
transports: ['websocket'],
withCredentials: true,
autoConnect: false,
reconnection: true,
reconnectionDelay: 1000,
reconnectionDelayMax: 5000,
reconnectionAttempts: Infinity,
timeout: 20000,
forceNew: true
});
}
return socket;
};

React Query Integration

Location: src/app/providers.tsx

export function Providers({ children }: { children: React.ReactNode }) {
const { user } = useAuth();

useEffect(() => {
const socket = getSocket();

// Connection event handlers
socket.on('connect', () => {
console.log('Connected to WebSocket');
if (user) {
socket.emit('subscribe', `order:${user.id}`);
socket.emit('subscribe', `market:ALL`);
}
});

socket.on('disconnect', () => {
console.log('Disconnected from WebSocket');
});

socket.on('connect_error', (error) => {
console.error('WebSocket connection error:', error);
});

// Message event handlers
socket.on('market:update', (data) => {
queryClient.setQueryData(['market', data.symbol], data);
});

socket.on('order:update', (data) => {
queryClient.invalidateQueries({ queryKey: ['orders'] });
});

socket.on('position:update', (data) => {
queryClient.setQueryData(['positions', data.userId], data);
});

return () => {
socket.off('connect');
socket.off('disconnect');
socket.off('connect_error');
socket.off('market:update');
socket.off('order:update');
socket.off('position:update');
};
}, [user, queryClient]);

return (
<QueryClientProvider client={queryClient}>
{children}
</QueryClientProvider>
);
}

Market Data Subscription

Location: src/components/trading/MarketWatch.tsx

const handleSubscribe = (symbol: string) => {
const socket = getSocket();

if (subscribedSymbols.has(symbol)) {
socket.emit('unsubscribe', `market:${symbol}`);
setSubscribedSymbols(prev => {
const newSet = new Set(prev);
newSet.delete(symbol);
return newSet;
});
} else {
socket.emit('subscribe', `market:${symbol}`);
setSubscribedSymbols(prev => new Set(prev).add(symbol));
}
};

Server-Side Implementation

Spring WebSocket Configuration

Location: mytradex-backend/service/src/main/java/com/mytradex/service/OrderService.java

@Service
@RequiredArgsConstructor
public class OrderService {

private final SimpMessagingTemplate messagingTemplate;

@Transactional
public Order createOrder(User user, String symbol, Order.OrderSide side,
Order.OrderType type, BigDecimal quantity,
BigDecimal price, BigDecimal stopPrice,
Order.TimeInForce timeInForce) {

// Create order logic...
Order savedOrder = orderRepository.save(order);

// Emit WebSocket events
messagingTemplate.convertAndSend("/topic/orders/" + user.getId(), savedOrder);
messagingTemplate.convertAndSend("/topic/market/" + symbol, savedOrder);
messagingTemplate.convertAndSend("/topic/orders/active", savedOrder);

return savedOrder;
}

@Transactional
public Order updateStatus(String protocolOrderId, Order.OrderStatus newStatus,
BigDecimal filledQuantity, BigDecimal fillPrice) {

Order order = orderRepository.findByProtocolOrderId(protocolOrderId)
.orElseThrow(() -> new IllegalArgumentException("Order not found"));

order.setStatus(newStatus);
if (filledQuantity != null && fillPrice != null) {
order.fill(filledQuantity, fillPrice);
}

Order updatedOrder = orderRepository.save(order);

// Broadcast status update
messagingTemplate.convertAndSend("/topic/orders/" + order.getUser().getId(), updatedOrder);
messagingTemplate.convertAndSend("/topic/market/" + order.getSymbol(), updatedOrder);

return updatedOrder;
}
}

Market Data Broadcasting

Location: mytradex-backend/service/src/main/java/com/mytradex/service/MarketDataService.java

@Service
public class MarketDataService {

private final SimpMessagingTemplate messagingTemplate;

public void broadcastMarketData(MarketData marketData) {
// Broadcast to all subscribers of this symbol
messagingTemplate.convertAndSend("/topic/market/" + marketData.getSymbol(), marketData);

// Broadcast to general market data topic
messagingTemplate.convertAndSend("/topic/market", marketData);

// Broadcast to price alert subscribers
messagingTemplate.convertAndSend("/topic/alerts/" + marketData.getSymbol(), marketData);
}

@Transactional
public MarketData ingestITCHData(String symbol, BigDecimal lastPrice,
BigDecimal bidPrice, BigDecimal askPrice,
BigDecimal bidQuantity, BigDecimal askQuantity,
Long volume) {

MarketData marketData = // ... processing logic

MarketData savedData = marketDataRepository.save(marketData);

// Real-time broadcast
broadcastMarketData(savedData);

// Trigger P&L recalculation
positionService.recalculatePnLForSymbol(symbol, lastPrice);

return savedData;
}
}

Message Types & Topics

1. Order Management Topics

Order Status Updates

// Topic: /topic/orders/{userId}
interface OrderUpdateMessage {
type: 'order_update';
data: {
orderId: string;
status: 'NEW' | 'PARTIAL' | 'FILLED' | 'CANCELLED' | 'REJECTED';
filledQuantity?: number;
averageFillPrice?: number;
remainingQuantity?: number;
timestamp: string;
};
}

// Topic: /topic/orders/active
interface ActiveOrderMessage {
type: 'active_order';
data: Order;
}

Order Execution Reports

// Topic: /topic/executions/{userId}
interface ExecutionReportMessage {
type: 'execution_report';
data: {
orderId: string;
tradeId: string;
quantity: number;
price: number;
side: 'BUY' | 'SELL';
timestamp: string;
commission?: number;
};
}

2. Market Data Topics

Real-time Price Updates

// Topic: /topic/market/{symbol}
interface MarketDataMessage {
type: 'market_data';
data: {
symbol: string;
lastPrice: number;
bidPrice?: number;
askPrice?: number;
bidQuantity?: number;
askQuantity?: number;
volume: number;
change24h?: number;
changePercent24h?: number;
timestamp: string;
};
}

// Topic: /topic/market
interface MarketSummaryMessage {
type: 'market_summary';
data: {
symbols: string[];
lastUpdate: string;
marketStatus: 'OPEN' | 'CLOSED' | 'PRE_MARKET' | 'AFTER_HOURS';
};
}

Order Book Updates

// Topic: /topic/orderbook/{symbol}
interface OrderBookMessage {
type: 'order_book_update';
data: {
symbol: string;
bids: [number, number][]; // [price, quantity]
asks: [number, number][];
timestamp: string;
};
}

3. Position & Portfolio Topics

Position Updates

// Topic: /topic/positions/{userId}
interface PositionUpdateMessage {
type: 'position_update';
data: {
positionId: string;
symbol: string;
quantity: number;
averageCost: number;
currentPrice: number;
unrealizedPnL: number;
realizedPnL: number;
timestamp: string;
};
}

Portfolio Summary

// Topic: /topic/portfolio/{userId}
interface PortfolioSummaryMessage {
type: 'portfolio_summary';
data: {
totalValue: number;
totalPnL: number;
dayPnL: number;
positions: Position[];
timestamp: string;
};
}

4. System Topics

Connection Management

// Topic: /topic/system/connections
interface ConnectionStatusMessage {
type: 'connection_status';
data: {
connected: boolean;
clientId: string;
subscriptions: string[];
lastActivity: string;
};
}

// Topic: /topic/system/maintenance
interface MaintenanceMessage {
type: 'maintenance';
data: {
message: string;
startTime: string;
expectedDuration: number;
affectedServices: string[];
};
}

Event-Driven Architecture

1. Order Event Flow

2. Market Data Event Flow

3. Connection Lifecycle

Connection Management

1. Connection Pooling

WebSocket Connection Pool:

@Configuration
public class WebSocketConfig implements WebSocketConfigurer {

@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(new TradingWebSocketHandler(), "/ws/trading")
.setAllowedOrigins("*")
.withSockJS();
}
}

@Component
public class TradingWebSocketHandler extends TextWebSocketHandler {

private final ConnectionPool connectionPool = new ConnectionPool();

@Override
public void afterConnectionEstablished(WebSocketSession session) {
connectionPool.addConnection(session);
// Send initial market data
sendMarketDataSnapshot(session);
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
connectionPool.removeConnection(session);
// Cleanup user subscriptions
cleanupUserSubscriptions(session);
}
}

2. Session Management

User Session Tracking:

@Service
public class SessionManager {

private final Map<String, Set<WebSocketSession>> userSessions = new ConcurrentHashMap<>();
private final Map<WebSocketSession, Set<String>> sessionSubscriptions = new ConcurrentHashMap<>();

public void registerSession(String userId, WebSocketSession session) {
userSessions.computeIfAbsent(userId, k -> ConcurrentHashMap.newKeySet()).add(session);
sessionSubscriptions.put(session, ConcurrentHashMap.newKeySet());
}

public void subscribeToTopic(WebSocketSession session, String topic) {
sessionSubscriptions.get(session).add(topic);
}

public void broadcastToUser(String userId, Object message) {
Set<WebSocketSession> sessions = userSessions.get(userId);
if (sessions != null) {
sessions.forEach(session -> {
if (session.isOpen()) {
try {
session.sendMessage(new TextMessage(objectMapper.writeValueAsString(message)));
} catch (Exception e) {
log.error("Failed to send message to session", e);
}
}
});
}
}
}

3. Subscription Management

Topic Subscription Handler:

class SubscriptionManager {
private subscriptions = new Map<string, Set<string>>();
private socket: Socket;

subscribe(topic: string, callback: (data: any) => void): void {
this.socket.on(topic, callback);

if (!this.subscriptions.has(topic)) {
this.subscriptions.set(topic, new Set());
this.socket.emit('subscribe', topic);
}
}

unsubscribe(topic: string, callback?: (data: any) => void): void {
if (callback) {
this.socket.off(topic, callback);
}

const listeners = this.subscriptions.get(topic);
if (listeners && listeners.size === 0) {
this.subscriptions.delete(topic);
this.socket.emit('unsubscribe', topic);
}
}
}

Message Delivery Guarantees

1. At-Least-Once Delivery

Message Acknowledgment:

interface MessageAcknowledgment {
messageId: string;
timestamp: string;
status: 'delivered' | 'failed';
}

socket.on('message_ack', (ack: MessageAcknowledgment) => {
if (ack.status === 'failed') {
// Retry logic
retryMessage(ack.messageId);
}
});

Server-side Acknowledgment:

@Component
public class GuaranteedMessageSender {

private final Map<String, MessageStatus> pendingMessages = new ConcurrentHashMap<>();

public void sendGuaranteedMessage(WebSocketSession session, String messageId, Object payload) {
pendingMessages.put(messageId, MessageStatus.PENDING);

try {
TextMessage message = new TextMessage(objectMapper.writeValueAsString(payload));
session.sendMessage(message);

// Schedule acknowledgment check
scheduler.schedule(() -> checkAcknowledgment(messageId), 30, TimeUnit.SECONDS);

} catch (Exception e) {
pendingMessages.put(messageId, MessageStatus.FAILED);
log.error("Failed to send message: {}", messageId, e);
}
}
}

2. Message Ordering

Sequential Message Delivery:

class OrderedMessageHandler {
private messageQueue: Array<{id: string, data: any, sequence: number}> = [];
private expectedSequence = 0;

handleMessage(message: any & {sequence: number, id: string}): void {
if (message.sequence === this.expectedSequence) {
this.processMessage(message);
this.expectedSequence++;
this.processQueuedMessages();
} else if (message.sequence > this.expectedSequence) {
this.messageQueue.push(message);
this.messageQueue.sort((a, b) => a.sequence - b.sequence);
}
}

private processQueuedMessages(): void {
while (this.messageQueue.length > 0 &&
this.messageQueue[0].sequence === this.expectedSequence) {
const msg = this.messageQueue.shift()!;
this.processMessage(msg);
this.expectedSequence++;
}
}
}

Performance Optimization

1. Message Batching

Client-side Batching:

class BatchedMessageSender {
private batch: any[] = [];
private batchTimeout: NodeJS.Timeout | null = null;
private readonly batchSize = 100;
private readonly batchInterval = 100; // ms

sendMessage(message: any): void {
this.batch.push(message);

if (this.batch.length >= this.batchSize) {
this.flushBatch();
} else if (!this.batchTimeout) {
this.batchTimeout = setTimeout(() => this.flushBatch(), this.batchInterval);
}
}

private flushBatch(): void {
if (this.batch.length > 0) {
this.socket.emit('batch_messages', this.batch);
this.batch = [];
}

if (this.batchTimeout) {
clearTimeout(this.batchTimeout);
this.batchTimeout = null;
}
}
}

Server-side Batching:

@Service
public class BatchedMessageSender {

private final Map<String, List<Object>> messageBatches = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

@PostConstruct
public void init() {
scheduler.scheduleAtFixedRate(this::flushBatches, 100, 100, TimeUnit.MILLISECONDS);
}

public void sendMessage(String topic, Object message) {
messageBatches.computeIfAbsent(topic, k -> new ArrayList<>()).add(message);
}

private void flushBatches() {
messageBatches.forEach((topic, messages) -> {
if (!messages.isEmpty()) {
messagingTemplate.convertAndSend(topic, messages);
messages.clear();
}
});
}
}

2. Message Compression

Compression Configuration:

const socket = io(serverUrl, {
transports: ['websocket'],
compression: true,
perMessageDeflate: {
threshold: 1024, // compress messages larger than 1KB
zlibDeflateOptions: {
level: 6 // compression level
}
}
});

Server-side Compression:

@Configuration
public class WebSocketCompressionConfig {

@Bean
public WebSocketHandlerDecorator compressionDecorator() {
return new WebSocketHandlerDecorator(new TextWebSocketHandler()) {
@Override
public void afterConnectionEstablished(WebSocketSession session) {
super.afterConnectionEstablished(session);

// Enable compression if client supports it
String acceptEncoding = session.getAttributes().get("Sec-WebSocket-Extensions");
if (acceptEncoding != null && acceptEncoding.contains("permessage-deflate")) {
session.getAttributes().put("compression.enabled", true);
}
}
};
}
}

3. Memory Management

Connection Cleanup:

@Component
public class ConnectionCleanupService {

@Scheduled(fixedRate = 30000) // Every 30 seconds
public void cleanupStaleConnections() {
Set<WebSocketSession> staleSessions = connectionPool.getStaleSessions(300000); // 5 minutes

staleSessions.forEach(session -> {
try {
log.info("Closing stale session: {}", session.getId());
session.close(CloseStatus.SERVICE_RESTARTED);
} catch (Exception e) {
log.error("Error closing stale session", e);
}
});
}
}

Security Considerations

1. Connection Authentication

Token-based Authentication:

// WebSocket authentication
socket.on('connect', () => {
const token = getAuthToken();
if (token) {
socket.emit('authenticate', { token });
}
});

// Server-side validation
@Component
public class WebSocketAuthenticationInterceptor implements ChannelInterceptor {

@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);

if (StompCommand.CONNECT.equals(accessor.getCommand())) {
String token = accessor.getFirstNativeHeader("Authorization");
if (!jwtService.isTokenValid(token)) {
throw new AuthenticationException("Invalid token");
}

// Set user context
User user = jwtService.getUserFromToken(token);
accessor.setUser(new UserPrincipal(user));
}

return message;
}
}

2. Topic Authorization

Subscription Authorization:

// Client-side authorization check
function authorizeSubscription(topic: string): boolean {
const user = getCurrentUser();

if (topic.startsWith('/topic/admin/') && user?.role !== 'ADMIN') {
return false;
}

if (topic.includes(user.id) || user.role === 'ADMIN') {
return true;
}

return false;
}

// Server-side authorization
@EventListener
public void handleSubscribeEvent(SessionSubscribeEvent event) {
StompHeaderAccessor headers = StompHeaderAccessor.wrap(event.getMessage());
String destination = headers.getDestination();
StompHeaderAccessor sessionHeaders = StompHeaderAccessor.wrap(event.getSession());
User user = (User) sessionHeaders.getSessionAttributes().get("user");

if (!authorizationService.isAuthorized(user, destination)) {
throw new AccessDeniedException("Not authorized for topic: " + destination);
}
}

3. Rate Limiting

Message Rate Limiting:

@Component
public class WebSocketRateLimiter {

private final Map<String, RateLimiter> userRateLimiters = new ConcurrentHashMap<>();

public boolean allowMessage(String userId, int messageSize) {
RateLimiter limiter = userRateLimiters.computeIfAbsent(userId,
k -> RateLimiter.create(100)); // 100 messages per second

return limiter.tryAcquire() && messageSize <= MAX_MESSAGE_SIZE;
}
}

Monitoring & Observability

1. Connection Metrics

Connection Monitoring:

@Component
public class WebSocketMetrics {

private final Counter activeConnections;
private final Timer connectionDuration;
private final Counter messagesSent;
private final Counter messagesReceived;

public WebSocketMetrics(MeterRegistry registry) {
this.activeConnections = Counter.builder("websocket.connections.active")
.description("Active WebSocket connections")
.register(registry);

this.connectionDuration = Timer.builder("websocket.connection.duration")
.description("WebSocket connection duration")
.register(registry);

this.messagesSent = Counter.builder("websocket.messages.sent")
.description("Messages sent via WebSocket")
.register(registry);

this.messagesReceived = Counter.builder("websocket.messages.received")
.description("Messages received via WebSocket")
.register(registry);
}
}

2. Message Analytics

Message Tracking:

class MessageAnalytics {
private messageCount = 0;
private messageLatency: number[] = [];
private errorCount = 0;

trackMessageSend(messageType: string, startTime: number): void {
this.messageCount++;
this.messagesSent.increment();
}

trackMessageReceive(messageType: string, startTime: number): void {
const latency = Date.now() - startTime;
this.messageLatency.push(latency);

if (latency > 1000) {
console.warn(`High latency message: ${messageType} took ${latency}ms`);
}
}

getMetrics() {
return {
totalMessages: this.messageCount,
averageLatency: this.messageLatency.reduce((a, b) => a + b, 0) / this.messageLatency.length,
errorRate: this.errorCount / this.messageCount,
latencyPercentiles: this.calculatePercentiles()
};
}
}

3. Health Checks

WebSocket Health Check:

@Component
public class WebSocketHealthIndicator implements HealthIndicator {

@Override
public Health health() {
int activeConnections = connectionManager.getActiveConnections();
int maxConnections = connectionManager.getMaxConnections();

Map<String, Object> details = Map.of(
"activeConnections", activeConnections,
"maxConnections", maxConnections,
"connectionUtilization", (double) activeConnections / maxConnections
);

if (activeConnections > maxConnections * 0.9) {
return Health.down()
.withDetails(details)
.withStatus("CRITICAL")
.withException(new RuntimeException("High connection load"))
.build();
}

return Health.up()
.withDetails(details)
.build();
}
}

Next: Security Architecture

Continue to Security Architecture for detailed authentication, authorization, and security implementation documentation.