Bulk Importing Objects with Python

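Here's a simple way to bulk import Account records into Salesforce with Python 3. The first script reads `accounts.csv` and merges its rows by email address; the second is the `salesforce` module it imports, which uploads the records through the Salesforce Bulk API 2.0.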


import csv
import salesforce

# Records being prepared for upload, keyed by lower-cased email address.
# Filled by update_record() and read by import_from_csv().
data_combined = {}


# Salesforce field API names used as keys in each record dict.
SALESFORCE_EMAIL = 'PersonEmail'
SALESFORCE_OWNER_ID = 'OwnerId'
SALESFORCE_RECORD_TYPE = 'RecordTypeId'
SALESFORCE_NAME_LAST = 'LastName'
SALESFORCE_NAME_FIRST = 'FirstName'

# Shipping address fields.
SALESFORCE_SHIPPING_STREET = 'ShippingStreet'
SALESFORCE_SHIPPING_CITY = 'ShippingCity'
SALESFORCE_SHIPPING_STATE = 'ShippingState'
SALESFORCE_SHIPPING_POSTAL_CODE = 'ShippingPostalCode'
SALESFORCE_SHIPPING_COUNTRY = 'ShippingCountry'

# Billing address fields.
SALESFORCE_BILLING_STREET = 'BillingStreet'
SALESFORCE_BILLING_CITY = 'BillingCity'
SALESFORCE_BILLING_STATE = 'BillingState'
SALESFORCE_BILLING_POSTAL_CODE = 'BillingPostalCode'
SALESFORCE_BILLING_COUNTRY = 'BillingCountry'
# NOTE: despite the constant's name, this maps to the generic 'Phone' field,
# not a billing- or mobile-specific one.
SALESFORCE_BILLING_MOBILE_PHONE = 'Phone'


# Values copied into every new record before any row data is merged in
# (see update_record). The placeholder strings must be replaced with real
# Ids from the target org before running an import, otherwise they are
# uploaded verbatim.
defaults = {
    SALESFORCE_RECORD_TYPE: 'add record type id here',
    SALESFORCE_OWNER_ID: 'add owner id here',
}

def update_record(email, **data):
    """Merge ``data`` into the pending record for ``email``.

    Records live in ``data_combined`` and are keyed by the email address with
    surrounding whitespace removed and lower-cased, so rows that differ only
    in letter case or stray spaces are combined into one record. A new record
    starts as a copy of ``defaults``; later calls overwrite earlier values for
    the same field.

    :param email: The email address identifying the record (a string).
    :param data: Salesforce field name -> value pairs to store on the record.
    """
    # Normalise the key so "A@B.com " and "a@b.com" dedupe to one record.
    key = email.strip().lower()
    # setdefault performs lookup-or-insert in one step; defaults is copied so
    # no record ever shares (or mutates) the module-level dict.
    data_combined.setdefault(key, dict(defaults)).update(data)


def import_from_csv():
    """Read ``./accounts.csv``, merge rows by email and upload them in bulk.

    Each row contributes its ``Email``, ``First Name`` and ``Last Name``
    columns. Rows without an email are skipped because the email is the
    merge key. Records that still lack a last name get the local part of the
    email address instead. Every record is handed to ``salesforce.add_user``
    in bulk mode, and whatever is still buffered afterwards is flushed so
    imports smaller than the bulk size are uploaded too.
    """
    # newline='' is required by the csv module so quoted fields containing
    # line breaks parse correctly; utf-8-sig also drops a leading UTF-8
    # byte-order mark, which would otherwise be glued onto the first header
    # name and make row.get('Email') return None.
    with open('./accounts.csv', newline='', encoding='utf-8-sig') as csvfile:
        reader = csv.DictReader(csvfile, delimiter=',', quotechar='"')
        for row in reader:
            email = (row.get('Email') or '').strip()
            if not email:
                # Without an email there is no merge key; skipping avoids
                # collapsing every blank-email row into a single record.
                continue
            update_record(email, **{
                SALESFORCE_EMAIL: email,
                SALESFORCE_NAME_FIRST: row.get('First Name'),
                SALESFORCE_NAME_LAST: row.get('Last Name'),
                # More fields here
            })

    for email_key, record in data_combined.items():
        # Fall back to the email's local part when no last name was given.
        if not record.get(SALESFORCE_NAME_LAST):
            record[SALESFORCE_NAME_LAST] = email_key.split('@')[0]
        salesforce.add_user(email_key, record, True)

    # add_user(bulk=True) only uploads once bulk_amount records are buffered;
    # send the remainder explicitly.
    if salesforce.bulk_users:
        salesforce.add_bulk_users()
        salesforce.bulk_users.clear()


import csv
import io
import json
import os
import time

import requests

from bulk_import import db

# Connection settings, read from the environment at import time. Any
# variable that is not set comes through as None.
(
    consumer_key,
    consumer_secret,
    username,
    password,
    salesforce_domain,
) = (
    os.environ.get(env_name)
    for env_name in (
        'SALESFORCE_CONSUMER_KEY',
        'SALESFORCE_CONSUMER_SECRET',
        'SALESFORCE_USERNAME',
        'SALESFORCE_PASSWORD',
        'SALESFORCE_DOMAIN',
    )
)


def get_access_token(login_url='https://login.salesforce.com', timeout=30):
    """Obtain an OAuth access token using the username-password grant.

    The client id/secret and user credentials come from the module-level
    settings read from the environment.

    :param login_url: Base URL of the Salesforce login server; override it
        to authenticate against a different login endpoint.
    :param timeout: Seconds to wait for the token endpoint before giving up.
    :returns: The ``access_token`` string from the token response.
    :raises RuntimeError: If the token endpoint rejects the request or its
        response carries no access token.
    :raises requests.RequestException: On network failure or timeout.
    """
    auth_request = requests.post(f'{login_url}/services/oauth2/token', data={
        'grant_type': 'password',
        'client_id': consumer_key,
        'client_secret': consumer_secret,
        'username': username,
        'password': password,
    }, timeout=timeout)
    # Fail fast: without a token every later request would be sent with
    # "Bearer None" and rejected, and the callers swallow those failures.
    if not auth_request.ok:
        raise RuntimeError(
            f'Salesforce authentication failed ({auth_request.status_code}): {auth_request.text}'
        )
    token = auth_request.json().get('access_token')
    if not token:
        raise RuntimeError('Salesforce token response did not contain an access_token')
    return token


# Authenticate once, at import time; the token is reused by every header
# set that is built from base_headers.
_bearer_token = get_access_token()


base_headers = {'Authorization': f'Bearer {_bearer_token}'}


def extend_headers(headers, from_headers=None):
    """Copy every entry of ``from_headers`` into ``headers`` and return it.

    ``headers`` is modified in place; existing keys are overwritten. When
    ``from_headers`` is omitted or empty, ``base_headers`` is used instead.
    """
    headers.update(from_headers or base_headers)
    return headers


# Header sets for the REST calls. Each one is extended in place with the
# Authorization header from base_headers, so any of them can be sent as-is.
json_request = {
    'Content-Type': 'application/json',
}
extend_headers(json_request)


csv_request = {
    'Content-Type': 'text/csv',
}
extend_headers(csv_request)

json_response = {
    'Accept': 'application/json',
}
# Authenticated like every other header set so it can be used on its own.
extend_headers(json_response)

csv_response = {
    'Accept': 'text/csv',
}
extend_headers(csv_response)

# Records waiting for the next bulk insert, keyed by email (see add_user).
bulk_users = {}
# Number of buffered records that triggers an automatic bulk insert.
bulk_amount = 50000


def format_column(value):
    """Render one value as a CSV field for the bulk upload.

    * ``None`` becomes an empty field rather than the literal text ``None``.
    * Strings are always quoted, with embedded double quotes doubled as the
      CSV format requires.
    * Booleans become ``1``/``0``.
    * Anything else is converted with ``str()``.
    """
    if value is None:
        return ''
    if isinstance(value, str):
        escaped = value.replace('"', '""')
        return f'"{escaped}"'
    # bool must be tested before the generic fallback: str(True) is 'True'.
    if isinstance(value, bool):
        return '1' if value else '0'
    return str(value)


# Job states after which Salesforce will not change the job any further.
_TERMINAL_JOB_STATES = ('JobComplete', 'Failed', 'Aborted')


def _ingest_url(*parts):
    """Return the Bulk API 2.0 ingest URL, extended by ``parts`` as path segments."""
    url = f'https://{salesforce_domain}.my.salesforce.com/services/data/v52.0/jobs/ingest/'
    for part in parts:
        url += f'{part}/'
    return url


def _build_bulk_csv(users):
    """Render ``users`` (email -> field dict) as a CRLF-separated CSV string."""
    # Union of all field names in first-seen order. A set() would give a
    # column order that changes between runs (string hashing is randomised).
    fields = list(dict.fromkeys(key for record in users.values() for key in record))
    lines = [','.join(f'"{field}"' for field in fields)]
    for record in users.values():
        # A record that lacks a field gets format_column(None) for that column.
        lines.append(','.join(format_column(record.get(field)) for field in fields))
    return '\r\n'.join(lines)


def _wait_for_job(job_id, poll_interval):
    """Poll job ``job_id`` until it reaches a terminal state; return its info dict."""
    while True:
        response = requests.get(_ingest_url(job_id), headers=json_request, timeout=None)
        response.raise_for_status()
        job_info = json.loads(response.content)
        if job_info.get('state') in _TERMINAL_JOB_STATES:
            return job_info
        time.sleep(poll_interval)


def _record_successes(job_id):
    """Store the Salesforce Id of every successfully inserted record via ``db.add_user``."""
    response = requests.get(
        _ingest_url(job_id, 'successfulResults'),
        headers=extend_headers({}, csv_response),
        timeout=None,
    )
    response.raise_for_status()
    reader = csv.DictReader(io.StringIO(response.content.decode('utf-8')), delimiter=',', quotechar='"')
    for row in reader:
        db.add_user(row.get('sf__Id'), row.get('PersonEmail'))


def add_bulk_users(poll_interval=4):
    """Insert every record buffered in ``bulk_users`` as a Salesforce Account.

    Runs one Bulk API 2.0 ingest job: create the job, upload the CSV, mark
    the upload complete, poll until the job finishes, then record the Id of
    each inserted row through ``db.add_user``. The call is best-effort: any
    failure is reported with ``print`` and the function returns without
    raising. It does not clear ``bulk_users``; callers do that.

    :param poll_interval: Seconds to sleep between job-status checks.
    """
    if not bulk_users:
        return

    csv_data = _build_bulk_csv(bulk_users)
    job_id = None
    try:
        # 1. Open an insert job for Account records.
        response = requests.post(
            _ingest_url(),
            headers=json_request,
            data=json.dumps({
                "object": "Account",
                "contentType": "CSV",
                "operation": "insert",
                "lineEnding": "CRLF"
            }),
            timeout=None
        )
        response.raise_for_status()
        job = json.loads(response.content)
        job_id = job['id']

        # 2. Upload the CSV. Encode explicitly: a str body would be encoded
        #    as Latin-1 by http.client and fail on any other character.
        response = requests.put(
            f'https://{salesforce_domain}.my.salesforce.com/{job["contentUrl"]}',
            headers=csv_request,
            data=csv_data.encode('utf-8'),
            timeout=None
        )
        response.raise_for_status()

        # 3. Mark the upload complete so Salesforce starts processing.
        response = requests.patch(
            _ingest_url(job_id),
            headers=json_request,
            data=json.dumps({
                'state': 'UploadComplete'
            }),
            timeout=None
        )
        response.raise_for_status()

        # 4. Wait for a terminal state; 'Aborted' must end the loop too.
        job = _wait_for_job(job_id, poll_interval)
        if job.get('state') != 'JobComplete':
            print(f'bulk import job {job_id} ended in state {job.get("state")}:', job.get('errorMessage'))
            return
        if job.get('numberRecordsFailed'):
            print(f'bulk import job {job_id}: {job["numberRecordsFailed"]} record(s) failed')

        # 5. Store the Salesforce Id of every inserted record.
        _record_successes(job_id)
    except Exception as exc:  # best-effort boundary: report and drop this batch
        print(f'bulk import failed (job {job_id}):', exc)


def add_user(email, data, bulk=False):
    """Create an Account for ``email`` now, or buffer it for a bulk insert.

    :param email: Email address identifying the record; used as the buffer
        key and in failure messages.
    :param data: Salesforce field name -> value mapping for the Account.
    :param bulk: When true, store the record in ``bulk_users`` and upload the
        buffer with ``add_bulk_users()`` once it holds ``bulk_amount``
        records. A smaller remainder stays buffered until the caller flushes
        it. When false, insert the record immediately.
    :returns: For an immediate insert, the parsed JSON response on HTTP 201;
        otherwise ``None``. Always ``None`` in bulk mode.
    """
    if bulk:
        bulk_users[email] = data
        if len(bulk_users) >= bulk_amount:
            add_bulk_users()
            bulk_users.clear()
        return None

    try:
        response = requests.post(
            f'https://{salesforce_domain}.my.salesforce.com/services/data/v52.0/sobjects/Account/',
            headers=json_request,
            data=json.dumps(data),
            timeout=None
        )
    except requests.RequestException as exc:
        print(f'failed to add {email}', exc)
        return None

    # Check the status before parsing: error bodies are not guaranteed to be JSON.
    if response.status_code != 201:
        print(f'failed to add {email}', response.status_code, response.content)
        return None
    return json.loads(response.content)


def update_user(salesforce_id, data):
    """PATCH ``data`` onto the Account whose Id is ``salesforce_id``.

    :returns: True when Salesforce answers with a 2xx status, else False.
        Exceptions raised by ``requests`` propagate to the caller.
    """
    account_url = (
        f'https://{salesforce_domain}.my.salesforce.com'
        f'/services/data/v52.0/sobjects/Account/{salesforce_id}'
    )
    payload = json.dumps(data)
    status = requests.patch(account_url, headers=json_request, data=payload).status_code
    return 200 <= status < 300