Build a Real-Time Wide Table with TapFlow

TapFlow is a programming framework that supports real-time data replication, data processing, and materialized view creation. It offers APIs, a Python SDK, and command-line tools (Tap Shell) to efficiently build and manage data flow tasks. This guide demonstrates using Tap Shell and the Python SDK to build a real-time wide table to support efficient queries in an e-commerce application by joining multiple tables of order information.

Background

As its business and data volume grow, the e-commerce company XYZ faces challenges in order and inventory management. Order data and inventory information are spread across multiple database tables, and operations personnel often need to run complex cross-table queries to retrieve order details. The primary challenges are:

  • High Query Latency: Order detail queries rely on complex cross-table joins, which impacts query performance, especially during peak times.
  • Data Inconsistency: Ensuring data consistency across multiple tables in high-concurrency scenarios is challenging, increasing the risk of inconsistencies.
  • Lack of Real-Time Updates: Changes in order status or inventory are not reflected promptly, making it difficult for users to access up-to-date information.

Typical E-commerce Table Structure

To address these challenges, the company uses TapFlow to build a real-time wide table, consolidating order, customer, payment, and product data in MongoDB to support high-concurrency mobile API queries. Here’s an overview of the process:

  1. Data Integration: TapFlow uses CDC to monitor real-time changes in source tables, capturing updates to order, customer, and payment data and transferring them to MongoDB.
  2. Wide Table Generation: TapFlow's lookup feature combines data from multiple tables into a single wide table, embedding customer, product, and payment information into the order record for simplified queries.
  3. Real-Time Updates: When source data changes, TapFlow synchronizes incremental updates to the MongoDB wide table, ensuring the query content is always up-to-date.
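
The wide table generation step can be pictured as a LEFT JOIN whose result is nested rather than flattened. The following is a minimal pure-Python sketch of that idea, not TapFlow's implementation: the `embed` helper and the sample rows are hypothetical, and only the `path`, `type` (called `embed_type` here), and `relation` names mirror the lookup parameters used in Step 1.

```python
from copy import deepcopy


def embed(parents, children, path, embed_type, relation):
    """Return copies of `parents` with matching `children` embedded under `path`.

    relation is a list of [parent_field, child_field] pairs.
    embed_type="object" keeps the first matching row; "array" keeps all matches.
    Parents without a match are still returned (LEFT JOIN semantics).
    """
    result = []
    for parent in parents:
        doc = deepcopy(parent)
        matches = [
            deepcopy(child)
            for child in children
            if all(parent.get(p) == child.get(c) for p, c in relation)
        ]
        if embed_type == "array":
            doc[path] = matches
        elif matches:
            doc[path] = matches[0]
        result.append(doc)
    return result


# Hypothetical sample rows, shaped like the tables in this guide
orders = [
    {"order_id": "ORD1", "customer_id": "C1"},
    {"order_id": "ORD2", "customer_id": "C9"},  # no matching customer
]
customers = [{"customer_id": "C1", "customer_city": "New York"}]
payments = [
    {"order_id": "ORD1", "payment_sequential": 1, "payment_value": 100.0},
    {"order_id": "ORD1", "payment_sequential": 2, "payment_value": 99.99},
]

wide = embed(orders, customers, "customer_info", "object",
             [["customer_id", "customer_id"]])
wide = embed(wide, payments, "order_payments", "array",
             [["order_id", "order_id"]])
```

In this sketch, an order without a matching customer simply has no `customer_info` field, while an order without payments gets an empty `order_payments` array. Whether TapFlow represents missing matches the same way is not stated in this guide.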

Building a Real-Time Wide Table

With TapFlow, XYZ gets real-time synchronization and fast querying of order and inventory information. Operations staff can access the latest order data immediately, which improves the user experience. The wide table consolidates order, customer, payment, product, and seller information in MongoDB, which reduces the resources spent on cross-table joins and improves query efficiency.

Next, we’ll walk through setting up TapFlow to meet these requirements.

Prerequisites

Install Tap Shell and add MySQL/MongoDB data sources. For detailed steps, see Quick Start.

Step 1: Build a Real-Time Wide Table

In this example, the MySQL data source is named MySQL_ECommerce, and the MongoDB data source is named MongoDB_ECommerce. We will build the real-time wide table using Tap Shell commands.

  1. Run tap to open the Tap Shell command interface.

  2. Specify the source table for the data flow.

    # Create a data flow task and set the primary table "ecom_orders" as the data flow entry point
    orderFlow = Flow("Order_SingleView_Sync") \
        .read_from("MySQL_ECommerce.ecom_orders")  # Set MySQL source table ecom_orders
  3. Add lookup processing nodes to perform LEFT JOIN associations with other tables. Each lookup command embeds data from a related table into the primary table, defining the join key with the relation parameter. The orderFlow data stream then contains enriched information on orders, customers, payments, products, and sellers.

    # Embed 'ecom_customers' table data into the order table based on the customer_id field
    orderFlow.lookup("MySQL_ECommerce.ecom_customers",
        path="customer_info",                        # MongoDB embedding path
        type="object",                               # Embed type as object
        relation=[["customer_id", "customer_id"]])   # Join on customer_id

    # Embed 'ecom_order_payments' table data as an array in the order table based on order_id
    orderFlow.lookup("MySQL_ECommerce.ecom_order_payments",
        path="order_payments",                       # MongoDB embedding path
        type="array",                                # Embed type as array
        relation=[["order_id", "order_id"]])         # Join on order_id

    # Embed 'ecom_order_items' table data as an array in the order table based on order_id
    orderFlow.lookup("MySQL_ECommerce.ecom_order_items",
        path="order_items",                          # MongoDB embedding path
        type="array",                                # Embed type as array
        relation=[["order_id", "order_id"]])         # Join on order_id

    # Embed 'ecom_products' table data as an object in the order_items array based on product_id
    orderFlow.lookup("MySQL_ECommerce.ecom_products",
        path="order_items.product",                  # Embed path points to order_items.product
        type="object",                               # Embed type as object
        relation=[["order_items.product_id", "product_id"]])  # Join on product_id

    # Embed 'ecom_sellers' table data as an object in the order_items array based on seller_id
    orderFlow.lookup("MySQL_ECommerce.ecom_sellers",
        path="order_items.seller",                   # Embed path points to order_items.seller
        type="object",                               # Embed type as object
        relation=[["order_items.seller_id", "seller_id"]])    # Join on seller_id
  4. Specify MongoDB as the output destination, saving the consolidated data in a collection called orderSingleView.

    # Specify MongoDB collection name for data write
    orderFlow.write_to("MongoDB_ECommerce.orderSingleView");
    # Save the data flow configuration
    orderFlow.save()
  5. Run orderFlow.start() to start the task. After the task starts, use the status command to monitor the task’s status, as shown below:

    status Order_SingleView_Sync
    job current status is: running, qps is: 3521.2, total rows: 99441, delay is: 332ms
  6. (Optional) Access the target MongoDB database to confirm that the record count matches the source.

    use MongoDB_ECommerce
    count orderSingleView
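
The two nested lookups in step 3 use dotted names (`order_items.product_id`, `order_items.product`) to address fields inside each element of an embedded array. Below is a minimal pure-Python sketch of how such a relation could be resolved. `embed_into_array` is a hypothetical helper written for illustration, and it says nothing about how TapFlow implements this internally.

```python
from copy import deepcopy


def embed_into_array(doc, children, path, relation):
    """Embed one matching child as an object into every element of an array field.

    path:     "array_field.target_key", e.g. "order_items.product"
    relation: [["array_field.parent_key", "child_key"]], e.g.
              [["order_items.product_id", "product_id"]]
    """
    array_field, target_key = path.split(".", 1)
    out = deepcopy(doc)
    for element in out.get(array_field, []):
        for child in children:
            # Strip the "array_field." prefix to get the key inside the element
            if all(
                element.get(parent_key.split(".", 1)[1]) == child.get(child_key)
                for parent_key, child_key in relation
            ):
                element[target_key] = deepcopy(child)
                break  # object embedding: keep the first match only
    return out


# Hypothetical sample data using the IDs from this guide
order = {
    "order_id": "ORD789654",
    "order_items": [
        {"order_item_id": 1, "product_id": "PROD56789", "seller_id": "SELL34567"},
    ],
}
products = [{"product_id": "PROD56789", "product_category_name": "electronics"}]
sellers = [{"seller_id": "SELL34567", "seller_city": "Los Angeles"}]

order = embed_into_array(order, products, "order_items.product",
                         [["order_items.product_id", "product_id"]])
order = embed_into_array(order, sellers, "order_items.seller",
                         [["order_items.seller_id", "seller_id"]])
```

After both calls, each element of `order_items` carries its own `product` and `seller` objects, which matches the shape of the result document shown in Step 2.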

Step 2: Verify Real-Time Performance

XYZ needs to process user orders and track each order's details within the system. Every time a user places an order, the order's basic information, customer details, order items, product information, and seller information are recorded in the database, and the status is updated when the order ships. With TapFlow, this information is synchronized to MongoDB in real time for API queries, so users see up-to-date order information.

The following steps simulate data flow in a real-world business scenario by manually inserting data into MySQL.

  1. Log in to the source MySQL database and run the following commands to simulate a new order created by a user.

    -- Insert customer information
    INSERT INTO ecom_customers (customer_id, customer_unique_id, customer_zip_code_prefix, customer_city, customer_state)
    VALUES ('CUST12345', 'UNIQUE_CUST_00123', '10001', 'New York', 'NY');

    -- Insert order information
    INSERT INTO ecom_orders (order_id, customer_id, order_status, order_purchase_timestamp, order_approved_at, order_estimated_delivery_date)
    VALUES ('ORD789654', 'CUST12345', 'pending', NOW(), NOW(), '2024-11-10');

    -- Insert order item information (references the product and seller by ID)
    INSERT INTO ecom_order_items (order_id, order_item_id, product_id, seller_id, shipping_limit_date, price, freight_value)
    VALUES ('ORD789654', 1, 'PROD56789', 'SELL34567', '2024-11-05', 199.99, 15.0);

    -- Insert product information
    INSERT INTO ecom_products (product_id, product_category_name, product_weight_g, product_length_cm, product_height_cm, product_width_cm)
    VALUES ('PROD56789', 'electronics', 1200, 20, 10, 8);

    -- Insert seller information
    INSERT INTO ecom_sellers (seller_id, seller_zip_code_prefix, seller_city, seller_state)
    VALUES ('SELL34567', '90001', 'Los Angeles', 'CA');

    -- Insert payment information
    INSERT INTO ecom_order_payments (order_id, payment_sequential, payment_type, payment_installments, payment_value)
    VALUES ('ORD789654', 1, 'credit_card', 1, 199.99);

    Tip: Once the data is inserted, TapFlow automatically parses the MySQL binlog to capture the changes, processes them, and writes them to the orderSingleView collection in MongoDB.

  2. To simulate order updates, run the following commands to update shipment, payment, and order information.

    -- Update order status and shipping date
    UPDATE ecom_orders
    SET order_status = 'shipped', order_delivered_carrier_date = NOW()
    WHERE order_id = 'ORD789654';

    -- Update payment information: raise the payment amount and the number of installments
    UPDATE ecom_order_payments
    SET payment_value = 220.0, payment_installments = 2
    WHERE order_id = 'ORD789654' AND payment_sequential = 1;

    -- Update order item details, such as price and freight cost
    UPDATE ecom_order_items
    SET price = 210.0, freight_value = 18.0
    WHERE order_id = 'ORD789654' AND order_item_id = 1;
  3. Log in to the MongoDB database and query the orderSingleView collection for order_id = ORD789654. The changes are reflected in real time as embedded JSON structures, which backend APIs can read directly without cross-table joins.

    // In the following example, key information is annotated with comments,
    // while less relevant data is omitted for clarity.
    {
      "_id": {
        "$oid": "67262caa09771da27c15c713"
      },
      "order_id": "ORD789654", // Newly added order
      "customer_id": "CUST12345",
      "order_status": "shipped", // Updated shipping status
      "order_purchase_timestamp": "2024-11-02T10:20:00.000Z",
      "order_approved_at": "2024-11-02T10:22:00.000Z",
      "order_delivered_carrier_date": "2024-11-02T18:00:00.000Z", // Updated carrier handover time
      "order_estimated_delivery_date": "2024-11-10T00:00:00.000Z",
      "order_payments": [
        {
          "payment_type": "credit_card",
          "payment_installments": 2, // Updated payment installments
          "payment_value": 220, // Updated payment amount
          "payment_sequential": 1
        }
      ],
      "order_items": [
        {
          "order_item_id": 1,
          "price": 210, // Updated product price
          "freight_value": 18, // Updated shipping cost
          "product_id": "PROD56789",
          "shipping_limit_date": "2024-11-05T00:00:00.000Z",
          "seller": {...},
          "product": {...}
        }
      ],
      ...
    }
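
To make the expected result concrete, the sketch below replays the three UPDATE statements from step 2 against an in-memory copy of the wide document. It is a conceptual model in plain Python, not TapFlow code: `apply_update` and the simplified change tuples are hypothetical, and real CDC events carry more metadata than shown here.

```python
def apply_update(doc, path, key, changes):
    """Apply `changes` to the root document (path=None), or to every element
    of the array at `path` whose fields match all items in `key`."""
    if path is None:
        doc.update(changes)
        return
    for element in doc.get(path, []):
        if all(element.get(k) == v for k, v in key.items()):
            element.update(changes)


# State of the wide document after the INSERT statements (fields abbreviated)
wide_doc = {
    "order_id": "ORD789654",
    "order_status": "pending",
    "order_payments": [
        {"payment_sequential": 1, "payment_installments": 1, "payment_value": 199.99},
    ],
    "order_items": [
        {"order_item_id": 1, "price": 199.99, "freight_value": 15.0},
    ],
}

# Simplified change records: (embedding path, row key, changed columns)
updates = [
    (None, {}, {"order_status": "shipped"}),
    ("order_payments", {"payment_sequential": 1},
     {"payment_value": 220.0, "payment_installments": 2}),
    ("order_items", {"order_item_id": 1},
     {"price": 210.0, "freight_value": 18.0}),
]

for path, key, changes in updates:
    apply_update(wide_doc, path, key, changes)
```

The point of the model is that an update to a child table only touches the matching element under its embedding path, so the rest of the order document stays unchanged.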

See also

Publish Data as API