FetchStrategy

FetchStrategy allows your source Adapter to tell Valence how best to ask your Adapter for records. It is an implementation of the Strategy Pattern, and is used by Adapters that implement the SourceAdapterForPull interface.

This helper class is a critical part of how your Adapter and Valence work together when fetching data. In a nutshell:

  1. Valence asks your Adapter for a FetchStrategy to use to get the data
  2. Your Adapter tells Valence which one should be used
  3. Valence proceeds to set up a Link run and interact with your Adapter according to the FetchStrategy you asked for

There are a few different “strategies” available for your selection. You can even mix and match them depending on what’s going on, perhaps using IMMEDIATE if there are only a few records, or SCOPES if you have many thousand to retrieve.

Tip

Every strategy (except NO_RECORDS) has an alternate factory method that accepts an extra parameter value called “expectedTotalRecords”. If during planning you know exactly how many records you intend to fetch, always use this version of the method. This helps users understand how many records are going to be fetched during a Link run. If you don’t know how many records you’ll be fetching ahead of time, that’s perfectly fine, just use the simpler version of the method.

Strategy: Immediate

The IMMEDIATE strategy calls fetchRecords() immediately in the same execution context (any state in your Adapter is still there). A null value is passed as the scope parameter.

This is the simplest and easiest strategy to work with. You can get some decent mileage out of it before you have to move to SCOPES.

One nice thing is that Valence abstracts away some of the Salesforce limits, so for example you can return more than 10,000 records from fetchRecords() (which would normally hit the DML row limit) and that’s not a problem.

You do, however, still have to stay under the heap size limit of 12 MB. If you think you might hit this limit, consider using SCOPES and breaking your result set down across multiple execution contexts.

Definition

global static FetchStrategy immediate();

global static FetchStrategy immediate(Long expectedTotalRecords);

Example Usage

public valence.FetchStrategy planFetch(valence.LinkContext context) {
        return valence.FetchStrategy.immediate();
}

public List<valence.RecordInFlight> fetchRecords(valence.LinkContext context, Object scope) {

        // retrieve some records
        List<valence.RecordInFlight> records = goGetRecords();

        return records;
}

Strategy: Scopes

Warning

Reminder: unlike IMMEDIATE, when fetchRecords() is called on your Adapter it happens in an entirely new execution context. This means your Adapter state is totally wiped clean between calls! If you need something, put it into your scopes variable during planning.

The SCOPES strategy breaks up fetching records from the external system into multiple Salesforce execution contexts (one per scope). Each scope will have a fresh set of limits and state. In order to use this strategy, during your planFetch() call you need to figure out how many scopes are necessary. Compare context.batchSizeLimit to whatever hard limits your external system has, and use the lower of the two as your batch size.

During planning your goal is to build a list of scopes that give you whatever details you need to be able to retrieve each batch of records from the external system. Maybe that’s a unique identifier for the job combined with an offset value or page number. It varies but the scope shape is entirely up to you.

This is the most common FetchStrategy used in production Valence instances.

Definition

global static FetchStrategy scopes(List<Object> scopes);

global static FetchStrategy scopes(List<Object> scopes, Long expectedTotalRecords);

Example Usage

public valence.FetchStrategy planFetch(valence.LinkContext context) {

    String requestId = getRequest(); // some method to get whatever info you need about the request
    Integer total = countExternalRecords(); // and grab a record count, for example

    // determine how many records you can fetch at a time
    Integer batchSize = context.batchSizeLimit < EXTERNAL_LIMIT ? context.batchSizeLimit : EXTERNAL_LIMIT;

    // build our list of custom scopes
    List<MyScope> scopes = new List<MyScope>();
    Integer offset = 0;
    while(offset < total) {
        scopes.add(new MyScope(requestId, offset));
        offset += batchSize;
    }

    // tell Valence we're using the SCOPES strategy
    return valence.FetchStrategy.scopes(scopes, total);
}

