Finvu Data Warehouse Documentation

Bronze Layer Kafka Ingestion DDL

-- =====================================================
-- FINVU BRONZE LAYER - KAFKA EVENT INGESTION (ENGINEERING-FOCUSED)
-- =====================================================
-- Purpose: Engineering-focused staging and ingestion layer for complete data capture
-- Pattern: Kafka Table Engine + Materialized View → MergeTree Storage
-- Performance Target: Near-real-time ingestion, end-to-end latency under 15 minutes
-- Philosophy: Maximum data capture, minimal processing, business logic in Silver
-- Based on: eventConfig-aa.json event patterns
-- =====================================================

-- =====================================================
-- BRONZE SCHEMA SETUP
-- =====================================================

CREATE DATABASE IF NOT EXISTS bronze;

-- =====================================================
-- KAFKA SOURCE TABLE (Ephemeral - No Data Storage)
-- =====================================================

-- Kafka Table Engine for consuming from Finvu AA events topic
-- Reference: https://clickhouse.com/docs/integrations/kafka/kafka-table-engine
CREATE TABLE bronze.kafka_finvu_events (
    event_data String COMMENT 'Raw JSON event payload from Kafka'
    -- Kafka metadata (_topic, _partition, _offset, _timestamp, _key) is exposed
    -- automatically by the Kafka engine as virtual columns; declaring it here
    -- would make the format parser look for those fields in the message body.
) ENGINE = Kafka()
SETTINGS 
    -- Kafka Connection Settings
    kafka_broker_list = 'localhost:9092',
    kafka_topic_list = 'finvu-aa-events',
    kafka_group_name = 'clickhouse-finvu-consumer',
    -- JSONAsString captures each message verbatim into the single String column,
    -- so evolving or malformed payloads never break parsing; the materialized
    -- view below extracts routing fields from this raw payload
    kafka_format = 'JSONAsString',
    
    -- Performance Settings (Optimized for throughput)
    kafka_num_consumers = 3,                    -- Scale with topic partitions
    kafka_max_block_size = 65536,               -- Batch size tuned for throughput
    kafka_flush_interval_ms = 7500,             -- Near-real-time flushing
    kafka_thread_per_consumer = 1,              -- One delivery thread per consumer
    
    -- Error Handling (Robust but not overly permissive)
    kafka_skip_broken_messages = 100
COMMENT 'Kafka consumer table for Finvu AA events - engineering-optimized configuration';
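
-- Quick consumer sanity check (a sketch, assuming ClickHouse >= 23.8, where the
-- system.kafka_consumers table became available): shows partition assignments,
-- poll activity, and recent consumer exceptions without touching the Kafka table.
SELECT database, table, assignments.topic, assignments.partition_id,
       last_poll_time, num_messages_read, exceptions.text
FROM system.kafka_consumers
WHERE database = 'bronze' AND table = 'kafka_finvu_events';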

-- =====================================================
-- BRONZE STORAGE TABLE (Engineering-Optimized Event Staging)
-- =====================================================

-- Engineering-focused staging table for Silver layer processing
-- Complete event preservation with minimal extraction for routing/ordering
CREATE TABLE bronze.finvu_events (
    -- Temporal Organization (Ingestion Performance)
    event_date Date DEFAULT toDate(event_timestamp),
    event_timestamp DateTime64(3) CODEC(Delta, ZSTD(3)),
    ingestion_timestamp DateTime64(3) DEFAULT now64(3) COMMENT 'ClickHouse ingestion timestamp',
    
    -- Event Identity (Minimal for Routing)
    event_id String COMMENT 'Event identifier for deduplication',
    event_name LowCardinality(String) COMMENT 'Event type for Silver layer routing',
    event_source LowCardinality(String) COMMENT 'Source system classification',
    event_type LowCardinality(String) COMMENT 'Transport type (HTTP/WS/etc)',
    
    -- Complete Event Preservation (Schema Agnostic)
    event_data JSON CODEC(ZSTD(3)) COMMENT 'Complete raw event - business logic in Silver',
    event_data_size UInt32 COMMENT 'Size tracking for performance monitoring',
    
    -- Engineering Metadata (Infrastructure Support)
    data_quality LowCardinality(String) DEFAULT 'CAPTURED' COMMENT 'CAPTURED/CORRUPTED/PARTIAL',
    processing_errors String DEFAULT '' COMMENT 'Ingestion error tracking',
    is_duplicate Bool DEFAULT 0 COMMENT 'Duplicate detection flag',
    
    -- Complete Kafka Lineage (Data Engineering Essential)
    kafka_topic LowCardinality(String) COMMENT 'Source topic',
    kafka_partition UInt32 COMMENT 'Source partition',
    kafka_offset UInt64 COMMENT 'Source offset',
    kafka_timestamp DateTime COMMENT 'Kafka message timestamp',
    kafka_key String COMMENT 'Kafka message key',
    
    -- Minimal Business Context (For Silver Layer Efficiency)
    txn_id String DEFAULT '' COMMENT 'Transaction correlation ID',
    session_id String DEFAULT '' COMMENT 'Session correlation ID',
    request_url String DEFAULT '' COMMENT 'API endpoint tracking',
    response_code UInt16 DEFAULT 0 COMMENT 'HTTP response code'
    
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_date)  -- Monthly partitions for efficient queries
ORDER BY (event_name, event_timestamp, event_id)  -- Optimized for ingestion and Silver access
TTL event_date + INTERVAL 7 YEAR  -- Regulatory compliance: 7-year retention
SETTINGS index_granularity = 8192
COMMENT 'Bronze staging table - complete event capture for Silver layer processing';
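
-- Dedup read pattern for Silver (a sketch): MergeTree keeps every inserted row,
-- so downstream readers take the earliest-ingested copy per event_id with
-- LIMIT 1 BY; constraining event_date keeps the sort inside recent partitions.
SELECT *
FROM bronze.finvu_events
WHERE event_date >= today() - 1
ORDER BY event_id, ingestion_timestamp
LIMIT 1 BY event_id;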

-- =====================================================
-- MATERIALIZED VIEW (Engineering-Focused Event Processing)
-- =====================================================

-- Real-time processing from Kafka to Bronze storage
-- Minimal extraction, maximum performance, business logic deferred to Silver
CREATE MATERIALIZED VIEW bronze.mv_kafka_to_finvu_events TO bronze.finvu_events AS
SELECT
    -- Time handling (performance critical)
    toDate(COALESCE(
        parseDateTime64BestEffortOrNull(JSONExtractString(event_data, 'eventTimestamp')),
        _timestamp,          -- Kafka virtual column, Nullable(DateTime)
        now64(3)             -- last-resort fallback keeps the column non-null
    )) AS event_date,
    
    COALESCE(
        parseDateTime64BestEffortOrNull(JSONExtractString(event_data, 'eventTimestamp')),
        _timestamp,
        now64(3)
    ) AS event_timestamp,
    
    now64(3) AS ingestion_timestamp,
    
    -- Basic event identification (minimal extraction)
    COALESCE(
        JSONExtractString(event_data, 'eventId'),
        JSONExtractString(event_data, 'id'),
        toString(generateUUIDv4())
    ) AS event_id,
    
    COALESCE(
        JSONExtractString(event_data, 'eventName'),
        'UNKNOWN_EVENT'
    ) AS event_name,
    
    -- Event source classification (for Silver layer routing)
    CASE 
        WHEN JSONExtractString(event_data, 'eventSource') != '' THEN 
            JSONExtractString(event_data, 'eventSource')
        WHEN JSONExtractString(event_data, 'eventName') LIKE 'USER:AA:%' THEN 'AAServer:UserEndpoint'
        WHEN JSONExtractString(event_data, 'eventName') LIKE 'FIU:AA:%' THEN 'AAServer:ConsentFlow'
        WHEN JSONExtractString(event_data, 'eventName') LIKE 'FIP:AA:%' THEN 'AAServer:Notifications'
        WHEN JSONExtractString(event_data, 'eventName') LIKE 'AA:FIP:%' THEN 'AAServer:DataFlow'
        WHEN JSONExtractString(event_data, 'eventName') LIKE 'AA:FIU:%' THEN 'AAServer:DataFlow'
        ELSE 'AAServer:Undefined'
    END AS event_source,
    
    -- Event type (transport classification)
    CASE
        WHEN JSONExtractString(event_data, 'eventType') != '' THEN 
            JSONExtractString(event_data, 'eventType')
        WHEN JSONExtractString(event_data, 'eventName') LIKE '%:AA:%Request' THEN 'HTTP_IN'
        WHEN JSONExtractString(event_data, 'eventName') LIKE 'AA:%:%Request' THEN 'HTTP_OUT'
        WHEN JSONExtractString(event_data, 'eventName') LIKE '%Websocket%' THEN 'WS_IN'
        ELSE 'DEFAULT'
    END AS event_type,
    
    -- Complete event preservation (core Bronze responsibility); the guard keeps
    -- one corrupted payload from aborting the whole insert block
    CAST(if(isValidJSON(event_data), event_data, '{}') AS JSON) AS event_data,
    length(event_data) AS event_data_size,
    
    -- Engineering quality assessment (not business quality)
    CASE 
        WHEN event_data = '' OR NOT isValidJSON(event_data) THEN 'CORRUPTED'
        WHEN length(event_data) < 50 THEN 'PARTIAL'
        ELSE 'CAPTURED'
    END AS data_quality,
    
    -- Error tracking for engineering ops
    CASE
        WHEN NOT isValidJSON(event_data) THEN 'INVALID_JSON'
        WHEN length(event_data) = 0 THEN 'EMPTY_PAYLOAD'
        ELSE ''
    END AS processing_errors,
    
    -- Basic duplicate detection
    CASE
        WHEN JSONExtractString(event_data, 'isDuplicate') = 'true' THEN 1
        ELSE 0
    END AS is_duplicate,
    
    -- Complete Kafka lineage (essential for data engineering)
    _topic AS kafka_topic,
    _partition AS kafka_partition,
    _offset AS kafka_offset,
    _timestamp AS kafka_timestamp,
    _key AS kafka_key,
    
    -- Minimal correlation IDs (for Silver layer efficiency)
    JSONExtractString(event_data, 'txnId') AS txn_id,
    JSONExtractString(event_data, 'sessionId') AS session_id,
    JSONExtractString(event_data, 'requestUrl') AS request_url,
    JSONExtractUInt(event_data, 'responseCode') AS response_code
    
FROM bronze.kafka_finvu_events
-- No row filter: empty or corrupted payloads are still captured and flagged via
-- data_quality / processing_errors so the monitoring views can surface them
COMMENT 'Minimal processing for maximum ingestion performance';
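
-- Changing the materialized view safely (a sketch of the usual pause/modify/
-- resume sequence): detaching the Kafka table stops consumption, and on ATTACH
-- the consumer group resumes from its committed offsets, so no messages are lost.
/*
DETACH TABLE bronze.kafka_finvu_events;
DROP VIEW bronze.mv_kafka_to_finvu_events;
-- ... recreate the view with the revised SELECT ...
ATTACH TABLE bronze.kafka_finvu_events;
*/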

-- =====================================================
-- PERFORMANCE OPTIMIZATION
-- =====================================================

-- Essential indices for Silver layer access patterns
ALTER TABLE bronze.finvu_events 
ADD INDEX idx_event_name (event_name) TYPE set(100) GRANULARITY 4;

ALTER TABLE bronze.finvu_events
ADD INDEX idx_event_source (event_source) TYPE set(10) GRANULARITY 4;

ALTER TABLE bronze.finvu_events
ADD INDEX idx_data_quality (data_quality) TYPE set(5) GRANULARITY 4;
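
-- Index verification (a sketch): EXPLAIN indexes = 1 confirms whether a filter
-- on event_source actually prunes granules through idx_event_source before the
-- Silver layer depends on it.
EXPLAIN indexes = 1
SELECT count()
FROM bronze.finvu_events
WHERE event_source = 'AAServer:ConsentFlow'
  AND event_date >= today() - 7;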

-- =====================================================
-- ENGINEERING MONITORING VIEWS
-- =====================================================

-- 1. Ingestion Performance Monitor
CREATE VIEW bronze.v_ingestion_performance AS
SELECT
    toStartOfHour(ingestion_timestamp) AS hour,
    event_source,
    count(*) AS events_ingested,
    round(count(*) / 3600, 2) AS events_per_second,
    countIf(data_quality = 'CORRUPTED') AS corrupted_events,
    countIf(processing_errors != '') AS error_count,
    max(ingestion_timestamp) AS latest_ingestion,
    dateDiff('minute', max(event_timestamp), max(ingestion_timestamp)) AS lag_minutes
FROM bronze.finvu_events
WHERE ingestion_timestamp >= now() - INTERVAL 24 HOUR
GROUP BY hour, event_source
ORDER BY hour DESC, events_ingested DESC
COMMENT 'Engineering performance metrics for Bronze layer';
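
-- Example alert probe against the <15 minute latency target declared above;
-- any recent hour/source combination past the threshold is a paging condition.
SELECT hour, event_source, lag_minutes
FROM bronze.v_ingestion_performance
WHERE lag_minutes > 15
ORDER BY lag_minutes DESC;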

-- 2. Event Type Coverage Monitor
CREATE VIEW bronze.v_event_coverage AS
SELECT
    event_source,
    event_type,
    count(DISTINCT event_name) AS unique_event_types,
    count(*) AS total_events,
    min(event_timestamp) AS first_seen,
    max(event_timestamp) AS last_seen,
    round(avg(event_data_size), 0) AS avg_event_size_bytes
FROM bronze.finvu_events
WHERE ingestion_timestamp >= now() - INTERVAL 24 HOUR
GROUP BY event_source, event_type
ORDER BY total_events DESC
COMMENT 'Complete event type coverage validation';

-- 3. Kafka Consumer Health Monitor
CREATE VIEW bronze.v_kafka_consumer_health AS
SELECT
    kafka_topic,
    kafka_partition,
    count(*) AS events_consumed,
    max(kafka_offset) AS latest_offset,
    max(kafka_timestamp) AS latest_kafka_timestamp,
    dateDiff('minute', max(kafka_timestamp), max(ingestion_timestamp)) AS consumer_lag_minutes,
    countIf(processing_errors != '') AS processing_errors
FROM bronze.finvu_events
WHERE ingestion_timestamp >= now() - INTERVAL 1 HOUR
GROUP BY kafka_topic, kafka_partition
ORDER BY consumer_lag_minutes DESC
COMMENT 'Kafka consumer performance and health monitoring';
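
-- Missing-event estimate per partition (a sketch): within a window, the offset
-- span should roughly match the row count. Indicative only: compacted topics or
-- transactional markers create legitimate offset gaps.
SELECT
    kafka_topic,
    kafka_partition,
    max(kafka_offset) - min(kafka_offset) + 1 AS expected_messages,
    count() AS consumed_messages,
    expected_messages - consumed_messages AS apparent_gap
FROM bronze.finvu_events
WHERE ingestion_timestamp >= now() - INTERVAL 1 HOUR
GROUP BY kafka_topic, kafka_partition
HAVING apparent_gap > 0;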

-- 4. Data Quality Trends
CREATE VIEW bronze.v_data_quality_trends AS
SELECT
    toStartOfHour(ingestion_timestamp) AS hour,
    data_quality,
    count(*) AS event_count,
    count(DISTINCT event_name) AS unique_event_types,
    groupUniqArray(processing_errors) AS error_types,
    round(avg(event_data_size), 2) AS avg_event_size
FROM bronze.finvu_events
WHERE ingestion_timestamp >= now() - INTERVAL 24 HOUR
  AND data_quality != 'CAPTURED'  -- Focus on problematic events
GROUP BY hour, data_quality
ORDER BY hour DESC, event_count DESC
COMMENT 'Data quality trends for operational alerts';
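
-- Companion probe: a single alertable ratio, the share of non-CAPTURED events
-- over the last hour (the acceptable threshold is a deployment decision).
SELECT round(countIf(data_quality != 'CAPTURED') / count(), 4) AS bad_ratio
FROM bronze.finvu_events
WHERE ingestion_timestamp >= now() - INTERVAL 1 HOUR;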

-- =====================================================
-- SILVER LAYER INTEGRATION EXAMPLES
-- =====================================================

/*
-- Silver Layer will perform comprehensive business logic extraction:

-- 1. Event Correlation & Stitching
SELECT 
    event_data,
    -- Extract business context in Silver (event_data is stored as JSON, so it
    -- is re-serialized with toString() before the JSONExtract* calls):
    JSONExtractString(toString(event_data), 'consentHandle') AS consent_handle,
    JSONExtractString(toString(event_data), 'fiu') AS fiu_id,
    JSONExtractString(toString(event_data), 'purposeCode') AS purpose_code,
    -- Multi-path extraction for schema evolution (nullIf is needed because
    -- JSONExtractString returns '' rather than NULL for missing paths):
    COALESCE(
        nullIf(JSONExtractString(toString(event_data), 'consentHandle'), ''),
        nullIf(JSONExtractString(toString(event_data), 'consentId'), ''),
        nullIf(JSONExtractString(toString(event_data), 'consent_handle'), '')
    ) AS consent_handle_enhanced
FROM bronze.finvu_events
WHERE event_source = 'AAServer:ConsentFlow'
  AND event_timestamp >= now() - INTERVAL 1 HOUR;

-- 2. Journey Reconstruction (Silver Layer Responsibility)
-- Correlate events by consent_handle, session_id, txn_id
-- Apply business rules for revenue attribution
-- Calculate journey metrics and KPIs

-- 3. Business Enrichment (Silver Layer Responsibility)
-- FIU client attribution and billing calculations
-- Purpose code revenue mapping (₹3 vs ₹0.10)
-- Customer journey analysis and funnel metrics
*/
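
-- A minimal journey-reconstruction sketch for the Silver layer (assuming txn_id
-- is populated by the events): collapse each transaction into an ordered event
-- sequence plus duration, the raw material for funnel metrics.
/*
SELECT
    txn_id,
    arraySort(x -> x.1, groupArray((event_timestamp, event_name))) AS ordered_events,
    min(event_timestamp) AS journey_start,
    dateDiff('second', min(event_timestamp), max(event_timestamp)) AS journey_seconds
FROM bronze.finvu_events
WHERE txn_id != ''
  AND event_date = today()
GROUP BY txn_id;
*/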

-- =====================================================
-- DEPLOYMENT CHECKLIST
-- =====================================================

/*
ENGINEERING-FOCUSED BRONZE LAYER DEPLOYMENT CHECKLIST:

PHASE 1 - INFRASTRUCTURE SETUP:
☐ Kafka broker accessible from ClickHouse
☐ Topic 'finvu-aa-events' created with 6-12 partitions  
☐ Consumer group 'clickhouse-finvu-consumer' configured
☐ Bronze database and tables created successfully
☐ Materialized view processing events without errors
☐ All 4 monitoring views created and functional

PHASE 2 - EVENT VALIDATION (Engineering Focus):
☐ All 43 event types being captured correctly
☐ 6 event sources properly classified (UserEndpoint, ConsentFlow, DataFlow, etc.)
☐ Event types (HTTP_IN/OUT, WS_IN/OUT, DEFAULT) correctly identified
☐ JSON preservation maintaining all original event data
☐ No missing events (compare Kafka consumer lag with ingestion rate)
☐ Complete Kafka lineage metadata captured

PHASE 3 - PERFORMANCE & RELIABILITY:
☐ Ingestion rate >10K events/minute sustained
☐ Processing lag <15 minutes consistently
☐ All monitoring views responding <3 seconds
☐ Data quality assessment working (CAPTURED/CORRUPTED/PARTIAL)
☐ Index performance optimized for Silver layer access
☐ TTL and partitioning working for 7-year retention

PHASE 4 - SILVER LAYER INTEGRATION:
☐ Silver layer can efficiently access Bronze staging data
☐ Complete event data available for business logic processing
☐ Event routing by event_source working correctly
☐ Temporal ordering maintained for journey reconstruction
☐ Kafka lineage available for debugging and replay

PHASE 5 - OPERATIONAL READINESS:
☐ Alerts configured for ingestion lag, error rates, data quality
☐ Monitoring dashboards created using provided views
☐ Runbooks created for common operational scenarios
☐ Performance tuning guidelines documented
☐ Schema evolution procedures documented
☐ Clear handoff procedures to Silver layer team
*/
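
-- =====================================================
-- CHECKLIST VERIFICATION QUERIES (ILLUSTRATIVE)
-- =====================================================

-- Hedged probes for the Phase 2 items above; the expected counts come from the
-- checklist, not from anything the DDL enforces.
SELECT count(DISTINCT event_name) AS distinct_event_types    -- expect 43
FROM bronze.finvu_events
WHERE event_date >= today() - 7;

SELECT event_source, count() AS events                       -- expect no large
FROM bronze.finvu_events                                     -- AAServer:Undefined share
WHERE event_date >= today() - 7
GROUP BY event_source
ORDER BY events DESC;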