Skip to content
This repository has been archived by the owner on Jul 27, 2022. It is now read-only.

Examples

Bryan McBride edited this page Apr 23, 2020 · 27 revisions

Create records from CSV file

import csv
from fulcrum import Fulcrum
from fulcrum.exceptions import NotFoundException

api_key = 'your_api_key'
form_id = 'your_form_id'
fulcrum = Fulcrum(key=api_key)
records = csv.reader(open('new-records.csv'), delimiter=',')

#### CSV format ###
#latitude,longitude,status,name,technician
#27.770787,-82.638039,Excellent,Fulcrum HQ,Bryan

# skip header
next(records, None)

# loop through CSV projects
for index, row in enumerate(records):
    # build record object
    record = {
        "record": {
            "form_id": form_id,
            "latitude": row[0],
            "longitude": row[1],
            "status": row[2],
            "form_values": {
                "925c": row[3],
                "784b": {
                    "choice_values": [row[4]]
                }
            }
        }
    }
    # create record
    fulcrum.records.create(record)
    print('Record ' + str(index) + ' successfully created!')

Update records from CSV file

import csv
from fulcrum import Fulcrum
from fulcrum.exceptions import NotFoundException

api_key = 'your_api_key'
form_id = 'your_form_id'
fulcrum = Fulcrum(key=api_key)
updates = csv.reader(open('records-to-update.csv'), delimiter=',')

#### CSV format ###
#fulcrum_id,latitude,longitude,status,name,technician
#d59139d1-9551-4e59-a1eb-8fa29323850f,27.770787,-82.638039,Excellent,Fulcrum HQ,Bryan

# skip header
next(updates, None)

# loop through CSV records
for row in updates:
    # fetch existing record (first column must contain fulcrum_id)
    try:
        data = fulcrum.records.find(row[0])
    except NotFoundException:
        print('No record found with id ' + row[0])
        continue

    # update fields with CSV values
    data['record']['latitude'] = row[1]
    data['record']['longitude'] = row[2]
    data['record']['status'] = row[3]
    data['record']['form_values']['925c'] = row[4]
    data['record']['form_values']['784b'] = {"choice_values": [row[5]]}

    # send updates back to Fulcrum
    record = fulcrum.records.update(row[0], data)
    print('Record ' + row[0] + ' successfully updated!')

Delete records based on CSV file

import csv, argparse
from fulcrum import Fulcrum
from fulcrum.exceptions import NotFoundException

parser = argparse.ArgumentParser(description='This program deletes Fulcrum records specified by record ID in the first column of a CSV file. The CSV file should have a header row but the column name is not important.')
parser.add_argument('-t','--token', help='API token', required=True)
parser.add_argument('-f','--file', help='CSV file', required=True)
args = vars(parser.parse_args())

fulcrum = Fulcrum(key=args['token'])
f = open(args['file'], 'r')
deletes = csv.reader(f, delimiter=',')

# Skip the file header in csv
next(deletes, None)

# Get a total record count
total = sum(1 for row in deletes)

# Back to the top of the file and skip header
f.seek(0)
next(deletes, None)

def deleteRecords():
    # Loop through rows in csv
    for count, row in enumerate(deletes):
       # First column must contain Fulcrum record ID
       record_id = row[0]

       # Try to delete record, but continue if not found
       try:
           fulcrum.records.delete(record_id)
       except NotFoundException:
           print('No record found with id ' + record_id)
           continue

       print('deleted ' + record_id + ' ('+ format(count+1, ',') + '/' + format(total, ',') + ')')
    print('finished!')

confirm = input('Are you absolutely sure you want to permanently delete all ' + format(total, ',') + ' records? (y)es or (n)o: ')

if confirm == 'yes' or confirm == 'y':
    deleteRecords()
elif confirm == 'no' or confirm == 'n':
    print('No records have been deleted!')

Delete all the records in an app

import sys
from fulcrum import Fulcrum

api_key = 'your-api-key'
form_id = 'your-form-id'

if api_key == '' or api_key == 'your-api-key' or form_id == '' or form_id == 'your-form-id':
    sys.exit('api_key and form_id are required!')

fulcrum = Fulcrum(key=api_key)
form = fulcrum.forms.find(form_id)
form_name = form['form']['name']
records = fulcrum.records.search(url_params={'form_id': form_id})
total_count = str(records['total_count'])

