Integrate with an external search engine

Pushing commercetools Product data to a third-party search service.

After completing this page, you should be able to:

  • Identify key considerations when integrating Product data with an external system.
  • Differentiate between full synchronization and delta updates, and evaluate strategies for each.
  • Design an event-driven delta update mechanism using commercetools Subscriptions.

Zen Electron wants to integrate their commercetools Project with a specialized third-party search engine to provide customers with faster, more personalized Product discovery.

Before designing the integration, it's crucial to understand the business requirements. Asking the right questions upfront will help you build a robust and future-proof solution.

  • Stores: Does the search experience need to be store-specific? Should search results differ based on the customer's Store?
  • Product Selections: Should the integration index Product Selections directly? This is useful for B2B scenarios with complex customer-specific assortments that don't map cleanly to Stores.
  • Product Tailoring: Do Product details (names, descriptions, images) change per Store? The search engine must reflect this tailored data.
  • Localization: Should search results be available in multiple languages based on the customer's locale?
  • Pricing: How are prices managed? The integration must handle various pricing models:
    • Embedded prices on Product Variants.
    • Standalone Prices.
    • Prices scoped by channel, customer group, or country.
    • Tiered (volume-based) prices.
    • Prices discounted through Product Discounts.
  • Inventory: Should the search engine display or filter by stock availability?
  • Other Data: Does the search engine need to index related data, like aggregated product reviews?

The answers to these questions will determine the complexity of the integration and which Project resources you need to query and combine.

Any robust data synchronization strategy involves two phases: an initial full sync and ongoing delta updates.
  • Full sync: A one-time export of all relevant data from Composable Commerce to the target system. This is necessary for the initial data load, for correcting data drift over time, or for pushing major changes.
  • Delta updates: Syncing only the data that has changed since the last update. This is far more efficient for keeping the systems in sync on an ongoing basis.

Full synchronization: The initial data load

For the initial sync, you'll need to query, enrich, and transform your Product data before sending it to the search engine. Always use the /product-projections endpoint for this purpose, as it provides the denormalized "published" or "staged" view of your Products.

Best practices for data export

  • Use cursor-based pagination: For large datasets, always use cursor-based pagination (for example, sorting by id or lastModifiedAt) instead of offset. It is faster, more stable, and not subject to the 10,000-item limit.
  • Design efficient queries: Order your where predicates from most to least selective to reduce the data scanned.
  • Use the right endpoint: Always use /product-projections for export use cases, not /products or /product-projections/search.
  • Optimize the payload: Use withTotal=false to skip unnecessary metadata and reduce response size. Adjust the limit to find a sweet spot that keeps response sizes manageable (for example, under 2MB).
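
For example, a single export request that applies all of these practices at once (the limit value and cursor placeholder are illustrative):

GET /{{project-key}}/product-projections?where=id > "{lastId}"&staged=false&sort=id asc&limit=100&withTotal=false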

Step-by-step: Initial full sync

1. Identify Products per Store via Product Selections:

  • Query all your Stores.
  • For each Store, retrieve its associated active Product Selections to determine the exact active product assortment.
GET /{{project-key}}/stores?sort=id asc&limit=50&withTotal=false&where=id > "00000000-0000-0000-0000-000000000000"
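
Each Store in the response carries its Product Selections directly. Here is a minimal sketch of extracting the active ones with the TypeScript SDK, assuming the same apiRoot client (./ctpClient.js) used in the polling example later on this page:

import { apiRoot } from "./ctpClient.js";

// Fetch one page of Stores and map each Store key to its active Product Selection IDs.
const { body: storePage } = await apiRoot
  .stores()
  .get({ queryArgs: { sort: "id asc", limit: 50, withTotal: false } })
  .execute();

const storeToSelectionIds = new Map(
  storePage.results.map((store) => [
    store.key,
    (store.productSelections ?? [])
      .filter((setting) => setting.active) // keep only active Selections
      .map((setting) => setting.productSelection.id),
  ])
);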

2. Gather unique Product IDs:

  • Create a deduplicated list of all Product Selection IDs while still mapping them to the Stores.
  • For each Product Selection, paginate through its assigned Products to get a list of all relevant Product IDs. To do this, you will need to query the Product Selection by ID and use a predicate cursor to get results greater than the Product ID in the last response.

GET /{{project-key}}/product-selections/00000000-0000-0000-0000-000000000000/products?limit=1&where=product(id > "00000000-0000-0000-0000-000000000000")

3. Fetch base Product data:

  • Using the list of Product IDs, fetch the corresponding published Product Projections in batches. Use a where clause like id in ("id-1", "id-2", ...) and sort by id for efficient pagination.
  • Deduplicate the list of Product IDs so that you don't fetch the same Product multiple times (a small sketch follows the example request).
GET /{{project-key}}/product-projections?where=id in ("id-1", "id-2")&staged=false&sort=id asc&withTotal=false
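
A small sketch of the deduplication and batching, assuming allProductIds holds the IDs collected in step 2:

// Collapse the Product IDs collected across all Product Selections into a unique list.
const uniqueProductIds = [...new Set(allProductIds)];

// Batch the IDs (for example, 50 per request) to keep URLs and response sizes manageable.
const batches: string[][] = [];
for (let i = 0; i < uniqueProductIds.length; i += 50) {
  batches.push(uniqueProductIds.slice(i, i + 50));
}
const wherePredicates = batches.map(
  (batch) => `id in (${batch.map((id) => `"${id}"`).join(", ")})`
);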
If you use Product Tailoring, you also need to fetch the tailored Product data for each Store. There are two ways to approach this.

Inefficient approach: Querying by a list of Product IDs
A common first thought is to query the /product-tailoring endpoint with the list of Product IDs you want to check.

Example Request:

GET /{{project-key}}/product-tailoring?where=product(id in ("f77df92a-d424-4199-9b7c-e2b93a693ba2", "155bbbcb-1f93-4e66-a708-632d7ce8d5c7"))

Drawback: This method is only efficient if most of the Products in your list are actually tailored for the target Store. If your Project has a large catalog where only a small percentage of Products are tailored (for example, 10%), this query will result in many lookups that return no data, leading to wasted API calls and slower performance.

Recommended approach: Paginate through all tailorings for a Store
A more robust and efficient method is to fetch all published ProductTailoring objects for a specific Store using cursor-based pagination. This approach guarantees that every API request retrieves existing data, making it highly performant regardless of the catalog size or the proportion of tailored Products.

The key is to filter by store (optional) and published, then use a sort parameter to paginate through the results.

Here are two recommended pagination strategies.

Strategy 1: Paginate by Tailoring ID
This is the simplest method for paginating through all published tailorings for a Store or Project. You sort by the ID of the ProductTailoring object itself.

Initial Query (First Page): The predicate filters for published Product tailorings in a specific Store and sorts by ID.

GET /{{project-key}}/product-tailoring?where=store(key="your-store-key") and published = true&sort=id asc&limit=30

Or, because the Store predicate is optional, across the whole Project:

GET /{{project-key}}/product-tailoring?where=published = true&sort=id asc&limit=30

Subsequent Queries (Next Pages): Use the ID of the last item from the previous response to fetch the next page (Store key is optional).

GET /{{project-key}}/product-tailoring?where=store(key="your-store-key") and published = true and id > "{lastIdFromLastResponse}"&sort=id asc&limit=30
lastIdFromLastResponse is the tailoring UUID of the last item in the previous response. Keep paginating as long as the number of results equals the request limit.

Strategy 2: Paginate by Product ID, then Tailoring ID
This strategy is useful if your data processing logic benefits from receiving all tailorings for a single Product sequentially. It sorts first by productId and then by the tailoring ID as a secondary criterion.
Benefit: This ensures that if a Product has multiple tailorings, they are grouped together in the API response. This ordering helps you process all tailorings for one Product consecutively.

Initial Query (First Page):

GET /{{project-key}}/product-tailoring?where=published=true&sort=productId asc&sort=id asc&limit=30

Subsequent Queries (Next Pages): The where predicate for this strategy is slightly more complex. It first filters for published tailorings, then either fetches the remaining tailorings of the last Product from the previous page (tailoring id greater than the last id) or moves on to the next Products (productId greater than the last productId).

GET /{{project-key}}/product-tailoring?where=published=true and ((productId = "a42d12f2-..." and id > "e9ae13b3-...") or (productId > "a42d12f2-..."))&sort=productId asc&sort=id asc&limit=30

4. Enrich with additional data (for example, Categories)

  • The Product Projection contains Category IDs, but not their names. You will need to fetch Category data separately and enrich your Product data with the Category names and other relevant attributes. In another module we showed you how to efficiently query Categories.
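
For example, a batched Category lookup (the IDs are placeholders):

GET /{{project-key}}/categories?where=id in ("category-id-1", "category-id-2")&sort=id asc&withTotal=false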

5. Transform and load

  • With all the necessary data gathered and enriched, transform it into the format required by the third-party search engine and load it.
  • Here you may need to consider questions like:
    • Will a Product used in multiple Stores be indexed as a single entry or as one entry per Store?
    • Which fields are absolutely necessary for the third-party system, and which fields can be dropped?
    • The third-party search engine will not have native support for features like Price Hints. How will you support this? One option is to implement that logic in your own BFF or front end, so that your application knows the correct price to display to a given customer.
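
To make the transform step concrete, here is a minimal sketch of mapping a Product Projection to a search document. The document shape (objectID, name, and so on) is hypothetical and depends entirely on your search engine's schema:

import type { ProductProjection } from "@commercetools/platform-sdk";

// Hypothetical mapping: one index entry per Store and locale.
const toSearchDocument = (
  product: ProductProjection,
  storeKey: string,
  locale: string
) => ({
  objectID: `${storeKey}_${product.id}`, // one entry per Store, per the first question above
  name: product.name[locale],
  description: product.description?.[locale],
  categoryIds: product.categories.map((ref) => ref.id), // replace with enriched names from step 4
  sku: product.masterVariant.sku,
  imageUrl: product.masterVariant.images?.[0]?.url,
});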

Delta updates: Partial synchronization for changes since last sync

Once the initial full sync is complete, you need an efficient way to keep the external search engine updated with any changes from your commercetools Project. This is where delta updates come in. Instead of re-syncing everything, you only push the data that has changed since the last update. This will be much faster than a full sync.

There are two primary strategies for implementing delta updates:

  1. Scheduled polling: A service periodically queries the Project for recent changes.
  2. Event-driven updates: Your integration listens for real-time change notifications using Subscriptions.

Comparison between scheduled polling and Subscriptions

Feature         | Scheduled polling       | Event-driven (Subscriptions)
Data freshness  | High latency (minutes)  | Low latency (seconds)
Scalability     | Medium                  | High
Resilience      | Good                    | Excellent (with a Dead-Letter Queue)

While the initial setup for Subscriptions is more involved, the long-term benefits in performance, scalability, and customer experience are significant.

Approach 1: Scheduled polling (the cron job method)

This is the traditional approach. You set up a scheduled task that runs at a fixed interval—for example, every fifteen minutes.

How it works

  1. Schedule: Your service "wakes up" on a predefined schedule (for example, every 15 minutes).
  2. State Management:
    1. Stateless (Fixed Interval): The service has no memory of past runs. It queries for changes within a fixed, recent time window (for example, a job running every 15 minutes asks for changes from "the last 15 minutes"). Note: a single failed or delayed job can create a permanent gap, causing updates to be missed.
    2. Stateful (Persistent Timestamp): The service reads a timestamp from a persistent store that marks the end of the last successful sync (see the sketch after this list). After processing the new changes, it updates the stored timestamp to the current time. This approach guarantees that no data is missed: the service automatically catches up on all changes since the last successful run, even if it was down for an extended period.
  3. Query for Changes: It queries the relevant endpoints using a where predicate to filter for resources modified since the last sync's timestamp. The lastModifiedAt field is perfect for this.
  4. Process and Update: It processes the returned changes, transforms the data as needed (applying tailoring, fetching prices, listing Products to be disabled, and so on), and pushes the updates to the external search engine.
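
A minimal sketch of the stateful variant, persisting the timestamp to a local file (the file name and shape are assumptions; a production system would more likely use a database or similar store):

import { readFile, writeFile } from "node:fs/promises";

const STATE_FILE = "./last-sync.json"; // hypothetical state location

// Read the end timestamp of the last successful sync, falling back to the epoch on first run.
const readLastSyncTimestamp = async (): Promise<string> => {
  try {
    return JSON.parse(await readFile(STATE_FILE, "utf-8")).lastSyncTimestamp;
  } catch {
    return new Date(0).toISOString();
  }
};

// Persist the new timestamp only after all changes have been processed successfully.
const writeLastSyncTimestamp = async (lastSyncTimestamp: string): Promise<void> => {
  await writeFile(STATE_FILE, JSON.stringify({ lastSyncTimestamp }));
};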

Example Query:

To fetch all Product Projections that have been modified or enabled since the last run:

GET /{{project-key}}/product-projections?where=lastModifiedAt > "2023-10-27T10:00:00.000Z"&staged=false&sort=lastModifiedAt asc&withTotal=false
  • Each successive query updates the timestamp to the lastModifiedAt of the last document in the prior response. Processing from the oldest lastModifiedAt to the newest ensures that every record is processed at least once and none are missed. A record may still be processed multiple times if it receives an update during the sync; your application must decide whether to process such a record once or allow repeated processing.
  • Using sort=lastModifiedAt asc combined with cursor pagination ensures you process changes in chronological order and can handle a large volume of updates within a single polling interval.
  • For Product Projections you will need to query staged and current separately, so that you also catch Products that were recently disabled or enabled and/or have other changes. An example staged query follows this list.
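
For example, the same delta query against the staged projections:

GET /{{project-key}}/product-projections?where=lastModifiedAt > "2023-10-27T10:00:00.000Z"&staged=true&sort=lastModifiedAt asc&withTotal=false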

You will need to run a query like this for each entity whose changes you want to sync. For example, you may want to check:

  • Whether the Product Selections for a Store have changed.
  • Whether the Products or Product Variants on a Selection have changed.
  • Whether a Product in a Selection is staged or current and/or has changed values (attributes).
  • Whether a Product Tailoring has been published or unpublished and/or has changed values.

Here is some example code showing how lastModifiedAt should be used to query the endpoint:
import { apiRoot } from './ctpClient.js';

// The number of results to fetch per API call (page size)
const PAGE_SIZE = 20;

/**
 * Fetches a single page of product projection changes.
 * @param {string} lastTimestamp - The lastModifiedAt from the previous page.
 * @param {string | null} lastId - The ID from the last item of the previous page.
 * @returns {Promise<object>} The API response body.
 */
const fetchNextPageOfChanges = async (lastTimestamp, lastId) => {
  let whereClause;

  // Use an if/else block to construct the WHERE clause.
  if (lastId) {
    // This is for page 2 and onwards. We have a previous product's ID.
    // We query for anything modified AFTER the last timestamp,
    // OR anything modified at the EXACT same timestamp but with a greater ID.
    whereClause = `(lastModifiedAt = "${lastTimestamp}" and id > "${lastId}") or (lastModifiedAt > "${lastTimestamp}")`;
  } else {
    // This is for the very first page of the sync, where lastId is null.
    // The query is simpler.
    whereClause = `lastModifiedAt > "${lastTimestamp}"`;
  }

  const response = await apiRoot
    .productProjections()
    .get({
      queryArgs: {
        where: whereClause,
        staged: false, // Only query for current (published) projections
        sort: ['lastModifiedAt asc', 'id asc'], // Sort by timestamp, then ID
        limit: PAGE_SIZE,
        withTotal: false,
      },
    })
    .execute();

  return response.body;
};

/**
 * Main function to run the delta sync process.
 */
const runDeltaSync = async () => {
  try {
    // 1. Define the starting point for the sync
    const oneHourAgo = new Date(
      new Date().getTime() - 60 * 60 * 1000
    ).toISOString();
    console.log(`--- Starting sync for changes since ${oneHourAgo} ---`);

    // 2. Initialize state variables for pagination
    let lastSyncTimestamp = oneHourAgo;
    let lastId = null;
    let hasMoreResults = true;
    let totalProductsProcessed = 0;
    let page = 1;

    // 3. Loop to fetch all pages of results
    while (hasMoreResults) {
      console.log(`\nFetching page ${page}...`);

      const pageData = await fetchNextPageOfChanges(lastSyncTimestamp, lastId);
      const results = pageData.results;

      if (results.length > 0) {
        console.log(`Found ${results.length} products on this page.`);
        totalProductsProcessed += results.length;

        // Process each product
        results.forEach(product => {
          console.log(
            `  -> Processing ID: ${product.id}, Modified: ${product.lastModifiedAt}`
          );
        });

        // 4. Update the "cursor" with the details of the LAST item in the page
        const lastProduct = results[results.length - 1];
        lastSyncTimestamp = lastProduct.lastModifiedAt;
        lastId = lastProduct.id;

        // If we received fewer results than we asked for, we're on the last page.
        if (results.length < PAGE_SIZE) {
          hasMoreResults = false;
        }
      } else {
        // No more results found, exit the loop.
        hasMoreResults = false;
      }
      page++;
    }

    console.log('\n--- Sync Complete ---');
    console.log(`Total products processed: ${totalProductsProcessed}`);
    console.log(
      `For the next run, start with timestamp: "${lastSyncTimestamp}" and ID: "${lastId}"`
    );
  } catch (error) {
    console.error('An error occurred during the sync process:', error);
  }
};

// Run the sync
runDeltaSync();


Approach 2: Event-driven updates with Subscriptions

Instead of periodically asking the platform "what's new?" (polling), a more efficient and modern approach is to have the platform tell you the moment something changes. This is the event-driven model, and in Composable Commerce, it's powered by Subscriptions.

For a critical, customer-facing feature like search, the event-driven approach is strongly recommended. It ensures that your application can always get the latest product information, pricing, and availability in near real-time.

How it works

The process is straightforward:

  1. Subscribe: You create a Subscription in your commercetools Project, specifying which events you care about (for example, ProductPublished). You configure this Subscription to send a notification message to a destination, like a message queue (for example, Google Pub/Sub, AWS SQS).
  2. Trigger: A merchant publishes a new Product in the Merchant Center.
  3. Notify: Composable Commerce instantly sends a message to your configured queue. This message acts as a signal, containing a reference to the resource that changed.
  4. Process: A service you control is triggered by the new message in the queue.
  5. Update: Your service uses the information in the message to make a targeted API call back to commercetools, fetching the full, up-to-date Product data. It then transforms this data into the required format and pushes it to your search engine index.

Choosing the right messages for your search index

When integrating a search index with your Product catalog, the goal is to be both accurate and efficient. You want the search index to reflect precisely what the customer sees, and you want to avoid unnecessary processing. The key to achieving this is subscribing to the right messages.
Your search index should only be updated when there's a change that affects the customer's experience. This means you are exclusively interested in published Product data. Any changes made behind the scenes (in what's called a staged state) are irrelevant until they go live.

For a lean and efficient search integration, you only need to subscribe to two core messages:

  1. ProductPublished
  2. ProductUnpublished

Handling new and updated Products with ProductPublished

You might think you need separate messages for creating and updating Products, but ProductPublished elegantly handles both scenarios.
  • How it Works: In commercetools, any change to a Product — whether it's a brand new item or just a simple price adjustment, a new image, or an updated description — first creates a staged version. These staged changes are not visible to customers until you explicitly publish them. The act of publishing triggers the ProductPublished message.
  • Why it's Efficient: This single message is your perfect trigger. It tells you, "Something is now live for the customer." It doesn't matter if the Product was brand new or just updated; this is the moment you need to add or update the entry in your search index.

Handling Product removals with ProductUnpublished

When a Product needs to be removed from your Store, it must first be unpublished. This action is the ideal signal for removing it from the search index.

  • How it Works: A Product cannot be deleted while it is in a published state. The first step is always to unpublish it, which fires the ProductUnpublished message.
  • Why it's Sufficient: Because unpublishing is a mandatory prerequisite for deletion, this message serves as the perfect, timely trigger to remove the item from your search index. Listening for a separate ProductDeleted message would be redundant for this purpose.
What to Avoid: You might be tempted to use a ChangeSubscription. However, this would flood your queue with messages for every minor modification, including changes to staged data that isn't live yet. For a search index, this creates unnecessary noise and processing load.
You can find a list of all available Messages in the Composable Commerce documentation.

Preparing the message queue

Before we can create the Subscription in Composable Commerce, we need a destination for the messages to go. For this solution, we will use Google Cloud Pub/Sub.

  1. Create a Google Cloud Project (skip this step if you already have one) A Google Cloud project organizes all your Google Cloud resources, including Pub/Sub topics and subscriptions that will receive notifications from commercetools.
    1. Go to Google Cloud Console.
    2. In the top left, click the project selector > New Project.
    3. Give it a name (for example, ct-search-sync) and click Create.
  2. Enable the Pub/Sub API for your project This activates the Pub/Sub service within your Google Cloud project, allowing you to create and manage topics and subscriptions. The Pub/Sub API needs to be enabled to allow commercetools to send messages to your Google Cloud project.
  3. Create a Pub/Sub Topic A topic is where commercetools will send Product change events—think of it as the inbox for your Product messages.
    1. Go to Pub/Sub > Topics
    2. Click Create Topic.
    3. Set Topic ID: ct-product-sync.
    4. Click Create.
  4. Create a Pub/Sub Subscription A subscription listens to a topic and queues messages for your application to consume.
    1. Go to Pub/Sub > Subscriptions.
    2. Click Create Subscription.
    3. Set the following:
      1. Subscription ID: ct-product-consumer.
      2. Topic: ct-product-sync.
      3. Delivery type: Pull.
    4. Leave other settings as default and click Create.
  5. Create a Service Account A service account is a special Google Cloud account used by applications (like your listener application) to authenticate with Google Cloud and access resources such as our Pub/Sub subscription in order to receive messages.
    1. Go to IAM & Admin > Service Accounts.
    2. Click Create Service Account.
      1. Name: ct-pubsub-client
    3. Click Continue.
    4. Add roles:
      1. Pub/Sub Subscriber.
      2. Pub/Sub Viewer (optional).
    5. Click Done.
  6. Download a JSON Key This JSON file contains the credentials for your service account. Your local or server-side application will use this key to authenticate with Google Cloud and access the Pub/Sub subscription to receive messages.
    1. In the list of service accounts, find ct-pubsub-client.
    2. Click the three-dot menu > Manage keys.
    3. Click Add Key > Create new key.
    4. Select JSON and click Create.
    5. Save the .json key locally (for example, ctapisubscription.json) - we will need it later for our listener application.


  7. Assign Publisher Role to the commercetools Service Account This step grants commercetools permission to publish messages to your Pub/Sub topic. Without this, commercetools won't be able to send notifications.
    1. Go to Pub/Sub > Topics.
    2. Click on your topic: ct-product-sync.
    3. Click Permissions.
    4. Click Add principal.
    5. Enter the service account email subscriptions@commercetools-platform.iam.gserviceaccount.com.
    6. Role: Pub/Sub Publisher.
    7. Click Save.
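
If you prefer the command line, steps 3, 4, and 7 can also be scripted with the gcloud CLI (using the same names as above):

gcloud pubsub topics create ct-product-sync
gcloud pubsub subscriptions create ct-product-consumer --topic=ct-product-sync
gcloud pubsub topics add-iam-policy-binding ct-product-sync \
  --member="serviceAccount:subscriptions@commercetools-platform.iam.gserviceaccount.com" \
  --role="roles/pubsub.publisher"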

Configure Composable Commerce Project

Now that the Google Cloud setup is complete, we can move on to configuring the Composable Commerce Project before writing the listener code that will poll our Pub/Sub queue and react to incoming messages.

All we need to do on the Composable Commerce side is create a Subscription that sends ProductPublished and ProductUnpublished messages to our Pub/Sub topic. Since this is a one-time setup, we can do it via the API—for example, using the HTTP API Playground in the Merchant Center:
In your Composable Commerce Project, go to Settings > Developer Settings > HTTP API Playground and select the following:
  1. Request method: POST
  2. Endpoint: Subscriptions
  3. Command: Create Subscription
  4. Add the following body and then click Send:
{
  "destination": {
    "type": "GoogleCloudPubSub",
    "topic": "ct-product-sync", //the value we provided in step 3
    "projectId": "ct-search-sync" //the value we provided in step 1
  },
  "messages": [
    {
      "resourceTypeId": "product",
      "types": [
// the two Message Types we want to subscribe to
        "ProductPublished",
        "ProductUnpublished"
      ]
    }
  ],
  "key": "product-search-sync" //the key to identify our Subscription by
}

Alternatively, if you’re managing your infrastructure as code, you can also configure this Subscription using Terraform with the commercetools provider. This makes the setup more repeatable and easier to maintain in the long run.

If the Subscription was created successfully, it means both our Pub/Sub topic and Composable Commerce Project are configured correctly.

Remember: Composable Commerce always sends a test message when creating a Subscription. If that message can’t be delivered, the Subscription won’t be created.

Prepare the listener application

Now, let's focus on developing our listener application. The code below can be re-used depending on your needs: you could have this running in the cloud, or on a Virtual Machine, or even locally.

To begin, ensure that all necessary dependencies for the Google Pub/Sub integration are installed. For npm, execute the following command in your CLI / terminal:

npm install @google-cloud/pubsub

If instead you are using yarn, execute the following command in your CLI / terminal:

yarn add @google-cloud/pubsub

With that done, we can write our code:

import { PubSub, Message } from "@google-cloud/pubsub";
import type { ProductProjection } from "@commercetools/platform-sdk";
import { apiRoot } from "./ctpClient.js";

// A minimal class wrapper (assumed here) so that the `this.` references below resolve.
class ProductSearchSyncService {

  async updateSearchIndex(product: ProductProjection) {
    console.log(`Updated/added product in search engine: ${product.id}`);
    // TODO: Format and send to search index
  }

  async removeFromSearchIndex(productId: string) {
    console.log(`Removed product from search engine: ${productId}`);
    // TODO: Remove by objectID from search index
  }

  async fetchMessages(): Promise<void> {
    const subscriptionName = "ct-product-consumer"; //the Subscription ID value we provided in step 4

    // Initialize the Pub/Sub client with explicit credentials
    const pubSubClient = new PubSub({
      keyFilename: "./ctapisubscription.json", // Path to the location of your service account JSON key
      projectId: "ct-search-sync", // the GCP project ID that we set in step 1
    });

    const subscription = pubSubClient.subscription(subscriptionName);

    // Start listening to incoming messages from the subscription
    subscription.on("message", async (message) => {
      try {
        const ctMessage = JSON.parse(message.data.toString());
        const { type, resource } = ctMessage;

        switch (type) {
          case "ProductPublished": {
            // Fetch the published product from a specific store
            const product = await apiRoot
              .inStoreKeyWithStoreKeyValue({ storeKey: "b2c-retail-store" })
              .productProjections()
              .withId({ ID: resource.id })
              .get()
              .execute();

            // Sync the product to the third-party search engine
            await this.updateSearchIndex(product.body);
            break;
          }

          case "ProductUnpublished": {
            // Remove the product from the search index
            await this.removeFromSearchIndex(resource.id);
            break;
          }

          default:
            console.log(`Unhandled message type: ${type}`);
        }

        // Acknowledge successful handling of the message
        message.ack();
      } catch (err) {
        console.error(" Error handling message:", err);

        // Optionally requeue the message for later processing
        message.nack();
      }
    });
    // Error handler
    subscription.on("error", (err) => {
      console.error("Subscription error:", err);
    });
  }
}

// Start the listener.
new ProductSearchSyncService().fetchMessages();

And that’s it! Once you run your listener app, head over to your Project and publish or unpublish a few Products. You should see your application receive and handle the corresponding messages from the Pub/Sub queue.

The hybrid approach

In most production environments, the most effective strategy is a hybrid approach: run a full sync periodically to prevent data drift, and use event-driven delta syncs to capture real-time changes as they happen. This ensures both consistency and responsiveness, giving you the best of both worlds when keeping your search index in sync with your Composable Commerce Project.
