B4J Question mqtt and multithreading work...

quique

Member
Licensed User
Longtime User
Hi There!

I am trying to create a non-ui (console) code which will subscribe to one or more topics into an MQTT broker and store each received message into MYSQL. Speed is of the outmost importance. (I am struggling to understand the multithreading mechanism of B4J)

I already got it working, everything inside the Main Module. The sub mqtt_MessageArrived just forwards the topic and payload into a sub which parses it according to my needs and then asks for a mysql connection from a pool and executes a transaction writing the resulting data into the database.

I am sure the above thing is singlethreaded. I suspect that once the number of mqtt messages ramp up (I am expecting hundredds of messages per second), this scenario will suffer (or fail!) due to it's single-threaded nature.

QUESTION: Is it possible to, once a topic and payload is received (which I assume it is a single threaded event) to create another thread, invoking a sub which would parse that information, ask for a MySQL connection available from a pool and write it?

I am aiming into liberating the main thread, responsible for listening for arriving mqtt messages, ASAP.

What I am planning to do in order to convert my singlethreaded code into multithreaded is the following:

1) Create a class module and place inside there all message parsing and storing into mysql related code.

2) From the main module, when an MQTT message arrives I create a new object from that class, and I invoke the processing sub inside it:


B4X:
'ALL MQTT MESSAGES ARE LOGGED INTO MYSQL DATABASE:
Private Sub mqtt_MessageArrived (Topic As String, Payload() As Byte)
    Dim newmessage As messages
    newmessage.processs(Topic,Payload)
    Log(BytesToString(Payload, 0, Payload.Length, "ASCII") & " (" & Payload.Length & " bytes)")
End Sub

This would ensure messages class will be started inside a new thread ? If this is so, then is it ok for the database pool object to be created inside the Main module, and from the messages class to grab the necessary mysql connections ? (I read about using global variables not being safe).

Any hint, opinion and/or or comment is appreciated!

Enrique
 
Last edited:

EnriqueGonzalez

Well-Known Member
Licensed User
Longtime User
I am not sure about mqtt client multithread, but last time I had to create a project like that. I divided it into 2.

One for the client that receives the instruction and a second project that makes everything else. The bridge was shell that is indeed multithread. Of course by opening a program It was guaranteed to be in a different process handled fast
 
Upvote 0

EnriqueGonzalez

Well-Known Member
Licensed User
Longtime User
Oh an another advice, if you are planning to have a lot of traffic, Mqtt falls short compared to jserver,
 
Upvote 0

quique

Member
Licensed User
Longtime User
Hi Tocayo! The traffic is incoming from clusters of "alarmas vecinales" which I developed, based on ESP32 microcontrollers and a bunch of stuff (including a GSM modem).

So they are not capable of much more than sending a bunch of low count bytes messages through MQTT protocol. I am planning on using a jserver on this very same B4J app in order to remotely monitor some statistics, but on such "front" I already got it, that it will be multithreaded provided I start a new handle for each webpage.

My main concern now is not creating a bottleneck will parsing / processing / storing each MQTT message.

Saludos,

Enrique
 
Upvote 0

EnriqueGonzalez

Well-Known Member
Licensed User
Longtime User
MQTT library is based on mosquitto project. and this one is not multithreaded.

but most likely if you stick with sql async methods you should be safe.
 
Upvote 0

quique

Member
Licensed User
Longtime User
Interesting!

So it is NOT OK to put de db and packet processing functions into a class, and evey time an MQTT message arrives with a new packet, create an instance of that class and pass into it the packet so it can take care of everything ?

Instead, I should receive each mqtt message, and right there process it and add my single insert into a batch mysql with AddNonQueryToBatch, and then issue a
ExecNonQueryBatch ?

I suppose I could benchmark how much time does it take to perform a complete "cycle", comparing the first and second options. I will try to get some numbers.

Thanks, Erel and Enrique !
 
Upvote 0

quique

Member
Licensed User
Longtime User
Update: Here I got some "benchmarking results":

First I tried the thingy I proposed doing, which is:


B4X:
'ALL MQTT MESSAGES ARE LOGGED INTO MYSQL DATABASE:
Private Sub mqtt_MessageArrived (Topic As String, Payload() As Byte)
    Dim sta As Long
    Dim sto As Long
    sta=DateTime.Now
    Dim newmessage As messageprocessor  'This class process and stores in Mysql the mqtt topic and payload.
    newmessage.initialize
    newmessage.process(Topic,Payload)
    sto=DateTime.Now
    sta=sto-sta
    Log("TIME USED: " & sta)
End Sub

This throws the following numbers:

B4X:
Waiting for debugger to connect...
Program started.
<bunch of stuff deleted for sake of brevity>
Web Server started
MQTT Started
MQTT Connected
TIME USED: 26
TIME USED: 1
TIME USED: 2
TIME USED: 2
TIME USED: 1
TIME USED: 1
TIME USED: 1
TIME USED: 2
TIME USED: 2
TIME USED: 2
TIME USED: 2
TIME USED: 6
TIME USED: 2
TIME USED: 3
TIME USED: 2

