Bulk Importing Objects with Python
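Here's an easy way to bulk import data into Salesforce with Python 3. The first script below reads `accounts.csv`, merges the rows by e-mail address, and hands each account to the second module (`salesforce`), which uploads them in batches through the Salesforce Bulk API 2.0.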
import csv
import salesforce
# Pending Account records, keyed by lower-cased e-mail address.
# Filled by update_record() and consumed by import_from_csv().
data_combined = dict()

# --- Salesforce Account field API names -------------------------------------
# Identity / ownership
SALESFORCE_EMAIL = 'PersonEmail'
SALESFORCE_OWNER_ID = 'OwnerId'
SALESFORCE_RECORD_TYPE = 'RecordTypeId'

# Person name
SALESFORCE_NAME_LAST, SALESFORCE_NAME_FIRST = 'LastName', 'FirstName'

# Shipping address
SALESFORCE_SHIPPING_STREET = 'ShippingStreet'
SALESFORCE_SHIPPING_CITY = 'ShippingCity'
SALESFORCE_SHIPPING_STATE = 'ShippingState'
SALESFORCE_SHIPPING_POSTAL_CODE = 'ShippingPostalCode'
SALESFORCE_SHIPPING_COUNTRY = 'ShippingCountry'

# Billing address
SALESFORCE_BILLING_STREET = 'BillingStreet'
SALESFORCE_BILLING_CITY = 'BillingCity'
SALESFORCE_BILLING_STATE = 'BillingState'
SALESFORCE_BILLING_POSTAL_CODE = 'BillingPostalCode'
SALESFORCE_BILLING_COUNTRY = 'BillingCountry'

# NOTE(review): the name says "mobile phone" but it maps to the Account
# 'Phone' field — confirm whether 'PersonMobilePhone' was intended.
SALESFORCE_BILLING_MOBILE_PHONE = 'Phone'

# Field values every new record starts with before row data is merged in.
# Placeholders: replace them with real ids from your org before running.
defaults = dict([
    (SALESFORCE_RECORD_TYPE, 'add record type id here'),
    (SALESFORCE_OWNER_ID, 'add owner id here'),
])
def update_record(email, **data):
    """Merge ``data`` into the pending record for ``email`` in ``data_combined``.

    The address is normalized (surrounding whitespace stripped, lower-cased)
    so differently padded or cased spellings of one address share a record.
    A new record starts as a copy of ``defaults``; later calls overwrite any
    fields they repeat.

    Raises:
        ValueError: if ``email`` is None.
    """
    if email is None:
        raise ValueError('update_record() needs an email address')
    key = email.strip().lower()
    # setdefault creates the record (seeded with defaults) only on first sight.
    record = data_combined.setdefault(key, dict(defaults))
    record.update(data)
def import_from_csv():
    """Read ./accounts.csv, merge rows per e-mail and bulk-insert them.

    Each row's 'Email', 'First Name' and 'Last Name' columns are mapped onto
    the Account fields through update_record(); rows without an e-mail are
    skipped. Records with no last name get the local part of their e-mail
    address as LastName. Every record is queued with
    ``salesforce.add_user(..., bulk=True)``, and whatever is still queued at
    the end is uploaded with ``salesforce.add_bulk_users()``.
    """
    # newline='' is what the csv module requires for correct quoting;
    # utf-8-sig drops the BOM that Excel writes, which would otherwise become
    # part of the first header name and hide that column from row.get().
    with open('./accounts.csv', newline='', encoding='utf-8-sig') as csvfile:
        reader = csv.DictReader(csvfile, delimiter=',', quotechar='"')
        for row in reader:
            email = (row.get('Email') or '').strip()
            if not email:
                # Records are keyed by e-mail; a row without one cannot be merged.
                continue
            update_record(email, **{
                SALESFORCE_EMAIL: email,
                SALESFORCE_NAME_LAST: row.get('Last Name'),
                SALESFORCE_NAME_FIRST: row.get('First Name'),
                # More fields here
            })
    for k, v in data_combined.items():
        if v.get(SALESFORCE_NAME_LAST, None) in ('', None):
            v[SALESFORCE_NAME_LAST] = k.split('@')[0]
        salesforce.add_user(k, v, True)
    # add_user(..., True) only uploads once bulk_amount records are queued, so
    # flush the remainder; otherwise small imports would never be sent.
    if salesforce.bulk_users:
        salesforce.add_bulk_users()
        salesforce.bulk_users.clear()
import csv
import io
import json
import os
import time
import requests
from bulk_import import db
# Connection settings come from the environment; each is None when unset.
consumer_key, consumer_secret, username, password, salesforce_domain = (
    os.environ.get(variable)
    for variable in (
        'SALESFORCE_CONSUMER_KEY',
        'SALESFORCE_CONSUMER_SECRET',
        'SALESFORCE_USERNAME',
        'SALESFORCE_PASSWORD',
        'SALESFORCE_DOMAIN',
    )
)
def get_access_token(login_url='https://login.salesforce.com/services/oauth2/token', timeout=30):
    """Fetch an OAuth 2.0 access token with the username-password flow.

    The connected-app key/secret and user credentials are the module-level
    values read from the environment.

    Args:
        login_url: token endpoint; override for e.g. a sandbox login host.
        timeout: seconds to wait for the login server before giving up.

    Returns:
        The ``access_token`` string from the token response.

    Raises:
        RuntimeError: if the response contains no access token (for example
            bad credentials); the server's ``error`` / ``error_description``
            are included in the message.
        requests.RequestException: on network failure or timeout.
    """
    auth_request = requests.post(login_url, data={
        'grant_type': 'password',
        'client_id': consumer_key,
        'client_secret': consumer_secret,
        'username': username,
        'password': password,
    }, timeout=timeout)
    payload = auth_request.json()
    token = payload.get('access_token')
    if not token:
        # Fail loudly: a missing token would otherwise become "Bearer None"
        # and every later API call would fail with no explanation.
        raise RuntimeError(
            'Salesforce login failed ({}): {} {}'.format(
                auth_request.status_code,
                payload.get('error'),
                payload.get('error_description'),
            )
        )
    return token
# Authenticate once, at import time. The resulting Authorization header is the
# default set that extend_headers() merges into other header dicts.
_bearer_token = get_access_token()
base_headers = {'Authorization': f'Bearer {_bearer_token}'}
def extend_headers(headers, from_headers=None):
    """Copy header entries into ``headers`` (in place) and return it.

    Args:
        headers: dict that receives the entries; it is mutated.
        from_headers: mapping to copy from. When it is falsy (None or empty),
            the module-level ``base_headers`` are copied instead.

    Returns:
        The same ``headers`` object, for convenient chaining.
    """
    source = from_headers if from_headers else base_headers
    headers.update(source)
    return headers
# Header sets for the REST / Bulk API calls. Each one is merged with
# base_headers (the Authorization header) by extend_headers() so it can be
# passed straight to requests.
json_request = {
    'Content-Type': 'application/json'
}
extend_headers(json_request)
csv_request = {
    'Content-Type': 'text/csv',
}
extend_headers(csv_request)
json_response = {
    'Accept': 'application/json',
}
# Extended like its siblings so it can authenticate a request on its own.
extend_headers(json_response)
csv_response = {
    'Accept': 'text/csv',
}
extend_headers(csv_response)
# Accounts queued by add_user(..., bulk=True), keyed by e-mail address.
bulk_users = dict()
# Queue length at which add_user() triggers a Bulk API upload.
bulk_amount = 50_000
def format_column(value):
    """Render one value as a CSV field for a Bulk API upload.

    - ``None`` becomes an empty field (no value) instead of the text "None".
    - Strings are always double-quoted, with embedded double quotes doubled
      (RFC 4180), so a value like ``O"Brien`` cannot break the row.
    - Booleans become ``1`` / ``0``.
    - Anything else is rendered with ``str()``.
    """
    if value is None:
        return ''
    if isinstance(value, str):
        escaped = value.replace('"', '""')
        return f'"{escaped}"'
    if isinstance(value, bool):
        return '1' if value else '0'
    return str(value)
def _ingest_url(suffix=''):
    """Return this org's Bulk API 2.0 ingest endpoint with ``suffix`` appended."""
    return f'https://{salesforce_domain}.my.salesforce.com/services/data/v52.0/jobs/ingest/{suffix}'


def _build_bulk_csv(users):
    """Serialize ``users`` ({email: {field: value}}) into CRLF-delimited CSV text.

    The header row is the union of every record's field names in first-seen
    order (deterministic, unlike a set); a record missing a field gets
    whatever format_column() renders for None.
    """
    fields = list(dict.fromkeys(key for record in users.values() for key in record))
    lines = [','.join(f'"{field}"' for field in fields)]
    lines.extend(
        ','.join(format_column(record.get(field, None)) for field in fields)
        for record in users.values()
    )
    return '\r\n'.join(lines)


def _wait_for_job(job_id, poll_interval=4):
    """Poll ingest job ``job_id`` until it reaches a terminal state; return its info JSON.

    JobComplete, Failed and Aborted are terminal. Any other state is polled
    again after ``poll_interval`` seconds.
    """
    while True:
        response = requests.get(
            _ingest_url(f'{job_id}/'),
            headers=json_request,
            timeout=None
        )
        response.raise_for_status()
        job_info = json.loads(response.content)
        if job_info.get('state', None) in ('JobComplete', 'Failed', 'Aborted'):
            return job_info
        time.sleep(poll_interval)


def add_bulk_users():
    """Upload everything queued in ``bulk_users`` as one Bulk API 2.0 Account insert job.

    The steps are:
    1. Create the job.
    2. Upload the CSV.
    3. Mark the upload complete.
    4. Poll until the job finishes.
    5. Record each successfully created Account with ``db.add_user(sf_id, email)``.

    This is best-effort. Problems are printed and abort the upload, and
    nothing is raised. The queue is not cleared here; add_user() clears it
    after calling this. An empty queue is a no-op.
    """
    if not bulk_users:
        return
    csv_data = _build_bulk_csv(bulk_users)

    try:
        response = requests.post(
            _ingest_url(),
            headers=json_request,
            data=json.dumps({
                "object": "Account",
                "contentType": "CSV",
                "operation": "insert",
                "lineEnding": "CRLF"
            }),
            timeout=None
        )
        response.raise_for_status()
        job_data = json.loads(response.content)
        # KeyError here means Salesforce did not return a usable job.
        job_id = job_data['id']
        url = job_data['contentUrl']

        # Send bytes: a str body would be encoded by http.client as
        # ISO-8859-1 and fail on any non-Latin-1 character; the API wants UTF-8.
        requests.put(
            f'https://{salesforce_domain}.my.salesforce.com/{url}',
            headers=csv_request,
            data=csv_data.encode('utf-8'),
            timeout=None
        ).raise_for_status()

        # Start batch import
        requests.patch(
            _ingest_url(f'{job_id}/'),
            headers=json_request,
            data=json.dumps({
                'state': 'UploadComplete'
            }),
            timeout=None
        ).raise_for_status()

        job_info = _wait_for_job(job_id)
    except Exception as exc:  # best-effort: report and give up on this batch
        print('bulk import failed:', repr(exc))
        return

    state = job_info.get('state', None)
    if state != 'JobComplete':
        print(f'bulk import job {job_id} ended in state {state}:', job_info.get('errorMessage', None))
        return
    failed = job_info.get('numberRecordsFailed', None)
    if failed:
        # Per-record errors are available from the job's failedResults/ endpoint.
        print(f'bulk import job {job_id}: {failed} record(s) failed')

    try:
        response = requests.get(
            _ingest_url(f'{job_id}/successfulResults/'),
            headers=extend_headers({}, csv_response),
            timeout=None
        )
        response.raise_for_status()
        reader = csv.DictReader(io.StringIO(response.content.decode('utf-8')), delimiter=',', quotechar='"')
        for row in reader:
            email = row.get('PersonEmail')
            sf_id = row.get('sf__Id')
            db.add_user(sf_id, email)
    except Exception as exc:  # best-effort, as above
        print(f'bulk import job {job_id}: could not record results:', repr(exc))
def add_user(email, data, bulk=False):
    """Create a Salesforce Account, either immediately or via the bulk queue.

    Args:
        email: key for the record in the bulk queue, also used in log output.
        data: Account field values keyed by API field name.
        bulk: when True, queue ``data`` in ``bulk_users``. Once the queue
            holds ``bulk_amount`` records it is uploaded with
            add_bulk_users() and cleared. Records below that threshold stay
            queued until someone calls add_bulk_users().

    Returns:
        For a direct insert, the parsed JSON body of the 201 response, or
        None on any failure (which is printed). Always None when ``bulk``.
    """
    if bulk:
        bulk_users[email] = data
        if len(bulk_users) >= bulk_amount:
            add_bulk_users()
            bulk_users.clear()
        return None
    try:
        response = requests.post(
            # Must be an f-string on salesforce_domain (the module's org host).
            f'https://{salesforce_domain}.my.salesforce.com/services/data/v52.0/sobjects/Account/',
            headers=json_request,
            data=json.dumps(data),
            timeout=None
        )
        if response.status_code != 201:
            print(f'failed to add {email}', response.status_code, response.content)
            return None
        return json.loads(response.content)
    except (requests.RequestException, TypeError, ValueError) as exc:
        # Network errors, unserializable data or a non-JSON body.
        print(f'failed to add {email}', repr(exc))
        return None
def update_user(salesforce_id, data):
    """PATCH ``data`` onto the Account whose id is ``salesforce_id``.

    Returns True when Salesforce answers with a 2xx status, False otherwise.
    Exceptions raised by ``requests`` are not caught.
    """
    account_url = f'https://{salesforce_domain}.my.salesforce.com/services/data/v52.0/sobjects/Account/{salesforce_id}'
    payload = json.dumps(data)
    status = requests.patch(account_url, headers=json_request, data=payload).status_code
    return 200 <= status < 300