Macro Reference

All dbt macros in Konsolidat, organized by file.

dimension_helpers.sql

Macros driven by var('dimensions') in dbt_project.yml. Each dimension is a dict with keys: name, source_column, label, cube_type, in_budget, allocation_role.

get_dimensions()

Returns the full var('dimensions') list.

{% set dims = get_dimensions() %}
{# Returns: [{name: 'dim_cost_center', source_column: 'CostCenter', ...}, ...] #}

get_budget_dimensions()

Returns only dimensions where in_budget: true.

{% set budget_dims = get_budget_dimensions() %}
{# Returns dims for dim_cost_center and dim_department (not dim_business_unit) #}

get_allocation_cost_center_dim()

Returns the name of the dimension with allocation_role: 'cost_center'. Falls back to 'dim_cost_center'.

{% set cc_dim = get_allocation_cost_center_dim() %}
{# Returns: 'dim_cost_center' #}
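
As a rough illustration of the filtering and fallback logic described above, here is a minimal Python sketch. It is not the macro source: the `DIMENSIONS` list is a hypothetical stand-in for var('dimensions'), and the `BusinessUnit` source column and the `None` roles are assumed values.

```python
# Hypothetical stand-in for var('dimensions'); values are illustrative only.
DIMENSIONS = [
    {"name": "dim_cost_center", "source_column": "CostCenter",
     "in_budget": True, "allocation_role": "cost_center"},
    {"name": "dim_department", "source_column": "Department",
     "in_budget": True, "allocation_role": None},
    {"name": "dim_business_unit", "source_column": "BusinessUnit",  # assumed column name
     "in_budget": False, "allocation_role": None},
]


def get_budget_dimensions(dims=DIMENSIONS):
    """Keep only the dimensions flagged in_budget."""
    return [d for d in dims if d.get("in_budget")]


def get_allocation_cost_center_dim(dims=DIMENSIONS):
    """Name of the dimension with allocation_role 'cost_center', else the documented fallback."""
    for d in dims:
        if d.get("allocation_role") == "cost_center":
            return d["name"]
    return "dim_cost_center"


print([d["name"] for d in get_budget_dimensions()])
print(get_allocation_cost_center_dim([]))  # no role configured, so the fallback applies
```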

dim_select(prefix='', dims=none)

Generates a comma-separated SELECT list of dimension columns.

select {{ dim_select('gl.') }}
-- Output: gl.dim_cost_center, gl.dim_department, gl.dim_business_unit

dim_group_by(prefix='', dims=none)

Generates a comma-separated GROUP BY list.

group by {{ dim_group_by('gl.') }}
-- Output: gl.dim_cost_center, gl.dim_department, gl.dim_business_unit

dim_join_on(left, right, dims=none)

Generates AND join conditions for all dimensions.

on a.data_area_id = b.data_area_id
{{ dim_join_on('a', 'b') }}
-- Output: and a.dim_cost_center = b.dim_cost_center
-- and a.dim_department = b.dim_department
-- and a.dim_business_unit = b.dim_business_unit
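
The list-generating macros above are essentially string templating over the dimension names. The following Python sketch reproduces the documented outputs of dim_select and dim_join_on under that assumption; the real macros are Jinja and may differ in whitespace.

```python
# Dimension names taken from the documented outputs.
DIM_NAMES = ["dim_cost_center", "dim_department", "dim_business_unit"]


def dim_select(prefix="", dims=DIM_NAMES):
    """Comma-separated column list, each column carrying the given prefix."""
    return ", ".join(f"{prefix}{name}" for name in dims)


def dim_join_on(left, right, dims=DIM_NAMES):
    """One 'and left.dim = right.dim' condition per dimension, one per line."""
    return "\n".join(f"and {left}.{name} = {right}.{name}" for name in dims)


print(dim_select("gl."))
print(dim_join_on("a", "b"))
```

Because every generated condition starts with `and`, the output only makes sense after an existing ON predicate, as in the usage shown above.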

dim_coalesce(left, right, dims=none)

Generates COALESCE expressions for FULL OUTER JOIN results.

select {{ dim_coalesce('a', 'b') }}
-- Output: coalesce(a.dim_cost_center, b.dim_cost_center) as dim_cost_center,
-- coalesce(a.dim_department, b.dim_department) as dim_department, ...

dim_partition_by(prefix='', dims=none)

Generates the column list for a PARTITION BY clause in window functions; the partition by keyword itself is written by the caller.

sum(amount) over (partition by {{ dim_partition_by('t.') }})
-- Output: t.dim_cost_center, t.dim_department, t.dim_business_unit

dim_empty_strings(dims=none)

Generates '' AS dim_name for layers that don't have dimension data (IC eliminations, CTA, etc.).

select {{ dim_empty_strings() }}
-- Output: '' as dim_cost_center, '' as dim_department, '' as dim_business_unit

dim_harmonize_select(raw_alias='unioned', map_prefix='dmap_', dims=none)

Generates harmonized SELECT expressions: the mapped canonical_value when present, otherwise the raw value passes through. Uses an empty-string check (not coalesce) because ClickHouse LEFT JOIN fills unmatched rows with the column default ('' for String), not NULL.

select {{ dim_harmonize_select() }}
-- Output: if(dmap_dim_cost_center.canonical_value != '', dmap_dim_cost_center.canonical_value, unioned.dim_cost_center) as dim_cost_center,
-- if(dmap_dim_department.canonical_value != '', dmap_dim_department.canonical_value, unioned.dim_department) as dim_department, ...
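
To make the empty-string reasoning concrete, the Python sketch below contrasts a COALESCE-style lookup with the `if(... != '', ...)` form. It models an unmatched join row as `''`, which is what ClickHouse produces for a String column when join_use_nulls is left at its default; the helper names are illustrative.

```python
def coalesce(*values):
    """SQL-style COALESCE: the first value that is not None."""
    return next((v for v in values if v is not None), None)


def harmonize(canonical_value, raw_value):
    """Mirror of if(canonical_value != '', canonical_value, raw_value)."""
    return canonical_value if canonical_value != "" else raw_value


unmatched = ""  # what an unmatched LEFT JOIN row carries for a String column

print(repr(coalesce(unmatched, "CC-100")))  # coalesce keeps the empty string
print(harmonize(unmatched, "CC-100"))       # the empty-string check falls back to the raw value
print(harmonize("CC-CANON", "CC-100"))      # a published mapping wins
```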

dim_harmonize_joins(erp_source_col, raw_alias='unioned', map_prefix='dmap_', dims=none)

Generates one LEFT JOIN against the dimension_mappings seed per dimension. erp_source_col is a SQL expression for the per-row erp_source column so each source's values map correctly. Joins only status = 'Published' mappings matched on dimension, ERP source, and source value.

{{ dim_harmonize_joins('unioned.erp_source') }}
-- Output: left join dimension_mappings as dmap_dim_cost_center
-- on dmap_dim_cost_center.status = 'Published'
-- and dmap_dim_cost_center.dimension = 'dim_cost_center'
-- and dmap_dim_cost_center.erp_source = unioned.erp_source
-- and dmap_dim_cost_center.source_value = unioned.dim_cost_center
-- left join dimension_mappings as dmap_dim_department ...

dim_select_from_source(prefix='', dims=none)

Maps source columns (D365 OData names) to dimension columns with null-safe casting.

select {{ dim_select_from_source('raw.') }}
-- Output: toString(assumeNotNull(coalesce(raw.CostCenter, ''))) as dim_cost_center,
-- toString(assumeNotNull(coalesce(raw.Department, ''))) as dim_department, ...

measure_helpers.sql

Macros driven by var('base_measures') in dbt_project.yml. Each measure has keys: name, expression, label, cube_type.

measure_select()

Generates aggregate expressions for gold_trial_balance.

select {{ measure_select() }}
-- Output: sum(debit_amount) as period_debit,
-- sum(credit_amount) as period_credit,
-- sum(accounting_currency_amount) as period_net_amount,
-- count(*) as transaction_count

measure_passthrough(prefix='')

Generates column references for downstream models.

select {{ measure_passthrough('tb.') }}
-- Output: tb.period_debit, tb.period_credit, tb.period_net_amount, tb.transaction_count
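
A small Python sketch of how the two measure macros could derive their output from the measure list. It assumes the `expression` key holds the complete aggregate expression, which the source does not confirm; `BASE_MEASURES` is a hypothetical stand-in for var('base_measures').

```python
# Hypothetical stand-in for var('base_measures').
# Assumption: 'expression' is the full aggregate expression.
BASE_MEASURES = [
    {"name": "period_debit", "expression": "sum(debit_amount)"},
    {"name": "period_credit", "expression": "sum(credit_amount)"},
    {"name": "period_net_amount", "expression": "sum(accounting_currency_amount)"},
    {"name": "transaction_count", "expression": "count(*)"},
]


def measure_select(measures=BASE_MEASURES):
    """Aggregate expressions aliased to the measure names."""
    return ",\n".join(f"{m['expression']} as {m['name']}" for m in measures)


def measure_passthrough(prefix="", measures=BASE_MEASURES):
    """Plain column references for models downstream of the aggregation."""
    return ", ".join(f"{prefix}{m['name']}" for m in measures)


print(measure_select())
print(measure_passthrough("tb."))
```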

db_adapter.sql

ClickHouse-specific adapter macros. The type-casting macros wrap their argument in assumeNotNull() for null safety.

Type Casting

| Macro | Signature | Output |
| --- | --- | --- |
| cast_to_string(expr) | (expr) | toString(assumeNotNull(expr)) |
| cast_to_int64(expr) | (expr) | toInt64(assumeNotNull(expr)) |
| cast_to_int8(expr) | (expr) | toInt8(assumeNotNull(expr)) |
| cast_to_uint16(expr) | (expr) | toUInt16(assumeNotNull(expr)) |
| cast_to_uint8(expr) | (expr) | toUInt8(assumeNotNull(expr)) |
| cast_to_float64(expr) | (expr) | toFloat64(assumeNotNull(expr)) |
| cast_to_date(expr) | (expr) | toDate(assumeNotNull(expr)) |
| cast_to_datetime(expr) | (expr) | toDateTime(assumeNotNull(expr)) |
| cast_to_decimal128(expr, scale) | (expr, scale) | toDecimal128(assumeNotNull(expr), scale) |

Date Functions

| Macro | Signature | Output |
| --- | --- | --- |
| extract_year(expr) | (expr) | toYear(expr) |
| extract_month(expr) | (expr) | toMonth(expr) |
| build_date_from_year_period(year_expr, period_expr) | (year, period) | toDate(concat(toString(greatest(year,1900)), '-', lpad(toString(greatest(period,1)),2,'0'), '-01')) |
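
The build_date_from_year_period expression can be read as "floor the year at 1900, floor the period at 1, zero-pad the period, and take the first day". A minimal Python sketch of the string that reaches toDate, with a hypothetical function name:

```python
def year_period_date_string(year, period):
    """Build the 'YYYY-MM-01' string that the SQL expression passes to toDate."""
    floored_year = max(year, 1900)   # greatest(year, 1900)
    floored_period = max(period, 1)  # greatest(period, 1)
    month = str(floored_period).rjust(2, "0")  # lpad(..., 2, '0')
    return f"{floored_year}-{month}-01"


print(year_period_date_string(2024, 3))
print(year_period_date_string(0, 0))
```

The expression only applies lower bounds. How a period above 12 is handled depends on how ClickHouse parses the resulting string and is not covered by this sketch.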

Utility

| Macro | Signature | Output |
| --- | --- | --- |
| latest_value_by(val_expr, key_expr) | (val, key) | argMax(val, key), the ClickHouse function that returns val at the maximum key |
| string_pad_left(expr, len, ch) | (expr, len, ch) | lpad(expr, len, ch) |
| epm_config(order_by='tuple()') | (order_by) | Returns {'engine': 'MergeTree()', 'order_by': order_by} for ClickHouse, {} otherwise |

epm_config() Usage

{{
config(
materialized='table',
**epm_config(order_by='(data_area_id, fiscal_year, fiscal_period, main_account)')
)
}}
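
The `**epm_config(...)` call relies on dictionary unpacking: on ClickHouse the returned keys are merged into the config() arguments, and on other adapters the empty dict adds nothing. A Python sketch of that mechanic follows; the `adapter_type` parameter and the `config` stub are hypothetical stand-ins for what dbt provides.

```python
def epm_config(adapter_type, order_by="tuple()"):
    """Engine settings for ClickHouse, an empty dict for any other adapter."""
    if adapter_type == "clickhouse":
        return {"engine": "MergeTree()", "order_by": order_by}
    return {}


def config(**kwargs):
    """Stub for dbt's config(): simply returns the keyword arguments it received."""
    return kwargs


print(config(materialized="table", **epm_config("clickhouse", "(data_area_id, fiscal_year)")))
print(config(materialized="table", **epm_config("postgres")))
```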

currency_conversion.sql

convert_currency(amount_column, from_currency_column, to_currency, rate_date_column, rate_type='Default')

Generates a correlated subquery against silver_exchange_rates.

select
{{ convert_currency('local_amount', 'accounting_currency', 'USD', 'period_date') }} as translated_amount

Expands to:

local_amount * coalesce(
(select er.exchange_rate
from silver_exchange_rates as er
where er.from_currency = accounting_currency
and er.to_currency = 'USD'
and er.valid_from <= period_date
and er.valid_to >= period_date
order by er.valid_from desc
limit 1),
1.0 -- Default: same-currency assumption
)

Parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| amount_column | SQL expression | The amount to convert |
| from_currency_column | SQL expression | Column containing source currency code |
| to_currency | String literal | Target currency (e.g., 'USD') |
| rate_date_column | SQL expression | Date for rate lookup |
| rate_type | String | Exchange rate type (default: 'Default') |
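
The lookup semantics of the expansion can be sketched in Python: pick the rate whose validity window contains the date, prefer the most recent valid_from when windows overlap, and fall back to 1.0 when nothing matches. The rate rows below are invented for illustration, and rate_type is left out because the expansion shown does not indicate how it is applied.

```python
from datetime import date

# Hypothetical rows standing in for silver_exchange_rates.
RATES = [
    {"from_currency": "EUR", "to_currency": "USD",
     "valid_from": date(2024, 1, 1), "valid_to": date(2024, 6, 30), "exchange_rate": 1.25},
    {"from_currency": "EUR", "to_currency": "USD",
     "valid_from": date(2024, 4, 1), "valid_to": date(2024, 12, 31), "exchange_rate": 1.125},
]


def convert_currency(amount, from_currency, to_currency, rate_date, rates=RATES):
    """Multiply by the latest valid rate, or by 1.0 when no rate matches."""
    matches = [
        r for r in rates
        if r["from_currency"] == from_currency
        and r["to_currency"] == to_currency
        and r["valid_from"] <= rate_date <= r["valid_to"]
    ]
    if not matches:
        return amount * 1.0  # same fallback as the coalesce(..., 1.0) in the expansion
    latest = max(matches, key=lambda r: r["valid_from"])  # order by valid_from desc limit 1
    return amount * latest["exchange_rate"]


print(convert_currency(100.0, "EUR", "USD", date(2024, 5, 15)))
print(convert_currency(100.0, "GBP", "USD", date(2024, 5, 15)))
```

Note that the 1.0 fallback makes a missing rate indistinguishable from a same-currency row in the output.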

allocation_engine.sql

allocation_engine(rule_id, driver_seed)

Single-step allocation for one rule.

{{ allocation_engine('ALLOC_001', 'allocation_drivers_headcount') }}

CTE chain:

  1. rule — Reads rule definition from allocation_rules seed
  2. source_pool — Sums period_net_amount from gold_trial_balance matching rule's source account/cost center
  3. drivers — Computes driver_weight = driver_value / SUM(driver_value) OVER (PARTITION BY entity, year, period)
  4. allocated — Cross-joins pool × rule, inner joins drivers, excludes self-allocation

Output columns: allocation_rule_id, data_area_id, fiscal_year, fiscal_period, source_account, target_cost_center, target_account, driver_type, pool_amount, driver_weight, allocated_amount
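
The arithmetic behind the drivers and allocated CTEs can be sketched as follows. This is an assumption-laden reading rather than the macro's logic: it normalizes weights over all driver rows in the partition, including the source cost center's own row, and then drops the self-allocation row.

```python
def allocate(pool_amount, source_cost_center, driver_values):
    """Spread pool_amount across cost centers in proportion to their driver values.

    driver_values maps cost center to driver value for one entity, year, and period.
    """
    total = sum(driver_values.values())  # SUM(driver_value) OVER (PARTITION BY ...)
    allocated = {}
    for target, value in driver_values.items():
        if target == source_cost_center:
            continue  # exclude self-allocation
        driver_weight = value / total
        allocated[target] = pool_amount * driver_weight
    return allocated


# Illustrative headcount figures, not taken from the seed.
print(allocate(1000.0, "CC1", {"CC1": 25, "CC2": 25, "CC3": 50}))
```

Under this reading the allocated amounts sum to less than the pool whenever the source cost center has a driver value of its own; whether the real macro renormalizes is not stated in the source.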

allocation_engine_multistep.sql

allocation_engine_multistep()

Three-step cascading allocation. No parameters — reads all rules from the allocation_rules seed.

{{ allocation_engine_multistep() }}

Steps:

  1. Step 1 (ALLOC_001): IT costs → headcount driver
  2. Step 2 (ALLOC_002): Facility costs + Step 1 cascade → sqm driver
  3. Step 3 (ALLOC_003): Management fees + Step 1+2 cascade → revenue driver

The revenue driver keeps only rows with driver_value > 0, so that zero or negative values cannot break the weight normalization.

Output: UNION ALL of all three steps with columns: allocation_rule_id, step_order, data_area_id, fiscal_year, fiscal_period, source_account, source_cost_center, target_cost_center, target_account, driver_type, pool_amount, driver_weight, allocated_amount

See Allocation Guide for a worked example.

allocation_engine_reciprocal.sql

allocation_engine_reciprocal()

Reciprocal (circular) allocation engine (PRD-18). No parameters. Reads rules where allocation_method = 'reciprocal' from the allocation_rules staging source and resolves cross-charging between cost centers by iterative convergence: each iteration's allocated amounts are fed back as additional pool input, for up to 10 iterations, until the feedback delta falls below 0.01. Reciprocal rules are processed before step-down rules.

{{ allocation_engine_reciprocal() }}

CTE chain: reciprocal_rules → tb_base (base TB amounts) → driver_weights (normalized per driver type/entity/period) → iter0_pool / iter0_allocated → iterN_feedback / iterN_allocated (for N in 1..10) → all_reciprocal (UNION ALL of every iteration) → reciprocal_results (sum across iterations per target).

Output columns: allocation_rule_id, data_area_id, fiscal_year, fiscal_period, source_cost_center, source_account, target_cost_center, target_account, driver_type, allocated_amount, final_iteration, allocation_method ('reciprocal')
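
The convergence loop can be illustrated with a small Python sketch. It is one plausible reading of the description, not the macro itself: amounts landing on a cost center that is itself a reciprocal source are treated as the next iteration's pool, and the "feedback delta" is assumed to be the total absolute feedback amount. Cost center names and weights are invented.

```python
def reciprocal_allocate(pools, weights, max_iterations=10, tolerance=0.01):
    """pools: {source_cc: amount}; weights: {source_cc: {target_cc: weight}}.

    Returns (total allocated per target across iterations, last iteration number).
    """
    totals = {}
    current = dict(pools)
    final_iteration = 0
    for iteration in range(max_iterations + 1):  # iteration 0 plus up to 10 feedback rounds
        feedback = {}
        for source, amount in current.items():
            for target, weight in weights[source].items():
                share = amount * weight
                totals[target] = totals.get(target, 0.0) + share
                if target in weights:  # target is itself a reciprocal source
                    feedback[target] = feedback.get(target, 0.0) + share
        final_iteration = iteration
        if sum(abs(v) for v in feedback.values()) < tolerance:
            break  # assumed convergence test: remaining feedback is negligible
        current = feedback
    return totals, final_iteration


pools = {"IT": 1000.0, "HR": 500.0}
weights = {"IT": {"HR": 0.2, "OPS": 0.8}, "HR": {"IT": 0.1, "OPS": 0.9}}
totals, last = reciprocal_allocate(pools, weights)
print(round(totals["OPS"], 2), last)
```

In this example nearly all of the combined 1,500 pool ends up on the operating cost center, which is the expected effect of resolving the circular charges.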

allocation_engine_tiered.sql

allocation_engine_tiered()

Tiered & threshold allocation engine (PRD-20). No parameters. Joins allocation_rules to the allocation_tiers staging source for rules where driver_type = 'tiered'. Each tier applies its rate to the portion of the pool falling within its band: band_amount = greatest(0, least(pool_amount, upper_bound) - lower_bound), then clamps the result with the tier's floor and cap. Only rows with abs(tier_amount) > 0.01 are emitted.

{{ allocation_engine_tiered() }}

CTE chain: tiered_rules (rules ⋈ tiers) → tb_base → tiered_pools → tiered_allocated (per-tier band, rate, clamping).

Output columns: allocation_rule_id, step_order, data_area_id, fiscal_year, fiscal_period, source_account, source_cost_center, target_cost_center (''), target_account, driver_type ('tiered'), pool_amount, driver_weight (the tier rate), allocated_amount
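
The band formula is easiest to check with numbers. The Python sketch below applies it to two invented tiers; the `floor` and `cap` key names and the clamping order (floor first, then cap) are assumptions, since the source only says the result is clamped.

```python
def band_amount(pool_amount, lower_bound, upper_bound):
    """Portion of the pool inside [lower_bound, upper_bound], never negative."""
    return max(0.0, min(pool_amount, upper_bound) - lower_bound)


def tiered_allocate(pool_amount, tiers):
    """Return (tier name, amount) rows, dropping amounts of 0.01 or less in magnitude."""
    rows = []
    for tier in tiers:
        raw = band_amount(pool_amount, tier["lower_bound"], tier["upper_bound"]) * tier["rate"]
        clamped = min(max(raw, tier["floor"]), tier["cap"])  # assumed order: floor, then cap
        if abs(clamped) > 0.01:
            rows.append((tier["name"], clamped))
    return rows


# Illustrative tiers, not taken from allocation_tiers.
TIERS = [
    {"name": "T1", "lower_bound": 0.0, "upper_bound": 1000.0,
     "rate": 0.25, "floor": 0.0, "cap": 1e12},
    {"name": "T2", "lower_bound": 1000.0, "upper_bound": 5000.0,
     "rate": 0.125, "floor": 0.0, "cap": 200.0},
]

print(tiered_allocate(3000.0, TIERS))  # T2's raw 250.0 is capped at 200.0
print(tiered_allocate(500.0, TIERS))   # the pool never reaches T2's band
```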

resolve_allocation_driver.sql

resolve_allocation_driver_cte(rule_alias, drivers_alias)

Returns a SQL CASE fragment that resolves the driver weight for a rule based on its driver_formula field (PRD-19). Called inside the allocation engines to dispatch between driver kinds:

  • Simple (driver_formula = ''): use the driver weight directly.
  • Conditional (driver_formula like '%CASE%'): the formula is a raw SQL CASE that cannot be evaluated dynamically in dbt; it falls back to the simple driver weight (conditional logic is pre-evaluated at the Frappe write layer).
  • Composite (otherwise): pre-evaluated at the Frappe layer into combined driver_value rows with driver_type = 'composite_<rule_id>', so the resolved weight is again used directly.

{{ resolve_allocation_driver_cte('r', 'd') }}
-- Output: case
-- when r.driver_formula = '' then d.driver_weight
-- when r.driver_formula like '%CASE%' then d.driver_weight
-- else d.driver_weight
-- end

prorate_period_amount.sql

prorate_period_amount(amount_expr, acquisition_date_expr, year_expr, period_expr)

Prorates an amount to post-acquisition days only (PRD-11). Returns amount_expr multiplied by the fraction days_post_acquisition / days_in_period:

  • No acquisition constraint (acquisition_date <= '1900-01-01') or acquisition before the period start → factor 1.0 (full amount).
  • Acquisition on/after the period end → factor 0.0.
  • Acquisition within the period → prorated by day count.

acquisition_date_expr is a Date, year_expr a UInt16, period_expr a UInt8. Period boundaries are built via build_date_from_year_period, capping the next period at 13.

{{ prorate_period_amount('period_net_amount', 'acquisition_date', 'fiscal_year', 'fiscal_period') }}
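
The three cases can be sketched in Python as a proration factor. Two assumptions are made here that the source does not spell out: the period end is the first day of the next period (exclusive), and the acquisition day itself counts as a post-acquisition day.

```python
from datetime import date


def proration_factor(acquisition_date, period_start, period_end):
    """Fraction of the period that falls on or after the acquisition date.

    period_end is assumed to be the first day of the next period (exclusive).
    """
    if acquisition_date <= date(1900, 1, 1) or acquisition_date < period_start:
        return 1.0  # no constraint, or acquired before the period began
    if acquisition_date >= period_end:
        return 0.0  # acquired after the period closed
    days_in_period = (period_end - period_start).days
    days_post_acquisition = (period_end - acquisition_date).days
    return days_post_acquisition / days_in_period


june_start, june_end = date(2024, 6, 1), date(2024, 7, 1)
print(proration_factor(date(2024, 6, 16), june_start, june_end))  # 15 of 30 days
print(1200.0 * proration_factor(date(2024, 6, 16), june_start, june_end))
```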

resolve_period_ownership.sql

Temporal ownership/consolidation resolution (PRD-9). Both macros read from the ownership_periods staging source for the matching group/entity whose date window contains the period, ordered by most recent effective_date, and fall back to the static consolidation_groups seed.

resolve_period_ownership(group_col, entity_col, period_date_col)

Returns the ownership_pct in effect for the given consolidation group, entity, and period date.

{{ resolve_period_ownership('consolidation_group', 'data_area_id', 'period_date') }}

resolve_period_method(group_col, entity_col, period_date_col)

Returns the consolidation_method in effect for the given consolidation group, entity, and period date.

{{ resolve_period_method('consolidation_group', 'data_area_id', 'period_date') }}
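
The lookup order shared by both macros (temporal rows first, static seed as fallback) can be sketched in Python. The `end_date` key and the dictionary shapes are hypothetical; only effective_date, ownership_pct, and the two sources are named in the text above.

```python
from datetime import date


def resolve_period_ownership(group, entity, period_date, ownership_periods, consolidation_groups):
    """Ownership percentage in effect on period_date, falling back to the static seed."""
    candidates = [
        row for row in ownership_periods
        if row["group"] == group
        and row["entity"] == entity
        and row["effective_date"] <= period_date <= row["end_date"]  # end_date is assumed
    ]
    if candidates:
        # Most recent effective_date wins when windows overlap.
        return max(candidates, key=lambda row: row["effective_date"])["ownership_pct"]
    return consolidation_groups[(group, entity)]["ownership_pct"]


# Illustrative data only.
PERIODS = [
    {"group": "G1", "entity": "E1", "effective_date": date(2024, 1, 1),
     "end_date": date(2024, 12, 31), "ownership_pct": 60.0},
    {"group": "G1", "entity": "E1", "effective_date": date(2024, 7, 1),
     "end_date": date(2024, 12, 31), "ownership_pct": 80.0},
]
SEED = {("G1", "E1"): {"ownership_pct": 100.0}}

print(resolve_period_ownership("G1", "E1", date(2024, 8, 1), PERIODS, SEED))
print(resolve_period_ownership("G1", "E1", date(2023, 5, 1), PERIODS, SEED))
```

resolve_period_method would follow the same pattern with consolidation_method in place of ownership_pct.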

cluster.sql

ClickHouse cluster-sharding support. Opt-in and OFF by default — activated by var('cluster_enabled', false) being true together with the cluster profile target. When disabled, every macro returns exactly the value of the literal it replaces, so single-node compiled SQL is byte-for-byte unchanged.

Configuration Helpers

| Macro | Signature | Behavior |
| --- | --- | --- |
| cluster_enabled() | () | Returns var('cluster_enabled', false) coerced to bool. |
| cluster_name() | () | Returns 'konsol_cluster' when enabled, else none (no ON CLUSTER DDL emitted). |
| cluster_zk_path() | () | Returns the Keeper path template '/clickhouse/tables/{shard}/{database}/{table}'. |
| cluster_sharding_key(col='data_area_id') | (col) | Returns cityHash64(col), which colocates a legal entity on one shard. |
| cluster_sharded_tables() | () | Single source of truth listing the (database, table) pairs that get a Distributed overlay: (epm_bronze, bronze_general_journal_account_entries), (epm_bronze, bronze_general_journal_entries), (epm_gold, gold_trial_balance). |

cluster_engine(base_engine)

Converts a plain MergeTree-family engine string into its Replicated* variant for cluster mode. Returns base_engine unchanged when cluster mode is off, or when the engine is already Replicated*/Distributed/unrecognized.

| Input engine | Cluster output |
| --- | --- |
| MergeTree() | ReplicatedMergeTree(zk_path, '{replica}') |
| ReplacingMergeTree(ver) | ReplicatedReplacingMergeTree(zk_path, '{replica}', ver) |
| AggregatingMergeTree() | ReplicatedAggregatingMergeTree(zk_path, '{replica}') |
| CollapsingMergeTree(sign) | ReplicatedCollapsingMergeTree(zk_path, '{replica}', sign) |
| SummingMergeTree([cols]) | ReplicatedSummingMergeTree(zk_path, '{replica}'[, cols]) |

{replica} and the {shard}/{database}/{table} path segments are server-side macros expanded by ClickHouse at CREATE TABLE time.
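
The conversion rules in the table amount to a prefix rewrite plus two injected arguments. A Python sketch of that behavior follows; the regex-based parsing and the explicit `cluster_enabled` parameter are illustrative choices, not the macro's implementation.

```python
import re

ZK_PATH = "/clickhouse/tables/{shard}/{database}/{table}"
SUPPORTED = ("MergeTree", "ReplacingMergeTree", "AggregatingMergeTree",
             "CollapsingMergeTree", "SummingMergeTree")


def cluster_engine(base_engine, cluster_enabled=False):
    """Rewrite a plain MergeTree-family engine to its Replicated* form in cluster mode."""
    if not cluster_enabled:
        return base_engine  # single-node SQL stays unchanged
    match = re.fullmatch(r"(\w+)\((.*)\)", base_engine.strip())
    if not match or match.group(1) not in SUPPORTED:
        return base_engine  # already Replicated*, Distributed, or unrecognized
    name, args = match.groups()
    new_args = f"'{ZK_PATH}', '{{replica}}'"  # {replica} is left for ClickHouse to expand
    if args.strip():
        new_args += f", {args.strip()}"
    return f"Replicated{name}({new_args})"


print(cluster_engine("ReplacingMergeTree(_airbyte_extracted_at)", cluster_enabled=True))
print(cluster_engine("ReplacingMergeTree(_airbyte_extracted_at)"))
```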

{{ config(
engine = cluster_engine('ReplacingMergeTree(_airbyte_extracted_at)'),
order_by = '(data_area_id, accounting_date, recid)',
cluster = cluster_name()
) }}

Distributed Overlay Operations

Run-operation macros that manage the Distributed query-surface tables over the _local ReplicatedMergeTree tables. Both are no-ops (logging only) when cluster_enabled() is false.

create_distributed_tables(cluster_name_arg='konsol_cluster', shard_key='data_area_id')

Creates a Distributed('<cluster>', '<db>', '<table>_local', cityHash64(<shard_key>)) table for each entry in cluster_sharded_tables(), using CREATE TABLE IF NOT EXISTS (idempotent).

dbt run-operation create_distributed_tables

drop_distributed_tables(cluster_name_arg='konsol_cluster')

Drops the Distributed overlay tables (but not the _local ReplicatedMergeTree tables), using the same cluster_sharded_tables() source of truth.

dbt run-operation drop_distributed_tables

Note: The Distributed overlay macros target a live multi-node ClickHouse cluster and cannot be exercised on a single-node setup; they are provided as DDL scaffolding.

source_adapters/d365_account_types.sql

map_account_type(column)

Maps D365 MainAccountType values to readable labels.

select {{ map_account_type('raw.Type') }} as account_type

| D365 Value | Output |
| --- | --- |
| '0' / 'ProfitAndLoss' | 'Profit and loss' |
| '1' / 'Revenue' | 'Revenue' |
| '2' / 'Expense' | 'Expense' |
| '3' / 'BalanceSheet' | 'Balance sheet' |
| '4' / 'Asset' | 'Asset' |
| '5' / 'Liability' | 'Liability' |
| '6' / 'Equity' | 'Equity' |
| '7' / 'Total' | 'Total' |
| Other | Passthrough |
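
The mapping table can be expressed as a lookup with a passthrough default. This Python sketch mirrors the table only; the macro presumably emits a SQL expression, whose exact form is not shown in the source.

```python
# (numeric code, enum name, readable label), copied from the table above.
ACCOUNT_TYPES = [
    ("0", "ProfitAndLoss", "Profit and loss"),
    ("1", "Revenue", "Revenue"),
    ("2", "Expense", "Expense"),
    ("3", "BalanceSheet", "Balance sheet"),
    ("4", "Asset", "Asset"),
    ("5", "Liability", "Liability"),
    ("6", "Equity", "Equity"),
    ("7", "Total", "Total"),
]

# Both the numeric code and the enum name resolve to the same label.
LOOKUP = {}
for code, enum_name, label in ACCOUNT_TYPES:
    LOOKUP[code] = label
    LOOKUP[enum_name] = label


def map_account_type(value):
    """Readable label for a D365 MainAccountType value; unknown values pass through."""
    return LOOKUP.get(value, value)


print(map_account_type("3"))
print(map_account_type("ProfitAndLoss"))
print(map_account_type("Custom"))
```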