From raw ingestion pipelines to curated analytics layers — we own the full data lifecycle.
Every pipeline is version-controlled, tested, and documented. No black boxes — you own your codebase.
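"Tested" is not a slogan. As a taste of what that looks like in practice, here is a minimal pytest sketch that exercises the dedupe rule from the ingestion job below against a local SparkSession (the test name and sample rows are ours, purely for illustration):

import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark():
    # Small local session; no cluster needed for unit tests
    return SparkSession.builder.master("local[1]").appName("tests").getOrCreate()

def test_dedupe_keeps_one_row_per_account(spark):
    # Two bronze rows for the same Salesforce Id must collapse to one
    df = spark.createDataFrame(
        [("001A", "Acme", "Manufacturing"), ("001A", "Acme Corp", "Manufacturing")],
        ["Id", "Name", "Industry"],
    )
    assert df.dropDuplicates(["Id"]).count() == 1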
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp

# Initialize Spark with an Iceberg catalog backed by AWS Glue.
# The Iceberg SQL extensions are required for MERGE INTO support.
spark = (
    SparkSession.builder
    .appName("salesforce_ingestion")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog",
            "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.catalog-impl",
            "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.warehouse",
            "s3://datalake-warehouse/")  # adjust to your warehouse location
    .getOrCreate()
)

# Incremental watermark; in production this is read from pipeline state
# (see the watermark sketch below)
watermark = "2025-01-01 00:00:00"

# Read from the S3 bronze layer, keeping only rows newer than the watermark
df = (
    spark.read.parquet("s3://datalake-bronze/salesforce/accounts/")
    .filter(col("_ingested_at") >= watermark)
)

# Apply transformations: parse timestamps, dedupe on the Salesforce Id
silver_df = (
    df
    .withColumn("created_date", to_timestamp(col("CreatedDate")))
    .dropDuplicates(["Id"])
    .select("Id", "Name", "Industry", "created_date")
)

# Upsert into the Iceberg silver table (MERGE INTO)
silver_df.createOrReplaceTempView("updates")
spark.sql("""
    MERGE INTO glue_catalog.silver.accounts t
    USING updates s
    ON t.Id = s.Id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
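The watermark itself has to live somewhere between runs. One lightweight option, sketched here under the assumption that the silver table keeps the bronze _ingested_at column (a dedicated state store works just as well), is to derive it from the target table:

from pyspark.sql.functions import max as spark_max

def next_watermark(spark, default="1970-01-01 00:00:00"):
    # Derive the incremental watermark from the silver table itself.
    # Assumes the table carries the bronze `_ingested_at` column;
    # falls back to `default` on the very first run.
    row = (
        spark.table("glue_catalog.silver.accounts")
        .select(spark_max("_ingested_at").alias("wm"))
        .first()
    )
    return row["wm"] if row and row["wm"] is not None else default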
from datetime import datetime, timedelta

from airflow.decorators import dag
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

default_args = {
    "owner": "data-engineering",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
}

@dag(
    schedule_interval="0 * * * *",
    start_date=datetime(2025, 1, 1),
    default_args=default_args,
    catchup=False,
)
def salesforce_ingestion_pipeline():
    # Block until the hourly extract lands a _SUCCESS marker in bronze
    wait_for_extract = S3KeySensor(
        task_id="wait_for_extract",
        bucket_name="datalake-bronze",
        bucket_key="salesforce/accounts/{{ ds }}/_SUCCESS",
        timeout=3600,
        poke_interval=60,
    )

    # Run the bronze-to-silver Spark job on AWS Glue
    transform_silver = GlueJobOperator(
        task_id="transform_silver",
        job_name="salesforce_silver_transform",
        script_location="s3://glue-scripts/silver_transform.py",
        iam_role_name="GlueServiceRole",
    )

    wait_for_extract >> transform_silver

dag = salesforce_ingestion_pipeline()
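Orchestration code is code, so it gets the same treatment. A common smoke test, sketched here assuming a standard Airflow project layout, loads the DagBag and fails the build on import errors or a missing DAG:

from airflow.models import DagBag

def test_dag_loads_without_import_errors():
    # Parses every DAG file; any broken import or bad argument fails here
    dag_bag = DagBag(include_examples=False)
    assert dag_bag.import_errors == {}
    assert "salesforce_ingestion_pipeline" in dag_bag.dags

def test_pipeline_has_expected_tasks():
    dag = DagBag(include_examples=False).dags["salesforce_ingestion_pipeline"]
    assert {"wait_for_extract", "transform_silver"} <= set(dag.task_ids)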
-- models/gold/dim_accounts.sql
-- Grain: one row per Salesforce Account

{{
    config(
        materialized='incremental',
        unique_key='account_id',
        on_schema_change='sync_all_columns',
        file_format='iceberg',
    )
}}

with source as (

    select * from {{ source('silver', 'accounts') }}

    {% if is_incremental() %}
    where updated_at > (select max(updated_at) from {{ this }})
    {% endif %}

),

cleaned as (

    select
        id as account_id,
        name as account_name,
        industry,
        annual_revenue,
        number_of_employees,
        cast(created_date as date) as account_created_date,
        current_timestamp() as dbt_updated_at
    from source
    where id is not null

)

select * from cleaned
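Models like this one build and test the same way everywhere, including from Python. A minimal sketch using dbt-core's programmatic entry point (available since dbt 1.5; assumes profiles.yml is already configured):

from dbt.cli.main import dbtRunner

# Invoke dbt exactly as the CLI would
dbt = dbtRunner()
run_result = dbt.invoke(["run", "--select", "dim_accounts"])
test_result = dbt.invoke(["test", "--select", "dim_accounts"])

if not (run_result.success and test_result.success):
    raise SystemExit("dim_accounts build or tests failed")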
import pulumi
import pulumi_aws as aws

# S3 data lake bucket (bronze layer)
bucket = aws.s3.BucketV2(
    "datalake-bronze",
    bucket="my-company-datalake-bronze",
    tags={"env": "prod", "team": "data-eng"},
)

# Glue catalog database for the silver layer
glue_db = aws.glue.CatalogDatabase(
    "silver_db",
    name="silver",
    description="Silver layer: cleaned & conformed",
)

# EMR Serverless Spark application, auto-stopped when idle
emr_app = aws.emrserverless.Application(
    "spark-app",
    release_label="emr-7.0.0",
    type="SPARK",
    name="datalake-spark",
    auto_stop_configuration=aws.emrserverless.ApplicationAutoStopConfigurationArgs(
        enabled=True,
        idle_timeout_minutes=15,
    ),
    maximum_capacity=aws.emrserverless.ApplicationMaximumCapacityArgs(
        cpu="400 vCPU",
        memory="3000 GB",
    ),
)

pulumi.export("bucket_arn", bucket.arn)
pulumi.export("emr_app_id", emr_app.id)
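Exported outputs like bucket_arn are how downstream stacks consume this one. A small sketch, assuming a stack path of my-org/datalake/prod (the path is illustrative):

import pulumi

# Reference the infrastructure stack from, e.g., an application stack
infra = pulumi.StackReference("my-org/datalake/prod")
bucket_arn = infra.get_output("bucket_arn")

# Outputs resolve at deploy time and can feed other resources or exports
pulumi.export("upstream_bucket_arn", bucket_arn)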
Battle-tested tools, open standards, and fully cloud-native architectures built to scale with your business.
Our proven 5-step engagement model gets you to production fast, with zero surprises.
No sales pitch, just real solutions. Talk to one of our engineers today and let's figure out exactly what you need.