Zero Dirty Reads: Building a Trustworthy Lakehouse with DuckDB

Forma Engineering Blog · Series Part 3 (Finale)

TL;DR

"Lakehouse" sounds great, but every engineer has the same nagging question: How do I know the data I'm querying isn't stale or dirty?

This post explains how Forma uses Anti-Join + Dirty Set mechanisms to ensure federated queries never read uncommitted or inconsistent data.

PostgreSQL handles "the present," DuckDB + Parquet handles "the past"—working together, zero dirty reads.

Why Do We Need a Lakehouse?

The first two posts solved OLTP scenarios:

  • Part 1: Achieved AI-Ready flexible storage with hot tables + JSON Schema
  • Part 2: Killed N+1 queries with CTE + JSON_AGG, reducing latency from 1 second to 25 milliseconds

But there's one question we haven't directly addressed:

When data reaches billions of records and a single PostgreSQL instance can't handle it—what then?

Even with hot table indexes, when the EAV table balloons to 100 million rows and historical data in Parquet reaches terabytes, single-machine PostgreSQL's memory and I/O become bottlenecks.

More importantly, historical data access patterns are completely different from real-time data:

| Data Type | Access Frequency | Access Pattern | Typical Scenario | Query Share |
|---|---|---|---|---|
| Last 7 days | Hundreds/second | Point queries, filtering, pagination | Daily operations | ~80% |
| 7–90 days | Dozens/day | Bulk exports, reports | Monthly analysis | ~15% |
| 90+ days | Few times/month | Full scans, aggregations | Annual audits | ~5% |

Optimizing for the last 7 days while cramming it alongside 3 years of historical data in the same table—that's resource waste.

The Allure of Lakehouse Architecture

The solution seems obvious: hot/cold separation.

  • Hot data: Keep in PostgreSQL, enjoy transactional consistency and low-latency indexes
  • Cold data: Export to Parquet files, store in S3, query with an OLAP engine

The architecture diagram looks beautiful:

┌─────────────────────────────────────────────────────────────┐
│                       Query Router                          │
└─────────────────────────────────────────────────────────────┘
                    │                    │
                    ▼                    ▼
        ┌───────────────────┐  ┌───────────────────┐
        │    PostgreSQL     │  │      DuckDB       │
        │    (Hot Data)     │  │    (Cold Data)    │
        │    Last 7 days    │  │   Parquet on S3   │
        └───────────────────┘  └───────────────────┘

But—

Every engineer who hears "Lakehouse" has a voice in their head:

"Wait, if the same record exists in both PostgreSQL and Parquet, which version do I get? If PostgreSQL data hasn't synced to Parquet yet, will I read stale data? Or worse—duplicate data?"

This is the consistency fear. It's the biggest psychological barrier preventing many teams from adopting Lakehouse architecture.

"Wait—Isn't EAV an Anti-Pattern?"

Let's address the elephant in the room.

Yes. Historically, EAV (Entity-Attribute-Value) is considered an anti-pattern, and for good reason. Traditional EAV implementations suffer from:

  • Horrific query performance: The N+1 problem turns simple queries into thousands of database round-trips
  • No type safety: Everything becomes a string; say goodbye to integer comparisons and date sorting
  • Impossible-to-maintain code: Dynamic pivot queries scattered everywhere
  • No indexing strategy: Full table scans on every query because you can't index "any possible attribute"

If you've been burned by EAV before, your skepticism is earned.

How Forma Tames the Beast

Forma doesn't ignore these problems—it solves them with specific technical choices:

| EAV Problem | Forma's Solution | Where It's Covered |
|---|---|---|
| N+1 query nightmare | CTE + JSON_AGG (single round-trip) | Part 2 |
| No type safety | JSON Schema validation on write | Part 1 |
| No indexing | Hot Table with B-tree indexes for frequent fields | Part 1 |
| Dirty data in lakehouse | Dirty Set quarantine (Anti-Join) | This post (below) |

The Data Quarantine Zone

The Dirty Set is essentially a quarantine zone for data.

Think of it like airport security. Before data is "cleared" for travel (flushed_at > 0), it stays in the holding area (PostgreSQL). Once cleared, it can proceed to its destination (Parquet). This explicit state tracking—not timestamps, not heuristics—is what makes the system trustworthy.

Any record that's been modified but not yet synced to cold storage is "quarantined." Queries know to fetch it from PostgreSQL (the source of truth) rather than Parquet (potentially stale). There's no guessing, no race conditions, no "hopefully the timestamps are right."

The Narrative

We took a dangerous but powerful idea—EAV—and built guard rails around it:

  1. Type validation via JSON Schema (Part 1)
  2. Indexed hot fields for the 20% of attributes that get 80% of queries (Part 1)
  3. Single-query aggregation via CTE + JSON_AGG (Part 2)
  4. Explicit sync-state tracking via the Dirty Set (this post)

The result: flexibility without the chaos. Your AI applications can evolve their data structures freely, without the historical baggage of EAV's bad reputation.


The Root of Consistency Fear

Let's make the problem concrete.

Suppose there's a record row_id = 123:

  • 09:00: User creates this record, writes to PostgreSQL
  • 09:05: CDC job exports this record to Parquet
  • 09:10: User updates this record (PostgreSQL value changes)
  • 09:15: User issues a query

The question: Which version should the 09:15 query return?

| Data Source | row_id | Version | Status |
|---|---|---|---|
| PostgreSQL | 123 | v2 | Latest (09:10 update) |
| Parquet | 123 | v1 | Stale (09:05 export) |

If the query engine naively "merges" both data sources, users might see:

  • Duplicates: Same record appears twice (v1 and v2)
  • Dirty reads: Returns v1 (stale version)
  • Phantom reads: Sometimes v1, sometimes v2, depending on query timing

None of these are acceptable.

Why Simple Timestamp Comparison Isn't Enough

You might think: Just use updated_at timestamps for deduplication, right?

```sql
SELECT *
FROM (
    SELECT *, 'pg' AS source FROM postgres_data
    UNION ALL
    SELECT *, 's3' AS source FROM parquet_data
)
-- QUALIFY, not WHERE: window functions aren't allowed in a WHERE clause
QUALIFY ROW_NUMBER() OVER (PARTITION BY row_id ORDER BY updated_at DESC) = 1
```

This approach has several fatal flaws:

  1. Clock skew: PostgreSQL and CDC job clocks may differ by milliseconds
  2. Race conditions: If a record is updated during "export in progress," updated_at might be identical
  3. Soft delete trap: If a record is deleted in PostgreSQL, the old version in Parquet might "resurrect"

Timestamp comparison is optimistic—it assumes timestamps perfectly reflect data freshness. In distributed systems, this assumption is dangerous.
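The soft-delete trap is the easiest flaw to demonstrate. Here's a toy Python simulation (not Forma's code; all values invented) showing how naive timestamp deduplication resurrects a record that was deleted from PostgreSQL:

```python
# Row 123 was hard-deleted in PostgreSQL, but its stale v1 still lives
# in Parquet. Naive timestamp dedup has no way to know it is gone.
pg_rows = []  # the delete removed it from PostgreSQL entirely
parquet_rows = [{"row_id": 123, "name": "Alice", "updated_at": 905}]  # stale v1

# Naive merge: union both sources, keep the newest version per row_id.
merged = {}
for row in pg_rows + parquet_rows:
    seen = merged.get(row["row_id"])
    if seen is None or row["updated_at"] > seen["updated_at"]:
        merged[row["row_id"]] = row

# The deleted record "resurrects" -- neither source carries a tombstone.
assert 123 in merged  # wrong result: the deleted row is back
```

An explicit state marker (did this change sync, yes or no?) sidesteps all three flaws at once, which is what the Dirty Set provides.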

Forma's Solution: Anti-Join + Dirty Set

Forma adopts a pessimistic strategy: Don't trust timestamps, trust state.

The core idea is introducing a "Dirty Set":

If a record in PostgreSQL "hasn't been flushed to Parquet yet," then regardless of whether Parquet has this record, ignore the Parquet version and use only the PostgreSQL version.

The change_log Table: Source of Dirty Data

Forma maintains a change_log table in PostgreSQL:

```sql
CREATE TABLE change_log (
    id          BIGSERIAL PRIMARY KEY,
    schema_id   UUID,
    row_id      UUID,
    op          SMALLINT,  -- 1=INSERT, 2=UPDATE, 3=DELETE
    created_at  BIGINT,    -- Change timestamp (epoch seconds)
    flushed_at  BIGINT     -- Export timestamp (epoch seconds); 0 = not exported
);
```

The key field is flushed_at:

  • flushed_at = 0: This change hasn't synced to Parquet—data is "dirty"
  • flushed_at > 0: This change has synced to Parquet—data is "clean"

Anti-Join Logic at Query Time

When a user issues a query, Forma's DuckDB query engine executes this logic:

SQL implementation:

```sql
WITH
-- Step 1: Get dirty set (row_ids not yet flushed)
dirty_ids AS (
    SELECT row_id
    FROM change_log
    WHERE flushed_at = 0 AND schema_id = $SCHEMA_ID
),

-- Step 2: Read from Parquet, but exclude records in the dirty set
s3_clean AS (
    SELECT *
    FROM read_parquet('s3://bucket/data/*.parquet')
    WHERE row_id NOT IN (SELECT row_id FROM dirty_ids)  -- Anti-Join!
),

-- Step 3: Read dirty data from PostgreSQL (latest version)
pg_hot AS (
    SELECT *
    FROM postgres_query('pg', 'SELECT * FROM entity_main WHERE ...')
    WHERE row_id IN (SELECT row_id FROM dirty_ids)
)

-- Step 4: Merge
SELECT * FROM s3_clean
UNION ALL
SELECT * FROM pg_hot;
```

Expressed as a formula:

$$Result = (Parquet_{data} \setminus DirtySet) \cup PostgreSQL_{hot}$$

In plain English:

  1. Parquet data: Keep only records that "have been flushed and have no newer version"
  2. PostgreSQL data: Keep only records that "haven't been flushed or were just updated"
  3. Merge: Union of both ensures each record appears exactly once, always the latest version
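The three steps above can be sketched in a few lines of Python (an illustration with invented names, not Forma's query engine):

```python
# Anti-Join + Dirty Set merge: Parquet rows whose row_id is in the dirty
# set are discarded; their authoritative version comes from PostgreSQL.
def merge(parquet_rows, pg_rows, dirty_ids):
    s3_clean = [r for r in parquet_rows if r["row_id"] not in dirty_ids]  # anti-join
    pg_hot = [r for r in pg_rows if r["row_id"] in dirty_ids]             # dirty only
    return s3_clean + pg_hot  # each row_id appears exactly once

parquet = [{"row_id": 1, "v": "old"}, {"row_id": 2, "v": "clean"}]
postgres = [{"row_id": 1, "v": "new"}, {"row_id": 3, "v": "unsynced"}]
dirty = {1, 3}  # row 1 updated after export, row 3 never exported

result = {r["row_id"]: r["v"] for r in merge(parquet, postgres, dirty)}
# no duplicates, no stale "old" for row 1, row 3 present despite never syncing
```

Note that row 1's stale Parquet copy never enters the result at all, so there is no version comparison to get wrong.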

Why This Approach Is "Pessimistic" and Safe

The key insight: We don't rely on timestamps to judge freshness—we rely on the explicit state of "has it synced or not."

| Scenario | Dirty Set | Parquet | PostgreSQL | Returns |
|---|---|---|---|---|
| Record only in PG | row_id ∈ Dirty | None | Exists | PG version |
| Record synced, no updates | row_id ∉ Dirty | Exists | Exists (same) | Parquet version |
| Record synced, then updated | row_id ∈ Dirty | Exists (old) | Exists (new) | PG version |
| Record deleted in PG | row_id ∈ Dirty | Exists (old) | None | Not returned |

No matter the scenario, users always see the latest, consistent data.

Analogy: Orders Still in Transit

If the explanation above is too abstract, here's a real-world analogy:

Imagine you run an online store with two ledgers:

  • Local ledger (PostgreSQL): Every order recorded in real-time
  • Cloud ledger (Parquet): Local ledger synced to cloud every night

Now someone asks: "What's today's total sales?"

Wrong approach: Add up numbers from both local and cloud ledgers.

  • Problem: If some orders are "still syncing," you'll double-count.

Correct approach:

  1. First, check which orders "haven't synced to cloud yet" (dirty set)
  2. Cloud ledger data: exclude these "in-transit" orders
  3. Local ledger data: only count these "in-transit" orders
  4. Add both together

That's the Anti-Join + Dirty Set logic.
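The ledger arithmetic, with invented numbers:

```python
# Order B was updated locally after last night's sync; order C is new
# and has never synced. Both are "in transit" (the dirty set).
local_ledger = {"A": 100, "B": 300, "C": 40}   # PostgreSQL: every order, live
cloud_ledger = {"A": 100, "B": 250}            # Parquet: stale B, no C yet
in_transit = {"B", "C"}                        # dirty set

# Wrong: add both ledgers -- A and B are counted twice (and B is stale).
naive = sum(local_ledger.values()) + sum(cloud_ledger.values())  # 790

# Right: cloud minus in-transit, plus local in-transit only.
synced_part = sum(v for k, v in cloud_ledger.items() if k not in in_transit)  # 100
transit_part = sum(v for k, v in local_ledger.items() if k in in_transit)     # 340
total = synced_part + transit_part  # 440: each order counted once, latest value
```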

CDC Flow: How Data Moves from PostgreSQL to Parquet

Now that we understand query logic, let's see how data syncs.

On Write: Record Changes

Every write to entity_main or eav_data simultaneously inserts into change_log:

```sql
-- Application writes data
INSERT INTO entity_main (...) VALUES (...);
INSERT INTO eav_data (...) VALUES (...);

-- Record the change (flushed_at = 0 means not yet exported)
INSERT INTO change_log (schema_id, row_id, op, created_at, flushed_at)
VALUES ($schema_id, $row_id, 1, EXTRACT(EPOCH FROM now())::BIGINT, 0);
```

CDC Job: Incremental Export

The CDC job runs periodically (default: every minute):

```sql
-- 1. Find row_ids pending export
SELECT DISTINCT row_id FROM change_log
WHERE schema_id = $SCHEMA_ID AND flushed_at = 0;

-- 2. Read complete records, flatten EAV to a wide table
SELECT m.row_id, m.text_01 AS name, m.integer_01 AS age, ...
FROM entity_main m
LEFT JOIN eav_data e ON m.row_id = e.row_id
WHERE m.row_id IN ($PENDING_IDS);

-- 3. Write to Parquet (this COPY runs in DuckDB)
COPY (...) TO 's3://bucket/delta/<uuid>.parquet';

-- 4. Mark as exported (epoch seconds, since flushed_at is BIGINT)
UPDATE change_log SET flushed_at = EXTRACT(EPOCH FROM now())::BIGINT
WHERE row_id IN ($PENDING_IDS) AND flushed_at = 0;
```
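The shape of that loop, with in-memory dicts standing in for PostgreSQL and S3 (all names are illustrative, not Forma's actual code):

```python
import uuid

change_log = [{"row_id": "r1", "flushed_at": 0}, {"row_id": "r2", "flushed_at": 0}]
entity_main = {"r1": {"name": "Alice"}, "r2": {"name": "Bob"}}
s3 = {}  # object key -> exported rows

def cdc_run(now):
    # 1. Find row_ids pending export
    pending = {e["row_id"] for e in change_log if e["flushed_at"] == 0}
    if not pending:
        return
    # 2. Read complete records
    rows = [dict(row_id=rid, **entity_main[rid]) for rid in sorted(pending)]
    # 3. Write to S3 first -- a crash after this point leaves flushed_at = 0,
    #    so queries keep reading from PostgreSQL (safe by design)
    s3[f"delta/{uuid.uuid4()}.parquet"] = rows
    # 4. Only then mark as exported; checking flushed_at == 0 here mirrors
    #    the SQL "AND flushed_at = 0" guard, keeping retries idempotent
    for e in change_log:
        if e["row_id"] in pending and e["flushed_at"] == 0:
            e["flushed_at"] = now

cdc_run(now=1000)
cdc_run(now=2000)  # second run finds nothing pending: a no-op
```

The ordering in steps 3 and 4 is the whole crash-safety story, which the next section walks through in detail.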

Full Data Flow Diagram

┌─────────────────────────────────────────────────────────────────────┐
│                           Query Path                                │
│  DuckDB: (Parquet - DirtySet) ∪ (PostgreSQL ∩ DirtySet)             │
└─────────────────────────────────────────────────────────────────────┘

Failure Modes and Self-Healing

Every distributed system has failure modes. The question isn't "will it fail?" but "when it fails, will data be corrupted?" Here's how Forma handles the ugly scenarios.

CDC Crash Recovery

The most dangerous moment in any sync system is mid-export crash. What happens if the CDC job dies after writing to S3 but before updating flushed_at?

The Write Order Matters:

┌─────────────────────────────────────────────────────────────────────┐
│  CDC Job Execution Order                                            │
│                                                                     │
│  1. BEGIN TRANSACTION (PostgreSQL)                                  │
│  2. SELECT * FROM entity_main WHERE row_id IN (dirty_ids)           │
│  3. Write to S3 Parquet ◄─── If crash here, Parquet has data       │
│  4. UPDATE change_log SET flushed_at = now() ◄─── but PG doesn't   │
│  5. COMMIT                                                          │
└─────────────────────────────────────────────────────────────────────┘

Scenario: Crash after step 3, before step 4

| Component | State After Crash |
|---|---|
| S3 Parquet | Contains the exported data |
| change_log.flushed_at | Still 0 (not updated) |
| Next query | Will fetch from PostgreSQL (correct!) |

This is safe by design. Because flushed_at is only updated after S3 write succeeds, a crash leaves the record in the Dirty Set. The query engine will:

  1. See flushed_at = 0 → record is "dirty"
  2. Fetch from PostgreSQL (source of truth)
  3. Ignore the orphaned Parquet file (it will be overwritten on next successful export)

Sequence Diagram:

┌─────────┐          ┌─────────┐          ┌─────┐          ┌─────────┐
│ CDC Job │          │   PG    │          │ S3  │          │ Query   │
└────┬────┘          └────┬────┘          └──┬──┘          └────┬────┘
     │ 1. Read dirty IDs  │                  │                  │
     │◄───────────────────│                  │                  │
     │                    │                  │                  │
     │ 2. Read full data  │                  │                  │
     │◄───────────────────│                  │                  │
     │                    │                  │                  │
     │ 3. Write Parquet   │                  │                  │
     │──────────────────────────────────────▶│                  │
     │                    │                  │                  │
     │    ╔═══════════════╧══════════════════╧════╗             │
     │    ║ ⚡ CRASH HERE                         ║             │
     │    ╚═══════════════╤══════════════════╤════╝             │
     │                    │                  │                  │
     │                    │                  │ 4. Query arrives │
     │                    │                  │◄─────────────────│
     │                    │                  │                  │
     │                    │ 5. Check dirty   │                  │
     │                    │    (flushed=0)   │                  │
     │                    │◄─────────────────┼──────────────────│
     │                    │                  │                  │
     │                    │ 6. Return PG data│                  │
     │                    │─────────────────────────────────────▶
     │                    │  (Parquet ignored - record is dirty)│

The ACID Guarantee Chain

Forma's consistency relies on PostgreSQL's ACID properties at critical points:

1. Write Path (Application → PostgreSQL)

```sql
BEGIN;
INSERT INTO entity_main (...) VALUES (...);
INSERT INTO eav_data (...) VALUES (...);
INSERT INTO change_log (row_id, flushed_at) VALUES ($1, 0);
COMMIT;  -- All-or-nothing: either all three inserts succeed, or none
```

If the transaction fails, no partial data exists. The record either fully exists (with flushed_at = 0) or doesn't exist at all.

2. Export Path (PostgreSQL → S3)

```sql
BEGIN;
-- Read is consistent within the transaction
SELECT * FROM entity_main WHERE row_id IN (SELECT row_id FROM change_log WHERE flushed_at = 0);
-- Write to S3 (outside the transaction, but idempotent)
-- ...S3 PUT...
-- Only mark as flushed AFTER S3 confirms
UPDATE change_log SET flushed_at = EXTRACT(EPOCH FROM now())::BIGINT
WHERE row_id IN ($exported_ids) AND flushed_at = 0;
COMMIT;
```

The AND flushed_at = 0 clause is crucial—it prevents double-marking if the CDC job runs concurrently (idempotency).
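You can see the guard at work with SQLite standing in for PostgreSQL (a self-contained demo, not Forma's code):

```python
import sqlite3

# The AND flushed_at = 0 predicate turns a retried or concurrent
# flush-mark into a no-op instead of a double-mark.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE change_log (row_id TEXT, flushed_at INTEGER)")
db.execute("INSERT INTO change_log VALUES ('r1', 0)")

mark = "UPDATE change_log SET flushed_at = ? WHERE row_id = 'r1' AND flushed_at = 0"
cur1 = db.execute(mark, (1000,))  # first run: marks the row as flushed
cur2 = db.execute(mark, (9999,))  # retry: guard fails, zero rows touched
assert cur1.rowcount == 1 and cur2.rowcount == 0
```

Whichever run wins, `flushed_at` records exactly one export timestamp, and the later run can detect from its row count that it has nothing left to do.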

3. Query Path (DuckDB reads both sources)

DuckDB's query is a point-in-time snapshot:

  • Reads change_log to get Dirty Set
  • Reads Parquet (immutable files, no concurrent write issues)
  • Reads PostgreSQL (uses snapshot isolation)

No locks required. Readers never block writers, writers never block readers.

Failure Mode Summary

| Failure Scenario | Data State | Recovery Action | Data Lost? |
|---|---|---|---|
| App crash mid-write | PG transaction rolled back | Automatic (ACID) | No |
| CDC crash before S3 | No changes | Automatic retry on next run | No |
| CDC crash after S3, before PG update | S3 has data, PG says "dirty" | Query fetches from PG (correct); S3 file orphaned | No |
| S3 write fails | PG unchanged | Automatic retry on next run | No |
| DuckDB query fails | No side effects | Client retry | No |
| PostgreSQL down | Queries fail | Fall back to Parquet-only (degraded mode) | No* |

*In degraded mode, queries may return slightly stale data (last successful export). Forma can optionally block queries if "zero dirty reads" is mandatory.

Graceful Degradation

When DuckDB or S3 becomes unavailable, Forma doesn't crash—it degrades:

┌─────────────────────────────────────────────────────────────────────┐
│                     Degradation Modes                               │
├─────────────────────────────────────────────────────────────────────┤
│ Normal:     PostgreSQL (hot) + DuckDB/Parquet (cold) → Full data   │
│ S3 down:    PostgreSQL only → Hot data only (recent records)       │
│ PG down:    DuckDB/Parquet only → Cold data only (may be stale)    │
│ Both down:  Service unavailable (circuit breaker triggered)        │
└─────────────────────────────────────────────────────────────────────┘

The application can choose the behavior:

  • Strict mode: Reject queries if any data source is unavailable
  • Best-effort mode: Return available data with a warning header
  • Cached mode: Return cached results from previous successful query
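A sketch of how such a policy switch might look; the mode names and return shape here are invented for illustration, not Forma's actual API:

```python
def query(pg_ok, s3_ok, mode, cache=None):
    if pg_ok and s3_ok:
        return {"data": "hot+cold", "warning": None}        # normal path
    if mode == "strict":
        raise RuntimeError("a data source is unavailable")  # reject outright
    if mode == "cached" and cache is not None:
        return {"data": cache, "warning": "served from cache"}
    if pg_ok:   # S3 down: recent records only
        return {"data": "hot only", "warning": "S3 unavailable"}
    if s3_ok:   # PG down: data may be stale up to the last export
        return {"data": "cold only", "warning": "PG unavailable"}
    raise RuntimeError("both sources down (circuit breaker)")

assert query(True, True, "strict")["warning"] is None
assert query(True, False, "best-effort")["warning"] == "S3 unavailable"
```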

Last-Write-Wins: Handling Residual Duplicates

Anti-Join solves "PostgreSQL vs Parquet" conflicts. But what about within Parquet itself?

Since CDC exports incrementally, the same record might exist in multiple Parquet files with different versions:

  • delta/001.parquet: row_id=123, version=1
  • delta/002.parquet: row_id=123, version=2

Forma uses QUALIFY ROW_NUMBER() to implement Last-Write-Wins:

```sql
SELECT *
FROM read_parquet('s3://bucket/**/*.parquet')
QUALIFY ROW_NUMBER() OVER (PARTITION BY row_id ORDER BY updated_at DESC) = 1
    AND (deleted_at IS NULL OR deleted_at = 0)  -- filter soft deletes AFTER picking the latest version
```

This ensures each row_id returns only the latest version, and deleted records don't "resurrect."
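The order of operations matters: pick the latest version first, then drop soft-deleted rows. In plain Python (an illustration, not Forma's code):

```python
# Two delta files carry two versions of row 123; the newer one is a
# soft delete. Dedup first, filter deletes second -- so the stale live
# version cannot "resurrect" the deleted row.
rows = [
    {"row_id": 123, "v": 1, "updated_at": 905, "deleted_at": None},  # delta/001
    {"row_id": 123, "v": 2, "updated_at": 910, "deleted_at": 912},   # delta/002
]

latest = {}
for r in rows:
    cur = latest.get(r["row_id"])
    if cur is None or r["updated_at"] > cur["updated_at"]:
        latest[r["row_id"]] = r

visible = [r for r in latest.values() if not r["deleted_at"]]
assert visible == []  # deleted record stays deleted
```

Filtering deletes before deduplicating would leave only v1 in play, and row 123 would wrongly reappear.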

Why DuckDB?

You might ask: Why DuckDB instead of Trino, Spark, or PostgreSQL's FDW?

DuckDB's Advantages

| Feature | DuckDB | Trino/Spark | PostgreSQL FDW |
|---|---|---|---|
| Deployment complexity | Embedded, zero deployment | Requires 3-10 node cluster | Requires FDW extension config |
| Cold start latency | 50-100ms | 2-10 seconds (JVM warmup) | Milliseconds (connection reuse) |
| Native Parquet support | Native, vectorized execution | Good (needs connector) | Needs parquet_fdw plugin |
| PostgreSQL connectivity | postgres_scanner | JDBC (extra 10-50ms latency) | Built-in |
| Cost model | Pay-per-query friendly | Cluster standing cost $500-5000/mo | Depends on main DB resources |

DuckDB is an embedded OLAP engine—it can run directly in your application process without additional servers. This is particularly Serverless-friendly:

  • Lambda function loads DuckDB on startup (~50MB)
  • Query time: direct connect to PostgreSQL (via postgres_scanner) and S3 (via httpfs)
  • Query ends, Lambda terminates, cost drops to zero

Serverless Cost Model

Traditional OLAP architectures require standing clusters, burning money even without queries. DuckDB's embedded nature enables true pay-per-use pricing:

| Cost Item | Traditional OLAP Cluster | DuckDB Serverless |
|---|---|---|
| Idle cost | $500-5000/month | $0 |
| Single query (1GB scan) | ~$0.001 | ~$0.005 (incl. Lambda) |
| 1000 queries/month avg | $500-5000 | ~$5-10 |

For scenarios with low query volume but high data volume (historical audits, monthly reports), Serverless cost advantage can reach 100-500x.

Series Summary: The Triangle Balance

Let's review the complete architecture built across these three posts:

                        ┌─────────────────┐
                        │   Flexibility   │
                        │  EAV + JSON     │
                        │    Schema       │
                        └────────┬────────┘

                 ┌───────────────┼───────────────┐
                 │               │               │
                 ▼               │               ▼
        ┌─────────────────┐      │      ┌─────────────────┐
        │   Performance   │      │      │      Cost       │
        │  Hot Table +    │◀─────┴─────▶│  DuckDB +       │
        │  CTE JSON_AGG   │             │  Serverless     │
        └─────────────────┘             └─────────────────┘

Problems Solved by Each Post

| Part | Problem | Solution | Key Metrics |
|---|---|---|---|
| Part 1 | Schema flexibility | EAV + JSON Schema + Hot Table | Zero DDL, 80/20 index optimization |
| Part 2 | N+1 queries | CTE + JSON_AGG | 101→1 queries, 1000ms→25ms |
| Part 3 | Massive historical data | DuckDB + Anti-Join | Zero dirty reads, Serverless cost |

Core Design Principles

  1. State over timestamps: Use flushed_at to explicitly mark sync state, not timestamp comparison
  2. Pessimistic over optimistic: Better to query PostgreSQL one more time than risk reading dirty data
  3. Push computation down: Let databases (PostgreSQL) and analytics engines (DuckDB) each do what they do best
  4. Graceful degradation: Fall back to PG-only when DuckDB fails, ensuring availability

Ideal Use Cases

This architecture is particularly suited for:

  • AI-driven applications: Frequently changing data structures need JSON Schema flexibility
  • Multi-tenant SaaS: Different tenants need different fields—EAV naturally supports this
  • Analytical queries: Historical data aggregation, reports, exports—DuckDB + Parquet handles efficiently

Not Ideal For

  • Strong transactional consistency: Scenarios requiring cross-table ACID transactions—use pure PostgreSQL
  • Ultra-low latency: <10ms point queries—use Redis cache + PostgreSQL
  • Real-time streaming: CDC has minute-level latency—use Kafka/Flink for real-time

Full Stack Comparison: Forma vs. Alternatives

How does this architecture compare to other popular approaches for flexible, scalable data storage?

| Capability | Forma | MongoDB + Atlas | DynamoDB | PostgreSQL + TimescaleDB |
|---|---|---|---|---|
| Schema flexibility | ✅ JSON Schema | ✅ Schemaless | ✅ Schemaless | ⚠️ Requires DDL |
| ACID across records | ✅ Full | ⚠️ Multi-doc limited | ⚠️ 25 items max | ✅ Full |
| SQL compatibility | ✅ Native | ❌ MQL only | ❌ PartiQL (limited) | ✅ Native |
| Hot/cold separation | ✅ Built-in | ⚠️ Manual tiering | ⚠️ TTL-based | ⚠️ Chunk-based |
| Serverless analytics | ✅ DuckDB | ⚠️ Atlas BI ($$) | ⚠️ Athena (separate) | ❌ Requires cluster |
| Cold storage cost | ✅ S3 ($0.023/GB) | ⚠️ Atlas archive | ⚠️ S3 export needed | ⚠️ Disk-based |
| Zero dirty reads | ✅ Dirty Set | ❌ Eventual | ⚠️ Strong per-item | ✅ MVCC |
| AI pipeline integration | ✅ JSON Schema = LLM contract | ⚠️ Manual validation | ⚠️ Manual validation | ⚠️ Manual validation |

The key differentiator: Forma combines the flexibility of document stores with the consistency of relational databases, while adding first-class support for AI workloads (JSON Schema as the contract between LLMs and storage) and cost-effective cold data analytics (DuckDB + Parquet).

Conclusion

"Lakehouse" isn't a new concept, but making people trust it is hard.

Forma's Anti-Join + Dirty Set mechanism is essentially a pessimistic consistency protocol: We assume data might be "in transit" at any moment, then explicitly handle that uncertainty.

This adds some query overhead compared to optimistic timestamp comparison (we need to scan the change_log table), but what we get in return is a provable consistency guarantee.

In data systems, correctness always trumps performance—because wrong fast results are worse than correct slow results.

Series Navigation

This post is based on engineering practices from the Forma project. Forma is a flexible data storage engine designed for the AI era.

If you're interested in this architecture, feel free to Star us on GitHub or join the community discussion.