Document Transfer Source Messages

John Tanner edited this page Apr 17, 2020 · 3 revisions

In the Document Transfer scenario, Vault Spark allows developers to use the Vault Java SDK to send and receive messages from one vault (the source) to another (the target), whilst transferring a document from one vault to another.

This page covers how to create and input Spark Messages into a Vault Queue for the source vault. The Vault Java SDK QueueService and Message interfaces are used to implement this functionality.

Receiving the messages in the target vault and processing the document is covered in the page Document Transfer Message Processing.

For full details on the interfaces and methods used, please review the Javadocs.

Code Logic

When the document action Approve Document is called for a new or existing document, the vSDKDocCopyDocumentAction action uses a Vault to Vault messaging queue. It iterates over the document versions passed to the action and creates a Message object with details from each document.

It also adds the Vault Session Id of the event to the Message, which the target vault can use to identify where the Message came from. The Message is then added to an outbound vault queue via the QueueService.

Key Concepts - Source Vault

  • Uses an object to record each document transaction to be transferred to the target vault, and checks it to see whether the document is already registered
  • Adds the Message to the outbound queue, vsdk_doc_copy_out_queue__c, with the appropriate document details

Parsing the Incoming Data

First, we need to prepare the Spark Messages based on the incoming document. In our example, we only handle the document action Approve Document for the vSDK Spark Copy Document document type.

As with the trigger example, we extract only the document's global ID (global_id__sys) to keep the message lightweight: each message item can be at most 100 characters long, and a message can hold at most 500 items. The IDs are therefore collected in a List in batches of up to 500, and each batch is passed to the moveMessagesToQueue method.

This method contains logic to process the IDs into the queue:

@DocumentActionInfo(label="SDK: Create Vault to Vault Document Copy")
public class vSDKDocCopyDocumentAction implements DocumentAction {

  public void execute(DocumentActionContext documentActionContext) {

    DocumentService documentService = ServiceLocator.locate(DocumentService.class);
    LogService logService = ServiceLocator.locate(LogService.class);

    // Global IDs collected for the current batch (at most 500 per message).
    List<String> vaultIds = VaultCollections.newList();
    String event = "Send Doc for Copy";
    String integrationName = "Doc Copies";
    String targetIntegrationPointAPIName = "receive_copy_document__c";
    String queueName = "vsdk_doc_copy_out_queue__c";
    String object = "documents";

    if (vSDKSparkHelper.isIntegrationActive(integrationName)) {
      logService.info("Integration '" + integrationName + "' is active.");

      for (DocumentVersion documentVersion : documentActionContext.getDocumentVersions()) {

        // Add the ID first so the document that fills a batch is not skipped.
        vaultIds.add(documentVersion.getValue("global_id__sys", ValueType.STRING));

        // A message holds at most 500 items: send the full batch and start a new one.
        if (vaultIds.size() == 500) {
          vSDKSparkHelper.moveMessagesToQueue(queueName,
                                              object,
                                              targetIntegrationPointAPIName,
                                              event,
                                              vaultIds);
          vaultIds.clear();
        }
      }

      // Send any remaining IDs that did not fill a complete batch.
      if (vaultIds.size() > 0) {
        vSDKSparkHelper.moveMessagesToQueue(queueName,
                                            object,
                                            targetIntegrationPointAPIName,
                                            event,
                                            vaultIds);
      }

    ...

}
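
The batching step can also be looked at in isolation. The following is a minimal plain-Java sketch, not part of the sample project: the class IdBatcher and its partition method are hypothetical names, and it uses java.util collections rather than VaultCollections so that it runs outside a vault. It assumes only the 500-item limit described above.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the batching step; no Vault SDK types are used.
// IdBatcher and partition are hypothetical names for illustration only.
class IdBatcher {

    // Splits ids into consecutive batches of at most batchSize entries.
    static List<List<String>> partition(List<String> ids, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String id : ids) {
            current.add(id);                  // add first, so no ID is skipped
            if (current.size() == batchSize) {
                batches.add(current);         // batch is full: hand it off
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            batches.add(current);             // flush the final partial batch
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < 1201; i++) {
            ids.add("id_" + i);
        }
        // 1201 IDs split into batches of 500, 500 and 201.
        for (List<String> batch : partition(ids, 500)) {
            System.out.println(batch.size());
        }
    }
}
```

Adding each ID before checking the batch size means the ID that completes a batch is sent with that batch rather than being lost when the list is cleared.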
Spark Messages and the Outbound Queue

Once in the moveMessagesToQueue method, the Spark Message is constructed and put into the queue.

The queue name passed into the method determines where Spark Messages are delivered, based on the Queue and Connection records configured in the Setup section. The outbound vsdk_doc_copy_out_queue__c queue is configured to use the vsdk_doc_copy_connection connection, which is connected to your target vault.

When a Message is added to vsdk_doc_copy_out_queue__c, it is forwarded to the connections defined on the queue; the message is therefore delivered to the target vault's inbound queue.

The code below shows how to obtain the QueueService and initialize a Message against the vsdk_doc_copy_out_queue__c outbound queue. The attributes set on the Message are used in the target vault MessageProcessor to determine how to handle the message.

    // Move to the Spark Queue AFTER the record has successfully been inserted.
    public static void moveMessagesToQueue(String queueName, 
                                           String objectName, 
                                           String targetIntegrationPointAPIName, 
                                           String recordEvent, 
                                           List<String> vaultIds) {

        QueueService queueService = ServiceLocator.locate(QueueService.class);
        LogService logService = ServiceLocator.locate(LogService.class);
        RecordService recordService = ServiceLocator.locate(RecordService.class);
        QueryService queryService = ServiceLocator.locate(QueryService.class);

        Message message = queueService.newMessage(queueName)
                .setAttribute("object", objectName)
                .setAttribute("event", recordEvent)
                .setAttribute("integration_point", targetIntegrationPointAPIName);
        PutMessageResponse response = queueService.putMessage(message);
        ...
    }
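
To make the message shape and the limits mentioned earlier concrete, here is a small plain-Java model. It is only a sketch: SketchMessage is a hypothetical stand-in and is not the SDK's Message interface, and its setItems method is an assumption made for illustration. The two limits (500 items, 100 characters per item) are the ones quoted on this page.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a queue message; this is NOT the SDK's Message interface.
class SketchMessage {
    static final int MAX_ITEMS = 500;        // item limit quoted on this page
    static final int MAX_ITEM_LENGTH = 100;  // per-item length limit quoted on this page

    final String queueName;
    final Map<String, String> attributes = new LinkedHashMap<>();
    final List<String> items = new ArrayList<>();

    SketchMessage(String queueName) {
        this.queueName = queueName;
    }

    // Attributes describe the message as a whole (object, event, integration point).
    SketchMessage setAttribute(String name, String value) {
        attributes.put(name, value);
        return this;
    }

    // Items carry the per-document payload, here just the document IDs.
    SketchMessage setItems(List<String> newItems) {
        if (newItems.size() > MAX_ITEMS) {
            throw new IllegalArgumentException("too many items: " + newItems.size());
        }
        for (String item : newItems) {
            if (item.length() > MAX_ITEM_LENGTH) {
                throw new IllegalArgumentException("item too long: " + item.length());
            }
        }
        items.clear();
        items.addAll(newItems);
        return this;
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        ids.add("doc_1");
        ids.add("doc_2");
        SketchMessage message = new SketchMessage("vsdk_doc_copy_out_queue__c")
                .setAttribute("object", "documents")
                .setAttribute("event", "Send Doc for Copy")
                .setAttribute("integration_point", "receive_copy_document__c")
                .setItems(ids);
        System.out.println(message.attributes);
        System.out.println(message.items);
    }
}
```

Keeping only IDs in the items and the routing details in the attributes is what lets a single message describe up to 500 documents while staying small.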
Post-processing of the Incoming Data

After the message is added to the outbound queue vsdk_doc_copy_out_queue__c, the sample code registers key facts about the changed document in an object called Integration Transaction, unless a pending record for it already exists. This object is used from the target vault to check for changed documents.

The Integration Transaction object houses key details about the objects for transfer, including:

  • Source Object
  • Source Key
  • Target Integration Point
  • Connection
  • Transaction Status
  • Name

A bulk save operation then persists the changed objects to the Integration Transaction object.

        ...

        // Check that the message queue successfully processed the message.
        // If it's successful, create an integrationTransaction record setting the `transaction_status__c` flag to 'pending__c',
        // as long as it doesn't already exist, as there is no point duplicating it.
        // Otherwise if there is an error, create an integrationTransaction record setting the `transaction_status__c` flag to 'send_failure__c'.
        List<Record> intTransactionsToSave = VaultCollections.newList();
        List<PutMessageResult> messageResultList = response.getPutMessageResults();
        for (PutMessageResult messageResult : messageResultList) {
            logService.info("Sent to Connection: " + messageResult.getConnectionName());

            String connectionId = vSDKSparkHelper.getConnectionId(messageResult.getConnectionName());
            logService.info("connectionId: " + connectionId);
            logService.info("integrationPointAPIName: " + targetIntegrationPointAPIName);

            // Query to see if any intTransactions already exist in pending state
            StringBuilder query = new StringBuilder();
            query.append("SELECT source_key__c ");
            query.append("FROM integration_transaction__c ");
            query.append("WHERE source_key__c contains ('" + String.join("','", vaultIds)  + "') ");
            query.append("AND source_object__c = '").append(objectName).append("' ");
            query.append("AND connection__c = '").append(connectionId).append("' ");
            query.append("AND target_integration_point__c = '").append(targetIntegrationPointAPIName).append("' ");
            query.append("AND transaction_status__c = 'pending__c'");

            QueryResponse queryResponse = queryService.query(query.toString());

            logService.info("Query existing pending integration transactions by ID: " + query);

            // Any pending integration transactions records that already exist will be removed from the list
            // so they don't get recreated
            queryResponse.streamResults().forEach(qr -> {
                String source_key__c = qr.getValue("source_key__c", ValueType.STRING);
                logService.info("Found existing pending record with Source Key: " + source_key__c);
                vaultIds.remove(source_key__c);
            });
            queryResponse = null;

            for (Object vaultId : vaultIds) {

                Record integrationTransaction = recordService.newRecord("integration_transaction__c");
                integrationTransaction.setValue("source_object__c", objectName);
                integrationTransaction.setValue("source_key__c", vaultId);
                integrationTransaction.setValue("target_integration_point__c", targetIntegrationPointAPIName);
                integrationTransaction.setValue("connection__c", connectionId);
                if (response.getError() != null) {
                    integrationTransaction.setValue("transaction_status__c", VaultCollections.asList("send_failure__c"));
                    StringBuilder err = new StringBuilder();
                    err.append("ERROR Queuing Failed: ").append(response.getError().getMessage());
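                    // Truncate the error text to at most 1500 characters before storing it.
                    String errText = err.toString();
                    if (errText.length() > 1500) {
                        errText = errText.substring(0, 1500);
                    }
                    integrationTransaction.setValue("message__c", errText);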
                } else {
                    integrationTransaction.setValue("transaction_status__c", VaultCollections.asList("pending__c"));
                }
                StringBuilder recordName = new StringBuilder();
                recordName.append(recordEvent).append(" ")
                        .append(vaultId)
                        .append(" to ")
                        .append(targetIntegrationPointAPIName);
                integrationTransaction.setValue("name__v", recordName.toString());
                intTransactionsToSave.add(integrationTransaction);
            }
            logService.info("Completed connection: " + messageResult.getConnectionName());
        }

        // Save the Integration Transactions for the connection
        if (intTransactionsToSave.size() > 0) {
            recordService.batchSaveRecords(intTransactionsToSave)
                    .onErrors(batchOperationErrors -> {

                        // Iterate over the caught errors.
                        // BatchOperation.onErrors() supplies a list of BatchOperationErrors.
                        // The list can be traversed to retrieve a single BatchOperationError and
                        // then extract an ErrorResult with BatchOperationError.getError().
                        batchOperationErrors.stream().findFirst().ifPresent(error -> {
                            String errMsg = error.getError().getMessage();
                            int errPosition = error.getInputPosition();
                            StringBuilder err = new StringBuilder();
                            String name = intTransactionsToSave.get(errPosition).getValue("source_key__c", ValueType.STRING);
                            err.append("Unable to create '")
                                    .append(intTransactionsToSave.get(errPosition).getObjectName())
                                    .append("' record: '")
                                    .append(name)
                                    .append("' because of '")
                                    .append(errMsg)
                                    .append("'.");
                            throw new RollbackException("OPERATION_NOT_ALLOWED", err.toString());
                        });
                    })
                    .execute();
        }
    }
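
The decision logic in this post-processing step (skip keys that already have a pending record, then choose a status based on whether queuing failed) can be sketched without the SDK. The class TransactionPlanner below is hypothetical and uses plain maps in place of Record objects; the field names and status values are taken from the sample above, and the 1500-character cap is assumed from the truncation in that sample.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java sketch of the post-processing decisions; maps stand in for Record objects.
// TransactionPlanner is a hypothetical name, not part of the sample project.
class TransactionPlanner {
    static final int MAX_MESSAGE_LENGTH = 1500; // assumed cap for message__c

    // Returns text unchanged if short enough, else its first maxLength characters.
    static String truncate(String text, int maxLength) {
        return text.length() <= maxLength ? text : text.substring(0, maxLength);
    }

    // Builds one planned record per sent key that is not already pending.
    // queueError is null when the message was queued successfully.
    static List<Map<String, String>> plan(List<String> sentKeys,
                                          Set<String> pendingKeys,
                                          String event,
                                          String integrationPoint,
                                          String queueError) {
        List<Map<String, String>> records = new ArrayList<>();
        for (String key : sentKeys) {
            if (pendingKeys.contains(key)) {
                continue; // a pending record exists, so do not duplicate it
            }
            Map<String, String> record = new LinkedHashMap<>();
            record.put("source_key__c", key);
            record.put("target_integration_point__c", integrationPoint);
            if (queueError != null) {
                record.put("transaction_status__c", "send_failure__c");
                record.put("message__c",
                        truncate("ERROR Queuing Failed: " + queueError, MAX_MESSAGE_LENGTH));
            } else {
                record.put("transaction_status__c", "pending__c");
            }
            record.put("name__v", event + " " + key + " to " + integrationPoint);
            records.add(record);
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> sent = Arrays.asList("A", "B", "C");
        Set<String> pending = new HashSet<>(Arrays.asList("B"));
        for (Map<String, String> record :
                plan(sent, pending, "Send Doc for Copy", "receive_copy_document__c", null)) {
            System.out.println(record);
        }
    }
}
```

In this sketch the truncation helper checks the length first, so short error messages are stored whole and long ones are cut at the cap.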

Next Steps