Commit 7e9ed24c authored by Daniel Wolf's avatar Daniel Wolf
Browse files

Added synchronization

parent 0953691d
...@@ -34,30 +34,33 @@ import org.minidns.dnsmessage.DnsMessage ...@@ -34,30 +34,33 @@ import org.minidns.dnsmessage.DnsMessage
class QueryListener(private val context: Context) : QueryListener { class QueryListener(private val context: Context) : QueryListener {
private val writeQueriesToLog = context.getPreferences().shouldLogDnsQueriesToConsole() private val writeQueriesToLog = context.getPreferences().shouldLogDnsQueriesToConsole()
private val logQueriesToDb = context.getPreferences().queryLoggingEnabled private val logQueriesToDb = context.getPreferences().queryLoggingEnabled
private val waitingQueryLogs = LinkedHashMap<Int, DnsQuery>() private var waitingQueryLogs = LinkedHashMap<Int, DnsQuery>()
// Question ID -> <Query, Insert> (true if the query has already been inserted) // Question ID -> <Query, Insert> (true if the query has already been inserted)
private val queryLogState: MutableMap<Int, Boolean> = mutableMapOf() private val queryLogState: MutableMap<Int, Boolean> = mutableMapOf()
// Query -> Has already been inserted // Query -> Has already been inserted
private val doneQueries = mutableMapOf<DnsQuery, Boolean>() private var doneQueries = mutableMapOf<DnsQuery, Boolean>()
private val askedServer:String private val askedServer: String
private var nextQueryId = 0L private var nextQueryId = 0L
var lastDnsResponse:DnsMessage? = null var lastDnsResponse: DnsMessage? = null
private val databaseWriteJob:Job private val databaseWriteJob: Job
init { init {
val config = context.getPreferences().dnsServerConfig val config = context.getPreferences().dnsServerConfig
askedServer = if(config.hasTlsServer()) { askedServer = if (config.hasTlsServer()) {
"tls::" + config.servers.first().address.host!! "tls::" + config.servers.first().address.host!!
} else { } else {
"https::" + (config as HttpsDnsServerInformation).serverConfigurations.values.first().urlCreator.address.getUrl(false) "https::" + (config as HttpsDnsServerInformation).serverConfigurations.values.first().urlCreator.address.getUrl(
false
)
} }
nextQueryId = context.getDatabase().dnsQueryDao().getLastInsertedId() + 1 nextQueryId = context.getDatabase().dnsQueryDao().getLastInsertedId() + 1
databaseWriteJob = GlobalScope.launch(newSingleThreadContext("QueryListener-DatabaseWrite")) { databaseWriteJob =
while (isActive) { GlobalScope.launch(newSingleThreadContext("QueryListener-DatabaseWrite")) {
delay(1500) while (isActive) {
insertQueries() delay(1500)
insertQueries()
}
} }
}
} }
override suspend fun onDeviceQuery(questionMessage: DnsMessage, srcPort: Int) { override suspend fun onDeviceQuery(questionMessage: DnsMessage, srcPort: Int) {
...@@ -73,14 +76,20 @@ class QueryListener(private val context: Context) : QueryListener { ...@@ -73,14 +76,20 @@ class QueryListener(private val context: Context) : QueryListener {
questionTime = System.currentTimeMillis(), questionTime = System.currentTimeMillis(),
responses = mutableListOf() responses = mutableListOf()
) )
query.id = nextQueryId++ synchronized(waitingQueryLogs) {
waitingQueryLogs[questionMessage.id] = query query.id = nextQueryId++
queryLogState[questionMessage.id] = false waitingQueryLogs[questionMessage.id] = query
queryLogState[questionMessage.id] = false
}
} }
} }
override suspend fun onQueryForwarded(questionMessage: DnsMessage, destination: UpstreamAddress, usedHandle:DnsHandle) { override suspend fun onQueryForwarded(
if(writeQueriesToLog) { questionMessage: DnsMessage,
destination: UpstreamAddress,
usedHandle: DnsHandle
) {
if (writeQueriesToLog) {
context.log("Query with ID ${questionMessage.id} forwarded by $usedHandle") context.log("Query with ID ${questionMessage.id} forwarded by $usedHandle")
} }
...@@ -89,20 +98,25 @@ class QueryListener(private val context: Context) : QueryListener { ...@@ -89,20 +98,25 @@ class QueryListener(private val context: Context) : QueryListener {
} }
} }
override suspend fun onQueryResponse(responseMessage: DnsMessage, source: QueryListener.Source) { override suspend fun onQueryResponse(
responseMessage: DnsMessage,
source: QueryListener.Source
) {
if (writeQueriesToLog) { if (writeQueriesToLog) {
context.log("Returned from $source: $responseMessage") context.log("Returned from $source: $responseMessage")
} }
lastDnsResponse = responseMessage lastDnsResponse = responseMessage
if (logQueriesToDb) { if (logQueriesToDb) {
val query = waitingQueryLogs.remove(responseMessage.id)!! val query = synchronized(waitingQueryLogs) {
waitingQueryLogs.remove(responseMessage.id)!!
}
query.responseTime = System.currentTimeMillis() query.responseTime = System.currentTimeMillis()
for (answer in responseMessage.answerSection) { for (answer in responseMessage.answerSection) {
query.addResponse(answer) query.addResponse(answer)
} }
query.responseSource = source query.responseSource = source
doneQueries[query] = queryLogState.remove(responseMessage.id)!! doneQueries[query] = queryLogState.remove(responseMessage.id)!!
} }
} }
...@@ -114,16 +128,20 @@ class QueryListener(private val context: Context) : QueryListener { ...@@ -114,16 +128,20 @@ class QueryListener(private val context: Context) : QueryListener {
} }
private fun insertQueries() { private fun insertQueries() {
if(waitingQueryLogs.isEmpty() && doneQueries.isEmpty()) return if (waitingQueryLogs.isEmpty() && doneQueries.isEmpty()) return
val currentInsertions = waitingQueryLogs.toMap() val currentInsertions:Map<Int, DnsQuery>
val currentDoneInsertions = doneQueries.toMap() val currentDoneInsertions:Map<DnsQuery, Boolean>
doneQueries.clear() synchronized(waitingQueryLogs) {
currentInsertions = waitingQueryLogs.toMap()
currentDoneInsertions = doneQueries
doneQueries.clear()
}
val database = context.getDatabase() val database = context.getDatabase()
val dao = database.dnsQueryDao() val dao = database.dnsQueryDao()
database.runInTransaction { database.runInTransaction {
currentInsertions.forEach { (key, value) -> currentInsertions.forEach { (key, value) ->
if(queryLogState[key]!!) { if (queryLogState[key]!!) {
dao.insert(value) dao.insert(value)
queryLogState[key] = true queryLogState[key] = true
} else { } else {
...@@ -131,10 +149,9 @@ class QueryListener(private val context: Context) : QueryListener { ...@@ -131,10 +149,9 @@ class QueryListener(private val context: Context) : QueryListener {
} }
} }
currentDoneInsertions.forEach { (key, value) -> currentDoneInsertions.forEach { (key, value) ->
if(value) dao.update(key) if (value) dao.update(key)
else dao.insert(key) else dao.insert(key)
} }
} }
} }
} }
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment