Database Design
Datalinx AI uses PostgreSQL as its primary system database. This document covers the database architecture, schema design, and data modeling patterns.
Database Architecture
System Database vs Data Warehouse
Datalinx AI uses two distinct data storage systems:
| Database | Purpose | Technology |
|---|---|---|
| System Database | Platform metadata, configuration, user data | PostgreSQL |
| Data Warehouse | Customer's transformed data | Databricks, Snowflake, BigQuery |
Schema Design
Multi-Tenant Schema Isolation
Each organization has a dedicated PostgreSQL schema:
-- Platform schema (shared)
CREATE SCHEMA datalinx_platform;
-- Organization-specific schemas
CREATE SCHEMA org_acme_corp;
CREATE SCHEMA org_retail_inc;
-- Each org schema contains the same tables
-- but data is completely isolated
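At the application layer, isolation is typically enforced by pointing each database session at the organization's schema. Below is a minimal sketch using asyncpg; the helper name and identifier validation are illustrative, not the platform's actual implementation:
import re
import asyncpg

async def set_org_schema(conn: asyncpg.Connection, schema_name: str) -> None:
    """Scope a connection to one organization's schema (illustrative)."""
    # Validate the identifier before interpolating it; SET search_path
    # cannot take a bind parameter for a schema name.
    if not re.fullmatch(r"org_[a-z0-9_]+", schema_name):
        raise ValueError(f"Invalid organization schema: {schema_name}")
    await conn.execute(f'SET search_path TO "{schema_name}", datalinx_platform')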
Core Tables
Organizations
CREATE TABLE datalinx_platform.organizations (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(255) NOT NULL,
slug VARCHAR(100) UNIQUE NOT NULL,
schema_name VARCHAR(100) UNIQUE NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
settings JSONB DEFAULT '{}'
);
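Provisioning a new organization means creating both the row in the shared table and the dedicated schema. A hedged sketch of doing both in one transaction (the helper and slug-to-schema convention are assumptions, not the documented provisioning flow):
async def provision_organization(conn, name: str, slug: str) -> dict:
    """Create the org row and its dedicated schema atomically (illustrative)."""
    schema_name = f"org_{slug.replace('-', '_')}"
    async with conn.transaction():
        row = await conn.fetchrow(
            """
            INSERT INTO datalinx_platform.organizations (name, slug, schema_name)
            VALUES ($1, $2, $3)
            RETURNING *
            """,
            name, slug, schema_name,
        )
        # CREATE SCHEMA cannot be parameterized; schema_name is derived from
        # the slug, which should be validated upstream.
        await conn.execute(f'CREATE SCHEMA "{schema_name}"')
    return dict(row)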
Users
CREATE TABLE datalinx_platform.users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
organization_id UUID REFERENCES datalinx_platform.organizations(id),
email VARCHAR(255) UNIQUE NOT NULL,
password_hash VARCHAR(255),
display_name VARCHAR(255),
role VARCHAR(50) NOT NULL DEFAULT 'viewer',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
last_login_at TIMESTAMP WITH TIME ZONE,
mfa_enabled BOOLEAN DEFAULT FALSE
);
Workspaces
CREATE TABLE {org_schema}.workspaces (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(255) NOT NULL,
description TEXT,
target_schema_id UUID,
github_repo_url VARCHAR(500),
repo_subdirectory VARCHAR(255),
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
settings JSONB DEFAULT '{}'
);
Data Sources
CREATE TABLE {org_schema}.data_sources (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workspace_id UUID REFERENCES workspaces(id),
name VARCHAR(255) NOT NULL,
source_type VARCHAR(50) NOT NULL, -- 'postgresql', 'snowflake', 'api', 'file'
connection_config JSONB NOT NULL, -- Encrypted credentials
schema_cache JSONB, -- Cached source schema
last_schema_refresh TIMESTAMP WITH TIME ZONE,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
is_active BOOLEAN DEFAULT TRUE
);
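Because connection_config holds credentials, sensitive fields should be encrypted before they are written. A minimal sketch using symmetric encryption from the cryptography library; the platform's actual key management and encryption scheme are not documented here:
import json
from cryptography.fernet import Fernet

# Illustrative only: in practice the key comes from a secrets manager.
fernet = Fernet(Fernet.generate_key())

SENSITIVE_KEYS = {"password", "private_key", "api_key"}

def encrypt_connection_config(config: dict) -> str:
    """Encrypt credential fields before storing the JSONB value."""
    protected = {
        k: fernet.encrypt(str(v).encode()).decode() if k in SENSITIVE_KEYS else v
        for k, v in config.items()
    }
    return json.dumps(protected)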
Mappings
CREATE TABLE {org_schema}.mappings (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workspace_id UUID REFERENCES workspaces(id),
target_table VARCHAR(255) NOT NULL,
source_table VARCHAR(255) NOT NULL,
field_mappings JSONB NOT NULL,
-- Example: {"customer_id": {"source": "cust_id", "transform": null}}
decorators JSONB DEFAULT '[]',
-- Example: [{"type": "identity_resolution", "config": {...}}]
cte_definitions JSONB DEFAULT '[]',
status VARCHAR(50) DEFAULT 'draft',
version INTEGER DEFAULT 1,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
Pipeline Jobs
CREATE TABLE {org_schema}.pipeline_jobs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workspace_id UUID REFERENCES workspaces(id),
job_type VARCHAR(50) NOT NULL, -- 'full_refresh', 'incremental', 'mapping_test'
status VARCHAR(50) NOT NULL DEFAULT 'pending',
-- 'pending', 'running', 'completed', 'failed'
started_at TIMESTAMP WITH TIME ZONE,
completed_at TIMESTAMP WITH TIME ZONE,
error_message TEXT,
run_metadata JSONB DEFAULT '{}',
-- Rows processed, duration, etc.
created_by UUID REFERENCES datalinx_platform.users(id)
);
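Workers pick up jobs by status. One common PostgreSQL pattern for claiming the next pending job without two workers grabbing the same row is FOR UPDATE SKIP LOCKED; the sketch below is an assumption about how a job runner might poll, not documented behavior:
from typing import Optional

async def claim_next_pending_job(conn) -> Optional[dict]:
    """Atomically move one pending job to 'running' (illustrative helper)."""
    row = await conn.fetchrow(
        """
        UPDATE pipeline_jobs
        SET status = 'running', started_at = NOW()
        WHERE id = (
            SELECT id FROM pipeline_jobs
            WHERE status = 'pending'
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        )
        RETURNING *
        """
    )
    return dict(row) if row else None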
Data Access Patterns
Layered Architecture
Database access follows the DAO (Data Access Object) pattern:
DAO Implementation
import json
from typing import Optional

import asyncpg

class WorkspaceDAO:
    """Data access for workspaces - handles all database operations."""

    def __init__(self, conn: asyncpg.Connection):
        # The connection is expected to already be scoped to the org schema
        self.conn = conn

    async def get_by_id(self, workspace_id: str) -> Optional[dict]:
        query = """
            SELECT id, name, description, target_schema_id, settings
            FROM workspaces
            WHERE id = $1
        """
        row = await self.conn.fetchrow(query, workspace_id)
        return dict(row) if row else None

    async def create(self, data: dict) -> dict:
        query = """
            INSERT INTO workspaces (name, description, settings)
            VALUES ($1, $2, $3)
            RETURNING *
        """
        row = await self.conn.fetchrow(
            query,
            data["name"],
            data.get("description"),
            json.dumps(data.get("settings", {})),
        )
        return dict(row)

    async def update(self, workspace_id: str, data: dict) -> dict:
        query = """
            UPDATE workspaces
            SET name = COALESCE($2, name),
                description = COALESCE($3, description),
                updated_at = NOW()
            WHERE id = $1
            RETURNING *
        """
        row = await self.conn.fetchrow(
            query,
            workspace_id,
            data.get("name"),
            data.get("description"),
        )
        return dict(row)
Manager Layer
Managers contain business logic and use DAOs:
class WorkspaceManager:
    """Business logic for workspace operations."""

    async def create_workspace(self, data: dict) -> dict:
        # Business validation
        if not data.get("name"):
            raise ValidationException("Workspace name is required")
        # Use DAO for database operations
        workspace_dao = await get_workspace_dao()
        workspace = await workspace_dao.create(data)
        # Additional business operations
        await self._initialize_workspace_defaults(workspace["id"])
        return workspace
JSONB Usage
Flexible Configuration Storage
JSONB columns store semi-structured data:
# Field mappings stored as JSONB
field_mappings = {
"customer_id": {
"source": "cust_id",
"transform": None
},
"full_name": {
"source": None,
"transform": "CONCAT(first_name, ' ', last_name)"
},
"created_date": {
"source": "created_at",
"transform": "DATE(created_at)"
}
}
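To make the structure concrete, here is a simplified sketch of how a field_mappings value could be rendered into SELECT expressions; the real mapping engine (decorators, CTEs, validation) is not shown:
def render_select_list(field_mappings: dict) -> str:
    """Render {"target": {"source": ..., "transform": ...}} entries as SELECT expressions."""
    expressions = []
    for target, spec in field_mappings.items():
        # Prefer an explicit transform; otherwise select the source column directly
        expr = spec.get("transform") or spec.get("source")
        expressions.append(f"{expr} AS {target}")
    return ",\n".join(expressions)

# render_select_list(field_mappings) yields:
#   cust_id AS customer_id,
#   CONCAT(first_name, ' ', last_name) AS full_name,
#   DATE(created_at) AS created_date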
Querying JSONB
-- Find mappings that populate the customer_id target field
SELECT * FROM mappings
WHERE field_mappings ? 'customer_id';
-- Find mappings where full_name is computed by a transform
SELECT * FROM mappings
WHERE field_mappings->'full_name'->>'transform' IS NOT NULL;
Database Migrations
Migration Strategy
Migrations are versioned and applied sequentially:
migrations/
├── v1.0.0_to_v1.0.1_add_mfa_columns.sql
├── v1.0.1_to_v1.0.2_create_audit_log.sql
├── v1.0.2_to_v1.0.3_add_workspace_settings.sql
└── v1.0.3_to_v1.0.4_create_service_accounts.sql
Migration Format
-- Migration: v1.0.2 to v1.0.3
-- Description: Add settings column to workspaces
BEGIN;
-- Add column
ALTER TABLE workspaces
ADD COLUMN IF NOT EXISTS settings JSONB DEFAULT '{}';
-- Update version
UPDATE datalinx_platform.schema_version
SET version = '1.0.3', applied_at = NOW();
COMMIT;
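A minimal sketch of a sequential runner that applies the files in order and relies on each file bumping schema_version, as in the example above; the file layout and helper are assumptions, not the platform's actual runner:
from pathlib import Path

async def run_pending_migrations(conn, migrations_dir: str = "migrations") -> None:
    """Apply migration files in sequence, driven by schema_version (illustrative)."""
    current = await conn.fetchval(
        "SELECT version FROM datalinx_platform.schema_version"
    )
    # Assumes file names like v1.0.2_to_v1.0.3_*.sql that sort lexically
    for path in sorted(Path(migrations_dir).glob("*.sql")):
        from_version = path.name.split("_to_")[0].lstrip("v")
        if from_version == current:
            # Each file is a BEGIN/COMMIT block that also updates schema_version
            await conn.execute(path.read_text())
            current = await conn.fetchval(
                "SELECT version FROM datalinx_platform.schema_version"
            )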
Backward Compatibility
Migrations must be backward compatible:
-- GOOD: Additive change
ALTER TABLE users ADD COLUMN IF NOT EXISTS phone VARCHAR(50);
-- BAD: Breaking change (don't do this)
ALTER TABLE users DROP COLUMN email;
Performance Optimization
Indexes
Critical indexes for query performance:
-- Organization lookup
CREATE INDEX idx_users_organization ON users(organization_id);
-- Workspace queries
CREATE INDEX idx_sources_workspace ON data_sources(workspace_id);
CREATE INDEX idx_mappings_workspace ON mappings(workspace_id);
-- JSONB queries
CREATE INDEX idx_mappings_field_mappings ON mappings
USING GIN (field_mappings);
-- Job status queries
CREATE INDEX idx_jobs_status ON pipeline_jobs(status)
WHERE status IN ('pending', 'running');
Connection Pooling
# Connection pool configuration
pool_config = {
"min_size": 5,
"max_size": 20,
"max_queries": 50000,
"max_inactive_connection_lifetime": 300
}
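With asyncpg, this configuration can be passed straight to the pool factory. A sketch (the DSN environment variable is an assumption):
import os
import asyncpg

async def create_db_pool() -> asyncpg.Pool:
    """Create the shared connection pool from the settings above."""
    return await asyncpg.create_pool(
        dsn=os.environ["DATABASE_URL"],  # assumed env var
        **pool_config,
    )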
Query Optimization
# Use EXPLAIN ANALYZE to optimize queries
# (note: EXPLAIN ANALYZE actually executes the statement)
async def analyze_query(conn, query: str):
    result = await conn.fetch(f"EXPLAIN ANALYZE {query}")
    for row in result:
        logger.info(row["QUERY PLAN"])
Backup and Recovery
Backup Strategy
| Backup Type | Frequency | Retention |
|---|---|---|
| Full backup | Daily | 30 days |
| Incremental | Hourly | 7 days |
| WAL archiving | Continuous | 7 days |
Point-in-Time Recovery
# Restore the latest base backup into $PGDATA, then configure
# point-in-time recovery (PostgreSQL 12+) and start the server:
#   postgresql.conf:
#     restore_command = 'cp /backups/wal_archive/%f %p'
#     recovery_target_time = '2024-01-15 10:30:00'
touch "$PGDATA/recovery.signal"
pg_ctl start -D "$PGDATA"
Best Practices
For Schema Design
- Use UUIDs for primary keys: Prevents enumeration attacks
- Add created_at/updated_at: Every table should have timestamps
- Use JSONB for flexibility: Schema-less data in structured columns
- Index JSONB carefully: Only index fields you query frequently
For Queries
- Always use parameterized queries: Prevent SQL injection
- Limit result sets: Use pagination for large queries (see the sketch after this list)
- Use transactions: Group related operations
- Handle errors properly: Don't expose SQL errors to users
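For example, a paginated, parameterized listing query might look like the following sketch (the function name and page-size cap are illustrative):
async def list_mappings(conn, workspace_id: str, page: int = 1, page_size: int = 50) -> list:
    """Fetch one page of mappings for a workspace using bind parameters."""
    page_size = min(page_size, 200)  # cap the result set
    offset = (page - 1) * page_size
    rows = await conn.fetch(
        """
        SELECT id, target_table, source_table, status, version
        FROM mappings
        WHERE workspace_id = $1
        ORDER BY updated_at DESC
        LIMIT $2 OFFSET $3
        """,
        workspace_id, page_size, offset,
    )
    return [dict(r) for r in rows]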
For Migrations
- One migration per version: Never multiple files for same version
- Test migrations: Use migration verification script
- Keep migrations small: Easier to debug and rollback
- Document changes: Comments in migration files
Related Documentation
- Architecture Overview - System design
- Security Architecture - Data encryption
- Control Plane & Data Plane - Database usage patterns