public List<valence.RecordInFlight> fetchRecords(valence.LinkContext context, Object scope) {

    // cast to our custom scope class
    MyScope currentScope = (MyScope)scope;

    // retrieve some records from the external server with an offset, for example
    return fetchRecordsFromServer(currentScope.requestId, currentScope.offset);
}

public class MyScope {
    private String requestId;
    private Integer offset;

    public MyScope(String requestId, Integer offset) {
        this.requestId = requestId;
        this.offset = offset;
    }
}

Strategy: Delay

The DELAY strategy allows you to pause for an arbitrary amount of time when starting up a Pull Link run. This is a very situational strategy but can be helpful for circumstances such as waiting for a file to be generated on an external server. Let’s say you call into the server during planFetch() and describe the records you want. The external server generates a CSV file somewhere and then exposes it to you. You could use the DELAY strategy to wait until that CSV file is ready to be read, check if the file is ready, and if it is you’d read the generated file with your fetchRecords() call.

You can delay as many times as you need to. You can leave the duration of the delay up to Valence (15 seconds ~ 90 seconds), or you can specify a number of minutes to wait as a minimum (handy if you know for sure that it’ll be a few minutes before things are ready).

If you use the DELAY strategy your Adapter must also implement the DelayedPlanningAdapter interface. This interface adds an additional method that Valence will call after the delay is over. You are welcome to return DELAY again from this method call, and keep doing that over and over until you are ready to move on to fetching records.

Definition

global static FetchStrategy delay(Integer minutes, Object scope);

global static FetchStrategy delay(Integer minutes, Object scope, Long expectedTotalRecords);

Example Usage

private String filePath = null;

public valence.FetchStrategy planFetch(valence.LinkContext context) {
        // do some asynchronous operation that you know will take 5-10 minutes
        String fileId = generateFile();

        // ask Valence to call you back in 15 minutes
        return valence.FetchStrategy.delay(15, fileId);
}

public valence.FetchStrategy planFetchAgain(valence.LinkContext context, Object scope) {

        // get the state that we previously stashed in the scope object
        String previousFileId = (String)scope;

        // check to see if that file is ready yet
        Boolean ready = checkFileStatus(previousFileId);

        if(ready) {
                filePath = getFilePath(previousFileId);
                return valence.FetchStrategy.immediate(); // will call fetchRecords() in this same execution context with a null second parameter, which is why we set filePath
        } else {
                // wait two more minutes then try again
                return valence.FetchStrategy.delay(2, previousFileId);
        }
}

public List<valence.RecordInFlight> fetchRecords(valence.LinkContext context, Object scope) {

        // retrieve and parse the file
        List<valence.RecordInFlight> records = retrieveAndParse(filePath);

        return records;
}

Tip

If you don’t want to specify a certain number of minutes to wait and instead let Valence decide, pass null as the first parameter: return valence.FetchStrategy.delay(null, myScope);

Strategy: Cumulative Scopes

The CUMULATIVE_SCOPES strategy is like a mashup of SCOPES and DELAY. It allows you to give a partial list of scopes to Valence, then wait a little bit and then give Valence another partial list, repeating as needed until you are satisfied you have collected all the scopes you need.

Why is this helpful?

This is definitely an edge-case FetchStrategy, but for those edge cases it’s just the tool for the job. Example: imagine an API that can either return lists of record IDs, or a single full record. A normal SCOPES strategy expects you to know exactly how many scopes you are going to need up front, and for each one to be defined and returned to Valence in a single collection. What if this hypothetical API had millions of records? You can paginate through the ID list but that’s not going to give you full records.

So what do you do?

In this example you could use CUMULATIVE_SCOPES to essentially do a two-dimensional record fetch. Turn each record ID into a single scope that will be used during the real Link run to fetch a full record, then paginate through IDs one page at a time to build up your record scopes. Maybe you get 1000 IDs in a page, give Valence 1000 scopes, and then let Valence call you back to get the next page of IDs. Repeat as long as needed.

