Oban as an Event Bus: Decoupled Communication in Elixir Umbrella Apps
Overview
This project demonstrates an architectural pattern for achieving loose coupling between applications in an Elixir umbrella project using Oban as an event bus. The key insight is that applications can communicate asynchronously without having compile-time dependencies on each other, creating clear boundaries while maintaining flexibility.
The Problem
In traditional umbrella architectures, when AppA needs to trigger functionality in AppB, it typically:
- Adds AppB as a dependency in mix.exs
- Directly calls AppB's functions
- Creates tight coupling between the applications
This approach has several drawbacks:
- Circular dependency issues when AppB also needs to call AppA
- Difficult to independently deploy or extract applications
- Hard to maintain clear boundaries as the codebase grows
- Synchronous communication can lead to cascading failures
- Testing becomes more complex due to tight coupling
The Solution: Oban as an Event Bus
This project solves these problems by using Oban as a message broker/event bus that sits between applications. Applications communicate by:
- Creating jobs for workers in other applications
- Storing jobs in a shared database via Oban
- Letting the target application's worker process each job asynchronously
Key Principle
AppA and AppB don't know about each other at compile time - they only share:
- A common persistence layer (Persistence.Repo)
- The Oban library
- Knowledge of worker module names (as strings/atoms)
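The last point can be checked without AppB being present at all. The sketch below is a standalone script, not part of the project (it assumes Hex access for Mix.install). It builds the same job changeset once from a module alias and once from a string. Oban.Job.new/2 only records the worker's name, so nothing here needs AppB.Worker to be compiled or loaded.

```elixir
# Standalone script; inside the umbrella, drop the Mix.install line.
Mix.install([{:oban, "~> 2.14"}])

# AppB.Worker is only an atom here: no AppB code is compiled or loaded.
by_alias = Oban.Job.new(%{number: 1}, worker: AppB.Worker, queue: :default)
by_string = Oban.Job.new(%{number: 1}, worker: "AppB.Worker", queue: :default)

# Both changesets carry the worker as a plain string name.
IO.inspect(by_alias.changes.worker, label: "from alias")
IO.inspect(by_string.changes.worker, label: "from string")
IO.inspect(Code.ensure_loaded?(AppB.Worker), label: "AppB.Worker loaded?")
```

The module only has to exist on the node that eventually executes the job.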
Architecture Diagram
┌─────────────────────────────────────────────────────────────────────┐
│                 AppA and AppB don't know each other                 │
└─────────────────────────────────────────────────────────────────────┘

  ┌──────────────────┐                           ┌──────────────────┐
  │       AppA       │                           │       AppB       │
  │                  │                           │                  │
  │  ┌────────────┐  │                           │  ┌────────────┐  │
  │  │  WorkerA   │  │                           │  │  WorkerB   │  │
  │  └────────────┘  │                           │  └────────────┘  │
  └────────┬─────────┘                           └────────┬─────────┘
           │                                              │
           │ Creates job for WorkerB                      │ Creates job for WorkerA
           │                                              │
           ▼                                              ▼
  ┌─────────────────────────────────────────────────────────────────┐
  │  Oban.Job.new(                     Oban.Job.new(                │
  │    params,                           params,                    │
  │    queue: :default,                  queue: :default,           │
  │    worker: AppB.Worker)              worker: AppA.Worker)       │
  └────────┬──────────────────────────────────────────────┬─────────┘
           │                                              │
           └───────────────────────┬──────────────────────┘
                                   ▼
                          ┌─────────────────┐
                          │      Oban       │
                          │   (Job Queue)   │
                          │                 │
                          │   Shared via    │
                          │   Persistence   │
                          │      Layer      │
                          └────────┬────────┘
                                   │
           ┌───────────────────────┴──────────────────────┐
           │                                              │
           ▼                                              ▼
  ┌──────────────────┐                           ┌──────────────────┐
  │  Calls WorkerA   │                           │  Calls WorkerB   │
  │  with params     │                           │  with params     │
  │  from WorkerB    │                           │  from WorkerA    │
  └──────────────────┘                           └──────────────────┘
Project Structure
oban_test_umbrella/
├── apps/
│   ├── app_a/                       # First application
│   │   ├── lib/
│   │   │   ├── app_a/
│   │   │   │   ├── application.ex   # Starts Oban for AppA
│   │   │   │   └── worker.ex        # AppA.Worker - processes jobs
│   │   └── mix.exs                  # Only depends on :persistence & :oban
│   │
│   ├── app_b/                       # Second application
│   │   ├── lib/
│   │   │   ├── app_b/
│   │   │   │   ├── application.ex   # Starts Oban for AppB
│   │   │   │   └── worker.ex        # AppB.Worker - processes jobs
│   │   └── mix.exs                  # Only depends on :persistence & :oban
│   │
│   └── persistence/                 # Shared persistence layer
│       ├── lib/
│       │   ├── persistence/
│       │   │   ├── application.ex   # Starts Repo
│       │   │   └── repo.ex          # Shared Ecto.Repo
│       ├── priv/repo/migrations/
│       │   └── *_add_oban_jobs_table.exs
│       └── mix.exs                  # Depends on :ecto_sql & :postgrex
│
├── config/
│   ├── config.exs                   # Configures Oban instances & Repo
│   └── dev.exs                      # Database credentials
│
└── diagram/
    └── oban.drawio.png              # Architecture visualization
Implementation Details
1. Dependency Configuration
AppA (apps/app_a/mix.exs)
defp deps do
  [
    {:persistence, in_umbrella: true},  # Shared database layer
    {:oban, "~> 2.14"}                  # Job processing
    # NOTE: No dependency on :app_b!
  ]
end
AppB (apps/app_b/mix.exs)
defp deps do
  [
    {:persistence, in_umbrella: true},  # Shared database layer
    {:oban, "~> 2.14"}                  # Job processing
    # NOTE: No dependency on :app_a!
  ]
end
Persistence (apps/persistence/mix.exs)
defp deps do
  [
    {:postgrex, ">= 0.0.0"},
    {:ecto_sql, "~> 3.6"}
  ]
end
2. Oban Configuration
Each application gets its own Oban instance, but they share the same database:
# config/config.exs
import Config

config :app_a, Oban,
  name: AppA.Oban,         # Unique Oban instance name
  repo: Persistence.Repo,  # Shared repository
  queues: [default: 10]    # Queue configuration

config :app_b, Oban,
  name: AppB.Oban,         # Different instance name
  repo: Persistence.Repo,  # Same shared repository
  queues: [default: 10]
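One consequence of this setup is worth spelling out: both instances run a queue named default against the same jobs table, so either instance can pick up any job in that queue. That is harmless while the umbrella runs in a single VM, where every worker module is loaded, but it matters once the apps run on separate nodes. One possible variation, not part of this project, gives each app its own queue. The queue names app_a and app_b are assumptions chosen for illustration:

```elixir
# config/config.exs (hypothetical variation: one queue per app)
import Config

config :app_a, Oban,
  name: AppA.Oban,
  repo: Persistence.Repo,
  queues: [app_a: 10]   # AppA's instance only fetches from :app_a

config :app_b, Oban,
  name: AppB.Oban,
  repo: Persistence.Repo,
  queues: [app_b: 10]   # AppB's instance only fetches from :app_b
```

With this layout, a job meant for AppB.Worker would be built with queue: :app_b, so only AppB's instance executes it.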
3. Application Supervision
Each app starts its own Oban instance:
AppA.Application
def start(_type, _args) do
  children = [
    {Oban, Application.fetch_env!(:app_a, Oban)}
  ]

  opts = [strategy: :one_for_one, name: AppA.Supervisor]
  Supervisor.start_link(children, opts)
end
AppB.Application
def start(_type, _args) do
  children = [
    {Oban, Application.fetch_env!(:app_b, Oban)}
  ]

  opts = [strategy: :one_for_one, name: AppB.Supervisor]
  Supervisor.start_link(children, opts)
end
4. Cross-App Communication
AppA.Worker
defmodule AppA.Worker do
  use Oban.Worker, queue: :default

  require Logger

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"number" => number}}) do
    Logger.info("[WorkerA] I got #{number}")

    # Simulate some work
    Process.sleep(Enum.random(100..1000))

    # Create a job for AppB.Worker (if number < 100)
    if number < 100 do
      job =
        Oban.Job.new(
          %{number: number + 1},
          queue: :default,
          worker: AppB.Worker  # Reference by module name
        )

      Oban.insert(AppA.Oban, job)
    end

    :ok
  end
end
AppB.Worker
defmodule AppB.Worker do
  use Oban.Worker, queue: :default

  require Logger

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"number" => number}}) do
    Logger.info("[WorkerB] I got #{number}")

    # Simulate some work
    Process.sleep(Enum.random(100..1000))

    # Create a job for AppA.Worker (if number < 100, so the flow ends at 100)
    if number < 100 do
      job =
        Oban.Job.new(
          %{"number" => number + 1},
          queue: :default,
          worker: AppA.Worker  # Reference by module name
        )

      Oban.insert(AppB.Oban, job)
    end

    :ok
  end
end
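Note that AppA.Worker builds its args with an atom key (number:) while both perform/1 clauses match on the string key "number". This works because Oban stores args as JSON, so keys come back as strings when the job is executed. A minimal illustration of that round trip, using Jason directly as a stand-in for what happens between insert and perform (this is a standalone script, not project code):

```elixir
# Standalone script; inside the umbrella, drop the Mix.install line.
Mix.install([{:jason, "~> 1.4"}])

# Args as the producing worker writes them: atom keys.
args = %{number: 2}

# Encoding to JSON and back mimics storing the job and loading it again.
stored = args |> Jason.encode!() |> Jason.decode!()

# The consuming worker therefore has to match on string keys.
%{"number" => number} = stored
IO.inspect(stored, label: "args seen by perform/1")
```

Matching on an atom key in perform/1 would never succeed, so the job would fail with a function clause error.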
5. Initiating the Flow
# In AppA.Application
def start(number) do
  job = AppA.Worker.new(%{number: number})
  Oban.insert(AppA.Oban, job)
  :ok
end
How It Works
The Ping-Pong Flow
- Start: Call AppA.Application.start(1) from IEx
- AppA processes job: WorkerA receives number=1
- AppA creates job: WorkerA creates a job for WorkerB with number=2
- Job stored: Oban saves the job to the shared database
- AppB processes job: WorkerB picks up the job and receives number=2
- AppB creates job: WorkerB creates a job for WorkerA with number=3
- Repeat: This ping-pong continues until number reaches 100
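The hand-off can be traced without Oban or a database. The following pure-Elixir sketch is not part of the project; the module name PingPong and its functions are made up for illustration, and it assumes each worker stops enqueueing once the number reaches the limit of 100.

```elixir
defmodule PingPong do
  @moduledoc "Pure simulation of the WorkerA/WorkerB hand-off (hypothetical helper)."

  # Returns the list of {worker, number} pairs in processing order.
  def run(start, limit \\ 100), do: step(:worker_a, start, limit, [])

  defp step(worker, number, limit, seen) do
    seen = [{worker, number} | seen]

    if number < limit do
      # Mirrors "create a job for the other worker with number + 1".
      step(other(worker), number + 1, limit, seen)
    else
      Enum.reverse(seen)
    end
  end

  defp other(:worker_a), do: :worker_b
  defp other(:worker_b), do: :worker_a
end

trace = PingPong.run(1)
IO.inspect(Enum.take(trace, 3), label: "first hops")
IO.inspect(List.last(trace), label: "last hop")
```

Starting from 1, WorkerA sees the odd numbers and WorkerB the even ones, so under this assumption the last job is handled by WorkerB.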
Benefits of This Architecture
1. Loose Coupling
- Applications don't depend on each other in mix.exs
- Can be developed and tested independently
- Easier to extract into separate repositories if needed
2. Clear Boundaries
- Communication happens through well-defined job contracts
- Each app only knows about job schemas, not implementation details
- Easier to reason about system boundaries
3. Asynchronous by Default
- No blocking calls between applications
- Better fault isolation (if AppB crashes, AppA continues)
- Queue concurrency limits cap how much work each app runs at once; excess jobs wait in the database
4. Scalability
- Can scale Oban workers independently per application
- Database provides natural coordination
- Can run multiple instances of each app
5. Testability
- Can test each worker in isolation
- Easy to mock job creation
- Can test cross-app flows by inspecting Oban jobs in tests
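One way to make the hand-off easy to test is to separate the decision (which job comes next) from the insert. The sketch below is a suggestion rather than the project's code: Handoff and next_job/2 are hypothetical names. Because Oban.Job.new/2 only builds a changeset, the assertions need neither a database nor a running Oban instance.

```elixir
# Standalone script; inside the umbrella, drop the Mix.install line.
Mix.install([{:oban, "~> 2.14"}])

defmodule Handoff do
  @moduledoc "Hypothetical helper: decides which job, if any, follows a number."

  @limit 100

  # Returns a job changeset for the target worker, or :done at the limit.
  def next_job(number, target_worker) when number < @limit do
    {:ok, Oban.Job.new(%{number: number + 1}, worker: target_worker, queue: :default)}
  end

  def next_job(_number, _target_worker), do: :done
end

{:ok, changeset} = Handoff.next_job(1, AppB.Worker)
IO.inspect(changeset.changes, label: "job for AppB")
IO.inspect(Handoff.next_job(100, AppB.Worker), label: "at the limit")
```

For assertions against jobs that were actually inserted, Oban ships its own Oban.Testing helpers such as assert_enqueued; those require a test database.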
6. Flexibility
- Easy to add new applications that participate in the event bus
- Can change worker implementations without affecting other apps
- Can add event transformations, routing, or middleware
7. Observability
- All communication is visible in the database
- Can inspect, retry, or cancel jobs
- Oban emits Telemetry events that can feed metrics and monitoring
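As a sketch of how the job lifecycle can be observed: Oban reports job execution through :telemetry events such as [:oban, :job, :stop] and [:oban, :job, :exception]. The handler module below (JobTelemetry, a hypothetical name) logs one line per finished or failed job. The last lines simulate an event by hand so the script runs without a database; in the real system Oban emits it.

```elixir
# Standalone script; inside the umbrella, drop the Mix.install line.
Mix.install([{:oban, "~> 2.14"}])

defmodule JobTelemetry do
  @moduledoc "Hypothetical handler that logs finished and failed Oban jobs."
  require Logger

  @events [[:oban, :job, :stop], [:oban, :job, :exception]]

  def attach do
    :telemetry.attach_many("job-telemetry-logger", @events, &__MODULE__.handle_event/4, nil)
  end

  def handle_event([:oban, :job, event], measurements, %{job: job}, _config) do
    Logger.info(describe(event, measurements, job))
  end

  # Builds the log line; split out so it can be checked without Logger.
  def describe(event, %{duration: duration}, %Oban.Job{worker: worker, queue: queue}) do
    ms = System.convert_time_unit(duration, :native, :millisecond)
    "[oban] #{worker} #{event} after #{ms}ms (queue=#{queue})"
  end
end

:ok = JobTelemetry.attach()

# Simulate the event Oban would emit after a job finishes.
duration = System.convert_time_unit(250, :millisecond, :native)
job = %Oban.Job{worker: "AppB.Worker", queue: "default"}
:telemetry.execute([:oban, :job, :stop], %{duration: duration}, %{job: job})
```

In the umbrella, attach/0 would typically be called once from an application's start/2 before the supervisor starts.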
Trade-offs and Considerations
Advantages
- Decoupling: True independence between applications
- Resilience: Async communication provides fault tolerance
- Flexibility: Easy to add new apps or change implementations
- Database-backed: Reliable, durable job storage
Disadvantages
- Latency: Not suitable for synchronous requirements
- Database Load: All communication goes through database
- Debugging: Async flows can be harder to trace
When to Use This Pattern
Good fit for:
- Microservices-style architecture within a monorepo
- Event-driven workflows
- Background processing and async tasks
- Applications with different deployment cycles
- Systems requiring high availability
Not ideal for:
- Low-latency, synchronous operations
- Simple, tightly coupled features
Running the Project
Setup
- Install dependencies: mix deps.get
- Set up the database: mix do ecto.drop, ecto.create, ecto.migrate
- Start IEx: iex -S mix
Testing the Flow
Start the ping-pong job flow:
iex> AppA.Application.start(1)
:ok
You'll see output like:
[info] [2024-01-05 10:30:15.123][WorkerA] I got 1
[info] [2024-01-05 10:30:15.456][WorkerB] I got 2
[info] [2024-01-05 10:30:15.789][WorkerA] I got 3
[info] [2024-01-05 10:30:16.012][WorkerB] I got 4
...
[info] [2024-01-05 10:30:25.345][WorkerA] I got 99
[info] [2024-01-05 10:30:25.678][WorkerB] I got 100
Inspecting Jobs
You can query Oban jobs directly:
iex> Persistence.Repo.all(Oban.Job)
# See all jobs in the system
iex> import Ecto.Query
iex> Persistence.Repo.all(from j in Oban.Job, where: j.state == "available")
# See pending jobs
Advanced Patterns
1. Event Types
Instead of referencing workers directly, you could use a convention:
# Define event types
job = Oban.Job.new(
  %{event: "user.registered", user_id: 123},
  queue: :events,
  worker: EventDispatcher.Worker
)

# EventDispatcher routes to appropriate handlers
defmodule EventDispatcher.Worker do
  use Oban.Worker, queue: :events

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"event" => "user.registered"} = args}) do
    # Route to all interested workers. The *_job/1 helpers are placeholders
    # that would each return an Oban.Job changeset.
    Oban.insert_all([
      user_notification_job(args),
      user_analytics_job(args),
      user_email_job(args)
    ])

    :ok
  end
end
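The routing step can also be expressed as data. The sketch below is one possible shape, not part of the project: EventRouter and the three worker names are hypothetical, and the workers are referenced by name only, so the script runs without them being defined.

```elixir
# Standalone script; inside the umbrella, drop the Mix.install line.
Mix.install([{:oban, "~> 2.14"}])

defmodule EventRouter do
  @moduledoc "Hypothetical routing table from event names to worker modules."

  # The worker names are placeholders; they are stored as names only.
  @routes %{
    "user.registered" => [Notifications.Worker, Analytics.Worker, Mailer.Worker]
  }

  # Builds one job changeset per subscribed worker; unknown events yield [].
  def jobs_for(%{"event" => event} = args) do
    for worker <- Map.get(@routes, event, []) do
      Oban.Job.new(args, worker: worker, queue: :events)
    end
  end
end

jobs = EventRouter.jobs_for(%{"event" => "user.registered", "user_id" => 123})
IO.inspect(Enum.map(jobs, & &1.changes.worker), label: "fan-out")
```

A dispatcher worker could pass the resulting list to Oban.insert_all, and adding a subscriber becomes a one-line change to the table.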
2. Saga Pattern
For complex workflows across multiple apps:
defmodule OrderSaga.Worker do
  use Oban.Worker

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"step" => "reserve_inventory"}}) do
    # reserve_inventory/0 and create_job/1 are placeholders for app-specific code
    case reserve_inventory() do
      :ok ->
        create_job(step: "charge_payment")

      :error ->
        create_job(step: "cancel_order")
    end

    :ok
  end

  def perform(%Oban.Job{args: %{"step" => "charge_payment"}}) do
    # Next step in saga...
    :ok
  end
end
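The step transitions of such a saga can be kept in a small pure module so they are easy to review and test. This is a sketch under assumptions: OrderSaga.Steps is a hypothetical name, and the behavior after a failed charge is an assumed compensation that the example above leaves open.

```elixir
defmodule OrderSaga.Steps do
  @moduledoc "Hypothetical transition table for the saga steps."

  # Maps {current step, outcome} to the next step to enqueue, or :done.
  def next("reserve_inventory", :ok), do: {:enqueue, "charge_payment"}
  def next("reserve_inventory", :error), do: {:enqueue, "cancel_order"}
  def next("charge_payment", :ok), do: :done
  # Assumed compensation: a failed charge also cancels the order.
  def next("charge_payment", :error), do: {:enqueue, "cancel_order"}
  def next("cancel_order", _outcome), do: :done
end

IO.inspect(OrderSaga.Steps.next("reserve_inventory", :ok), label: "after reserving")
IO.inspect(OrderSaga.Steps.next("charge_payment", :error), label: "after failed charge")
```

A worker would call next/2 with the outcome of its step and enqueue a job for the returned step name, if any.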
3. Priority Queues
Different queues for different priorities:
config :app_a, Oban,
  queues: [
    critical: 20,     # High concurrency
    default: 10,
    low_priority: 2
  ]

# Create job with specific queue
Oban.Job.new(%{task: "send_email"}, queue: :low_priority, worker: EmailWorker)
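Separate queues are one way to prioritize. Oban jobs also accept a priority option that orders jobs within a single queue, where 0 is the highest priority and the default. A short sketch of that alternative, as a standalone script (EmailWorker is a placeholder name and does not need to exist):

```elixir
# Standalone script; inside the umbrella, drop the Mix.install line.
Mix.install([{:oban, "~> 2.14"}])

# Same queue, different priorities: lower numbers are fetched first.
urgent = Oban.Job.new(%{task: "send_email"}, worker: EmailWorker, queue: :default, priority: 0)
routine = Oban.Job.new(%{task: "send_email"}, worker: EmailWorker, queue: :default, priority: 3)

IO.inspect(Ecto.Changeset.get_field(urgent, :priority), label: "urgent")
IO.inspect(Ecto.Changeset.get_field(routine, :priority), label: "routine")
```

Queues isolate concurrency between kinds of work, while priority only reorders jobs that compete for the same queue's slots.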
4. Scheduled Jobs
Delay job execution:
# Run in 1 hour
Oban.Job.new(
%{reminder: "Meeting at 2pm"},
worker: ReminderWorker,
scheduled_at: DateTime.add(DateTime.utc_now(), 3600)
)
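For relative delays, Oban.Job.new/2 also accepts schedule_in, which takes a number of seconds and computes the timestamp itself. A minimal standalone sketch (ReminderWorker is a placeholder name, as in the snippet above):

```elixir
# Standalone script; inside the umbrella, drop the Mix.install line.
Mix.install([{:oban, "~> 2.14"}])

# Equivalent intent to the scheduled_at example: run in about one hour.
job = Oban.Job.new(%{reminder: "Meeting at 2pm"}, worker: ReminderWorker, schedule_in: 3600)

scheduled_at = Ecto.Changeset.get_field(job, :scheduled_at)
IO.inspect(DateTime.diff(scheduled_at, DateTime.utc_now()), label: "seconds until run")
IO.inspect(Ecto.Changeset.get_field(job, :state), label: "initial state")
```

Either option only sets when the job becomes eligible; it still waits for a free slot in its queue once that time arrives.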
Related Patterns
This architecture is related to several well-known patterns:
- Event-Driven Architecture: Apps communicate through events (jobs)
- Mediator Pattern: Oban mediates communication between apps
- Command Pattern: Jobs encapsulate commands to be executed
- Message Queue Pattern: Oban provides reliable message queuing
- Saga Pattern: Can orchestrate complex multi-step workflows
Conclusion
Using Oban as an event bus in an Elixir umbrella project provides a powerful way to achieve loose coupling while maintaining the benefits of a monorepo. By sharing only a persistence layer and using Oban for asynchronous communication, applications can evolve independently while remaining part of a cohesive system.
This pattern is particularly valuable as applications grow in complexity and teams need clear boundaries without the operational overhead of fully distributed microservices. It provides a middle ground: the organizational benefits of microservices with the deployment simplicity of a monolith.