Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@
import jakarta.inject.Inject;

import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.core.quota.QuotaComponent;
import org.apache.james.core.quota.QuotaCurrentValue;
import org.apache.james.core.quota.QuotaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.querybuilder.delete.Delete;
Expand All @@ -59,6 +61,8 @@ public class CassandraQuotaCurrentValueDao {
private final PreparedStatement getQuotaCurrentValueStatement;
private final PreparedStatement getQuotasByComponentStatement;
private final PreparedStatement deleteQuotaCurrentValueStatement;
private final DriverExecutionProfile readProfile;
private final DriverExecutionProfile writeProfile;

@Inject
public CassandraQuotaCurrentValueDao(CqlSession session) {
Expand All @@ -68,14 +72,17 @@ public CassandraQuotaCurrentValueDao(CqlSession session) {
this.getQuotaCurrentValueStatement = session.prepare(getQuotaCurrentValueStatement().build());
this.getQuotasByComponentStatement = session.prepare(getQuotasByComponentStatement().build());
this.deleteQuotaCurrentValueStatement = session.prepare(deleteQuotaCurrentValueStatement().build());
this.readProfile = ProfileLocator.READ.locateProfile(session, "CURRENT-QUOTA");
this.writeProfile = ProfileLocator.WRITE.locateProfile(session, "CURRENT-QUOTA");
}

public Mono<Void> increase(QuotaCurrentValue.Key quotaKey, long amount) {
return queryExecutor.executeVoid(increaseStatement.bind()
.setString(QUOTA_COMPONENT, quotaKey.getQuotaComponent().getValue())
.setString(IDENTIFIER, quotaKey.getIdentifier())
.setString(QUOTA_TYPE, quotaKey.getQuotaType().getValue())
.setLong(CURRENT_VALUE, amount))
.setLong(CURRENT_VALUE, amount)
.setExecutionProfile(writeProfile))
.onErrorResume(ex -> {
LOGGER.warn("Failure when increasing {} {} quota for {}. Quota current value is thus not updated and needs recomputation",
quotaKey.getQuotaComponent().getValue(), quotaKey.getQuotaType().getValue(), quotaKey.getIdentifier(), ex);
Expand All @@ -88,7 +95,8 @@ public Mono<Void> decrease(QuotaCurrentValue.Key quotaKey, long amount) {
.setString(QUOTA_COMPONENT, quotaKey.getQuotaComponent().getValue())
.setString(IDENTIFIER, quotaKey.getIdentifier())
.setString(QUOTA_TYPE, quotaKey.getQuotaType().getValue())
.setLong(CURRENT_VALUE, amount))
.setLong(CURRENT_VALUE, amount)
.setExecutionProfile(writeProfile))
.onErrorResume(ex -> {
LOGGER.warn("Failure when decreasing {} {} quota for {}. Quota current value is thus not updated and needs recomputation",
quotaKey.getQuotaComponent().getValue(), quotaKey.getQuotaType().getValue(), quotaKey.getIdentifier(), ex);
Expand All @@ -100,22 +108,25 @@ public Mono<QuotaCurrentValue> getQuotaCurrentValue(QuotaCurrentValue.Key quotaK
return queryExecutor.executeSingleRow(getQuotaCurrentValueStatement.bind()
.setString(QUOTA_COMPONENT, quotaKey.getQuotaComponent().getValue())
.setString(IDENTIFIER, quotaKey.getIdentifier())
.setString(QUOTA_TYPE, quotaKey.getQuotaType().getValue()))
.map(row -> convertRowToModel(row));
.setString(QUOTA_TYPE, quotaKey.getQuotaType().getValue())
.setExecutionProfile(readProfile))
.map(this::convertRowToModel);
}

public Mono<Void> deleteQuotaCurrentValue(QuotaCurrentValue.Key quotaKey) {
return queryExecutor.executeVoid(deleteQuotaCurrentValueStatement.bind()
.setString(QUOTA_COMPONENT, quotaKey.getQuotaComponent().getValue())
.setString(IDENTIFIER, quotaKey.getIdentifier())
.setString(QUOTA_TYPE, quotaKey.getQuotaType().getValue()));
.setString(QUOTA_TYPE, quotaKey.getQuotaType().getValue())
.setExecutionProfile(writeProfile));
}

public Flux<QuotaCurrentValue> getQuotasByComponent(QuotaComponent quotaComponent, String identifier) {
return queryExecutor.executeRows(getQuotasByComponentStatement.bind()
.setString(QUOTA_COMPONENT, quotaComponent.getValue())
.setString(IDENTIFIER, identifier))
.map(row -> convertRowToModel(row));
.setString(IDENTIFIER, identifier)
.setExecutionProfile(readProfile))
.map(this::convertRowToModel);
}

private Update increaseStatement() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@
import jakarta.inject.Inject;

import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.core.quota.QuotaComponent;
import org.apache.james.core.quota.QuotaLimit;
import org.apache.james.core.quota.QuotaScope;
import org.apache.james.core.quota.QuotaType;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.querybuilder.delete.Delete;
Expand All @@ -55,6 +57,8 @@ public class CassandraQuotaLimitDao {
private final PreparedStatement getQuotaLimitsStatement;
private final PreparedStatement setQuotaLimitStatement;
private final PreparedStatement deleteQuotaLimitStatement;
private final DriverExecutionProfile readProfile;
private final DriverExecutionProfile writeProfile;

@Inject
public CassandraQuotaLimitDao(CqlSession session) {
Expand All @@ -63,22 +67,26 @@ public CassandraQuotaLimitDao(CqlSession session) {
this.getQuotaLimitsStatement = session.prepare(getQuotaLimitsStatement().build());
this.setQuotaLimitStatement = session.prepare(setQuotaLimitStatement().build());
this.deleteQuotaLimitStatement = session.prepare((deleteQuotaLimitStatement().build()));
this.readProfile = ProfileLocator.READ.locateProfile(session, "QUOTA-LIMITS");
this.writeProfile = ProfileLocator.WRITE.locateProfile(session, "QUOTA-LIMITS");
}

public Mono<QuotaLimit> getQuotaLimit(QuotaLimit.QuotaLimitKey quotaKey) {
return queryExecutor.executeSingleRow(getQuotaLimitStatement.bind()
.setString(QUOTA_COMPONENT, quotaKey.getQuotaComponent().getValue())
.setString(QUOTA_SCOPE, quotaKey.getQuotaScope().getValue())
.setString(IDENTIFIER, quotaKey.getIdentifier())
.setString(QUOTA_TYPE, quotaKey.getQuotaType().getValue()))
.setString(QUOTA_TYPE, quotaKey.getQuotaType().getValue())
.setExecutionProfile(readProfile))
.map(this::convertRowToModel);
}

public Flux<QuotaLimit> getQuotaLimits(QuotaComponent quotaComponent, QuotaScope quotaScope, String identifier) {
return queryExecutor.executeRows(getQuotaLimitsStatement.bind()
.setString(QUOTA_COMPONENT, quotaComponent.getValue())
.setString(QUOTA_SCOPE, quotaScope.getValue())
.setString(IDENTIFIER, identifier))
.setString(IDENTIFIER, identifier)
.setExecutionProfile(readProfile))
.map(this::convertRowToModel);
}

Expand All @@ -88,15 +96,17 @@ public Mono<Void> setQuotaLimit(QuotaLimit quotaLimit) {
.setString(QUOTA_SCOPE, quotaLimit.getQuotaScope().getValue())
.setString(IDENTIFIER, quotaLimit.getIdentifier())
.setString(QUOTA_TYPE, quotaLimit.getQuotaType().getValue())
.set(QUOTA_LIMIT, quotaLimit.getQuotaLimit().orElse(null), Long.class));
.set(QUOTA_LIMIT, quotaLimit.getQuotaLimit().orElse(null), Long.class)
.setExecutionProfile(writeProfile));
}

