parent
c37a7690d6
commit
7ca08f0a36
6 changed files with 87 additions and 21 deletions
|
@ -13,10 +13,11 @@ import (
|
|||
|
||||
// reqImport represents file upload import params.
|
||||
type reqImport struct {
|
||||
Mode string `json:"mode"`
|
||||
Overwrite bool `json:"overwrite"`
|
||||
Delim string `json:"delim"`
|
||||
ListIDs []int `json:"lists"`
|
||||
Mode string `json:"mode"`
|
||||
SubscriptionStatus string `json:"subscriptionStatus"`
|
||||
Overwrite bool `json:"overwrite"`
|
||||
Delim string `json:"delim"`
|
||||
ListIDs []int `json:"lists"`
|
||||
}
|
||||
|
||||
// handleImportSubscribers handles the uploading and bulk importing of
|
||||
|
@ -40,6 +41,10 @@ func handleImportSubscribers(c echo.Context) error {
|
|||
return echo.NewHTTPError(http.StatusBadRequest, app.i18n.T("import.invalidMode"))
|
||||
}
|
||||
|
||||
if r.SubscriptionStatus != subimporter.SubscriptionStatusUnconfirmed && r.SubscriptionStatus != subimporter.SubscriptionStatusConfirmed {
|
||||
return echo.NewHTTPError(http.StatusBadRequest, app.i18n.T("import.invalidSubscriptionStatus"))
|
||||
}
|
||||
|
||||
if len(r.Delim) != 1 {
|
||||
return echo.NewHTTPError(http.StatusBadRequest, app.i18n.T("import.invalidDelim"))
|
||||
}
|
||||
|
@ -69,7 +74,7 @@ func handleImportSubscribers(c echo.Context) error {
|
|||
}
|
||||
|
||||
// Start the importer session.
|
||||
impSess, err := app.importer.NewSession(file.Filename, r.Mode, r.Overwrite, r.ListIDs)
|
||||
impSess, err := app.importer.NewSession(file.Filename, r.Mode, r.SubscriptionStatus, r.Overwrite, r.ListIDs)
|
||||
if err != nil {
|
||||
return echo.NewHTTPError(http.StatusInternalServerError,
|
||||
app.i18n.Ts("import.errorStarting", "error", err.Error()))
|
||||
|
|
|
@ -82,6 +82,7 @@ func install(lastVer string, db *sqlx.DB, fs stuffbin.FileSystem, prompt bool) {
|
|||
"John Doe",
|
||||
`{"type": "known", "good": true, "city": "Bengaluru"}`,
|
||||
pq.Int64Array{int64(defList)},
|
||||
models.SubscriptionStatusUnconfirmed,
|
||||
true); err != nil {
|
||||
lo.Fatalf("Error creating subscriber: %v", err)
|
||||
}
|
||||
|
@ -91,6 +92,7 @@ func install(lastVer string, db *sqlx.DB, fs stuffbin.FileSystem, prompt bool) {
|
|||
"Anon Doe",
|
||||
`{"type": "unknown", "good": true, "city": "Bengaluru"}`,
|
||||
pq.Int64Array{int64(optinList)},
|
||||
models.SubscriptionStatusUnconfirmed,
|
||||
true); err != nil {
|
||||
lo.Fatalf("Error creating subscriber: %v", err)
|
||||
}
|
||||
|
|
|
@ -13,12 +13,46 @@
|
|||
<b-radio v-model="form.mode" name="mode"
|
||||
native-value="subscribe"
|
||||
data-cy="check-subscribe">{{ $t('import.subscribe') }}</b-radio>
|
||||
<br />
|
||||
<b-radio v-model="form.mode" name="mode"
|
||||
native-value="blocklist"
|
||||
data-cy="check-blocklist">{{ $t('import.blocklist') }}</b-radio>
|
||||
</div>
|
||||
</b-field>
|
||||
</div>
|
||||
<div class="column">
|
||||
<b-field :label="$t('import.subscriptionStatus')">
|
||||
<div>
|
||||
<div v-if="form.mode === 'subscribe'" style="display:block">
|
||||
<b-radio
|
||||
v-model="form.subscriptionStatus"
|
||||
name="subscriptionStatus"
|
||||
native-value="unconfirmed"
|
||||
data-cy="check-unconfirmed">
|
||||
{{ $t('import.unconfirmed') }}
|
||||
</b-radio>
|
||||
</div>
|
||||
<div v-if="form.mode === 'subscribe'" style="display:block">
|
||||
<b-radio
|
||||
v-model="form.subscriptionStatus"
|
||||
name="subscriptionStatus"
|
||||
native-value="confirmed"
|
||||
data-cy="check-confirmed">
|
||||
{{ $t('import.confirmed') }}
|
||||
</b-radio>
|
||||
</div>
|
||||
<div v-if="form.mode === 'blocklist'" style="display:block">
|
||||
<b-radio
|
||||
v-model="form.subscriptionStatus"
|
||||
name="subscriptionStatus"
|
||||
native-value="unsubscribed"
|
||||
data-cy="check-unsubscribed">
|
||||
{{ $t('import.unsubscribed') }}
|
||||
</b-radio>
|
||||
</div>
|
||||
</div>
|
||||
</b-field>
|
||||
</div>
|
||||
<div class="column">
|
||||
<b-field v-if="form.mode === 'subscribe'"
|
||||
:label="$t('import.overwrite')"
|
||||
|
@ -153,6 +187,7 @@ export default Vue.extend({
|
|||
return {
|
||||
form: {
|
||||
mode: 'subscribe',
|
||||
subscriptionStatus: 'unconfirmed',
|
||||
delim: ',',
|
||||
lists: [],
|
||||
overwrite: true,
|
||||
|
@ -170,6 +205,19 @@ export default Vue.extend({
|
|||
};
|
||||
},
|
||||
|
||||
watch: {
|
||||
form: {
|
||||
handler(val) {
|
||||
if (val.mode === 'subscribe') {
|
||||
this.form.subscriptionStatus = 'unconfirmed';
|
||||
} else if (val.mode === 'blocklist') {
|
||||
this.form.subscriptionStatus = 'unsubscribed';
|
||||
}
|
||||
},
|
||||
deep: true,
|
||||
},
|
||||
},
|
||||
|
||||
methods: {
|
||||
clearFile() {
|
||||
this.form.file = null;
|
||||
|
@ -269,6 +317,7 @@ export default Vue.extend({
|
|||
const params = new FormData();
|
||||
params.set('params', JSON.stringify({
|
||||
mode: this.form.mode,
|
||||
subscriptionStatus: this.form.subscriptionStatus,
|
||||
delim: this.form.delim,
|
||||
lists: this.form.lists.map((l) => l.id),
|
||||
overwrite: this.form.overwrite,
|
||||
|
|
|
@ -185,9 +185,14 @@
|
|||
"import.invalidDelim": "Delimiter should be a single character.",
|
||||
"import.invalidFile": "Invalid file: {error}",
|
||||
"import.invalidMode": "Invalid mode",
|
||||
"import.invalidSubscriptionStatus": "Invalid subscription status",
|
||||
"import.invalidParams": "Invalid params: {error}",
|
||||
"import.listSubHelp": "Lists to subscribe to.",
|
||||
"import.mode": "Mode",
|
||||
"import.subscriptionStatus": "Subscription Status",
|
||||
"import.confirmed": "Confirmed",
|
||||
"import.unconfirmed": "Unconfirmed",
|
||||
"import.unsubscribed": "Unsubscribed",
|
||||
"import.overwrite": "Overwrite?",
|
||||
"import.overwriteHelp": "Overwrite name and attribs of existing subscribers?",
|
||||
"import.recordsCount": "{num} / {total} records",
|
||||
|
|
|
@ -46,6 +46,9 @@ const (
|
|||
|
||||
ModeSubscribe = "subscribe"
|
||||
ModeBlocklist = "blocklist"
|
||||
|
||||
SubscriptionStatusUnconfirmed = "unconfirmed"
|
||||
SubscriptionStatusConfirmed = "confirmed"
|
||||
)
|
||||
|
||||
// Importer represents the bulk CSV subscriber import system.
|
||||
|
@ -72,9 +75,10 @@ type Session struct {
|
|||
subQueue chan SubReq
|
||||
log *log.Logger
|
||||
|
||||
mode string
|
||||
overwrite bool
|
||||
listIDs []int
|
||||
mode string
|
||||
subscriptionStatus string
|
||||
overwrite bool
|
||||
listIDs []int
|
||||
}
|
||||
|
||||
// Status reporesents statistics from an ongoing import session.
|
||||
|
@ -127,7 +131,7 @@ func New(opt Options, db *sql.DB) *Importer {
|
|||
|
||||
// NewSession returns an new instance of Session. It takes the name
|
||||
// of the uploaded file, but doesn't do anything with it but retains it for stats.
|
||||
func (im *Importer) NewSession(fName, mode string, overWrite bool, listIDs []int) (*Session, error) {
|
||||
func (im *Importer) NewSession(fName, mode string, subscriptionStatus string, overWrite bool, listIDs []int) (*Session, error) {
|
||||
if im.getStatus() != StatusNone {
|
||||
return nil, errors.New("an import is already running")
|
||||
}
|
||||
|
@ -139,12 +143,13 @@ func (im *Importer) NewSession(fName, mode string, overWrite bool, listIDs []int
|
|||
im.Unlock()
|
||||
|
||||
s := &Session{
|
||||
im: im,
|
||||
log: log.New(im.status.logBuf, "", log.Ldate|log.Ltime|log.Lshortfile),
|
||||
subQueue: make(chan SubReq, commitBatchSize),
|
||||
mode: mode,
|
||||
overwrite: overWrite,
|
||||
listIDs: listIDs,
|
||||
im: im,
|
||||
log: log.New(im.status.logBuf, "", log.Ldate|log.Ltime|log.Lshortfile),
|
||||
subQueue: make(chan SubReq, commitBatchSize),
|
||||
mode: mode,
|
||||
subscriptionStatus: subscriptionStatus,
|
||||
overwrite: overWrite,
|
||||
listIDs: listIDs,
|
||||
}
|
||||
|
||||
s.log.Printf("processing '%s'", fName)
|
||||
|
@ -266,7 +271,7 @@ func (s *Session) Start() {
|
|||
}
|
||||
|
||||
if s.mode == ModeSubscribe {
|
||||
_, err = stmt.Exec(uu, sub.Email, sub.Name, sub.Attribs, listIDs, s.overwrite)
|
||||
_, err = stmt.Exec(uu, sub.Email, sub.Name, sub.Attribs, listIDs, s.subscriptionStatus, s.overwrite)
|
||||
} else if s.mode == ModeBlocklist {
|
||||
_, err = stmt.Exec(uu, sub.Email, sub.Name, sub.Attribs)
|
||||
}
|
||||
|
|
10
queries.sql
10
queries.sql
|
@ -80,20 +80,20 @@ SELECT id from sub;
|
|||
|
||||
-- name: upsert-subscriber
|
||||
-- Upserts a subscriber where existing subscribers get their names and attributes overwritten.
|
||||
-- If $6 = true, update values, otherwise, skip.
|
||||
-- If $7 = true, update values, otherwise, skip.
|
||||
WITH sub AS (
|
||||
INSERT INTO subscribers as s (uuid, email, name, attribs, status)
|
||||
VALUES($1, $2, $3, $4, 'enabled')
|
||||
ON CONFLICT (email)
|
||||
DO UPDATE SET
|
||||
name=(CASE WHEN $6 THEN $3 ELSE s.name END),
|
||||
attribs=(CASE WHEN $6 THEN $4 ELSE s.attribs END),
|
||||
name=(CASE WHEN $7 THEN $3 ELSE s.name END),
|
||||
attribs=(CASE WHEN $7 THEN $4 ELSE s.attribs END),
|
||||
updated_at=NOW()
|
||||
RETURNING uuid, id
|
||||
),
|
||||
subs AS (
|
||||
INSERT INTO subscriber_lists (subscriber_id, list_id)
|
||||
VALUES((SELECT id FROM sub), UNNEST($5::INT[]))
|
||||
INSERT INTO subscriber_lists (subscriber_id, list_id, status)
|
||||
VALUES((SELECT id FROM sub), UNNEST($5::INT[]), $6)
|
||||
ON CONFLICT (subscriber_id, list_id) DO UPDATE
|
||||
SET updated_at=NOW()
|
||||
)
|
||||
|
|
Loading…
Reference in a new issue