Now, I took all the processing and SQL logic and stuffed it inside the Sub mqtt_MessageArrived. My new numbers are way better:

B4X:
...<deleted log stuff>
Emulated network latency: 100ms
Web Server started
MQTT Started
MQTT Connected
TIME USED: 7
TIME USED: 0
TIME USED: 2
TIME USED: 0
TIME USED: 0
TIME USED: 0
TIME USED: 0
TIME USED: 0
TIME USED: 1
TIME USED: 0

Erel, as you say, "use async sql" and "Never try to create a new thread" :)

At this time I am struggling on the following QUESTION: Do I have to keep an SQL connection open all the time, for the async to work ?

I define the following in Sub Process_globals:

B4X:
    Public pool As ConnectionPool
    Dim mysql As SQL

Then on AppStart, I do:
B4X:
    mysql = pool.GetConnection

And inside my mqtt_messageArrived I do store my processed payload with:

B4X:
                Dim query As String=" INSERT INTO paquetes (topic,tower,source,command,subcommand,user,pretopic,fecha) VALUES (?,?,?,?,?,?,?,?)"
                mysql.AddNonQueryToBatch(query,Array As Object( thepath(1),tower,source,command,subcommand,userid,thepath(0),todaydate))
                mysql.ExecNonQueryBatch("SQL")

But then I never do a mysql.close. If I put a mysql.close right after the ExecNonQueryBatch("SQL"), I get an error.
 
Upvote 0

EnriqueGonzalez

Well-Known Member
Licensed User
Longtime User
mysql = pool.GetConnection

you should use, getconnectionAsync

Dim newmessage As messageprocessor 'This class process and stores in Mysql the mqtt topic and payload.
newmessage.initialize
newmessage.process(Topic,Payload)

this is actually right, even if you write Sleep(0) at the start of process
Wait for and Sleep works like a Return, it will keep procesing your data while finishing the caller sub.

But then I never do a mysql.close. If I put a mysql.close right after the ExecNonQueryBatch("SQL"), I get an error.

Use wait for:

B4X:
    Dim query As String=" INSERT INTO paquetes (topic,tower,source,command,subcommand,user,pretopic,fecha) VALUES (?,?,?,?,?,?,?,?)"
                mysql.AddNonQueryToBatch(query,Array As Object( thepath(1),tower,source,command,subcommand,userid,thepath(0),todaydate))
                mysql.ExecNonQueryBatch("SQL")

    wait for SQL_NonQueryComplete (Success As Boolean)
   sql.close

You should always get connection and then close it. If you keep it open you could clog your proccesses
 
Upvote 0

quique

Member
Licensed User
Longtime User
Tocayo, I am not 100% sure I want to use that "wait for" ... since it is imperative that my mqtt_MessageArrived is not held hostage of an eventually sluggish mysql connection.

Maybe I can close it inside here ?: I will try it and if something goes funny, I will report back:


B4X:
Sub SQL_NonQueryComplete(success As Boolean)
    mysql.close
End Sub
 
Upvote 0

quique

Member
Licensed User
Longtime User
ok ... I did it, benchmark results are good, just as before closing the sql connection.

Now I ask for a connection from the pool right BEFORE issueing the async sql nonquery:


B4X:
                query=" INSERT INTO paquetes (topic,tower,source,command,subcommand,user,pretopic,fecha) VALUES (?,?,?,?,?,?,?,?)"
                mysql = pool.GetConnection
                mysql.AddNonQueryToBatch(query,Array As Object( elpath(1),torre,source,comando,subcomando,userid,elpath(0),fechahoy))
                mysql.ExecNonQueryBatch("SQL")

... and I am closing this connection as stated in my earlier message:

B4X:
 Sub SQL_NonQueryComplete(success As Boolean)
    mysql.close
End Sub

I am a bit afraid that under heavy traffic, such mysql.close will end up closing some other connection, instead than the intended one, since supposedly there may be more than one running concurrently... I may try to create such event.
 
Upvote 0

EnriqueGonzalez

Well-Known Member
Licensed User
Longtime User
Tocayo, I am not 100% sure I want to use that "wait for" ... since it is imperative that my mqtt_MessageArrived is not held hostage of an eventually sluggish mysql connection.

it will act exactly the same if you separete it in another function and use wait for.
Sub SQL_NonQueryComplete(success As Boolean)
mysql.close

End Sub

this is wrong, you should do this:

B4X:
Sub SQL_NonQueryComplete(success As Boolean)
   dim mysql as SQL  = sender
    mysql.close
End Sub

if you are using a global variable for mysql then, remove it.

this is better:

