diff --git a/apps/task-poster/src/fetcher.ts b/apps/task-poster/src/fetcher.ts index 588d4beb..4e21c691 100644 --- a/apps/task-poster/src/fetcher.ts +++ b/apps/task-poster/src/fetcher.ts @@ -12,6 +12,23 @@ import { validateNumericParams } from "./util.js"; import { KVKey, KVQuery, KVTransactionResult } from "@cross/kv"; import { getTemplate, getTemplates, renderTemplate, escapeHTML } from "./templates.js"; +export type TimeSlot = { from: string; to: string }; +export type DayName = 'mon'|'tue'|'wed'|'thu'|'fri'|'sat'|'sun'; +export type Schedule = Partial>; + +const DAYS: DayName[] = ['sun','mon','tue','wed','thu','fri','sat']; + +const toMinutes = (t: string) => { const [h, m] = t.split(':').map(Number); return h * 60 + m; }; + +export const isWithinSchedule = (schedule?: Schedule): boolean => { + if (!schedule || Object.keys(schedule).length === 0) return true; + const now = new Date(); + const slots = schedule[DAYS[now.getDay()]]; + if (!slots?.length) return false; + const mins = now.getHours() * 60 + now.getMinutes(); + return slots.some(s => (!s.from && !s.to) || (mins >= toMinutes(s.from) && mins < toMinutes(s.to))); +}; + // TODO: can this come out of the protocol package? type APIResponse = { @@ -60,6 +77,9 @@ export type Fetcher = { // fields for previous step fetchers previousIndex?: number; + + // posting schedule: whitelist of days/time windows when tasks can be posted + schedule?: Schedule; }; const api = axios.create({ @@ -219,6 +239,97 @@ export const fetcherForm = async ( +
+ posting schedule +

Leave empty to post continuously. Enable day and add time windows to whitelist certain times during the week.

+ +
+ ${(['mon','tue','wed','thu','fri','sat','sun'] as DayName[]).map(day => { + const slots: TimeSlot[] = (values.schedule as Schedule)?.[day] || []; + const enabled = slots.length > 0; + return ` +
+ +
+ ${(enabled ? slots : []).map((slot, i) => ` +
+ + to + + +
`).join('')} + +
+
`; + }).join('')} +
+ +
+
visibility
@@ -369,6 +480,14 @@ export const createFetcher = async ( status: oldFetcher?.status ?? "active", hidden: fields.hidden === "on" || fields.hidden === true, + + schedule: (() => { + try { + const s = typeof fields.schedule === 'string' + ? JSON.parse(fields.schedule) : fields.schedule; + return s && Object.keys(s).length > 0 ? s : undefined; + } catch { return undefined; } + })(), }; // if we are updating an older fetcher, copy over any optional @@ -579,6 +698,16 @@ export const processFetcher = async (fetcher: Fetcher) => { if (fetcher.status !== "active") return 0; + // always check for results, even outside schedule + if (fetcher.engine === "effectai") + await processResults(fetcher, 20); + + const withinSchedule = isWithinSchedule(fetcher.schedule); + if (!withinSchedule) { + console.log(`Fetcher [${fetcher.datasetId}, ${fetcher.index}] outside scheduled time, skipping`); + return 0; + } + publishProgress[fetcher.datasetId] ??= {}; const fid = [fetcher.datasetId, fetcher.index]; @@ -602,12 +731,7 @@ export const processFetcher = async (fetcher: Fetcher) => { // process the tasks using the engine let imported = 0; if (fetcher.engine === "effectai") { - // import to effect ai imported = await importTasks(fetcher); - - // fetch results from effectai - // TODO: put proper value for result batch size - await processResults(fetcher, 20); } // release lock @@ -921,6 +1045,18 @@ export const addFetcherRoutes = (app: Express): void => {
  • Failed: ${failedSize}
  • Batch / Freq: ${f.batchSize} / ${f.frequency}
  • Time Limit: ${f.timeLimitSeconds}s
  • +
  • Schedule: ${f.schedule && Object.keys(f.schedule).length > 0 + ? Object.entries(f.schedule).map(([day, slots]) => + `${day}: ${(slots as TimeSlot[]).map(s => s.from && s.to ? `${s.from}-${s.to}` : 'all day').join(', ')}` + ).join('; ') + : 'continuous (no restrictions)'} + + ${f.schedule && Object.keys(f.schedule).length > 0 + ? isWithinSchedule(f.schedule) + ? ' (active now)' + : ' (outside schedule)' + : ''} +