Posts

[FP] Database Optimization in Event Sourcing + FP Architecture

Intro

I want to talk about an interesting problem we faced while using an Event Sourcing architecture and how we solved it. Specifically, we'll look at optimizing DB journals and how that ties into the concept of laziness in Functional Programming.

Event Sourcing?

Event Sourcing is an architectural pattern where we store changes to an application's state as a sequence of Events. Unlike traditional methods that only save the current state, Event Sourcing meticulously records every event that caused a state change, in chronological order.

With Event Sourcing, you can typically store a user's state using just two DB tables:

  • Journal: This table records all events as they happen, in a time-ordered sequence. Think of it like an accounting ledger where every transaction is meticulously noted.
  • Snapshot: This table stores the application's state at a specific point in time. Since replaying all events from the Journal to reconstruct the current state can be inefficient, snapshots are regularly taken to speed up recovery.

This approach offers the significant advantage of simplifying your DB usage patterns. State changes only occur through event emissions, and querying happens either by replaying the Journal or via Snapshots.
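As a rough sketch of how these two tables cooperate during recovery, here is a minimal, hypothetical routine in plain Scala. The names (JournalRow, SnapshotRow, Recovery.recover) are illustrative, and the Int state stands in for a real aggregate:

```scala
// Hypothetical in-memory rows for the two tables; not a production schema.
final case class JournalRow(entityId: String, seqNr: Long, event: String)
final case class SnapshotRow(entityId: String, seqNr: Long, state: Int)

object Recovery {
  // Replay: start from the latest snapshot (if any), then apply every
  // journaled event with a sequence number greater than the snapshot's.
  def recover(
      journal: List[JournalRow],
      snapshot: Option[SnapshotRow],
      apply: (Int, String) => Int
  ): Int = {
    val (baseState, baseSeq) =
      snapshot.map(s => (s.state, s.seqNr)).getOrElse((0, 0L))
    journal
      .filter(_.seqNr > baseSeq)
      .sortBy(_.seqNr)
      .foldLeft(baseState)((st, row) => apply(st, row.event))
  }
}
```

Recovering with a snapshot only replays the journal's tail, which is exactly why snapshots speed things up.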

Event Sourcing with FP Laziness

Now, let's tie this to the lazy concept in Functional Programming. In Event Sourcing, DB access doesn't happen the moment an event is emitted. An event is merely an "Effect" that's been issued. In our project, we handle this using ZPure, where the act of issuing the effect itself is pure. This means we can postpone the actual process of taking those emitted events and saving them to the DB until later. That's what I mean by processing things in a lazy manner.
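To make the idea concrete without depending on the real ZPure API, here is a toy Writer-style structure (Emit is a hypothetical name): emitting an event is a pure append to a log, and nothing leaves the log until run() is called:

```scala
// A minimal Writer-style effect: the log of emitted events is accumulated
// purely; persistence only happens when someone drains the log via run().
final case class Emit[A](run: () => (List[String], A)) {
  def flatMap[B](f: A => Emit[B]): Emit[B] = Emit { () =>
    val (log1, a) = run()
    val (log2, b) = f(a).run()
    (log1 ++ log2, b)
  }
  def map[B](f: A => B): Emit[B] = flatMap(a => Emit(() => (Nil, f(a))))
}

object Emit {
  def log(event: String): Emit[Unit] = Emit(() => (List(event), ()))
  def pure[A](a: A): Emit[A]         = Emit(() => (Nil, a))
}

// Emitting is pure: events are only collected when run() is invoked,
// giving us a window to inspect or optimize them before touching the DB.
val program: Emit[Unit] = for {
  _ <- Emit.log("TownEstablished")
  _ <- Emit.log("RewardReceived")
} yield ()

val (events, _) = program.run()
```

This separation between "issuing" and "persisting" is the window in which all of the optimizations below operate.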

The Journaling Problem in Our Kingdom Building Game

Our project heavily utilizes Event Sourcing architecture. For example, consider the establishTown (build a town) feature in our kingdom building game. This feature includes validations—like checking if the location is valid or if the user's level allows for building—and the actual logic to grant the town to the user.

Then a new requirement came in: "build multiple towns all at once." To extend the existing establishTown functionality while preserving its reliability, we could implement this as establishTown.replicateA(n). This approach is reasonable from a safety standpoint: the internal logic of establishTown is complex, but it had been thoroughly reviewed, QA-tested, and proven stable in live operation. Reusing such a well-vetted function multiple times is clearly a safe bet.

However, a problem soon emerged. Each establishTown call emits its own events, so building 100 towns simultaneously produced a repetitive pattern: TownEstablished -> RewardReceived -> TownEstablished -> RewardReceived -> ..., a whopping 200 events in total, causing a performance problem in our database journaling.
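The blow-up is easy to quantify. As a hypothetical illustration (event payloads reduced to strings), n calls emit 2n journal entries:

```scala
// Each establishTown call emits a (TownEstablished, RewardReceived) pair,
// so n calls emit 2n events in the alternating pattern described above.
def establishTownEvents(townId: Int): List[String] =
  List(s"TownEstablished($townId)", s"RewardReceived($townId)")

val emitted: List[String] = (1 to 100).toList.flatMap(establishTownEvents)
```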

How We Detected This

We pinpointed this performance bottleneck using our Distributed Tracing tools, which track state changes across the system. We noticed an unexpectedly high number of events being written to the database during specific RPC calls.

For instance, in the tracing span for the module responsible for event persistence, a metric like lifted-events-count showed that an excessive number of events were being saved in a single transaction. This clearly indicated a potential write-load issue for our DB journal. Here's a code snippet:

// Part of the event persistence logic (conceptual code)
val eventsToPersist: List[Event] = ...
tracing.setAttribute("lifted-events-count", eventsToPersist.length)
persistentEntityRepository.addEvents(eventsToPersist)

Seeing high values for metrics like lifted-events-count signaled to us that this could lead to significant write-load on the DB journal.

Two Approaches to Optimizing DB Journaling

To tackle this problem, we can consider two main approaches. Both solutions involve creating new types of aggregated events, such as TownsEstablished and RewardsReceived.

Approach 1: Post-Processing for Event Aggregation

The first approach involves creating an optimization process in the postProcess layer, where we inspect and aggregate events after they've been emitted by the core logic. This is essentially a "post-processing" step that merges redundant events from the emitted event stream.

For example, in the scenario where 200 events (TownEstablished, RewardReceived pairs) are emitted for 100 simultaneous town builds, this approach optimizes them into just two aggregated events: TownsEstablished(towns) and RewardsReceived(rewards).

To achieve this aggregation, we need an EventOptimizer that recognizes the TownEstablished -> RewardReceived pattern and transforms it into these new, aggregated events.

// Example: Concept of an EventOptimizer (Scala)
// This code illustrates the idea of merging consecutively occurring events that follow a specific pattern (e.g., TownEstablished -> RewardReceived).
// The actual implementation would involve traversing the event stream sequentially and only merging when the specific pattern is detected.
trait EventOptimizer {
  def optimize(events: List[Event]): List[Event]
}

class TownAndRewardPatternOptimizer extends EventOptimizer {
  override def optimize(events: List[Event]): List[Event] = {
    val optimizedEvents = scala.collection.mutable.ListBuffer[Event]()
    var currentTowns = scala.collection.mutable.ListBuffer[Town]()
    var currentRewards = scala.collection.mutable.ListBuffer[Reward]()
    var expectedNextIsReward = false // Flag to expect RewardReceived after TownEstablished

    var i = 0
    while (i < events.length) {
      events(i) match {
        case te: TownEstablished if !expectedNextIsReward =>
          // Detected TownEstablished and not expecting RewardReceived (start of a new pattern)
          currentTowns += te.town // Assuming TownEstablished holds a single Town
          expectedNextIsReward = true
          i += 1
        case rr: RewardReceived if expectedNextIsReward =>
          // Detected RewardReceived and expecting it (continuation of the pattern)
          currentRewards += rr.reward // Assuming RewardReceived holds a single Reward
          expectedNextIsReward = false // Next should be TownEstablished
          i += 1
        case otherEvent =>
          // Pattern broken (different event appears or unexpected sequence)
          // Convert collected pattern events into aggregated events and add them
          if (currentTowns.nonEmpty) {
            optimizedEvents += TownsEstablished(currentTowns.toList)
            currentTowns.clear()
          }
          if (currentRewards.nonEmpty) {
            optimizedEvents += RewardsReceived(currentRewards.toList)
            currentRewards.clear()
          }
          optimizedEvents += otherEvent // Add the current event as is
          expectedNextIsReward = false // Reset pattern state
          i += 1
      }
    }

    // Process any remaining collected data after the loop (if pattern ends at the list's end)
    if (currentTowns.nonEmpty) {
      optimizedEvents += TownsEstablished(currentTowns.toList)
    }
    if (currentRewards.nonEmpty) {
      optimizedEvents += RewardsReceived(currentRewards.toList)
    }

    optimizedEvents.toList
  }
}
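For reference, here is a self-contained, simplified variant of the same aggregation idea (Town and Reward reduced to Ints, and any run of the two event types merged rather than requiring strict alternation), which shows the 2n-to-2 collapse end to end:

```scala
object EventAggregation {
  sealed trait Ev
  final case class TownEstablished(town: Int) extends Ev
  final case class RewardReceived(reward: Int) extends Ev
  final case class TownsEstablished(towns: List[Int]) extends Ev
  final case class RewardsReceived(rewards: List[Int]) extends Ev
  final case class Other(tag: String) extends Ev

  // Emit the aggregated events for everything buffered so far.
  private def flush(towns: List[Int], rewards: List[Int]): List[Ev] =
    (if (towns.nonEmpty) List(TownsEstablished(towns.reverse)) else Nil) ++
      (if (rewards.nonEmpty) List(RewardsReceived(rewards.reverse)) else Nil)

  // Buffer TownEstablished/RewardReceived, flushing whenever another event
  // interrupts the pattern, and once more at the end of the stream.
  def optimize(events: List[Ev]): List[Ev] = {
    val (done, towns, rewards) =
      events.foldLeft((List.empty[Ev], List.empty[Int], List.empty[Int])) {
        case ((d, ts, rs), TownEstablished(t)) => (d, t :: ts, rs)
        case ((d, ts, rs), RewardReceived(r))  => (d, ts, r :: rs)
        case ((d, ts, rs), other)              => (d ++ flush(ts, rs) :+ other, Nil, Nil)
      }
    done ++ flush(towns, rewards)
  }
}
```

A 200-event alternating stream collapses to exactly two aggregated events, while any interleaved foreign event splits the aggregation at that point.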
  • Advantages
    • This method has a significant advantage in keeping the domain logic clean. Since we can use high-level abstractions like establishTown.replicateA(n) to express business logic, developers can focus on core domain logic without needing to consider optimization details.
    • In fact, this approach can be highly effective for specific, simple patterns.
    • For instance, with repetitive RewardReceived events that can occur in various parts of the logic, developers don't have to worry about a "DB journal bombing" and can simply use replicateA(n).
    • We've implemented this in our project for certain simple patterns, allowing programmers to use replicateA(n) conveniently without concerns about DB journal load.
  • Disadvantages
    • However, there's a crucial caveat: if new functionality added in the logic layer breaks this repetitive pattern, the postProcess optimization logic silently stops working.
      • For example, imagine a new requirement where building a town incurs a cost. The journal pattern would then change to Paid -> TownEstablished -> RewardReceived -> Paid -> TownEstablished -> RewardReceived -> .... The previous optimization logic, designed to detect and aggregate TownEstablished -> RewardReceived pairs, would no longer work.
      • This means that this approach is only effective for patterns that are very simple, universally applicable (feature-agnostic), and highly stable with low variability. Attempting to optimize event repetition patterns that are tied to volatile domain logic with this method is generally discouraged.
    • Furthermore, such postProcess optimization carries the risk of compromising the completeness of the DB journaling history, which is a core strength of Event Sourcing. Extreme optimization can lead to the loss of individual event order and timestamp information. Therefore, when applying optimization, it's a critical point to consider this trade-off.
      • For instance, suppose we optimize a pattern like Paid -> TownEstablished -> RewardReceived -> Paid -> TownEstablished -> RewardReceived -> ... into three aggregated events: Paids -> TownsEstablished -> RewardsReceived. If each Paid event originally reduced itemId1 by 1, the aggregated Paids event performs a single reduction of n for itemId1.
      • Repeating (Paid: -1, then Rewarded: +1) n times is not necessarily equivalent to (Paids: -n, then Rewardeds: +n): the aggregated form can fail if the item count drops below zero partway through. Logic that would never have failed in its original order might now fail unexpectedly.
    • In other words, event aggregation can subtly alter the flow of state changes, leading to unforeseen side effects. For these reasons, the postProcess approach is primarily effective for simple, stable patterns that have relatively independent impacts on state changes, such as aggregating repeated RewardReceived events.
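This reordering hazard can be simulated numerically. In the hypothetical sketch below (applyAll is an illustrative name), applying a delta fails whenever the item count would go negative:

```scala
// Apply a ledger of deltas to a starting item count, failing (as the real
// domain logic would) the moment the count would go negative.
def applyAll(start: Int, deltas: List[Int]): Either[String, Int] =
  deltas.foldLeft[Either[String, Int]](Right(start)) {
    case (Right(balance), d) =>
      val next = balance + d
      if (next < 0) Left(s"insufficient items: $next") else Right(next)
    case (failed, _) => failed
  }

val n = 3
val interleaved = List.fill(n)(List(-1, +1)).flatten // Paid, Rewarded, Paid, Rewarded, ...
val aggregated  = List(-n, +n)                       // Paids(-n), then Rewardeds(+n)
```

Starting with a single item, applyAll(1, interleaved) succeeds while applyAll(1, aggregated) fails: exactly the "logic that wouldn't have failed originally might now fail" case described above.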

Approach 2: Emitting Aggregated Events at the Logic Layer (Recommended for Our Example Issue)

For the specific issue illustrated, the most desirable and ultimately adopted solution was to emit a single event directly from the logic layer. This approach optimizes at the source of event generation, meaning we define and emit new aggregated events like TownsEstablished and RewardsReceived within our domain logic itself.

In our kingdom building game example, instead of emitting individual TownEstablished and RewardReceived events for each of the 100 simultaneous town builds, we emit just two aggregated events: a single TownsEstablished event containing information about all built towns, and a single RewardsReceived event encompassing all rewards earned.

// ZPure type signature (example)
// sealed trait ZPure[+W, -S1, +S2, -R, +E, +A]
// W: Log (written output), S1: Input State, S2: Output State, R: Environment, E: Error, A: Result

// Original (example): Repeated event emission (TownEstablished -> RewardReceived -> ...)
// Function to establish a single town, emitting individual events
def establishTown(town: Town): ZPure[Event, GameState, GameState, Any, ValidationFail, Unit] =
  for {
    _ <- ZPure.validate(validateTownLocation(town)) // Validate location
    _ <- ZPure.log(TownEstablished(town)) // Emit single TownEstablished event
    gameData <- ZPure.get[GameState] // Access current game state
    rewards <- ZPure.succeed(inquireRewardForTown(gameData, town)) // Query rewards
    _ <- ZPure.log(RewardReceived(rewards)) // Emit single RewardReceived event
  } yield ()

// Calling establishTown 'n' times to generate multiple events
// Thanks to ZPure's purity, events are collected and written to DB later in a batch.
def establishTownsViaReplicate(towns: List[Town]): ZPure[Event, GameState, GameState, Any, ValidationFail, Unit] =
  ZPure.foreach(towns)(establishTown).map(_ => ())

// Modified (example): Emitting aggregated events for multiple town builds and reward collections
def establishTowns(towns: List[Town]): ZPure[Event, GameState, GameState, Any, ValidationFail, Unit] =
  for {
    // Perform bulk validation for all towns
    _ <- ZPure.validate(validateAllTownLocations(towns))

    // Perform individual logic for each town and collect results (new towns, earned rewards)
    // Using ZPure.foreach to perform pure computations for each town and gather results
    allProcessedData <- ZPure.foreach(towns) { town =>
      for {
        gameData <- ZPure.get[GameState] // GameState needed at each town processing step (if applicable)
        rewards <- ZPure.succeed(inquireRewardForTown(gameData, town))
      } yield (town, rewards) // Tuple of (built town, earned rewards)
    }

    // Create aggregated events from the collected data
    allEstablishedTowns = allProcessedData.map(_._1) // All built town data
    allReceivedRewards = allProcessedData.flatMap(_._2) // All earned reward data

    // Add aggregated events to the ZPure log
    _ <- ZPure.log(TownsEstablished(allEstablishedTowns))
    _ <- ZPure.log(RewardsReceived(allReceivedRewards))
  } yield ()
  • Advantages
    • This approach drastically reduces DB journal load by emitting just two events, TownsEstablished(allEstablishedTowns) and RewardsReceived(allReceivedRewards), instead of hundreds.
  • Disadvantages
    • However, this method can make the domain logic messier.
    • Expressing the intent as towns.traverse(establishTown).map(_ => ()) feels cleaner and more intuitive than hand-rolled aggregation.
    • But for the sake of performance optimization, we find this level of added domain-logic complexity acceptable.

Conclusion

  • The ability to perform these DB journal optimizations within our Event Sourcing architecture is deeply connected to the lazy nature of Functional Programming.
  • By separating the act of emitting an event from the act of persisting it to the database, we gain the flexibility to collect and optimize events before they are finally written to the DB.
  • Both optimization approaches share a common requirement: defining and utilizing new aggregated events like TownsEstablished and RewardsReceived.
    • The first, postProcess approach, offers the advantage of collecting and processing already emitted, distributed events after the fact. This can free developers from constantly worrying about the number of events emitted within complex domain logic.
    • It's particularly effective as an optimization tool for simple, repetitive event patterns like RewardReceived that can appear in various parts of the logic, and it significantly contributes to keeping the domain logic clean.
    • However, this method carries the risk of compromising the completeness of the DB journaling history, and extreme optimization can lead to loss of individual event order and timestamp information.
  • Conversely, the second approach, emitting a single event from the logic layer, optimizes at the very source of event generation. This provides a more fundamental and stable solution, helping to increase the cohesiveness of events and reduce system complexity right from the domain model design phase.
  • In our project, we believe both approaches are valid and apply them appropriately based on their respective advantages and disadvantages. If you're implementing or operating an Event Sourcing system, I highly recommend deeply considering how you define and emit your events.