Brief Introduction of a Continuous SQL-stream Sending and Processing System (Part 1: SQLite)

1 Jan 2021 | CPOL | 16 min read
Applying SocketPro to various databases for continuous inline request/result batching and real-time stream processing with bidirectional asynchronous data transfer

Introduction

Most client-server database systems support only synchronous communication between a client and the backend database. They rely on blocking sockets and a chatty protocol that requires the client or server to wait for an acknowledgement before sending the next chunk of data. This wait time, also called latency, ranges from about a tenth of a millisecond on a local area network (LAN) to hundreds of milliseconds on a wide area network (WAN). Large wait times can significantly degrade the quality of an application.
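To make the cost of waiting concrete, the following back-of-the-envelope sketch compares a synchronous exchange, where every request pays a full round trip, with a streamed exchange, where requests are sent back to back and the round trip is paid roughly once. The class LatencyModel, its method names, and the sample figures (35 ms round trip, 0.2 ms of server work per request) are illustrative assumptions, not SocketPro APIs or measurements, and the model ignores bandwidth limits.

```csharp
using System;

// Hypothetical helper for illustration only; not part of SocketPro.
public static class LatencyModel
{
    // Synchronous: each request waits for its reply before the next one is sent,
    // so every request pays the round-trip latency plus the server time.
    public static double SyncSeconds(int requests, double roundTripMs, double serverMsPerRequest)
    {
        return requests * (roundTripMs + serverMsPerRequest) / 1000.0;
    }

    // Streamed: requests are sent without waiting for replies, so the round-trip
    // latency is paid about once and the server time dominates.
    public static double StreamedSeconds(int requests, double roundTripMs, double serverMsPerRequest)
    {
        return (roundTripMs + requests * serverMsPerRequest) / 1000.0;
    }

    public static void Main()
    {
        const int requests = 10000;       // assumed workload
        const double roundTripMs = 35.0;  // assumed WAN round trip
        const double serverMs = 0.2;      // assumed server time per request
        Console.WriteLine("Sync:     {0:F1} s", SyncSeconds(requests, roundTripMs, serverMs));
        Console.WriteLine("Streamed: {0:F1} s", StreamedSeconds(requests, roundTripMs, serverMs));
    }
}
```

With these assumed figures, the synchronous exchange needs roughly six minutes (352 s), while the streamed one needs about two seconds.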

Fortunately, UDAParts has developed a powerful and secure communication framework named SocketPro (https://github.com/udaparts/socketpro). It is built around continuous inline request/result batching and real-time stream processing, using asynchronous data transfer and parallel computation for network efficiency, development simplicity, performance, and scalability, along with a number of other features. For details, refer to its short development guide at ../socketpro/doc/dev_guide.htm. In particular, a quick look at the SocketPro design ideas in ../socketpro/doc/sp_arch.htm is highly recommended.

Further, UDAParts has applied the SocketPro framework to a number of popular databases, such as SQLite, MySQL/MariaDB, and MS SQL, as well as others through ODBC drivers, to support continuous SQL-stream sending and processing. Most of these database components are free to the public, with open source code that you can study and extend to meet your own needs.

To keep the learning curve low, this first article uses SQLite as the sample database, and the second article uses MySQL.

Source Codes and Samples

All related source code and samples are located at https://github.com/udaparts/socketpro. After cloning the repository with Git, look at the directory socketpro/stream_sql/usqlite.

The samples are written for the .NET, C/C++, Java, Python, and Node.js development environments, and they can be compiled and run on either Linux or Windows. UDAParts also distributes a pre-compiled test server application, all_servers, as described on the doc page ../socketpro/doc/get_started.htm. Please follow that guide to distribute the SocketPro components. We are going to use the sample server all_servers for the tests that follow.

The test code below comes from the file ../socketpro/stream_sql/usqlite/test_sharp/Program.cs. This short article focuses on C# development.

Two Basic Structures, ErrInfo and SQLExeInfo

Code snippet 1 below lists the two basic structures, ErrInfo and SQLExeInfo, which carry the results returned by the various DB-related requests.

C#
//../socketpro/src/SproAdapter/socketerror.cs
public class ErrInfo
{
    public ErrInfo(int res, string errMsg)
    {
        ec = res;
        em = (null == errMsg) ? "" : errMsg;
    }
    public int ec = 0;
    public string em = "";

    public override string ToString()
    {
        string s = "ec: " + ec.ToString() + ", em: " + ((null == em) ? "" : em);
        return s;
    }
};

//../socketpro/src/sproadapter/asyncdbhandler.cs
public class SQLExeInfo : ErrInfo
{
    public long affected;
    public uint oks;
    public uint fails;
    public object lastId;

    public SQLExeInfo(int res, string errMsg, long aff, uint oks, uint fails, object id)
        : base(res, errMsg)
    {
        affected = aff;
        this.oks = oks;
        this.fails = fails;
        lastId = id;
    }

    public override string ToString()
    {
        String s = base.ToString();
        s += (", affected: " + affected);
        s += (", oks: " + oks);
        s += (", fails: " + fails);
        s += (", lastId: " + lastId);
        return s;
    }
}
Code snippet 1: Two basic structures, ErrInfo and SQLExeInfo, for various database request returning results

The first one, ErrInfo, carries the result of the database requests open, close, prepare, beginTrans, and endTrans. It is easy to understand, because it holds just two pieces of data: an error code, ec, and its corresponding error message, em.

The second one, SQLExeInfo, carries the result of the SQL execution requests execute and executeBatch. In addition to an error code and its corresponding message, executing a SQL statement can return the number of records affected and the last insert identification number, which correspond to the members affected and lastId of the structure SQLExeInfo, respectively. SocketPro database plugins support executing a complex SQL statement that consists of multiple basic SQL statements, and executing such a statement can produce one result per basic statement. The plugins count the successful and failed statements and return both counts to the client in the members oks and fails, respectively. However, no matter how many basic SQL statements in a batch fail, SocketPro database plugins always return only the error code and message of the first failed statement.
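The relationship between oks, fails, and the single reported error can be pictured with a small stand-alone sketch. It is not the plugin's implementation: the Outcome structure, the BatchSummary class, and the sample error codes and messages are hypothetical names and values introduced here only to illustrate the counting rule described above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical outcome of one basic SQL statement; for illustration only.
public struct Outcome
{
    public int Ec;     // 0 means the basic statement succeeded
    public string Em;  // error message, empty on success
    public Outcome(int ec, string em) { Ec = ec; Em = em; }
}

public static class BatchSummary
{
    // Counts successes and failures, and keeps only the FIRST failure's code and message.
    public static (int ec, string em, uint oks, uint fails) Summarize(IEnumerable<Outcome> outcomes)
    {
        int ec = 0;
        string em = "";
        uint oks = 0, fails = 0;
        foreach (Outcome o in outcomes)
        {
            if (o.Ec == 0) { ++oks; continue; }
            if (fails == 0) { ec = o.Ec; em = o.Em; } // later failures are only counted
            ++fails;
        }
        return (ec, em, oks, fails);
    }

    public static void Main()
    {
        // Made-up outcomes of four basic statements inside one complex statement
        var outcomes = new List<Outcome> {
            new Outcome(0, ""),
            new Outcome(1, "no such table"),
            new Outcome(0, ""),
            new Outcome(19, "constraint failed")
        };
        var r = Summarize(outcomes);
        Console.WriteLine("ec: {0}, em: {1}, oks: {2}, fails: {3}", r.ec, r.em, r.oks, r.fails);
    }
}
```

In this made-up run, two statements succeed and two fail, yet only the code and message of the first failure are reported.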

Main Function

SocketPro is designed from the ground up to support parallel computation through one or more pools of non-blocking sockets. Each pool may be made of one or more threads, and each thread hosts one or more non-blocking sockets at the client side. For clarity, however, this sample uses just one pool made of one thread and one socket at the client side, as shown in code snippet 2 below.

C#
using System;
using System.Collections.Generic;
using SocketProAdapter;
using SocketProAdapter.ClientSide;
using SocketProAdapter.UDB;
using System.Threading.Tasks;

using KeyValue = System.Collections.Generic.KeyValuePair
    <SocketProAdapter.UDB.CDBColumnInfoArray,SocketProAdapter.UDB.CDBVariantArray>;

class Program
{
    static readonly string m_wstr;
    static readonly string m_str;
    static Program()
    {
        // ......
    }

    static void Main(string[] args)
    {
        Console.WriteLine("Remote host: ");
        string host = Console.ReadLine();
        CConnectionContext cc = new CConnectionContext(host, 20901,
            "usqlite_client", "password_for_usqlite");
        using (CSocketPool<CSqlite> spSqlite = new CSocketPool<CSqlite>())
        {
            //spSqlite.QueueName = "qsqlite";
            if (!spSqlite.StartSocketPool(cc, 1)) //line 29, one async socket/one worker thread
            {
                Console.WriteLine("Failed in connecting to remote async sqlite server");
                Console.WriteLine("Press any key to close the application ......");
                Console.Read();
                return;
            }
            CSqlite sqlite = spSqlite.Seek(); //line 36
            //a container for receiving all tables data
            List<KeyValue> lstRowset = new List<KeyValue>(); //line 38
            try
            {
                //stream all DB requests with in-line batching for the best network efficiency
                //open a global database at server side because an empty string is given
                var topen = sqlite.open(""); //line 43
                //prepare two test tables, COMPANY and EMPLOYEE
                Task<CAsyncDBHandler.SQLExeInfo>[] vT = TestCreateTables(sqlite);
                var tbt = sqlite.beginTrans(); //line 46, start manual transaction
                //test both prepare and query statements
                var tp0 = TestPreparedStatements(sqlite, lstRowset);
                //test both prepare and query with reading/updating BLOB and large text
                var tp1 = TestBLOBByPreparedStatement(sqlite, lstRowset);
                var tet = sqlite.endTrans(); //line 51, end manual transaction
                var vB = TestBatch(sqlite, lstRowset); //line 52

                Console.WriteLine("All DB/SQL requests streamed & waiting for their results");
                Console.WriteLine(topen.Result); //line 55
                foreach (var e in vT)
                {
                    Console.WriteLine(e.Result);
                }
                Console.WriteLine(tbt.Result);
                Console.WriteLine(tp0.Result);
                Console.WriteLine(tp1.Result);
                Console.WriteLine(tet.Result);
                foreach (var e in vB)
                {
                    Console.WriteLine(e.Result);
                } //line 67
            }
            catch (AggregateException ex) //line 69
            {
                foreach (Exception e in ex.InnerExceptions)
                {
                    //An exception from server (CServerError), Socket closed after
                    //sending a request (CSocketError) or request canceled (CSocketError),
                    Console.WriteLine(e);
                }
            }
            catch (CSocketError ex)
            {
                //Socket is already closed before sending a request
                Console.WriteLine(ex);
            }
            catch (Exception ex)
            {
                //bad operations such as invalid arguments, bad operations and
                //de-serialization errors, and so on
                Console.WriteLine(ex);
            } //line 88
            //display received rowsets
            int index = 0;
            Console.WriteLine();
            Console.WriteLine("+++++ Start rowsets +++");
            foreach (KeyValue it in lstRowset)
            {
                Console.Write("Statement index = {0}", index);
                if (it.Key.Count > 0)
                    Console.WriteLine(", rowset with columns: {0}, records: {1}.",
                        it.Key.Count, it.Value.Count / it.Key.Count);
                else
                    Console.WriteLine(", no rowset received.");
                ++index;
            }
            Console.WriteLine("+++++ End rowsets +++");
            Console.WriteLine();
            Console.WriteLine("Press any key to close the application ......");
            Console.Read();
        }
    }

    static Task<CAsyncDBHandler.SQLExeInfo>[] TestBatch(CSqlite sqlite, List<KeyValue> ra)
    {
        // ......
    }
    
    static Task<CAsyncDBHandler.SQLExeInfo> TestPreparedStatements(CSqlite sqlite, List<KeyValue> ra)
    {
        // ......
    }
    
    static Task<CAsyncDBHandler.SQLExeInfo> TestBLOBByPreparedStatement(CSqlite sqlite, List<KeyValue> ra)
    {
        // ......
    }
    
    static Task<CAsyncDBHandler.SQLExeInfo>[] TestCreateTables(CSqlite sqlite)
    {
        // ......
    }
}
Code snippet 2: Main console unit test code for DB/SQL requests streaming against SQLite server plugin

Starting one socket pool: Code snippet 2 starts one socket pool at line 29. For clarity, the pool has only one worker thread, which hosts only one non-blocking socket, created from one connection context. You can, however, create multiple pools within one client application if necessary. Afterwards, we get one asynchronous SQLite handler at line 36.

Opening database: We send a request to open a SQLite server database at line 43. If the first input is an empty or null string, as in this example, we open an instance of the server's global database (usqlite.db, for example). If you would like to create your own database, simply give a non-empty valid string. The returned value topen is a task that will eventually deliver an ErrInfo structure.

Streaming SQL statements: Keep in mind that SocketPro supports, by design, streaming any number of requests of any type on one non-blocking socket session. We are therefore able to stream all SQL statements as well as other requests, as shown at lines 43 through 52. All SocketPro SQL-stream services support this feature for the best network efficiency, which significantly improves data access performance. As far as we know, no other technology offers this feature; if you find one, please let us know. Like ordinary database access APIs, SocketPro SQL-stream technology also supports manual transactions, as shown at lines 46 and 51. The four functions TestCreateTables, TestPreparedStatements, TestBLOBByPreparedStatement, and TestBatch are elaborated in the following sections. Note that all four methods immediately return one or more tasks, each of which will eventually deliver an SQLExeInfo structure.

Note that SocketPro supports only asynchronous data transfer between client and server, so all requests and results are streamed and batched inline at both the client and server sides for the best network efficiency. This is completely different from synchronous data transfer.

Waiting until all processed: Since SocketPro uses asynchronous data transfer, it must provide a way to wait until all requests have been sent and all returned results have been processed. As shown at lines 55 through 67, we can use the task property Result to wait for completion. Alternatively, you can use the keyword await or the task method Wait to wait for the completion of all DB/SQL requests.

Error Handling: The code at lines 69 through 88 shows how to deal with a variety of errors, including the server error CServerError, the socket communication and request-canceled error CSocketError, and other types of errors.
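The choice between Result and await also changes how a failure surfaces. In standard .NET, reading Result (or calling Wait) on a faulted task throws an AggregateException that wraps the original exception, which is why code snippet 2 catches AggregateException first, whereas await rethrows the original exception directly. The sketch below shows this with a stand-in request. FakeRequest, ViaResult, and ViaAwait are hypothetical helpers, and InvalidOperationException merely plays the role that CServerError or CSocketError would play in a real SocketPro client.

```csharp
using System;
using System.Threading.Tasks;

// Stand-alone illustration; no SocketPro types are used here.
public static class WaitDemo
{
    // Stand-in for a streamed request whose result arrives later.
    public static async Task<string> FakeRequest(string name, int delayMs, bool fail = false)
    {
        await Task.Delay(delayMs);
        if (fail) throw new InvalidOperationException(name + " failed");
        return name + " ok";
    }

    // Blocking style: Result wraps a failure in AggregateException.
    public static string ViaResult(Task<string> t)
    {
        try { return t.Result; }
        catch (AggregateException ex)
        {
            return "AggregateException -> " + ex.InnerException.GetType().Name;
        }
    }

    // await style: the original exception is rethrown as is.
    public static async Task<string> ViaAwait(Task<string> t)
    {
        try { return await t; }
        catch (InvalidOperationException ex)
        {
            return "InvalidOperationException -> " + ex.Message;
        }
    }

    public static async Task Main()
    {
        Console.WriteLine(ViaResult(FakeRequest("open", 10, true)));
        Console.WriteLine(await ViaAwait(FakeRequest("open", 10, true)));
        Console.WriteLine(await ViaAwait(FakeRequest("beginTrans", 10)));
    }
}
```

If you switch a client from Result to await, the catch blocks should therefore target the concrete exception types rather than AggregateException.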

TestCreateTables

This function sends two SQL DDL statements that create two tables, COMPANY and EMPLOYEE, as shown in code snippet 3 below.

C#
static Task<CAsyncDBHandler.SQLExeInfo>[] TestCreateTables(CSqlite sqlite)
{
    var v = new Task<CAsyncDBHandler.SQLExeInfo>[2];
    v[0] = sqlite.execute("CREATE TABLE COMPANY(ID INT8 PRIMARY KEY NOT NULL,name CHAR(64)"+
        "NOT NULL,ADDRESS varCHAR(256)not null,Income float not null)");
    v[1] = sqlite.execute("CREATE TABLE EMPLOYEE(EMPLOYEEID INT8 PRIMARY KEY NOT NULL unique,"+
        "CompanyId INT8 not null,name NCHAR(64)NOT NULL,JoinDate DATETIME not null " + 
        "default(datetime('now')),IMAGE BLOB,DESCRIPTION NTEXT,Salary real,"+
        "FOREIGN KEY(CompanyId)REFERENCES COMPANY(id))");
    return v;
}
Code snippet 3: Creating two SQLite tables in streaming by SocketPro SQL-stream technology

You can execute any number of SQL statements in a stream without waiting for earlier requests to return, as shown in code snippet 3. Each request takes one input SQL statement and immediately returns a task that will eventually deliver an SQLExeInfo structure. Again, this differs from the common database access approach because SocketPro uses asynchronous data transfer for communication.

TestPreparedStatements

SocketPro SQL-stream technology supports preparing SQL statements just like common database access APIs. For a SQLite server database, it even supports preparing multiple SQL statements in one shot, as shown in code snippet 4 below.

C#
static Task<CAsyncDBHandler.SQLExeInfo> TestPreparedStatements(CSqlite sqlite, List<KeyValue> ra)
{
    //a complex SQL statement combined with query and insert prepare statements
    sqlite.Prepare("Select datetime('now');" + //line 4
        "INSERT OR REPLACE INTO COMPANY(ID,NAME,ADDRESS,Income)VALUES(?,?,?,?)"); //line 5

    CDBVariantArray vData = new CDBVariantArray();
    vData.Add(1);
    vData.Add("Google Inc.");
    vData.Add("1600 Amphitheatre Parkway, Mountain View, CA 94043, USA");
    vData.Add(66000000000.0);

    vData.Add(2);
    vData.Add("Microsoft Inc.");
    vData.Add("700 Bellevue Way NE- 22nd Floor, Bellevue, WA 98804, USA");
    vData.Add(93600000000.0);

    vData.Add(3);
    vData.Add("Apple Inc.");
    vData.Add("1 Infinite Loop, Cupertino, CA 95014, USA");
    vData.Add(234000000000.0);

    //send three sets of parameterized data in one shot for processing
    return sqlite.execute(vData, (handler, rowData) =>
    {
        //rowset data come here
        int last = ra.Count - 1;
        KeyValue item = ra[last];
        item.Value.AddRange(rowData);
    }, (handler) =>
    {
        //rowset header meta info comes here
        KeyValue item = new KeyValue(handler.ColumnInfo, new CDBVariantArray());
        ra.Add(item);
    });
}
Code snippet 4: Sending multiple sets of parameters for processing multiple SQL statements in one shot by SocketPro SQL-stream technology

Note that the sample prepared SQL statement consists of one query and one insert statement. When the function is called, the client expects three sets of records to be returned and three records to be inserted into the table COMPANY. The sample is designed to demonstrate the power of SocketPro SQL-stream technology; in practice, you probably would not prepare a combined SQL statement made of multiple basic SQL statements. If you use a parameterized statement, you must send a prepare request first, as shown at lines 4 and 5. After building an array of data as in code snippet 4, you can send multiple sets of parameter data from the client to the server for processing in one single shot. If you have a large amount of data, you can call the method execute repeatedly without preparing the statement again.

Next, let us look at how returned record sets are handled. Besides the parameter data array given as the first input, the method execute takes two callbacks (lambda expressions) as its second and third inputs. Whenever a record set arrives, the SQLite client handler automatically calls the second callback ((handler) => { ...... }) with the column meta information of the record set. If actual records are available, the handler then calls the first callback ((handler, rowData) => { ...... }), where you can populate the data into the container ra. In code snippet 4, the meta-information callback is called three times, once per record set, while the number of calls to the row-data callback depends on both the number of records and the size of each record.
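The calling order of the two callbacks can be simulated without a server. In the sketch below, the hypothetical method Deliver plays the role of the client handler: it invokes the header callback once per record set and then the row-data callback once per chunk of values. Column names are plain strings instead of CDBColumnInfoArray, the chunk size of four values is an arbitrary assumption, and the final record count uses the same values-divided-by-columns arithmetic as the display loop in code snippet 2.

```csharp
using System;
using System.Collections.Generic;

// Stand-alone simulation; the types here only mimic the shape of the real callbacks.
public static class RowsetDemo
{
    // Simulated delivery of one record set: header first, then row data in chunks.
    public static void Deliver(string[] columns, object[] flatValues, int chunk,
        Action<string[]> onHeader, Action<object[]> onRows)
    {
        onHeader(columns);
        for (int i = 0; i < flatValues.Length; i += chunk)
        {
            int n = Math.Min(chunk, flatValues.Length - i);
            object[] part = new object[n];
            Array.Copy(flatValues, i, part, 0, n);
            onRows(part);
        }
    }

    public static List<KeyValuePair<string[], List<object>>> Collect()
    {
        var ra = new List<KeyValuePair<string[], List<object>>>();
        // Header callback: start a new container entry for the incoming record set.
        Action<string[]> onHeader = cols =>
            ra.Add(new KeyValuePair<string[], List<object>>(cols, new List<object>()));
        // Row-data callback: append values to the most recent entry.
        Action<object[]> onRows = rows => ra[ra.Count - 1].Value.AddRange(rows);

        Deliver(new[] { "ID", "NAME" }, new object[] { 1, "A", 2, "B", 3, "C" }, 4, onHeader, onRows);
        Deliver(new[] { "NOW" }, new object[] { "2021-01-01 00:00:00" }, 4, onHeader, onRows);
        return ra;
    }

    public static void Main()
    {
        int index = 0;
        foreach (var it in Collect())
        {
            Console.WriteLine("Statement index = {0}, columns: {1}, records: {2}",
                index++, it.Key.Length, it.Value.Count / it.Key.Length);
        }
    }
}
```

Because the row-data callback always appends to the last entry, it relies on the header callback having run first for the same record set.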

TestBLOBByPreparedStatement

So far, you have seen that SocketPro SQL-stream technology provides the features required for accessing a backend database. Next, we demonstrate how to handle large binary and text objects with SocketPro SQL-stream technology. Accessing large objects inside databases efficiently is usually difficult. With SocketPro SQL-stream technology, however, it is simple in terms of both development and efficiency, as shown in code snippet 5 below.

After looking through code snippet 5, you will find that it is essentially the same as code snippet 4, only longer. A developer can therefore handle all types of database table fields in the same coding style with SocketPro SQL-stream technology, which keeps development easy.

SocketPro always divides a large binary or text object into chunks first, at both the client and server sides, and then sends these smaller chunks to the other side. At the end, SocketPro reconstructs the original large binary or text object from the collected chunks. This happens silently at run time to reduce the memory footprint.

C#
static Task<CAsyncDBHandler.SQLExeInfo> TestBLOBByPreparedStatement(CSqlite sqlite, List<KeyValue> ra)
{
    //a complex SQL statement combining one insert and one query prepared statement
    sqlite.Prepare("insert or replace into employee(EMPLOYEEID,CompanyId,name,JoinDate,image,"+
        "DESCRIPTION,Salary)values(?,?,?,?,?,?,?);select * from employee where employeeid=?");
    CDBVariantArray vData = new CDBVariantArray();
    using (CScopeUQueue sbBlob = new CScopeUQueue())
    {
        //first set of data
        vData.Add(1);
        vData.Add(1); //google company id
        vData.Add("Ted Cruz");
        vData.Add(DateTime.Now);
        sbBlob.Save(m_wstr);
        vData.Add(sbBlob.UQueue.GetBuffer());
        vData.Add(m_wstr);
        vData.Add(254000.0);
        vData.Add(1);

        //second set of data
        vData.Add(2);
        vData.Add(1); //google company id
        vData.Add("Donald Trump");
        vData.Add(DateTime.Now);
        sbBlob.UQueue.SetSize(0);
        sbBlob.Save(m_str);
        vData.Add(sbBlob.UQueue.GetBuffer());
        vData.Add(m_str);
        vData.Add(20254000.0);
        vData.Add(2);

        //third set of data
        vData.Add(3);
        vData.Add(2); //Microsoft company id
        vData.Add("Hillary Clinton");
        vData.Add(DateTime.Now);
        sbBlob.Save(m_wstr);
        vData.Add(sbBlob.UQueue.GetBuffer());
        vData.Add(m_wstr);
        vData.Add(6254000.0);
        vData.Add(3);
    }
    //send three sets of parameterized data in one shot for processing
    return sqlite.execute(vData, (handler, rowData) =>
    {
        //rowset data come here
        int last = ra.Count - 1;
        KeyValue item = ra[last];
        item.Value.AddRange(rowData);
    }, (handler) =>
    {
        //rowset header meta info comes here
        KeyValue item = new KeyValue(handler.ColumnInfo, new CDBVariantArray());
        ra.Add(item);
    });
}
Code snippet 5: Insert and query tables having multiple large binary and text objects with SocketPro SQL-stream technology
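The chunking of large objects described before code snippet 5 happens inside SocketPro, so client code never sees it. Purely as a mental model, the idea can be sketched in a few lines of plain C#. The class ChunkDemo, its methods Split and Join, and the 4096-byte chunk size are assumptions made for this illustration and say nothing about the actual chunk size or wire format SocketPro uses.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Conceptual model only; SocketPro performs its own chunking internally.
public static class ChunkDemo
{
    // Cuts a large buffer into pieces of at most chunkSize bytes.
    public static IEnumerable<byte[]> Split(byte[] data, int chunkSize)
    {
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException(nameof(chunkSize));
        for (int offset = 0; offset < data.Length; offset += chunkSize)
        {
            int n = Math.Min(chunkSize, data.Length - offset);
            byte[] chunk = new byte[n];
            Buffer.BlockCopy(data, offset, chunk, 0, n);
            yield return chunk;
        }
    }

    // Rebuilds the original buffer from the chunks, in arrival order.
    public static byte[] Join(IEnumerable<byte[]> chunks)
    {
        using (var ms = new MemoryStream())
        {
            foreach (byte[] c in chunks) ms.Write(c, 0, c.Length);
            return ms.ToArray();
        }
    }

    public static void Main()
    {
        byte[] original = new byte[10000];
        new Random(1).NextBytes(original);
        List<byte[]> chunks = Split(original, 4096).ToList(); // 4096 is an assumed size
        byte[] rebuilt = Join(chunks);
        Console.WriteLine("chunks: {0}, identical: {1}", chunks.Count, original.SequenceEqual(rebuilt));
    }
}
```

Only one chunk needs to be in flight at a time, which is where the reduced memory footprint comes from.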

TestBatch

SocketPro also provides a special method that groups DB and SQL requests such as prepare, beginTrans, endTrans, and execute into one large batch request, as shown in code snippet 6 below.

C#
static Task<CAsyncDBHandler.SQLExeInfo>[] TestBatch(CSqlite sqlite, List<KeyValue> ra)
{
    var v = new Task<CAsyncDBHandler.SQLExeInfo>[2];
    CDBVariantArray vParam = new CDBVariantArray();
    vParam.Add(1); //ID
    vParam.Add(2); //EMPLOYEEID
    //there is no manual transaction if isolation is tiUnspecified
    v[0] = sqlite.executeBatch(tagTransactionIsolation.tiUnspecified, //line 8
        "Select datetime('now');select * from COMPANY where ID=?;"+
            "select * from EMPLOYEE where EMPLOYEEID=?",
        vParam, (handler, rowData) =>
    {
        //rowset data come here
        int last = ra.Count - 1;
        KeyValue item = ra[last];
        item.Value.AddRange(rowData);
    }, (handler) =>
    {
        //rowset header meta info comes here
        KeyValue item = new KeyValue(handler.ColumnInfo, new CDBVariantArray());
        ra.Add(item);
    });
    vParam.Clear();
    vParam.Add(1); //ID
    vParam.Add(2); //EMPLOYEEID
    vParam.Add(2); //ID
    vParam.Add(3); //EMPLOYEEID
    //Same as sqlite.beginTrans();
    //Select datetime('now');
    //select * from COMPANY where ID=1;
    //select * from COMPANY where ID=2;
    //Select datetime('now');
    //select * from EMPLOYEE where EMPLOYEEID=2;
    //select * from EMPLOYEE where EMPLOYEEID=3
    //ok = sqlite.endTrans(tagRollbackPlan.rpDefault);
    v[1] = sqlite.executeBatch(tagTransactionIsolation.tiReadCommited, //line 36
        "Select datetime('now');select * from COMPANY where ID=?;"+
            "Select datetime('now');select * from EMPLOYEE where EMPLOYEEID=?",
        vParam, (handler, rowData) =>
        {
            //rowset data come here
            int last = ra.Count - 1;
            KeyValue item = ra[last];
            item.Value.AddRange(rowData);
        }, (handler) =>
        {
            //rowset header meta info comes here
            KeyValue item = new KeyValue(handler.ColumnInfo, new CDBVariantArray());
            ra.Add(item);
        }); //line 50
    return v;
}    
Code snippet 6: Execute different SQL requests in batch with or without manual transaction

Inside the method TestBatch, we call the method executeBatch at line 8 with tiUnspecified as the first input. The batch queries three sets of records and, because an array of input parameter data is given, also executes one prepared statement at the server side. If the third input parameter is empty, the method executeBatch behaves the same as the method execute.

We can also group a number of DB and SQL requests with a first input other than tiUnspecified, as shown at lines 36 through 50, although the sample prepared statement does not really need a manual transaction here. As the comments inside the code snippet show, this call involves the methods beginTrans/endTrans, prepare, execute, and so on.

The method executeBatch has a number of advantages, such as better performance, cleaner code, and better integration with the SocketPro client message queue for automatic failure recovery, which is discussed in the last section.

Performance Study

SocketPro SQL-stream technology performs very well in database access for both queries and updates. Two performance test projects, cppperf and netperf, are available at ../socketpro/stream_sql/usqlite/DBPerf. The first is written in C++ and the other in C#. In addition, the MySQL sakila sample database, located in the directory ../socketpro/bin, is available for experiments once you have run the sample server all_servers to create the global SQLite database usqlite.db.

See the performance data in Figure 1 below, which were obtained on three inexpensive Google Cloud virtual machines with solid state drives under free evaluation. All data are the times, in milliseconds, required to execute 10,000 queries and 50,000 inserts. The study also focuses on the influence of network latency on SQL access speed.

Figure 1: SQLite streaming performance study data of SocketPro SQL-stream technology on three cheap Google cloud virtual machines

Our performance study shows that it is easy to execute queries at about 7,400 (10,000/1.36) per second per socket connection. For inserting records, you can easily reach about 120,000 (50,000/0.42) inserts per second for SQLite on a local area network (LAN, cross-machine). SocketPro streaming can improve performance by 140% over the traditional non-streaming approach (SocketPro + Sync).

On a wide area network (WAN, cross-region), the query speed can be about 4,500 (10,000/2.24) per second per socket connection, and the insert speed can easily be about 20,000 (50,000/2.51) records per second. In contrast, the query speed drops to about 30 queries per second on a WAN if a client uses the traditional (non-streaming) way of communicating with the database, because of the high latency. SocketPro SQL streaming can be more than 150 (346000/2240) times faster than non-streaming technology on a high-latency WAN (cross-region) if the database backend processing time is negligible compared with the IO communication time. After analyzing the performance data in Figure 1, you will find that SocketPro streaming technology speeds up not only local but also remote database access.
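The rates quoted above follow from dividing the number of operations by the elapsed time. The short sketch below redoes that arithmetic with the timings that appear in the parentheses of the text (1,360 ms, 420 ms, 2,240 ms, 2,510 ms, and 346,000 ms); the class name Throughput and its methods are hypothetical helpers, not part of the performance test projects.

```csharp
using System;

// Hypothetical helper that only redoes the arithmetic quoted in the text.
public static class Throughput
{
    // Operations per second from an elapsed time in milliseconds.
    public static double PerSecond(int operations, double elapsedMs)
    {
        return operations * 1000.0 / elapsedMs;
    }

    // How many times faster the quicker run is.
    public static double Speedup(double slowMs, double fastMs)
    {
        return slowMs / fastMs;
    }

    public static void Main()
    {
        Console.WriteLine("LAN queries/s:            {0:F0}", PerSecond(10000, 1360));   // about 7,353
        Console.WriteLine("LAN inserts/s:            {0:F0}", PerSecond(50000, 420));    // about 119,048
        Console.WriteLine("WAN queries/s (streamed): {0:F0}", PerSecond(10000, 2240));   // about 4,464
        Console.WriteLine("WAN inserts/s (streamed): {0:F0}", PerSecond(50000, 2510));   // about 19,920
        Console.WriteLine("WAN queries/s (sync):     {0:F0}", PerSecond(10000, 346000)); // about 29
        Console.WriteLine("WAN streaming speedup:    {0:F0}x", Speedup(346000, 2240));   // about 154
    }
}
```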

The above performance study was completed on a WAN with a bandwidth of around 40 Mbps for cross-region communication. The WAN figures would likely be much better with more network bandwidth. Further, SocketPro also supports inline compression, but this study doesn’t use that feature; employing it should further improve the streaming results on a WAN. Finally, the study was completed on cheap virtual machines with only one or two CPUs, so the performance data would be considerably better on dedicated machines.

Executing SQL statements in parallel with fault auto recovery

Parallel computation: After studying the previous simple examples, it is time to study the next sample, located in the directory socketpro/samples/auto_recovery/(test_cplusplus|test_java|test_python|test_sharp). SocketPro is designed from the ground up to support parallel computation. You can distribute multiple SQL statements across different backend databases for concurrent processing. This feature is designed to improve application scalability, as shown in code snippet 7 below.

C#
using System;
using SocketProAdapter;
using SocketProAdapter.ClientSide;
class Program {
    static void Main(string[] args) {
        const int sessions_per_host = 2;
        const int cycles = 10000;
        string[] vHost = { "localhost", "192.168.2.172" };
        using (CSocketPool<CSqlite> sp = new CSocketPool<CSqlite>()) {
            sp.QueueName = "ar_sharp"; //set a local message queue to backup requests for auto fault recovery
            CConnectionContext[,] ppCc = new CConnectionContext[1, vHost.Length * sessions_per_host]; //one thread enough
            for (int n = 0; n < vHost.Length; ++n) {
                for (int j = 0; j < sessions_per_host; ++j) {
                    ppCc[0, n * sessions_per_host + j] = new CConnectionContext(vHost[n], 20901, "AClientUserId", "Mypassword");
                }
            }
            bool ok = sp.StartSocketPool(ppCc);
            if (!ok) {
                Console.WriteLine("There is no connection and press any key to close the application ......");
                Console.Read(); return;
            }
            string sql = "SELECT max(amount), min(amount), avg(amount) FROM payment";
            Console.WriteLine("Input a filter for payment_id");
            string filter = Console.ReadLine();
            if (filter.Length > 0) sql += (" WHERE " + filter);
            var v = sp.AsyncHandlers;
            foreach (var h in v) {
                ok = h.Open("sakila.db", (hsqlite, res, errMsg) => {
                    if (res != 0)
                        Console.WriteLine("Error code: {0}, error message: {1}", res, errMsg);
                });
            }
            int returned = 0;
            double dmax = 0.0, dmin = 0.0, davg = 0.0;
            SocketProAdapter.UDB.CDBVariantArray row = new SocketProAdapter.UDB.CDBVariantArray();
            CAsyncDBHandler.DExecuteResult er = (h, res, errMsg, affected, fail_ok, lastId) => {
                if (res != 0)
                    Console.WriteLine("Error code: {0}, error message: {1}", res, errMsg);
                else {
                    dmax += double.Parse(row[0].ToString());
                    dmin += double.Parse(row[1].ToString());
                    davg += double.Parse(row[2].ToString());
                }
                ++returned;
            };
            CAsyncDBHandler.DRows r = (h, vData) => {
                row.Clear();
                row.AddRange(vData);
            };
            CSqlite sqlite = sp.SeekByQueue(); //get one handler for querying one record
            ok = sqlite.Execute(sql, er, r);
            ok = sqlite.WaitAll();
            Console.WriteLine("Result: max = {0}, min = {1}, avg = {2}", dmax, dmin, davg);
            returned = 0;
            dmax = 0.0; dmin = 0.0; davg = 0.0;
            Console.WriteLine("Going to get {0} queries for max, min and avg", cycles);
            for (int n = 0; n < cycles; ++n) {
                sqlite = sp.SeekByQueue();
                ok = sqlite.Execute(sql, er, r);
            }
            foreach (var h in v) {
                ok = h.WaitAll();
            }
            Console.WriteLine("Returned = {0}, max = {1}, min = {2}, avg = {3}", returned, dmax, dmin, davg);
            Console.WriteLine("Press any key to close the application ......"); Console.Read();
        }
    }
}
Code snippet 7: Demonstration of SocketPro parallel computation and fault auto recovery features

As the above code snippet shows, we can start multiple non-blocking sockets to different machines (localhost and 192.168.2.172), with two sockets connected to each of the two database machines (const int sessions_per_host = 2;). The code opens a default database, sakila.db (ok = h.Open("sakila.db" ......), on each connection. First, the code executes the query ‘SELECT max(amount), min(amount), avg(amount) FROM payment …’ once for one record. Then it sends the same query 10,000 times to the two machines for parallel processing. The values of each returned record are summed inside a lambda expression that serves as a callback for the method Execute. Note that you can also create multiple pools for different services hosted on different machines. As you can see, a SocketPro socket pool can be used to significantly improve application scalability.
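The fan-out pattern described above can be pictured with a small Python sketch. It is only an illustration under stated assumptions and does not use SocketPro: the standard library's sqlite3 and ThreadPoolExecutor stand in for the socket pool, the three-row payment table is made-up data rather than the real sakila.db, and the names make_demo_db, run_query and fan_out are hypothetical.

```python
import os
import sqlite3
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed

SQL = "SELECT max(amount), min(amount), avg(amount) FROM payment"


def make_demo_db(path):
    """Create a tiny stand-in database (hypothetical data, not the real sakila.db)."""
    con = sqlite3.connect(path)
    con.execute("CREATE TABLE payment (amount REAL)")
    con.executemany("INSERT INTO payment VALUES (?)", [(1.0,), (2.0,), (6.0,)])
    con.commit()
    con.close()


def run_query(path):
    """Run the aggregate query once on a connection owned by the calling worker."""
    con = sqlite3.connect(path)
    try:
        return con.execute(SQL).fetchone()
    finally:
        con.close()


def fan_out(path, cycles, sessions):
    """Send the same query `cycles` times over `sessions` workers and sum the records."""
    totals = {"returned": 0, "max": 0.0, "min": 0.0, "avg": 0.0}
    with ThreadPoolExecutor(max_workers=sessions) as pool:
        futures = [pool.submit(run_query, path) for _ in range(cycles)]
        # Results are summed as they complete, in whatever order the workers finish.
        for done in as_completed(futures):
            dmax, dmin, davg = done.result()
            totals["returned"] += 1
            totals["max"] += dmax
            totals["min"] += dmin
            totals["avg"] += davg
    return totals


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as folder:
        db_path = os.path.join(folder, "demo.db")
        make_demo_db(db_path)
        print(fan_out(db_path, cycles=100, sessions=4))
```

Because every query returns the same record, the sums divided by the number of returned results give back the single-query values, which makes a convenient sanity check for the parallel run.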

Auto fault recovery: SocketPro is able to open a local file and save all request data into it before sending the requests to a server over the network. This file is called a local message queue or client message queue. The idea is simply to back up all requests for automatic fault recovery. To use this feature, you have to set a local message queue name (sp.QueueName = "ar_sharp";). When developing a real application, it is very common to write lots of code to deal with various communication errors properly, and doing so is usually a challenge for software developers. The SocketPro client message queue makes communication error handling very simple. Suppose the machine 192.168.2.172 becomes inaccessible for any reason, such as a power-off, an unhandled exception, software/hardware maintenance or an unplugged network cable. The socket close event will be raised either immediately or some time later. Once the socket pool finds that a socket is closed, SocketPro automatically merges all requests associated with that socket connection onto another socket that is still open for processing.
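The merge step described above can be simulated in a few lines of Python. This is a minimal sketch and not SocketPro code: the classes Session and Pool are hypothetical, the backlog lives in memory instead of in a file, and choosing the live session with the shortest backlog is an assumption made only for this illustration.

```python
from collections import deque


class Session:
    """Hypothetical stand-in for one pooled socket connection."""

    def __init__(self, name):
        self.name = name
        self.alive = True
        self.backlog = deque()  # requests saved locally and not yet processed


class Pool:
    """Hypothetical pool that re-homes the backlog of a closed session."""

    def __init__(self, sessions):
        self.sessions = list(sessions)
        self.processed = []  # (session name, request) pairs

    def _least_loaded(self):
        live = [s for s in self.sessions if s.alive]
        if not live:
            raise RuntimeError("no live session available")
        return min(live, key=lambda s: len(s.backlog))

    def send(self, request):
        # Back the request up in the chosen session's queue before it is processed.
        self._least_loaded().backlog.append(request)

    def close(self, session):
        """Simulate a socket close event and merge the backlog onto a live session."""
        session.alive = False
        target = self._least_loaded()
        while session.backlog:
            target.backlog.append(session.backlog.popleft())

    def drain(self):
        """Process every queued request on the sessions that are still alive."""
        for s in self.sessions:
            while s.alive and s.backlog:
                self.processed.append((s.name, s.backlog.popleft()))


if __name__ == "__main__":
    a, b = Session("localhost"), Session("192.168.2.172")
    pool = Pool([a, b])
    for n in range(10):
        pool.send(n)
    pool.close(b)  # the second machine goes away before anything is processed
    pool.drain()
    print(len(pool.processed), "requests processed")
```

The point of the sketch is that no request is lost when a session closes: everything still waiting in its queue is handed to a surviving session, so the caller needs no error-handling code of its own for this case.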

To verify this feature, you can abruptly shut down one of the SQLite servers (all_servers) while the above queries are executing and check whether the final results are still correct.

Note that UDAParts has applied this feature to all SocketPro SQL-stream services, the asynchronous persistent message queue service and the remote file exchange service to simplify your development.

Points of Interest

The SocketPro SQLite SQL-stream service provides all the basic client/server database features you would expect. In addition, it delivers the following unique features:

  1. Continuous inline request/result batching and real-time SQL-stream processing for the best network efficiency, especially on a WAN
  2. Bi-directional asynchronous data transferring between client and server, with the option of converting any asynchronous request into a synchronous one
  3. Superior performance and scalability because of the powerful SocketPro communication architecture
  4. Real-time cache for table update, insert and delete. You can set a callback at the client side to track table record add, delete and update events, as shown in the sample project test_cache in the directory ../socketpro/stream_sql/usqlite/test_cache.
  5. All requests can be canceled by calling the method Cancel of the class CClientSocket at the client side
  6. Both Windows and Linux are supported
  7. Simple development for all supported development languages
  8. Both client and server components are thread-safe. They can be easily reused within your multi-threaded applications with far fewer thread-related issues
  9. All requests can be backed up at the client side and resent to another server for processing in case a server is down for any reason (fault auto recovery)

History

  • 09/06/2017 ==> Initial release
  • 09/30/2017 ==> Remove pictures and use code snippets instead as a codeproject.com officer suggested
  • 03/28/2018 ==> Add two new sections, Performance study and Executing SQLs in parallel with fault auto recovery
  • 12/30/2020 ==> Add a new section Two Basic Structures, ErrInfo and SQLExeInfo
  • 12/30/2020 ==> Add a new section TestBatch
  • 12/30/2020 ==> Update the main sample code to use Task with fewer callbacks, and fix a text error in code snippet 5

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior)
United States
Yuancai (Charlie) Ye, an experienced C/C++ software engineer, lives in Atlanta, Georgia. He is an expert in continuous inline request/result batching, real-time stream processing, asynchronous data transferring and parallel computation for the best communication throughput and latency. He has been working on SocketPro (https://github.com/udaparts/socketpro) for more than fifteen years.