Any Adapter that returns CUMULATIVE_SCOPES as a FetchStrategy must also implement DelayedPlanningAdapter and SourceAdapterScopeSerializer. DelayedPlanningAdapter is used to do a planFetchAgain() when you are building up scopes, and SourceAdapterScopeSerializer gives you control over how your scope instances are serialized by Valence. Carefully read about each of these interfaces.

Note

If this Link has its setting for “enableParallelProcessing” set to “true”, scopes will start being processed as soon as you return your first collection of scopes and continue to be processed as you return each set until you are collecting scopes. If “false”, no scopes are processed until the final collection of scopes is given to Valence.

Note

When you are responding with what you know will be your last collection of scopes, return null for the ‘Object scope’ parameter so Valence knows you’re done. This is a little different to how the DELAY FetchStrategy works.

Tip

It’s fine to use the expectedTotalRecords version on one of your response (typically the first) and then the shorter version on your others. We’ll remember the number.

Definition

global static FetchStrategy cumulativeScopes(List<Object> scopes, Integer minutes, Object scope);

global static FetchStrategy cumulativeScopes(List<Object> scopes, Integer minutes, Object scope, Long expectedTotalRecords);

Example Usage

public valence.FetchStrategy planFetch(valence.LinkContext context) {

        Long total = countExternalRecords(); // grab a record count, for example

        // fetch a page of records from your external server and store it in some kind of response object you can work with
        ResponseObject response = fetchPageOfRecordIds(1);

        // build our list of custom scopes for fetching individual full records
        List<FetchFullRecordScope> scopes = new List<FetchFullRecordScope>();
        for(String recordId : response.identifiersOnThisPage) {
                scopes.add(new FetchFullRecordScope(recordId));
        }

        FetchPageOfIDsScope nextPageScope = null;
        if(response.hasMoreRecords == true) {
                nextPageScope = new FetchPageOfIDsScope(2);
        }

        // tell Valence we're using the CUMULATIVE_SCOPES strategy (but in this example we don't need an artificial delay, so passing null for 'minutes' parameter)
        return valence.FetchStrategy.cumulativeScopes(scopes, null, nextPageScope, total);
}

public valence.FetchStrategy planFetchAgain(valence.LinkContext context, Object scope) {

        // get the state that we previously stashed in the scope object
        FetchPageOfIDsScope currentPageScope = (FetchPageOfIDsScope)scope;

        // fetch a page of records from your external server and store it in some kind of response object you can work with
        ResponseObject response = fetchPageOfRecordIds(currentPageScope.pageNumber);

        // build another list of full record scopes using this fresh page of identifiers
        List<FetchFullRecordScope> scopes = new List<FetchFullRecordScope>();
        for(String recordId : response.identifiersOnThisPage) {
                scopes.add(new FetchFullRecordScope(recordId));
        }

        FetchPageOfIDsScope nextPageScope = null;
        if(response.hasMoreRecords == true) {
                nextPageScope = new FetchPageOfIDsScope(currentPageScope.pageNumber + 1);
        }

        return valence.FetchStrategy.cumulativeScopes(scopes, null, nextPageScope);
}

public List<valence.RecordInFlight> fetchRecords(valence.LinkContext context, Object scope) {

        // cast to our custom scope class
        FetchFullRecordScope currentScope = (FetchFullRecordScope)scope;

        // retrieve this record from the external server
        return fetchRecordFromServer(currentScope.recordIdentifier);
}

public class FetchPageOfIDsScope {

        private Integer pageNumber;

        public FetchPageOfIDsScope(Integer pageNumber) {
                this.pageNumber = pageNumber;
        }
}

public class FetchFullRecordScope {

        private String recordIdentifier;

        public FetchFullRecordScope(String recordIdentifier) {
                this.recordIdentifier = recordIdentifier;
        }
}

