Introduction
This example gives you an overview of the basic concepts of ETLBox and shows you how to write your own ETL job in pure C# code. The example is deliberately simple, so you can focus on the core ideas of the framework.
The example code runs on .NET Core and complies with .NET Standard 2.0. You will also need a SQL Server instance up and running. The latest version of ETLBox can be downloaded via NuGet.
If you are looking for further examples and details, see the project homepage - https://etlbox.net.
ETLBox Components
ETLBox is split into two main components: Control Flow Tasks and Data Flow Tasks. Some tasks in the Control Flow part are for logging purposes only. This article will give you an overview of both and then show you how you can write a simple ETL job.
Overview Control Flow Tasks
Control Flow Tasks can be split into "General" tasks and "Logging" tasks. General tasks reside in the ALE.ETLBox.ControlFlow namespace, the tasks for logging in the ALE.ETLBox.Logging namespace.
Control Flow Tasks are a comprehensive set of tasks to manage, alter or query a database. With one single line of code, you will be able to create a table or run a SQL statement against your database. If you have ever done this with ADO.NET, you probably noticed that there is some boilerplate code you have to write over and over again. The idea behind the Control Flow Tasks is that you don't have to write the same code again and again, e.g., just for doing something trivial like opening a connection and counting the rows in a table. This should be doable with only one line of code.
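As a quick taste, running an arbitrary SQL statement is a single call to ETLBox's SqlTask, assuming a connection has been configured as shown in the next sections. The schema name below is made up for illustration; the first argument is a descriptive name that shows up in the log output.
// One-liner: fire a SQL statement against the configured connection.
SqlTask.ExecuteNonQuery("Create demo schema", "CREATE SCHEMA demo_schema");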
ADO.NET - The Old Way
For instance, establishing a connection and doing a simple row count on a table with classic ADO.NET would look like this:
// Requires: using System.Data.SqlClient;
string connectionString = "Data Source=.; Database=Sample; Integrated Security=SSPI";
using (SqlConnection con = new SqlConnection(connectionString))
using (SqlCommand cmd = new SqlCommand("select count(*) from dbo.tbl", con))
{
    // Open the connection and read the scalar result of the count query.
    con.Open();
    int numrows = (int)cmd.ExecuteScalar();
}
RowCount with Control Flow Tasks
Now let's have a look at how to make a row count with the Control Flow Tasks library.
First, we need to set up a connection for the Control Flow Tasks. You only have to set up your database connection string once, like this:
ControlFlow.CurrentDbConnection =
    new SqlConnectionManager(new ConnectionString(
        "Data Source=.; Database=Sample; Integrated Security=SSPI"));
The connection will be stored in a static property and used by all subsequent tasks if no other connection is passed when a task is executed.
Now you can use a RowCountTask to query the number of rows within a table with only one line:
int count = RowCountTask.Count("dbo.tbl");
Internally, an ADO.NET connection is opened (the default ADO.NET connection pooling is used) and a select count(*) from dbo.tbl is executed on the database. The result is returned from the RowCountTask.
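As mentioned above, the static connection is only a default. If a single task should run against a different database, it can be handed its own connection manager. Treat the following as a hedged sketch - the overload that takes a connection manager as its first argument is an assumption, so check the ETLBox API for the exact signature; the "OtherSample" database is made up for illustration.
// Hedged sketch: count rows in another database without changing
// ControlFlow.CurrentDbConnection (the exact overload may differ).
var otherDb = new SqlConnectionManager(new ConnectionString(
    "Data Source=.; Database=OtherSample; Integrated Security=SSPI"));
int countInOtherDb = RowCountTask.Count(otherDb, "dbo.tbl");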
Why Not Entity Framework
ETLBox was designed to be used as an ETL object library. Its typical user deals with big data and data warehouse structures and is used to having full control over the database. With the underlying power of ADO.NET - which ETLBox uses - you have full access to the database and can basically do anything you are used to doing with SQL on the server. Entity Framework (EF) is a highly sophisticated ORM tool, but it comes with the downside that you can only do what EF allows you to do. As EF does not incorporate everything that is possible with SQL and ADO.NET on a SQL Server, it normally isn't a good choice for creating ETL jobs. The same is true for other ORM tools.
Overview Data Flow
You have some data somewhere - stored in some files, a table or somewhere else. Now you want to define a pipeline which takes this data, transforms it "on the fly" and writes it into a target (which could again be a database, a file or somewhere else). On an abstract level, this can be described as an ETL process (ETL = Extract, Transform, Load). The Data Flow components allow you to define such an ETL job yourself. All Data Flow tasks reside in the ALE.ETLBox.DataFlow namespace.
Source Components
Every data flow pipeline needs at least one source. Sources are basically everything that can read data from someplace (e.g., a CSV file or a database table) and then post this data into the pipeline. All sources are able to read data asynchronously: while the component reads data from the source, it simultaneously sends the already processed data to the components connected to it. There are currently two built-in data sources: CSVSource and DBSource. If you need another source component, you can extend the CustomSource.
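The examples in this article only use the CSVSource, so here is a hedged sketch of how a DBSource could be set up. The constructor argument is an assumption based on how DBDestination is used later in this article, and MyData and dbo.table1 are taken from the example further down; check the ETLBox documentation for the exact signature.
// Hedged sketch: assumes DBSource<T> can be pointed at a table name,
// analogous to the DBDestination<T> shown later in this article.
DBSource<MyData> dbSource = new DBSource<MyData>("dbo.table1");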
Once a source starts reading data, it will begin sending it to its connected components. These can be either transformations or destinations.
Transformations
Transformations always have at least one input and one output. Inputs can be connected either to other transformations or to sources, and the output can connect to other transformations or to destinations. The purpose of a transformation component is to take the data from its input(s) and post the transformed data to its outputs. This is done on a row-by-row basis: as soon as there is any data in the input, the transformation will start and post the result to the output.
Buffering
Every transformation comes with an input buffer. If the components connected to the input post data faster than the transformation can process it, the buffer will hold this data until the transformation can continue with the next item. This allows a source to read as fast as possible while the already read data is buffered in memory - so the transformation always has some data ready to process.
Transformations can be either blocking or non-blocking.
Non-blocking transformations start to process data as soon as they find something in their input buffer. The moment they discover data there, they transform it and send it to the registered output components.
Blocking transformations stop the data processing for the whole pipe - their input buffer waits until all data has reached the input. This means the transformation waits until all sources connected to it have read all data from their sources and all preceding transformations have processed the incoming rows. Only then does the blocking transformation start its work, and within its transformation logic it has access to all the data buffered in memory. For instance, the sort component is a blocking transformation: it waits until all data has reached the transformation block, then it sorts the data and posts the sorted rows to its output.
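To make this concrete, here is a minimal sketch of a sort. It assumes the Sort transformation accepts a Comparison<T> delegate and reuses the MyData class defined later in this article; check the ETLBox documentation for the exact constructor.
// Hedged sketch: Sort<T> is assumed to take a Comparison<T> delegate.
// Being a blocking transformation, it emits rows only after all input rows
// have been buffered in memory.
Sort<MyData> sortByCol1 = new Sort<MyData>(
    (x, y) => string.Compare(x.Col1, y.Col1, StringComparison.Ordinal));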
Destination Components
Destination components normally have only one input. They define a target for your data, e.g., a database table or a CSV file. Currently, DBDestination and CSVDestination are implemented. If you need another destination component, you can either extend the CustomDestination or open an issue on GitHub.
Every destination comes with an input buffer. While a destination for a CSV target opens a file stream and writes data into it as soon as it arrives, a DB target does this batch by batch - it waits until the input buffer reaches the batch size (or until the last batch arrives) and then inserts the data into the database using a bulk insert.
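The batch size itself can typically be influenced when the DBDestination is created. The following line is a hedged sketch - the constructor overload taking a batch size is an assumption (it may instead be a property in your ETLBox version), and Order is the same (not fully shown) object used in the simple dataflow example below.
// Hedged sketch: assumes an overload taking the table name and a batch size;
// here 1000 rows would be written per bulk insert.
DBDestination<Order> orderDest = new DBDestination<Order>("dbo.OrderTable", 1000);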
A Simple Dataflow
Let's look at a simple dataflow like this:
CSV File (Source) --> Row transformation --> DB destination
As the Data Flow Tasks are based on the same foundation as the Control Flow Tasks, you first set up a connection just like you would for a Control Flow Task.
ControlFlow.CurrentDbConnection =
new SqlConnectionManager(new ConnectionString("Data Source=.;Integrated Security=SSPI;"));
Now we need to create a source. In this example, it contains order data:
CSVSource sourceOrderData = new CSVSource("demodata.csv");
We now add a row transformation. The default output format of a CSVSource is a string array. In this example, we will convert the CSV string array into an Order object.
RowTransformation<string[], Order> rowTrans = new RowTransformation<string[], Order>(
row => new Order(row)
);
Now we need to create a destination. Notice that the destination is typed with the Order object.
DBDestination<Order> dest = new DBDestination<Order>("dbo.OrderTable");
Until now, we have only created the components - we haven't linked them into a Data Flow pipe yet. Let's do this now:
sourceOrderData.LinkTo(rowTrans);
rowTrans.LinkTo(dest);
This will create a data flow pipe: CSVSource -> RowTransformation -> DBDestination.
Now we will give the source the command to start reading data.
sourceOrderData.Execute();
This code will execute as an asynchronous task. If you want to wait for the Data Flow pipeline to finish, add this line to your code:
dest.Wait();
When dest.Wait() returns, all data has been read from the source and written into the database table.
Example
Now let's use the Control Flow Tasks and Data Flow components together. In this example, we want to create a database and a table, and load some data from an input file into that table. We will also apply a very simple transformation while loading the data.
Environment
For this demo, you can use Visual Studio for Mac and SQL Server for Linux running in a Docker image. A user interface for managing SQL Server on a Mac is Azure Data Studio.
First, we need to start a Docker container running SQL Server on Ubuntu. Run the following command in the terminal to start the container.
docker run -d --name sql_server_demo -e 'ACCEPT_EULA=Y'
-e 'SA_PASSWORD=reallyStrongPwd123' -p 1433:1433 microsoft/mssql-server-linux
With the command docker ps, we can see that the container is up and running.
Now create a new dotnet core console application. Do this either with your GUI or execute the following command:
dotnet new console
Add the current version of ETLBox as a package to your project.
dotnet add package ETLBox
Now you will be able to use the full set of tools coming with ETLBox.
Start Coding
Now head into the static Main method. For demo purposes, the following code will be added directly there; in a real project, you would organize it into separate classes and methods.
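The snippets below assume the following using directives at the top of Program.cs (they match the complete listing at the end of this article):
using System;
using System.Collections.Generic;
using ALE.ETLBox;
using ALE.ETLBox.ConnectionManager;
using ALE.ETLBox.ControlFlow;
using ALE.ETLBox.DataFlow;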
First, we store the connection string in the static ControlFlow object.
ControlFlow.CurrentDbConnection = new SqlConnectionManager(new ConnectionString
("Data Source=.;Integrated Security=false;User=sa;password=reallyStrongPwd123"));
With CreateDatabaseTask, we will create a new database.
CreateDatabaseTask.Create("demo");
Next, we change the connection to the database we just created and create a table in it using the CreateTableTask.
ControlFlow.CurrentDbConnection = new SqlConnectionManager(new ConnectionString
("Data Source=.;Integrated Security=false;User=sa;password=reallyStrongPwd123;Initial Catalog=demo"));
CreateTableTask.Create("dbo.table1", new List<TableColumn>()
{
new TableColumn("ID","int",allowNulls:false, isPrimaryKey:true, isIdentity:true),
new TableColumn("Col1","nvarchar(100)",allowNulls:true),
new TableColumn("Col2","smallint",allowNulls:true)
});
Adding nlog.config
Before we test our demo project, we want to have some logging output displayed. ETLBox logging is built on NLog. On the ETLBox website, you will find examples of how to configure logging with NLog. Add the following lines as nlog.config to your project root. Make sure it is copied into the output directory.
="1.0"="utf-8"
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
xsi:schemaLocation="NLog NLog.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<rules>
<logger name="*" minlevel="Debug" writeTo="console" />
</rules>
<targets>
<target name="console" xsi:type="Console" />
</targets>
</nlog>
Running the Project
Now build and run the project. A terminal window will pop up and display the logging output. As the logging level is set to Debug, you will see all SQL code that is executed against the database. Check if the database and the table were created.
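If you prefer verifying this from code rather than from Azure Data Studio, a quick check could look like the sketch below. SqlTask.ExecuteScalar returning the single value of the query is an assumption here - only SqlTask.ExecuteReader is shown later in this article - so double-check the API.
// Hedged sketch: ask SQL Server's catalog whether dbo.table1 exists.
object tableId = SqlTask.ExecuteScalar("Check that dbo.table1 exists",
    "SELECT OBJECT_ID('dbo.table1')");
Console.WriteLine(tableId == null || tableId is DBNull ? "Table is missing!" : "Table exists.");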
A Simple ETL Pipeline
Next, we want to create a simple ETL pipeline. First, we create a demo CSV file named input.csv. The input file contains a header line and some values. We also need to copy it into the output directory.
Col1,Col2
Value,1
Value2,2
Value3,3
Now, we create a CSVSource pointing to the newly created input file.
CSVSource source = new CSVSource("input.csv");
Before we continue, we need an object that can hold our data. Let's call it MyData.
public class MyData
{
    public string Col1 { get; set; }
    public string Col2 { get; set; }
}
Now we add a row transformation. The row transformation will receive a string array from the source and transform it into our MyData object.
RowTransformation<string[], MyData> row = new RowTransformation<string[], MyData>
(
input => new MyData()
{ Col1 = input[0], Col2 = input[1] }
);
Next, we add a database destination pointing to our table.
DBDestination<MyData> dest = new DBDestination<MyData>("dbo.table1");
Now, we need to link the components of our data flow.
source.LinkTo(row);
row.LinkTo(dest);
After linking the components, we want the source to start reading the input data. The destination should wait until it has received all data.
source.Execute();
dest.Wait();
Finally, we check if the data was successfully loaded into the table and write it to the console output. We use the SqlTask for this.
SqlTask.ExecuteReader("Read all data from table1",
"select Col1, Col2 from dbo.table1",
col1 => Console.WriteLine(col1.ToString() + ","),
col2 => Console.WriteLine(col2.ToString())
);
Run Again
Let's run the project again and see the output.
You'll see that the data was successfully copied into the database table.
Whole Code
Here is the whole example code.
File Program.cs:
using System;
using System.Collections.Generic;
using ALE.ETLBox;
using ALE.ETLBox.ConnectionManager;
using ALE.ETLBox.ControlFlow;
using ALE.ETLBox.DataFlow;
namespace Demo
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Hello World!");
            ControlFlow.CurrentDbConnection = new SqlConnectionManager(new ConnectionString(
                "Data Source=.;Integrated Security=false;User=sa;password=reallyStrongPwd123"));
            CreateDatabaseTask.Create("demo");
            ControlFlow.CurrentDbConnection = new SqlConnectionManager(new ConnectionString(
                "Data Source=.;Integrated Security=false;User=sa;password=reallyStrongPwd123;Initial Catalog=demo"));
            CreateTableTask.Create("dbo.table1", new List<TableColumn>()
            {
                new TableColumn("ID","int",allowNulls:false, isPrimaryKey:true, isIdentity:true),
                new TableColumn("Col1","nvarchar(100)",allowNulls:true),
                new TableColumn("Col2","smallint",allowNulls:true)
            });
            CSVSource source = new CSVSource("input.csv");
            RowTransformation<string[], MyData> row = new RowTransformation<string[], MyData>(
                input => new MyData() { Col1 = input[0], Col2 = input[1] });
            DBDestination<MyData> dest = new DBDestination<MyData>("dbo.table1");
            source.LinkTo(row);
            row.LinkTo(dest);
            source.Execute();
            dest.Wait();
            SqlTask.ExecuteReader("Read all data from table1",
                "select Col1, Col2 from dbo.table1",
                col1 => Console.Write(col1.ToString() + ","),
                col2 => Console.WriteLine(col2.ToString()));
        }

        public class MyData
        {
            public string Col1 { get; set; }
            public string Col2 { get; set; }
        }
    }
}
File nlog.config (copied to output directory):
="1.0"="utf-8"
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
xsi:schemaLocation="NLog NLog.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<rules>
<logger name="*" minlevel="Debug" writeTo="console" />
</rules>
<targets>
<target name="console" xsi:type="Console" />
</targets>
</nlog>
File input.csv (copied to output directory):
Col1,Col2
Value,1
Value2,2
Value3,3
Going Further
If you found this example useful, I would recommend that you have a look at the other examples on the project homepage: https://etlbox.net.