Brief Introduction of a Continuous SQL-stream Sending and Processing System (Part 2: MySQL)

4 Apr 2021, CPOL, 17 min read
In this article, you will learn how SocketPro is applied to various databases for continuous inline request/result batching and real-time stream processing with bi-directional asynchronous data transfer.

Introduction

Most client-server database systems only support synchronous communication between a client and a backend database. They use blocking sockets and a chatty protocol that requires a client or server to wait for an acknowledgement before sending a new chunk of data. This wait time, also called latency, can range from about a tenth of a millisecond on a local area network (LAN) to hundreds of milliseconds on a wide area network (WAN). Large wait times can significantly degrade the quality of an application.

UDAParts has developed a powerful and secure communication framework named SocketPro. It is designed around continuous inline request/result batching and real-time stream processing, using asynchronous data transfer and parallel computation for the best network efficiency, development simplicity, performance and scalability. It also offers many great and even unique features, which are described at the site (https://github.com/udaparts/socketpro).

Further, UDAParts has applied the SocketPro framework to a number of popular databases such as SQLite, MySQL and MS SQL Server, as well as others through ODBC drivers, to support continuous SQL-stream sending and processing. Additionally, most of these database components are free forever for individuals. To reduce the learning curve, I recommend that you study the SQL-stream sample for SQLite (Part 1: SQLite) before playing with these MySQL sample projects, because the SQLite and MySQL samples share the same set of client DB API functions.

MySQL is currently the most popular open-source client-server distributed database management system. After studying the MySQL/MariaDB server plugin features, UDAParts applied SocketPro SQL-stream technology to MySQL/MariaDB and developed a plugin that supports continuous sending of SQL statements at the client side and continuous processing at the server side for the best performance and scalability. UDAParts has also compared the performance of SQL-stream technology with that of MySQL Connector/Net. Our performance study shows that SQL-stream technology can be from about five times faster on a LAN up to hundreds of times faster on a WAN than the MySQL Connector/Net provider.

Source Codes and Samples

All related source code and samples are located at https://github.com/udaparts/socketpro. After cloning the repository with Git, pay attention to the subdirectory mysql inside the directory socketpro/stream_sql. The source code of the SocketPro MySQL server plugin is located in the directory socketpro/stream_sql/smysql. You will also find samples created in .NET, C/C++, Java and Python development environments. However, this article uses C# code (socketpro/stream_sql/mysql/test_csharp) to explain client development.

In addition to the above samples, you can find performance study samples that use the MySQL sample database sakila in the directory socketpro/stream_sql/mysql/DBPerf. The subdirectory contains three performance study projects, cppperf, netperf and mysqlperf, which are written with C++/SocketPro SQL streaming, .NET/SocketPro SQL streaming, and ADO.NET provider technologies, respectively.

Further, SocketPro MySQL server plugin supports data table update events (DELETE, INSERT and UPDATE) through database triggers. You can use this feature to push update events of selected tables onto clients or middle tiers. The sample C# project is located in the directory socketpro/stream_sql/mysql/test_cache.

Before running these sample applications, you should have already copied the system libraries in the directories socketpro/bin and socketpro/bin/free_services into your system directory, as described in the file socketpro/doc/get_started.htm.

For the SocketPro communication framework itself, you may also refer to its development guide at socketpro/doc/dev_guide.htm.

Register MySQL/MariaDB SQL-streaming Plugin and Set its Configurations

We are going to install database plugins for MySQL version 8 or later, MySQL version 5.7 or earlier, and MariaDB. You must have already distributed the server core library (libuservercore.so for Linux or uservercore.dll for Windows) as described in the file socketpro/doc/get_started.htm.

  • Copy the SocketPro MySQL/MariaDB DB plugin into the database plugin directory, and install the plugin
    1. Find the MySQL/MariaDB database plugin directory by executing the statement show variables where variable_name='plugin_dir'
    2. Copy the MySQL/MariaDB plugin into the database plugin directory and install the plugin
      • Windows: Copy smysql.dll at ../socketpro/bin/free_services/(mysql8_0_11|mysql5_7_22|mariadb)/win64 (or win86) into the above found database plugin directory. Afterwards, execute the statement INSTALL PLUGIN UDAParts_SQL_Streaming SONAME 'smysql.dll'.
      • Linux: Copy libsmysql.so at ../socketpro/bin/free_services/(mysql8_0_11|mysql5_7_22|mariadb)/ into the above found database plugin directory. Afterwards, execute the statement INSTALL PLUGIN UDAParts_SQL_Streaming SONAME 'libsmysql.so'.
      Make sure there is no error output before continuing to the next step. The directories mysql8_0_11, mysql5_7_22 and mariadb each contain one DB plugin, for MySQL 8 or later, MySQL 5.7.33 or earlier, and MariaDB, respectively. Be careful to pick the directory that matches your server.
    3. Go to the MySQL/MariaDB data directory, which can be found by executing the query show variables where variable_name='datadir'. There you will find two generated files, sp_streaming_db_config.json and streaming_db.log, for other advanced settings and error outputs, respectively. If there is an error, the file streaming_db.log will very likely give you a helpful hint. At this point, you can successfully run the test sample application after compiling it.
  • Install a sample database sakila

    Even though this step is optional, it is highly recommended because we use sakila extensively as a sample database. If your MySQL/MariaDB doesn’t have the sakila database installed, you can find it by searching GitHub for sakila. Many sites host SQL scripts for installing this well-known sample database. As an example, this site may work for you.

  • Configure SocketPro MySQL/MariaDB DB plugin for advanced features by modifying the file sp_streaming_db_config.json

    The coming configurations are also optional. They are presented here for advanced features and other services.

    1. First of all, find the entry services, and change its string value to ssqlite;uasyncqueue. The MySQL/MariaDB database server plugin will then load the two services (SQLite and server persistent queue). You can add other services in the same way. Services must be separated by a semicolon.
    2. Next, find the entry monitored_tables, and change its string value to sakila.actor;sakila.country;sakila.category;sakila.language. Doing so makes the MySQL/MariaDB database server plugin monitor insert, update and delete trigger events for the four tables actor, country, category and language. SocketPro uses these trigger events for a real-time cache at the client or middle tier side.
    3. Stop the MySQL/MariaDB database server, and restart it. At this point, the configuration file will be updated. If there is an error, the log file streaming_db.log will help you out.
    4. The following steps are NOT necessary for MySQL 8 or later. However, if you use MySQL 5.7 or earlier, or MariaDB, follow the two steps below to complete the previous setting 2:

      Register a user defined function SetSQLStreamingPlugin by executing the statement CREATE FUNCTION SetSQLStreamingPlugin RETURNS INTEGER SONAME 'libsmysql.so' on Linux, or CREATE FUNCTION SetSQLStreamingPlugin RETURNS INTEGER SONAME 'smysql.dll' on Windows.

      Finally, call the user defined function SetSQLStreamingPlugin by executing a statement like select SetSQLStreamingPlugin('uid=root;pwd=Smash123'). Here, the parameters uid and pwd represent user id and password, respectively.

    5. If you know C#, you can try the real-time cache feature by compiling and running the test project in the directory ../socketpro/stream_sql/mysql/test_cache.

    If the above configurations are completed without any error, these SocketPro DB server plugins will support the SocketPro server persistent message queue, SQLite, SQL request streaming, and real-time updateable cache services. Further, these SocketPro DB server plugins will use database accounts to authenticate clients for all of these services.

    You can also turn on SSL/TLSv1.x for secure communication, change the listening port, and modify settings for the SocketPro server persistent message queue and SQLite services. Finally, note that some entries in the configuration file are read-only and are there just for your information.

Main Function

SocketPro is designed from the ground up to support parallel computation by use of one or more pools of non-blocking sockets. Each pool may be made of one or more threads, and each thread hosts one or more non-blocking sockets at the client side. Typically, one pool of non-blocking sockets is hosted by a single worker thread. For clarity, this sample uses just one pool at the client side, as shown in code snippet 1 below.

C#
using System;
using System.Collections.Generic;
using SocketProAdapter;
using SocketProAdapter.ClientSide;
using SocketProAdapter.UDB;
using System.Threading.Tasks;

using KeyValue = System.Collections.Generic.KeyValuePair
      <SocketProAdapter.UDB.CDBColumnInfoArray, SocketProAdapter.UDB.CDBVariantArray>;

class Program
{
    static readonly string m_wstr;
    static readonly string m_str;

    static void Main(string[] args)
    {
        Console.WriteLine("Remote host: ");
        string host = Console.ReadLine();
        CConnectionContext cc = new CConnectionContext(host, 20902, "root", "Smash123");
        using (CSocketPool<CMysql> spMysql = new CSocketPool<CMysql>())
        {
            //spMysql.QueueName = "qmysql";
            if (!spMysql.StartSocketPool(cc, 1)) //line 23
            {
                Console.WriteLine("Failed in connecting to remote async mysql server");
                Console.WriteLine("Press any key to close the application ......");
                Console.Read();
                return;
            }
            CMysql mysql = spMysql.Seek(); //line 30
            CDBVariantArray vPData = null, vData = null;
            List<KeyValue> ra = new List<KeyValue>(); //line 32
            CMysql.DRows r = (handler, rowData) => {
                //rowset data come here
                int last = ra.Count - 1;
                KeyValue item = ra[last];
                item.Value.AddRange(rowData);
            };

            CMysql.DRowsetHeader rh = (handler) => {
                //rowset header comes here
                KeyValue item = new KeyValue(handler.ColumnInfo, new CDBVariantArray());
                ra.Add(item);
            };
            try
            {
                //stream all requests with in-line batching for the best network efficiency
                var tOpen = mysql.open(""); //line 48
                var vT = TestCreateTables(mysql);
                var tDs = mysql.execute("delete from employee;delete from company");
                var tP0 = TestPreparedStatements(mysql);
                var tP1 = TestBLOBByPreparedStatement(mysql);
                var tSs = mysql.execute
                ("SELECT * from company;select * from employee;select curtime()",r,rh);
                var tStore = TestStoredProcedure(mysql, ra, out vPData);
                var tB = TestBatch(mysql, ra, out vData); //line 55
                Console.WriteLine();

                Console.WriteLine("All SQLs streamed and waiting results in order ......");
                Console.WriteLine(tOpen.Result); //line 59
                foreach (var t in vT) {
                    Console.WriteLine(t.Result);
                }
                Console.WriteLine(tDs.Result);
                Console.WriteLine(tP0.Result);
                Console.WriteLine(tP1.Result);
                Console.WriteLine(tSs.Result);
                Console.WriteLine(tStore.Result);
                Console.WriteLine("There are {0} output data returned", 2 * 2);
                Console.WriteLine(tB.Result); //line 69
                Console.WriteLine("There are {0} output data returned", 2 * 3);
            }
            catch (AggregateException ex) {
                foreach (Exception e in ex.InnerExceptions) {
                    //An exception from server (CServerError), Socket closed after sending
                    //request (CSocketError) or request canceled (CSocketError),
                    Console.WriteLine(e);
                }
            } catch (CSocketError ex) {
                //Socket is already closed before sending a request
                Console.WriteLine(ex);
            }
            catch (Exception ex) {
                //bad operations such as invalid arguments, bad operations and
                //de-serialization errors, and so on
                Console.WriteLine(ex);
            }
            int index = 0;
            Console.WriteLine();
            Console.WriteLine("+++++ Start rowsets +++");
            foreach (KeyValue it in ra) {
                Console.Write("Statement index = {0}", index);
                if (it.Key.Count > 0)
                    Console.WriteLine(", rowset with columns = {0}, records = {1}.",
                        it.Key.Count, it.Value.Count / it.Key.Count);
                else
                    Console.WriteLine(", no rowset received.");
                ++index;
            }
            Console.WriteLine("+++++ End rowsets +++");
            Console.WriteLine();
            Console.WriteLine("Press any key to close the application ......");
            Console.Read();
        }
    }

    // ......
}
Code snippet 1: Main function for demonstration of SocketPro MySQL SQL-stream system at client side

Starting one socket pool: Code snippet 1 starts one socket pool at line 23. For clarity, the pool has only one worker thread hosting only one non-blocking socket, created from one instance of connection context. Note, however, that you can create multiple pools within one client application if necessary. Afterwards, we get one asynchronous MySQL handler at line 30.

Opening database: We send a request for opening a MySQL server database at line 48. If the first input is an empty or null string, as in this example, we open the default database for the connected user. If you want to open a specific database, simply pass a valid non-empty database name. In addition, we create an instance of the container ra at line 32, which receives all sets of records from the coming queries.

Streaming SQL statements: Keep in mind that, by design, SocketPro supports streaming any number of requests of any type on one non-blocking socket session. We can therefore easily stream all SQL statements and sub requests inside the unit test functions (TestCreateTables, TestPreparedStatements, TestBLOBByPreparedStatement, TestStoredProcedure and TestBatch), as shown at lines 48 through 55. All SocketPro SQL-stream services support this feature for the best network efficiency, which significantly improves data access performance. As far as we know, no other technology offers such a feature. If you find one, please let us know. Like normal database access APIs, SocketPro SQL-stream technology also supports manual transactions, as shown in the previous article. Further, all returned results are streamed from server to client with inline batching too.

Waiting until all processed: Since SocketPro only supports asynchronous data transfer, it must provide a way to wait until all requests have been sent and processed and all results returned, as shown at lines 59 through 69. Here we use the task property Result to convert the asynchronous requests into synchronous ones. It is equally fine to use the keywords async and await for the same purpose.
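
As a minimal sketch of the async/await alternative, the example below starts several requests without awaiting them and then awaits the results in the order the requests were issued. FakeRequest is a hypothetical stand-in built on Task.Delay and is not a SocketPro API; in a real client, the tasks returned by calls such as mysql.open or mysql.execute in code snippet 1 would take its place.

```csharp
using System;
using System.Threading.Tasks;

static class AwaitSketch
{
    // Hypothetical stand-in for a streamed request; it completes after delayMs.
    public static async Task<string> FakeRequest(string name, int delayMs)
    {
        await Task.Delay(delayMs);
        return name + " done";
    }

    // Start every request first, then await the results in sending order.
    public static async Task<string[]> SendAllThenAwait()
    {
        Task<string> tOpen = FakeRequest("open", 30);
        Task<string> tDelete = FakeRequest("delete", 20);
        Task<string> tSelect = FakeRequest("select", 10);

        // All three requests are already in flight at this point.
        return new[] { await tOpen, await tDelete, await tSelect };
    }

    static async Task Main()
    {
        foreach (string result in await SendAllThenAwait())
        {
            Console.WriteLine(result);
        }
    }
}
```

The point of the pattern is that nothing is awaited until every request has been issued, so the waits overlap instead of adding up.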

TestCreateTables, TestPreparedStatements and TestBLOBByPreparedStatement

Code snippet 1 calls the three functions TestCreateTables, TestPreparedStatements and TestBLOBByPreparedStatement. We will not explain them again because they are the same as the ones in the previous article. Let’s focus on executing MySQL stored procedures with input-output and output parameters.

TestStoredProcedure

MySQL fully supports stored procedures, and so does SocketPro SQL-stream technology. Further, SocketPro SQL-stream technology supports executing multiple sets of MySQL stored procedure calls with input, input-output and output parameters in one call, as shown in code snippet 2 below, which also returns multiple sets of records containing large binary objects and texts.

C#
static Task<CAsyncDBHandler.SQLExeInfo> TestStoredProcedure
           (CMysql mysql, List<KeyValue> ra, out CDBVariantArray vPData) {
    vPData = new CDBVariantArray();
    //1st set
    vPData.Add(1);       //input
    vPData.Add(1.4);     //input-output
    //output not important and it's used for receiving a proper data from MySQL
    vPData.Add(0);       //output

    //2nd set
    vPData.Add(2);       //input
    vPData.Add(2.5);     //input-output
    //output not important and it's used for receiving a proper data from MySQL
    vPData.Add(0);       //output

    mysql.Prepare("call sp_TestProc(?,?,?)");
    CMysql.DRows r = (handler, rowData) => {
        //rowset data come here
        int last = ra.Count - 1;
        KeyValue item = ra[last];
        item.Value.AddRange(rowData);
    };
    CMysql.DRowsetHeader rh = (handler) => {
        //rowset header comes here
        KeyValue item = new KeyValue(handler.ColumnInfo, new CDBVariantArray());
        ra.Add(item);
    };
    return mysql.execute(vPData, r, rh);
}
Code snippet 2: Call MySQL stored procedure which returns multiple sets of records and output parameters

As code snippet 2 shows, calling a stored procedure through SocketPro SQL-stream technology is very simple. Note that all output parameter data are copied directly into the passed parameter data array vPData. The callback rh is called when record set metadata arrives, if available. The callback r is called whenever an array of record data arrives. From the two callbacks, you can populate all queried metadata and record data into an arbitrary container such as ra.
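
The header/rows callback pattern can be sketched without any SocketPro types. In the example below, string[] stands in for CDBColumnInfoArray and List<object> for CDBVariantArray; the class RowsetCollector and its method names are hypothetical and only illustrate how a container like ra fills up, and why code snippet 1 divides the value count by the column count to get the number of records.

```csharp
using System;
using System.Collections.Generic;

// Stand-in types: string[] plays the role of CDBColumnInfoArray and
// List<object> the role of CDBVariantArray. Neither is a SocketPro type.
sealed class RowsetCollector
{
    public readonly List<KeyValuePair<string[], List<object>>> Rowsets =
        new List<KeyValuePair<string[], List<object>>>();

    // Header callback: a new rowset starts, so append an empty entry for it.
    public void OnHeader(string[] columns)
    {
        Rowsets.Add(new KeyValuePair<string[], List<object>>(columns, new List<object>()));
    }

    // Rows callback: values arrive flattened and belong to the latest rowset.
    public void OnRows(object[] flatValues)
    {
        Rowsets[Rowsets.Count - 1].Value.AddRange(flatValues);
    }

    // Flattened values divided by the column count gives the number of records.
    public static int RecordCount(KeyValuePair<string[], List<object>> rowset)
    {
        return rowset.Key.Length == 0 ? 0 : rowset.Value.Count / rowset.Key.Length;
    }

    static void Main()
    {
        var collector = new RowsetCollector();
        collector.OnHeader(new[] { "ID", "NAME" });
        collector.OnRows(new object[] { 1, "Google Inc.", 2, "Microsoft Inc." });
        collector.OnRows(new object[] { 3, "Apple Inc." });
        Console.WriteLine(RecordCount(collector.Rowsets[0])); // prints 3
    }
}
```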

TestBatch

Code snippet 3 below is nearly the same as code snippet 6 in the previous article, although this one has more lines of code, with BLOBs and long texts transferred back and forth between client and server. Note that SocketPro server plugins support user-defined delimiters, which can be a single character or a string. Here, it is the vertical bar character, as commented at line 3. We intentionally use a complex test case to show you the power of SocketPro SQL request streaming. The unit test code itself is not complicated at all, but the test case would be considerably challenging with other DB access APIs because it involves transferring large BLOBs and long texts back and forth.

C#
static Task<CAsyncDBHandler.SQLExeInfo> 
    TestBatch(CMysql mysql, List<KeyValue> ra, out CDBVariantArray vData) {
    //sql with delimiter '|' //line 3
    string sql = @"delete from employee;delete from company|
    INSERT INTO company(ID,NAME,ADDRESS,Income)VALUES(?,?,?,?)|
    insert into employee(CompanyId,name,JoinDate,image,DESCRIPTION,Salary)value(?,?,?,?,?,?)|
    SELECT * from company;select * from employee;select curtime()|
    call sp_TestProc(?,?,?)";
    vData = new CDBVariantArray();
    using (CScopeUQueue sbBlob = new CScopeUQueue()) {
        //1st set
        vData.Add(1);
        vData.Add("Google Inc.");
        vData.Add("1600 Amphitheatre Parkway, Mountain View, CA 94043, USA");
        vData.Add(66000000000.15);
        vData.Add(1);                         //google company id
        vData.Add("Ted Cruz");
        vData.Add(DateTime.Now);
        sbBlob.Save(m_wstr);
        vData.Add(sbBlob.UQueue.GetBuffer()); //BLOB
        vData.Add(m_wstr);                    //long unicode text
        vData.Add(254000.26);
        vData.Add(1);                         //input
        vData.Add(1.4);                       //input-output
        vData.Add(0);                         //output

        //2nd set
        vData.Add(2);
        vData.Add("Microsoft Inc.");
        vData.Add("700 Bellevue Way NE- 22nd Floor, Bellevue, WA 98804, USA");
        vData.Add(93600000000.37);
        vData.Add(1);                         //google company id
        vData.Add("Donald Trump");
        vData.Add(DateTime.Now);
        sbBlob.UQueue.SetSize(0);
        sbBlob.Save(m_str);
        vData.Add(sbBlob.UQueue.GetBuffer()); //BLOB
        vData.Add(m_str);                     //long ASCII text
        vData.Add(20254000.85);
        vData.Add(2);                         //input
        vData.Add(2.5);                       //input-output
        vData.Add(0);                         //output

        //3rd set
        vData.Add(3);
        vData.Add("Apple Inc.");
        vData.Add("1 Infinite Loop, Cupertino, CA 95014, USA");
        vData.Add(234000000000.09);
        vData.Add(2); //Microsoft company id
        vData.Add("Hillary Clinton");
        vData.Add(DateTime.Now);
        sbBlob.Save(m_wstr);
        vData.Add(sbBlob.UQueue.GetBuffer()); //BLOB
        vData.Add(m_wstr);                    //long unicode text
        vData.Add(6254000.55);
        vData.Add(0);                         //input
        vData.Add(4.5);                       //input-output
        vData.Add(0);                         //output
    }
    CMysql.DRows r = (handler, rowData) => {
        //rowset data come here
        int last = ra.Count - 1;
        KeyValuePair<CDBColumnInfoArray, CDBVariantArray> item = ra[last];
        item.Value.AddRange(rowData);
    };
    CMysql.DRowsetHeader rh = (handler) => {
        //rowset header comes here
        KeyValue item = new KeyValue(handler.ColumnInfo, new CDBVariantArray());
        ra.Add(item);
    };
    //first, start a transaction with ReadCommited isolation 
    //second, execute delete from employee;delete from company
    //third, prepare and execute three sets of
    //       INSERT INTO company(ID,NAME,ADDRESS,Income)VALUES(?,?,?,?)
    //fourth, prepare and execute three sets of 
    //insert into employee
    //       (CompanyId,name,JoinDate,image,DESCRIPTION,Salary)values(?,?,?,?,?,?)
    //fifth, SELECT * from company;select * from employee;select curtime()
    //sixth, prepare and execute three sets of call sp_TestProc(?,?,?)
    //last, commit transaction if there is no error; and otherwise, rollback
    return mysql.executeBatch(tagTransactionIsolation.tiReadCommited,sql, vData, r, rh, "|");
}
Code snippet 3: Call executeBatch against a MySQL/MariaDB database with a user-defined delimiter and a stored procedure, as well as input, input/output and output parameters, in batch

The method executeBatch has a number of advantages such as better performance, cleaner code and better integration with the SocketPro client queue for automatic failure recovery.
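
To see how the flat parameter array vData lines up with the batch string in code snippet 3, the following sketch splits the batch on the delimiter and counts the ? placeholders in each part. It is a client-side illustration only: the helper CountPlaceholders is hypothetical, it says nothing about how the server plugin actually parses the batch, and it would miscount a ? that appears inside a string literal.

```csharp
using System;
using System.Linq;

static class BatchLayoutSketch
{
    // Split a batch on the delimiter and count '?' placeholders per part.
    // Naive on purpose: a '?' inside a string literal would be miscounted.
    public static int[] CountPlaceholders(string batch, string delimiter)
    {
        return batch
            .Split(new[] { delimiter }, StringSplitOptions.RemoveEmptyEntries)
            .Select(part => part.Count(c => c == '?'))
            .ToArray();
    }

    static void Main()
    {
        string sql = @"delete from employee;delete from company|
            INSERT INTO company(ID,NAME,ADDRESS,Income)VALUES(?,?,?,?)|
            insert into employee(CompanyId,name,JoinDate,image,DESCRIPTION,Salary)value(?,?,?,?,?,?)|
            SELECT * from company;select * from employee;select curtime()|
            call sp_TestProc(?,?,?)";

        int[] counts = CountPlaceholders(sql, "|");
        int perSet = counts.Sum();
        Console.WriteLine(string.Join(", ", counts)); // prints 0, 4, 6, 0, 3
        Console.WriteLine(perSet);                    // prints 13
        Console.WriteLine(39 / perSet);               // vData holds 39 values, prints 3
    }
}
```

Each parameter set in code snippet 3 therefore carries 4 + 6 + 3 = 13 values in statement order, and the 39 values added to vData form three sets.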

Performance Study

SocketPro SQL-stream technology has excellent performance in database access for both queries and updates. Two MySQL performance test projects (cppperf and netperf) are available at socketpro/stream_sql/mysql/DBPerf/. The first is written in C++ and the other in C#. A third sample project, mysqlperf, written in C#, is provided so that you can compare the performance of SocketPro SQL-stream technology with that of the MySQL .NET provider.

Figure 1 below shows the performance study data, which were obtained from three cheap Google cloud virtual machines with solid state drives for free evaluation. All data are times in milliseconds required to execute 10,000 queries and 50,000 inserts. The study also focuses on the influence of network latency on MySQL access speed.

Figure 1: MySQL streaming performance study data of SocketPro SQL-stream technology on three cheap Google cloud virtual machines

Our performance study shows that it is easy to execute queries at a rate of about 6,500 (10,000/1.54 s) per second per socket connection. For inserts, you can easily reach about 43,000 (50,000/1.17 s) inserts per second for MySQL on a local area network (LAN, cross-machine, 0.2 ms/2.0 Gbps). On a LAN, SocketPro streaming could improve query performance by 150% over the traditional non-streaming approach (SocketPro + Sync). For SQL inserts, the improvement would be more than seven times (10,400/1,170 = 8.9). SocketPro streaming and inline batching make network efficiency very high, which leads to a significant improvement over the existing MySQL socket communication approach.

Now consider a wide area network (WAN, cross-region, 34 ms/40 Mbps). The SocketPro SQL streaming query speed could be 5,000 (10,000/2.00 s) queries per second per socket connection. For inserts, the speed could easily be 17,600 (50,000/2.84 s) records per second. In contrast, the query speed drops to as low as 30 queries per second on a WAN if a client uses the traditional way of communication (SocketPro + Sync or MySQL .NET provider) because of the high latency. SocketPro SQL streaming can be more than 170 times (349,000/2,000 = 174.5) faster for queries than non-streaming technology, assuming the database backend processing time is negligible on a high-latency WAN (cross-region). For SQL inserts, the improvement could be over 600 times (1,726,000/2,840 = 607).
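
The arithmetic behind these WAN figures can be reproduced with a few lines of C#. The sketch below uses only the numbers quoted above; the helper names are hypothetical, and SequentialEstimateMs is a simplified model that assumes the 34 ms figure is a full round-trip time paid once per request, with server processing time ignored.

```csharp
using System;

static class PerfArithmetic
{
    // Operations per second from an elapsed time in milliseconds.
    public static double PerSecond(int operations, double elapsedMs)
    {
        return operations / (elapsedMs / 1000.0);
    }

    // How many times faster streaming is than the baseline.
    public static double Speedup(double baselineMs, double streamingMs)
    {
        return baselineMs / streamingMs;
    }

    // Assumed model: one full round trip per request, server time ignored.
    public static double SequentialEstimateMs(int operations, double roundTripMs)
    {
        return operations * roundTripMs;
    }

    static void Main()
    {
        // WAN figures quoted in the text (times in milliseconds).
        Console.WriteLine(PerSecond(10000, 2000));          // streaming queries per second
        Console.WriteLine(PerSecond(50000, 2840));          // streaming inserts per second
        Console.WriteLine(PerSecond(10000, 349000));        // synchronous queries per second
        Console.WriteLine(Speedup(349000, 2000));           // query speedup
        Console.WriteLine(Speedup(1726000, 2840));          // insert speedup

        // Round-trip model with the 34 ms WAN latency from the text.
        Console.WriteLine(SequentialEstimateMs(10000, 34)); // compare with 349,000 ms measured
        Console.WriteLine(SequentialEstimateMs(50000, 34)); // compare with 1,726,000 ms measured
    }
}
```

Under that assumption, the model gives 340,000 ms for 10,000 sequential queries and 1,700,000 ms for 50,000 sequential inserts. Both are close to the measured synchronous times, which suggests that round trips, not database work, dominate the non-streaming results on the WAN.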

After analyzing the performance data in Figure 1, you will find that SocketPro streaming technology is truly great for speeding up not only local but also remote database access. In addition, the WAN results would be much better if the test WAN had more network bandwidth. Further, SocketPro supports inline compression, but this study doesn’t use it. If inline compression were enabled, the streaming results on WAN would improve further. Finally, the performance study was completed on cheap virtual machines with only one or two CPUs. The performance data would be better if dedicated machines were used for testing.

Executing SQLs in Parallel with Fault Auto Recovery

Parallel computation: After studying the previous two simple examples, it is time to study the third sample in the directory socketpro/samples/auto_recovery/(test_cplusplus|test_java|test_python|test_sharp). SocketPro is designed from the ground up to support parallel computation. You can distribute multiple SQL statements to different backend databases for concurrent processing. This feature is designed to improve application scalability, as shown in code snippet 4 below.

C#
using System;
using SocketProAdapter;
using SocketProAdapter.ClientSide;
class Program {
    static void Main(string[] args) {
        const int sessions_per_host = 2;
        string[] vHost = { "localhost", "192.168.2.172" };
        const int cycles = 10000;
        using (CSocketPool<CMysql> sp = new CSocketPool<CMysql>()) {
            //set a local message queue to backup requests for auto fault recovery
            sp.QueueName = "ar_sharp";
            
            //one thread enough
            CConnectionContext[,] ppCc = 
                       new CConnectionContext[1, vHost.Length * sessions_per_host];
            for (int n = 0; n < vHost.Length; ++n) {
                for (int j = 0; j < sessions_per_host; ++j) {
                    ppCc[0, n * sessions_per_host + j] = 
                         new CConnectionContext(vHost[n], 20902, "root", "Smash123");
                }
            }
            bool ok = sp.StartSocketPool(ppCc);
            if (!ok) {
                Console.WriteLine
                        ("No connection and press any key to close the application ......");
                Console.Read(); return;
            }
            string sql = "SELECT max(amount), min(amount), avg(amount) FROM payment";
            Console.WriteLine("Input a filter for payment_id");
            string filter = Console.ReadLine();
            if (filter.Length > 0) sql += (" WHERE " + filter);
            var v = sp.AsyncHandlers;
            foreach (var h in v) {
                ok = h.Open("sakila", (hsqlite, res, errMsg) => {
                    if (res != 0) Console.WriteLine
                       ("Error code: {0}, error message: {1}", res, errMsg);
                });
            }
            int returned = 0;
            double dmax = 0.0, dmin = 0.0, davg = 0.0;
            SocketProAdapter.UDB.CDBVariantArray row = 
                                 new SocketProAdapter.UDB.CDBVariantArray();
            CAsyncDBHandler.DExecuteResult er = 
                            (h, res, errMsg, affected, fail_ok, lastId) => {
                if (res != 0)
                    Console.WriteLine("Error code: {0}, error message: {1}", res, errMsg);
                else {
                    dmax += double.Parse(row[0].ToString());
                    dmin += double.Parse(row[1].ToString());
                    davg += double.Parse(row[2].ToString());
                }
                ++returned;
            };
            CAsyncDBHandler.DRows r = (h, vData) => {
                row.Clear(); row.AddRange(vData);
            };
            CMysql mysql = sp.SeekByQueue(); //get one handler for querying one record
            ok = mysql.Execute(sql, er, r);
            ok = mysql.WaitAll();
            Console.WriteLine("Result: max = {0}, min = {1}, avg = {2}", dmax, dmin, davg);
            returned = 0; dmax = 0.0; dmin = 0.0; davg = 0.0;
            Console.WriteLine("Going to get {0} queries for max, min and avg", cycles);
            for (int n = 0; n < cycles; ++n) {
                mysql = sp.SeekByQueue();
                ok = mysql.Execute(sql, er, r);
            }
            foreach (var h in v) {
                ok = h.WaitAll();
            }
            Console.WriteLine("Returned = {0}, max = {1}, min = {2}, avg = {3}", 
                               returned, dmax, dmin, davg);
            Console.WriteLine("Press any key to close the application ......"); Console.Read();
        }
    }
}
Code snippet 4: Demonstration of SocketPro parallel computation and fault auto recovery features

As shown in the above code snippet 4, we could start multiple non-blocking sockets to different machines (localhost, 192.168.2.172), and each of the two database machines has two sockets connected. The code opens a default database sakila for each of connections (foreach (var h in v) {......}). First, the code executes one query ‘SELECT max(amount), min(amount), avg(amount) FROM payment …’ for one record. At last, the code sends the query 10,000 times onto the two machines for parallel processing (for (int n = 0; n < cycles; ++n) {......}). Each of the records will be summed inside a Lambda expression (CAsyncDBHandler.DExecuteResult er = (h, res, errMsg, affected, fail_ok, lastId) => {......};) as a callback for method Execute. It is noted that you can create multiple pools for different services hosted on different machines. As you can see, SocketPro socket pool can be used to significantly improve application scalability.

Auto fault recovery: SocketPro can open a local file and save all request data into it before sending the requests to a server over the network. This file is called a local message queue or client message queue. The idea is simply to back up all requests so that faults can be recovered from automatically. To use this feature, you must set a local message queue name (sp.QueueName = "ar_sharp";) as shown in code snippet 4 above. When developing a real application, it is very common to write a lot of code to deal properly with various communication errors, and this is usually a challenge for software developers. The SocketPro client message queue makes communication error handling very simple. Suppose the machine 192.168.2.172 becomes inaccessible for any reason, such as a power failure, an unhandled exception, software or hardware maintenance, or an unplugged network cable. The socket close event will then be raised either immediately or some time later. Once the socket pool finds that a socket is closed, SocketPro automatically merges all requests associated with that socket connection onto another socket that is still open for processing.

To verify this feature, you can abruptly shut down one of the MySQL servers while the above queries are executing, and check whether the final results are still correct.
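The merge step can be pictured with a small self-contained simulation. It does not use SocketPro or a real message queue file: FakeConnection and FailoverDemo are hypothetical names, and in-memory queues stand in for the backed-up requests. It is a sketch of the idea only, assuming requests that have not been answered are still held on the client side when a connection closes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for one pooled connection; not a SocketPro type.
public sealed class FakeConnection
{
    public string Host { get; }
    public bool Alive { get; set; } = true;
    // Requests backed up on the client side and not yet answered.
    public Queue<string> Pending { get; } = new Queue<string>();
    public List<string> Processed { get; } = new List<string>();

    public FakeConnection(string host) { Host = host; }

    // Processes backed-up requests for as long as the connection is alive.
    public void Drain()
    {
        while (Alive && Pending.Count > 0)
            Processed.Add(Pending.Dequeue());
    }
}

public static class FailoverDemo
{
    // Moves all unanswered requests from closed connections onto a live one.
    public static void MergeFromClosed(IList<FakeConnection> pool)
    {
        FakeConnection target = pool.FirstOrDefault(c => c.Alive);
        if (target == null) return; // nothing alive: requests stay queued
        foreach (FakeConnection dead in pool.Where(c => !c.Alive))
            while (dead.Pending.Count > 0)
                target.Pending.Enqueue(dead.Pending.Dequeue());
    }

    public static int Run(int requests)
    {
        var pool = new List<FakeConnection> {
            new FakeConnection("localhost"),
            new FakeConnection("192.168.2.172")
        };
        // Spread the requests across the pool, as a socket pool would.
        for (int n = 0; n < requests; ++n)
            pool[n % pool.Count].Pending.Enqueue("query#" + n);

        pool[1].Alive = false;   // simulate one server going down
        MergeFromClosed(pool);   // re-route its unanswered requests
        foreach (FakeConnection c in pool) c.Drain();
        return pool.Sum(c => c.Processed.Count);
    }

    public static void Main()
    {
        Console.WriteLine("Processed = {0}", Run(10000));
    }
}
```

In this simulation every request is still processed even though one of the two connections is closed before replying, which is the behavior the verification step above is meant to confirm on a real setup.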

Note that UDAParts has applied this feature to all SocketPro SQL-stream services, the asynchronous persistent message queue service and the remote file exchange service to simplify your development.

Points of Interest

Finally, the SocketPro MySQL SQL-stream plugin doesn’t support cursors at all, but it does provide all the basic client/server database features required. In addition, the SQL-stream plugin has the following unique features.

  1. Continuous inline request/result batching and real-time SQL-stream processing for the best network efficiency on both LAN and WAN.
  2. Bi-directional asynchronous data transferring between client and server by default, but all asynchronous requests can be converted into synchronous ones if required.
  3. Superior performance and scalability because of the powerful SocketPro communication architecture. SocketPro SQL-stream technology is significantly faster and more scalable than all known types of MySQL/MariaDB client APIs across all development languages and platforms.
  4. Real-time cache for table update, insert and delete, as shown in the sample project test_cache in the directory socketpro/stream_sql/mysql/test_cache.
  5. All requests can be canceled by calling the method Cancel of class CClientSocket on the client side.
  6. Both Windows and Linux are supported.
  7. Simple development for all supported development languages.
  8. Both client and server components are thread-safe. They can be easily reused within your multi-threaded applications with far fewer thread-related issues.
  9. All requests can be backed up on the client side and automatically resent to another server for processing if a server goes down for any reason (fault auto recovery).

History

  • 20th September, 2017: Initial release
  • 28th February, 2018: Add two new sections, Performance Study and Executing SQLs in Parallel with Fault Auto Recovery
  • 27th May, 2018: Update MySQL server SQL-streaming plugin to support MySQL version 8.0.11 or later
  • 4th April, 2021: Use task version of SQL-streaming methods instead of raw ones
  • 4th April, 2021: Extend plugins to support MariaDB and MySQL 5.7.x or older

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior)
United States
Yuancai (Charlie) Ye, an experienced C/C++ software engineer, lives in Atlanta, Georgia. He is an expert in continuous inline request/result batching, real-time stream processing, asynchronous data transferring and parallel computation for the best communication throughput and latency. He has been working on SocketPro (https://github.com/udaparts/socketpro) for more than fifteen years.
