
Applying contextual data to Joule streams

In advanced stream event processing, executing use cases often requires enriching events with contextual data to facilitate analytical and logical processing. This contextual data can be dynamic, such as live-calculated metrics generated during processing, or static, consisting of slow-moving data imported on startup for quick, low-latency lookups.

Analytical and machine learning stream processing typically extends beyond a single stream view by incorporating various forms of contextual data, including static constants, complex features, dynamic key performance indicators (KPIs), and ranges within its processing context to fulfil its intended purpose.



Below are examples illustrating how contextual data is utilised within a stream processing context:


  • Real-time Fraud Detection: Incorporating historical transaction data (contextual) to identify patterns and anomalies in current transactions, enabling instant fraud detection and prevention.

  • Personalised Recommendations: Analysing user preferences (contextual) based on past interactions to deliver tailored content or product recommendations in real-time.

  • Predictive Maintenance: Utilising sensor data from machinery (contextual) along with historical maintenance records to forecast potential equipment failures and schedule proactive maintenance tasks.

  • Dynamic Pricing: Integrating market trends and competitor pricing data (contextual) to adjust product prices in real-time, optimising revenue and staying competitive.

  • Supply Chain Optimisation: Combining real-time inventory levels, supplier data, and demand forecasts (contextual) to optimise supply chain operations, minimise stock outs, and reduce costs.

  • Traffic Management: Aggregating data from traffic sensors, GPS devices, and historical traffic patterns (contextual) to optimise traffic flow, detect congestion, and provide alternate routes in real-time.

  • Health Monitoring: Integrating patient vital signs, medical history, and sensor data from wearable devices (contextual) to monitor health status in real-time and provide timely interventions.

  • Energy Grid Optimisation: Analysing real-time data from smart meters, weather forecasts, and historical usage patterns (contextual) to optimise energy distribution, reduce waste, and manage peak demand.

  • Social Media Sentiment Analysis: Mining social media feeds and news articles (contextual) to gauge public sentiment in real-time, enabling businesses to adjust marketing strategies or respond to emerging trends.

  • Customer Experience Enhancement: Incorporating customer feedback, interaction history, and sentiment analysis (contextual) to personalise interactions, resolve issues promptly, and improve overall satisfaction.


The Joule stream processing platform enriches live events with contextual data through the following mechanisms:


  • Enrichment processor and SQL query interface

  • Data file importing for static reference data and pre-computed metrics

  • Live metrics generated by the metrics engine

  • Dynamic reference data using enterprise-grade distributed caching technology and S3 object stores

  • Embedded in-memory SQL database, DuckDB


Reference data

To quote Wikipedia, "Reference data is data used to classify or categorise other data. Typically, they are static or slowly changing over time." In the context of this article, we extend this definition to include dynamic data as a subset of slow-moving data. All these data types are vital in implementing streaming analytics and real-time decision-making processes.


Static

Consistent, trusted data sourced from a master data platform ensures that event-based insights are of high fidelity and quality. Enriched events can then be processed further, for example by filtering on location or range, building a real-time ratio of data usage against the contracted plan, or running an ML propensity-to-buy prediction based on historical actions versus current behaviour.


Example

  • Product / company information

  • Mobile contract bundles

  • Overnight fixed rates (e.g. fx, overnight swap rates and daily offers)

  • Complex constant ML feature

  • Historical buy signals

  • Calendar holidays and business days

  • Fixed geospatial locations of interest

  • Standard country codes


Priming the use case

The Joule example below loads the contents of a CSV file into the nasdaq_companies in-memory SQL table on process startup and creates a unique index on the ‘Symbol’ field.


initialisation:
 - data import:
     schema: reference_data
     csv:
       - table: nasdaq_companies
         file: data/csv/nasdaq.csv
         drop table: true
         index:
           fields: [ 'Symbol' ]
           unique: true
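
To make the priming step concrete, here is a minimal Python sketch of what a load of this kind amounts to: create the table afresh, bulk-insert the CSV rows, then add a unique index on the lookup column. It is not Joule's implementation. The standard-library sqlite3 module stands in for the embedded SQL database, the sample rows are illustrative placeholders for data/csv/nasdaq.csv, and the function and index names are hypothetical.

```python
import csv
import io
import sqlite3

# Illustrative placeholder rows standing in for data/csv/nasdaq.csv (assumption).
SAMPLE_CSV = """Symbol,Name,Country
AAPL,Apple Inc.,United States
MSFT,Microsoft Corporation,United States
"""


def prime_reference_table(conn, csv_text):
    """Load CSV rows into a freshly created table and index it on Symbol."""
    # Similar in spirit to 'drop table: true': always start from a clean table.
    conn.execute("DROP TABLE IF EXISTS nasdaq_companies")
    conn.execute(
        "CREATE TABLE nasdaq_companies (Symbol TEXT, Name TEXT, Country TEXT)"
    )
    reader = csv.DictReader(io.StringIO(csv_text))
    rows = [(r["Symbol"], r["Name"], r["Country"]) for r in reader]
    conn.executemany("INSERT INTO nasdaq_companies VALUES (?, ?, ?)", rows)
    # The unique index speeds up lookups and rejects duplicate symbols.
    conn.execute(
        "CREATE UNIQUE INDEX idx_nasdaq_symbol ON nasdaq_companies (Symbol)"
    )
    conn.commit()
    return len(rows)


conn = sqlite3.connect(":memory:")
loaded = prime_reference_table(conn, SAMPLE_CSV)
```

Because the table is dropped and rebuilt on every call, rerunning the load at startup always leaves the reference data in a known state.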

Enrich events with static data

The enrichment processor executes an event-based lookup using the “symbol” event value. This example uses SQL to execute the lookup; the enricher also supports key-based lookups, as the “by key” form in the slow-moving data example shows.


 enricher:
   fields:
     company_info:
       by query: "select * from reference_data.nasdaq_companies 
            where Symbol = ?"
       query fields: [ symbol ]
       with values: [ Name, Country ]
       using: JouleDB

More information on how the enricher processor can be used to decorate events with static data can be found here.
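
As a rough illustration of the lookup above, the following Python sketch binds the event's symbol value to a parameterised query and attaches the selected columns to a copy of the event. It is a sketch under stated assumptions rather than a description of Joule's internals: sqlite3 stands in for the embedded database, the reference_data schema prefix is omitted, the function name is hypothetical, and nesting the result under company_info is an assumption about the shape of the enriched event.

```python
import sqlite3


def enrich_with_company_info(conn, event):
    """Return a copy of the event with company_info added when the symbol is known."""
    # Bind the event value to the query parameter (the '?' placeholder).
    row = conn.execute(
        "SELECT Name, Country FROM nasdaq_companies WHERE Symbol = ?",
        (event["symbol"],),
    ).fetchone()
    enriched = dict(event)  # leave the incoming event untouched
    if row is not None:
        # Keep only the requested values, mirroring 'with values: [ Name, Country ]'.
        enriched["company_info"] = {"Name": row[0], "Country": row[1]}
    return enriched


# Stand-in reference table with one illustrative row (assumption).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE nasdaq_companies (Symbol TEXT PRIMARY KEY, Name TEXT, Country TEXT)"
)
conn.execute(
    "INSERT INTO nasdaq_companies VALUES ('MSFT', 'Microsoft Corporation', 'United States')"
)

quote = {"symbol": "MSFT", "bid": 410.5}
enriched_quote = enrich_with_company_info(conn, quote)
unknown_quote = enrich_with_company_info(conn, {"symbol": "ZZZZ", "bid": 1.0})
```

Events whose symbol has no match pass through unchanged in this sketch; whether to drop, flag, or forward such events is a design choice for the use case.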


Slow moving

Semi-static data is updated infrequently, but each change must be applied promptly within a process when it occurs. Typically, core business data warehouses manage this type of data, with updates made available either through distribution technology or at the end of the day using traditional batch processing methods. However, a common solution employed by forward-looking businesses is the use of distributed caching and messaging platforms.


Example data types

  • Customer information

  • Contract changes

  • Intraday snapshots (e.g. fx rates, inventory levels, pricing updates)

  • Intraday recomputed complex ML features

Joule supports this data type with out-of-the-box (OOTB) solutions that connect and subscribe to data events from a distributed caching grid, using local caching as the proxy, and to messaging topics.


Enrich events with slow moving data

This example demonstrates two methods of querying the embedded cache storage: an OQL query and a key-based lookup. The stores are linked to the enrichment processor using a reference data DSL definition.


enricher:
 fields:
   contractedDataBundle:
     with values: [contracted_data_bundle]
     by key: imsi
     using: customerMobileContractStore

   dataBundleOffer:
     with values: [temporary_data_bundle_uplift, data_bundle_offer]
     by query:  "select * from /dynamicDataBundles where imsi = ?"
     query fields: [imsi]
     using: dailyDataBundleOffersStore

 stores:
   customerMobileContractStore:
     store name: mobilecontracts

   dailyDataBundleOffersStore:
     store name: dynamicDataBundles

More information on how the enricher processor can be configured to leverage the Apache Geode solution for slow data can be found here.
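
The idea of a local cache acting as a proxy for slow-moving data can be sketched in a few lines of Python. This is a simplified, hypothetical model rather than the Apache Geode integration itself: LocalStore, on_change, and enrich_by_key are invented names, and the change events are applied by direct calls instead of arriving over a grid subscription.

```python
class LocalStore:
    """Minimal in-process key-value store kept current by change events."""

    def __init__(self):
        self._entries = {}

    def on_change(self, key, value):
        """Apply an upstream update; a value of None stands for a removal."""
        if value is None:
            self._entries.pop(key, None)
        else:
            self._entries[key] = value

    def get(self, key):
        return self._entries.get(key)


def enrich_by_key(store, event, key_field, target_field, wanted):
    """Look up event[key_field] in the store and attach the wanted attributes."""
    enriched = dict(event)
    entry = store.get(event.get(key_field))
    if entry is not None:
        enriched[target_field] = {name: entry.get(name) for name in wanted}
    return enriched


contracts = LocalStore()
contracts.on_change("imsi-001", {"contracted_data_bundle": 10, "plan": "basic"})

event = {"imsi": "imsi-001", "bytes_used": 1024}
before = enrich_by_key(
    contracts, event, "imsi", "contractedDataBundle", ["contracted_data_bundle"]
)

# A contract change arrives; the next event sees it without a restart.
contracts.on_change("imsi-001", {"contracted_data_bundle": 25, "plan": "plus"})
after = enrich_by_key(
    contracts, event, "imsi", "contractedDataBundle", ["contracted_data_bundle"]
)
```

The point of the pattern is that lookups stay local and fast, while updates reach the process as soon as the change event is applied.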


Enrich with live metrics

Advanced use cases often require computed metrics as inputs to further contextual event-based calculations or ML predictions, as event-based triggers, or as additional context for the emitted event. These metrics are computed by a time-based process, such as a scheduled query over tumbling or sliding windows of events captured locally by the process.


Example metrics

  • Time series measures (e.g. market depth, volatility, liquidity)

  • Customer mobile data usage with respect to time

  • Inventory levels monitoring for restocking triggers

  • Average Geospatial dwelling level for infrastructure management
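
For readers unfamiliar with tumbling windows, the Python sketch below shows the basic idea: events are assigned to fixed, non-overlapping time buckets, and an aggregate is computed per bucket and key. It does not reflect how the Joule metrics engine is implemented. The event shape (ts in whole seconds, symbol, bid), the metric names, and the sample values are all assumptions made for illustration.

```python
from collections import defaultdict


def tumbling_bid_metrics(events, window_seconds):
    """Compute min/avg/max bid per (window start, symbol) over tumbling windows."""
    buckets = defaultdict(list)
    for e in events:
        # Integer division snaps each timestamp to the start of its window.
        window_start = (e["ts"] // window_seconds) * window_seconds
        buckets[(window_start, e["symbol"])].append(e["bid"])
    return {
        key: {
            "bid_min": min(bids),
            "bid_avg": sum(bids) / len(bids),
            "bid_max": max(bids),
        }
        for key, bids in buckets.items()
    }


# Illustrative events: three fall in the window starting at 0, one in the next.
events = [
    {"ts": 0, "symbol": "ABC", "bid": 10.0},
    {"ts": 30, "symbol": "ABC", "bid": 12.0},
    {"ts": 59, "symbol": "ABC", "bid": 14.0},
    {"ts": 60, "symbol": "ABC", "bid": 20.0},
]
metrics = tumbling_bid_metrics(events, window_seconds=60)
```

A sliding window differs only in that windows overlap, so a single event can contribute to more than one bucket.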


Priming the metrics table

Joule provides the ability to prime metrics with pre-computed values at startup. This configuration is performed within the initialisation DSL element.


initialisation:
 data import:
   parquet:
     - table: bid_moving_averages
       schema: metrics
       files: ['data/parquet/mvavgs-prime.parquet']
       drop table: true
       index:
         fields: [ 'symbol' ]
         unique: false

Enrich events with metric data

There are various methods to enrich an event using metric data. The example below demonstrates how the enricher processor can be used.


enricher:
 fields:
   quote_metrics:
     by metric family: BidMovingAverage
     by key: symbol
     with values: [avg_bid_min, avg_bid_avg, avg_bid_max]
     using: MetricsDB

This example applies the latest metrics for a given symbol to an event. More information can be found here.
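
One way to picture "latest metrics for a given symbol" is a query that orders the metric rows for that key by creation time and keeps the newest. The Python sketch below assumes this interpretation. sqlite3 again stands in for the metrics database, the created_ts column and the sample values are hypothetical, and the metrics schema prefix is omitted.

```python
import sqlite3

METRIC_FIELDS = ["avg_bid_min", "avg_bid_avg", "avg_bid_max"]


def latest_metrics(conn, symbol):
    """Return the most recent metric row for a symbol, or None if there is none."""
    row = conn.execute(
        "SELECT avg_bid_min, avg_bid_avg, avg_bid_max "
        "FROM bid_moving_averages "
        "WHERE symbol = ? ORDER BY created_ts DESC LIMIT 1",
        (symbol,),
    ).fetchone()
    return dict(zip(METRIC_FIELDS, row)) if row is not None else None


metrics_conn = sqlite3.connect(":memory:")
metrics_conn.execute(
    "CREATE TABLE bid_moving_averages ("
    "symbol TEXT, created_ts INTEGER, "
    "avg_bid_min REAL, avg_bid_avg REAL, avg_bid_max REAL)"
)
# Non-unique index: several metric rows can exist per symbol over time.
metrics_conn.execute("CREATE INDEX idx_bma_symbol ON bid_moving_averages (symbol)")
metrics_conn.executemany(
    "INSERT INTO bid_moving_averages VALUES (?, ?, ?, ?, ?)",
    [("ABC", 100, 9.5, 10.0, 10.5), ("ABC", 200, 10.5, 11.0, 11.5)],
)

event = {"symbol": "ABC", "bid": 11.2}
event_with_metrics = dict(event, quote_metrics=latest_metrics(metrics_conn, "ABC"))
```

The non-unique index mirrors the priming configuration, where each symbol may have many metric rows and only the newest is applied to the event.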


Summary

Joule AI offers the capability to serve various forms of contextual data at in-memory speed, utilising a low-code approach, to drive analytical and logical processing. With these features, advanced stream-based use cases can be efficiently implemented by seamlessly integrating within an existing enterprise architecture.


Documentation

Joule offers a wide range of features and connector integrations, all of which are detailed in the online documentation. This documentation is regularly updated to ensure accuracy and to reflect any new additions or improvements to the platform.


Getting started

To get started, download the following resources to prepare your environment and work through the provided documentation. Please feel free to reach out with questions.


  • Download the examples code from GitLab

  • Work through the README file


We’re Here to Help

Feedback is most welcome, including thoughts on how to improve and extend Joule and ideas for exciting use cases.


Join the FractalWorks Community Forum, where members openly share ideas and best practices and support each other. If you have any further questions on how to become a partner or customer of FractalWorks, do not hesitate to engage with us. We will be happy to talk about your needs.
