In a financial system, or maybe other systems, running a summary job for a certain data set always presents problems: it can involve scanning the entire table, summarizing data, migrating to a destination system, handling many different work steps, and repetitive work day after day.
The problem here is when there are problems like this, how do we handle them to execute the Job as quickly as possible, ensuring system performance, without overloading or causing downtime to the main system.
In this article, I will share how I run a summary job to ensure high performance.
The problem is simple: At the end of each day, the system will need to run a Job to summarize the balance for 15–20 million accounts.
The end-of-day balance of an account in a financial system will include several parameters such as:
open balance: opening balance of day T (equal to closing balance of day T-1)
close balance: end-of-day balance
debit balance: debit balance of the day
credit balance: credit balance of the day
debit transaction: number of debit transactions
credit transaction: number of credit transactions
It will be necessary to summarize some of the above parameters by day, for each different account, each day it will be necessary to summarize data and generate xx million summary records depending on the number of accounts in the system.
Main business process flow
About the main process, I will show it in the picture below. I will not redraw the optimization parts in the following part of the article because the basic flow will be the same.
I divide it into 4 main steps, each step will do separate tasks:
Step 1: scan the entire list of accounts
In this first step, its main task is to scan the list of accounts in the account table (the main optimization part will be in this step)
There will be a scheduled job that scans the account table sequentially to get the list of accounts, scanning each page from the beginning to the last page of the table.
During the scanning process, every 100 accounts will be grouped into 1 batch, each batch is 1 event, and some more parameters will be added to the event and then shot into the queue of Step 2.
Step 2: handle the main logic of data aggregation
The primary goal in step two will be to synthesize account data by an event. Since each event is a batch of 100 accounts above, processing each event will also process a batch of 100 accounts simultaneously, which will speed up data synthesis compared to processing each account separately.
To store the data in the database, a new event model will be constructed and sent to the step 3 queue after the account balance has been synthesized.
The data in this event model will need to be changed or placed into the database of 100 accounts following synthesis; however, we will not do either of those things at this step; instead, we will forward the event to step 3 for processing.
Step 3: process and save the aggregated data to the database
Saving data to the database using the event model from step 2 is the primary responsibility of step 3. There will be instances of successful or unsuccessful processing when this batch of data is saved to the database.
We will fire two distinct event types for each of these cases: SUCCESS or FAILED for step 4 to proceed with processing.
Step 4: Update Job processing status
Step 4’s primary responsibilities include updating the counter variable for batches that were successfully processed, and batches that were unsuccessful, and checking and updating the job’s completion status.
Step 3 is multi-threaded, so I chose to move the job status update to 1 thread in step 4 to avoid race conditions. This will always guarantee that the number of processed child batches (success, failure) in a large parent batch is updated sequentially.
Separating stages three and four also guarantees future maintenance and a clear division of tasks. There is no connection between the two steps.
You will not need to change the code in step 3 if you modify it in step 4 to add logic or update something later.
Test run on 1 million data
To prepare for testing data on 1 million accounts, I will need to do two main things:
Fake 1 million account data
Fake Entry data, which is the source data used for the synthesis
The implementation flow and concept remain the same as I initially provided, but I would prefer not to share the main implementation source code in this article for a variety of reasons.
Insert dummy data for the account table
My account table currently has only two fields account_no and accounting_type:
I will create a package to generate fake data, described below:
CREATE OR REPLACE PACKAGE PKG_ACCOUNT_UTIL AS
PROCEDURE INSERT_FAKE_DATA(p_num_records IN NUMBER);
END PKG_ACCOUNT_UTIL;
CREATE OR REPLACE PACKAGE BODY PKG_ACCOUNT_UTIL AS
PROCEDURE INSERT_FAKE_DATA(p_num_records IN NUMBER) IS
v_account_no VARCHAR2(255);
v_accounting_type VARCHAR2(255);
v_inserted_count NUMBER := 0;
BEGIN
WHILE v_inserted_count < p_num_records LOOP
v_account_no := '08' || TO_CHAR(DBMS_RANDOM.VALUE(1000000000, 9999999999), 'FM9999999990');
v_accounting_type := CASE
WHEN DBMS_RANDOM.VALUE(0, 1) < 0.5 THEN 'DR'
ELSE 'CR'
END;
BEGIN
INSERT INTO ACCOUNT (ACCOUNT_NO, ACCOUNTING_TYPE)
VALUES (v_account_no, v_accounting_type);
v_inserted_count := v_inserted_count + 1;
EXCEPTION
WHEN DUP_VAL_ON_INDEX THEN
NULL;
END;
END LOOP;
COMMIT;
END INSERT_FAKE_DATA;
END PKG_ACCOUNT_UTIL;
1 million account records will be inserted after creation.
BEGIN
PKG_ACCOUNT_UTIL.INSERT_FAKE_DATA(1000000);
END;
It takes roughly 13 seconds to run and insert 1 million data.
Insert source data for Entry table
The source data utilized to aggregate the data will be in the Entry table.
CREATE OR REPLACE PACKAGE PKG_ENTRY_UTIL AS
PROCEDURE INSERT_FAKE_ENTRY(p_num_records IN NUMBER);
END PKG_ENTRY_UTIL;
/
CREATE OR REPLACE PACKAGE BODY PKG_ENTRY_UTIL AS
PROCEDURE INSERT_FAKE_ENTRY(p_num_records IN NUMBER) IS
v_entry_id VARCHAR2(255);
v_transaction_id VARCHAR2(255);
v_account_no VARCHAR2(255);
v_amount NUMBER := 27000;
v_sign VARCHAR2(255);
v_account_category VARCHAR2(255);
v_currency VARCHAR2(20) := 'VND';
v_status VARCHAR2(100) := '1';
v_company_code VARCHAR2(255) := 'CAFEINCODE';
v_created_date TIMESTAMP(6);
v_processing_date TIMESTAMP(6);
v_success_count NUMBER := 0;
v_attempts NUMBER := 0;
TYPE t_account_no_list IS TABLE OF VARCHAR2(255);
v_account_no_list t_account_no_list;
v_account_index NUMBER := 1;
BEGIN
SELECT account_no BULK COLLECT INTO v_account_no_list FROM ACCOUNT;
WHILE v_success_count < p_num_records LOOP
v_attempts := v_attempts + 1;
v_account_no := v_account_no_list(MOD(v_account_index - 1, v_account_no_list.COUNT) + 1);
v_account_index := v_account_index + 1;
v_entry_id := '270519955555555555555.' || v_attempts || '.1';
v_transaction_id := v_entry_id;
v_sign := CASE WHEN DBMS_RANDOM.VALUE(0, 1) < 0.5 THEN 'DR' ELSE 'CR' END;
v_account_category := CASE
WHEN DBMS_RANDOM.VALUE(0, 3) < 1 THEN '01006'
WHEN DBMS_RANDOM.VALUE(0, 3) < 2 THEN '05001'
ELSE '05000'
END;
v_created_date := TO_TIMESTAMP('2024-12-26 09:16:18', 'YYYY-MM-DD HH24:MI:SS');
v_processing_date := TO_DATE('2024-12-25', 'YYYY-MM-DD')
+ INTERVAL '15' HOUR
+ INTERVAL '15' MINUTE;
BEGIN
INSERT INTO ENTRY (
ENTRY_ID, TRANSACTION_ID, ACCOUNT_NO, AMOUNT, SIGN, ACCOUNT_CATEGORY,
CURRENCY, STATUS, COMPANY_CODE, CREATED_DATE, PROCESSING_DATE
) VALUES (
v_entry_id, v_transaction_id, v_account_no, v_amount, v_sign,
v_account_category, v_currency, v_status, v_company_code,
v_created_date, v_processing_date
);
v_success_count := v_success_count + 1;
EXCEPTION
WHEN DUP_VAL_ON_INDEX THEN
NULL;
END;
END LOOP;
COMMIT;
END INSERT_FAKE_ENTRY;
END PKG_ENTRY_UTIL;
/
BEGIN
PKG_ENTRY_UTIL.INSERT_FAKE_ENTRY(2000000);
END;
Running the process of inserting 2 million data entry records takes about 3 minutes.
Run Project
Now we will run the project and test how long it takes to scan 1 million accounts.
With 1,000,000 accounts, I estimate that 10,000 batches will need to be run because the account scan is only operating on one thread. Since each batch consists of 100 accounts, the total_batch_count will be 10,000, and the total_batch_success + total_batch_failure = 10,000.
After roughly 14 minutes, the full scanning and data summary procedure is complete, but the account summary process is still ongoing.
2024-12-26 09:36:44.100 [INFO] [BatchPersistenceDataWorker-2] [ProcessBatchPersistenceHandler] Save batch report: a267b932-a121-4eb0-a6e5-a5fe8c2107e7 to database successfully
2024-12-26 09:36:44.100 [INFO] [BatchPersistenceDataWorker-2] [DefaultEventPublisher] Publish event message to queue: [queueBatchHistoryResultUpdate] successfully
2024-12-26 09:36:44.101 [INFO] [Process batch history result handler] [BatchJobHistoryUseCase] Batch job history: a267b932-a121-4eb0-a6e5-a5fe8c2107e7 in processing !!!
2024-12-26 09:36:44.131 [INFO] [http-nio-8109-exec-2] [AccountBalanceUseCase] [AccountBalanceUseCase] Processing page: 10000, size: 100, total elements: 0
2024-12-26 09:36:44.446 [INFO] [http-nio-8109-exec-2] [ExecutionTimeMonitorAspect] End service: JobSummaryAccountBalanceDaily execution time: 849357 ms - 849 s
The table below contains detailed information about the job’s running history. It shows that the summary job has successfully processed 1,000,000 account records, 10,000 batch events, and 10,000 batches that have been successfully processed.
The aggregated data results are saved in a report table.
One thing, though, that needs our attention is that it takes roughly 14 minutes to process 1,000,000 accounts. This is slow, in my opinion, and needs to be improved.
The slowest part of our entire processing flow is step 1, where there is only one processing thread and each page is scanned sequentially. As a result, the job’s processing time is nearly identical to the account scanning time.
The new approach to optimizing time
Change the way data is read in the account table
If the account table is already created and more columns cannot be added, I will use a different view to scan data.
The code to generate a new view from the previous account table is below. It adds a partition_id field to separate the accounts into ten groups.
CREATE OR REPLACE VIEW account_partitions_view AS
SELECT
account_no,
ACCOUNTING_TYPE,
MOD(ORA_HASH(account_no), 10) + 1 AS partition_id
FROM ACCOUNT;
The accounts have now been divided into ten groups. Step 1’s account scanning will no longer require a single thread, I will use 10 threads to scan accounts per partition.
Our data scanning will be more efficient since each thread will scan records based on a certain partition_id.
var executorService = Executors.newFixedThreadPool(10);
for (long i = 1; i <= 10; i++) {
long partitionId = i;
executorService.submit(() -> scanAccountByPartitionId(batchJobLog.getBatchUuid(), partitionId, dateTime));
}
Modify the logic a bit and run in the above direction, and we will have the following result:
Twelve minutes is the execution time, which does not seem to be much quicker. The scan will not be quick since the partition_id column in the view we just made lacks an index.
I will now make a small adjustment to the source data. Since I can not index the old view, I will make a new materialized view and index the partition_id column:
CREATE MATERIALIZED VIEW account_partitions_view2
BUILD IMMEDIATE
REFRESH COMPLETE ON DEMAND
AS
SELECT
account_no,
ACCOUNTING_TYPE,
MOD(ORA_HASH(account_no), 10)+1 AS partition_id
FROM ACCOUNT;
CREATE INDEX idx_partition_id ON account_partitions_view2(partition_id);
Then I checked the partition_id data distribution of the account records on the new view and found that the distribution was quite even.
Fix some logic to scan accounts from the new Materialized View
for (long i = 1; i <= 10; i++) {
long partitionId = i;
scanAccountByPartitionId(batchJobLog.getBatchUuid(), partitionId, dateTime);
}
public void scanAccountByPartitionId(String batchUuid, Long partition, LocalDateTime dateTime) {
var event = ScanAccountEvent.builder()
.batchUuid(batchUuid)
.partitionId(partition)
.time(dateTime)
.build();
eventPublisher.publishEvent(event,"queueSummary");
}
Additionally, after optimization, it takes somewhat more than a minute to scan the account_view list; however, this minute just covers the account list scan, and the data synthesis of all four steps takes roughly five minutes.
There were 10006 batches that were successfully processed, which is a few more than the six batches that were initially used. The summary table has exactly one million accounts’ worth of summary records.
To explain why the number of successfully processed batches here is 10006 batches, not 10000 batches, please see the following image:
We will round up to ensure that all records are fully included in the batches, even if the number of records in each sub-batch is not divisible by the batch size, it will still be counted.
We can preliminarily assess that the optimization by adding the partition_id column to divide multiple processing threads has brought about high efficiency, saving about 70% of processing time compared to the original conventional method.
Change the account table architecture from scratch
If in old systems, or the implementation of tables already exists, this way is almost impossible and cannot be done.
However, if I had full authority to decide to change the account table architecture at the beginning, I would implement the table architecture according to the following steps:
create an additional auto-incrementing ID sequence column on the original account table
create additional partition_id column, this column still hashes data based on account_no, and numbers from 1 to 10
create 10 processing threads for these 10 partition_ids, however, there will be a slight change in the scan in each partition
on each thread, in the query to get the account data, I will mark the id of the last record, and use that id as input for the next query, so I will avoid having to scan the entire table compared to the previous methods.
Okay, now let’s start creating a new table and handling the logic:
Create a new account table
The new account table will have 4 data fields: id, account_no, accounting_type, partition_id
create table ACCOUNT_NEW
(
ID NUMBER not null primary key,
ACCOUNT_NO VARCHAR2(50) not NULL UNIQUE,
ACCOUNTING_TYPE VARCHAR2(10),
PARTITION_ID NUMBER
);
create sequence ACCOUNT_NEW_SEQ start with 1 increment by 1;
create index ACCOUNT_NEW_PARTITION_ID_INDEX on ACCOUNT_NEW (PARTITION_ID);
Create random package account_new table data
CREATE OR REPLACE PACKAGE PKG_ACCOUNT_NEW_UTIL AS
PROCEDURE INSERT_FAKE_DATA(p_num_records IN NUMBER);
END PKG_ACCOUNT_NEW_UTIL;
CREATE OR REPLACE PACKAGE BODY PKG_ACCOUNT_NEW_UTIL AS
PROCEDURE INSERT_FAKE_DATA(p_num_records IN NUMBER) IS
v_account_no VARCHAR2(255);
v_accounting_type VARCHAR2(255);
v_inserted_count NUMBER := 0;
BEGIN
WHILE v_inserted_count < p_num_records LOOP
v_account_no := '08' || TO_CHAR(DBMS_RANDOM.VALUE(1000000000, 9999999999), 'FM9999999990');
v_accounting_type := CASE
WHEN DBMS_RANDOM.VALUE(0, 1) < 0.5 THEN 'DR'
ELSE 'CR'
END;
BEGIN
INSERT INTO ACCOUNT_NEW (
ID, ACCOUNT_NO, ACCOUNTING_TYPE, PARTITION_ID
) VALUES (
ACCOUNT_NEW_SEQ.nextval, v_account_no, v_accounting_type,
MOD(ORA_HASH(v_account_no), 10) + 1
);
v_inserted_count := v_inserted_count + 1;
EXCEPTION
WHEN DUP_VAL_ON_INDEX THEN
NULL;
END;
END LOOP;
COMMIT;
END INSERT_FAKE_DATA;
END PKG_ACCOUNT_NEW_UTIL;
BEGIN
PKG_ACCOUNT_NEW_UTIL.INSERT_FAKE_DATA(1000000);
END;
We have prepared the account data, now let’s check again about data allocation into partitions.
Reprocess query reading logic
Okay now comes the important step, in this step, we will scan by the id field of the table, along with partition_id
SELECT id, account_no, accounting_type, partition_id
FROM ACCOUNT_NEW where (id between FIRST_ID AND NEXT_ID ) and partition_id = PARTITION_ID order by id asc;
**FIRST_ID**
: is the value that runs sequentially from 0, after each run it will be changed by the id of the last record + 1, for example, the second run will take FIRST_ID equal to the id of the last record of the first run plus 1, repeat like that
**NEXT_ID**
: this value will be equal to the new FIRST_ID + our batch division range, for example from the beginning of the lesson until now, we have divided the batch by about 100 elements, and then next_id will be equal to FIRST_ID + 100
**PARTITION_ID**
: is the value from 1–10 that we have grouped the account
Query performance is good, uses index, and can play comfortably.
Important Note
During the scanning process, you need to pay attention to the following: if we use the condition that the result list is not empty or runs to the last record of the table, it will not be optimized and will cause errors.
Here I will get the id of the largest record in each partition, then cache it and use it as a stop condition for running the account scan query above.
The account scan processing logic will be similar to the following:
var partitionId = event.getPartitionId();
long maxIdOfPartition = event.getMaxIdOfPartition();
long firstCursorId = 0;
long nextCursorId = firstCursorId + 1000;
int countingBatch = 0;
List accumulatedAccounts = new ArrayList<>();
try {
while (firstCursorId <= maxIdOfPartition) {
var accounts = accountRepository.findAccountsByCursor(firstCursorId, nextCursorId, partitionId);
if (!accounts.isEmpty()) {
accumulatedAccounts.addAll(accounts);
if (accumulatedAccounts.size() >= 100) {
var viewAccounts = mappingAccounts(accumulatedAccounts);
publishEvent(viewAccounts, batchUuid, countingBatch + 1, dateTime);
accumulatedAccounts.clear();
countingBatch++;
}
var lastCursorId = accounts.get(accounts.size() - 1).getId();
firstCursorId = lastCursorId + 1;
nextCursorId = firstCursorId + 1000;
}
}
if (!accumulatedAccounts.isEmpty()) {
var viewAccounts = mappingAccounts(accumulatedAccounts);
publishEvent(viewAccounts, batchUuid, countingBatch + 1, dateTime);
}
} catch (Exception ex) {
log.error("Error occurred during scan accounts: {}", ex.getMessage(), ex);
}
Run the project and measure the results
The total processing time to scan the account_new table process the data synthesis, save to the database, and update the variable counting the number of processed batches is about more than 4 minutes.
The total number of scanned accounts is 1 million, and this is the final summary data table, the number is also 1 million records.
Now in the scan processing code, I will change the batchSize parameter from 1000 to 2000, and from 100 to 200 to see if this change is effective.
var partitionId = event.getPartitionId();
long maxIdOfPartition = event.getMaxIdOfPartition();
long firstCursorId = 0;
long nextCursorId = firstCursorId + 2000;
int countingBatch = 0;
List accumulatedAccounts = new ArrayList<>();
try {
while (firstCursorId <= maxIdOfPartition) {
var accounts = accountRepository.findAccountsByCursor(firstCursorId, nextCursorId, partitionId);
if (!accounts.isEmpty()) {
accumulatedAccounts.addAll(accounts);
if (accumulatedAccounts.size() >= 200) {
var viewAccounts = mappingAccounts(accumulatedAccounts);
publishEvent(viewAccounts, batchUuid, countingBatch + 1, dateTime);
accumulatedAccounts.clear();
countingBatch++;
}
var lastCursorId = accounts.get(accounts.size() - 1).getId();
firstCursorId = lastCursorId + 1;
nextCursorId = firstCursorId + 2000;
}
}
if (!accumulatedAccounts.isEmpty()) {
var viewAccounts = mappingAccounts(accumulatedAccounts);
publishEvent(viewAccounts, batchUuid, countingBatch + 1, dateTime);
}
} catch (Exception ex) {
log.error("Error occurred during scan accounts: {}", ex.getMessage(), ex);
}
The total execution time is exactly 4 minutes, however, I can further optimize the processing flow and optimize the time in steps 2, step 3, and step 4 as follows:
Currently, I am using an in-memory queue and manual thread division so it will only be optimized when running on 1 instance, you can further optimize by using Hazelcast to distribute the processing load across multiple instances.
There is no retry handling when saving a batch fails.
There is no monitor job section yet
Thanks, before you go:
👏 If you have any better solutions, please comment below, we will discuss and learn from each other.
👏 If the article has spelling mistakes, please ignore them.
👏 Please clap for the story and follow the author, 👉👉👉 hungtv27
👏 Please share your questions or insights in the comments section below.
Originally published at cafeincode.com on December 30, 2024.