カスタムトリガー - プル & プッシュ機構
- 更新日 2022/05/19
カスタムトリガー - プル & プッシュ機構
このトピックで提供される例は、SDK パッケージを使用して、時間ベースおよびイベントベースのトリガー、プル (link TriggerDemo)
およびプッシュ (SimpleMessageListenerContainer)
メカニズムを作成する方法を示しています。
例: タイムベース トリガー - プル機構
必要なトリガー条件は、一定の間隔 (秒単位) でテストされます。
- トリガーは、データベース (DB) を定期的にチェックし、ユーザーが指定する SQL(DB) 値が
0
より大きい場合は、タイマーベースのトリガーがトリガーされます。 - イベントが発生すると
consumer.accept(<RecordValue Instance>)
メソッドを実行し、関連する Bot をトリガーします。 - 実行のたびに条件を検証します。
- この例を実行するには、[ビルド グラドル] ファイルに有効な DB ドライバーを追加します。
この例は、タイマー ベースのトリガー (TriggerDemo)
の拡張で、トリガー プル メカニズムを示しています。
注: これは説明のための例であり、本番環境では使用しないでください。
@BotCommand(commandType = BotCommand.CommandType.Trigger)
@CommandPkg(label = "JDBC Query Trigger", description = "JDBC Query Trigger", icon = "jdbc.svg", name = "jdbcQueryTrigger",
return_type = RECORD, return_name = "TriggerData", return_description = "Available keys: triggerType")
public class DBStatus {
private static Logger logger = LogManager.getLogger(DBStatus.class);
// Map storing multiple tasks
private static final Map<String, TimerTask> taskMap = new ConcurrentHashMap<>();
private static final Timer TIMER = new Timer(true);
@TriggerId
private String triggerUid;
@TriggerConsumer
private Consumer consumer;
/*
* Starts the trigger.
*/
@StartListen
public void startTrigger(
@Idx(index="1", type = AttributeType.TEXT)
@Pkg(label = "Provide the database driver class")
@NotEmpty
String driverClassName,
@Idx(index="2", type = AttributeType.TEXT)
@Pkg(label = "Provide the Jdbc connection string")
@NotEmpty
String jdbcUrl,
@Idx(index="3", type = AttributeType.TEXT)
@Pkg(label = "Provide the user Name")
@NotEmpty
String userName,
@Idx(index="4", type = AttributeType.CREDENTIAL)
@Pkg(label = "Provide the password")
@NotEmpty
SecureString password,
@Idx(index="5", type = AttributeType.TEXT)
@Pkg(label = "Provide the SQL to check the records")
@NotEmpty
String sqlQuery,
@Idx(index = "6", type = AttributeType.NUMBER)
@Pkg(label = "Provide the interval to query in seconds", default_value = "300", default_value_type = DataType.NUMBER)
@GreaterThan("0")
@NumberInteger
@NotEmpty
Double interval) {
DataSource dataSource = getDataSource(driverClassName, jdbcUrl, userName, password);
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
logger.debug("checking DB");
try {
if(checkRecordsExist(dataSource.getConnection(), sqlQuery)){
consumer.accept(getRecordValue());
return;
}
} catch (SQLException e) {
logger.warn(e.getMessage(),e);
logger.warn("Trigger is still running.");
}
logger.debug("no records found");
}
};
taskMap.put(this.triggerUid, timerTask);
TIMER.schedule(timerTask, interval.longValue(), interval.longValue());
}
private RecordValue getRecordValue() {
List<Schema> schemas = new LinkedList<>();
List<Value> values = new LinkedList<>();
schemas.add(new Schema("triggerType"));
values.add(new StringValue("DBStatus"));
RecordValue recordValue = new RecordValue();
recordValue.set(new Record(schemas,values));
return recordValue;
}
/*
* Cancel all the tasks and clear the map.
*/
@StopAllTriggers
public void stopAllTriggers() {
taskMap.forEach((k, v) -> {
if (v.cancel()) {
taskMap.remove(k);
}
});
}
/*
* Cancel the task and remove from the map
*
* @param triggerUid
*/
@StopListen
public void stopListen(String triggerUid) {
if (taskMap.get(triggerUid).cancel()) {
taskMap.remove(triggerUid);
}
}
public static DataSource getDataSource(String driverClassName, String url, String userName,SecureString password) {
BasicDataSource ds = new BasicDataSource();
ds.setDriverClassName(driverClassName);
ds.setUrl(url);
ds.setUsername(userName);
ds.setPassword(password.getInsecureString());
return ds;
}
public static boolean checkRecordsExist(Connection con, String query)
throws SQLException {
Statement stmt = null;
try {
stmt = con.createStatement();
ResultSet rs = stmt.executeQuery(query);
rs.last();
if(rs.getRow() > 0)
return true;
} catch (SQLException e ) {
throw new BotCommandException("Problem running statemnt", e);
} finally {
if (stmt != null) { stmt.close(); }
}
return false;
}
public String getTriggerUid() {
return triggerUid;
}
public void setTriggerUid(String triggerUid) {
this.triggerUid = triggerUid;
}
public Consumer getConsumer() {
return consumer;
}
public void setConsumer(Consumer consumer) {
this.consumer = consumer;
}
}
例: イベントベースのトリガー - プッシュ機構
トリガーは、たとえばメッセージ リスナーがリッスンを開始するようなイベントが発生するのを待ちます。
- イベントの詳細はメモリに保存され、メッセージ リスナーはメッセージを待ちます。
- イベントが発生すると
consumer.accept(<RecordValue Instance>)
メソッドを実行し、関連する Bot をトリガーします。
次の例は、トリガー プッシュ メカニズムを示すために、シンプルな ActiveMQ
メッセージ リスナー (SimpleMessageListenerContainer
) を作成する方法を示しています。
@BotCommand(commandType = BotCommand.CommandType.Trigger)
@CommandPkg(label = "JMS Trigger", description = "JMS Trigger", icon = "jms.svg", name = "jmsTrigger",
return_type = RECORD, return_name = "TriggerData", return_description = "Available keys: triggerType")
public class JMSQueue implements SessionAwareMessageListener {
// Map storing multiple MessageListenerContainer
private static final Map<String, MessageListenerContainer> taskMap = new ConcurrentHashMap<>();
@TriggerId
private String triggerUid;
@TriggerConsumer
private Consumer consumer;
//This method is called by MessageListenerContainer when a message arrives.
// At this point, the trigger get enabled
@Override
public void onMessage(javax.jms.Message message, Session session) throws JMSException {
consumer.accept(getRecordValue());
}
private RecordValue getRecordValue() {
List<Schema> schemas = new LinkedList<>();
List<Value> values = new LinkedList<>();
schemas.add(new Schema("triggerType"));
values.add(new StringValue("JMSQueue"));
RecordValue recordValue = new RecordValue();
recordValue.set(new Record(schemas,values));
return recordValue;
}
/*
* Starts the trigger.
*
* Use this method to setup the trigger, such as, setup the MessageListenerContainer and start it.
*/
@StartListen
public void startTrigger(@Idx(index = "1", type = AttributeType.TEXT)
@Pkg(label = "Provide the broker URL")
@NotEmpty
String brokerURL, @Idx(index = "2", type = AttributeType.TEXT)
@Pkg(label = "Provide the queue name")
@NotEmpty
String queueName) {
if (taskMap.get(triggerUid) == null) {
synchronized (this) {
if (taskMap.get(triggerUid) == null) {
SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer();
messageListenerContainer.setConnectionFactory(new PooledConnectionFactory(brokerURL));
messageListenerContainer.setDestinationName(queueName);
messageListenerContainer.setMessageListener(this);
messageListenerContainer.start();
taskMap.put(triggerUid, messageListenerContainer);
}
}
}
}
/*
* Cancel all the tasks and clear the map.
*/
@StopAllTriggers
public void stopAllTriggers() {
taskMap.forEach((k, v) -> {
v.stop();
taskMap.remove(k);
});
}
/*
* Cancel the tasks and remove from the map
*
* @param triggerUid
*/
@StopListen
public void stopListen(String triggerUid) {
taskMap.get(triggerUid).stop();
taskMap.remove(triggerUid);
}
public String getTriggerUid() {
return triggerUid;
}
public void setTriggerUid(String triggerUid) {
this.triggerUid = triggerUid;
}
public Consumer getConsumer() {
return consumer;
}
public void setConsumer(Consumer consumer) {
this.consumer = consumer;
}
}