In advanced stream event processing, implementing use cases often requires enriching events with contextual data to support analytical and logical processing. This contextual data can be dynamic, such as metrics calculated live during processing, or static, consisting of slow-moving data imported on startup for fast, low-latency lookups.
Analytical and machine learning stream processing typically extends beyond a single stream view, bringing various forms of contextual data into its processing context to fulfil its intended purpose: static constants, complex features, dynamic key performance indicators (KPIs) and ranges.
Below are examples illustrating how contextual data is utilised within a stream processing context:
Real-time Fraud Detection: Incorporating historical transaction data (contextual) to identify patterns and anomalies in current transactions, enabling instant fraud detection and prevention.
Personalised Recommendations: Analysing user preferences (contextual) based on past interactions to deliver tailored content or product recommendations in real-time.
Predictive Maintenance: Utilising sensor data from machinery (contextual) along with historical maintenance records to forecast potential equipment failures and schedule proactive maintenance tasks.
Dynamic Pricing: Integrating market trends and competitor pricing data (contextual) to adjust product prices in real-time, optimising revenue and staying competitive.
Supply Chain Optimisation: Combining real-time inventory levels, supplier data, and demand forecasts (contextual) to optimise supply chain operations, minimise stock-outs, and reduce costs.
Traffic Management: Aggregating data from traffic sensors, GPS devices, and historical traffic patterns (contextual) to optimise traffic flow, detect congestion, and provide alternate routes in real-time.
Health Monitoring: Integrating patient vital signs, medical history, and sensor data from wearable devices (contextual) to monitor health status in real-time and provide timely interventions.
Energy Grid Optimisation: Analysing real-time data from smart meters, weather forecasts, and historical usage patterns (contextual) to optimise energy distribution, reduce waste, and manage peak demand.
Social Media Sentiment Analysis: Mining social media feeds and news articles (contextual) to gauge public sentiment in real-time, enabling businesses to adjust marketing strategies or respond to emerging trends.
Customer Experience Enhancement: Incorporating customer feedback, interaction history, and sentiment analysis (contextual) to personalise interactions, resolve issues promptly, and improve overall satisfaction.
The Joule stream processing platform provides key features that enable the enrichment of live events with contextual data through the following mechanisms.
Enrichment processor and SQL query interface
Data file importing for static reference data and pre-computed metrics
Live metrics generated by the metrics engine
Dynamic reference data using enterprise-grade distributed caching technology and S3 object stores
Embedded in-memory SQL database, DuckDB
Reference data
To quote Wikipedia, "Reference data is data used to classify or categorise other data. Typically, they are static or slowly changing over time." In the context of this article, we extend this definition to include dynamic data as a subset of slow-moving data. All these data types are vital in implementing streaming analytics and real-time decision-making processes.
Static
Consistent, trusted data sourced from a master data platform helps ensure that event-based insights are of high fidelity and quality. Enriched events can then be processed further, for example by applying location- or range-based filtering, building a real-time ratio of data usage against the contracted plan, or running an ML propensity-to-buy prediction that compares historical actions with current behaviour.
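As a rough illustration of the data-usage ratio mentioned above, the following minimal Python sketch assumes an event that has already been enriched with a contracted bundle size. The field names `data_used` and `contracted_data_bundle`, the 0.8 alert threshold and the `data_usage_ratio` helper are hypothetical choices for illustration, not part of Joule.

```python
def data_usage_ratio(event: dict, alert_threshold: float = 0.8) -> dict:
    """Return a copy of the event with the share of the contracted bundle used and an alert flag."""
    contracted = event["contracted_data_bundle"]  # added by enrichment (hypothetical field)
    if contracted <= 0:
        raise ValueError("contracted_data_bundle must be positive")
    ratio = event["data_used"] / contracted  # taken from the live event (hypothetical field)
    return {**event, "usage_ratio": ratio, "near_limit": ratio >= alert_threshold}


print(data_usage_ratio({"imsi": "imsi-001", "data_used": 7, "contracted_data_bundle": 8}))
# {'imsi': 'imsi-001', 'data_used': 7, 'contracted_data_bundle': 8, 'usage_ratio': 0.875, 'near_limit': True}
```

The ratio is only meaningful because the static contract data was joined onto the event first; without enrichment the live event carries usage but no plan to compare it against.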
Example data types
Product / company information
Mobile contract bundles
Overnight fixed rates (e.g. FX, overnight swap rates and daily offers)
Complex constant ML features
Historical buy signals
Calendar holidays and business days
Fixed geospatial locations of interest
Standard country codes
Priming the use case
The Joule example below loads the contents of a CSV file into the nasdaq_companies in-memory SQL table at process startup and creates a unique index on the ‘Symbol’ field.
initialisation:
  - data import:
      schema: reference_data
      csv:
        - table: nasdaq_companies
          file: data/csv/nasdaq.csv
          drop table: true
          index:
            fields: [ 'Symbol' ]
            unique: true
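To make the priming step concrete outside Joule, here is a minimal Python sketch that mimics it with the standard-library `sqlite3` module in place of the embedded DuckDB database. The inline CSV rows and the `prime_reference_table` helper are hypothetical; `drop table: true` is modelled as `DROP TABLE IF EXISTS`, the `reference_data` schema as an attached in-memory database, and the unique index as `CREATE UNIQUE INDEX`. This is not how Joule implements the import.

```python
import csv
import io
import sqlite3

# Hypothetical sample standing in for data/csv/nasdaq.csv (illustrative rows only).
NASDAQ_CSV = """Symbol,Name,Country
AAPL,Apple Inc.,United States
MSFT,Microsoft Corporation,United States
ASML,ASML Holding N.V.,Netherlands
"""


def prime_reference_table(conn: sqlite3.Connection, csv_text: str) -> int:
    """Load CSV rows into reference_data.nasdaq_companies and return the row count."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    # 'drop table: true' -> rebuild the table from scratch on every startup.
    conn.execute("DROP TABLE IF EXISTS reference_data.nasdaq_companies")
    conn.execute(
        "CREATE TABLE reference_data.nasdaq_companies "
        "(Symbol TEXT, Name TEXT, Country TEXT)"
    )
    # 'unique: true' -> inserting a duplicate Symbol raises an IntegrityError.
    conn.execute(
        "CREATE UNIQUE INDEX reference_data.idx_nasdaq_symbol "
        "ON nasdaq_companies (Symbol)"
    )
    conn.executemany(
        "INSERT INTO reference_data.nasdaq_companies VALUES (:Symbol, :Name, :Country)",
        rows,
    )
    conn.commit()
    return len(rows)


conn = sqlite3.connect(":memory:")
# Emulate the 'reference_data' schema with an attached in-memory database.
conn.execute("ATTACH DATABASE ':memory:' AS reference_data")
print(prime_reference_table(conn, NASDAQ_CSV))  # 3
```

Dropping and recreating the table makes startup idempotent, and the unique index both speeds up lookups by symbol and guards against duplicate reference rows.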
Enrich events with static data
The enrichment processor executes an event-based lookup using the “symbol” event value. Although this example uses SQL to execute the lookup, a key-based lookup can also be used.
enricher:
  fields:
    company_info:
      by query: "select * from reference_data.nasdaq_companies where Symbol = ?"
      query fields: [ symbol ]
      with values: [ Name, Country ]
      using: JouleDB
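The lookup that the enricher configuration describes can be approximated in plain Python. This minimal sketch again uses `sqlite3` as a stand-in for JouleDB; the `enrich_company_info` function and its sample rows are hypothetical. It mirrors the DSL roughly: the event's `symbol` value binds to the `?` placeholder (`query fields`), and only `Name` and `Country` are copied from the matching row (`with values`) into a `company_info` attribute.

```python
import sqlite3

# Hypothetical reference table standing in for reference_data.nasdaq_companies.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE nasdaq_companies (Symbol TEXT PRIMARY KEY, Name TEXT, Country TEXT)")
conn.executemany(
    "INSERT INTO nasdaq_companies VALUES (?, ?, ?)",
    [("AAPL", "Apple Inc.", "United States"), ("ASML", "ASML Holding N.V.", "Netherlands")],
)


def enrich_company_info(event: dict, conn: sqlite3.Connection,
                        with_values=("Name", "Country")) -> dict:
    """Return a copy of the event with a 'company_info' dict looked up by its symbol."""
    # Parameterised query: the event value is bound, never concatenated into the SQL.
    cur = conn.execute("SELECT * FROM nasdaq_companies WHERE Symbol = ?", (event["symbol"],))
    row = cur.fetchone()
    enriched = dict(event)
    if row is None:
        enriched["company_info"] = None  # no reference data for this symbol
    else:
        record = dict(zip((d[0] for d in cur.description), row))
        enriched["company_info"] = {field: record[field] for field in with_values}
    return enriched


print(enrich_company_info({"symbol": "ASML", "bid": 101.5}, conn))
# {'symbol': 'ASML', 'bid': 101.5, 'company_info': {'Name': 'ASML Holding N.V.', 'Country': 'Netherlands'}}
```

Whether a missing match should produce `None`, drop the event or raise an error is a design decision; the sketch keeps the event and marks the enrichment as absent.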
More information on how the enricher processor can be used to decorate events with static data can be found in the Joule documentation.
Slow-moving
Semi-static data is updated infrequently, but when a change occurs it is critical that it is applied promptly within the process. Typically, core business data warehouses manage this type of data, with updates made available either through distribution technology or at the end of the day using traditional batch processing. However, a common solution adopted by forward-looking businesses is to use distributed caching and messaging platforms.
Example data types
Customer information
Contract changes
Intraday snapshots (e.g. FX rates, inventory levels and pricing updates)
Intraday recomputed complex ML features
Joule supports this data type with out-of-the-box (OOTB) solutions that connect and subscribe to data events from a distributed caching grid, using a local cache as a proxy together with messaging topics.
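The general pattern, a local cache kept in step with the grid by change events arriving on a topic, can be sketched in a few lines of Python. This is a hypothetical illustration only: `queue.Queue` stands in for a messaging topic, a plain dictionary stands in for the local cache, and the `CacheEvent` and `LocalCacheProxy` names are invented here and are not part of Joule or Apache Geode.

```python
import queue
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class CacheEvent:
    """A change notification published on a (hypothetical) cache-update topic."""
    op: str            # "put" or "remove"
    key: str
    value: Any = None


class LocalCacheProxy:
    """Local copy of a distributed cache region, kept fresh by applying update events."""

    def __init__(self, topic: "queue.Queue[CacheEvent]", initial: Optional[dict] = None):
        self._topic = topic
        self._data = dict(initial or {})

    def drain_updates(self) -> int:
        """Apply every pending change event and return how many were applied."""
        applied = 0
        while True:
            try:
                event = self._topic.get_nowait()
            except queue.Empty:
                return applied
            if event.op == "put":
                self._data[event.key] = event.value
            elif event.op == "remove":
                self._data.pop(event.key, None)
            applied += 1

    def get(self, key: str) -> Any:
        """Low-latency local lookup; no round trip to the caching grid."""
        return self._data.get(key)


topic: "queue.Queue[CacheEvent]" = queue.Queue()
cache = LocalCacheProxy(topic, initial={"imsi-001": {"contracted_data_bundle": 10}})
topic.put(CacheEvent("put", "imsi-001", {"contracted_data_bundle": 20}))  # contract change
cache.drain_updates()
print(cache.get("imsi-001"))  # {'contracted_data_bundle': 20}
```

Reads never leave the process, while changes published upstream become visible as soon as the pending updates are applied, which is what makes slow-moving data usable at event speed.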
Enrich events with slow-moving data
This example demonstrates two methods of querying the embedded cache storage: an OQL query and a key-based lookup. The stores are linked to the enrichment processor through a reference data DSL definition.
enricher:
  fields:
    contractedDataBundle:
      with values: [contracted_data_bundle]
      by key: imsi
      using: customerMobileContractStore
    dataBundleOffer:
      with values: [temporary_data_bundle_uplift, data_bundle_offer]
      by query: "select * from /dynamicDataBundles where imsi = ?"
      query fields: [imsi]
      using: dailyDataBundleOffersStore
  stores:
    customerMobileContractStore:
      store name: mobilecontracts
    dailyDataBundleOffersStore:
      store name: dynamicDataBundles
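The difference between the two lookup styles can be illustrated with a minimal Python sketch over dictionaries standing in for the `mobilecontracts` and `dynamicDataBundles` regions. Everything here is hypothetical: the sample records, the `by_key`/`by_query` helpers, and the assumption that a query returning several matches contributes only its first match. The OQL query is emulated with a Python predicate rather than a real OQL engine.

```python
from typing import Callable, Dict, Iterable, Optional

# Hypothetical local copies of the two cache regions named in the stores section.
mobilecontracts: Dict[str, dict] = {
    "imsi-001": {"imsi": "imsi-001", "contracted_data_bundle": 10},
}
dynamicDataBundles: Dict[str, dict] = {
    "offer-1": {"imsi": "imsi-001", "temporary_data_bundle_uplift": 2, "data_bundle_offer": "weekend"},
    "offer-2": {"imsi": "imsi-002", "temporary_data_bundle_uplift": 5, "data_bundle_offer": "travel"},
}


def by_key(store: Dict[str, dict], key: str, with_values: Iterable[str]) -> Optional[dict]:
    """Direct hash lookup on the region key."""
    record = store.get(key)
    return {f: record[f] for f in with_values} if record is not None else None


def by_query(store: Dict[str, dict], predicate: Callable[[dict], bool],
             with_values: Iterable[str]) -> Optional[dict]:
    """Query-style lookup: a linear scan of region values (first match wins in this sketch)."""
    for record in store.values():
        if predicate(record):
            return {f: record[f] for f in with_values}
    return None


def enrich(event: dict) -> dict:
    enriched = dict(event)
    enriched["contractedDataBundle"] = by_key(
        mobilecontracts, event["imsi"], ["contracted_data_bundle"])
    enriched["dataBundleOffer"] = by_query(
        dynamicDataBundles, lambda r: r["imsi"] == event["imsi"],
        ["temporary_data_bundle_uplift", "data_bundle_offer"])
    return enriched


print(enrich({"imsi": "imsi-001", "bytes_used": 123}))
```

The key lookup suits data addressed by its primary key (the contract per IMSI), while the query form is needed when the lookup attribute is not the region key, as with offers keyed by offer ID but filtered by IMSI.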
More information on how the enricher processor can be configured to use the Apache Geode solution for slow-moving data can be found in the Joule documentation.
Enrich with live metrics
Advanced use cases often require computed metrics as inputs to further contextual event-based calculations or ML predictions, as event-based triggers, or as additional context for the emitted event. These metrics are computed by a time-based process, such as a scheduled query over tumbling or sliding windows of events captured locally by the process.
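A tumbling-window computation of this kind can be sketched in plain Python. This is a minimal illustration under stated assumptions, not Joule's metrics engine: events are dictionaries with hypothetical `symbol`, `ts` (milliseconds) and `bid` fields, and the neutral metric names `bid_min`, `bid_avg` and `bid_max` are invented here. A sliding window would differ only in assigning each event to every overlapping window it falls into.

```python
from collections import defaultdict


def tumbling_bid_metrics(events, window_ms):
    """Group events into fixed, non-overlapping windows per symbol and compute bid statistics."""
    buckets = defaultdict(list)
    for event in events:
        # Each event belongs to exactly one window: [window_start, window_start + window_ms).
        window_start = event["ts"] - event["ts"] % window_ms
        buckets[(window_start, event["symbol"])].append(event["bid"])
    return {
        key: {"bid_min": min(bids), "bid_avg": sum(bids) / len(bids), "bid_max": max(bids)}
        for key, bids in buckets.items()
    }


quotes = [
    {"symbol": "AAPL", "ts": 0, "bid": 100.0},
    {"symbol": "AAPL", "ts": 400, "bid": 102.0},
    {"symbol": "AAPL", "ts": 1_200, "bid": 104.0},
]
for key, stats in tumbling_bid_metrics(quotes, window_ms=1_000).items():
    print(key, stats)
# (0, 'AAPL') {'bid_min': 100.0, 'bid_avg': 101.0, 'bid_max': 102.0}
# (1000, 'AAPL') {'bid_min': 104.0, 'bid_avg': 104.0, 'bid_max': 104.0}
```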
Example metrics
Time series measures (e.g. market depth, volatility, liquidity)
Customer mobile data usage with respect to time
Inventory level monitoring for restocking triggers
Average geospatial dwelling level for infrastructure management
Priming the metrics table
Joule provides the ability to prime metrics with pre-computed values at startup. This configuration is performed within the initialisation DSL element.
initialisation:
  data import:
    parquet:
      - table: bid_moving_averages
        schema: metrics
        files: ['data/parquet/mvavgs-prime.parquet']
        drop table: true
        index:
          fields: [ 'symbol' ]
          unique: false
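Note that, unlike the reference data example, the index here is non-unique, so a symbol can have many pre-computed rows. The following minimal Python sketch illustrates that effect with `sqlite3`. It assumes the parquet contents have already been read into tuples (reading parquet needs a third-party library and is out of scope). The `ts` column, the sample values and the `prime_metrics` helper are hypothetical, the `metrics` schema is omitted for brevity, and only the `avg_bid_*` column names come from the enricher example in this article.

```python
import sqlite3

# Hypothetical pre-computed rows standing in for data/parquet/mvavgs-prime.parquet.
PRIME_ROWS = [
    ("AAPL", 1_000, 100.0, 101.0, 102.0),
    ("AAPL", 2_000, 101.0, 102.5, 104.0),
    ("ASML", 1_000, 650.0, 655.0, 660.0),
]


def prime_metrics(conn: sqlite3.Connection, rows) -> None:
    """Rebuild the metrics table from pre-computed rows at startup."""
    conn.execute("DROP TABLE IF EXISTS bid_moving_averages")  # 'drop table: true'
    conn.execute(
        "CREATE TABLE bid_moving_averages "
        "(symbol TEXT, ts INTEGER, avg_bid_min REAL, avg_bid_avg REAL, avg_bid_max REAL)"
    )
    # 'unique: false' -> a plain index, so one symbol may have many rows.
    conn.execute("CREATE INDEX idx_bma_symbol ON bid_moving_averages (symbol)")
    conn.executemany("INSERT INTO bid_moving_averages VALUES (?, ?, ?, ?, ?)", rows)
    conn.commit()


conn = sqlite3.connect(":memory:")
prime_metrics(conn, PRIME_ROWS)
print(conn.execute(
    "SELECT symbol, COUNT(*) FROM bid_moving_averages GROUP BY symbol ORDER BY symbol"
).fetchall())  # [('AAPL', 2), ('ASML', 1)]
```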
Enrich events with metric data
There are various methods to enrich an event using metric data. The example below demonstrates how the enricher processor can be used.
enricher:
  fields:
    quote_metrics:
      by metric family: BidMovingAverage
      by key: symbol
      with values: [avg_bid_min, avg_bid_avg, avg_bid_max]
      using: MetricsDB
This example applies the latest metrics for a given symbol to an event. More information can be found in the Joule documentation.
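"Latest metrics for a given symbol" can be made concrete with a small Python sketch. It is a hypothetical stand-in for the MetricsDB lookup, using `sqlite3`: it assumes each metrics row carries a `ts` column recording when it was computed (the article does not state how Joule determines the latest row), and the `enrich_with_latest_metrics` function and sample rows are invented for illustration.

```python
import sqlite3

# Hypothetical metrics table; 'ts' is an assumed computation timestamp.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE bid_moving_averages "
    "(symbol TEXT, ts INTEGER, avg_bid_min REAL, avg_bid_avg REAL, avg_bid_max REAL)"
)
conn.executemany(
    "INSERT INTO bid_moving_averages VALUES (?, ?, ?, ?, ?)",
    [("AAPL", 1_000, 100.0, 101.0, 102.0), ("AAPL", 2_000, 101.0, 102.5, 104.0)],
)

METRIC_FIELDS = ("avg_bid_min", "avg_bid_avg", "avg_bid_max")


def enrich_with_latest_metrics(event: dict, conn: sqlite3.Connection) -> dict:
    """Return a copy of the event with the most recent metrics row for its symbol."""
    row = conn.execute(
        "SELECT avg_bid_min, avg_bid_avg, avg_bid_max FROM bid_moving_averages "
        "WHERE symbol = ? ORDER BY ts DESC LIMIT 1",
        (event["symbol"],),
    ).fetchone()
    enriched = dict(event)
    enriched["quote_metrics"] = dict(zip(METRIC_FIELDS, row)) if row is not None else None
    return enriched


print(enrich_with_latest_metrics({"symbol": "AAPL", "bid": 103.0}, conn))
# {'symbol': 'AAPL', 'bid': 103.0, 'quote_metrics': {'avg_bid_min': 101.0, 'avg_bid_avg': 102.5, 'avg_bid_max': 104.0}}
```

Ordering by timestamp and taking one row keeps the lookup correct even when the table holds a history of metric snapshots per symbol.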
Summary
Joule AI can serve various forms of contextual data at in-memory speed, using a low-code approach, to drive analytical and logical processing. With these features, advanced stream-based use cases can be implemented efficiently and integrated seamlessly into an existing enterprise architecture.
Documentation
Joule offers a wide range of features and connector integrations, all of which are detailed in the online documentation. This documentation is regularly updated to ensure accuracy and to reflect any new additions or improvements to the platform.
Getting started
To get started, download the following resources to prepare your environment and work through the provided documentation. Please feel free to reach out with questions.
We’re Here to Help
Feedback is most welcome including thoughts on how to improve and extend Joule and ideas for exciting use cases.
Join the FractalWorks Community Forum, where members openly share ideas and best practices and support each other. And if you have any further questions on how to become a partner or customer of FractalWorks, do not hesitate to engage with us; we will be happy to talk about your needs.