B4X:
    Dim xrs As ResumableSub = Utils.getcon
    wait for (xrs) Complete (sql As SQL)
   
    If sql = Null Then
        fx.Msgbox(MainForm,"Error de conexion","Error tipo 1")
        Return
    End If

public Sub getCon As ResumableSub
    Pool.GetConnectionAsync("con")
    wait for (Pool) con_ConnectionReady (Success As Boolean, SQL As SQL)
   
    If Success Then
        Return SQL
    Else
        Return Null
    End If
End Sub
 
Upvote 0

EnriqueGonzalez

Well-Known Member
Licensed User
Longtime User
ok ... I did it, benchmark results are good, just as before closing the sql connection.

Now I ask for a connection from the pool right BEFORE issueing the async sql nonquery:


B4X:
                query=" INSERT INTO paquetes (topic,tower,source,command,subcommand,user,pretopic,fecha) VALUES (?,?,?,?,?,?,?,?)"
                mysql = pool.GetConnection
                mysql.AddNonQueryToBatch(query,Array As Object( elpath(1),torre,source,comando,subcomando,userid,elpath(0),fechahoy))
                mysql.ExecNonQueryBatch("SQL")

... and I am closing this connection as stated in my earlier message:

B4X:
 Sub SQL_NonQueryComplete(success As Boolean)
    mysql.close
End Sub

I am a bit afraid that under heavy traffic, such mysql.close will end up closing some other connection, instead than the intended one, since supposedly there may be more than one running concurrently... I may try to create such event.

you won me!

As stated before, use

dim mysql as SQL = sender

so you will prevent from closing the wrong sql.
 
Upvote 0

quique

Member
Licensed User
Longtime User
Tocayo Enrique, Out of curiosity, I did what you suggested a bit earlier:

B4X:
                query=" INSERT INTO paquetes (topic,tower,source,command,subcommand,user,pretopic,fecha) VALUES (?,?,?,?,?,?,?,?)"
                mysql = pool.GetConnection
                mysql.AddNonQueryToBatch(query,Array As Object( elpath(1),torre,source,comando,subcomando,userid,elpath(0),fechahoy))
                mysql.ExecNonQueryBatch("SQL")
                wait for SQL_NonQueryComplete (Success As Boolean)
                mysql.close


and it takes about 5x to 6x more time to complete the mqtt_MessageArrived sub:


B4X:
Web Server started
MQTT Started
MQTT Connected
TIME USED: 33
TIME USED: 6
TIME USED: 5
TIME USED: 5
TIME USED: 5
TIME USED: 6
TIME USED: 6
TIME USED: 6
TIME USED: 5
TIME USED: 5
 
Upvote 0

EnriqueGonzalez

Well-Known Member
Licensed User
Longtime User
TIME USED: 33

i suppose you are getting here the connection.

TIME USED: 6

this is used to save the actual information.

you should really call, everytime the connection and the closing it.

if you want to see the same low numbers as before, you could wrap all this new code into your class, and call Sleep(0) in your
newmessage.process(Topic,Payload)
sub. Not because it will be faster, but because you are actually finishing the caller sub earlier that the actual work.
 
Upvote 0

quique

Member
Licensed User
Longtime User
Tocayo, I think now the whole thing should be OK:


B4X:
                Dim mysql As SQL
                mysql= pool.GetConnection
                mysql.AddNonQueryToBatch(query,Array As Object( elpath(1),torre,source,comando,subcomando,userid,elpath(0),fechahoy))
                mysql.ExecNonQueryBatch("SQL")

And later ...

B4X:
Sub SQL_NonQueryComplete(success As Boolean)
    Dim mysql As SQL = Sender
    mysql.close
End Sub

This gives me the following numbers:


B4X:
...
Web Server started
MQTT Started
MQTT Connected
TIME USED: 4
TIME USED: 0
TIME USED: 0
TIME USED: 1
TIME USED: 1
TIME USED: 1
TIME USED: 1
TIME USED: 1
TIME USED: 0
TIME USED: 1
TIME USED: 0
TIME USED: 0
TIME USED: 0

Nice ! and hopefully code-wise is fine :)
 
Upvote 0

OliverA

Expert
Licensed User
Longtime User
Please note that you can also queue your incoming messages and then process them as needed. This way you can capture all messages and have the processing as a separate step. This will work (using a queue) if the messages come i sporadic, with various pauses between messages. If messages come at a certain frequency, and your processing can't keep up with the frequency of incoming messages, then queuing won't help (the processing of the messages must be sped up). An example of such a queue system is here: https://www.b4x.com/android/forum/threads/audiostreamer-and-buffer-size.91581/page-2#post-578625. It's for audio, but should be easily adapted to MQTT. Oh yeah, it's multi-threaded, but does not use classes.

Edit: Fixed URL link (thanks to @Enrique Gonzalez R )
 
Last edited:
Upvote 0
Top