
BOOST.Dispatcher (BOOST.DeliveryBoy)

A multi-tenant job dispatch and processing system. It receives trigger messages from an AMQP queue and routes them to type-specific executors (email and MQTT today; webhook, SMS, and push are planned). Jobs are created via BOOST.Secretary and executed here.

Architecture

BOOST.Secretary (any service)
    ↓ JMS message with tenantUUID property
ActiveMQ Queue (boost.dispatcher.jobs)
    ↓
TenantDispatcher.onMessage()
    ↓
TriggerMessage.parse()
    ↓
┌──────────────┬──────────────────┬─────────────┐
│ processJob   │ processPending   │ mqttCRUD    │
│              │                  │ (relay)     │
│ JobProcessor │ JobProcessor     │ MQTTService │
│   ↓          │   ↓              │             │
│ JobRouter    │ fetchPendingDue  │             │
│   ↓          │                  │             │
│ JobExecutor  │                  │             │
│ (email/mqtt) │                  │             │
└──────────────┴──────────────────┴─────────────┘

Multi-Tenancy

  • Each tenant gets its own TenantDispatcher, Quartz scheduler, and HikariCP database pool
  • Messages are filtered by tenantUUID JMS property via message selectors
  • MQTT topics are hashed per-tenant using MD5
  • DispatcherBoss orchestrates dispatcher creation for all discovered tenants
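
The selector-based filtering described above could be sketched as follows. `TenantSelector` and `forTenant` are hypothetical names used for illustration; only the `tenantUUID` property name comes from this README. The returned string is the kind of value one would pass as the `messageSelector` argument of JMS `Session.createConsumer(Destination, String)`.

```java
import java.util.UUID;

// Hypothetical helper (not part of the project): builds the JMS message
// selector that restricts a consumer to one tenant's messages.
final class TenantSelector {
    private TenantSelector() {}

    // Parsing the input as a UUID first ensures the value cannot carry quotes
    // or other selector syntax before it is embedded in the expression.
    // UUID.toString() emits the canonical lowercase form, so this sketch
    // assumes producers set the property in that same form.
    static String forTenant(String tenantUuid) {
        UUID parsed = UUID.fromString(tenantUuid); // throws IllegalArgumentException if malformed
        return "tenantUUID = '" + parsed + "'";
    }
}
```

Because the broker evaluates the selector, each TenantDispatcher would only ever be handed messages for its own tenant and needs no filtering of its own.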

Prerequisites

  • Java 21
  • AMQP Message Broker (ActiveMQ Artemis)
  • MQTT Broker
  • MySQL Database (per-tenant + master)
  • ImageMagick (for PDF generation via BOOST.PDFBuilder)

Configuration

config.properties

dispatcher_queue_name=boost.dispatcher.jobs
broker_url=amqp://172.16.200.32:5672
mqtt_host=tcp://172.16.200.158:1883
mqtt_username=BOOSTDispatcher
mqtt_password=****
master.jdbc.url=jdbc:mysql://host:3306/qeeping?useSSL=false&serverTimezone=UTC
master.jdbc.username=qeeping-master
master.jdbc.password=****
hikari.minimum_idle=2
hikari.maximum_pool_size=10
hikari.idle_timeout=300000
hikari.max_lifetime=600000
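
A minimal sketch of how these keys might be read into typed values is shown here. It is not the project's AppConfig; the class `DispatcherConfig`, its fields, and the choice of which keys are required are assumptions made for illustration. Only the key names are taken from the file above.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical loader for a few of the keys shown in config.properties.
final class DispatcherConfig {
    final String queueName;
    final String brokerUrl;
    final int hikariMaxPoolSize;

    private DispatcherConfig(Properties p) {
        this.queueName = require(p, "dispatcher_queue_name");
        this.brokerUrl = require(p, "broker_url");
        // Assumption: fall back to 10 (the value shown above) when the key is absent.
        this.hikariMaxPoolSize =
            Integer.parseInt(p.getProperty("hikari.maximum_pool_size", "10").trim());
    }

    static DispatcherConfig load(Reader source) {
        Properties p = new Properties();
        try {
            p.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new DispatcherConfig(p);
    }

    // Fails fast with the offending key name instead of a later NullPointerException.
    private static String require(Properties p, String key) {
        String value = p.getProperty(key);
        if (value == null || value.isBlank()) {
            throw new IllegalStateException("Missing required property: " + key);
        }
        return value.trim();
    }
}
```

Values such as `amqp://172.16.200.32:5672` load unchanged, because `Properties` only treats the first `=` or `:` on a line as the key/value separator.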

Trigger Types

processJob

Processes a single job by UUID. Looks up the job in sys_jobs, routes it to the executor matching its messageType, and tracks execution in sys_job_queue.

Message:

{
  "triggerType": "processJob",
  "jobUUID": "uuid-string"
}

Job lifecycle: PENDING → IN_PROGRESS → COMPLETED / FAILED
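
The lifecycle above can be read as a small state machine. The enum below is a sketch under that reading; `JobStatus`, `next`, and `canTransitionTo` are illustrative names, and treating COMPLETED and FAILED as terminal for a single queue entry is an assumption (retries are recorded as additional sys_job_queue entries, as noted under Database Tables).

```java
import java.util.EnumSet;
import java.util.Set;

// Illustrative model of the sys_job_queue status lifecycle.
enum JobStatus {
    PENDING, IN_PROGRESS, COMPLETED, FAILED;

    // Statuses that may directly follow this one.
    Set<JobStatus> next() {
        return switch (this) {
            case PENDING -> EnumSet.of(IN_PROGRESS);
            case IN_PROGRESS -> EnumSet.of(COMPLETED, FAILED);
            // Assumed terminal: a retry would create a new queue entry instead.
            case COMPLETED, FAILED -> EnumSet.noneOf(JobStatus.class);
        };
    }

    boolean canTransitionTo(JobStatus target) {
        return next().contains(target);
    }
}
```

A guard such as `current.canTransitionTo(IN_PROGRESS)` before claiming a job is one way to avoid executing the same queue entry twice.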

processPending

Processes all jobs in sys_job_queue with status PENDING and scheduledTime <= now.

Message:

{
  "triggerType": "processPending"
}
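
The selection rule for processPending (status PENDING and scheduledTime <= now) is sketched below over an in-memory list. In the service this is presumably a database query; `PendingSelector` and `QueueEntry` are hypothetical stand-ins for the real queue record type.

```java
import java.time.Instant;
import java.util.List;

// In-memory illustration of the "pending and due" rule.
final class PendingSelector {
    private PendingSelector() {}

    // Hypothetical, simplified view of one sys_job_queue row.
    record QueueEntry(String jobUuid, String status, Instant scheduledTime) {}

    static List<QueueEntry> due(List<QueueEntry> entries, Instant now) {
        return entries.stream()
            .filter(e -> "PENDING".equals(e.status()))
            // "not after now" is the same as scheduledTime <= now,
            // so an entry scheduled for exactly now is included.
            .filter(e -> !e.scheduledTime().isAfter(now))
            .toList();
    }
}
```

Passing `now` in as a parameter, rather than calling `Instant.now()` inside the method, keeps the rule deterministic and easy to test.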

mqttCRUD

Relays an MQTT message directly, without creating a job record or touching the database. Used for device sync and CRUD notifications.

Message:

{
  "triggerType": "mqttCRUD",
  "body": { "topic": "BOOST/{hash}/items", ... }
}

MQTT Topic format: BOOST/{MD5(tenantUUID + salt)}/{entity}
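
The topic format can be illustrated with the JDK's `MessageDigest`. This is a sketch, not the project's code: the class `TenantTopics`, the UTF-8 encoding, the plain string concatenation of UUID and salt, and the lowercase hex rendering of the digest are all assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

// Hypothetical helper that builds BOOST/{MD5(tenantUUID + salt)}/{entity}.
final class TenantTopics {
    private TenantTopics() {}

    static String topicFor(String tenantUuid, String salt, String entity) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            // Assumption: UUID and salt are concatenated as strings and hashed as UTF-8.
            byte[] digest = md5.digest((tenantUuid + salt).getBytes(StandardCharsets.UTF_8));
            // Assumption: the hash segment is the 32-character lowercase hex digest.
            return "BOOST/" + HexFormat.of().formatHex(digest) + "/" + entity;
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform implementation is required to provide MD5.
            throw new IllegalStateException("MD5 not available", e);
        }
    }
}
```

Publishers and subscribers must agree on the salt and encoding, since any difference yields a different topic segment.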

Job Types (messageType)

| messageType | Executor | Data table | Description |
| --- | --- | --- | --- |
| email | EmailJobExecutor | sys_email_job_data | Send email, optionally with PDF attachment |
| mqtt | MqttJobExecutor | sys_mqtt_job_data | Publish MQTT message |
| webhook | TBD | sys_webhook_job_data | HTTP callback |
| sms | TBD | sys_sms_job_data | SMS message |
| push | TBD | sys_push_job_data | Push notification (FCM/APNs) |
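
Routing by messageType amounts to a lookup from type string to executor. The sketch below shows one way to do it; it is not the project's JobRouter, and the `JobExecutor` interface shape shown here (a single `execute(String)` method returning a string) is invented for illustration.

```java
import java.util.Map;

// Illustrative messageType -> executor routing.
final class RouterSketch {
    // Hypothetical executor contract; the real interface may differ.
    interface JobExecutor {
        String execute(String jobDataUuid);
    }

    private final Map<String, JobExecutor> executors;

    RouterSketch(Map<String, JobExecutor> executors) {
        this.executors = Map.copyOf(executors); // defensive, immutable copy
    }

    JobExecutor route(String messageType) {
        // Immutable maps reject null keys on lookup, so check before calling get().
        JobExecutor executor = messageType == null ? null : executors.get(messageType);
        if (executor == null) {
            // Types marked TBD in the table (webhook, sms, push) would end up here.
            throw new IllegalArgumentException("No executor registered for messageType: " + messageType);
        }
        return executor;
    }
}
```

With this shape, adding a new job type means registering one more map entry rather than extending a conditional chain.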

Email Job Flow

  1. Fetch SysEmailJobDataRecord via job.jobDataUUID
  2. If pdfDataUUID is set → fetch SysPdfJobDataRecord → fetch template from sys_templates → generate PDF
  3. Resolve mail provider (specific via mailProviderUUID or tenant default)
  4. Send email via provider (SMTP / Mailgun)
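
Step 3 of the flow above (a specific provider if mailProviderUUID is set, otherwise the tenant default) might look like the following sketch. `ProviderResolution` and this simplified `MailProvider` record are hypothetical, and failing on an unknown UUID instead of silently falling back to the default is a design assumption, not documented behaviour.

```java
import java.util.Map;

// Illustrative provider resolution: explicit UUID first, tenant default otherwise.
final class ProviderResolution {
    private ProviderResolution() {}

    // Hypothetical, simplified view of one sys_mail_providers row.
    record MailProvider(String uuid, String type, boolean tenantDefault) {}

    static MailProvider resolve(String mailProviderUuid, Map<String, MailProvider> byUuid) {
        if (mailProviderUuid != null) {
            MailProvider specific = byUuid.get(mailProviderUuid);
            if (specific == null) {
                // Assumption: a dangling reference is an error, not a reason to fall back.
                throw new IllegalStateException("Unknown mail provider: " + mailProviderUuid);
            }
            return specific;
        }
        return byUuid.values().stream()
            .filter(MailProvider::tenantDefault)
            .findFirst()
            .orElseThrow(() -> new IllegalStateException("Tenant has no default mail provider"));
    }
}
```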

MQTT Job Flow

  1. Fetch SysMqttJobDataRecord via job.jobDataUUID
  2. Read topic and payload
  3. Publish via MQTTService

Email Providers

| Provider | Status | Implementation |
| --- | --- | --- |
| SMTP | Implemented | Generic SMTP with STARTTLS/auth |
| Mailgun | Implemented | REST API via OkHttp (US + EU regions) |
| SendGrid | Placeholder | Not yet implemented |
| AWS SES | Placeholder | Not yet implemented |

Provider configuration is stored per-tenant in sys_mail_providers. Each email job can reference a specific provider via mailProviderUUID, or fall back to the tenant's default.

Checksum Sync System

A periodic background process (every 5 minutes via ChecksumPingJob) that:

  1. Queries entity tables for row counts, max timestamps, and XOR-based checksums
  2. Publishes checksums via MQTT to topic BOOST/{tenant_hash}/sync/ping
  3. Clients compare local checksums to detect stale data and request full sync
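
To illustrate what an XOR-based checksum buys, the sketch below hashes each row and XORs the results. The per-row hash (CRC32 over `uuid:updatedAt`), the `Row` record, and the class name are assumptions; the README does not state which columns or hash function the service uses, and the real computation may well happen in SQL (MySQL provides `BIT_XOR` and `CRC32`).

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.CRC32;

// Illustrative order-independent table checksum.
final class XorChecksum {
    private XorChecksum() {}

    // Hypothetical, simplified row: identity plus last-modified timestamp.
    record Row(String uuid, long updatedAtMillis) {}

    // Assumed per-row hash: CRC32 of "uuid:updatedAtMillis".
    static long rowHash(Row row) {
        CRC32 crc = new CRC32();
        crc.update((row.uuid() + ":" + row.updatedAtMillis()).getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    // XOR is commutative and associative, so the result does not depend on row order.
    static long of(List<Row> rows) {
        long checksum = 0L;
        for (Row row : rows) {
            checksum ^= rowHash(row);
        }
        return checksum;
    }
}
```

Order independence means server and client can scan rows in any order and still compare results. Pairing the checksum with the row count and max timestamp, as step 1 does, helps catch cases where XOR alone could collide.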

Database Tables

Job definition

| Table | Purpose |
| --- | --- |
| sys_jobs | Job record: messageType + jobDataUUID (polymorphic FK to type-specific table) |
| sys_job_queue | Execution tracking: PENDING → IN_PROGRESS → COMPLETED/FAILED |
| sys_templates | Shared PDF/document templates (invoices, order confirmations, etc.) |

Type-specific job data (sys_<type>_job_data)

| Table | Key columns |
| --- | --- |
| sys_email_job_data | fromAddress, toAddress, subject, body, mailProviderUUID, pdfDataUUID |
| sys_mqtt_job_data | topic, payload (JSON) |
| sys_pdf_job_data | templateUUID (FK → sys_templates), jobData (JSON) |
| sys_webhook_job_data | url, method, headers (JSON), body (JSON) |
| sys_sms_job_data | phoneNumber, message, providerUUID |
| sys_push_job_data | deviceToken, title, body, data (JSON) |

Other

| Table | Purpose |
| --- | --- |
| sys_mail_providers | Per-tenant email provider configuration |
| QRTZ_* | Quartz scheduler persistence tables |

Relationship diagram

sys_jobs
  ├─ messageType = 'email' → jobDataUUID → sys_email_job_data
  │                                           └─ pdfDataUUID → sys_pdf_job_data
  │                                                              └─ templateUUID → sys_templates
  ├─ messageType = 'mqtt'  → jobDataUUID → sys_mqtt_job_data
  ├─ messageType = 'webhook' → jobDataUUID → sys_webhook_job_data
  ├─ messageType = 'sms'   → jobDataUUID → sys_sms_job_data
  └─ messageType = 'push'  → jobDataUUID → sys_push_job_data

sys_job_queue
  └─ jobUUID → sys_jobs.UUID (execution log, multiple entries per job for retries/cron)

Dependencies

| Dependency | Version | Purpose |
| --- | --- | --- |
| BoostMiddleware | 2.0.5 | Shared BOOST utilities (UuidUtils, ExecutionContext) |
| BOOST.Model | 1.0.1 | jOOQ generated table/record classes |
| PDFBuilder | 1.0.1 | PDF generation from templates |
| jooq | 3.19.6 | Type-safe SQL |
| qpid-jms-client | 2.10.0 | AMQP messaging (Apache Qpid) |
| paho.mqtt | 1.2.5 | MQTT publishing |
| HikariCP | 7.0.2 | Connection pooling |
| quartz | 2.3.2 | Cron job scheduling |
| jakarta.mail | 2.0.1 | Email sending (SMTP) |
| okhttp | 4.12.0 | HTTP client (Mailgun API) |
| pdfbox | 3.0.2 | PDF handling |

Project Structure

src/main/java/com/luqon/dispatcher/
├── main/
│   ├── DispatcherApp.java          # Entry point
│   ├── DispatcherBoss.java         # Multi-tenant orchestrator
│   ├── TenantDispatcher.java       # Per-tenant message listener
│   ├── TriggerMessage.java         # Message parser
│   ├── TriggerType.java            # Enum: processJob, processPending, mqttCRUD
│   ├── ActiveMQService.java        # JMS connection management
│   ├── CustomerDataSource.java     # Tenant data wrapper
│   └── DataSourceInitializer.java  # Tenant DB pool initialization
├── job/
│   ├── JobProcessor.java           # Orchestrates job execution + queue tracking
│   ├── JobRouter.java              # Routes messageType → JobExecutor
│   ├── JobExecutor.java            # Executor interface
│   ├── EmailJobExecutor.java       # Email execution (+ optional PDF)
│   ├── MqttJobExecutor.java        # MQTT execution
│   ├── JobQueueService.java        # sys_job_queue CRUD
│   ├── CronJobTrigger.java         # Quartz cron trigger
│   └── CronScheduleManager.java    # Manages cron schedules
├── config/
│   └── AppConfig.java              # Configuration loader
├── email/
│   ├── MailProvider.java            # Provider interface
│   ├── MailProviderFactory.java     # Provider factory
│   ├── MailProviderType.java        # Enum: SMTP, MAILGUN, SENDGRID, SES
│   ├── providers/
│   │   ├── SmtpProvider.java
│   │   └── MailgunProvider.java
│   └── config/
│       ├── MailProviderConfig.java
│       ├── SmtpConfig.java
│       └── MailgunConfig.java
├── PDF/
│   └── PDFUtils.java               # PDF generation utilities
├── mqtt/
│   └── MQTTService.java            # MQTT client (singleton)
├── sync/
│   ├── EntitySyncService.java       # Checksum generation
│   └── ChecksumPingJob.java         # Periodic sync ping (Quartz)
├── utils/
│   ├── ChecksumUtils.java
│   └── SimpleCrypto.java
└── JNDI/
    ├── JndiStorage.java
    └── InMemoryInitialContextFactory.java

Running the Service

java -jar target/dispatcher.jar