Version: 1.0.0 (latest)

Salesforce

Need help deploying these sources, or figuring out how to run them in your data stack?
Join our Slack community or book a call with a dltHub Solutions Engineer.

Salesforce is a cloud platform that streamlines business operations and customer relationship management, encompassing sales, marketing, and customer service.

This Salesforce dlt verified source and pipeline example loads data from the Salesforce API to the destination of your choice.

The resources that this verified source supports are:

| Name | Mode | Description |
| --- | --- | --- |
| User | replace | refers to an individual who has access to a Salesforce org or instance |
| UserRole | replace | a standard object that represents a role within the organization's hierarchy |
| Lead | replace | prospective customer/individual/org. that has shown interest in a company's products/services |
| Contact | replace | an individual person associated with an account or organization |
| Campaign | replace | marketing initiative or project designed to achieve specific goals, such as generating leads etc. |
| Product2 | replace | for managing and organizing your product-related data within the Salesforce ecosystem |
| Pricebook2 | replace | used to manage product pricing and create price books |
| PricebookEntry | replace | an object that represents a specific price for a product in a price book |
| Opportunity | merge | represents a sales opportunity for a specific account or contact |
| OpportunityLineItem | merge | represents individual line items or products associated with an opportunity |
| OpportunityContactRole | merge | represents the association between an Opportunity and a contact |
| Account | merge | individual or organization that interacts with your business |
| CampaignMember | merge | association between a contact or lead and a campaign |
| Task | merge | used to track and manage various activities and tasks within the Salesforce platform |
| Event | merge | used to track and manage calendar-based events, such as meetings, appointments, calls, or any other time-specific activities |
  • Note that formula fields are included. These function like views in Salesforce and will not be back-updated when their definitions change in Salesforce. The recommended handling is to ignore these fields and reproduce any calculations yourself from the base data fields.
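
One way to follow the recommendation on formula fields is to drop them from each record before using the data downstream. The sketch below is only an illustration: `FORMULA_FIELDS` and `drop_formula_fields` are hypothetical names that are not part of the verified source, and the field names are made up. You would list the formula fields that exist in your own Salesforce org.

```python
from typing import Any, Dict, Iterable, Iterator, Set

# Hypothetical example values: replace with the formula fields of your org.
FORMULA_FIELDS: Set[str] = {"Days_Open__c", "Weighted_Amount__c"}


def drop_formula_fields(
    records: Iterable[Dict[str, Any]],
    formula_fields: Set[str] = FORMULA_FIELDS,
) -> Iterator[Dict[str, Any]]:
    """Yield copies of the records without the listed formula fields."""
    for record in records:
        yield {k: v for k, v in record.items() if k not in formula_fields}


# Made-up record that mixes base fields with one formula field.
rows = [{"Id": "006A", "Amount": 100.0, "Weighted_Amount__c": 50.0}]
cleaned = list(drop_formula_fields(rows))
print(cleaned)
```

Any value that a formula field used to provide can then be recomputed from the base fields that remain in the record.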

Setup Guide

Grab credentials

To set up your pipeline, you'll need your Salesforce user_name, password, and security_token. Use your login credentials for user_name and password.

To obtain the security_token, follow these steps:

  1. Log into Salesforce with your credentials.

  2. Click your profile picture/avatar at the top-right.

  3. Select "Settings" from the drop-down.

  4. Under "Personal Setup" in the sidebar, choose "My Personal Information" > "Reset My Security Token".

  5. Click "Reset Security Token".

  6. Check your email for the token sent by Salesforce.

Note: The Salesforce UI, which is described here, might change. The full guide is available at this link.

Initialize the verified source

To get started with your data pipeline, follow these steps:

  1. Enter the following command:

    dlt init salesforce duckdb

    This command will initialize the pipeline example with Salesforce as the source and duckdb as the destination.

  2. If you'd like to use a different destination, simply replace duckdb with the name of your preferred destination.

  3. After running this command, a new directory will be created with the necessary files and configuration settings to get started.

For more information, read the guide on how to add a verified source.

Add credentials

  1. Inside the .dlt folder, you'll find a file called secrets.toml, which is where you can securely store your access tokens and other sensitive information. It's important to handle this file with care and keep it safe. Here's what the file looks like:

    # put your secret values and credentials here. do not share this file and do not push it to github
    [sources.salesforce]
    user_name = "please set me up!" # Salesforce user name
    password = "please set me up!" # Salesforce password
    security_token = "please set me up!" # Salesforce security token
  2. In secrets.toml, replace the user_name and password placeholders with your Salesforce credentials.

  3. Update the security_token value with the token you copied earlier for secure Salesforce access.

  4. Next, follow the destination documentation instructions to add credentials for your chosen destination, ensuring proper routing of your data to the final destination.

For more information, read the General Usage: Credentials.
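
Before running the pipeline, it can help to check that no value in secrets.toml still holds the template placeholder. The following is a minimal sketch, not part of dlt: `find_unset_secrets` is a hypothetical helper, and it does a naive line scan rather than real TOML parsing, which is enough for the simple key = "value" layout shown above.

```python
from typing import List

PLACEHOLDER = "please set me up!"  # placeholder text used in the generated template


def find_unset_secrets(toml_text: str) -> List[str]:
    """Return the keys whose value is still the template placeholder."""
    unset = []
    for line in toml_text.splitlines():
        line = line.strip()
        # Skip blank lines, comments, and section headers such as [sources.salesforce].
        if not line or line.startswith("#") or line.startswith("["):
            continue
        key, sep, value = line.partition("=")
        if sep and PLACEHOLDER in value:
            unset.append(key.strip())
    return unset


# Made-up file content in which only user_name has been filled in.
example = """
[sources.salesforce]
user_name = "me@example.com" # Salesforce user name
password = "please set me up!" # Salesforce password
security_token = "please set me up!" # Salesforce security token
"""
print(find_unset_secrets(example))
```

In practice you would pass the text read from ".dlt/secrets.toml" instead of the inline example string.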

Run the pipeline

  1. Before running the pipeline, ensure that you have installed all the necessary dependencies by running the command:
    pip install -r requirements.txt
  2. You're now ready to run the pipeline! To get started, run the following command:
    python salesforce_pipeline.py
  3. Once the pipeline has finished running, you can verify that everything loaded correctly by using the following command:
    dlt pipeline <pipeline_name> show
    For example, the pipeline_name for the above pipeline example is salesforce; you may also use any custom name instead.

For more information, read the guide on how to run a pipeline.

Sources and resources

dlt works on the principle of sources and resources.

Source salesforce_source:

This function returns a list of resources to load users, user_role, opportunity, opportunity_line_item, account etc. data from Salesforce API.

@dlt.source(name="salesforce")
def salesforce_source(
    user_name: str = dlt.secrets.value,
    password: str = dlt.secrets.value,
    security_token: str = dlt.secrets.value,
) -> Iterable[DltResource]:
    ...
  • user_name: Your Salesforce account username.

  • password: Corresponding Salesforce password.

  • security_token: Token for Salesforce API authentication, configured in ".dlt/secrets.toml".

Resource sf_user (replace mode):

This resource function retrieves records from the Salesforce "User" endpoint.

@dlt.resource(write_disposition="replace")
def sf_user() -> Iterator[Dict[str, Any]]:
    yield from get_records(client, "User")

Besides "sf_user", there are several resources that use replace mode for data writing to the destination.

user_role(), contact(), lead(), campaign(), product_2(), pricebook_2(), pricebook_entry()

The described functions fetch records from the Salesforce objects they are named after, e.g. user_role() accesses the "UserRole" endpoint.

Resource opportunity (incremental loading):

This resource function retrieves records from the Salesforce "Opportunity" endpoint in incremental mode.

@dlt.resource(write_disposition="merge")
def opportunity(
    last_timestamp: Incremental[str] = dlt.sources.incremental(
        "SystemModstamp", initial_value=None
    )
) -> Iterator[Dict[str, Any]]:
    yield from get_records(
        client, "Opportunity", last_timestamp.last_value, "SystemModstamp"
    )

last_timestamp: Argument that will receive incremental state, initialized with "initial_value". It is configured to track "SystemModstamp" field in data item returned by "get_records" and then yielded. It will store the newest "SystemModstamp" value in dlt state and make it available in "last_timestamp.last_value" on next pipeline run.
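
The behaviour of "last_timestamp" can be illustrated without dlt or Salesforce. The sketch below is a simplified simulation under stated assumptions: `fetch_since`, `run_once`, the `state` dictionary, and the records are all hypothetical stand-ins for "get_records", a pipeline run, and the dlt state. It is not how dlt implements incremental loading internally.

```python
from typing import Any, Dict, Iterable, Iterator, List, Optional

# Made-up stand-in for records of the Salesforce "Opportunity" object.
FAKE_OPPORTUNITIES: List[Dict[str, Any]] = [
    {"Id": "006A", "SystemModstamp": "2023-01-01T00:00:00.000+0000"},
    {"Id": "006B", "SystemModstamp": "2023-02-01T00:00:00.000+0000"},
    {"Id": "006C", "SystemModstamp": "2023-03-01T00:00:00.000+0000"},
]


def fetch_since(
    rows: Iterable[Dict[str, Any]], last_value: Optional[str]
) -> Iterator[Dict[str, Any]]:
    """Yield rows modified after last_value (all rows when it is None)."""
    for row in rows:
        # Timestamps share one fixed format, so string comparison orders them correctly.
        if last_value is None or row["SystemModstamp"] > last_value:
            yield row


def run_once(
    rows: Iterable[Dict[str, Any]], state: Dict[str, Optional[str]]
) -> List[Dict[str, Any]]:
    """Load new rows and remember the newest SystemModstamp for the next run."""
    loaded = list(fetch_since(rows, state.get("last_value")))
    if loaded:
        state["last_value"] = max(r["SystemModstamp"] for r in loaded)
    return loaded


state: Dict[str, Optional[str]] = {"last_value": None}  # mirrors initial_value=None
first = run_once(FAKE_OPPORTUNITIES[:2], state)  # initial run: everything available
second = run_once(FAKE_OPPORTUNITIES, state)     # later run: only the newer record
print([r["Id"] for r in first], [r["Id"] for r in second])
```

The first call loads both available records because the stored value is None; the second call only returns the record whose "SystemModstamp" is newer than the value saved by the first call.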

Besides "opportunity", there are several resources that use merge mode for data writing to the destination.

opportunity_line_item(), opportunity_contact_role(), account(), campaign_member(), task(), event()

The described functions fetch records from the Salesforce objects they are named after, e.g., opportunity_line_item() accesses the "OpportunityLineItem" endpoint.
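
The difference between the "replace" and "merge" write dispositions used by these resources can be sketched in plain Python. This is only a conceptual illustration: `write_replace` and `write_merge` are hypothetical functions, tables are modelled as lists of dictionaries, and matching rows on "Id" is an assumption made for the example rather than something the source above configures.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def write_replace(table: List[Row], new_rows: List[Row]) -> List[Row]:
    """'replace' mode: discard what was loaded before and keep only the new rows."""
    return list(new_rows)


def write_merge(table: List[Row], new_rows: List[Row], key: str = "Id") -> List[Row]:
    """'merge' mode: update rows whose key already exists and append the rest."""
    merged = {row[key]: row for row in table}
    for row in new_rows:
        merged[row[key]] = row
    return list(merged.values())


# Made-up data: one existing row is updated and one new row arrives.
existing = [{"Id": "1", "Name": "Acme"}, {"Id": "2", "Name": "Globex"}]
incoming = [{"Id": "2", "Name": "Globex Corp"}, {"Id": "3", "Name": "Initech"}]

print(write_replace(existing, incoming))
print(write_merge(existing, incoming))
```

With "replace", the row with Id "1" disappears because it is not in the new batch; with "merge", it is kept, the row with Id "2" is updated, and the row with Id "3" is added.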

Customization

Create your own pipeline

If you wish to create your own pipelines, you can leverage source and resource methods as discussed above.

To create your data pipeline using single loading and incremental data loading, follow these steps:

  1. Configure the pipeline by specifying the pipeline name, destination, and dataset as follows:

    pipeline = dlt.pipeline(
        pipeline_name="salesforce_pipeline",  # Use a custom name if desired
        destination="duckdb",  # Choose the appropriate destination (e.g., duckdb, redshift, post)
        dataset_name="salesforce_data",  # Use a custom name if desired
    )

    To read more about pipeline configuration, please refer to our documentation.

  2. To load data from all the endpoints, use the salesforce_source method as follows:

    load_data = salesforce_source()
    load_data.schema.merge_hints({"not_null": ["id"]})  # Hint for id field not null
    load_info = pipeline.run(load_data)
    # print the information on data that was loaded
    print(load_info)

    The hint ensures that the id column is free of null values. During data loading, dlt will verify that the source's id column doesn't contain nulls.

  3. To use the method pipeline.run() to load only selected endpoints, here “opportunity” and “contact”:

    load_info = pipeline.run(load_data.with_resources("opportunity", "contact"))
    # print the information on data that was loaded
    print(load_info)

    In the initial run, the "opportunity" endpoint loads all data using 'merge' mode with 'last_timestamp' set to "None", while "contact" is loaded in 'replace' mode. In subsequent runs, only "opportunity" data after 'last_timestamp.last_value' (from the previous run) is merged, and "contact" is fully reloaded. Incremental loading is specific to endpoints in merge mode with the “dlt.sources.incremental” parameter.

    For incremental loading of endpoints, keep the pipeline name and destination dataset name unchanged between runs. The pipeline name is important for accessing the state from the last run, including the end date for incremental data loads. Altering these names makes dlt start without the stored state, much like a “dev-mode” run, which disrupts the metadata tracking for incremental data loading.

  4. To load data from the “contact” endpoint in replace mode and the “task” endpoint in incremental merge mode:

    load_info = pipeline.run(load_data.with_resources("contact", "task"))
    # pretty print the information on data that was loaded
    print(load_info)

    Note: In the referenced pipeline, the "contact" endpoint is always loaded in "replace" mode, overwriting existing data. Conversely, the "task" endpoint supports "merge" mode for incremental loads, updating or adding data based on the 'last_timestamp' value without erasing previously loaded data.

  5. Salesforce enforces specific limits on API data requests. These limits vary based on the Salesforce edition and license type, as outlined in the Salesforce API Request Limits documentation.

    To limit the number of Salesforce API data requests, developers can control the environment for production or development purposes. For development, you can set the IS_PRODUCTION variable to False in "salesforce/settings.py", which limits API call requests to 100. To modify this limit, you can update the query limit in "salesforce/helpers.py" as required.

    To read more about Salesforce query limits, please refer to their official documentation here.
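
The development limit described in step 5 can be pictured as a cap added to the query that is sent to Salesforce. The sketch below is an assumption about how such a switch might work, not a copy of "salesforce/helpers.py": `build_query` and `DEV_ROW_LIMIT` are hypothetical names, and implementing the cap as a SOQL LIMIT clause is a guess made for illustration.

```python
from typing import List

IS_PRODUCTION = False  # assumed to mirror the flag in "salesforce/settings.py"
DEV_ROW_LIMIT = 100    # hypothetical name for the development cap


def build_query(
    sf_object: str,
    fields: List[str],
    is_production: bool = IS_PRODUCTION,
) -> str:
    """Build a SOQL query that is capped at DEV_ROW_LIMIT outside production."""
    query = f"SELECT {', '.join(fields)} FROM {sf_object}"
    if not is_production:
        # Development runs fetch only a small sample to save API calls.
        query += f" LIMIT {DEV_ROW_LIMIT}"
    return query


print(build_query("Opportunity", ["Id", "Name"]))
print(build_query("Opportunity", ["Id", "Name"], is_production=True))
```

Keeping the cap in one constant means a different development limit only needs a change in a single place.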

Additional Setup guides

This demo works on Codespaces. Codespaces is a development environment available for free to anyone with a GitHub account. You'll be asked to fork the demo repository, and from there the README guides you through the further steps.
The demo uses the Continue VSCode extension.

Off to codespaces!