confirm = input('Are you absolutely sure you want to permanently delete all ' + total_count + ' records from the ' + form_name + ' app? (y)es or (n)o: ')

if confirm == 'yes' or confirm == 'y':
    for record in records['records']:
        #print(record['id'])
        fulcrum.records.delete(record['id'])
    print('Permanently deleted ' + total_count + ' records!')
elif confirm == 'no' or confirm == 'n':
    print('No records have been deleted!')

Create Projects from a CSV file

import csv, argparse
from fulcrum import Fulcrum
from fulcrum.exceptions import NotFoundException

parser = argparse.ArgumentParser(description='This program creates Fulcrum projects from a CSV file.')
parser.add_argument('-t','--token', help='API token', required=True)
parser.add_argument('-f','--file', help='CSV file', required=True)
args = vars(parser.parse_args())

fulcrum = Fulcrum(key=args['token'])
f = open(args['file'], 'r')
projects = csv.reader(f, delimiter=',')

#### CSV format ###
#name,description
#Pinellas County,For records in Pinellas County

# Skip the file header in csv
next(projects, None)

# Loop through rows in csv
for index, row in enumerate(projects):
    # build project object
    project = {
      "project": {
        "name": row[0]
        # "description": row[1]
      }
    }
    # create project
    fulcrum.projects.create(project)
    print('Project ' + row[0] + ' successfully created!')

Downloading Videos for a Form

Below is an example of downloading the first 3 videos and tracks for a form. You could change the code below to download all videos and tracks for a form if needed.

from fulcrum import Fulcrum

api_key = 'your_api_key'

fulcrum = Fulcrum(key=api_key)

videos = fulcrum.videos.search({'page': 1, 'per_page': 3, 'form_id': '4a3f0a6d-c1d3-4805-9aab-7cdd39d58e5f'})

for video in videos['videos']:
    id = video['access_key']

    media = fulcrum.videos.media(id, 'small')
    track = fulcrum.videos.track(id, 'geojson')

    with open('{}_small.mp4'.format(id), 'wb') as f:
        f.write(media)

    with open('{}.geojson'.format(id), 'wb') as f:
          f.write(track)

Download videos with GeoJSON tracks and optionally limit by Form ID.

import sys, argparse
from fulcrum import Fulcrum

parser = argparse.ArgumentParser(description='This program downloads Fulcrum videos and tracks.')
parser.add_argument('-t','--token', help='API token', required=True)
parser.add_argument('-f','--form', help='Form ID', required=False)
args = vars(parser.parse_args())

fulcrum = Fulcrum(key=args['token'])

if args['form']:
    videos = fulcrum.videos.search(url_params={'form_id': args['form']})
else:
    videos = fulcrum.videos.search()

for (index, item) in enumerate(videos['videos']):
    id = item['access_key']

    if item['processed']:
        video = fulcrum.videos.media(id, 'original')

        with open('{}.mp4'.format(id), 'wb') as f:
            f.write(video)

        print('Downloaded {}.mp4'.format(id))

        if item['track']:
            track = fulcrum.videos.track(id, 'geojson')

            with open('{}.geojson'.format(id), 'wb') as f:
                f.write(track)

            print('Downloaded {}.geojson'.format(id))

    else:
        print('Video ' + id + ' not processed.')

Download photos and optionally limit by Form ID

import sys, argparse
from fulcrum import Fulcrum

parser = argparse.ArgumentParser(description='This program downloads Fulcrum photos.')
parser.add_argument('-t','--token', help='API token', required=True)
parser.add_argument('-f','--form', help='Form ID', required=False)
args = vars(parser.parse_args())

fulcrum = Fulcrum(key=args['token'])

if args['form']:
    photos = fulcrum.photos.search(url_params={'form_id': args['form']})
else:
    photos = fulcrum.photos.search()

for (index, item) in enumerate(photos['photos']):
    id = item['access_key']
    photo = fulcrum.photos.media(id, 'original')

    with open('{}.jpg'.format(id), 'wb') as f:
        f.write(photo)

    print('Downloaded {}.jpg'.format(id))

Deletes all of the records associated with a changeset

import sys, argparse
from fulcrum import Fulcrum

