B4J Question mqtt and multithreading work...

quique

Member
Licensed User
Longtime User
Hi There!

I am trying to create a non-ui (console) code which will subscribe to one or more topics into an MQTT broker and store each received message into MYSQL. Speed is of the outmost importance. (I am struggling to understand the multithreading mechanism of B4J)

I already got it working, everything inside the Main Module. The sub mqtt_MessageArrived just forwards the topic and payload into a sub which parses it according to my needs and then asks for a mysql connection from a pool and executes a transaction writing the resulting data into the database.

I am sure the above thing is singlethreaded. I suspect that once the number of mqtt messages ramp up (I am expecting hundredds of messages per second), this scenario will suffer (or fail!) due to it's single-threaded nature.

QUESTION: Is it possible to, once a topic and payload is received (which I assume it is a single threaded event) to create another thread, invoking a sub which would parse that information, ask for a MySQL connection available from a pool and write it?

I am aiming into liberating the main thread, responsible for listening for arriving mqtt messages, ASAP.

What I am planning to do in order to convert my singlethreaded code into multithreaded is the following:

1) Create a class module and place inside there all message parsing and storing into mysql related code.

2) From the main module, when an MQTT message arrives I create a new object from that class, and I invoke the processing sub inside it:


B4X:
'ALL MQTT MESSAGES ARE LOGGED INTO MYSQL DATABASE:
Private Sub mqtt_MessageArrived (Topic As String, Payload() As Byte)
    Dim newmessage As messages
    newmessage.processs(Topic,Payload)
    Log(BytesToString(Payload, 0, Payload.Length, "ASCII") & " (" & Payload.Length & " bytes)")
End Sub

This would ensure messages class will be started inside a new thread ? If this is so, then is it ok for the database pool object to be created inside the Main module, and from the messages class to grab the necessary mysql connections ? (I read about using global variables not being safe).

Any hint, opinion and/or or comment is appreciated!

Enrique
 
Last edited:

OliverA

Expert
Licensed User
Longtime User
still missing the getconnectionAsync, but you can live without it.
If he is using a ConnectionPool object, it does not have a getconnectionAsync method.

Also, I don't know if with his current implementation of processing the SQL in the MQTT's MessageArrived event he really needs any async methods. Since each MessageArrived only processes one INSERT statement, he can just use a simple ExecNonQuery2 instead of AddNonQueryToBatch, ExecNonQueryBatch and a seperate NonQueryComplete event method (since author does not want to use Wait For). Please note that the next MessageArrived event does not depend on the previous event having been completed, nor does it matter that a previous MessageArrived event is blocked by an SQL method.

B4X:
'Note: Define/Dim query variable outside of MessageArrived, saving some time declaring variable over and over (and reducing garbage collection)
'Dim query As String =" INSERT INTO paquetes (topic,tower,source,command,subcommand,user,pretopic,fecha) VALUES (?,?,?,?,?,?,?,?)"

'ALL MQTT MESSAGES ARE LOGGED INTO MYSQL DATABASE:
Private Sub mqtt_MessageArrived (Topic As String, Payload() As Byte)
' Do some processing
' Now add data to database
   Dim mysql as SQL
   mysql = pool.GetConnection
   mysql.ExecNonQuery2(query,Array As Object( elpath(1),torre,source,comando,subcomando,userid,elpath(0),fechahoy))
   mysql.close
' Do some more processing
End Sub

Now if one queues incoming messages, then it would make sense to batch the queue and process that batch with the ExecNonQueryBatch command. I also just realized that a queued implementation may be overkill if the author does not care in what order the messages are added to the database (the queue implementation would preserve order).
 
Upvote 0

EnriqueGonzalez

Expert
Licensed User
Longtime User
If he is using a ConnectionPool object, it does not have a getconnectionAsync method.

Please see attached image. The hikari pool also has getConnectionAsync
 

Attachments

  • async.png
    async.png
    11.9 KB · Views: 266
Upvote 0

quique

Member
Licensed User
Longtime User
Hi OliverA! I am wondering what happens with mqtt library, upon receiving more than one message ...

I get alerted on mqtt_MessagaArrived about ONE message ... but maybe there are 3 more "in the queue" (where ?) (mqtt library got an internal queue) ? (or it just discards any message that arrives while it can't be "attended" by my code ?) (it shouldn't discard it !)
 
Upvote 0

OliverA

Expert
Licensed User
Longtime User
For each complete message that MQTT has received, it fires off an B4J event as a separate thread. MQTT does not care if a previous thread has finished or not. As to ho it internally handles message receiving, I have no clue.
 
Upvote 0

OliverA

Expert
Licensed User
Longtime User
The hikari pool also has getConnectionAsync
I am ignorant of that fact, since I mainly deal with the standard connection pool. In defense of my ignorance, you did not mention Hikari when you mentioned the GetConnectionAsync. :)
 
Upvote 0

EnriqueGonzalez

Expert
Licensed User
Longtime User
I am ignorant of that fact, since I mainly deal with the standard connection pool. In defense of my ignorance, you did not mention Hikari when you mentioned the GetConnectionAsync. :)
No no!

Both connection pool based on c3po (official library) and hikari have getconnectionasync
 
Upvote 0

OliverA

Expert
Licensed User
Longtime User
Please see my signature on that one. You are correct @Enrique Gonzalez R. Now it makes sense! I used a project where I used my ConnectionPoolManager class!! It's similar to ConnectionPool, but I used it to test various pools (H2, HSQLDB, Hikari, C3P0, Tomcat and Vibur). THAT class (ConnectionPoolManager) did not implement the async method! Live and learn.
 
Upvote 0

quique

Member
Licensed User
Longtime User
Hello again ...

After performing all the changes and enhancements proposed, I did not take a look back at my code until today ... only to find that the whole way we refined on this topic, did not work after all:

I am getting:

mqtt_MessageArrived: (TimeoutException) com.mchange.v2.resourcepool.TimeoutException: A client timed out while waiting to acquire a resource from com.mchange.v2.resourcepool.BasicResourcePool@1b40d5f0 -- timeout at awaitAvailable()

So the pool seems to have exhausted it's connections "right away" ... it seems that the ...:


B4X:
Sub SQL_NonQueryComplete(success As Boolean)
    Dim mysql As SQL = Sender
    mysql.close
End Sub

Is not closing it ... (or there is something more going on)
 
Last edited:
Upvote 0

OliverA

Expert
Licensed User
Longtime User
Upvote 0

quique

Member
Licensed User
Longtime User
Is there a way to get some metrics from the pool object ? Like how many pool threads are in use ? I am not finding it, but I also am not proficient (or anything close to it) on Java and the underlying methods / intrincancies to get data from the object itself, other than try pool. and see what autocomplete returns :) ...
 
Upvote 0

OliverA

Expert
Licensed User
Longtime User
Try
B4X:
Dim jo As JavaObject = pool
Log(jo.RunMethod("getNumConnectionsAllUsers", Null))
 
Upvote 0
Top