Strategy: Locator

It’s unlikely you will ever need to use the LOCATOR strategy. This is a specialized strategy that uses a Database.QueryLocator to iterate over large quantities (millions) of records inside the local Salesforce org. Normally you are going to leverage the built-in Local Salesforce Adapter that comes with Valence for extracting records from the local Salesforce org. However, this strategy is available to you should you need to do this kind of thing yourself.

If you use the LOCATOR strategy your Adapter must also implement the SourceAdapterForSObjectPush interface. Valence will retrieve records using the locator and feed them to your sObject push method to process them. Your Adapter’s fetchRecords() is never called.

Definition

global static FetchStrategy queryLocator(String query);

global static FetchStrategy queryLocator(String query, Long expectedTotalRecords);

Example Usage

public valence.FetchStrategy planFetch(valence.LinkContext context) {
        return valence.FetchStrategy.locator('SELECT Id, Name, Phone FROM Account');
}

public List<valence.RecordInFlight> buildRecords(valence.LinkContext context, List<sObject> records) {
        // process sObject records in batches of context.batchSizeLimit or 2,000, whichever is smaller
}

Strategy: No Records

Use the NO_RECORDS strategy if during planning you notice that you have no records that you need to fetch.

This will short-circuit the rest of processing; your fetchRecords() method will not be invoked, and the SyncEvent is immediately closed.

Definition

global static FetchStrategy noRecords();

Example Usage

public valence.FetchStrategy planFetch(valence.LinkContext context) {

        Integer count = countRecordsToFetch();

        return count > 0 ? valence.FetchStrategy.immediate() : valence.FetchStrategy.noRecords();
}

public List<valence.RecordInFlight> fetchRecords(valence.LinkContext context, Object scope) {

        // retrieve some records
        List<valence.RecordInFlight> records = goGetRecords();

        return records;
}

Test Coverage

There are a few instance methods in the FetchStrategy class to help you write test coverage against your Adapter’s planFetch() method.

Definition

global String checkStrategyType();

global Long checkExpectedTotalRecords();

global Integer checkScopeCount();

Example Usage

@IsTest
private static void testPlanFetchImmediate() {

        // set up a callout mock that simulates the external system telling us there are 17 records available to pick up
        Test.setMock(HttpCalloutMock.class, new MyAPIMockClass(MyAPIMockClass.Response.COUNT_DIRTY_RECORDS_17));

        valence.LinkContext context = new valence.LinkContext();
        context.linkSourceName = 'SomeTable';
        context.batchSizeLimit = 20;

        Test.startTest();
        MyAdapterClass adapter = new MyAdapterClass();
        valence.FetchStrategy strategy = adapter.planFetch(context);
        Test.stopTest();

        // we are expecting the IMMEDIATE strategy because all the outstanding records would fit in one batch
        System.assertEquals('IMMEDIATE', strategy.checkStrategyType());
        System.assertEquals(17, strategy.checkExpectedTotalRecords());
}

@IsTest
private static void testPlanFetchScopes() {

        // set up a callout mock that simulates the external system telling us there are 26 records available to pick up
        Test.setMock(HttpCalloutMock.class, new MyAPIMockClass(MyAPIMockClass.Response.COUNT_DIRTY_RECORDS_26));

        valence.LinkContext context = new valence.LinkContext();
        context.linkSourceName = 'SomeTable';
        context.batchSizeLimit = 20;

        Test.startTest();
        MyAdapterClass adapter = new MyAdapterClass();
        valence.FetchStrategy strategy = adapter.planFetch(context);
        Test.stopTest();

        // we are expecting the SCOPES strategy because our record count exceeds our batchSizeLimit so we'll need multiple batches
        System.assertEquals('SCOPES', strategy.checkStrategyType());
        System.assertEquals(26, strategy.checkExpectedTotalRecords());
        System.assertEquals(2, strategy.checkScopeCount());
}