Apache Fineract Real-Time Communication
Executive Summary
Apache Fineract implements a hybrid approach to real-time communication that combines event-driven architecture, batch processing, and trigger-based notifications. This architecture lets the system respond in real time to critical financial operations while maintaining reliability and consistency in enterprise environments.
Arsitektur Komunikasi Real-Time
Model Hybrid Communication
1. Event-Driven Architecture
Internal Event System
Event Publisher Implementation
@Component
public class RealTimeEventPublisher {
private final ApplicationEventPublisher applicationEventPublisher;
private final MessageTemplate messageTemplate;
private final SimpMessagingTemplate webSocketTemplate;
private final ReactiveStreamsMessageSender reactiveStreamsMessageSender;
@Autowired
public RealTimeEventPublisher(
ApplicationEventPublisher applicationEventPublisher,
MessageTemplate messageTemplate,
SimpMessagingTemplate webSocketTemplate,
ReactiveStreamsMessageSender reactiveStreamsMessageSender) {
this.applicationEventPublisher = applicationEventPublisher;
this.messageTemplate = messageTemplate;
this.webSocketTemplate = webSocketTemplate;
this.reactiveStreamsMessageSender = reactiveStreamsMessageSender;
}
public void publishLoanStatusChangeEvent(LoanStatusChangeEvent event) {
// Publish internal event for same JVM
applicationEventPublisher.publishEvent(event);
// Send to message queue for inter-service communication
messageTemplate.convertAndSend("/topic/loan-status", event);
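// Send to WebSocket clients. Use a broker prefix ("/topic"): "/app" is the
// application prefix for inbound client messages and is not relayed to subscribers.
webSocketTemplate.convertAndSend("/topic/loan/status", event);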
// Send via reactive streams for real-time processing
reactiveStreamsMessageSender.sendEvent(event);
}
public void publishPaymentProcessedEvent(PaymentProcessedEvent event) {
applicationEventPublisher.publishEvent(event);
messageTemplate.convertAndSend("/topic/payments", event);
// Real-time balance update
sendBalanceUpdateEvent(event);
// Trigger notifications
triggerPaymentNotifications(event);
}
private void sendBalanceUpdateEvent(PaymentProcessedEvent event) {
BalanceUpdateEvent balanceEvent = BalanceUpdateEvent.builder()
.clientId(event.getClientId())
.accountType(event.getAccountType())
.newBalance(event.getNewBalance())
.transactionAmount(event.getAmount())
.timestamp(Instant.now())
.build();
// Broker destination ("/topic"), not the inbound application prefix ("/app")
webSocketTemplate.convertAndSend("/topic/balance/update", balanceEvent);
}
private void triggerPaymentNotifications(PaymentProcessedEvent event) {
// Send notifications to different channels
CompletableFuture.runAsync(() -> {
sendWebNotification(event);
sendEmailNotification(event);
sendSMSNotification(event);
});
}
}
Event Types and Payloads
// Core Event Types
public interface RealTimeEvent {
String getEventType();
String getTenantId();
Long getEntityId();
// Named getOccurredAt() because ApplicationEvent already declares a final long getTimestamp()
Instant getOccurredAt();
String getCorrelationId();
String getUserId();
}
// Not a Spring bean: events are created per occurrence with new, so no @Component
public class LoanStatusChangeEvent extends ApplicationEvent implements RealTimeEvent {
private final String eventType = "LOAN_STATUS_CHANGE";
private final Long loanId;
private final String oldStatus;
private final String newStatus;
private final String triggeredBy;
private final String reason;
public LoanStatusChangeEvent(Object source, Long loanId, String oldStatus,
String newStatus, String triggeredBy, String reason) {
super(source);
this.loanId = loanId;
this.oldStatus = oldStatus;
this.newStatus = newStatus;
this.triggeredBy = triggeredBy;
this.reason = reason;
}
// Getters and implementation of RealTimeEvent interface
@Override
public String getEventType() { return eventType; }
@Override
public Long getEntityId() { return loanId; }
}
// Not a Spring bean: events are created per occurrence with new, so no @Component
public class PaymentProcessedEvent extends ApplicationEvent implements RealTimeEvent {
private final String eventType = "PAYMENT_PROCESSED";
private final Long paymentId;
private final Long clientId;
private final String accountType;
private final BigDecimal amount;
private final BigDecimal newBalance;
private final String paymentType;
public PaymentProcessedEvent(Object source, Long paymentId, Long clientId,
String accountType, BigDecimal amount,
BigDecimal newBalance, String paymentType) {
super(source);
this.paymentId = paymentId;
this.clientId = clientId;
this.accountType = accountType;
this.amount = amount;
this.newBalance = newBalance;
this.paymentType = paymentType;
}
// Getters and implementation
@Override
public String getEventType() { return eventType; }
@Override
public Long getEntityId() { return paymentId; }
}
WebSocket Implementation
WebSocket Configuration
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic", "/queue");
config.setApplicationDestinationPrefixes("/app");
config.setUserDestinationPrefix("/user");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/fineract-ws")
.setAllowedOriginPatterns("*")
.withSockJS();
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
// Custom headers are only expected on the CONNECT frame
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
String sessionId = accessor.getSessionId();
// getFirstNativeHeader returns null when the header is absent
String tenantId = accessor.getFirstNativeHeader("X-Tenant-ID");
String userId = accessor.getFirstNativeHeader("X-User-ID");
// Validate tenant and user access
if (tenantId == null || !validateTenantAccess(tenantId, sessionId)) {
throw new UnauthorizedException("Invalid tenant access");
}
}
return message;
}
});
}
}
WebSocket Event Handlers
@Controller
public class RealTimeEventController {
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
private UserSessionManager userSessionManager;
@MessageMapping("/subscribe/{entityType}")
public void subscribeToEntity(Principal principal, @DestinationVariable String entityType,
MessageHeaders headers) {
String userId = ((Authentication) principal).getName();
String sessionId = headers.get("simpSessionId", String.class);
// Store user subscription
userSessionManager.addSubscription(userId, sessionId, entityType);
// Send initial data
sendInitialData(entityType, userId, sessionId);
}
@MessageMapping("/unsubscribe/{entityType}")
public void unsubscribeFromEntity(Principal principal, @DestinationVariable String entityType,
MessageHeaders headers) {
String userId = ((Authentication) principal).getName();
String sessionId = headers.get("simpSessionId", String.class);
userSessionManager.removeSubscription(userId, sessionId, entityType);
}
@EventListener
public void handleLoanStatusChange(LoanStatusChangeEvent event) {
String message = buildStatusChangeMessage(event);
// Send to all users subscribed to this loan
Set<String> subscribers = getLoanSubscribers(event.getLoanId());
for (String userId : subscribers) {
messagingTemplate.convertAndSendToUser(
userId, "/queue/loan-status", message);
}
// Send to topic for group notifications
messagingTemplate.convertAndSend("/topic/loans/status", message);
}
@EventListener
public void handleBalanceUpdate(BalanceUpdateEvent event) {
String message = buildBalanceUpdateMessage(event);
// Send real-time balance update
messagingTemplate.convertAndSendToUser(
event.getUserId(), "/queue/balance", message);
// Send to dashboard for staff users
Set<String> staffUsers = getStaffUsersByOffice(event.getOfficeId());
for (String userId : staffUsers) {
messagingTemplate.convertAndSendToUser(
userId, "/queue/dashboard-update", message);
}
}
private String buildStatusChangeMessage(LoanStatusChangeEvent event) {
return JsonBuilder.create()
.put("eventType", event.getEventType())
.put("loanId", event.getLoanId())
.put("oldStatus", event.getOldStatus())
.put("newStatus", event.getNewStatus())
// ApplicationEvent.getTimestamp() already returns epoch milliseconds as a long
.put("timestamp", event.getTimestamp())
.build();
}
private Set<String> getLoanSubscribers(Long loanId) {
// Get all users subscribed to this loan
return userSessionManager.getSubscribers("LOAN_" + loanId);
}
}
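The controller above relies on a `UserSessionManager` that the document never defines. As a minimal sketch of the bookkeeping it implies, the class below keeps a thread-safe map from a topic key (such as `"LOAN_42"`) to the user IDs subscribed to it. The name `SubscriptionRegistry` and its method signatures are assumptions made for illustration; session IDs, which the controller also passes, are left out to keep the example short.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for UserSessionManager: tracks which users follow which topic key.
final class SubscriptionRegistry {
    // topic key (e.g. "LOAN_42") -> user IDs subscribed to it
    private final Map<String, Set<String>> subscribersByTopic = new ConcurrentHashMap<>();

    void addSubscription(String userId, String topicKey) {
        // computeIfAbsent creates the subscriber set atomically on first use
        subscribersByTopic
            .computeIfAbsent(topicKey, k -> ConcurrentHashMap.newKeySet())
            .add(userId);
    }

    void removeSubscription(String userId, String topicKey) {
        // Returning null from computeIfPresent drops the entry once the last subscriber leaves
        subscribersByTopic.computeIfPresent(topicKey, (k, users) -> {
            users.remove(userId);
            return users.isEmpty() ? null : users;
        });
    }

    Set<String> getSubscribers(String topicKey) {
        // Immutable snapshot, so callers can iterate while subscriptions change
        return Set.copyOf(subscribersByTopic.getOrDefault(topicKey, Set.of()));
    }
}
```

Returning a snapshot rather than the live set means a loop that sends to each subscriber is not affected by users subscribing or unsubscribing mid-iteration. This registry lives in one JVM only; a clustered deployment would need shared storage instead.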
2. Message Queue Integration
Apache Kafka Implementation
Kafka Producer Configuration
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.tenant.header}")
private String tenantHeader;
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
configProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);
configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerInterceptor<String, Object> tenantHeaderInterceptor() {
return new TenantHeaderProducerInterceptor(tenantHeader);
}
}
Kafka Event Producer
@Component
public class KafkaEventProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final ObjectMapper objectMapper;
@Autowired
public KafkaEventProducer(KafkaTemplate<String, Object> kafkaTemplate, ObjectMapper objectMapper) {
this.kafkaTemplate = kafkaTemplate;
this.objectMapper = objectMapper;
}
public void sendLoanEvent(String eventType, Loan loan, String tenantId) {
LoanEvent loanEvent = LoanEvent.builder()
.eventType(eventType)
.loanId(loan.getId())
.clientId(loan.getClient().getId())
.principal(loan.getPrincipal())
.status(loan.getStatus())
.tenantId(tenantId)
.timestamp(Instant.now())
.correlationId(generateCorrelationId())
.build();
String topic = String.format("fineract-loans-%s", tenantId);
// Spring Kafka 3.x returns a CompletableFuture (earlier versions returned ListenableFuture)
CompletableFuture<SendResult<String, Object>> future =
kafkaTemplate.send(topic, loan.getId().toString(), loanEvent);
future.whenComplete((result, ex) -> {
if (ex == null) {
log.info("Loan event sent successfully: {} to topic: {}", eventType, topic);
} else {
log.error("Failed to send loan event: {}", eventType, ex);
handleEventFailure(loanEvent, ex);
}
});
}
public void sendPaymentEvent(String eventType, Payment payment, String tenantId) {
PaymentEvent paymentEvent = PaymentEvent.builder()
.eventType(eventType)
.paymentId(payment.getId())
.clientId(payment.getClientId())
.loanId(payment.getLoanId())
.amount(payment.getAmount())
.paymentType(payment.getPaymentType())
.tenantId(tenantId)
.timestamp(Instant.now())
.correlationId(generateCorrelationId())
.build();
String topic = String.format("fineract-payments-%s", tenantId);
kafkaTemplate.send(topic, payment.getId().toString(), paymentEvent);
}
public void sendAccountEvent(String eventType, Account account, String tenantId) {
AccountEvent accountEvent = AccountEvent.builder()
.eventType(eventType)
.accountId(account.getId())
.clientId(account.getClientId())
.accountType(account.getType())
.balance(account.getBalance())
.tenantId(tenantId)
.timestamp(Instant.now())
.correlationId(generateCorrelationId())
.build();
String topic = String.format("fineract-accounts-%s", tenantId);
kafkaTemplate.send(topic, account.getId().toString(), accountEvent);
}
private String generateCorrelationId() {
return UUID.randomUUID().toString();
}
private void handleEventFailure(Object event, Throwable ex) {
// Implement retry logic or dead letter queue
log.error("Event failed, triggering compensation: {}", event, ex);
triggerCompensation(event);
}
}
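`handleEventFailure` above leaves "retry logic or dead letter queue" as a comment. One way this could look, sketched in plain Java without Kafka dependencies: retry a send a bounded number of times with exponential backoff, and park the event in a dead-letter list if every attempt fails. `RetryingSender`, its constructor parameters, and the in-memory dead-letter list are all hypothetical; a production system would more likely write failed events to a dedicated topic or table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: bounded retries with exponential backoff, then dead-lettering.
final class RetryingSender<E> {
    private final Consumer<E> sender;          // e.g. a lambda wrapping a blocking send
    private final int maxAttempts;
    private final long initialBackoffMillis;
    private final List<E> deadLetters = new ArrayList<>();

    RetryingSender(Consumer<E> sender, int maxAttempts, long initialBackoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.sender = sender;
        this.maxAttempts = maxAttempts;
        this.initialBackoffMillis = initialBackoffMillis;
    }

    /** Returns true if the event was sent, false if it ended up dead-lettered. */
    boolean send(E event) {
        long backoff = initialBackoffMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                sender.accept(event);
                return true;
            } catch (RuntimeException ex) {
                if (attempt == maxAttempts) {
                    break; // no attempts left
                }
                try {
                    Thread.sleep(backoff);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and stop retrying
                    break;
                }
                backoff *= 2; // double the wait before the next attempt
            }
        }
        synchronized (deadLetters) {
            deadLetters.add(event);
        }
        return false;
    }

    /** Snapshot of events that exhausted their retries. */
    List<E> deadLetters() {
        synchronized (deadLetters) {
            return List.copyOf(deadLetters);
        }
    }
}
```

Because this helper blocks the calling thread while it waits, it would need to run off the request thread, for example on a dedicated executor.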
Kafka Event Consumer
@Component
public class KafkaEventConsumer {
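private final NotificationService notificationService;
private final ExternalIntegrationService integrationService;
private final AuditService auditService;
// Constructor injection: the final fields above must be assigned
public KafkaEventConsumer(NotificationService notificationService,
ExternalIntegrationService integrationService, AuditService auditService) {
this.notificationService = notificationService;
this.integrationService = integrationService;
this.auditService = auditService;
}
// T(String).format(pattern, tenant) fills the tenant ID into the topic pattern
@KafkaListener(topics = "#{T(String).format('${kafka.topics.loans.pattern}', '${tenant.id}')}")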
public void handleLoanEvent(ConsumerRecord<String, LoanEvent> record) {
LoanEvent event = record.value();
String tenantId = event.getTenantId();
try {
// Process event based on type
switch (event.getEventType()) {
case "LOAN_DISBURSED":
handleLoanDisbursed(event);
break;
case "LOAN_APPROVED":
handleLoanApproved(event);
break;
case "LOAN_REPAYMENT":
handleLoanRepayment(event);
break;
case "LOAN_STATUS_CHANGE":
handleLoanStatusChange(event);
break;
}
// Send notifications
sendLoanNotifications(event);
// Trigger external integrations
triggerExternalIntegrations(event);
// Audit trail
auditService.logEvent(event);
} catch (Exception e) {
log.error("Error processing loan event: {}", event, e);
handleEventError(event, e);
}
}
// T(String).format(pattern, tenant) fills the tenant ID into the topic pattern
@KafkaListener(topics = "#{T(String).format('${kafka.topics.payments.pattern}', '${tenant.id}')}")
public void handlePaymentEvent(ConsumerRecord<String, PaymentEvent> record) {
PaymentEvent event = record.value();
try {
switch (event.getEventType()) {
case "PAYMENT_PROCESSED":
handlePaymentProcessed(event);
break;
case "PAYMENT_REVERSED":
handlePaymentReversed(event);
break;
case "PAYMENT_FAILED":
handlePaymentFailed(event);
break;
}
// Real-time balance updates
sendBalanceUpdate(event);
// Payment confirmations
sendPaymentConfirmation(event);
} catch (Exception e) {
log.error("Error processing payment event: {}", event, e);
}
}
private void handleLoanDisbursed(LoanEvent event) {
// Update dashboard widgets
updateDashboardWidgets(event);
// Send real-time notifications
notifyLoanDisbursed(event);
// Trigger accounting entries
triggerAccountingEntries(event);
}
private void sendLoanNotifications(LoanEvent event) {
switch (event.getEventType()) {
case "LOAN_APPROVED":
notificationService.sendLoanApprovalNotification(event);
break;
case "LOAN_DISBURSED":
notificationService.sendLoanDisbursementNotification(event);
break;
case "LOAN_STATUS_CHANGE":
notificationService.sendLoanStatusChangeNotification(event);
break;
}
}
private void triggerExternalIntegrations(LoanEvent event) {
// CRM integration
integrationService.notifyCRM(event);
// Accounting system integration
integrationService.updateAccountingSystem(event);
// Third-party reporting
integrationService.sendToReportingSystem(event);
}
}
3. Batch Processing with Real-time Triggers
COB (Close of Business) Real-time Integration
@Component
public class RealTimeCOBProcessor {
@Autowired
private InterestCalculationService interestCalculationService;
@Autowired
private RealTimeEventPublisher eventPublisher;
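@Autowired
private NotificationService notificationService;
// loanRepository and externalIntegrationService are used below and must be injected too
@Autowired
private LoanRepository loanRepository;
@Autowired
private ExternalIntegrationService externalIntegrationService;
@Scheduled(fixedRate = 30000) // Runs every 30 seconds, whether or not COB is in progress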
public void processCOBBatch() {
log.info("Starting COB batch processing");
List<Loan> loansForInterestPosting = loanRepository.findLoansForInterestPosting();
for (Loan loan : loansForInterestPosting) {
try {
processLoanForInterestPosting(loan);
} catch (Exception e) {
log.error("Error processing loan {} for interest posting", loan.getId(), e);
handleCOBError(loan, e);
}
}
// Send COB progress update
sendCOBProgressUpdate();
}
private void processLoanForInterestPosting(Loan loan) {
BigDecimal interestAmount = interestCalculationService.calculateInterest(loan);
if (interestAmount.compareTo(BigDecimal.ZERO) > 0) {
// Post interest
LoanTransaction interestTransaction = loan.postInterest(interestAmount, LocalDate.now());
loanRepository.save(loan);
// Send real-time event
InterestPostedEvent event = InterestPostedEvent.builder()
.loanId(loan.getId())
.clientId(loan.getClient().getId())
.interestAmount(interestAmount)
.transactionDate(LocalDate.now())
.tenantId(getCurrentTenantId())
.timestamp(Instant.now())
.build();
eventPublisher.publishInterestPostedEvent(event);
// Update dashboard metrics
updateCOBDashboardMetrics(loan, interestAmount);
}
}
@EventListener
public void handleInterestPostedEvent(InterestPostedEvent event) {
// Real-time dashboard update
sendDashboardUpdate(event);
// Client notification
notificationService.sendInterestPostedNotification(event);
// External system notification
externalIntegrationService.notifyInterestPosting(event);
}
}
Scheduled Notifications
@Component
public class RealTimeNotificationScheduler {
@Autowired
private NotificationService notificationService;
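@Autowired
private LoanRepository loanRepository;
// eventPublisher is used by both scheduled methods below
@Autowired
private RealTimeEventPublisher eventPublisher;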
@Scheduled(cron = "0 0 9 * * *") // Daily at 9 AM
public void sendDailyOverdueReminders() {
List<Loan> overdueLoans = loanRepository.findOverdueLoans();
for (Loan loan : overdueLoans) {
OverdueReminderEvent event = OverdueReminderEvent.builder()
.loanId(loan.getId())
.clientId(loan.getClient().getId())
.overdueAmount(loan.getOverdueAmount())
.overdueDays(loan.getDaysInArrears())
.dueDate(loan.getNextPaymentDueDate())
.tenantId(getCurrentTenantId())
.timestamp(Instant.now())
.build();
// Send multiple reminders based on overdue days
if (loan.getDaysInArrears() <= 30) {
notificationService.sendFriendlyReminder(event);
} else if (loan.getDaysInArrears() <= 60) {
notificationService.sendUrgentReminder(event);
} else {
notificationService.sendFinalReminder(event);
}
// Real-time staff notification
eventPublisher.publishOverdueLoanEvent(event);
}
}
@Scheduled(cron = "0 0 18 * * *") // Daily at 6 PM
public void sendPaymentDueNotifications() {
List<Loan> loansWithDuePayments = loanRepository.findLoansWithDuePaymentsTomorrow();
for (Loan loan : loansWithDuePayments) {
PaymentDueEvent event = PaymentDueEvent.builder()
.loanId(loan.getId())
.clientId(loan.getClient().getId())
.dueAmount(loan.getNextPaymentDueAmount())
.dueDate(loan.getNextPaymentDueDate())
.tenantId(getCurrentTenantId())
.timestamp(Instant.now())
.build();
notificationService.sendPaymentDueNotification(event);
// SMS notification for immediate due
if (loan.getNextPaymentDueDate().equals(LocalDate.now().plusDays(1))) {
notificationService.sendSMSReminder(event);
}
// Real-time staff notification
eventPublisher.publishPaymentDueEvent(event);
}
}
}
4. Push Notifications System
Firebase Cloud Messaging Integration
@Service
public class PushNotificationService {
@Value("${fcm.server.key}")
private String serverKey;
@Value("${fcm.sender.id}")
private String senderId;
private final RestTemplate restTemplate;
private final ObjectMapper objectMapper;
public PushNotificationService() {
this.restTemplate = new RestTemplate();
this.objectMapper = new ObjectMapper();
}
public void sendPushNotification(PushNotificationRequest request) {
try {
FCMMessage fcmMessage = FCMMessage.builder()
.to(request.getToken())
.notification(Notification.builder()
.title(request.getTitle())
.body(request.getBody())
.build())
.data(Map.of(
"type", request.getType(),
"entityId", request.getEntityId().toString(),
// data is Map<String, String>, so the epoch value must be converted to a String
"timestamp", String.valueOf(Instant.now().toEpochMilli())
))
.build();
String jsonMessage = objectMapper.writeValueAsString(fcmMessage);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.set("Authorization", "key=" + serverKey);
headers.set("Sender", "id=" + senderId);
HttpEntity<String> entity = new HttpEntity<>(jsonMessage, headers);
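// Note: this is the legacy FCM HTTP endpoint, which Google has deprecated in favor of the HTTP v1 API
ResponseEntity<String> response = restTemplate.postForEntity(
"https://fcm.googleapis.com/fcm/send", entity, String.class);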
if (response.getStatusCode().is2xxSuccessful()) {
log.info("Push notification sent successfully: {}", request.getTitle());
} else {
log.error("Failed to send push notification: {}", response.getStatusCode());
}
} catch (Exception e) {
log.error("Error sending push notification", e);
}
}
public void sendLoanStatusNotification(String deviceToken, LoanStatusChangeEvent event) {
PushNotificationRequest request = PushNotificationRequest.builder()
.token(deviceToken)
.title("Loan Status Update")
.body(String.format("Your loan %s status changed to %s",
event.getLoanId(), event.getNewStatus()))
.type("LOAN_STATUS_CHANGE")
.entityId(event.getLoanId())
.build();
sendPushNotification(request);
}
public void sendPaymentNotification(String deviceToken, PaymentProcessedEvent event) {
PushNotificationRequest request = PushNotificationRequest.builder()
.token(deviceToken)
.title("Payment Processed")
.body(String.format("Payment of $%.2f processed successfully", event.getAmount()))
.type("PAYMENT_PROCESSED")
.entityId(event.getPaymentId())
.build();
sendPushNotification(request);
}
public void sendOverdueNotification(String deviceToken, OverdueReminderEvent event) {
PushNotificationRequest request = PushNotificationRequest.builder()
.token(deviceToken)
.title("Payment Overdue")
.body(String.format("Your loan payment of $%.2f is %d days overdue",
event.getOverdueAmount(), event.getOverdueDays()))
.type("PAYMENT_OVERDUE")
.entityId(event.getLoanId())
.build();
sendPushNotification(request);
}
}
// FCM Data Models
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
class FCMMessage {
private String to;
private Notification notification;
private Map<String, String> data;
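@Builder.Default // without this, Lombok's builder ignores the field initializer
private String priority = "high";
@Builder.Default
@JsonProperty("time_to_live") // the legacy FCM payload uses snake_case for this field
private Integer timeToLive = 86400; // 24 hours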
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
class Notification {
private String title;
private String body;
@Builder.Default // without this, Lombok's builder ignores the field initializer
private String sound = "default";
@Builder.Default
private String icon = "ic_notification";
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
class PushNotificationRequest {
private String token;
private String title;
private String body;
private String type;
private Long entityId;
private Map<String, String> data;
}
5. Real-time Dashboard Updates
Dashboard Metrics Service
@Service
public class RealTimeDashboardService {
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
private DashboardMetricsRepository metricsRepository;
public void updateDashboardMetrics() {
DashboardMetrics metrics = calculateDashboardMetrics();
// Send to all connected admin users
Set<String> adminUsers = getConnectedAdminUsers();
for (String userId : adminUsers) {
messagingTemplate.convertAndSendToUser(
userId, "/queue/dashboard-metrics", metrics);
}
// Update dashboard widgets
updateWidgets(metrics);
}
public void sendLoanPortfolioUpdate(LoanPortfolioUpdate update) {
String message = JsonBuilder.create()
.put("type", "LOAN_PORTFOLIO_UPDATE")
.put("totalLoans", update.getTotalLoans())
.put("totalOutstanding", update.getTotalOutstanding())
.put("averageInterestRate", update.getAverageInterestRate())
.put("overdueLoans", update.getOverdueLoans())
.put("newLoansToday", update.getNewLoansToday())
.put("timestamp", Instant.now().toEpochMilli())
.build();
messagingTemplate.convertAndSend("/topic/dashboard/portfolio", message);
}
public void sendSavingsBalanceUpdate(SavingsBalanceUpdate update) {
String message = JsonBuilder.create()
.put("type", "SAVINGS_BALANCE_UPDATE")
.put("totalDeposits", update.getTotalDeposits())
.put("totalWithdrawals", update.getTotalWithdrawals())
.put("currentBalance", update.getCurrentBalance())
.put("activeAccounts", update.getActiveAccounts())
.put("interestPaid", update.getInterestPaid())
.put("timestamp", Instant.now().toEpochMilli())
.build();
messagingTemplate.convertAndSend("/topic/dashboard/savings", message);
}
public void sendPaymentStatsUpdate(PaymentStatsUpdate update) {
String message = JsonBuilder.create()
.put("type", "PAYMENT_STATS_UPDATE")
.put("paymentsToday", update.getPaymentsToday())
.put("totalAmount", update.getTotalAmount())
.put("onTimePayments", update.getOnTimePayments())
.put("latePayments", update.getLatePayments())
.put("averagePaymentAmount", update.getAveragePaymentAmount())
.put("timestamp", Instant.now().toEpochMilli())
.build();
messagingTemplate.convertAndSend("/topic/dashboard/payments", message);
}
@EventListener
public void handleDashboardUpdateRequest(DashboardUpdateRequestEvent event) {
// Send current dashboard state to requesting user
DashboardMetrics currentMetrics = calculateDashboardMetrics();
messagingTemplate.convertAndSendToUser(
event.getUserId(), "/queue/dashboard-current", currentMetrics);
}
private DashboardMetrics calculateDashboardMetrics() {
return DashboardMetrics.builder()
.totalLoans(loanRepository.countActiveLoans())
.totalOutstanding(loanRepository.sumOutstandingPrincipal())
.averageInterestRate(loanRepository.calculateAverageInterestRate())
.overdueLoans(loanRepository.countOverdueLoans())
.totalClients(clientRepository.countActiveClients())
.totalSavings(savingsRepository.sumCurrentBalances())
.paymentsToday(paymentRepository.countPaymentsToday())
.totalDisbursedToday(disbursementRepository.sumDisbursedToday())
.activeStaff(staffRepository.countActiveStaff())
.timestamp(Instant.now())
.build();
}
}
Dashboard Widget Updates
@Component
public class DashboardWidgetService {
@Autowired
private SimpMessagingTemplate messagingTemplate;
public void updateLoanWidget(Loan loan) {
LoanWidgetData widgetData = LoanWidgetData.builder()
.loanId(loan.getId())
.clientName(loan.getClient().getDisplayName())
.principal(loan.getPrincipal())
.status(loan.getStatus().name())
.nextPaymentDue(loan.getNextPaymentDueDate())
.outstandingAmount(loan.getPrincipalOutstanding())
.daysInArrears(loan.getDaysInArrears())
.lastUpdated(Instant.now())
.build();
String message = JsonBuilder.create()
.put("widgetType", "LOAN")
.put("action", "UPDATE")
.put("data", widgetData)
.put("timestamp", Instant.now().toEpochMilli())
.build();
// Send to dashboard subscribers
messagingTemplate.convertAndSend("/topic/dashboard/widgets", message);
// Send to specific user if they have this loan in their view
Set<String> viewers = getLoanViewers(loan.getId());
for (String userId : viewers) {
messagingTemplate.convertAndSendToUser(
userId, "/queue/widget-update", widgetData);
}
}
public void updateSavingsWidget(SavingsAccount account) {
SavingsWidgetData widgetData = SavingsWidgetData.builder()
.accountId(account.getId())
.clientName(account.getClient().getDisplayName())
.currentBalance(account.getAccountBalanceDerived())
.lastTransactionDate(account.getLastTransactionDate())
.availableBalance(account.getAvailableBalanceDerived())
.interestRate(account.getNominalAnnualInterestRate())
.status(account.getStatus().name())
.build();
String message = JsonBuilder.create()
.put("widgetType", "SAVINGS")
.put("action", "UPDATE")
.put("data", widgetData)
.put("timestamp", Instant.now().toEpochMilli())
.build();
messagingTemplate.convertAndSend("/topic/dashboard/widgets", message);
}
public void sendAlertToDashboard(Alert alert) {
AlertWidgetData alertData = AlertWidgetData.builder()
.alertId(alert.getId())
.type(alert.getType())
.severity(alert.getSeverity())
.title(alert.getTitle())
.message(alert.getMessage())
.relatedEntityId(alert.getEntityId())
.relatedEntityType(alert.getEntityType())
.acknowledged(alert.isAcknowledged())
.createdAt(alert.getCreatedAt())
.build();
String message = JsonBuilder.create()
.put("widgetType", "ALERT")
.put("action", "NEW")
.put("data", alertData)
.put("timestamp", Instant.now().toEpochMilli())
.build();
// Send to all dashboard users
messagingTemplate.convertAndSend("/topic/dashboard/alerts", message);
// Send push notification for critical alerts
if (alert.getSeverity() == AlertSeverity.CRITICAL) {
sendCriticalAlertNotification(alertData);
}
}
}
6. Performance and Scalability
Caching Strategy for Real-time Data
@Configuration
@EnableCaching
public class RealTimeCacheConfig {
@Bean
public CacheManager cacheManager() {
CaffeineCacheManager cacheManager = new CaffeineCacheManager();
cacheManager.setCaffeine(Caffeine.newBuilder()
.initialCapacity(1000)
.maximumSize(10000)
.expireAfterWrite(Duration.ofMinutes(30))
.recordStats());
return cacheManager;
}
@Bean
public Cache dashboardMetricsCache() {
return Caffeine.newBuilder()
.maximumSize(100)
.expireAfterWrite(Duration.ofMinutes(5))
.recordStats()
.build();
}
}
@Service
public class CachedRealTimeService {
@Autowired
@Qualifier("dashboardMetricsCache")
private Cache dashboardMetricsCache;
@Cacheable(value = "dashboard-metrics", key = "#tenantId")
public DashboardMetrics getCachedDashboardMetrics(String tenantId) {
log.debug("Loading dashboard metrics from database for tenant: {}", tenantId);
return loadDashboardMetricsFromDatabase(tenantId);
}
@CacheEvict(value = "dashboard-metrics", key = "#tenantId")
public void evictDashboardMetricsCache(String tenantId) {
log.debug("Evicted dashboard metrics cache for tenant: {}", tenantId);
}
@CachePut(value = "dashboard-metrics", key = "#result.tenantId")
public DashboardMetrics updateDashboardMetrics(DashboardMetrics metrics) {
log.debug("Updated dashboard metrics in cache for tenant: {}", metrics.getTenantId());
return metrics;
}
// Real-time subscription cache
private final Map<String, Set<String>> userSubscriptions = new ConcurrentHashMap<>();
// Not annotated with @Cacheable: the map is already in memory, and a cached copy
// would go stale as soon as a subscription is added or removed.
public Set<String> getUserSubscriptions(String userId) {
return userSubscriptions.getOrDefault(userId, Set.of());
}
public void addUserSubscription(String userId, String subscription) {
userSubscriptions.computeIfAbsent(userId, k -> ConcurrentHashMap.newKeySet())
.add(subscription);
}
public void removeUserSubscription(String userId, String subscription) {
// Set.of() is immutable and throws on remove(), so only touch an existing set
Set<String> subscriptions = userSubscriptions.get(userId);
if (subscriptions != null) {
subscriptions.remove(subscription);
}
}
}
Connection Management
@Component
public class WebSocketConnectionManager {
// Keyed by user ID; assumes one active connection per user (a newer login replaces the older one)
private final Map<String, UserConnection> userConnections = new ConcurrentHashMap<>();
// Keyed by WebSocket session ID, so a disconnect can be resolved back to its user
private final Map<String, UserConnection> connectionsBySession = new ConcurrentHashMap<>();
public void addConnection(String sessionId, UserConnection connection) {
connectionsBySession.put(sessionId, connection);
userConnections.put(connection.getUserId(), connection);
// Send connection established event
publishConnectionEvent(connection, "CONNECTED");
}
public void removeConnection(String sessionId) {
// remove() returns the previous mapping, so lookup and removal happen in one step
UserConnection connection = connectionsBySession.remove(sessionId);
if (connection != null) {
// Only drop the user entry if it still points at this connection
userConnections.remove(connection.getUserId(), connection);
// Send connection lost event
publishConnectionEvent(connection, "DISCONNECTED");
}
}
public Set<String> getConnectedUsers() {
return userConnections.keySet();
}
public UserConnection getUserConnection(String userId) {
return userConnections.get(userId);
}
public boolean isUserConnected(String userId) {
return userConnections.containsKey(userId);
}
public void broadcastToUsers(Set<String> userIds, String destination, Object message) {
for (String userId : userIds) {
UserConnection connection = userConnections.get(userId);
if (connection != null && connection.getSession().isOpen()) {
try {
connection.getSession().sendMessage(
new TextMessage(objectMapper.writeValueAsString(message)));
} catch (IOException e) {
log.error("Failed to send message to user: {}", userId, e);
// Remove invalid connection
removeConnection(connection.getSession().getId());
}
}
}
}
private void publishConnectionEvent(UserConnection connection, String status) {
ConnectionEvent event = ConnectionEvent.builder()
.userId(connection.getUserId())
.sessionId(connection.getSession().getId())
.status(status)
.timestamp(Instant.now())
.build();
eventPublisher.publishConnectionEvent(event);
}
}
7. Error Handling and Resilience
Circuit Breaker Pattern
@Component
public class RealTimeCommunicationCircuitBreaker {
private final Map<String, CircuitBreaker> circuitBreakers = new ConcurrentHashMap<>();
private final Map<String, Retry> retryPolicies = new ConcurrentHashMap<>();
@PostConstruct
public void initializeCircuitBreakers() {
// Initialize circuit breakers for different services
circuitBreakers.put("notification-service",
CircuitBreaker.ofDefaults("notification-service"));
circuitBreakers.put("websocket-service",
CircuitBreaker.ofDefaults("websocket-service"));
circuitBreakers.put("push-notification-service",
CircuitBreaker.ofDefaults("push-notification-service"));
// Configure retry policies
Retry retryPolicy = Retry.ofDefaults("real-time-retry");
retryPolicies.put("default", retryPolicy);
}
public <T> T executeWithCircuitBreaker(String serviceName, Supplier<T> supplier) {
// computeIfAbsent creates a missing breaker atomically, avoiding a get-then-put race between threads
CircuitBreaker circuitBreaker = circuitBreakers.computeIfAbsent(serviceName, CircuitBreaker::ofDefaults);
return circuitBreaker.executeSupplier(supplier);
}
public void executeWithRetry(String serviceName, Runnable operation) {
Retry retryPolicy = retryPolicies.get("default");
// Vavr's Try has no withRetry(); decorate the operation with the Resilience4j Retry instead
Try.runRunnable(Retry.decorateRunnable(retryPolicy, operation))
.onSuccess(ignored -> log.debug("Operation succeeded for service: {}", serviceName))
.onFailure(throwable -> {
// Reached only after all retry attempts are exhausted
handleRetryFailure(serviceName, throwable);
handleRecovery(serviceName, throwable);
});
}
private void handleRetryFailure(String serviceName, Throwable throwable) {
log.error("Retry failed for service: {}", serviceName, throwable);
// Send alert about service failure
alertService.sendServiceFailureAlert(serviceName, throwable);
// Switch to fallback mechanism
switchToFallback(serviceName);
}
private void handleRecovery(String serviceName, Throwable throwable) {
log.warn("Falling back after failure for service: {}", serviceName, throwable);
// getState() reports the current state; OPEN means calls are still rejected, so no recovery alert is sent here
CircuitBreaker circuitBreaker = circuitBreakers.get(serviceName);
if (circuitBreaker != null && circuitBreaker.getState() == CircuitBreaker.State.OPEN) {
log.info("Circuit breaker is open for service: {}", serviceName);
}
}
private void switchToFallback(String serviceName) {
switch (serviceName) {
case "notification-service":
fallbackNotificationService.queueForLaterDelivery();
break;
case "websocket-service":
fallbackWebSocketService.useLongPolling();
break;
case "push-notification-service":
fallbackPushService.useSMS();
break;
}
}
}
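To make the state transitions behind the circuit breaker pattern concrete, the following is a minimal, self-contained sketch of a count-based breaker with a fallback. It is illustrative only: `SimpleCircuitBreaker`, its constructor parameters, and the injected clock are assumptions made for this example, and Resilience4j's actual `CircuitBreaker` uses a sliding window and a much richer configuration.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical count-based breaker for illustration; not the Resilience4j implementation. */
final class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;   // consecutive failures that open the breaker
    private final long openMillis;        // how long the breaker stays OPEN before probing
    private final LongSupplier clock;     // millisecond clock, injected so tests are deterministic
    private State state = State.CLOSED;
    private int consecutiveFailures;
    private long openedAt;

    SimpleCircuitBreaker(int failureThreshold, long openMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    /** Current state; an OPEN breaker becomes HALF_OPEN once the wait time has elapsed. */
    synchronized State state() {
        if (state == State.OPEN && clock.getAsLong() - openedAt >= openMillis) {
            state = State.HALF_OPEN;
        }
        return state;
    }

    /** Runs the action unless the breaker is OPEN; uses the fallback on rejection or failure. */
    <T> T call(Supplier<T> action, Supplier<T> fallback) {
        if (state() == State.OPEN) {
            return fallback.get();
        }
        try {
            T result = action.get();
            onSuccess();
            return result;
        } catch (RuntimeException e) {
            onFailure();
            return fallback.get();
        }
    }

    private synchronized void onSuccess() {
        consecutiveFailures = 0;
        state = State.CLOSED;
    }

    private synchronized void onFailure() {
        consecutiveFailures++;
        // A failed probe in HALF_OPEN reopens immediately; otherwise open at the threshold.
        if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
            state = State.OPEN;
            openedAt = clock.getAsLong();
        }
    }
}
```

In production code the clock would typically be `System::currentTimeMillis`. The fallback supplier plays the role that `switchToFallback` plays above, for example queueing a notification for later delivery.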
Event Queue for Reliable Delivery
@Entity
@Table(name = "m_event_queue")
public class EventQueue {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "event_type", nullable = false)
private String eventType;
@Column(name = "tenant_id", nullable = false)
private String tenantId;
@Column(name = "payload", nullable = false, columnDefinition = "TEXT")
private String payload;
@Column(name = "destination", nullable = false)
private String destination;
@Column(name = "retry_count", nullable = false)
private Integer retryCount = 0;
@Column(name = "max_retries", nullable = false)
private Integer maxRetries = 3;
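@Enumerated(EnumType.STRING) // persist the enum name; the JPA default (ORDINAL) breaks if constants are reordered
@Column(name = "status", nullable = false)
private EventStatus status = EventStatus.PENDING;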
@Column(name = "scheduled_for", nullable = false)
private LocalDateTime scheduledFor;
@Column(name = "created_at", nullable = false)
private LocalDateTime createdAt = LocalDateTime.now();
@Column(name = "processed_at")
private LocalDateTime processedAt;
@Column(name = "error_message", columnDefinition = "TEXT")
private String errorMessage;
public enum EventStatus {
PENDING, PROCESSING, COMPLETED, FAILED, SCHEDULED
}
}
@Component
public class EventQueueService {
@Autowired
private EventQueueRepository eventQueueRepository;
@Autowired
private RealTimeCommunicationCircuitBreaker circuitBreakerService; // type matches the circuit breaker component defined above
@Scheduled(fixedDelay = 5000) // Every 5 seconds
public void processEventQueue() {
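// Include SCHEDULED so that events rescheduled for retry are picked up once their backoff has elapsed
List<EventQueue> pendingEvents = eventQueueRepository
.findByStatusInAndScheduledForBefore(
List.of(EventStatus.PENDING, EventStatus.SCHEDULED), LocalDateTime.now());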
for (EventQueue event : pendingEvents) {
processEvent(event);
}
}
public void queueEvent(String eventType, String tenantId, String payload, String destination) {
EventQueue event = EventQueue.builder()
.eventType(eventType)
.tenantId(tenantId)
.payload(payload)
.destination(destination)
.retryCount(0)
.maxRetries(3)
.status(EventStatus.PENDING)
.scheduledFor(LocalDateTime.now())
.createdAt(LocalDateTime.now())
.build();
eventQueueRepository.save(event);
}
private void processEvent(EventQueue event) {
try {
event.setStatus(EventStatus.PROCESSING);
eventQueueRepository.save(event);
// Process based on destination
switch (event.getDestination()) {
case "WEBSOCKET":
processWebSocketEvent(event);
break;
case "PUSH_NOTIFICATION":
processPushNotificationEvent(event);
break;
case "EMAIL":
processEmailEvent(event);
break;
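case "SMS":
processSMSEvent(event);
break;
default:
// Fail fast so an unknown destination goes through error handling instead of being marked COMPLETED
throw new IllegalArgumentException("Unknown destination: " + event.getDestination());
}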
event.setStatus(EventStatus.COMPLETED);
event.setProcessedAt(LocalDateTime.now());
eventQueueRepository.save(event);
} catch (Exception e) {
handleEventProcessingError(event, e);
}
}
private void processWebSocketEvent(EventQueue event) {
circuitBreakerService.executeWithCircuitBreaker("websocket-service", () -> {
WebSocketMessage message = JsonBuilder.create()
.put("eventType", event.getEventType())
.put("payload", JsonParser.parseString(event.getPayload()))
.put("timestamp", Instant.now().toEpochMilli())
.build();
webSocketService.broadcast(event.getTenantId(), message);
return null; // executeWithCircuitBreaker takes a Supplier, so the lambda must return a value
});
}
private void handleEventProcessingError(EventQueue event, Exception e) {
event.setRetryCount(event.getRetryCount() + 1);
event.setErrorMessage(e.getMessage());
if (event.getRetryCount() >= event.getMaxRetries()) {
event.setStatus(EventStatus.FAILED);
alertService.sendEventDeliveryFailureAlert(event);
} else {
event.setStatus(EventStatus.SCHEDULED);
event.setScheduledFor(LocalDateTime.now().plusMinutes(
(long) Math.pow(2, event.getRetryCount()))); // Exponential backoff
}
eventQueueRepository.save(event);
}
}
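The retry path in `handleEventProcessingError` delays each new attempt by 2^retryCount minutes. The sketch below reproduces that schedule in isolation so the resulting delays are easy to inspect. `BackoffSchedule` and its `cap` parameter are hypothetical names introduced for this example; the listing above applies no cap.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper mirroring the 2^retryCount-minute backoff used by the event queue. */
final class BackoffSchedule {
    private BackoffSchedule() {}

    /** Delay before the next attempt, given how many attempts have already failed. */
    static Duration delayFor(int retryCount, Duration cap) {
        if (retryCount < 0) {
            throw new IllegalArgumentException("retryCount must be >= 0");
        }
        // Clamp the exponent so the shift cannot overflow a long.
        long minutes = 1L << Math.min(retryCount, 30);
        Duration delay = Duration.ofMinutes(minutes);
        return delay.compareTo(cap) > 0 ? cap : delay;
    }

    /**
     * Delays applied after failures 1 .. maxRetries - 1.
     * The failure that brings retryCount up to maxRetries marks the event FAILED, so it has no delay.
     */
    static List<Duration> schedule(int maxRetries, Duration cap) {
        List<Duration> delays = new ArrayList<>();
        for (int retryCount = 1; retryCount < maxRetries; retryCount++) {
            delays.add(delayFor(retryCount, cap));
        }
        return delays;
    }
}
```

With the default `maxRetries` of 3, `BackoffSchedule.schedule(3, Duration.ofHours(1))` yields delays of 2 and 4 minutes; the third failure marks the event as FAILED and triggers the alert.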
Conclusion
Apache Fineract's real-time communication architecture combines several approaches to achieve reliability, scalability, and performance:
Strengths of the Real-Time Architecture:
- Event-Driven Architecture: Loose coupling through event publishing
- WebSocket Support: Real-time bidirectional communication
- Message Queue Integration: Reliable event delivery with Kafka
- Circuit Breaker Pattern: Resilience against service failures
- Caching Strategy: Optimized performance for real-time data
- Multi-channel Notifications: Web, push, email, and SMS support
Main Components:
- Event Publishing: Internal and external events
- WebSocket Management: Real-time client connections
- Message Queues: Reliable event delivery
- Notification Services: Multi-channel alerts
- Dashboard Updates: Real-time metrics and widgets
- Error Handling: Circuit breakers and retry mechanisms
Performance Optimizations:
- Connection Pooling: Efficient WebSocket connection management
- Caching Layers: In-memory caching for frequently accessed data
- Batch Processing: Combined processing for efficiency
- Compression: Optimized payload transmission
- Connection Management: Automatic cleanup and reconnection
Security Considerations:
- Tenant Isolation: Event segregation per tenant
- Authentication: Secure WebSocket connections
- Rate Limiting: Prevents abuse and flooding
- Message Validation: Input sanitization and validation
- Audit Trail: Complete event tracking
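As an illustration of how tenant isolation and rate limiting from the list above can work together, here is a minimal token-bucket sketch keyed by tenant and user. It is an assumption-laden example rather than Fineract code: `TenantRateLimiter`, its parameters, and the injected nanosecond clock are all hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

/** Hypothetical per-tenant, per-user token bucket; not part of Fineract. */
final class TenantRateLimiter {
    private static final class Bucket {
        double tokens;
        long lastRefillNanos;

        Bucket(double tokens, long lastRefillNanos) {
            this.tokens = tokens;
            this.lastRefillNanos = lastRefillNanos;
        }
    }

    private final double capacity;         // maximum burst size
    private final double refillPerSecond;  // sustained messages per second
    private final LongSupplier nanoClock;  // injected so tests are deterministic
    private final Map<List<String>, Bucket> buckets = new ConcurrentHashMap<>();

    TenantRateLimiter(double capacity, double refillPerSecond, LongSupplier nanoClock) {
        this.capacity = capacity;
        this.refillPerSecond = refillPerSecond;
        this.nanoClock = nanoClock;
    }

    /** Returns true if the message may be sent; each (tenant, user) pair has its own bucket. */
    boolean tryAcquire(String tenantId, String userId) {
        Bucket bucket = buckets.computeIfAbsent(
                List.of(tenantId, userId),
                key -> new Bucket(capacity, nanoClock.getAsLong()));
        synchronized (bucket) {
            long now = nanoClock.getAsLong();
            double elapsedSeconds = Math.max(0L, now - bucket.lastRefillNanos) / 1_000_000_000.0;
            // Refill in proportion to elapsed time, never beyond the bucket capacity.
            bucket.tokens = Math.min(capacity, bucket.tokens + elapsedSeconds * refillPerSecond);
            bucket.lastRefillNanos = now;
            if (bucket.tokens >= 1.0) {
                bucket.tokens -= 1.0;
                return true;
            }
            return false;
        }
    }
}
```

A WebSocket handler could call `tryAcquire(tenantId, userId)` before processing each inbound message and drop or reject the message when it returns false. In production the clock would typically be `System::nanoTime`.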
This architecture is designed to let the application respond to users in real time while maintaining reliability and scalability in an enterprise environment.
This document describes the real-time communication implementation in detail. Specific optimizations can be applied based on deployment requirements and performance targets.