parser = argparse.ArgumentParser(description='This program takes a changeset and deletes all of the records. Use with extreme caution!')
parser.add_argument('-t','--token', help='API token', required=True)
parser.add_argument('-f','--form', help='Form ID', required=True)
parser.add_argument('-c','--changeset', help='Changeset ID', required=True)
args = vars(parser.parse_args())

fulcrum = Fulcrum(key=args['token'])
form = fulcrum.forms.find(args['form'])
form_name = form['form']['name']
changeset = fulcrum.changesets.find(args['changeset'])
closed_by = changeset['changeset']['closed_by']
closed_at = changeset['changeset']['closed_at']
records = fulcrum.records.search(url_params={'form_id': args['form'], 'changeset_id': args['changeset'], 'per_page': '10000'})
total_count = str(records['total_count'])

confirm = input('Are you absolutely sure you want to permanently delete all ' + total_count + ' records in the ' + form_name + ' app, which were changed by ' + closed_by + ' on ' + closed_at + ' by changeset ' + args['changeset'] + '? (y)es or (n)o: ')

if confirm == 'yes' or confirm == 'y':
    index = 1
    for record in records['records']:
        print('Deleting record: ' + record['id'] + '(' + str(index) + '/' + total_count + ')')
        fulcrum.records.delete(record['id'])
        index += 1
    print('Permanently deleted ' + total_count + ' records in the ' + form_name + ' app from changeset ' + args['changeset'] + '!')
elif confirm == 'no' or confirm == 'n':
    print('No records have been deleted!')

Delete records from a Query API result (requires Query API access)

Example: python delete-records.py -t abcdefghijklmnopqrstuvwxyz -q "SELECT _record_id FROM \"My App\" WHERE _status = 'Good'"

import argparse
from fulcrum import Fulcrum
from fulcrum.exceptions import NotFoundException

parser = argparse.ArgumentParser(description='This program deletes Fulcrum records from a Query API result.')
parser.add_argument('-t','--token', help='API token', required=True)
parser.add_argument('-q','--query', help='Query', required=True)

args = vars(parser.parse_args())
fulcrum = Fulcrum(key=args['token'])
records = fulcrum.query(args['query'], 'json')
count = str(len(records['rows']))
confirm = input('Are you absolutely sure you want to permanently delete all ' + count + ' records? (y)es or (n)o: ')

if confirm == 'yes' or confirm == 'y':
    for row in records['rows']:
        # print(row['_record_id'])
        try:
            fulcrum.records.delete(row['_record_id'])
        except NotFoundException:
            print('No record found with id ' + row['_record_id'])
            continue
        print('record ' + row['_record_id'] + ' successfully deleted!')
elif confirm == 'no' or confirm == 'n':
    print('No records have been deleted!')

Delete repeatable child records from a Query API result (requires Query API access)

Child records are actually just form values and not true records - so this is more of an update than a delete, where we update the repeatable form value to be an empty array of repeatable objects. Example: python delete-records.py -t abcdefghijklmnopqrstuvwxyz -q "SELECT _record_id FROM \"My App/my_repeatable\";"

import requests, argparse
from fulcrum import Fulcrum
from fulcrum.exceptions import NotFoundException

parser = argparse.ArgumentParser(description='This program deletes Fulcrum repeatable form values from a Query API result.')
parser.add_argument('-t', '--token', help='API token', required=True)
parser.add_argument('-q','--query', help='Query', required=True)

args = vars(parser.parse_args())
fulcrum = Fulcrum(key=args['token'])
url = 'https://api.fulcrumapp.com/api/v2/query/?format=json&token='+args['token']+'&q='+args['query']
req = requests.get(url)
records = req.json()

def deleteRepeatables():

    for count, row in enumerate(records['rows']):
        # print(row['_record_id'])
        try:
            record = fulcrum.records.find(row['_record_id'])

            # Build an array of repeatable objects
            repeatables = []

            # Replace your repeatable key here, example ['c6c3']
            record['record']['form_values']['c6c3'] = repeatables

            # Update the record
            updated_record = fulcrum.records.update(row['_record_id'], record)
            print('Successfully updated ' + row['_record_id'])
        except NotFoundException:
            print('No record found with id ' + row['_record_id'])
            continue

    print('finished!')

confirm = input('Are you absolutely sure you want to permanently delete these child records from their parent records? (y)es or (n)o: ')