public Mono<Void> deleteQuotaLimit(QuotaLimit.QuotaLimitKey quotaKey) {
return queryExecutor.executeVoid(deleteQuotaLimitStatement.bind()
.setString(QUOTA_COMPONENT, quotaKey.getQuotaComponent().getValue())
.setString(QUOTA_SCOPE, quotaKey.getQuotaScope().getValue())
.setString(IDENTIFIER, quotaKey.getIdentifier())
.setString(QUOTA_TYPE, quotaKey.getQuotaType().getValue()));
.setString(QUOTA_TYPE, quotaKey.getQuotaType().getValue())
.setExecutionProfile(writeProfile));
}

private Select getQuotaLimitStatement() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.backends.cassandra.utils;

import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DriverConfig;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;

public enum ProfileLocator {
READ(profileLocatorFunction("READ")),
WRITE(profileLocatorFunction("WRITE"));

private static BiFunction<CqlSession, String, DriverExecutionProfile> profileLocatorFunction(String baseProfileName) {
return (session, daoName) -> {
DriverConfig config = session.getContext().getConfig();
Map<String, ? extends DriverExecutionProfile> profiles = config.getProfiles();
String profileName = baseProfileName + "-" + daoName;

return Optional.ofNullable(profiles.get(profileName))
.map(DriverExecutionProfile.class::cast)
.or(() -> Optional.ofNullable(profiles.get(baseProfileName)))
.orElseGet(config::getDefaultProfile);
};
}

private final BiFunction<CqlSession, String, DriverExecutionProfile> profileLocatorFunction;

ProfileLocator(BiFunction<CqlSession, String, DriverExecutionProfile> profileLocatorFunction) {
this.profileLocatorFunction = profileLocatorFunction;
}

public DriverExecutionProfile locateProfile(CqlSession session, String daoName) {
return profileLocatorFunction.apply(session, daoName);
}
}
29 changes: 26 additions & 3 deletions backends-common/cassandra/src/test/resources/cassandra-driver.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,28 @@
datastax-java-driver {
basic.request {
timeout = 5 seconds
}
basic.request {
timeout = 5 seconds
consistency = QUORUM
page-size = 5000
serial-consistency = SERIAL
}
profiles {
# Provides controls on Execution profiles used by James
LWT {
basic.request.consistency = SERIAL
basic.request.serial-consistency = SERIAL
}
READ {
basic.request.consistency = QUORUM
}
WRITE {
basic.request.consistency = QUORUM
}
OPTIMISTIC_CONSISTENCY_LEVEL {
basic.request.consistency = LOCAL_ONE
basic.request.serial-consistency = LOCAL_ONE
}
BATCH {
basic.request.timeout = 1 hour
}
}
}
57 changes: 56 additions & 1 deletion docs/modules/servers/pages/distributed/configure/cassandra.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,59 @@ features to your users, you can consider disabling them in order to improve perf
| Optional, default to 0. Defensive value to add to uids and modseqs generated. This can be used as an heuristic to maintain
consistency even when consensus of Lightweight Transactions is broken, exemple during a disaster recovery process.