if confirm == 'yes' or confirm == 'y':
    deleteRepeatables()
elif confirm == 'no' or confirm == 'n':
    print('No records have been deleted!')

Updating a Record to add Repeatables

Below is an example of fetching an existing record, adding some "repeatable records" to the parent record, and updating the record.

from fulcrum import Fulcrum
from fulcrum.exceptions import NotFoundException
# Import uuid module for generating repeatable IDs
import uuid

api_key = 'your-api-key'
fulcrum = Fulcrum(key=api_key)

# Find the record you want to update
record = fulcrum.records.find('a190b99c-be00-4793-a4b0-669fa9773c0c')

# Build an array of repeatable objects
repeatables = [{
  'id': str(uuid.uuid4()),
  'geometry': {
    'type': 'Point',
    'coordinates': [-82.6388836, 27.7707606]
  },
  'form_values': {
    'e8e3': '360 Central Ave'
  }
}, {
  'id': str(uuid.uuid4()),
  'geometry': {
    'type': 'Point',
    'coordinates': [-82.637812, 27.772983]
  },
  'form_values': {
    'e8e3': 'Williams Park'
  }
}]

# Add array to repeatable field
record['record']['form_values']['4501'] = repeatables

# Update the record
updated_record = fulcrum.records.update('a190b99c-be00-4793-a4b0-669fa9773c0c', record)
print('record successfully updated!')

Download records and photos from a Query API result

This example allows you to query a subset of records from an app, download the resulting records in CSV format, and download all of the photos associated with the returned records.

import argparse, csv
from fulcrum import Fulcrum
from fulcrum.exceptions import NotFoundException

parser = argparse.ArgumentParser(description='This program downloads records and photos from a Query API result.')
parser.add_argument('-t','--token', help='API token', required=True)
parser.add_argument('-q','--query', help='Query', required=True)
parser.add_argument('-s','--size', help='Photo size (original, thumbnail, large', choices=['original', 'thumbnail', 'large'], default='original')
parser.add_argument('-n','--name', help='Photo name field')

args = vars(parser.parse_args())
fulcrum = Fulcrum(key=args['token'])
response = fulcrum.query(args['query'], 'csv')

def downloadCSV(response):
  open('records.csv', 'wb').write(response)
  print('Downloaded records.csv')

def downloadPhotos(response):
  csvtext = response.decode('utf8')
  reader = csv.DictReader(csvtext.split('\n'), delimiter=',')
  
  for row in reader:
    if row['_record_id']:
      try:
        photos = fulcrum.photos.search(url_params={'record_id': row['_record_id']})
        for item in photos['photos']:
          id = item['access_key']
          photo = fulcrum.photos.media(id, args['size'])

          if args['name'] and row[args['name']]:
            name = row[args['name']] + '-' + id
          else:
            name = id
          with open('{}.jpg'.format(name), 'wb') as f:
            f.write(photo)

          print('Downloaded {}.jpg'.format(name))
      except NotFoundException:
        print('No photos found for record ' + row[0])
        continue

downloadCSV(response)
downloadPhotos(response)

Query and update records

This example allows you to query a subset of records to update, loop through the results and fetch the record JSON, modify the record, and send updated record back to the server.

from fulcrum import Fulcrum
from datetime import datetime

# authenticate with Fulcrum API token
fulcrum = Fulcrum('your-token')

def fetchRecord(id):
  record = fulcrum.records.find(id)
  modifyRecord(record, id)

def modifyRecord(record, id):
  # Set the value for the element with a key of '0cb8'
  record['record']['form_values']['0cb8'] = '5.5'
  # Set the value for the element with a key of '15ac'
  record['record']['form_values']['15ac'] = 'Hello world!'
  updateRecord(record, id)

def updateRecord(record, id):
  update = fulcrum.records.update(id, record)
  print(update['record']['id'] + ' has been updated!')

# query records where _server_updated_at timestamp date is equal to today's date
today = str(datetime.date(datetime.now()))
sql = 'SELECT _record_id FROM "My App" WHERE FCM_FormatTimestamp(_server_updated_at, \'America/New_York\')::date = \''+today+'\''
response = fulcrum.query(sql, 'json')
for record in response['rows']:
  fetchRecord(record['_record_id'])