|===
|===

== Extra execution profiles

Operators can specify fin grain execution profiles. This allows setting per-dao request settings - like consistency levels.

DAO would try to select at start time the most specific profile and fallback to `READ` and `WRITE` profiles.

Profiles include:

- READ-ACLV2
- WRITE-ACLV2
- READ-APPLICABLE-FLAGS
- WRITE-APPLICABLE-FLAGS
- READ-ATTACHMENTV2
- WRITE-ATTACHMENTV2
- READ-MAILBOX
- WRITE-MAILBOX
- READ-MAILBOXPATHV3
- WRITE-MAILBOXPATHV3
- READ-THREAD
- WRITE-THREAD
- READ-THREAD-LOOKUP
- WRITE-THREAD-LOOKUP
- READ-UID (only used for IMAP STATUS/SELECT)
- READ-MODSEQ (only used for IMAP STATUS/SELECT)
- READ-MESSAGEV3
- WRITE-MESSAGEV3
- READ-RECENTS
- WRITE-RECENTS
- READ-IMAP-UID-TABLE
- WRITE-IMAP-UID-TABLE
- READ-FIRST-UNSEEN
- WRITE-FIRST-UNSEEN
- READ-MARKED-AS-DELETED
- WRITE-MARKED-AS-DELETED
- READ-VACATION
- WRITE-VACATION
- READ-VACATION-REGISTRY
- WRITE-VACATION-REGISTRY
- READ-USER
- WRITE-USER
- READ-RRT
- WRITE-RRT
- READ-RRT-SOURCE
- WRITE-RRT-SOURCE
- READ-DOMAIN
- WRITE-DOMAIN
- READ-CURRENT-QUOTA
- WRITE-CURRENT-QUOTA
- READ-QUOTA-LIMITS
- WRITE-QUOTA-LIMITS
- READ-PUSH-SUBSCRIPTION
- WRITE-PUSH-SUBSCRIPTION
- READ-USER-ACL
- WRITE-USER-ACL
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@

import org.apache.commons.lang3.NotImplementedException;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.backends.cassandra.utils.ProfileLocator;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.table.CassandraACLV2Table;
import org.apache.james.mailbox.model.MailboxACL;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.type.codec.TypeCodecs;
import com.github.fge.lambdas.Throwing;
Expand All @@ -49,10 +51,14 @@ public class CassandraACLDAOV2 {
private final PreparedStatement replaceRights;
private final PreparedStatement delete;
private final PreparedStatement read;
private final DriverExecutionProfile readProfile;
private final DriverExecutionProfile writeProfile;

@Inject
public CassandraACLDAOV2(CqlSession session) {
this.executor = new CassandraAsyncExecutor(session);
this.readProfile = ProfileLocator.READ.locateProfile(session, "ACLV2");
this.writeProfile = ProfileLocator.WRITE.locateProfile(session, "ACLV2");
this.insertRights = prepareInsertRights(session);
this.removeRights = prepareRemoveRights(session);
this.replaceRights = prepareReplaceRights(session);
Expand Down Expand Up @@ -100,13 +106,15 @@ private PreparedStatement prepareRead(CqlSession session) {
public Mono<Void> delete(CassandraId cassandraId) {
return executor.executeVoid(
delete.bind()
.setUuid(CassandraACLV2Table.ID, cassandraId.asUuid()));
.setUuid(CassandraACLV2Table.ID, cassandraId.asUuid())
.setExecutionProfile(writeProfile));
}

public Mono<MailboxACL> getACL(CassandraId cassandraId) {
return executor.executeRows(
read.bind()
.set(CassandraACLV2Table.ID, cassandraId.asUuid(), TypeCodecs.TIMEUUID))
.set(CassandraACLV2Table.ID, cassandraId.asUuid(), TypeCodecs.TIMEUUID)
.setExecutionProfile(readProfile))
.map(Throwing.function(row -> {
MailboxACL.EntryKey entryKey = MailboxACL.EntryKey.deserialize(row.getString(CassandraACLV2Table.KEY));
MailboxACL.Rfc4314Rights rights = row.getSet(CassandraACLV2Table.RIGHTS, String.class)
Expand All @@ -125,17 +133,20 @@ public Mono<Void> updateACL(CassandraId cassandraId, MailboxACL.ACLCommand comma
return executor.executeVoid(insertRights.bind()
.setUuid(CassandraACLV2Table.ID, cassandraId.asUuid())
.setString(CassandraACLV2Table.KEY, command.getEntryKey().serialize())
.setSet(CassandraACLV2Table.RIGHTS, ImmutableSet.copyOf(rightStrings), String.class));
.setSet(CassandraACLV2Table.RIGHTS, ImmutableSet.copyOf(rightStrings), String.class)
.setExecutionProfile(writeProfile));
case REMOVE:
return executor.executeVoid(removeRights.bind()
.setUuid(CassandraACLV2Table.ID, cassandraId.asUuid())
.setString(CassandraACLV2Table.KEY, command.getEntryKey().serialize())
.setSet(CassandraACLV2Table.RIGHTS, ImmutableSet.copyOf(rightStrings), String.class));
.setSet(CassandraACLV2Table.RIGHTS, ImmutableSet.copyOf(rightStrings), String.class)
.setExecutionProfile(writeProfile));
case REPLACE:
return executor.executeVoid(replaceRights.bind()
.setUuid(CassandraACLV2Table.ID, cassandraId.asUuid())
.setString(CassandraACLV2Table.KEY, command.getEntryKey().serialize())
.setSet(CassandraACLV2Table.RIGHTS, rightStrings, String.class));
.setSet(CassandraACLV2Table.RIGHTS, rightStrings, String.class)
.setExecutionProfile(writeProfile));
default:
throw new NotImplementedException(command.getEditMode() + "is not supported");
}
Expand Down
Loading